1. 이벤트
- Spring Kafka는 여러 소비자 관련 이벤트를 발행함
카테고리 | 이벤트명 | 설명 | 주요 속성 |
소비자 생애주기
|
ConsumerStartingEvent | 소비자 스레드가 생성될 때 발생 (poll 시작 전) |
container, source
|
ConsumerStartedEvent | 소비자가 실제로 폴링을 시작하려고 할 때 발생 |
container, source
|
|
ConsumerFailedToStartEvent | 소비자가 시작되지 못한 경우 발생 (consumerStartTimeout 초과) |
container, source
|
|
ConsumerStoppingEvent | 소비자가 정지 직전에 발생 |
container, partitions
|
|
ConsumerStoppedEvent | 소비자가 정지된 후 발생 |
container, source, reason
|
|
ConsumerRetryAuthEvent | 인증 또는 권한 부여 실패 시 재시도 시작 |
container, reason
|
|
ConsumerRetryAuthSuccessfulEvent | 인증/권한 부여 재시도 성공 시 발생 |
container, source
|
|
유휴/활성 상태
|
ListenerContainerIdleEvent | 컨테이너가 일정 시간 동안 메시지를 받지 못했을 때 발생 |
container, topicPartition, consumer, paused, idleTime
|
ListenerContainerNoLongerIdleEvent | 유휴 상태였던 컨테이너가 다시 메시지를 소비할 때 발생 |
container, source
|
|
ListenerContainerPartitionIdleEvent | 특정 파티션이 일정 시간 동안 메시지를 받지 못했을 때 발생 |
container, topicPartition, consumer, paused, idleTime
|
|
ListenerContainerPartitionNoLongerIdleEvent | 유휴 상태였던 파티션이 다시 메시지를 소비할 때 발생 |
container, source
|
|
NonResponsiveConsumerEvent | 컨슈머가 poll()에서 오래 멈춰있을 때 발생 |
container, topicPartitions, consumer, paused, timeSinceLastPoll
|
|
일시정지/재개
|
ConsumerPausedEvent | 컨테이너 전체가 일시정지 되었을 때 발생 |
container, partitions
|
ConsumerResumedEvent | 컨테이너 전체가 재개되었을 때 발생 |
container, partitions
|
|
ConsumerPartitionPausedEvent | 특정 파티션이 일시정지 되었을 때 발생 |
container, partition
|
|
ConsumerPartitionResumedEvent | 특정 파티션이 재개되었을 때 발생 |
container, partition
|
|
오류
|
ConsumerRetryAuthEvent | 인증 실패로 인한 재시도 발생 |
container, reason
|
ConsumerFailedToStartEvent | 스레드 부족 등으로 소비자가 시작 실패 |
container, source
|
|
ConsumerStoppedEvent | 에러로 인한 소비자 정지 |
container, reason
|
|
컨테이너 정지
|
ContainerStoppedEvent | 모든 컨슈머가 정지했을 때 발생 (부모 컨테이너도 발행) |
container, source
|
ConcurrentContainerStoppedEvent | ConcurrentMessageListenerContainer가 정지했을 때 발생 |
container, source
|
속성
용어 | 설명 |
source |
이벤트를 발생시킨 리스너 컨테이너 인스턴스
|
container |
부모 컨테이너 또는 본인 컨테이너 인스턴스
|
reason |
이벤트 종료 이유 (NORMAL, ERROR, FENCED, AUTH, NO_OFFSET 등)
|
idleTime |
마지막 메시지를 받은 이후 경과한 시간 (밀리초 단위)
|
timeSinceLastPoll |
마지막 poll() 호출 이후 경과 시간 (밀리초 단위)
|
partitions |
컨테이너 또는 이벤트 대상인 파티션 리스트
|
partition |
이벤트 대상이 되는 단일 파티션
|
topicPartitions |
이벤트 대상이 되는 토픽+파티션 리스트
|
topicPartition |
이벤트 대상이 되는 단일 토픽+파티션
|
consumer |
실제 Kafka Consumer 객체
|
paused |
현재 consumer 또는 partition이 일시정지 상태인지 여부
|
2. 이벤트 발행
예제
ListenerContainerIdleEvent
더보기
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
// @KafkaListener 용
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
- setIdleEventInterval(): 컨테이너가 어느 시간 이상으로 메시지를 받지 못하면 ListenerContainerIdleEvent 발행
3. 이벤트 캡쳐
- Spring의 ApplicationListener로 모든 컨테이너의 이벤트를 캡쳐할 수 있음
- listener ID를 확인해서 특정 컨테이너의 이벤트만 필터링할 필요가 있음.
- @EventListener의 condition 속성을 이용해서 원하는 이벤트만 받을 수 있음.
컨테이너 중지 시 유의사항
- 리스너 스레드에서 바로 컨테이너를 중지하면 안됨
- 다른 스레드로 중지해야 함
유휴 상태에서 현재 오프셋 확인
- ConsumerSeekAware 인터페이스 구현
예제) @KafkaListener와 @EventListener
더보기
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
예제) 새로운 스레드로 컨테이너 중지하기
더보기
@Component
public class Listener {
private final KafkaListenerEndpointRegistry registry;
public Listener(KafkaListenerEndpointRegistry registry) {
this.registry = registry;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public void listen(String data) {
// 메시지 소비 처리
}
@EventListener(condition = "#event.listenerId.startsWith('myListener')")
public void idleEventListener(ListenerContainerIdleEvent event) {
// 새로운 스레드로 stop() 호출
new Thread(() -> {
MessageListenerContainer container = registry.getListenerContainer(event.getListenerId());
if (container != null) {
container.stop();
}
}).start();
}
}
- 자식 컨테이너를 직접 정지하는 게 아니라, 부모 ConcurrentContainer를 정지해야 한다.
예제) 유휴 상태에서 현재 오프셋 확인
더보기
@Component
public class Listener implements ConsumerSeekAware {
@KafkaListener(id = "myListener", topics = "myTopic")
public void listen(String data) {
// 메시지 소비 처리
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, Consumer<?, ?> consumer) {
// 유휴 상태일 때 호출됨
assignments.forEach((topicPartition, offset) -> {
System.out.println("현재 파티션: " + topicPartition + ", 오프셋: " + offset);
});
}
// 나머지 ConsumerSeekAware 메서드들은 필요 없으면 빈 구현으로 놔둬도 됨
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {}
}
- onIdleContainer(): 유휴 상태가 감지되면 자동 호출.
출처
'Spring > Spring for Apache Kafka' 카테고리의 다른 글
[Spring for Apache Kafka] 3-3. Receiving Messages: Post-Processing (0) | 2025.05.07 |
---|---|
[Spring for Apache Kafka] 3-2. Receiving Messages: @KafkaListener (0) | 2025.05.07 |
[Spring for Apache Kafka] 3-1. Receiving Messages: Message Listener (0) | 2025.05.06 |
[Spring for Apache Kafka] 2. Sending Messages (0) | 2025.05.06 |
[Spring for Apache Kafka] 1. Basic (0) | 2025.05.06 |