전체 글 552

[Spring for Apache kafka] Apache Kafka Streams Support

1. Basics주요 구성 요소구성요소설명StreamsBuilder스트림 처리 로직을 정의하는 객체 생성 (KStream, KTable 등)KafkaStreamsStreamsBuilder로 만든 스트림들을 실제로 실행하는 객체- kafka 클러스터와 연결- 토폴로지를 실행하고 스트림 처리를 시작/종료함 동작 흐름StreamsBuilder: Stream 처리 로직 정의StreamsConfig: 설정 (Kafka 클러스터 정보, 기본 직렬화/역직렬화 설정, 보안 설정 등)KafkaStreams: 스트림 처리 시작KafkaStreams: 스트림 처리 종료 2. Spring ManagementStreamsBuilderFactoryBeanStreamsBuilder를 스프링 빈으로 등록하고 Spring 컨텍스트에 ..

카테고리 없음 2025.05.09

[Spring for Apache Kafka] Exactly Once Semantics

1. Exactly Once Semantics읽기 → 처리 → 쓰기 과정이 정확히 한 번만 실행되는 것을 보장 2. 처리 흐름컨슈머KafkaAwareTransactionManager를 리스너 컨테이너에 설정하면, 리스너 호출 전에 트랜잭션을 시작합니다.리스너 내부에서 수행하는 KafkaTemplate 작업들은 트랜잭션에 참여합니다.리스너가 정상적으로 처리하면,producer.sendOffsetsToTransaction()를 호출해서 컨슈머 오프셋을 트랜잭션에 포함시킵니다.이후 트랜잭션을 커밋합니다.리스너에서 예외가 발생하면,트랜잭션을 롤백하고,컨슈머는 오프셋을 다시 설정해서, 실패한 레코드를 다음 poll() 때 다시 읽어 재처리합니다 3. 셋팅spring: kafka: producer: ..

카테고리 없음 2025.05.08

[Spring for Apache Kafka] Transactions

1. 트랜잭션 활성화 방법DefaultKafkaProducerFactory의 transactionIdPrefix 설정트랜잭션이 활성화됨트랜잭션 전용 Producer 캐시를 유지함각 프로듀서의 transactional.id는 transactionIdPrefix + 번호(n) 형식으로 생성됨spring boot는 spring.kafka.producer.transaction-id-prefix 만 설정하면 됨 2. KafkaTransactionManagerSpring의 PlatformTransactionManager를 구현한 클래스Spring 트랜잭션 지원 방식과 함께 사용이 가능함 (@Transactional, TransactionTemplate 등)KafkaTemplate의 모든 작업은 트랜잭션 범위 안에서..

카테고리 없음 2025.05.08

[실전 카프카 개발부터 운영까지] 6. 컨슈머의 내부 동작 원리와 구현

1. 컨슈머 오프셋 관리오프셋컨슈머가 읽은 메시지 위치를 나타내는 번호다음에 어디부터 읽을지 알 수 있음 __consumer_offest 토픽항목설명역할각 컨슈머 그룹이 어떤 파티션의 몇 번째 메시지까지 읽었는지 기록함포맷key: (group.id, topic, partition), value: offset파티션 수 조정offsets.topic.num.partitions (기본: 50)복제 수 조정offsets.topic.replication.factor (기본: 3 추천) 기본 동작컨슈머지정된 토픽의 메시지를 읽음읽은 메시지의 오프셋 정보를 __consumer_offest에 기록컨슈머 그룹컨슈머들의 오프셋 추적장애 시, 지정된 오프셋을 통해 복구 2. 그룹 코디네이터그룹 코디네이터컨슈머 그룹의 구성,..

카테고리 없음 2025.05.08

[실전 카프카 개발부터 운영까지] 5. 프로듀서의 내부 동작 원리와 구현

