DevOps/Kafka

[MongoDB Kafka Connector] 1. Kafka Connector

noahkim_ 2025. 8. 10. 20:47

1. Kafka Connector

  • Apache Kafka의 하위 구성 요소
  • Kafka를 데이터 저장소와 연결해줌

 

2. Install the MongoDB Kafka Connector

Kafka plugins 디렉토리에 복사

  • 커넥터 플러그인(JAR 파일들)을 plugin.path로 지정된 디렉토리에서 찾음
  • 예: plugin.path=/usr/local/share/kafka/plugins 라고 설정하면,
    /usr/local/share/kafka/plugins 폴더 안에 MongoDB Kafka Connector JAR을 넣어야 함.
  • 이렇게 하면 Kafka Connect 실행 시 해당 디렉토리에서 커넥터 클래스를 자동으로 로딩합니다.

 

분산 모드(Distributed Worker)

  • Kafka Connect는 Standalone과 Distributed 모드가 있음.
  • 분산 모드는 여러 Kafka Connect 워커 프로세스를 하나의 클러스터처럼 묶어서 작업을 나눠 처리하는 방식.
  • 이때 모든 워커가 동일한 커넥터 코드(JAR)를 가지고 있어야, 어디서든 커넥터 작업을 실행할 수 있습니다.

 

3. Connect to MongoDB

  • Connection URI를 사용하여 MongoDB에 연결
  • MongoDB Java Driver를 사용하여 Connection URI를 파싱함

 

예시) MongoDB ReplicaSet URI

더보기
mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017,mongodb2.example.com:27017/?replicaSet=myRepl

 

인증

  • MongoDB Java Driver에서 지원하는 모든 인증 방식이 MongoDB Kafka Connector에서도 사용 가능함

 

예시) SCRAM-SHA-256 인증 방식

더보기
mongodb://<db_username>:<db_password>@<hostname>:<port>/?authSource=<authenticationDb>&authMechanism=SCRAM-SHA-256
 

4. Data Formats

  • kafka connect 내부 데이터 포맷을 외부로 직렬화 할 떄 쓰는 인코딩 형식

 

JSON

형식 특징 주요 사용 시점
JSON JavaScript 객체 표기법을 기반으로 한 표준 데이터 교환 형식
일반적인 데이터 표현
Raw JSON JSON 객체를 문자열로 표현한 형식
Source/Sink 커넥터에 StringConverter 사용 시
BSON JSON과 유사한 객체를 이진(Binary) 형식으로 직렬화
MongoDB ↔ Connector 간 문서 전송 시 내부적으로 사용
JSON Schema JSON 객체의 구조와 값의 유효성을 정의하는 스키마 문법
JSON Schema Converter 적용 시, Kafka 토픽 메시지 형식 강제

 

예시) Row JSON

더보기
"{\"company\":\"MongoDB\"}"

 

예시) BSON

더보기
\x1a\x00\x00\x00\x02company\x00\x08\x00\x00\x00MongoDB\x00\x00

 

예시) JSON Schema

더보기
{
  "$schema": "http://json-schema.org/draft-07/schema",
  "type": "object",
  "required": ["company"],
  "properties": {
    "company": {
      "type": "string",
      "description": "A field to hold the name of a company"
    }
  },
  "additionalProperties": false
}

 

Avro

  • 스키마 기반 데이터 직렬화 및 전송 오픈소스 프레임워크
  • 데이터 통신 시, 데이터를 스키마로 정의하고 그 스키마에 맞게 변환 및 전송

 

Avro Schema
  • JSON 기반 스키마 정의 문법
구분 설명 예시 타입
MongoDB Kafka Connector 주의사항
Primitive 가장 기본적인 값 타입 string, int, long, boolean, bytes
null ❌
Complex 여러 값·타입을 조합 record, array, map, union, enum, fixed
- enum, fixed ❌
- union 제한 (2개 초과, null 중복 ❌)
Logical Primitive에 의미 부여 decimal, date, time-millis, time-micros,
timestamp-millis, timestamp-micros
Sink Connector는 제한된 Logical 타입만 지원
(여기 있는 목록만 가능)

 

 예) Avro Scheman

더보기
{
  "type": "record",
  "name": "example",
  "doc": "example documents have a company field",
  "fields": [
    {
      "name": "company",
      "type": "string"
    }
  ]
}

 

Avro Binary Encoding
  • Avro 스키마로 정의된 JSON 객체를 이진 형식으로 직렬화하는 방식
  • 더 compact하고 전송 효율이 높은 이진 데이터로 변환


 예) Avro Binary Encoding

더보기
\x0eMongoDB

 

Byte Arrays

  • 구조가 없는 연속된 바이트의 집합
  • 어떤 인코딩 형식으로 직렬화된 데이터든, 결국 바이트 배열 형태로 나타낼 수 있음
  • Kafka Converter가 데이터를 브로커로 전송하거나 읽어올 때, 내부적으로는 모두 바이트 배열로 처리함
  • 즉, 토픽의 메시지는 결국 바이트 배열 형태로 전송 및 저장됨

 

5. Converters

 

  • kafka connect ↔️ kafka 간 데이터 형식 변환을 담당하는 프로그램
  • 내부 포맷(Schema + Value)을 어떤 표현 방식으로 인코딩 할지 결정

 

Available Converters

  • MongoDB Kafka Connector
    • MongoDB 데이터(BSON)을 내부 포맷(Schema + Value)으로 변환
  • 인코딩
    • Kafka Connect가 지원하는 모든 Converter 사용 가능 (Avro Converter, Protobuf Converter, JSON Converter 등)
    • Source와 Sink에서 동일한 Converter 사용 필수

 

Connector Configuration

컨버터 Kafka 저장 형태 스키마 레지스트리 장점 주의점/제한
AvroConverter Avro 바이너리
(byte[])
필수 강한 스키마 관리
호환성 검사 지원
동적 스키마에 부적합
ProtobufConverter Protobuf 바이너리 필수 진화 용이
필드 태그 기반
동적 스키마에 취약
(진화 전략 필요)
JSON Schema Converter JSON+SchemaRef
(byte[])
필수 스키마 관리
JSON 친화
스키마 불일치에 민감
JSON Converter 순수 JSON 텍스트
(UTF-8 byte[])
불필요 단순/가벼움
도구 가시성 좋음
스키마 관리 없음
(검증 어려움)
String Converter
(Raw JSON)
문자열 byte[] 불필요 가장 단순
Raw JSON 라인로그처럼 사용
타입 보존/검증 불가

 

설정) AvroConverter

더보기
key.converter=value.converter=io.confluent.connect.avro.AvroConverter
*.schema.registry.url=<SR URI>
output.format.key=value=schema

 

설정) ProtobufConverter

더보기
o.confluent.connect.protobuf.ProtobufConverter
*.schema.registry.url=<SR URI>
output.format.*=schema

 

설정) JSON Schema Converter

더보기
io.confluent.connect.json.JsonSchemaConverter
*.schema.registry.url=<SR URI>
output.format.*=schema

 

설정) JSON Converter

더보기
org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
output.format.*=json

 

설정) String Converter

더보기
org.apache.kafka.connect.storage.StringConverter
key/value.converter.schemas.enable=false
output.format.*=json

 

 

출처