Kafka

[실전 카프카 개발부터 운영까지] 5. 프로듀서의 내부 동작 원리와 구현

noahkim_ 2025. 5. 7. 18:14

1. 파티셔너

  • 프로듀서가 메시지를 어느 파티션에 보낼지 결정하는 컴포넌트
  • 기본적으로 키 값을 해싱하여 파티션을 결정함
  • 하지만 키가 없으면 전략에 따라 파티션이 선택됨

 

주의사항

  • 동적으로 파티션이 증가할 경우, 기존의 키와 파티션의 매핑이 일치하지 않을 수 있음
  • 되도록이면 파티션 수를 변경하지 않는것이 권장됨

 

전략

구분 라운드 로빈 전략 스티키 파티셔닝 전략
전송 방식 순차적으로 여러 파티션에 균등 분배 하나의 파티션에 몰아서 일정량의 레코드를 채운 뒤 전송
장점 데이터가 파티션에 고르게 분산됨 배치 전송이 빠르고 효율이 높음
단점 파티션별로 데이터가 얇게 퍼져 버퍼가 빨리 안 채워짐  파티션 간 데이터 분포가 고르지 않을 수 있음
배치 효율 낮음 (파티션별 최소 레코드 수 미달로 대기 → 비효율) 높음 (한 파티션에 집중해 빠르게 배치 전송)

 

2. 프로듀서의 배치

  • 프로듀서는 처리량 증가를 위해 여러 레코드를 묶어서 한 번에 전송
  • 메시지는 파티션별로 분류되어 내부 버퍼에 일시 저장됨
  • 장점: 네트워크 전송 효율 상승. 처리량 상승
  • 단점: 대기 시간 발생

 

옵션

옵션 설명
buffer.memory
프로듀서 내부 버퍼 메모리 총 크기 (모든 파티션 합산)
batch.size
파티션별로 묶어서 보내는 1회 배치 크기 (bytes 단위)
linger.ms
배치 전송 전, 최대 대기 시간 (ms 단위)
- batch.size를 채우지 못해도 이 시간 지나면 전송

 

3. 중복 없는 전송 (Exactly Once Delivery)

메시지 전송 방식

전송 방식 재전송 중복 가능성 손실 가능성 특징
at least once O O X
데이터 절대 잃지 않음
중복 가능성 있음
at most once X X O
중복 없음
일부 데이터 손실 가능
exactly once O X X 중복도, 손실도 없음

 

Idempotence

  • 메시지가 여러 번 전송되어도 브로커가 중복임을 인식하고 한 번만 처리하게끔 하는 기능

 

구현 원리
  1. Producer마다, PID(Producer ID)를 부여하고 각 메시지에는 시퀀스 번호를 붙여서 전송
  2. 브로커는 이 정보를 바탕으로 메시지 중복 여부를 판단함 (이미 처리한 시퀀스 번호면 무시함)

 

설정

옵션 설명
enable.idempotence 중복 제거 기능 활성화
max.in.flight.requests.per.connection 동시에 보낼 수 있는 메시지 개수 제한 (순서 보장)
- 5 이하 값만 idempotence 충족
acks
프로듀서가 메시지를 성공으로 간주하기 위해 요구하는 브로커의 응답 수준
- 0: ACK 응답 기다리지 않음
- 1: 리더 브로커가 저장하면 ACK
- all: 리더 + 팔로워까지 저장해야 ACK (idempotent 충족)
retries 메시지 전송 실패 시, 프로듀서가 재전송을 시도하는 최대 횟수

 

4. 트랜잭션

  • 여러 메시지를 하나의 원자적 단위로 처리하기 위한 기능

 

트랜잭션 코디네이터

  • 트랜잭션을 관리하는 브로커 컴포넌트
역할 / 기능 상세 설명
트랜잭션 식별자 관리
transactional.id를 기반으로 각 프로듀서에 PID와 에포크 번호를 부여함
트랜잭션 로그 기록
__transaction_state에 트랜잭션 상태 기록
트랜잭션 상태 관리
상태 전이 관리 (트랜잭션 시작, 커밋 준비, 커밋 완료, 중단 등)
참여 파티션 등록
트랜잭션에 포함된 토픽 파티션을 등록하여 어떤 메시지가 트랜잭션 범위에 포함되는지 추적함
컨트롤 메시지 전송
트랜잭션의 커밋/중단 상태를 각 파티션 브로커에 컨트롤 메시지로 전달함
장애 복구 지원
로그에 기록된 트랜잭션 상태를 기반으로 장애 발생 시 재시작하거나 롤백을 수행함
원자성 보장
트랜잭션에 포함된 모든 메시지의 성공/실패 여부를 일관되게 보장함 (Exactly Once의 핵심 기능)

 

