카테고리 없음

[MongoDB Kafka Connector] 3-1. Source Connector: Fundamentals

noahkim_ 2025. 8. 10. 14:32

1. Change Streams

  • 애플리케이션이 실시간 데이터 변경 내용을 바로 받아볼 수 있게 하는 기능
구분 내용
Aggregation Aggregation Pipeline을 사용해 Change Stream 이벤트를 원하는 형태로 가공 가능
Change Event Structure 
- 메타데이터(_id, operationType, ns, documentKey, updateDescription)
- 변경 내용

* publish.full.document.only 옵션: 변경 내용만 Kafka Connector로 전송 (메타데이터 제외)
Performance 인덱스 사용 불가 (oplog가 특수한 capped 컬렉션이라 인덱스 불가)
고속 스토리지 사용, 충분한 캐시 메모리(WiredTiger cache) 확보

 

Source Connectors

  • 하나의 Change Stream을 열어, 그 스트림의 데이터를 Kafka Connector에 전송하기
  • Connector가 실행 되는 동안만 Change Stream 유지 (Connector 중지 시 Change Stream 닫음)
항목 정의 저장 위치
Resume Token Change Event Document의 _id 값 MongoDB 내부
Kafka Offset Kafka Connector가 처리한 마지막 이벤트 위치 값
Kafka의 offset.storage.topic 토픽

 

동작 흐름) Source Connector

더보기
  1. 커넥터 첫 실행: Offset 없으면 새로운 Change Stream 시작
  2. 첫 이벤트 수신 후 Kafka에 발행: 해당 이벤트의 Resume Token을 Offset으로 처리
  3. 재시작/장애복구 시: 저장된 Offset 위치부터 이어서 처리

 

2. Apply Schemas

  • MongoDB Kafka Source Connector에서 들어오는 문서에 스키마를 적용하는 방법

 

Default Schemas

스키마 종류 기본 내용 사용 권장 조건 설정 옵션
Key Schema _id 필드 기반 _id 필드를 유지하는 경우
output.format.key=schema
Value Schema Change Event Document 전체 기반 문서를 변환하지 않은 경우
output.format.value=schema

 

Schemas For Transformed Documents

스키마 변경 설정
  • publish.full.document.only=true
  • 이벤트 문서 구조 변경 (Aggregation Pipeline)

 

스키마 정의
구분 설명 비고
스키마 직접 지정 Avro 스키마 문법으로 Key/Value 스키마를 직접 정의
Avro 바이너리 인코딩 시 Avro Converter 필수
스키마 추론 Value 스키마를 자동 추론하여 적용
(Key 스키마는 추론 불가)
배열·중첩 문서 추론

 

예) 스키마 직접 지정

더보기
Key
output.format.key=schema
output.schema.key=<Avro 스키마>

 

Value

output.format.value=schema
output.schema.value=<Avro 스키마>

 

예) 스키마 추론

더보기
Value
output.format.value=schema
output.schema.infer.value=true

 

표) 스키마 추론 케이스

더보기
케이스 추론 동작 위험/한계 권장
문서 간 필드 존재/부재 필드 유무를 감안해 추론 시도 일부 문서에서만 존재 시 의미/타입 불명확
해당 필드의 존재/타입을 명시한 Value 스키마 지정
null 여부 차이 null 포함 상황을 반영해 추론 시도 null 처리 일관성 없으면 충돌
nullable 정책을 명확히 하여 스키마로 지정
배열 원소 타입 불일치 서로 다른 원소 타입 감지 혼합 타입이면 충돌
배열 원소 타입을 단일화하거나 스키마로 고정
배열 비어 있음 빈 배열로는 원소 타입 판단 어려움 타입 미상
원소 타입을 스키마에 명시
  • 정확히 추론되지 않는 경우, string 폴백될 수 있음

 

3. JSON Formatters

  • JSON 데이터를 어떻게 명시할 것인지를 담당함
  • 출력되는 데이터의 타입을 표현하기 위한 명령들을 제공함

 

Built-In Formatters

클래스 설명 특징
DefaultJson 예전 방식의 strict JSON 포맷터
- MongoDB Kafka Connector의 레거시 JSON 포맷
- 엄격한 JSON 형식
ExtendedJson Fully type-safe Extended JSON 포맷터 - 완전한 타입 안전성
- 
대부분의 값을 BSON 타입으로 표현 (BSON 타입 보존에 중점)
SimplifiedJson 단순화된 JSON 포맷터 - 단순한 JSON 형식
- BSON 타입 없이 문자열로 표현

 

Example

Name Value Type
_id "5f15aab12435743f9bd126a4" ObjectID ($oid)
w [ 12345.6789, 23.53 ]
Array of:
- Decimal128 ($numberDecimal)
- Double ($numberDouble)
x "SSBsb3ZlIGZvcm1hdHRpbmch" of type "00" Binary ($binary)
y "2023-05-11T08:27:07.000Z" Date ($date)
z { a: false, b: 87, c: "hello world" }
Document with fields:
- a: Boolean
- b: Int32 ($numberInt)
- c: String

 

출력) DefaultJson

더보기
더보기
{
   "_id": {"$oid": "5f15aab12435743f9bd126a4"},
   "w": [{"$numberDecimal": "12345.6789"}, 23.53],
   "x": {"$binary": "SSBsb3ZlIGZvcm1hdHRpbmch", "$type": "00"},
   "y": {"$date": 1683793627000},
   "z": {"a": false, "b": 87, "c": "hello world"}
}

 

출력) ExtendedJson

더보기
더보기
{
   "_id": {"$oid": "5f15aab12435743f9bd126a4"},
   "w": [{"$numberDecimal": "12345.6789"}, {"$numberDouble": "23.53"}],
   "x": {"$binary": "SSBsb3ZlIGZvcm1hdHRpbmch", "$type": "00"},
   "y": {"$date": 1683793627000},
   "z": {"a": false, "b": {"$numberInt": "87"}, "c": "hello world"}
}

 

출력) SimplifiedJson

더보기
더보기
{
   "_id": "5f15aab12435743f9bd126a4",
   "w": ["12345.6789", 23.53],
   "x": "SSBsb3ZlIGZvcm1hdHRpbmch",
   "y": "2023-05-11T08:27:07Z",
   "z": {"a": false, "b": 87, "c": "hello world"}
}