1. @KafkaListener Annotation
- Bean 메서드를 Kafka 리스너로 지정할 때 사용
- Listener Container 안에서 실행되며, 내부적으로 MessagingMessageListenerAdapter로 감싸집니다
- 메서드 파라미터에 맞게 Converter 등을 설정해줌
- 대부분의 속성들은 SpEL(#{...}) 또는 **프로퍼티 플레이스홀더(${...})**를 이용해 동적으로 지정할 수 있음
Record Listeners
- @KafkaListener를 붙이면 적용됨
- Listener Container Factory 필요 (이름이 kafkaListenerContainerFactory인 빈을 찾음)
예제
더보기
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
// 메시지 처리
}
}
Explicit Partition Assignment
- 특정 토픽의 특정 파티션만 리스닝하도록 직접 지정할 수 있습니다
예제
더보기
@KafkaListener(id = "thing2", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
// 메시지 처리
}
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5,7,10-15"))
public void process(String in) {
// 0,1,2,3,4,5,7,10,11,12,13,14,15번 파티션 수신
}
@KafkaListener(id = "thing3", topicPartitions = {
@TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
// 0~5번 파티션에 대해 offset 0부터 수신
}
@KafkaListener(id = "seekPositionTime", topicPartitions = {
@TopicPartition(topic = "topic1", partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
})
})
public void listen(ConsumerRecord<?, ?> record) {
// 파티션 0: 특정 timestamp 기준
// 파티션 1: 가장 처음부터
// 파티션 2: 가장 마지막부터
}
- END : 가장 마지막 오프셋부터 시작 (initialOffset 무시)
- BEGINNING : 가장 처음부터 시작 (initialOffset 무시)
- TIMESTAMP : 특정 timestamp에 해당하는 오프셋부터 시작 (initialOffset이 timestamp를 의미)
Manual Acknowledgment
- Kafka 리스너에서 메시지를 수동으로 직접 ack(커밋)하는 방법
예제
더보기
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // ✅ 수동 모드
return factory;
}
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
System.out.println("수신한 메시지: " + data);
// ✅ 여기서 직접 ack 호출 (오프셋 커밋)
ack.acknowledge();
}
Consumer Record Metadata
- Spring Kafka에서 메시지 메타데이터를 메시디 헤더에 기록해줍니다.
헤더 이름 | 설명 |
KafkaHeaders.OFFSET |
오프셋
|
KafkaHeaders.RECEIVED_KEY |
키
|
KafkaHeaders.RECEIVED_TOPIC |
토픽
|
KafkaHeaders.RECEIVED_PARTITION |
파티션
|
KafkaHeaders.RECEIVED_TIMESTAMP |
메시지가 수신된 타임스탬프
|
KafkaHeaders.TIMESTAMP_TYPE |
타임스탬프의 타입 (예: CREATE, LOG_APPEND)
|
예시
더보기
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
// 메타데이터와 함께 메시지 처리
}
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
// 메타데이터 활용
}
- ConsumerRecordMetadata로 한번에 받을 수 있음
Batch Listeners
- 소비자가 poll() 메서드로 가져온 전체 메시지 배치를 리스닝하는 기능입니다
- batchListener 속성을 활성화해야 합니다.
설정
더보기
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // 배치 리스너 활성화
return factory;
}
예시
더보기
@KafkaListener(id = "batchListener", topics = "myTopic", batchListener = true)
public void listen(List<String> list) {
...
}
@KafkaListener(id = "batchListener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
- 여러 개의 메시지를 리스트(List) 형태로 받을 수 있습니다.
@KafkaListener(id = "batchListenerWithConsumerRecord", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
- ConsumerRecord 객체를 리스트로 받을 수 있습니다
Annotation Properties
속성 | 설명 |
id | consumer group.id |
groupId | consumer group.id (id 속성을 사용하면 무시됨) |
idIsGroup |
id 속성을 group.id로 사용할지 여부를 제어하는 옵션
|
topics | 구독할 토픽 |
beanRef | 특수 토큰 지정 |
properties |
consumer properties
|
예시
더보기
@KafkaListener(topics = {"topic1", "topic2"})
public void listen(String message) {
System.out.println("Received message: " + message);
}
@KafkaListener(id = "myGroupId", topics = "myTopic", idIsGroup = false)
public void listen(String message) {
System.out.println("Received message: " + message);
}
- 이 경우엔 groupId가 group.id로 설정됨
@Bean
public MyListener myListener() {
return new MyListener("customTopic");
}
@KafkaListener(beanRef = "myListener", topics = "#{myListener.topic}")
public void listen(String message) {
System.out.println("Received message: " + message);
}
@KafkaListener(topics = "myTopic", properties = {
"max.poll.interval.ms:60000",
"consumer.fetch.min.bytes:100"
})
public void listen(String message) {
System.out.println("Received message: " + message);
}
SpEL
속성 | 설명 |
groupId |
SpEL(스프링 표현식 언어)을 사용하여 동적으로 group.id를 설정할 수 있습니다.
예: #{someBean.someProperty}.group |
topics |
SpEL을 사용하여 동적으로 토픽을 설정할 수 있습니다.
예: #{someBean.topic} |
__listener |
현재 빈 인스턴스를 나타내는 특수 토큰
빈의 속성 값을 참조할 수 있습니다. |
properties |
SpEL을 사용하여 Kafka 소비자 속성을 설정할 수 있습니다.
예: "value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" |
2. @KafkaListener Attribute Modification
AnnotationEnhancer
- 속성 값들을 수정할 수 있는 함수형 인터페이스
- 모든 리스너에 대해 일괄적으로 속성이 변경 가능함
- AnnotationEnhancer 빈 정의는 static으로 선언해야 함 (애플리케이션 컨텍스트 초기화 과정에서 필요)
설정
더보기
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
attrs.put("groupId", attrs.get("id") + "." +
(element instanceof Class
? ((Class<?>) element).getSimpleName()
: ((Method) element).getDeclaringClass().getSimpleName()
+ "." + ((Method) element).getName()));
return attrs;
};
}
- id 속성을 기반으로 groupId를 동적으로 생성
출처
'Spring > Spring for Apache Kafka' 카테고리의 다른 글
[Spring for Apache Kafka] Transactions (0) | 2025.05.08 |
---|---|
[Spring for Apache Kafka] 3-3. Receiving Messages: Post-Processing (0) | 2025.05.07 |
[Spring for Apache Kafka] 4. Application Events (0) | 2025.05.06 |
[Spring for Apache Kafka] 3-1. Receiving Messages: Message Listener (0) | 2025.05.06 |
[Spring for Apache Kafka] 2. Sending Messages (0) | 2025.05.06 |