Kafka

[실전 카프카 개발부터 운영까지] 3. 카프카 기본 개념과 구조

noahkim_ 2025. 5. 3. 08:04

고승범 님의 "실전 카프카 개발부터 운영까지" 책을 정리한 포스팅 입니다.


1. 카프카 기초 다지기

구성 요소 정의 핵심 특징 비고/예시
토픽 메시지를 분류하는 이름 메시지의 논리적 그룹화
프로듀서-컨슈머 간 송수신 단위
채팅 시스템, 로그 시스템 등
파티션 토픽을 나누는 단위 병렬·분산 처리 가능 (1 컨슈머당 1 파티션 읽음)
파티션 수 늘리기 가능 (줄이기 X)
성능 향상이 목적
세그먼트 파티션 로그 파일을 나누는 단위 일정 크기로 자동 분할됨
메시지 순서 보장
00000000000000000000.log
(오프셋을 의미함)
리플리케이션 파티션을 브로커에 복제 리더-팔로워 구조 (읽기/쓰기, 읽기)
고가용성 (장애 복구)
팩터 2: 유실 가능
팩터 3: 유실 없음

 

예시) 토픽 생성

더보기
kafka-topics.sh 
    --bootstrap-server [브로커 주소:포트] \
    --create --topic [토픽 이름] \
    --partitions [파티션 수] \
    --replication-factor [복제 수]

 

예시) 브로커 3개로 구성된 클러스터

더보기

partitions=3, replication-factor=2

파티션 리더 팔로워
A-0 B-1 B-2
A-1 B-2 B-3
A-2 B-3 B-1

 

partitions=2, replication-factor=3

파티션 리더 팔로워 팔로워 2
A-0 B-1 B-2 B-3
A-1 B-2 B-3 B-1

 

예시) 세그먼트 구조 및 내용

더보기
/tmp/kafka-logs/user-events-0/
  ├── 00000000000000000000.log         ← 메시지가 저장된 실제 로그 파일
  ├── 00000000000000000000.index       ← 오프셋 인덱스 파일 (빠른 검색용)
  ├── 00000000000000000000.timeindex   ← 타임스탬프 인덱스
  └── leader-epoch-checkpoint          ← 리더 변경 이력
kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files 00000000000000000000.log \
  --print-data-log \
  --deep-iteration
offset: 0 position: 0 CreateTime: 1715408001000 key: user123 payload: {"action": "login"}
offset: 1 position: 128 CreateTime: 1715408002100 key: user123 payload: {"action": "click", "item": "item123"}
...

 

예시) 파티셔닝을 통한 병렬 처리

더보기

파티셔닝 하지 않은 경우 (파티션 1개)

  • Kafka는 하나의 파티션만 존재하므로, 그 파티션은 오직 한 명의 컨슈머(C1) 에게만 할당됨.
  • 나머지 C2, C3는 아무것도 할당받지 못함.
  • 모든 메시지는 C1 혼자 처리
  • → 병렬 처리 불가

 

파티셔닝 한 경우 (파티션 3개)

  • Kafka는 파티션을 각 컨슈머에게 하나씩 분배함:
    • C1 → 파티션 0
    • C2 → 파티션 1
    • C3 → 파티션 2
  • 각 컨슈머가 병렬로 메시지를 처리함 

ex) 사용자 이벤트 처리 시스템

  • : user_id
  • 설명: 사용자가 많고, 각 사용자의 행동 로그를 Kafka에 보냄
  • 전략: user_id로 파티션을 나눔 → 같은 유저는 같은 파티션으로, 유저 간 병렬 처리 가능 

 

2. 카프카의 핵심 개념

항목 설명
분산 시스템
카프카 클러스터에 브로커를 동적으로 추가 가능
➡️ 고성능, 고가용성
페이지 캐시 디스크 대신, OS가 관리하는 잉여 메모리 페이지를 활용
➡️ 처리량 증가
배치 전송 처리 실시간 통신을 일정량 모아서 한 번에 전송하는 방식
➡️ 성능 및 효율성 향상
압축 전송
메시지에 압축 알고리즘 적용
- gzip, snappy: 높은 압축율
- lz4, zstd: 빠른 응답 속도
주키퍼 의존성
클러스터 메타데이터 관리
znode 사용 (카프카의 메타 정보를 주키퍼에 기록)
➡️ 안정적인 클러스터 운영 (살아있는 노드가 과반수 이상일 때 서비스가 가능)

 

3. 프로듀서의 기본 동작과 예제 맛보기

구성 요소

구성 요소 설명
ProducerRecord Kafka에 보낼 메시지 구성 단위 (Topic, Partition, Key, Value)
Serializer
객체를 바이트 형태로 직렬화함
Partitioner
메시지를 어느 파티션에 보낼지 결정
- 파티션별로 임시 보관 (배치 전송을 효율적으로 수행하기 위함)
전송 결과
성공: 메타데이터 반환
실패: 지정된 횟수만큼 자동으로 재시도

 

주요 옵션

옵션명 설명
bootstrap.servers
kafka 클러스터에 연결할 브로커 주소 목록
client.dns.lookup
DNS 조회 방식
acks
메시지 전송 완료 옵션
- 0: 빠른 전송 (유실 가능성 있음)
- 1: 리더 확인
- all: 리더+ 팔로워 확인
max.in.flight.requests.per.connection
하나의 커넥션에서 응답 없이 전송할 수 있는 최대 요청 수
- 1: 순서 보장 (성능은 저하될 수 있음)
compression.type
데이터 전송 시, 사용하는 압축 타입
buffer.memory
데이터 전송 시, 임시로 보관할 수 있는 총 메모리 크기
retries
데이터 전송 실패 시, 재전송할 최대 횟수
batch.size
배치 전송 시, 한 배치에 담기는 최대 데이터 크기
linger.ms
배치 전송을 위해 대기하는 최대 대기 시간
enable.idempotent
Exactly-once 전송 보장 여부
transactional.id
트랜잭션 ID 설정

 

