Database/MongoDB

[MongoDB Kafka Connector] 3-2. Source Connector: Configuration Properties

noahkim_ 2025. 8. 10. 14:33

1. MongoDB Connection Properties

  • Kafka Source Connector - MongoDB cluster와의 연결 및 통신 속성
이름 설명 주의사항
connection.uri MongoDB 인스턴스 또는 클러스터에 연결할 URI. 인증 정보를 노출하지 않으려면 ConfigProvider 사용 권장.
database 변경 감지를 수행할 데이터베이스 이름. 미설정 시 모든 DB 감시
collection 변경 감지를 수행할 컬렉션 이름 DB 설정이 비어 있으면 이 설정은 무시됨.
server.api.version 사용할 Stable API 버전 MongoDB Stable API 지원 버전 확인 필요.
server.api.deprecationErrors true 시, 설정된 Stable API 버전에서 deprecated된 명령을 호출하면 예외 발생.  
server.api.strict true 시, 설정된 Stable API 버전에 포함되지 않은 명령 호출 시 예외 발생.  

 

설정) connection properties

더보기
name=mongodb-source-connector
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# MongoDB 연결 URI (인증 정보 포함 가능)
connection.uri=mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=myRepl

# 변경 감지를 수행할 DB 및 컬렉션
database=myDatabase
collection=myCollection

# MongoDB Stable API 버전 지정 (선택)
server.api.version=1
server.api.deprecationErrors=false
server.api.strict=false

# Kafka 토픽 매핑
topic.prefix=mongo_

# Value/Key 포맷 설정 (예: JSON)
output.format.value=json
output.format.key=json

# Converter 설정 (예: JSON Converter, 스키마 없이)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

 

2. Kafka Topic Properties

  • 토픽 이름은 topic.prefix + topic.separator + 데이터베이스명 + topic.separator + 컬렉션명 + topic.suffix 구조로 생성됨
Name Description 특징
topic.prefix Kafka 토픽 이름의 앞부분  
topic.suffix Kafka 토픽 이름의 뒷부분  
topic.namespace.map MongoDB 네임스페이스(데이터베이스.컬렉션)와 Kafka 토픽명을 매핑하는 JSON 객체 - 정규식(regex)·와일드카드 지원.
- topic.separator와 무관하게 네임스페이스 표기 시 항상 . 사용
topic.separator 토픽 이름 생성 시 각 요소를 연결하는 구분자  
topic.mapper 사용자 정의 토픽 매핑 로직을 구현한 Java 클래스 지정.  

 

설정) kafka topic properties

 

3. Change Stream Properties

이름 한 줄 설명 비고
pipeline Change Stream 이벤트에 적용할 Aggregation 파이프라인
fullDocument가 아니라 이벤트 문서에 대해 매칭함.
예: [{"$match":{"$and":[{"operationType":"insert"},{"fullDocument.eventId":1404}]}}]
change.stream.full.document update 이벤트에서 어떤 문서를 돌려줄지
updateLookup는 변경분 + 수정된 전체 문서 사본까지 포함
change.stream.show.expanded.events DDL 이벤트 알림 포함 여부
(예: createIndexes, dropIndexes)
updateDescription.disambiguatedPaths(모호한 경로 해소 정보)를 보려면 이 옵션이 필요함.
change.stream.full.document.before.change 프리이미지(pre-image) 반환 여부(업데이트/삭제 이전 문서)
프리이미지는 사전 설정된 컬렉션에서만 가능.
기존 데이터 복사(snapshots) 중에는 적용되지 않음.
publish.full.document.only 업데이트 이벤트에서 fullDocument만 내보낼지
true로 하면 change.stream.full.document를 자동으로 updateLookup로 강제함.
publish.full.document.only.tombstone.on.delete publish.full.document.only=true일 때 삭제 시 tombstone(key+null)도 보낼지
이 옵션은 위 조건에서만 의미 있음.
change.stream.document.key.as.key 소스 레코드 키를 documentKey로 쓸지
true면 tombstone에도 삭제된 문서의 키 포함.
false면 tombstone 키로 resume token 사용.
collation Change Stream 반환 문서에 적용할 언어별 정렬 규칙(JSON)
유효한 collation JSON이어야 함.
batch.size Change Stream 커서 배치 크기 정수.
poll.await.time.ms 커넥터가 새 변경을 기다리는 최대 대기 시간(ms)
정수. 새 데이터 없으면 이 시간 뒤 빈 배치 반환 가능.
poll.max.batch.size 한 번 폴링에서 읽을 최대 문서 수
정수. 내부 버퍼링량 제한 용도.

 

4. Output Format Properties

5. Startup Properties

6. Error Handling and Resuming from Interruption Properties

7. All Properties

 

출처