옵션

항목 설명
transaction.state.log.num.partitions 트랜잭션 로그 파티션 수
transaction.state.log.replication.factor 트랜잭션 로그 복제 수

 

설정

더보기
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 5
      buffer-memory: 33554432
      batch-size: 16384
      transaction-id-prefix: tx
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        linger.ms: 100
  • transaction.id를 설정해야 함
  • 중복없는 전송을 지원해야 함

 

@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(ProducerFactory<String, Object> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}

 

코드

더보기
@MessageMapping("/chat.send")
@Transactional("kafkaTransactionManager")
public void sendMessageWithKafka(GroupChatMessageRequest message) {
    String key = message.groupId();

    kafkaTemplate.send(chatTopicName, key, message);
    kafkaTemplate.send(chatTopicName, key, message);
}

 

단계별 동작

  1. 트랜잭션 코디네이터 찾기
    • 프로듀서: 브로커에게 transactional.id로 FindCoordinatorRequest 요청
    • (트랜잭션 코디네이터가 존재하지 않는다면, 신규 트랜잭션 코디네이터가 생성됨)
  2. 트랜잭션 초기화 (initTransactions())
    • 프로듀서: 트랜잭션 코디네이터에게 InitPidRequest 요청
    • 트랜잭션 코디네이터: PID, Epoch 부여. 이를 트랜잭션 로그에 기록함 (PID Epoch 증가)
  3. 트랜잭션 시작 (beginTransaction())
    • 프로듀서: 트랜잭션 시작 (내부적으로 트랜잭션 상태를 관리함)
    • 프로듀서: 트랜잭션 코디네이터에게 트랜잭션 시작 사실을 알림
  4. 트랜잭션에 참여하는 파티션 등록
    • 프로듀서: 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달
    • 트랜잭션 코디네이터: 해당 정보를 트랜잭션 로그에 기록
  5. 메시지 전송
    • 프로듀서: 대상 토픽의 파티션에 실제 메시지 전송 (PID, Epoch, 시퀀스 번호 포함)
  6. 트랜잭션 준비 (commitTransaction() or abortTransaction())
    • 프로듀서: 트랜잭션 코디네이터에게 트랜잭션 준비 요청
    • 트랜잭션 코디네이터: 트랜잭션 로그에 상태 기록 (PrepareCommit or PrepareAbort)
    • 트랜잭션 코디네이터: 관련 토픽 파티션에 컨트롤 메시지 전송 (PrepareCommit or PrepareAbort)
  7. 트랜잭션 커밋
    • 프로듀서: 트랜잭션 코디네이터에게 트랜잭션 완료 요청
    • 트랜잭션 코디네이터: 관련 토픽 파티션에 컨트롤 메시지 전송 (CommitMarker or AbortMarker)
    • 트랜잭션 코디네이터: 트랜잭션 로그에 상태 기록 (CompleteCommit or CompleteAbort)
  8. 트랜잭선 종료

 

예제

더보기
/opt/bitnami/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic __transaction_state \
--consumer.config /bitnami/kafka/config/consumer.config \
--formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --from-beginning
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1746652130343)
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(topic_chat-1), txnStartTimestamp=1746652130412, txnLastUpdateTimestamp=1746652130412)
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(topic_chat-1), txnStartTimestamp=1746652130412, txnLastUpdateTimestamp=1746652130484)
tx0::TransactionMetadata(transactionalId=tx0, producerId=12000, producerEpoch=0, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1746652130412, txnLastUpdateTimestamp=1746652130492)
  1. Empty: 트랜잭션 초기화
  2. Ongoing: 상태 표시 및 메시지 전송
  3. PrepareCommit: 파티션으로 메시지 전송 완료 및 트랜잭션 종료 요청 완료
  4. CompleteCommit: 트랜잭션 단계가 최종적으로 완료