고승범 님의 "실전 카프카 개발부터 운영까지" 책을 정리한 포스팅 입니다.
1. 카프카 기초 다지기
구성 요소 | 정의 | 핵심 특징 | 비고/예시 |
토픽 | 메시지를 분류하는 이름 | 메시지의 논리적 그룹화 프로듀서-컨슈머 간 송수신 단위 |
채팅 시스템, 로그 시스템 등 |
파티션 | 토픽을 나누는 단위 | 병렬·분산 처리 가능 (1 컨슈머당 1 파티션 읽음) 파티션 수 늘리기 가능 (줄이기 X) |
성능 향상이 목적 |
세그먼트 | 파티션 로그 파일을 나누는 단위 | 일정 크기로 자동 분할됨 메시지 순서 보장 |
00000000000000000000.log (오프셋을 의미함) |
리플리케이션 | 파티션을 브로커에 복제 | 리더-팔로워 구조 (읽기/쓰기, 읽기) 고가용성 (장애 복구) |
팩터 2: 유실 가능 팩터 3: 유실 없음 |
예시) 토픽 생성
더보기
kafka-topics.sh
--bootstrap-server [브로커 주소:포트] \
--create --topic [토픽 이름] \
--partitions [파티션 수] \
--replication-factor [복제 수]
예시) 브로커 3개로 구성된 클러스터
더보기
partitions=3, replication-factor=2
파티션 | 리더 | 팔로워 |
A-0 | B-1 | B-2 |
A-1 | B-2 | B-3 |
A-2 | B-3 | B-1 |
partitions=2, replication-factor=3
파티션 | 리더 | 팔로워 | 팔로워 2 |
A-0 | B-1 | B-2 | B-3 |
A-1 | B-2 | B-3 | B-1 |
예시) 세그먼트 구조 및 내용
더보기
/tmp/kafka-logs/user-events-0/
├── 00000000000000000000.log ← 메시지가 저장된 실제 로그 파일
├── 00000000000000000000.index ← 오프셋 인덱스 파일 (빠른 검색용)
├── 00000000000000000000.timeindex ← 타임스탬프 인덱스
└── leader-epoch-checkpoint ← 리더 변경 이력
kafka-run-class.sh kafka.tools.DumpLogSegments \
--files 00000000000000000000.log \
--print-data-log \
--deep-iteration
offset: 0 position: 0 CreateTime: 1715408001000 key: user123 payload: {"action": "login"}
offset: 1 position: 128 CreateTime: 1715408002100 key: user123 payload: {"action": "click", "item": "item123"}
...
예시) 파티셔닝을 통한 병렬 처리
더보기
파티셔닝 하지 않은 경우 (파티션 1개)
- Kafka는 하나의 파티션만 존재하므로, 그 파티션은 오직 한 명의 컨슈머(C1) 에게만 할당됨.
- 나머지 C2, C3는 아무것도 할당받지 못함.
- → 모든 메시지는 C1 혼자 처리
- → 병렬 처리 불가
파티셔닝 한 경우 (파티션 3개)
- Kafka는 파티션을 각 컨슈머에게 하나씩 분배함:
- C1 → 파티션 0
- C2 → 파티션 1
- C3 → 파티션 2
- → 각 컨슈머가 병렬로 메시지를 처리함
ex) 사용자 이벤트 처리 시스템
- 키: user_id
- 설명: 사용자가 많고, 각 사용자의 행동 로그를 Kafka에 보냄
- 전략: user_id로 파티션을 나눔 → 같은 유저는 같은 파티션으로, 유저 간 병렬 처리 가능
2. 카프카의 핵심 개념
항목 | 설명 |
분산 시스템 |
카프카 클러스터에 브로커를 동적으로 추가 가능
➡️ 고성능, 고가용성 |
페이지 캐시 | 디스크 대신, OS가 관리하는 잉여 메모리 페이지를 활용 ➡️ 처리량 증가 |
배치 전송 처리 | 실시간 통신을 일정량 모아서 한 번에 전송하는 방식 ➡️ 성능 및 효율성 향상 |
압축 전송 |
메시지에 압축 알고리즘 적용
- gzip, snappy: 높은 압축율 - lz4, zstd: 빠른 응답 속도 |
주키퍼 의존성 |
클러스터 메타데이터 관리
znode 사용 (카프카의 메타 정보를 주키퍼에 기록) ➡️ 안정적인 클러스터 운영 (살아있는 노드가 과반수 이상일 때 서비스가 가능) |
3. 프로듀서의 기본 동작과 예제 맛보기
구성 요소
구성 요소 | 설명 |
ProducerRecord | Kafka에 보낼 메시지 구성 단위 (Topic, Partition, Key, Value) |
Serializer |
객체를 바이트 형태로 직렬화함
|
Partitioner |
메시지를 어느 파티션에 보낼지 결정
- 파티션별로 임시 보관 (배치 전송을 효율적으로 수행하기 위함) |
전송 결과 |
성공: 메타데이터 반환
실패: 지정된 횟수만큼 자동으로 재시도 |
주요 옵션
옵션명 | 설명 |
bootstrap.servers |
kafka 클러스터에 연결할 브로커 주소 목록
|
client.dns.lookup |
DNS 조회 방식
|
acks |
메시지 전송 완료 옵션
- 0: 빠른 전송 (유실 가능성 있음) - 1: 리더 확인 - all: 리더+ 팔로워 확인 |
max.in.flight.requests.per.connection |
하나의 커넥션에서 응답 없이 전송할 수 있는 최대 요청 수
- 1: 순서 보장 (성능은 저하될 수 있음) |
compression.type |
데이터 전송 시, 사용하는 압축 타입
|
buffer.memory |
데이터 전송 시, 임시로 보관할 수 있는 총 메모리 크기
|
retries |
데이터 전송 실패 시, 재전송할 최대 횟수
|
batch.size |
배치 전송 시, 한 배치에 담기는 최대 데이터 크기
|
linger.ms |
배치 전송을 위해 대기하는 최대 대기 시간
|
enable.idempotent |
Exactly-once 전송 보장 여부
|
transactional.id |
트랜잭션 ID 설정
|
예제
더보기
spring:
kafka:
group-id: chat-group
auto-offset-reset: earliest
bootstrap-servers: 127.0.0.1:10000,127.0.0.1:10001
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
buffer-memory: 33554432
retries: 5
batch-size: 16384
transaction-id-prefix: my-transactional
properties: # native option
enable.idempotence: true
max.in.flight.requests.per.connection: 5
linger.ms: 100
@MessageMapping("/chat.send")
public void sendMessageWithKafka(GroupChatMessageRequest message) {
String key = message.groupId();
kafkaTemplate.send(chatTopicName, key, message)
.whenComplete((sendResult, ex) -> {
if (ex != null) {
// 실패한 경우
System.out.println("메시지 전송 실패: " + ex.getMessage());
} else {
// 성공한 경우
RecordMetadata metadata = sendResult.getRecordMetadata();
System.out.println("메시지 전송 성공!");
System.out.println("토픽: " + metadata.topic());
System.out.println("파티션: " + metadata.partition());
System.out.println("오프셋: " + metadata.offset());
System.out.println("타임스탬프: " + metadata.timestamp());
}
});
}
4. 컨슈머의 기본 동작과 예제 맛보기
컨슈머
- Kafka 브로커에 저장된 메시지를 소비하는 컴포넌트
항목 | 설명 |
파티션 단위 처리 | 컨슈머는 파티션 단위로 메시지를 소비함 |
파티션-컨슈머 관계 |
하나의 파티션은 한 시점에 오직 하나의 컨슈머에게만 할당됨
|
메시지 요청 방식 | 컨슈머가 파티션 리더에게 직접 메시지를 요청함 (Pull) |
오프셋 관리 |
메시지는 오프셋 기준으로 가져옴 (읽은 위치는 커밋됨)
컨슈머 그룹의 오프셋 정보를 __consumer_offsets 특수 커밋에 기록함 |
컨슈머 그룹
- 하나 이상의 컨슈머들이 모여 있는 그룹 (같은 group.id 공유)
- 하나의 토픽을 공동으로 처리
- 병렬 처리 및 내결함성을 위해 사용
항목 | 설명 |
정보 공유 |
같은 그룹 내 컨슈머는 누가 어떤 파티션을 소비 중인지 서로 인식함
|
리밸런싱 |
컨슈머 수나 파티션 수가 변동될 때 그룹 코디네이터가 파티션을 자동 재할당함
➡️ 컨슈머 장애 발생 시, 남은 컨슈머에게 해당 파티션을 재할당하여 소비를 이어가게 함 |
이상적인 구조 |
파티션 수와 컨슈머 수가 1:1일 때, 자원 낭비 없이 가장 효율적임
|
예제) 리밸런싱
더보기
컨슈머 C가 죽었을 경우
→ 그룹 코디네이터가 남은 컨슈머에게 파티션 재할당
예제) 이상적인 구조
더보기
파티션 수: 3, 컨슈머 수: 3
→ 각 컨슈머가 파티션 하나씩 소비 (이상적인 병렬 처리)
파티션 수: 3, 컨슈머 수: 2
→ 컨슈머 A가 2개, B가 1개 파티션을 맡음
주요 옵션
옵션 | 설명 |
bootstrap.servers |
Kafka 클러스터의 브로커 주소 목록 (최초 연결용)
|
group.id |
컨슈머 그룹 식별자 (필수)
|
group.instance.id |
특정 컨슈머 인스턴스를 고정적으로 식별하는 ID (Static Membership용)
|
heartbeat.interval.ms |
컨슈머가 코디네이터에게 보내는 하트비트 요청 주기 (ms)
|
fetch.min.bytes |
한 번의 요청으로 가져올 수 있는 최소 데이터 크기 (bytes)
|
fetch.max.bytes |
한 번의 요청으로 가져올 수 있는 최대 데이터 크기 (bytes)
|
max.partition.fetch.bytes |
파티션 하나당 가져올 수 있는 최대 데이터 크기 (bytes)
|
session.timeout.ms |
컨슈머가 응답하지 않을 때 그룹에서 제거되기까지 기다리는 최대 시간 (ms)
|
enable.auto.commit |
오프셋을 자동 커밋할지 여부 (true/false)
|
auto.offset.reset |
오프셋이 없거나 유효하지 않을 때 사용할 전략
- earliest: 가장 예전 메시지부터 읽기 - latest: 가장 최근 메시지부터 읽기 - none: 오프셋 없으면 에러 발생 |
예제
더보기
spring:
kafka:
group-id: chat-group
auto-offset-reset: earliest
bootstrap-servers: 127.0.0.1:10000,127.0.0.1:10001
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
heartbeat-interval: 3000
fetch-max-wait: 5000
fetch-min-size: 1
enable-auto-commit: true
properties:
spring.json.trusted.packages: "*"
max.partition.fetch.bytes: 1048576
session.timeout.ms: 100000
metadata.max.age.ms: 1000
@KafkaListener(topicPattern = "topic_chat", groupId = "chat-group")
public void consumeMessage(ConsumerRecord<String, GroupChatMessageRequest> record) {
String chatGroupId = record.key();
GroupChatMessageRequest message = record.value();
messagingTemplate.convertAndSend("/topic/chat/" + chatGroupId, new GroupChatMessageResponse(message.writerId(), message.content(), null));
}
'Kafka' 카테고리의 다른 글
[Kafka] 9. Kafka Streams (0) | 2025.05.12 |
---|---|
[실전 카프카 개발부터 운영까지] 6. 컨슈머의 내부 동작 원리와 구현 (0) | 2025.05.08 |
[실전 카프카 개발부터 운영까지] 5. 프로듀서의 내부 동작 원리와 구현 (0) | 2025.05.07 |
[실전 카프카 개발부터 운영까지] 4. 카프카의 내부 동작 원리와 구현 (0) | 2025.05.06 |
[실전 카프카 개발부터 운영까지] 1. 카프카 개요 (0) | 2025.05.02 |