예제

더보기
spring:
  kafka:
    group-id: chat-group
    auto-offset-reset: earliest
    bootstrap-servers: 127.0.0.1:10000,127.0.0.1:10001
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      buffer-memory: 33554432
      retries: 5
      batch-size: 16384
      transaction-id-prefix: my-transactional
      properties: # native option
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        linger.ms: 100
@MessageMapping("/chat.send")
public void sendMessageWithKafka(GroupChatMessageRequest message) {
    String key = message.groupId();

    kafkaTemplate.send(chatTopicName, key, message)
            .whenComplete((sendResult, ex) -> {
                if (ex != null) {
                    // 실패한 경우
                    System.out.println("메시지 전송 실패: " + ex.getMessage());
                } else {
                    // 성공한 경우
                    RecordMetadata metadata = sendResult.getRecordMetadata();
                    System.out.println("메시지 전송 성공!");
                    System.out.println("토픽: " + metadata.topic());
                    System.out.println("파티션: " + metadata.partition());
                    System.out.println("오프셋: " + metadata.offset());
                    System.out.println("타임스탬프: " + metadata.timestamp());
                }
            });
}


4. 컨슈머의 기본 동작과 예제 맛보기

컨슈머

  • Kafka 브로커에 저장된 메시지를 소비하는 컴포넌트
항목 설명
파티션 단위 처리 컨슈머는 파티션 단위로 메시지를 소비함
파티션-컨슈머 관계
하나의 파티션은 한 시점에 오직 하나의 컨슈머에게만 할당됨
메시지 요청 방식 컨슈머가 파티션 리더에게 직접 메시지를 요청함 (Pull)
오프셋 관리
메시지는 오프셋 기준으로 가져옴 (읽은 위치는 커밋됨)
컨슈머 그룹의 오프셋 정보를 __consumer_offsets 특수 커밋에 기록함

 

컨슈머 그룹

  • 하나 이상의 컨슈머들이 모여 있는 그룹 (같은 group.id 공유)
  • 하나의 토픽을 공동으로 처리
  • 병렬 처리 및 내결함성을 위해 사용
항목 설명
정보 공유
같은 그룹 내 컨슈머는 누가 어떤 파티션을 소비 중인지 서로 인식함
리밸런싱
컨슈머 수나 파티션 수가 변동될 때 그룹 코디네이터가 파티션을 자동 재할당함
➡️ 컨슈머 장애 발생 시, 남은 컨슈머에게 해당 파티션을 재할당하여 소비를 이어가게 함
이상적인 구조
파티션 수와 컨슈머 수가 1:1일 때, 자원 낭비 없이 가장 효율적임

 

예제) 리밸런싱

더보기
컨슈머 C가 죽었을 경우
→ 그룹 코디네이터가 남은 컨슈머에게 파티션 재할당

 

예제) 이상적인 구조

더보기
파티션 수: 3, 컨슈머 수: 3
→ 각 컨슈머가 파티션 하나씩 소비 (이상적인 병렬 처리)

파티션 수: 3, 컨슈머 수: 2
→ 컨슈머 A가 2개, B가 1개 파티션을 맡음

 

주요 옵션

옵션 설명
bootstrap.servers
Kafka 클러스터의 브로커 주소 목록 (최초 연결용)
group.id
컨슈머 그룹 식별자 (필수)
group.instance.id
특정 컨슈머 인스턴스를 고정적으로 식별하는 ID (Static Membership용)
heartbeat.interval.ms
컨슈머가 코디네이터에게 보내는 하트비트 요청 주기 (ms)
fetch.min.bytes
한 번의 요청으로 가져올 수 있는 최소 데이터 크기 (bytes)
fetch.max.bytes
한 번의 요청으로 가져올 수 있는 최대 데이터 크기 (bytes)
max.partition.fetch.bytes
파티션 하나당 가져올 수 있는 최대 데이터 크기 (bytes)
session.timeout.ms
컨슈머가 응답하지 않을 때 그룹에서 제거되기까지 기다리는 최대 시간 (ms)
enable.auto.commit
오프셋을 자동 커밋할지 여부 (true/false)
auto.offset.reset
오프셋이 없거나 유효하지 않을 때 사용할 전략
- earliest: 가장 예전 메시지부터 읽기
- latest: 가장 최근 메시지부터 읽기
- none: 오프셋 없으면 에러 발생

 

예제

더보기
spring:
  kafka:
    group-id: chat-group
    auto-offset-reset: earliest
    bootstrap-servers: 127.0.0.1:10000,127.0.0.1:10001
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      heartbeat-interval: 3000
      fetch-max-wait: 5000
      fetch-min-size: 1
      enable-auto-commit: true
      properties:
        spring.json.trusted.packages: "*"
        max.partition.fetch.bytes: 1048576
        session.timeout.ms: 100000
        metadata.max.age.ms: 1000
@KafkaListener(topicPattern = "topic_chat", groupId = "chat-group")
public void consumeMessage(ConsumerRecord<String, GroupChatMessageRequest> record) {
    String chatGroupId = record.key();
    GroupChatMessageRequest message = record.value();

    messagingTemplate.convertAndSend("/topic/chat/" + chatGroupId, new GroupChatMessageResponse(message.writerId(), message.content(), null));
}