Spring Boot로 Kafka 메시지 처리(Producer/Consumer 예제)
Spring Boot와 Kafka를 연동해 Producer와 Consumer를 구현하는 방법을 단계별 예제로 설명한다. 프로젝트 설정과 주요 코드 예제를 포함한 개발 자료
목차
소개
이 글은 Spring Boot를 사용해 Kafka로 메시지를 주고받는 과정을 쉽게 이해하도록 정리한 내용이다. 초보자도 따라올 수 있도록 설정부터 간단한 Producer와 Consumer 구현까지 예제를 중심으로 설명한다. 핵심 키워드는 spring boot kafka 예제, spring kafka producer consumer, kafka spring boot 설정 이다.
환경과 의존성
기본 환경은 Java 11 이상, Spring Boot 2.7.x 또는 3.x, 로컬 Kafka(zookeeper 포함 또는 Confluent 등)이다. Maven 기준 주요 의존성은 다음과 같다.
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
설정
application.yml에서 Kafka 접속과 직렬화 설정을 정의한다. 로컬 테스트용으로 간단히 설정한 예는 다음과 같다.
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: demo-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
Kafka Configuration
공통 설정을 Java 설정으로 작성하면 유연하게 확장할 수 있다. ProducerFactory와 KafkaTemplate, ConcurrentKafkaListenerContainerFactory를 등록한다.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
// Map<String, Object> props = new HashMap<>(); // 설정 추가 가능
return new DefaultKafkaProducerFactory<>(/* props */);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
}
Producer 구현
KafkaTemplate을 주입받아 메시지를 전송한다. 전송 결과는 ListenableFuture로 확인 가능하다.
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class SimpleProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public SimpleProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message).addCallback(
success -> System.out.println("전송 성공: " + message),
failure -> System.err.println("전송 실패: " + failure.getMessage())
);
}
}
Consumer 구현
@KafkaListener를 사용하면 특정 토픽을 쉽게 구독할 수 있다. 메시지 처리는 메소드 내부에서 구현한다.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class SimpleConsumer {
@KafkaListener(topics = "demo-topic", groupId = "demo-group")
public void listen(String message) {
System.out.println("수신된 메시지: " + message);
// 비즈니스 로직 처리
}
}
실행 및 확인
- 로컬 Kafka가 실행되어 있어야 한다.
- 토픽은 필요시 미리 생성하거나 자동 생성 기능에 의존한다.
- 애플리케이션을 실행한 뒤 Producer를 호출해 메시지를 보낸다.
- Consumer 로그에서 메시지 수신 여부를 확인한다.
주의사항과 팁
- 데이터 직렬화 형식(String, JSON 등)을 환경에 맞게 통일한다.
- 운영 환경에서는 bootstrap-servers와 보안 설정을 별도 관리한다.
- Consumer는 재처리 전략과 예외 핸들링을 설계해야 한다.
마무리
여기까지 spring boot kafka 예제 중심으로 spring kafka producer consumer 구성과 kafka spring boot 설정 방법을 살펴봤다. 예제를 기반으로 메시지 포맷, 에러 처리, 모니터링을 추가하면 운영 준비가 된다.