1. Forwarding Listener Results using @SendTo
- @KafkaListener에 @SendTo 애너테이션을 함께 사용하면, 리스너 메소드의 반환값을 지정된 토픽으로 자동 전달함
토픽 지정
유형 | 설명 | 예시 |
고정된 토픽 | 애플리케이션 초기화 시 지정된 토픽에 메시지 전달 | @SendTo("someTopic") |
애플리케이션 초기화 시 결정되는 SpEL | 애플리케이션 초기화 시 평가되어 결정됨 | @SendTo("#{someExpression}") |
실행 시 결정되는 SpEL | 동적으로 토픽을 결정 (request, source, result 등 사용 가능) |
@SendTo("!{request.value()}") |
빈 토픽 이름 | !{source.headers['kafka_replyTopic']}로 간주 | @SendTo() |
설정
더보기
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
항목 | 설명 | 예시 및 세부 사항 |
리스너 컨테이너 팩토리 | Kafka 리스너의 설정을 관리하는 팩토리. KafkaTemplate을 설정해야 함 |
자동으로 설정되거나 수동으로 설정할 수 있음
(예: @EnableKafka와 함께) |
ReplyHeadersConfigurer | 응답 메시지의 헤더를 설정하는 데 사용됨. 메시지에 대한 헤더 설정을 처리 |
응답 메시지에 추가 헤더를 설정하기 위해 사용
(예: ReplyHeadersConfigurer로 헤더 추가) |
예제
메시지 해더 설정
더보기
@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader("someOtherHeader", "someValue")
.build();
}
예외 처리
더보기
@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic", errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
throw new RuntimeException("fail");
}
@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
return (m, e) -> {
return ... // 실패한 메시지 및 관련 정보
};
}
Iterable 반환
더보기
@KafkaListener(topics = "someTopic", splitIterables = false)
public Collection<String> batchListener(List<String> in) {
return in.stream().map(String::toUpperCase).collect(Collectors.toList());
}
2. Filtering Messages
- 특정 시나리오에서는 리밸런싱과 같은 이유로 이미 처리된 메시지가 다시 전달될 수 있음
- 이때, 프레임워크는 해당 메시지의 처리 여부를 알지 못합니다.
FilteringMessageListenerAdapter
- 증복 메시지를 필터링하는 역할
- 리스너가 필터링 어댑터로 감싸지도록 하여, 해당 리스너가 메시지 필터링 기능을 갖도록 함
- RecordFilterStrategy 구현체를 사용하여 필터링 기준을 정의함
RecordFilterStrategy
- 필터링 기준을 정의하는 인터페이스
예제) 단일 메시지
더보기
특정 단어로 필터링
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, String> {
@Override
public boolean filter(ConsumerRecord<String, String> record) {
// 메시지 값이 "duplicate"일 경우 필터링
return record.value().equals("duplicate");
}
}
@KafkaListener(id = "filteredListener", topics = "someTopic", filter = "customRecordFilterStrategy")
public void listen(String message) {
System.out.println("Received message: " + message);
}
예제) 배치 메시지
더보기
빈 배치 전략
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
@Override
public List<ConsumerRecord<String, String>> filterBatch(List<ConsumerRecord<String, String>> consumerRecords) {
// 예시: 배치 내 특정 조건에 맞지 않는 메시지들을 모두 필터링
return consumerRecords.stream()
.filter(record -> !"specificKey".equals(record.key())) // 특정 키를 가진 메시지만 제외
.collect(Collectors.toList());
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
}
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
- 개별 메시지 필터링 결과로 모든 메시지가 제거되었을 때, 그 결과로 빈 배치가 생기면 이 빈 배치를 무시할 수 있도록 하는 전략
- 빈 배치가 발생했을 때 리스너가 호출되지 않도록 할 수 있음
출처
'Spring > Spring for Apache Kafka' 카테고리의 다른 글
[Spring for Apache Kafka] Exactly Once Semantics (0) | 2025.05.08 |
---|---|
[Spring for Apache Kafka] Transactions (0) | 2025.05.08 |
[Spring for Apache Kafka] 3-2. Receiving Messages: @KafkaListener (0) | 2025.05.07 |
[Spring for Apache Kafka] 4. Application Events (0) | 2025.05.06 |
[Spring for Apache Kafka] 3-1. Receiving Messages: Message Listener (0) | 2025.05.06 |