1. 파티셔너
- 프로듀서가 메시지를 어느 파티션에 보낼지 결정하는 컴포넌트
- 기본적으로 키 값을 해싱하여 파티션을 결정함
- 하지만 키가 없으면 전략에 따라 파티션이 선택됨
주의사항
- 동적으로 파티션이 증가할 경우, 기존의 키와 파티션의 매핑이 일치하지 않을 수 있음
- 되도록이면 파티션 수를 변경하지 않는것이 권장됨
전략
| 구분 | 라운드 로빈 전략 | 스티키 파티셔닝 전략 | 
| 전송 방식 | 순차적으로 여러 파티션에 균등 분배 | 하나의 파티션에 몰아서 일정량의 레코드를 채운 뒤 전송 | 
| 장점 | 데이터가 파티션에 고르게 분산됨 | 배치 전송이 빠르고 효율이 높음 | 
| 단점 | 파티션별로 데이터가 얇게 퍼져 버퍼가 빨리 안 채워짐 | 파티션 간 데이터 분포가 고르지 않을 수 있음 | 
| 배치 효율 | 낮음 (파티션별 최소 레코드 수 미달로 대기 → 비효율) | 높음 (한 파티션에 집중해 빠르게 배치 전송) | 
2. 프로듀서의 배치
- 프로듀서는 처리량 증가를 위해 여러 레코드를 묶어서 한 번에 전송
- 메시지는 파티션별로 분류되어 내부 버퍼에 일시 저장됨
- 장점: 네트워크 전송 효율 상승. 처리량 상승
- 단점: 대기 시간 발생
옵션
| 옵션 | 설명 | 
| buffer.memory | 프로듀서 내부 버퍼 메모리 총 크기 (모든 파티션 합산) | 
| batch.size | 파티션별로 묶어서 보내는 1회 배치 크기 (bytes 단위) | 
| linger.ms | 배치 전송 전, 최대 대기 시간 (ms 단위)  - batch.size를 채우지 못해도 이 시간 지나면 전송 | 
3. 중복 없는 전송 (Exactly Once Delivery)
메시지 전송 방식
| 전송 방식 | 재전송 | 중복 가능성 | 손실 가능성 | 특징 | 
| at least once | O | O | X | 데이터 절대 잃지 않음 중복 가능성 있음 | 
| at most once | X | X | O | 중복 없음 일부 데이터 손실 가능 | 
| exactly once | O | X | X | 중복도, 손실도 없음 | 
Idempotence
- 메시지가 여러 번 전송되어도 브로커가 중복임을 인식하고 한 번만 처리하게끔 하는 기능
구현 원리
- Producer마다, PID(Producer ID)를 부여하고 각 메시지에는 시퀀스 번호를 붙여서 전송
- 브로커는 이 정보를 바탕으로 메시지 중복 여부를 판단함 (이미 처리한 시퀀스 번호면 무시함)
설정
| 옵션 | 설명 | 
| enable.idempotence | 중복 제거 기능 활성화 | 
| max.in.flight.requests.per.connection | 동시에 보낼 수 있는 메시지 개수 제한 (순서 보장) - 5 이하 값만 idempotence 충족 | 
| acks | 프로듀서가 메시지를 성공으로 간주하기 위해 요구하는 브로커의 응답 수준  - 0: ACK 응답 기다리지 않음 - 1: 리더 브로커가 저장하면 ACK - all: 리더 + 팔로워까지 저장해야 ACK (idempotent 충족) | 
| retries | 메시지 전송 실패 시, 프로듀서가 재전송을 시도하는 최대 횟수 | 
4. 트랜잭션
- 여러 메시지를 하나의 원자적 단위로 처리하기 위한 기능
트랜잭션 코디네이터
- 트랜잭션을 관리하는 브로커 컴포넌트
| 역할 / 기능 | 상세 설명 | 
| 트랜잭션 식별자 관리 | transactional.id를 기반으로 각 프로듀서에 PID와 에포크 번호를 부여함 | 
| 트랜잭션 로그 기록 | __transaction_state에 트랜잭션 상태 기록 | 
| 트랜잭션 상태 관리 | 상태 전이 관리 (트랜잭션 시작, 커밋 준비, 커밋 완료, 중단 등) | 
| 참여 파티션 등록 | 트랜잭션에 포함된 토픽 파티션을 등록하여 어떤 메시지가 트랜잭션 범위에 포함되는지 추적함 | 
| 컨트롤 메시지 전송 | 트랜잭션의 커밋/중단 상태를 각 파티션 브로커에 컨트롤 메시지로 전달함 | 
| 장애 복구 지원 | 로그에 기록된 트랜잭션 상태를 기반으로 장애 발생 시 재시작하거나 롤백을 수행함 | 
| 원자성 보장 | 트랜잭션에 포함된 모든 메시지의 성공/실패 여부를 일관되게 보장함 (Exactly Once의 핵심 기능) | 
옵션
| 항목 | 설명 | 
| transaction.state.log.num.partitions | 트랜잭션 로그 파티션 수 | 
| transaction.state.log.replication.factor | 트랜잭션 로그 복제 수 | 
설정
더보기
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 5
      buffer-memory: 33554432
      batch-size: 16384
      transaction-id-prefix: tx
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        linger.ms: 100- transaction.id를 설정해야 함
- 중복없는 전송을 지원해야 함
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(ProducerFactory<String, Object> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}
코드
더보기
@MessageMapping("/chat.send")
@Transactional("kafkaTransactionManager")
public void sendMessageWithKafka(GroupChatMessageRequest message) {
    String key = message.groupId();
    kafkaTemplate.send(chatTopicName, key, message);
    kafkaTemplate.send(chatTopicName, key, message);
}
단계별 동작
- 트랜잭션 코디네이터 찾기
- 프로듀서: 브로커에게 transactional.id로 FindCoordinatorRequest 요청
- (트랜잭션 코디네이터가 존재하지 않는다면, 신규 트랜잭션 코디네이터가 생성됨)
 
