Spring/Spring for Apache Kafka

[Spring for Apache Kafka] 4. Application Events

noahkim_ 2025. 5. 6. 19:17

1. 이벤트

  • Spring Kafka는 여러 소비자 관련 이벤트를 발행함
카테고리 이벤트명 설명 주요 속성
소비자 생애주기
ConsumerStartingEvent 소비자 스레드가 생성될 때 발생
(poll 시작 전)
container, source
ConsumerStartedEvent 소비자가 실제로 폴링을 시작하려고 할 때 발생
container, source
ConsumerFailedToStartEvent 소비자가 시작되지 못한 경우 발생
(consumerStartTimeout 초과)
container, source
ConsumerStoppingEvent 소비자가 정지 직전에 발생
container, partitions
ConsumerStoppedEvent 소비자가 정지된 후 발생
container, source, reason
ConsumerRetryAuthEvent 인증 또는 권한 부여 실패 시 재시도 시작
container, reason
ConsumerRetryAuthSuccessfulEvent 인증/권한 부여 재시도 성공 시 발생
container, source
유휴/활성 상태
ListenerContainerIdleEvent 컨테이너가 일정 시간 동안 메시지를 받지 못했을 때 발생
container, topicPartition, consumer, paused, idleTime
ListenerContainerNoLongerIdleEvent 유휴 상태였던 컨테이너가 다시 메시지를 소비할 때 발생
container, source
ListenerContainerPartitionIdleEvent 특정 파티션이 일정 시간 동안 메시지를 받지 못했을 때 발생
container, topicPartition, consumer, paused, idleTime
ListenerContainerPartitionNoLongerIdleEvent 유휴 상태였던 파티션이 다시 메시지를 소비할 때 발생
container, source
NonResponsiveConsumerEvent 컨슈머가 poll()에서 오래 멈춰있을 때 발생
container, topicPartitions, consumer, paused, timeSinceLastPoll
일시정지/재개
ConsumerPausedEvent 컨테이너 전체가 일시정지 되었을 때 발생
container, partitions
ConsumerResumedEvent 컨테이너 전체가 재개되었을 때 발생
container, partitions
ConsumerPartitionPausedEvent 특정 파티션이 일시정지 되었을 때 발생
container, partition
ConsumerPartitionResumedEvent 특정 파티션이 재개되었을 때 발생
container, partition
오류
ConsumerRetryAuthEvent 인증 실패로 인한 재시도 발생
container, reason
ConsumerFailedToStartEvent 스레드 부족 등으로 소비자가 시작 실패
container, source
ConsumerStoppedEvent 에러로 인한 소비자 정지
container, reason
컨테이너 정지
ContainerStoppedEvent 모든 컨슈머가 정지했을 때 발생
(부모 컨테이너도 발행)
container, source
ConcurrentContainerStoppedEvent ConcurrentMessageListenerContainer가 정지했을 때 발생
container, source

 

속성

용어 설명
source
이벤트를 발생시킨 리스너 컨테이너 인스턴스
container
부모 컨테이너 또는 본인 컨테이너 인스턴스
reason
이벤트 종료 이유 (NORMAL, ERROR, FENCED, AUTH, NO_OFFSET 등)
idleTime
마지막 메시지를 받은 이후 경과한 시간 (밀리초 단위)
timeSinceLastPoll
마지막 poll() 호출 이후 경과 시간 (밀리초 단위)
partitions
컨테이너 또는 이벤트 대상인 파티션 리스트
partition
이벤트 대상이 되는 단일 파티션
topicPartitions
이벤트 대상이 되는 토픽+파티션 리스트
topicPartition
이벤트 대상이 되는 단일 토픽+파티션
consumer
실제 Kafka Consumer 객체
paused
현재 consumer 또는 partition이 일시정지 상태인지 여부

 

2. 이벤트 발행

예제

ListenerContainerIdleEvent

더보기
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}
// @KafkaListener 용

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}
  • setIdleEventInterval(): 컨테이너가 어느 시간 이상으로 메시지를 받지 못하면 ListenerContainerIdleEvent 발행

 

3. 이벤트 캡쳐

  • Spring의 ApplicationListener로 모든 컨테이너의 이벤트를 캡쳐할 수 있음
    • listener ID를 확인해서 특정 컨테이너의 이벤트만 필터링할 필요가 있음.
    • @EventListener의 condition 속성을 이용해서 원하는 이벤트만 받을 수 있음.

 

컨테이너 중지 시 유의사항

  • 리스너 스레드에서 바로 컨테이너를 중지하면 안됨
  • 다른 스레드로 중지해야 함

 

유휴 상태에서 현재 오프셋 확인

  • ConsumerSeekAware 인터페이스 구현

 

 

예제) @KafkaListener와 @EventListener

더보기
public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}

 

예제) 새로운 스레드로 컨테이너 중지하기

더보기
@Component
public class Listener {

    private final KafkaListenerEndpointRegistry registry;

    public Listener(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @KafkaListener(id = "myListener", topics = "myTopic")
    public void listen(String data) {
        // 메시지 소비 처리
    }

    @EventListener(condition = "#event.listenerId.startsWith('myListener')")
    public void idleEventListener(ListenerContainerIdleEvent event) {
        // 새로운 스레드로 stop() 호출
        new Thread(() -> {
            MessageListenerContainer container = registry.getListenerContainer(event.getListenerId());
            if (container != null) {
                container.stop();
            }
        }).start();
    }
}
  • 자식 컨테이너를 직접 정지하는 게 아니라, 부모 ConcurrentContainer를 정지해야 한다.

 

예제) 유휴 상태에서 현재 오프셋 확인

더보기
@Component
public class Listener implements ConsumerSeekAware {

    @KafkaListener(id = "myListener", topics = "myTopic")
    public void listen(String data) {
        // 메시지 소비 처리
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, Consumer<?, ?> consumer) {
        // 유휴 상태일 때 호출됨
        assignments.forEach((topicPartition, offset) -> {
            System.out.println("현재 파티션: " + topicPartition + ", 오프셋: " + offset);
        });
    }

    // 나머지 ConsumerSeekAware 메서드들은 필요 없으면 빈 구현으로 놔둬도 됨
    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {}

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {}
}
  • onIdleContainer(): 유휴 상태가 감지되면 자동 호출.

 


출처