Spring/Spring for Apache Kafka 9

[Spring for Apache kafka] Apache Kafka Streams Support

1. Basics주요 구성 요소구성요소설명StreamsBuilder스트림 처리 로직을 정의하는 객체 생성 (KStream, KTable 등)KafkaStreamsStreamsBuilder로 만든 스트림들을 실제로 실행하는 객체- kafka 클러스터와 연결- 토폴로지를 실행하고 스트림 처리를 시작/종료함 동작 흐름StreamsBuilder: Stream 처리 로직 정의StreamsConfig: 설정 (Kafka 클러스터 정보, 기본 직렬화/역직렬화 설정, 보안 설정 등)KafkaStreams: 스트림 처리 시작KafkaStreams: 스트림 처리 종료 2. Spring ManagementStreamsBuilderFactoryBeanStreamsBuilder를 스프링 빈으로 등록하고 Spring 컨텍스트에 ..

[Spring for Apache Kafka] Exactly Once Semantics

1. Exactly Once Semantics읽기 → 처리 → 쓰기 과정이 정확히 한 번만 실행되는 것을 보장 2. 처리 흐름컨슈머KafkaAwareTransactionManager를 리스너 컨테이너에 설정하면, 리스너 호출 전에 트랜잭션을 시작합니다.리스너 내부에서 수행하는 KafkaTemplate 작업들은 트랜잭션에 참여합니다.리스너가 정상적으로 처리하면,producer.sendOffsetsToTransaction()를 호출해서 컨슈머 오프셋을 트랜잭션에 포함시킵니다.이후 트랜잭션을 커밋합니다.리스너에서 예외가 발생하면,트랜잭션을 롤백하고,컨슈머는 오프셋을 다시 설정해서, 실패한 레코드를 다음 poll() 때 다시 읽어 재처리합니다 3. 셋팅spring: kafka: producer: ..

[Spring for Apache Kafka] Transactions

1. 트랜잭션 활성화 방법DefaultKafkaProducerFactory의 transactionIdPrefix 설정트랜잭션이 활성화됨트랜잭션 전용 Producer 캐시를 유지함각 프로듀서의 transactional.id는 transactionIdPrefix + 번호(n) 형식으로 생성됨spring boot는 spring.kafka.producer.transaction-id-prefix 만 설정하면 됨 2. KafkaTransactionManagerSpring의 PlatformTransactionManager를 구현한 클래스Spring 트랜잭션 지원 방식과 함께 사용이 가능함 (@Transactional, TransactionTemplate 등)KafkaTemplate의 모든 작업은 트랜잭션 범위 안에서..

[Spring for Apache Kafka] 3-3. Receiving Messages: Post-Processing

1. Forwarding Listener Results using @SendTo@KafkaListener에 @SendTo 애너테이션을 함께 사용하면, 리스너 메소드의 반환값을 지정된 토픽으로 자동 전달함 토픽 지정유형설명예시고정된 토픽애플리케이션 초기화 시 지정된 토픽에 메시지 전달@SendTo("someTopic")애플리케이션 초기화 시 결정되는 SpEL애플리케이션 초기화 시 평가되어 결정됨@SendTo("#{someExpression}")실행 시 결정되는 SpEL동적으로 토픽을 결정 (request, source, result 등 사용 가능)@SendTo("!{request.value()}")빈 토픽 이름!{source.headers['kafka_replyTopic']}로 간주@SendTo() 설정더..

[Spring for Apache Kafka] 3-2. Receiving Messages: @KafkaListener

1. @KafkaListener AnnotationBean 메서드를 Kafka 리스너로 지정할 때 사용Listener Container 안에서 실행되며, 내부적으로 MessagingMessageListenerAdapter로 감싸집니다메서드 파라미터에 맞게 Converter 등을 설정해줌대부분의 속성들은 SpEL(#{...}) 또는 **프로퍼티 플레이스홀더(${...})**를 이용해 동적으로 지정할 수 있음 Record Listeners@KafkaListener를 붙이면 적용됨Listener Container Factory 필요 (이름이 kafkaListenerContainerFactory인 빈을 찾음) 예제더보기public class Listener { @KafkaListener(id = "foo"..

[Spring for Apache Kafka] 4. Application Events

1. 이벤트Spring Kafka는 여러 소비자 관련 이벤트를 발행함카테고리이벤트명설명주요 속성소비자 생애주기ConsumerStartingEvent소비자 스레드가 생성될 때 발생(poll 시작 전)container, sourceConsumerStartedEvent소비자가 실제로 폴링을 시작하려고 할 때 발생container, sourceConsumerFailedToStartEvent소비자가 시작되지 못한 경우 발생(consumerStartTimeout 초과)container, sourceConsumerStoppingEvent소비자가 정지 직전에 발생container, partitionsConsumerStoppedEvent소비자가 정지된 후 발생container, source, reasonConsumerR..

[Spring for Apache Kafka] 3-1. Receiving Messages: Message Listener

이 문서는 해당 섹션의 일부분만 정리되었음. 1. Message ListenersKafka에서 메시지를 처리하려면 리스너를 제공해야 함인터페이스설명MessageListener개별 메시지 처리AcknowledgingMessageListener메시지 처리 후 수동으로 오프셋을 커밋ConsumerAwareMessageListener메시지 처리 시 Consumer 객체 접근BatchMessageListener배치 메시지 처리BatchAcknowledgingMessageListener배치 메시지 처리 후 수동 커밋 예제더보기더보기MessageListener@KafkaListener(topics = "topic1")public void onMessage(ConsumerRecord record) { System..

[Spring for Apache Kafka] 2. Sending Messages

1. DefaultKafkaProducerFactory기본적으로 모든 클라이언트에서 공유하는 단일 Kafka 프로듀서를 생성 설정) DefaultKafkaProducerFactory더보기@Configurationpublic class KafkaProducerConfig { @Bean public DefaultKafkaProducerFactory producerFactory() { Map configs = new HashMap(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONF..

[Spring for Apache Kafka] 1. Basic

1. Connecting to KafkaFactory구성 요소역할ProducerFactory메시지 전송을 위한 팩토리 생성ConsumerFactory메시지 수신을 위한 팩토리 생성 FactoryListenerFactory의 Listener는 프로듀서/컨슈머가 만들어지거나 닫힐 때 호출하는 메서드를 정의한 인터페이스생성된 프로듀서/컨슈머의 bean id는 factory의 beanName + kafka client id 패턴으로 정의됩니다. 예제더보기public class MyProducerFactoryListener implements ProducerFactory.Listener { @Override public void producerAdded(String id, Producer produce..