1. Change Streams
- 애플리케이션이 실시간 데이터 변경 내용을 바로 받아볼 수 있게 하는 기능
구분 | 내용 |
Aggregation | Aggregation Pipeline을 사용해 Change Stream 이벤트를 원하는 형태로 가공 가능 |
Change Event Structure |
- 메타데이터(_id, operationType, ns, documentKey, updateDescription)
- 변경 내용 * publish.full.document.only 옵션: 변경 내용만 Kafka Connector로 전송 (메타데이터 제외) |
Performance | 인덱스 사용 불가 (oplog가 특수한 capped 컬렉션이라 인덱스 불가) → 고속 스토리지 사용, 충분한 캐시 메모리(WiredTiger cache) 확보 |
Source Connectors
- 하나의 Change Stream을 열어, 그 스트림의 데이터를 Kafka Connector에 전송하기
- Connector가 실행 되는 동안만 Change Stream 유지 (Connector 중지 시 Change Stream 닫음)
항목 | 정의 | 저장 위치 |
Resume Token | Change Event Document의 _id 값 | MongoDB 내부 |
Kafka Offset | Kafka Connector가 처리한 마지막 이벤트 위치 값 |
Kafka의 offset.storage.topic 토픽
|
동작 흐름) Source Connector
더보기
- 커넥터 첫 실행: Offset 없으면 새로운 Change Stream 시작
- 첫 이벤트 수신 후 Kafka에 발행: 해당 이벤트의 Resume Token을 Offset으로 처리
- 재시작/장애복구 시: 저장된 Offset 위치부터 이어서 처리
2. Apply Schemas
- MongoDB Kafka Source Connector에서 들어오는 문서에 스키마를 적용하는 방법
Default Schemas
스키마 종류 | 기본 내용 | 사용 권장 조건 | 설정 옵션 |
Key Schema | _id 필드 기반 | _id 필드를 유지하는 경우 |
output.format.key=schema
|
Value Schema | Change Event Document 전체 기반 | 문서를 변환하지 않은 경우 |
output.format.value=schema
|
Schemas For Transformed Documents
스키마 변경 설정
- publish.full.document.only=true
- 이벤트 문서 구조 변경 (Aggregation Pipeline)
스키마 정의
구분 | 설명 | 비고 |
스키마 직접 지정 | Avro 스키마 문법으로 Key/Value 스키마를 직접 정의 |
Avro 바이너리 인코딩 시 Avro Converter 필수
|
스키마 추론 | Value 스키마를 자동 추론하여 적용 (Key 스키마는 추론 불가) |
배열·중첩 문서 추론 |
예) 스키마 직접 지정
더보기
Key
output.format.key=schema
output.schema.key=<Avro 스키마>
Value
output.format.value=schema
output.schema.value=<Avro 스키마>
예) 스키마 추론
더보기
Value
output.format.value=schema
output.schema.infer.value=true
표) 스키마 추론 케이스
더보기
케이스 | 추론 동작 | 위험/한계 | 권장 |
문서 간 필드 존재/부재 | 필드 유무를 감안해 추론 시도 | 일부 문서에서만 존재 시 의미/타입 불명확 |
해당 필드의 존재/타입을 명시한 Value 스키마 지정
|
null 여부 차이 | null 포함 상황을 반영해 추론 시도 | null 처리 일관성 없으면 충돌 |
nullable 정책을 명확히 하여 스키마로 지정
|
배열 원소 타입 불일치 | 서로 다른 원소 타입 감지 | 혼합 타입이면 충돌 |
배열 원소 타입을 단일화하거나 스키마로 고정
|
배열 비어 있음 | 빈 배열로는 원소 타입 판단 어려움 | 타입 미상 |
원소 타입을 스키마에 명시
|
- 정확히 추론되지 않는 경우, string 폴백될 수 있음
3. JSON Formatters
- JSON 데이터를 어떻게 명시할 것인지를 담당함
- 출력되는 데이터의 타입을 표현하기 위한 명령들을 제공함
Built-In Formatters
클래스 | 설명 | 특징 |
DefaultJson | 예전 방식의 strict JSON 포맷터 |
- MongoDB Kafka Connector의 레거시 JSON 포맷
- 엄격한 JSON 형식 |
ExtendedJson | Fully type-safe Extended JSON 포맷터 | - 완전한 타입 안전성 - 대부분의 값을 BSON 타입으로 표현 (BSON 타입 보존에 중점) |
SimplifiedJson | 단순화된 JSON 포맷터 | - 단순한 JSON 형식 - BSON 타입 없이 문자열로 표현 |
Example
Name | Value | Type |
_id | "5f15aab12435743f9bd126a4" | ObjectID ($oid) |
w | [ 12345.6789, 23.53 ] |
Array of:
- Decimal128 ($numberDecimal) - Double ($numberDouble) |
x | "SSBsb3ZlIGZvcm1hdHRpbmch" of type "00" | Binary ($binary) |
y | "2023-05-11T08:27:07.000Z" | Date ($date) |
z | { a: false, b: 87, c: "hello world" } |
Document with fields:
- a: Boolean - b: Int32 ($numberInt) - c: String |
출력) DefaultJson
더보기
더보기
{
"_id": {"$oid": "5f15aab12435743f9bd126a4"},
"w": [{"$numberDecimal": "12345.6789"}, 23.53],
"x": {"$binary": "SSBsb3ZlIGZvcm1hdHRpbmch", "$type": "00"},
"y": {"$date": 1683793627000},
"z": {"a": false, "b": 87, "c": "hello world"}
}
출력) ExtendedJson
더보기
더보기
{
"_id": {"$oid": "5f15aab12435743f9bd126a4"},
"w": [{"$numberDecimal": "12345.6789"}, {"$numberDouble": "23.53"}],
"x": {"$binary": "SSBsb3ZlIGZvcm1hdHRpbmch", "$type": "00"},
"y": {"$date": 1683793627000},
"z": {"a": false, "b": {"$numberInt": "87"}, "c": "hello world"}
}
출력) SimplifiedJson
더보기
더보기
{
"_id": "5f15aab12435743f9bd126a4",
"w": ["12345.6789", 23.53],
"x": "SSBsb3ZlIGZvcm1hdHRpbmch",
"y": "2023-05-11T08:27:07Z",
"z": {"a": false, "b": 87, "c": "hello world"}
}