- 다양한 데이터베이스의 CDC를 위한 오픈소스 커넥터 모음 (Kafka Connect 위에서 동작함)
- 각 데이터베이스의 로그를 기반으로 한 변경 스트림을 읽음
장점
항목 | 설명 | 예 |
완전성 |
모든 데이터 변경을 누락 없이 캡처
|
|
낮은 지연 |
폴링 대비 매우 낮은 레이턴시
|
MySQL·Postgres 사례: ms 수준
|
스키마 변경 불필요 |
Last Updated 같은 컬럼 추가 불필요
|
|
삭제 캡처 |
DELETE 이벤트도 포함
|
|
메타데이터 제공 |
DB/설정에 따라 포함 가능
|
이전 레코드 상태, 트랜잭션 ID, 유발 쿼리 등
|
1. Architecture
- Kafka Connect 위에서 동작하는 소스 커넥터로 배포됨
구분 | 내용 | 가능 여부/비고 |
토픽 네이밍 | 기본은 서버/DB/테이블 기반 토픽명 생성 |
✅ SMT로 패턴 치환/이름 변경 가능
|
토픽 라우팅 1:1 | 한 테이블 → 한 토픽 | ✅ 기본 동작 |
토픽 라우팅 N:1 | 여러 테이블 → 하나의 토픽으로 합치기 |
✅ 라우팅 SMT로 매핑
|
토픽 라우팅 1→N(중 하나로 분기) | 조건에 따라 여러 후보 중 하나의 토픽으로 라우팅 |
✅ 라우팅 SMT 조합으로 가능
|
토픽 라우팅 1→N(동시 복제/팬아웃) | 동시에 여러 토픽으로 복제 |
❌ SMT 단독 불가
→ Kafka Streams/ksqlDB 사용 → 여러 싱크가 동일 소스 토픽 읽기 |
Debezium Server
- Kafka Connect 없이 돌아가는 준비된 실행 앱
- Debezium 소스 커넥터를 붙여 DB 변경을 읽고, 메시지큐로 바로 보냄 (Kafka 말고 다른 제품 가능)
Debezium Engine
- Kafka Connect 없이 Debezium 커넥터를 자바 애플리케이션에 라이브러리로 직접 돌리는 방식
- 앱 안에서 DB 변경 이벤트를 콜백으로 직접 소비하거나 다른 브로커로 보내고 싶을 때 유용
예제) java application
더보기
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.format.Json;
import java.util.Properties;
import java.util.concurrent.Executors;
public class DebeziumEmbedded {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// 엔진/오프셋 저장
props.setProperty("name", "my-embedded-engine");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/tmp/debezium-offsets.dat");
props.setProperty("offset.flush.interval.ms", "1000");
// 커넥터 지정 (예: MySQL)
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("database.hostname", "mysql");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "user");
props.setProperty("database.password", "pass");
props.setProperty("database.server.id", "184054");
props.setProperty("topic.prefix", "dbz.mysql");
props.setProperty("database.include.list", "shop");
props.setProperty("table.include.list", "shop.orders");
DebeziumEngine<ChangeEvent<String, String>> engine =
DebeziumEngine.create(Json.class)
.using(props)
.notifying(record -> {
// 여기서 바로 처리하거나 다른 브로커로 전송
System.out.println(record.value());
})
.build();
var exec = Executors.newSingleThreadExecutor();
exec.execute(engine);
// 종료 시: engine.close(); exec.shutdown();
}
}
2. Feature
기능 | 설명 | |
스냅샷 |
초기 실행 시 현재 상태 스냅샷(로그가 오래되어 일부 없을 때 유용)
증분 스냅샷도 지원(런타임 트리거 가능) |
|
필터링 |
스키마/테이블/컬럼 단위 include/exclude 필터
|
|
마스킹 |
특정 민감 컬럼 값 마스킹
|
|
모니터링 |
대부분 JMX로 상태/지표 모니터링
|
|
SMT 제공 | 라우팅/필터링/이벤트 평탄화 등 | 예: ExtractNewRecordState로 after만 남기기 |
표) SMT 클래스
더보기
목적 | SMT 클래스 (value 기준) | 메모 |
필드 제거/선택 | ReplaceField$Value |
화이트리스트/블랙리스트
|
필드 마스킹 | MaskField$Value | PII 가림 |
필드 추출 | ExtractField$Value |
중첩에서 특정 필드만
|
구조 평탄화 | Flatten$Value | nested → flat |
값→키 승격 | ValueToKey |
특정 필드를 레코드 키로
|
토픽명 변경 | RegexRouter |
정규식으로 rename/합치기(N:1)
|
타임스탬프 라우팅 | TimestampRouter | 날짜별 토픽 구분 |
헤더/필드 추가 | InsertField$Value/Key |
처리시각/정적값 주입
|
Debezium 언랩 | io.debezium.transforms.ExtractNewRecordState |
after-only 페이로드로 단순화
|
Debezium outbox | io.debezium.transforms.outbox.EventRouter | Outbox 패턴 전용 |
예시) SMT - after-only로 언랩 + 토픽명 변경
더보기
{
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "^(.*)$",
"transforms.route.replacement": "$1.v1"
}
출처
'DevOps > Kafka' 카테고리의 다른 글
[Kafka] Dead Letter Queue (2) | 2025.08.15 |
---|---|
[MongoDB Kafka Connector] 1. Kafka Connector (0) | 2025.08.10 |
[Kafka] 9. Kafka Streams (0) | 2025.05.12 |
[실전 카프카 개발부터 운영까지] 6. 컨슈머의 내부 동작 원리와 구현 (0) | 2025.05.08 |
[실전 카프카 개발부터 운영까지] 5. 프로듀서의 내부 동작 원리와 구현 (0) | 2025.05.07 |