Spring/Spring for Apache Kafka

[Spring for Apache Kafka] 3-2. Receiving Messages: @KafkaListener

noahkim_ 2025. 5. 7. 14:45

1. @KafkaListener Annotation

  • Bean 메서드를 Kafka 리스너로 지정할 때 사용
  • Listener Container 안에서 실행되며, 내부적으로 MessagingMessageListenerAdapter로 감싸집니다
    • 메서드 파라미터에 맞게 Converter 등을 설정해줌
    • 대부분의 속성들은 SpEL(#{...}) 또는 **프로퍼티 플레이스홀더(${...})**를 이용해 동적으로 지정할 수 있음

 

Record Listeners

  • @KafkaListener를 붙이면 적용됨
  • Listener Container Factory 필요 (이름이 kafkaListenerContainerFactory인 빈을 찾음)

 

예제

더보기
public class Listener {
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        // 메시지 처리
    }
}

 

Explicit Partition Assignment

  • 특정 토픽의 특정 파티션만 리스닝하도록 직접 지정할 수 있습니다

 

예제

더보기
@KafkaListener(id = "thing2", topicPartitions = {
    @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
    @TopicPartition(topic = "topic2", partitions = "0",
        partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
    // 메시지 처리
}
@KafkaListener(id = "pp", autoStartup = "false",
    topicPartitions = @TopicPartition(topic = "topic1",
        partitions = "0-5,7,10-15"))
public void process(String in) {
    // 0,1,2,3,4,5,7,10,11,12,13,14,15번 파티션 수신
}
@KafkaListener(id = "thing3", topicPartitions = {
    @TopicPartition(topic = "topic1",
        partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
    // 0~5번 파티션에 대해 offset 0부터 수신
}
@KafkaListener(id = "seekPositionTime", topicPartitions = {
    @TopicPartition(topic = "topic1", partitionOffsets = {
        @PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
        @PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
        @PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
    })
})
public void listen(ConsumerRecord<?, ?> record) {
    // 파티션 0: 특정 timestamp 기준
    // 파티션 1: 가장 처음부터
    // 파티션 2: 가장 마지막부터
}

 

  • END : 가장 마지막 오프셋부터 시작 (initialOffset 무시)
  • BEGINNING : 가장 처음부터 시작 (initialOffset 무시)
  • TIMESTAMP : 특정 timestamp에 해당하는 오프셋부터 시작 (initialOffset이 timestamp를 의미)

 

 

Manual Acknowledgment

  • Kafka 리스너에서 메시지를 수동으로 직접 ack(커밋)하는 방법

 

예제

더보기
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> 
        kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // ✅ 수동 모드
    return factory;
}
@KafkaListener(id = "cat", topics = "myTopic",
        containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    System.out.println("수신한 메시지: " + data);

    // ✅ 여기서 직접 ack 호출 (오프셋 커밋)
    ack.acknowledge();
}

 

Consumer Record Metadata

  • Spring Kafka에서 메시지 메타데이터를 메시디 헤더에 기록해줍니다.
헤더 이름 설명
KafkaHeaders.OFFSET
오프셋
KafkaHeaders.RECEIVED_KEY
KafkaHeaders.RECEIVED_TOPIC
토픽
KafkaHeaders.RECEIVED_PARTITION
파티션
KafkaHeaders.RECEIVED_TIMESTAMP
메시지가 수신된 타임스탬프
KafkaHeaders.TIMESTAMP_TYPE
타임스탬프의 타입 (예: CREATE, LOG_APPEND)

 

예시

더보기
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
    // 메타데이터와 함께 메시지 처리
}
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    // 메타데이터 활용
}
  • ConsumerRecordMetadata로 한번에 받을 수 있음

 

Batch Listeners

  • 소비자가 poll() 메서드로 가져온 전체 메시지 배치를 리스닝하는 기능입니다
  • batchListener 속성을 활성화해야 합니다.

 

설정

더보기
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // 배치 리스너 활성화
    return factory;
}

 

예시

더보기
@KafkaListener(id = "batchListener", topics = "myTopic", batchListener = true)
public void listen(List<String> list) {
    ...
}
@KafkaListener(id = "batchListener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}
  • 여러 개의 메시지를 리스트(List) 형태로 받을 수 있습니다.

 

@KafkaListener(id = "batchListenerWithConsumerRecord", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}
  • ConsumerRecord 객체를 리스트로 받을 수 있습니다

 

Annotation Properties

속성 설명
id consumer group.id
groupId consumer group.id (id 속성을 사용하면 무시됨)
idIsGroup
id 속성을 group.id로 사용할지 여부를 제어하는 옵션
topics 구독할 토픽
beanRef 특수 토큰 지정
properties
consumer properties

 

예시

더보기
@KafkaListener(topics = {"topic1", "topic2"})
public void listen(String message) {
    System.out.println("Received message: " + message);
}
@KafkaListener(id = "myGroupId", topics = "myTopic", idIsGroup = false)
public void listen(String message) {
    System.out.println("Received message: " + message);
}
  • 이 경우엔 groupId가 group.id로 설정됨

 

@Bean
public MyListener myListener() {
    return new MyListener("customTopic");
}

@KafkaListener(beanRef = "myListener", topics = "#{myListener.topic}")
public void listen(String message) {
    System.out.println("Received message: " + message);
}
@KafkaListener(topics = "myTopic", properties = {
    "max.poll.interval.ms:60000",
    "consumer.fetch.min.bytes:100"
})
public void listen(String message) {
    System.out.println("Received message: " + message);
}

 

SpEL
속성 설명
groupId
SpEL(스프링 표현식 언어)을 사용하여 동적으로 group.id를 설정할 수 있습니다.
예: #{someBean.someProperty}.group
topics
SpEL을 사용하여 동적으로 토픽을 설정할 수 있습니다.
예: #{someBean.topic}
__listener
현재 빈 인스턴스를 나타내는 특수 토큰
빈의 속성 값을 참조할 수 있습니다.
properties
SpEL을 사용하여 Kafka 소비자 속성을 설정할 수 있습니다.
예: "value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer"

 

2. @KafkaListener Attribute Modification

AnnotationEnhancer

  • 속성 값들을 수정할 수 있는 함수형 인터페이스
  • 모든 리스너에 대해 일괄적으로 속성이 변경 가능함
  • AnnotationEnhancer 빈 정의는 static으로 선언해야 함 (애플리케이션 컨텍스트 초기화 과정에서 필요)

 

설정

더보기
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
    return (attrs, element) -> {
        attrs.put("groupId", attrs.get("id") + "." + 
            (element instanceof Class
                ? ((Class<?>) element).getSimpleName()
                : ((Method) element).getDeclaringClass().getSimpleName()
                    +  "." + ((Method) element).getName()));
        return attrs;
    };
}
  • id 속성을 기반으로 groupId를 동적으로 생성 

 


출처