1. MongoDB Connection Properties
- Kafka Source Connector - MongoDB cluster와의 연결 및 통신 속성
| 이름 | 설명 | 주의사항 |
| connection.uri | MongoDB 인스턴스 또는 클러스터에 연결할 URI. | 인증 정보를 노출하지 않으려면 ConfigProvider 사용 권장. |
| database | 변경 감지를 수행할 데이터베이스 이름. | 미설정 시 모든 DB 감시 |
| collection | 변경 감지를 수행할 컬렉션 이름 | DB 설정이 비어 있으면 이 설정은 무시됨. |
| server.api.version | 사용할 Stable API 버전 | MongoDB Stable API 지원 버전 확인 필요. |
| server.api.deprecationErrors | true 시, 설정된 Stable API 버전에서 deprecated된 명령을 호출하면 예외 발생. | |
| server.api.strict | true 시, 설정된 Stable API 버전에 포함되지 않은 명령 호출 시 예외 발생. |
설정) connection properties
더보기
name=mongodb-source-connector
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# MongoDB 연결 URI (인증 정보 포함 가능)
connection.uri=mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=myRepl
# 변경 감지를 수행할 DB 및 컬렉션
database=myDatabase
collection=myCollection
# MongoDB Stable API 버전 지정 (선택)
server.api.version=1
server.api.deprecationErrors=false
server.api.strict=false
# Kafka 토픽 매핑
topic.prefix=mongo_
# Value/Key 포맷 설정 (예: JSON)
output.format.value=json
output.format.key=json
# Converter 설정 (예: JSON Converter, 스키마 없이)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
2. Kafka Topic Properties
- 토픽 이름은 topic.prefix + topic.separator + 데이터베이스명 + topic.separator + 컬렉션명 + topic.suffix 구조로 생성됨
| Name | Description | 특징 |
| topic.prefix | Kafka 토픽 이름의 앞부분 | |
| topic.suffix | Kafka 토픽 이름의 뒷부분 | |
| topic.namespace.map | MongoDB 네임스페이스(데이터베이스.컬렉션)와 Kafka 토픽명을 매핑하는 JSON 객체 | - 정규식(regex)·와일드카드 지원. - topic.separator와 무관하게 네임스페이스 표기 시 항상 . 사용 |
| topic.separator | 토픽 이름 생성 시 각 요소를 연결하는 구분자 | |
| topic.mapper | 사용자 정의 토픽 매핑 로직을 구현한 Java 클래스 지정. |
설정) kafka topic properties
더보기
1
3. Change Stream Properties
| 이름 | 한 줄 설명 | 비고 |
| pipeline | Change Stream 이벤트에 적용할 Aggregation 파이프라인 |
fullDocument가 아니라 이벤트 문서에 대해 매칭함.
예: [{"$match":{"$and":[{"operationType":"insert"},{"fullDocument.eventId":1404}]}}] |
| change.stream.full.document | update 이벤트에서 어떤 문서를 돌려줄지 |
updateLookup는 변경분 + 수정된 전체 문서 사본까지 포함
|
| change.stream.show.expanded.events | DDL 이벤트 알림 포함 여부 (예: createIndexes, dropIndexes) |
updateDescription.disambiguatedPaths(모호한 경로 해소 정보)를 보려면 이 옵션이 필요함.
|
| change.stream.full.document.before.change | 프리이미지(pre-image) 반환 여부(업데이트/삭제 이전 문서) |
프리이미지는 사전 설정된 컬렉션에서만 가능.
기존 데이터 복사(snapshots) 중에는 적용되지 않음. |
| publish.full.document.only | 업데이트 이벤트에서 fullDocument만 내보낼지 |
true로 하면 change.stream.full.document를 자동으로 updateLookup로 강제함.
|
| publish.full.document.only.tombstone.on.delete | publish.full.document.only=true일 때 삭제 시 tombstone(key+null)도 보낼지 |
이 옵션은 위 조건에서만 의미 있음.
|
| change.stream.document.key.as.key | 소스 레코드 키를 documentKey로 쓸지 |
true면 tombstone에도 삭제된 문서의 키 포함.
false면 tombstone 키로 resume token 사용. |
| collation | Change Stream 반환 문서에 적용할 언어별 정렬 규칙(JSON) |
유효한 collation JSON이어야 함.
|
| batch.size | Change Stream 커서 배치 크기 | 정수. |
| poll.await.time.ms | 커넥터가 새 변경을 기다리는 최대 대기 시간(ms) |
정수. 새 데이터 없으면 이 시간 뒤 빈 배치 반환 가능.
|
| poll.max.batch.size | 한 번 폴링에서 읽을 최대 문서 수 |
정수. 내부 버퍼링량 제한 용도.
|
4. Output Format Properties
5. Startup Properties
6. Error Handling and Resuming from Interruption Properties
7. All Properties
출처
'Database > MongoDB' 카테고리의 다른 글
| [MongoDB Kafka Connector] 3-1. Source Connector: Fundamentals (1) | 2025.08.10 |
|---|---|
| [MongoDB] 2-1. Change Streams (0) | 2025.08.10 |
| [MongoDB] ?-2. Replication: Oplog (0) | 2025.08.10 |
| [MongoDB] MongoDB Replication (0) | 2025.08.05 |