- 트랜잭션 초기화 (initTransactions())
- 프로듀서: 트랜잭션 코디네이터에게 InitPidRequest 요청
- 트랜잭션 코디네이터: PID, Epoch 부여. 이를 트랜잭션 로그에 기록함 (PID Epoch 증가)
 
- 트랜잭션 시작 (beginTransaction())
- 프로듀서: 트랜잭션 시작 (내부적으로 트랜잭션 상태를 관리함)
- 프로듀서: 트랜잭션 코디네이터에게 트랜잭션 시작 사실을 알림
 
- 트랜잭션에 참여하는 파티션 등록
- 프로듀서: 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달
- 트랜잭션 코디네이터: 해당 정보를 트랜잭션 로그에 기록
 
- 메시지 전송
- 프로듀서: 대상 토픽의 파티션에 실제 메시지 전송 (PID, Epoch, 시퀀스 번호 포함)
 
- 트랜잭션 준비 (commitTransaction() or abortTransaction())
- 프로듀서: 트랜잭션 코디네이터에게 트랜잭션 준비 요청
- 트랜잭션 코디네이터: 트랜잭션 로그에 상태 기록 (PrepareCommit or PrepareAbort)
- 트랜잭션 코디네이터: 관련 토픽 파티션에 컨트롤 메시지 전송 (PrepareCommit or PrepareAbort)
 
- 트랜잭션 커밋
 - 프로듀서: 트랜잭션 코디네이터에게 트랜잭션 완료 요청
- 트랜잭션 코디네이터: 관련 토픽 파티션에 컨트롤 메시지 전송 (CommitMarker or AbortMarker)
- 트랜잭션 코디네이터: 트랜잭션 로그에 상태 기록 (CompleteCommit or CompleteAbort)
 
- 트랜잭선 종료
예제
더보기
/opt/bitnami/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic __transaction_state \
--consumer.config /bitnami/kafka/config/consumer.config \
--formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --from-beginningtx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1746652130343)
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(topic_chat-1), txnStartTimestamp=1746652130412, txnLastUpdateTimestamp=1746652130412)
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(topic_chat-1), txnStartTimestamp=1746652130412, txnLastUpdateTimestamp=1746652130484)
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1746652130412, txnLastUpdateTimestamp=1746652130492)- Empty: 트랜잭션 초기화
- Ongoing: 상태 표시 및 메시지 전송
- PrepareCommit: 파티션으로 메시지 전송 완료 및 트랜잭션 종료 요청 완료
- CompleteCommit: 트랜잭션 단계가 최종적으로 완료
'DevOps > Kafka' 카테고리의 다른 글
| [Kafka] 9. Kafka Streams (0) | 2025.05.12 | 
|---|---|
| [실전 카프카 개발부터 운영까지] 6. 컨슈머의 내부 동작 원리와 구현 (0) | 2025.05.08 | 
| [실전 카프카 개발부터 운영까지] 4. 카프카의 내부 동작 원리와 구현 (0) | 2025.05.06 | 
| [실전 카프카 개발부터 운영까지] 3. 카프카 기본 개념과 구조 (0) | 2025.05.03 | 
| [실전 카프카 개발부터 운영까지] 1. 카프카 개요 (0) | 2025.05.02 |