카테고리 없음

[Spring for Apache Kafka] 3. Receiving Messages: Message Listener

noahkim_ 2025. 5. 6. 19:03

이 문서는 해당 섹션의 일부분만 정리되었음.

 

1. Message Listeners

  • Kafka에서 메시지를 처리하려면 리스너를 제공해야 함
인터페이스 설명
MessageListener 개별 메시지 처리
AcknowledgingMessageListener 메시지 처리 후 수동으로 오프셋을 커밋
ConsumerAwareMessageListener 메시지 처리 시 Consumer 객체 접근
BatchMessageListener 배치 메시지 처리
BatchAcknowledgingMessageListener 배치 메시지 처리 후 수동 커밋

 

예제

더보기

MessageListener

@KafkaListener(topics = "topic1")
public void onMessage(ConsumerRecord<String, String> record) {
    System.out.println(record.value());
}

 

AcknowledgingMessageListener

@KafkaListener(topics = "topic1")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    System.out.println(record.value());
    ack.acknowledge();  // 메시지를 처리한 후 수동으로 오프셋을 커밋
}

 

ConsumerAwareMessageListener

@KafkaListener(topics = "topic1")
public void onMessage(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
    System.out.println(record.value());
    // Consumer 객체에 접근하여 추가 작업 가능
}

 

BatchMessageListener

@KafkaListener(topics = "topic1")
public void onMessage(List<ConsumerRecord<String, String>> records) {
    records.forEach(record -> System.out.println(record.value()));
}

 

BatchAcknowledgingMessageListener

@KafkaListener(topics = "topic1")
public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    records.forEach(record -> System.out.println(record.value()));
    ack.acknowledge();  // 모든 메시지를 처리한 후 수동으로 오프셋을 커밋
}

 

2. Message Listener Containers

  • Kafka Consumer 관리 컨테이너
  • 네트워크에서 메시지를 poll()하고, 해당 메시지를 리스너에게 전달
  • 하나의 컨슈머당 하나의 컨테이너로 관리함
항목 설명
트랜잭션 관리
Kafka Consumer가 메시지를 처리하는 동안 트랜잭션을 관리
- 메시지의 처리 상태에 따라 커밋 또는 롤백 수행
스레드 관리
여러 스레드를 활용하여 병렬로 메시지 처리
오류 처리 메시지 처리 중 오류가 발생하면 예외 처리로 변환함
- @MessageExceptionHandler: 리스너에서 발생한 예외를 처리합니다.
재시도
메시지 처리 중 실패가 발생한 경우, 일정 횟수만큼 재시도를 시도합니다.
- 재시도 로직을 설정할 수 있습니다.

 

Interceptor

  • 트랜잭션이 시작되기 전, 호출됨 (interceptBeforeTx 설정 시, 트랜잭션 시작 후 호출)

 

구현체
종류 설명
RecordInterceptor
단일 레코드를 처리하는 리스너용 인터셉터
- 리스너 호출 전, null 반환 시 리스너 호출 안 됨
- 리스너 호출 후, 예외 발생해도 호출 가능
BatchInterceptor 배치 리스너용 인터셉터
CompositeRecordInterceptor 여러 개의 interceptor를 체인처럼 연결할 수 있음.
CompositeBatchInterceptor 여러 개의 interceptor를 체인처럼 연결할 수 있음.

 

유의 사항
  • interceptor 안에서는 consumer의 position이나 커밋된 offset을 변경하는 작업을 하면 안 됨.
  • 레코드를 변조할 경우에도 topic, partition, offset은 그대로 유지해야 함 (안 그러면 레코드 유실 가능).

 

DefaultKafkaConsumerFactory

  • KafkaConsumer 객체를 생성하는 클래스
  • KafkaListenerContainer나 @KafkaListener를 사용할 때 내부적으로 KafkaConsumer 인스턴스를 만들어줌

 

설정

더보기
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
              "org.apache.kafka.clients.consumer.RoundRobinAssignor");

    return new DefaultKafkaConsumerFactory<>(props);
}

 

KafkaMessageListenerContainer

  • 단일 스레드로 모든 토픽/파티션의 메시지를 수신

 

옵션
기능 설명
logContainerConfig
컨테이너가 시작될 때 설정 요약을 INFO 레벨로 로깅.
commitLogLevel
offset 커밋 관련 로그 레벨 지정 가능
missingTopicsFatal 설정한 토픽이 존재하지 않으면 컨테이너가 시작 실패.
authExceptionRetryInterval
인증/인가 오류 발생 시 재시도 대기 시간 설정 가능

 

 

예제) 생성자

더보기
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                                     ContainerProperties containerProperties)
  • ConsumerFactory와 ContainerProperties를 전달받아 생성

 

예제) 설정

더보기
@Bean
public KafkaMessageListenerContainer<String, String> messageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("test-topic");      
    containerProps.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);  // Offset 커밋 로그 레벨
    containerProps.setLogContainerConfig(true);  // 컨테이너 설정 요약 로그
    containerProps.setMissingTopicsFatal(true);  // 설정한 토픽이 없으면 시작 실패
    containerProps.setAuthExceptionRetryInterval(5000);  // 인증 오류 발생 시 5초 대기 후 재시도

    return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
}

 

