1. Kafka Connector
- Apache Kafka의 하위 구성 요소
- Kafka를 데이터 저장소와 연결해줌
2. Install the MongoDB Kafka Connector
Kafka plugins 디렉토리에 복사
- 커넥터 플러그인(JAR 파일들)을 plugin.path로 지정된 디렉토리에서 찾음
- 예: plugin.path=/usr/local/share/kafka/plugins 라고 설정하면,
/usr/local/share/kafka/plugins 폴더 안에 MongoDB Kafka Connector JAR을 넣어야 함. - 이렇게 하면 Kafka Connect 실행 시 해당 디렉토리에서 커넥터 클래스를 자동으로 로딩합니다.
분산 모드(Distributed Worker)
- Kafka Connect는 Standalone과 Distributed 모드가 있음.
- 분산 모드는 여러 Kafka Connect 워커 프로세스를 하나의 클러스터처럼 묶어서 작업을 나눠 처리하는 방식.
- 이때 모든 워커가 동일한 커넥터 코드(JAR)를 가지고 있어야, 어디서든 커넥터 작업을 실행할 수 있습니다.
3. Connect to MongoDB
- Connection URI를 사용하여 MongoDB에 연결
- MongoDB Java Driver를 사용하여 Connection URI를 파싱함
예시) MongoDB ReplicaSet URI
더보기
mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017,mongodb2.example.com:27017/?replicaSet=myRepl
인증
- MongoDB Java Driver에서 지원하는 모든 인증 방식이 MongoDB Kafka Connector에서도 사용 가능함
예시) SCRAM-SHA-256 인증 방식
더보기
mongodb://<db_username>:<db_password>@<hostname>:<port>/?authSource=<authenticationDb>&authMechanism=SCRAM-SHA-256
4. Data Formats
- kafka connect 내부 데이터 포맷을 외부로 직렬화 할 떄 쓰는 인코딩 형식
JSON
형식 | 특징 | 주요 사용 시점 |
JSON | JavaScript 객체 표기법을 기반으로 한 표준 데이터 교환 형식 |
일반적인 데이터 표현
|
Raw JSON | JSON 객체를 문자열로 표현한 형식 |
Source/Sink 커넥터에 StringConverter 사용 시
|
BSON | JSON과 유사한 객체를 이진(Binary) 형식으로 직렬화 |
MongoDB ↔ Connector 간 문서 전송 시 내부적으로 사용
|
JSON Schema | JSON 객체의 구조와 값의 유효성을 정의하는 스키마 문법 |
JSON Schema Converter 적용 시, Kafka 토픽 메시지 형식 강제
|
예시) Row JSON
더보기
"{\"company\":\"MongoDB\"}"
예시) BSON
더보기
\x1a\x00\x00\x00\x02company\x00\x08\x00\x00\x00MongoDB\x00\x00
예시) JSON Schema
더보기
{
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": ["company"],
"properties": {
"company": {
"type": "string",
"description": "A field to hold the name of a company"
}
},
"additionalProperties": false
}
Avro
- 스키마 기반 데이터 직렬화 및 전송 오픈소스 프레임워크
- 데이터 통신 시, 데이터를 스키마로 정의하고 그 스키마에 맞게 변환 및 전송
Avro Schema
- JSON 기반 스키마 정의 문법
구분 | 설명 | 예시 타입 |
MongoDB Kafka Connector 주의사항
|
Primitive | 가장 기본적인 값 타입 | string, int, long, boolean, bytes |
null ❌
|
Complex | 여러 값·타입을 조합 | record, array, map, union, enum, fixed |
- enum, fixed ❌
- union 제한 (2개 초과, null 중복 ❌) |
Logical | Primitive에 의미 부여 | decimal, date, time-millis, time-micros, timestamp-millis, timestamp-micros |
Sink Connector는 제한된 Logical 타입만 지원
(여기 있는 목록만 가능) |
예) Avro Scheman
더보기
{
"type": "record",
"name": "example",
"doc": "example documents have a company field",
"fields": [
{
"name": "company",
"type": "string"
}
]
}
Avro Binary Encoding
- Avro 스키마로 정의된 JSON 객체를 이진 형식으로 직렬화하는 방식
- 더 compact하고 전송 효율이 높은 이진 데이터로 변환
예) Avro Binary Encoding
더보기
\x0eMongoDB
Byte Arrays
- 구조가 없는 연속된 바이트의 집합
- 어떤 인코딩 형식으로 직렬화된 데이터든, 결국 바이트 배열 형태로 나타낼 수 있음
- Kafka Converter가 데이터를 브로커로 전송하거나 읽어올 때, 내부적으로는 모두 바이트 배열로 처리함
- 즉, 토픽의 메시지는 결국 바이트 배열 형태로 전송 및 저장됨
5. Converters
- kafka connect ↔️ kafka 간 데이터 형식 변환을 담당하는 프로그램
- 내부 포맷(Schema + Value)을 어떤 표현 방식으로 인코딩 할지 결정
Available Converters
- MongoDB Kafka Connector
- MongoDB 데이터(BSON)을 내부 포맷(Schema + Value)으로 변환
- 인코딩
- Kafka Connect가 지원하는 모든 Converter 사용 가능 (Avro Converter, Protobuf Converter, JSON Converter 등)
- Source와 Sink에서 동일한 Converter 사용 필수
Connector Configuration
컨버터 | Kafka 저장 형태 | 스키마 레지스트리 | 장점 | 주의점/제한 |
AvroConverter | Avro 바이너리 (byte[]) |
필수 | 강한 스키마 관리 호환성 검사 지원 |
동적 스키마에 부적합 |
ProtobufConverter | Protobuf 바이너리 | 필수 | 진화 용이 필드 태그 기반 |
동적 스키마에 취약 (진화 전략 필요) |
JSON Schema Converter | JSON+SchemaRef (byte[]) |
필수 | 스키마 관리 JSON 친화 |
스키마 불일치에 민감 |
JSON Converter | 순수 JSON 텍스트 (UTF-8 byte[]) |
불필요 | 단순/가벼움 도구 가시성 좋음 |
스키마 관리 없음 (검증 어려움) |
String Converter (Raw JSON) |
문자열 byte[] | 불필요 | 가장 단순 Raw JSON 라인로그처럼 사용 |
타입 보존/검증 불가 |
설정) AvroConverter
더보기
key.converter=value.converter=io.confluent.connect.avro.AvroConverter
*.schema.registry.url=<SR URI>
output.format.key=value=schema
설정) ProtobufConverter
더보기
o.confluent.connect.protobuf.ProtobufConverter
*.schema.registry.url=<SR URI>
output.format.*=schema
설정) JSON Schema Converter
더보기
io.confluent.connect.json.JsonSchemaConverter
*.schema.registry.url=<SR URI>
output.format.*=schema
설정) JSON Converter
더보기
org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
output.format.*=json
설정) String Converter
더보기
org.apache.kafka.connect.storage.StringConverter
key/value.converter.schemas.enable=false
output.format.*=json
출처
'DevOps > Kafka' 카테고리의 다른 글
[Kafka] Dead Letter Queue (1) | 2025.08.15 |
---|---|
[Debezium] 1. Introduction (2) | 2025.08.11 |
[Kafka] 9. Kafka Streams (0) | 2025.05.12 |
[실전 카프카 개발부터 운영까지] 6. 컨슈머의 내부 동작 원리와 구현 (0) | 2025.05.08 |
[실전 카프카 개발부터 운영까지] 5. 프로듀서의 내부 동작 원리와 구현 (0) | 2025.05.07 |