1. DefaultKafkaProducerFactory
- 기본적으로 모든 클라이언트에서 공유하는 단일 Kafka 프로듀서를 생성
설정) DefaultKafkaProducerFactory
더보기
@Configuration
public class KafkaProducerConfig {
@Bean
public DefaultKafkaProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put("producerPerThread", true); // 스레드별 프로듀서 생성
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
2. KafkaTemplate
- Kafka 토픽에 데이터를 전송하는 편리한 메서드 제공
- Kafka 프로듀서를 감싸고 있음
예시) KafkaTemplate
더보기
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<String, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
send()
- 메시지 전송은 비동기적으로 이루어짐 (CompletableFuture<SendResult> 반환)
- ProducerListener를 통해 성공 및 실패 콜백을 처리할 수 있음.
- 기본적으로 LoggingProducerListener가 제공됨. (전송 실패 시 오류를 로그로 남김)
3. RoutingKafkaTemplate
- 실행 시점에 목적지 토픽 이름을 기준으로 ProducerFactory를 선택할 수 있는 기능을 제공
- 단, 트랜잭션, 실행, 플러시 및 메트릭 작업 지원 ❌
설정) RoutingKafkaTemplate
더보기
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
4. ReplyingKafkaTemplate
- 헤더 기반으로 요청-응답 패턴이 이루어짐
- 요청 서버와 응답 서버가 명확히 분리될 때 사용됩니다. (같은 서버에서 사용하면 무한루프 남)
헤더명 | 역할 |
KafkaHeaders.CORRELATION_ID |
요청과 응답을 연결(매칭)하기 위한 ID.
즉, "이 응답은 이 요청에 대한 것"임을 알기 위함 |
KafkaHeaders.REPLY_TOPIC |
서버(Consumer)가 응답을 어디(어떤 토픽)로 보내야 할지 지정
|
KafkaHeaders.REPLY_PARTITION |
(옵션) 서버가 응답을 보낼 때 특정 파티션을 지정할 수 있음
|
예제) ReplyingKafkaTemplate
더보기
@Autowired
private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 요청 보내고 응답 받기
public String sendRequestAndGetReply() throws Exception {
// 요청 메시지 만들기
ProducerRecord<String, String> record =
new ProducerRecord<>("request-topic", "hello");
// 응답 토픽과 Correlation ID 설정
record.headers().add(KafkaHeaders.REPLY_TOPIC, "reply-topic".getBytes());
// sendAndReceive 호출 -> 요청 보내고 응답 비동기 기다림
RequestReplyFuture<String, String, String> future = replyingKafkaTemplate.sendAndReceive(record);
// 응답 받기
ConsumerRecord<String, String> response = future.get();
return response.value(); // 예: "HELLO"
}
- sendAndReceive(): 응답을 CompletableFuture로 비동기적으로 받을 수 있습니다
@SendTo
@KafkaListener(topics = "request-topic")
@SendTo("reply-topic") // 응답을 이 토픽으로 보내라
public String handleRequest(String message) {
System.out.println("요청 받은 메시지: " + message);
return message.toUpperCase(); // 예: "HELLO"
}
Reply Type Message<?>
- @KafkaListener가 응답 메시지를 발행할 때 사용하는 타입
- 헤더 설정으로 요청-응답 패턴 구현
Spring Kafka 버전 | 동작 요약 |
2.5 이전 |
직접 reply topic, correlation id 세팅 필요
|
2.5 이후 |
reply topic, correlation id 자동 세팅
|
3.3 이후 |
요청 메시지의 key도 reply에 자동 복사
|
예제) Message
더보기
2.5 이전
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo) // 답장 보낼 토픽
.setHeader(KafkaHeaders.KEY, 42) // 답장 메시지에 사용할 키
.setHeader(KafkaHeaders.CORRELATION_ID, correlation) // 요청과 답장을 연결할 ID
.build();
}
2.5 이후
@KafkaListener(id = "requestor", topics = "request")
@SendTo // @SendTo에 reply topic을 지정하거나, 요청 메시지에 REPLY_TOPIC이 있으면 자동 세팅
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42) // 답장 키만 설정, 나머지는 자동
.build();
}
5. AggregatingReplyingKafkaTemplate
- 여러 응답을 일정 크기나 조건에 맞춰 집계한 후 하나의 응답으로 프로듀싱하는 Kafka 클라이언트
설정 항목 | 설명 |
releaseStrategy |
응답을 집계하고 완료할 시점을 결정하는 전략
- 예: 응답 수가 일정 크기에 도달하면 결과 반환 |
returnPartialOnTimeout |
타임아웃이 발생해도 일부 응답만으로 결과를 반환하도록 설정. (기본값: false)
|
AckMode.MANUAL |
오프셋을 수동으로 커밋하도록 설정.
- 응답이 모두 집계되었을 때만 커밋. |
예제) AggregatingReplyingKafkaTemplate
더보기
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() >= 3, // releaseStrategy: 3개의 응답을 받은 후 결과 반환
true); // returnPartialOnTimeout: 타임아웃 발생해도 일부 응답 반환
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(new ProducerRecord<>("requestTopic", 1, "RequestData"));
// 3개의 응답이 수집될 때까지 기다림
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord = future.get(30, TimeUnit.SECONDS);
- 응답: ConsumerRecord의 컬렉션으로 수집하여 반환
ListenerContainerProperties properties = new ListenerContainerProperties();
properties.setAckMode(AckMode.MANUAL); // 수동으로 오프셋을 커밋
출처