1. 파티셔너
- 프로듀서가 메시지를 어느 파티션에 보낼지 결정하는 컴포넌트
- 기본적으로 키 값을 해싱하여 파티션을 결정함
- 하지만 키가 없으면 전략에 따라 파티션이 선택됨
주의사항
- 동적으로 파티션이 증가할 경우, 기존의 키와 파티션의 매핑이 일치하지 않을 수 있음
- 되도록이면 파티션 수를 변경하지 않는것이 권장됨
전략
구분 | 라운드 로빈 전략 | 스티키 파티셔닝 전략 |
전송 방식 | 순차적으로 여러 파티션에 균등 분배 | 하나의 파티션에 몰아서 일정량의 레코드를 채운 뒤 전송 |
장점 | 데이터가 파티션에 고르게 분산됨 | 배치 전송이 빠르고 효율이 높음 |
단점 | 파티션별로 데이터가 얇게 퍼져 버퍼가 빨리 안 채워짐 | 파티션 간 데이터 분포가 고르지 않을 수 있음 |
배치 효율 | 낮음 (파티션별 최소 레코드 수 미달로 대기 → 비효율) | 높음 (한 파티션에 집중해 빠르게 배치 전송) |
2. 프로듀서의 배치
- 프로듀서는 처리량 증가를 위해 여러 레코드를 묶어서 한 번에 전송
- 메시지는 파티션별로 분류되어 내부 버퍼에 일시 저장됨
- 장점: 네트워크 전송 효율 상승. 처리량 상승
- 단점: 대기 시간 발생
옵션
옵션 | 설명 |
buffer.memory |
프로듀서 내부 버퍼 메모리 총 크기 (모든 파티션 합산)
|
batch.size |
파티션별로 묶어서 보내는 1회 배치 크기 (bytes 단위)
|
linger.ms |
배치 전송 전, 최대 대기 시간 (ms 단위)
- batch.size를 채우지 못해도 이 시간 지나면 전송 |
3. 중복 없는 전송 (Exactly Once Delivery)
메시지 전송 방식
전송 방식 | 재전송 | 중복 가능성 | 손실 가능성 | 특징 |
at least once | O | O | X |
데이터 절대 잃지 않음
중복 가능성 있음 |
at most once | X | X | O |
중복 없음
일부 데이터 손실 가능 |
exactly once | O | X | X | 중복도, 손실도 없음 |
Idempotence
- 메시지가 여러 번 전송되어도 브로커가 중복임을 인식하고 한 번만 처리하게끔 하는 기능
구현 원리
- Producer마다, PID(Producer ID)를 부여하고 각 메시지에는 시퀀스 번호를 붙여서 전송
- 브로커는 이 정보를 바탕으로 메시지 중복 여부를 판단함 (이미 처리한 시퀀스 번호면 무시함)
설정
옵션 | 설명 |
enable.idempotence | 중복 제거 기능 활성화 |
max.in.flight.requests.per.connection | 동시에 보낼 수 있는 메시지 개수 제한 (순서 보장) - 5 이하 값만 idempotence 충족 |
acks |
프로듀서가 메시지를 성공으로 간주하기 위해 요구하는 브로커의 응답 수준
- 0: ACK 응답 기다리지 않음 - 1: 리더 브로커가 저장하면 ACK - all: 리더 + 팔로워까지 저장해야 ACK (idempotent 충족) |
retries | 메시지 전송 실패 시, 프로듀서가 재전송을 시도하는 최대 횟수 |
4. 트랜잭션
- 여러 메시지를 하나의 원자적 단위로 처리하기 위한 기능
트랜잭션 코디네이터
- 트랜잭션을 관리하는 브로커 컴포넌트
역할 / 기능 | 상세 설명 |
트랜잭션 식별자 관리 |
transactional.id를 기반으로 각 프로듀서에 PID와 에포크 번호를 부여함
|
트랜잭션 로그 기록 |
__transaction_state에 트랜잭션 상태 기록
|
트랜잭션 상태 관리 |
상태 전이 관리 (트랜잭션 시작, 커밋 준비, 커밋 완료, 중단 등)
|
참여 파티션 등록 |
트랜잭션에 포함된 토픽 파티션을 등록하여 어떤 메시지가 트랜잭션 범위에 포함되는지 추적함
|
컨트롤 메시지 전송 |
트랜잭션의 커밋/중단 상태를 각 파티션 브로커에 컨트롤 메시지로 전달함
|
장애 복구 지원 |
로그에 기록된 트랜잭션 상태를 기반으로 장애 발생 시 재시작하거나 롤백을 수행함
|
원자성 보장 |
트랜잭션에 포함된 모든 메시지의 성공/실패 여부를 일관되게 보장함 (Exactly Once의 핵심 기능)
|
옵션
항목 | 설명 |
transaction.state.log.num.partitions | 트랜잭션 로그 파티션 수 |
transaction.state.log.replication.factor | 트랜잭션 로그 복제 수 |
설정
더보기
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 5
buffer-memory: 33554432
batch-size: 16384
transaction-id-prefix: tx
properties:
enable.idempotence: true
max.in.flight.requests.per.connection: 5
linger.ms: 100
- transaction.id를 설정해야 함
- 중복없는 전송을 지원해야 함
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
코드
더보기
@MessageMapping("/chat.send")
@Transactional("kafkaTransactionManager")
public void sendMessageWithKafka(GroupChatMessageRequest message) {
String key = message.groupId();
kafkaTemplate.send(chatTopicName, key, message);
kafkaTemplate.send(chatTopicName, key, message);
}
단계별 동작
- 트랜잭션 코디네이터 찾기
- 프로듀서: 브로커에게 transactional.id로 FindCoordinatorRequest 요청
- (트랜잭션 코디네이터가 존재하지 않는다면, 신규 트랜잭션 코디네이터가 생성됨)
- 트랜잭션 초기화 (initTransactions())
- 프로듀서: 트랜잭션 코디네이터에게 InitPidRequest 요청
- 트랜잭션 코디네이터: PID, Epoch 부여. 이를 트랜잭션 로그에 기록함 (PID Epoch 증가)
- 트랜잭션 시작 (beginTransaction())
- 프로듀서: 트랜잭션 시작 (내부적으로 트랜잭션 상태를 관리함)
- 프로듀서: 트랜잭션 코디네이터에게 트랜잭션 시작 사실을 알림
- 트랜잭션에 참여하는 파티션 등록
- 프로듀서: 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달
- 트랜잭션 코디네이터: 해당 정보를 트랜잭션 로그에 기록
- 메시지 전송
- 프로듀서: 대상 토픽의 파티션에 실제 메시지 전송 (PID, Epoch, 시퀀스 번호 포함)
- 트랜잭션 준비 (commitTransaction() or abortTransaction())
- 프로듀서: 트랜잭션 코디네이터에게 트랜잭션 준비 요청
- 트랜잭션 코디네이터: 트랜잭션 로그에 상태 기록 (PrepareCommit or PrepareAbort)
- 트랜잭션 코디네이터: 관련 토픽 파티션에 컨트롤 메시지 전송 (PrepareCommit or PrepareAbort)
- 트랜잭션 커밋
- 프로듀서: 트랜잭션 코디네이터에게 트랜잭션 완료 요청
- 트랜잭션 코디네이터: 관련 토픽 파티션에 컨트롤 메시지 전송 (CommitMarker or AbortMarker)
- 트랜잭션 코디네이터: 트랜잭션 로그에 상태 기록 (CompleteCommit or CompleteAbort)
- 트랜잭선 종료
예제
더보기
/opt/bitnami/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic __transaction_state \
--consumer.config /bitnami/kafka/config/consumer.config \
--formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --from-beginning
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1746652130343)
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(topic_chat-1), txnStartTimestamp=1746652130412, txnLastUpdateTimestamp=1746652130412)
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(topic_chat-1), txnStartTimestamp=1746652130412, txnLastUpdateTimestamp=1746652130484)
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1746652130412, txnLastUpdateTimestamp=1746652130492)
- Empty: 트랜잭션 초기화
- Ongoing: 상태 표시 및 메시지 전송
- PrepareCommit: 파티션으로 메시지 전송 완료 및 트랜잭션 종료 요청 완료
- CompleteCommit: 트랜잭션 단계가 최종적으로 완료
'Kafka' 카테고리의 다른 글
[실전 카프카 개발부터 운영까지] 6. 컨슈머의 내부 동작 원리와 구현 (0) | 2025.05.08 |
---|---|
[실전 카프카 개발부터 운영까지] 4. 카프카의 내부 동작 원리와 구현 (0) | 2025.05.06 |
[실전 카프카 개발부터 운영까지] 3. 카프카 기본 개념과 구조 (0) | 2025.05.03 |
[실전 카프카 개발부터 운영까지] 1. 카프카 개요 (0) | 2025.05.02 |