1. 파티셔너프로듀서가 메시지를 어느 파티션에 보낼지 결정하는 컴포넌트기본적으로 키 값을 해싱하여 파티션을 결정함하지만 키가 없으면 전략에 따라 파티션이 선택됨 주의사항동적으로 파티션이 증가할 경우, 기존의 키와 파티션의 매핑이 일치하지 않을 수 있음되도록이면 파티션 수를 변경하지 않는것이 권장됨 전략구분라운드 로빈 전략스티키 파티셔닝 전략전송 방식순차적으로 여러 파티션에 균등 분배하나의 파티션에 몰아서 일정량의 레코드를 채운 뒤 전송장점데이터가 파티션에 고르게 분산됨배치 전송이 빠르고 효율이 높음단점파티션별로 데이터가 얇게 퍼져 버퍼가 빨리 안 채워짐 파티션 간 데이터 분포가 고르지 않을 수 있음배치 효율낮음 (파티션별 최소 레코드 수 미달로 대기 → 비효율)높음 (한 파티션에 집중해 빠르게 배치 전송..

카테고리 없음 2025.05.07

[Spring for Apache Kafka] 3-3. Receiving Messages: Post-Processing

1. Forwarding Listener Results using @SendTo@KafkaListener에 @SendTo 애너테이션을 함께 사용하면, 리스너 메소드의 반환값을 지정된 토픽으로 자동 전달함 토픽 지정유형설명예시고정된 토픽애플리케이션 초기화 시 지정된 토픽에 메시지 전달@SendTo("someTopic")애플리케이션 초기화 시 결정되는 SpEL애플리케이션 초기화 시 평가되어 결정됨@SendTo("#{someExpression}")실행 시 결정되는 SpEL동적으로 토픽을 결정 (request, source, result 등 사용 가능)@SendTo("!{request.value()}")빈 토픽 이름!{source.headers['kafka_replyTopic']}로 간주@SendTo() 설정더..

카테고리 없음 2025.05.07

[Spring for Apache Kafka] 3-2. Receiving Messages: @KafkaListener

1. @KafkaListener AnnotationBean 메서드를 Kafka 리스너로 지정할 때 사용Listener Container 안에서 실행되며, 내부적으로 MessagingMessageListenerAdapter로 감싸집니다메서드 파라미터에 맞게 Converter 등을 설정해줌대부분의 속성들은 SpEL(#{...}) 또는 **프로퍼티 플레이스홀더(${...})**를 이용해 동적으로 지정할 수 있음 Record Listeners@KafkaListener를 붙이면 적용됨Listener Container Factory 필요 (이름이 kafkaListenerContainerFactory인 빈을 찾음) 예제더보기public class Listener { @KafkaListener(id = "foo"..

카테고리 없음 2025.05.07

[Spring for Apache Kafka] 4. Application Events

1. 이벤트Spring Kafka는 여러 소비자 관련 이벤트를 발행함카테고리이벤트명설명주요 속성소비자 생애주기ConsumerStartingEvent소비자 스레드가 생성될 때 발생(poll 시작 전)container, sourceConsumerStartedEvent소비자가 실제로 폴링을 시작하려고 할 때 발생container, sourceConsumerFailedToStartEvent소비자가 시작되지 못한 경우 발생(consumerStartTimeout 초과)container, sourceConsumerStoppingEvent소비자가 정지 직전에 발생container, partitionsConsumerStoppedEvent소비자가 정지된 후 발생container, source, reasonConsumerR..

카테고리 없음 2025.05.06

[Spring for Apache Kafka] 3-1. Receiving Messages: Message Listener

이 문서는 해당 섹션의 일부분만 정리되었음. 1. Message ListenersKafka에서 메시지를 처리하려면 리스너를 제공해야 함인터페이스설명MessageListener개별 메시지 처리AcknowledgingMessageListener메시지 처리 후 수동으로 오프셋을 커밋ConsumerAwareMessageListener메시지 처리 시 Consumer 객체 접근BatchMessageListener배치 메시지 처리BatchAcknowledgingMessageListener배치 메시지 처리 후 수동 커밋 예제더보기더보기MessageListener@KafkaListener(topics = "topic1")public void onMessage(ConsumerRecord record) { System..

카테고리 없음 2025.05.06

[Spring for Apache Kafka] 2. Sending Messages

1. DefaultKafkaProducerFactory기본적으로 모든 클라이언트에서 공유하는 단일 Kafka 프로듀서를 생성 설정) DefaultKafkaProducerFactory더보기@Configurationpublic class KafkaProducerConfig { @Bean public DefaultKafkaProducerFactory producerFactory() { Map configs = new HashMap(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONF..

카테고리 없음 2025.05.06