ConcurrentMessageListenerContainer

  • 메시지를 '동시에 여러 쓰레드(컨슈머)로' 받게 해주는 컴포넌트
  • 다수의 KafkaMessageListenerContainer에 위임받음
항목 설명
Client ID 규칙
client.id가 설정되면 컨테이너 번호 추가됨
→ JMX 모니터링 시 Consumer MBean 이름이 유일해야 하기 때문.
Metrics 제공
MessageListenerContainer는 KafkaConsumer.metrics() 기반 모니터링 지표 제공.
→ ConcurrentMessageListenerContainer는 여러 하위 컨테이너의 metrics를 Map으로 묶어서 제공.
파티션 할당 전략
컨슈머 그룹 안에서 파티션을 자동 분배.
- 컨슈머 수 ≥ 파티션 수: 1:1 매핑
- 컨슈머 수 < 파티션 수: 일부 컨슈머가 여러 파티션을 담당.

주요 전략
- RangeAssignor: 연속 파티션 할당
- RoundRobinAssignor: 균등 분배
idleBetweenPolls
컨슈머가 poll() 호출하고 다음 poll()까지 쉬는 시간
- poll 간격 = idleBetweenPolls 설정값과 max.poll.interval.ms 중 더 작은 값 으로 결정됨.
TopicPartitionOffsets
특정 토픽-파티션-오프셋을 수동 지정할 때 사용
ConcurrentMessageListenerContainer는 지정된 TopicPartitionOffset 리스트를 하위 컨테이너에 균등하게 분배함.

 

설정

더보기
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(3);

    // 여기서 ContainerProperties의 기본 속성들 설정
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 수동 커밋 모드
    factory.getContainerProperties().setIdleBetweenPolls(5000L); // 폴링 사이 대기 시간 5초
    factory.getContainerProperties().setPollTimeout(3000L); // poll timeout
    
    // 모든 컨테이너가 동일한 리스너 사용
    factory.getContainerProperties().setMessageListener((MessageListener<String, String>) record -> {
        System.out.println("Received: " + record.value());
    });

    return factory;
}

 

설정) TopicPartitionOffsets

더보기
List<TopicPartitionOffset> offsets = Arrays.asList(
    new TopicPartitionOffset("topic-name", 0, 1000L),
    new TopicPartitionOffset("topic-name", 1, 2000L)
);

ContainerProperties containerProps = new ContainerProperties(offsets.toArray(new TopicPartitionOffset[0]));
containerProps.setMessageListener((MessageListener<String, String>) record -> {
    System.out.println("Received: " + record.value());
});

ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        
container.start();

 

설정) 파티션 분배

더보기

범위 분배

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RangeAssignor

 

 

균등 분배

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

 

예외) 파티션 분배: 범위 분배

더보기

여러 토픽에 대한 파티션 분배

  • 컨슈머 그룹이 여러 토픽을 구독하고, 각 토픽에 여러 개의 파티션이 있을 경우
  • RangeAssignor는 파티션을 연속적인 범위로 묶어서 분배하려 함
  • 이때, 파티션 수가 많은 토픽부터 소비자에게 할당되고 적은 토픽은 나중에 할당될 수 있음

 

Committing Offsets

enable.auto.commit
  • true: kafka가 자동으로 오프셋 커밋
  • false: 컨테이너가 직접 오프셋 커밋 제어 (AckMode에 따라 동작됨)

 

AckMode
AckMode 설명
RECORD
레코드 하나를 처리한 후 즉시 오프셋 커밋
BATCH
poll()로 가져온 모든 레코드를 처리 완료 후 오프셋 커밋
TIME
모든 레코드 처리 후, 마지막 커밋 이후 ackTime 시간이 지나면 커밋
COUNT
모든 레코드 처리 후, 마지막 커밋 이후 ackCount개 레코드가 처리되었으면 커밋
COUNT_TIME
TIME 또는 COUNT 둘 중 하나 조건 만족하면 커밋
MANUAL 리스너가 acknowledge()를 호출해야 함 (호출 후 BATCH처럼 커밋)
MANUAL_IMMEDIATE
리스너가 acknowledge() 호출 즉시 커밋

 

syncCommits
  • 컨테이너 옵션
  • 오프셋 커밋을 동기 혹은 비동기 요청할 지 결정함

 

예제

더보기
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);

// 동기 커밋 사용
factory.getContainerProperties().setSyncCommits(true);

// 비동기 커밋 사용하려면
// factory.getContainerProperties().setSyncCommits(false);

 

Acknowledgment 인터페이스
  • 리스너가 직접 오프셋 커밋 제어를 하도록 하는 인터페이스
메서드 설명
acknowledge()
성공한 레코드까지 정상 커밋
nack()
지금까지 성공한 레코드까지만 커밋
실패한 레코드들은 다시 poll()로 재시도
(반드시 AckMode.MANUAL 설정 필요)

 

예제

더보기
@KafkaListener(topics = "test-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(String message, Acknowledgment acknowledgment) {
    try {
        System.out.println("처리할 메시지: " + message);
        
        // 메시지 처리 성공
        acknowledgment.acknowledge();  // 오프셋 수동 커밋
    } catch (Exception e) {
        // 메시지 처리 실패
        System.err.println("메시지 처리 실패: " + e.getMessage());
        // nack() 등을 사용할 수도 있음 (AckMode.MANUAL일 때)
    }
}

 


출처