카테고리 없음

[Spring for Apache Kafka] 2. Sending Messages

noahkim_ 2025. 5. 6. 18:58
 

1. DefaultKafkaProducerFactory

  • 기본적으로 모든 클라이언트에서 공유하는 단일 Kafka 프로듀서를 생성

 

설정) DefaultKafkaProducerFactory

더보기
@Configuration
public class KafkaProducerConfig {
    
    @Bean
    public DefaultKafkaProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put("producerPerThread", true); // 스레드별 프로듀서 생성
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

2. KafkaTemplate

  • Kafka 토픽에 데이터를 전송하는 편리한 메서드 제공
  • Kafka 프로듀서를 감싸고 있음

 

예시) KafkaTemplate

더보기
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<String, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
send()
  • 메시지 전송은 비동기적으로 이루어짐 (CompletableFuture<SendResult> 반환)
  • ProducerListener를 통해 성공 및 실패 콜백을 처리할 수 있음. 
  • 기본적으로 LoggingProducerListener가 제공됨. (전송 실패 시 오류를 로그로 남김)

 

3. RoutingKafkaTemplate

  • 실행 시점에 목적지 토픽 이름을 기준으로 ProducerFactory를 선택할 수 있는 기능을 제공
  • 단, 트랜잭션, 실행, 플러시 및 메트릭 작업 지원 ❌

 

설정) RoutingKafkaTemplate

더보기
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
        ProducerFactory<Object, Object> pf) {

    // Clone the PF with a different Serializer, register with Spring for shutdown
    Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
    context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

    Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
    map.put(Pattern.compile("two"), bytesPF);
    map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
    return new RoutingKafkaTemplate(map);
}

 

4. ReplyingKafkaTemplate

  • 헤더 기반으로 요청-응답 패턴이 이루어짐
  • 요청 서버와 응답 서버가 명확히 분리될 때 사용됩니다. (같은 서버에서 사용하면 무한루프 남)
헤더명 역할
KafkaHeaders.CORRELATION_ID
요청과 응답을 연결(매칭)하기 위한 ID.
즉, "이 응답은 이 요청에 대한 것"임을 알기 위함
KafkaHeaders.REPLY_TOPIC
서버(Consumer)가 응답을 어디(어떤 토픽)로 보내야 할지 지정
KafkaHeaders.REPLY_PARTITION
(옵션) 서버가 응답을 보낼 때 특정 파티션을 지정할 수 있음

 

예제) ReplyingKafkaTemplate

더보기
@Autowired
private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// 요청 보내고 응답 받기
public String sendRequestAndGetReply() throws Exception {
    // 요청 메시지 만들기
    ProducerRecord<String, String> record =
            new ProducerRecord<>("request-topic", "hello");

    // 응답 토픽과 Correlation ID 설정
    record.headers().add(KafkaHeaders.REPLY_TOPIC, "reply-topic".getBytes());
    
    // sendAndReceive 호출 -> 요청 보내고 응답 비동기 기다림
    RequestReplyFuture<String, String, String> future = replyingKafkaTemplate.sendAndReceive(record);
    
    // 응답 받기
    ConsumerRecord<String, String> response = future.get();
    return response.value();  // 예: "HELLO"
}
  • sendAndReceive(): 응답을 CompletableFuture로 비동기적으로 받을 수 있습니다

@SendTo

@KafkaListener(topics = "request-topic")
@SendTo("reply-topic")  // 응답을 이 토픽으로 보내라
public String handleRequest(String message) {
    System.out.println("요청 받은 메시지: " + message);
    return message.toUpperCase();  // 예: "HELLO"
}

 

Reply Type Message<?>

  • @KafkaListener가 응답 메시지를 발행할 때 사용하는 타입
  • 헤더 설정으로 요청-응답 패턴 구현
Spring Kafka 버전 동작 요약
2.5 이전
직접 reply topic, correlation id 세팅 필요
2.5 이후
reply topic, correlation id 자동 세팅
3.3 이후
요청 메시지의 key도 reply에 자동 복사

 

예제) Message

더보기

2.5 이전

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)          // 답장 보낼 토픽
            .setHeader(KafkaHeaders.KEY, 42)                 // 답장 메시지에 사용할 키
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation) // 요청과 답장을 연결할 ID
            .build();
}

 

2.5 이후

@KafkaListener(id = "requestor", topics = "request")
@SendTo // @SendTo에 reply topic을 지정하거나, 요청 메시지에 REPLY_TOPIC이 있으면 자동 세팅
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42) // 답장 키만 설정, 나머지는 자동
            .build();
}

  

5. AggregatingReplyingKafkaTemplate

  • 여러 응답을 일정 크기나 조건에 맞춰 집계한 후 하나의 응답으로 프로듀싱하는 Kafka 클라이언트
설정 항목 설명
releaseStrategy
응답을 집계하고 완료할 시점을 결정하는 전략
- 예: 응답 수가 일정 크기에 도달하면 결과 반환
returnPartialOnTimeout
타임아웃이 발생해도 일부 응답만으로 결과를 반환하도록 설정. (기본값: false)
AckMode.MANUAL
오프셋을 수동으로 커밋하도록 설정.
- 응답이 모두 집계되었을 때만 커밋.

 

예제) AggregatingReplyingKafkaTemplate

더보기
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
    new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
        coll -> coll.size() >= 3,   // releaseStrategy: 3개의 응답을 받은 후 결과 반환
        true);                      // returnPartialOnTimeout: 타임아웃 발생해도 일부 응답 반환       

RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
    template.sendAndReceive(new ProducerRecord<>("requestTopic", 1, "RequestData"));

// 3개의 응답이 수집될 때까지 기다림
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord = future.get(30, TimeUnit.SECONDS);
  • 응답: ConsumerRecord의 컬렉션으로 수집하여 반환

 

ListenerContainerProperties properties = new ListenerContainerProperties();
properties.setAckMode(AckMode.MANUAL); // 수동으로 오프셋을 커밋

 


출처