728x90
반응형
Kafka와 연결하기 위해 spring-kafka를 추가해야 합니다.
1) Gradle (build.gradle.kts)
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.kafka:spring-kafka")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
}
2) application.yml 파일
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: product-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
⚠ auto-offset-reset: earliest 설정
- earliest: 가장 처음부터 메시지를 소비
- latest: 가장 마지막 메시지부터 소비
. Kafka 토픽 생성
Kafka는 메시지를 전송하는 **토픽(Topic)**을 사용합니다. 먼저, 토픽을 생성해야 합니다.
4.1. 토픽 자동 생성 설정
Spring Boot에서 자동으로 Kafka 토픽을 생성할 수 있도록 설정합니다.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic createTopic() {
return new NewTopic("my-topic", 3, (short) 1); // 토픽명, 파티션 개수, 복제 개수
}
}
5. Kafka Producer (메시지 전송)
Kafka에 메시지를 전송하는 프로듀서를 구현합니다.
5.1. Producer Service 구현
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
kafkaTemplate.send("my-topic", message);
System.out.println("Sent message: " + message);
}
}
6. Kafka Consumer (메시지 소비)
Kafka에서 메시지를 받아 처리하는 컨슈머를 구현합니다.
6.1. Consumer Listener 설정
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
public void consumeMessage(String message) {
System.out.println("Received message: " + message);
}
}
@KafkaListener 어노테이션
- topics = "my-topic": 특정 토픽을 구독
- groupId = "my-consumer-group": 특정 컨슈머 그룹에 속함
- auto-offset-reset=earliest 설정이면 저장된 메시지부터 읽음
7. REST API를 통한 Kafka 메시지 전송
Kafka 메시지를 HTTP API를 통해 전송할 수 있도록 컨트롤러를 만듭니다.
7.1. KafkaController 구현
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
public KafkaController(KafkaProducerService kafkaProducerService) {
this.kafkaProducerService = kafkaProducerService;
}
@PostMapping("/send")
public String sendMessage(@RequestParam String message) {
kafkaProducerService.sendMessage(message);
return "Message sent: " + message;
}
}
728x90
반응형
'Backend' 카테고리의 다른 글
[Backend] Kafka 명령어 (0) | 2025.03.15 |
---|---|
[Backend] kafka 알아보기 (0) | 2025.03.13 |
[Backend] Kafka 설정하기 - Docker container 만들기 (0) | 2025.03.13 |
댓글