Spring/Spring for Apache Kafka

[Spring for Apache Kafka] 3-3. Receiving Messages: Post-Processing

noahkim_ 2025. 5. 7. 14:48

1. Forwarding Listener Results using @SendTo

  • @KafkaListener에 @SendTo 애너테이션을 함께 사용하면, 리스너 메소드의 반환값을 지정된 토픽으로 자동 전달함

 

토픽 지정

유형 설명 예시
고정된 토픽 애플리케이션 초기화 시 지정된 토픽에 메시지 전달 @SendTo("someTopic")
애플리케이션 초기화 시 결정되는 SpEL 애플리케이션 초기화 시 평가되어 결정됨 @SendTo("#{someExpression}")
실행 시 결정되는 SpEL 동적으로 토픽을 결정
(request, source, result 등 사용 가능)
@SendTo("!{request.value()}")
빈 토픽 이름 !{source.headers['kafka_replyTopic']}로 간주 @SendTo()

 

설정

더보기
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

 

항목 설명 예시 및 세부 사항
리스너 컨테이너 팩토리 Kafka 리스너의 설정을 관리하는 팩토리.
KafkaTemplate을 설정해야 함
자동으로 설정되거나 수동으로 설정할 수 있음
(예: @EnableKafka와 함께)
ReplyHeadersConfigurer 응답 메시지의 헤더를 설정하는 데 사용됨.
메시지에 대한 헤더 설정을 처리
응답 메시지에 추가 헤더를 설정하기 위해 사용
(예: ReplyHeadersConfigurer로 헤더 추가)

 

예제

메시지 해더 설정

더보기
@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

 

예외 처리

더보기
@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic", errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // 실패한 메시지 및 관련 정보
    };
}

 

Iterable 반환

더보기
@KafkaListener(topics = "someTopic", splitIterables = false)
public Collection<String> batchListener(List<String> in) {
    return in.stream().map(String::toUpperCase).collect(Collectors.toList());
}

 

2. Filtering Messages

  • 특정 시나리오에서는 리밸런싱과 같은 이유로 이미 처리된 메시지가 다시 전달될 수 있음
  • 이때, 프레임워크는 해당 메시지의 처리 여부를 알지 못합니다.

 

FilteringMessageListenerAdapter

  • 증복 메시지를 필터링하는 역할
  • 리스너가 필터링 어댑터로 감싸지도록 하여, 해당 리스너가 메시지 필터링 기능을 갖도록 함
  • RecordFilterStrategy 구현체를 사용하여 필터링 기준을 정의함

 

RecordFilterStrategy

  • 필터링 기준을 정의하는 인터페이스


예제) 단일 메시지

더보기

특정 단어로 필터링

public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, String> {

    @Override
    public boolean filter(ConsumerRecord<String, String> record) {
        // 메시지 값이 "duplicate"일 경우 필터링
        return record.value().equals("duplicate");
    }
}
@KafkaListener(id = "filteredListener", topics = "someTopic", filter = "customRecordFilterStrategy")
public void listen(String message) {
    System.out.println("Received message: " + message);
}

 

 

 

예제) 배치 메시지

더보기

빈 배치 전략

public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(List<ConsumerRecord<String, String>> consumerRecords) {
        // 예시: 배치 내 특정 조건에 맞지 않는 메시지들을 모두 필터링
        return consumerRecords.stream()
            .filter(record -> !"specificKey".equals(record.key())) // 특정 키를 가진 메시지만 제외
            .collect(Collectors.toList());
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return true;
    }
}
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}
  • 개별 메시지 필터링 결과로 모든 메시지가 제거되었을 때, 그 결과로 빈 배치가 생기면 이 빈 배치를 무시할 수 있도록 하는 전략
  • 빈 배치가 발생했을 때 리스너가 호출되지 않도록 할 수 있음

 


출처