DevOps/Kafka

[Debezium] 1. Introduction

noahkim_ 2025. 8. 11. 17:10
  • 다양한 데이터베이스의 CDC를 위한 오픈소스 커넥터 모음 (Kafka Connect 위에서 동작함)
  • 각 데이터베이스의 로그를 기반으로 한 변경 스트림을 읽음

 

장점

항목 설명
완전성
모든 데이터 변경을 누락 없이 캡처
 
낮은 지연
폴링 대비 매우 낮은 레이턴시
MySQL·Postgres 사례: ms 수준
스키마 변경 불필요
Last Updated 같은 컬럼 추가 불필요
 
삭제 캡처
DELETE 이벤트도 포함
 
메타데이터 제공
DB/설정에 따라 포함 가능
이전 레코드 상태, 트랜잭션 ID, 유발 쿼리 등

 

1. Architecture

  • Kafka Connect 위에서 동작하는 소스 커넥터로 배포됨
구분 내용 가능 여부/비고
토픽 네이밍 기본은 서버/DB/테이블 기반 토픽명 생성
✅ SMT로 패턴 치환/이름 변경 가능
토픽 라우팅 1:1 한 테이블 → 한 토픽 ✅ 기본 동작
토픽 라우팅 N:1 여러 테이블 → 하나의 토픽으로 합치기
✅ 라우팅 SMT로 매핑
토픽 라우팅 1→N(중 하나로 분기) 조건에 따라 여러 후보 중 하나의 토픽으로 라우팅
✅ 라우팅 SMT 조합으로 가능
토픽 라우팅 1→N(동시 복제/팬아웃) 동시에 여러 토픽으로 복제
❌ SMT 단독 불가
→ Kafka Streams/ksqlDB 사용
 여러 싱크가 동일 소스 토픽 읽기

 

 

Debezium Server

  • Kafka Connect 없이 돌아가는 준비된 실행 앱
  • Debezium 소스 커넥터를 붙여 DB 변경을 읽고, 메시지큐로 바로 보냄 (Kafka 말고 다른 제품 가능)

 

Debezium Engine

  • Kafka Connect 없이 Debezium 커넥터를 자바 애플리케이션에 라이브러리로 직접 돌리는 방식
  • 앱 안에서 DB 변경 이벤트를 콜백으로 직접 소비하거나 다른 브로커로 보내고 싶을 때 유용

 

예제) java application

더보기
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.format.Json;

import java.util.Properties;
import java.util.concurrent.Executors;

public class DebeziumEmbedded {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // 엔진/오프셋 저장
    props.setProperty("name", "my-embedded-engine");
    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
    props.setProperty("offset.storage.file.filename", "/tmp/debezium-offsets.dat");
    props.setProperty("offset.flush.interval.ms", "1000");

    // 커넥터 지정 (예: MySQL)
    props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
    props.setProperty("database.hostname", "mysql");
    props.setProperty("database.port", "3306");
    props.setProperty("database.user", "user");
    props.setProperty("database.password", "pass");
    props.setProperty("database.server.id", "184054");
    props.setProperty("topic.prefix", "dbz.mysql");
    props.setProperty("database.include.list", "shop");
    props.setProperty("table.include.list", "shop.orders");

    DebeziumEngine<ChangeEvent<String, String>> engine =
        DebeziumEngine.create(Json.class)
            .using(props)
            .notifying(record -> {
              // 여기서 바로 처리하거나 다른 브로커로 전송
              System.out.println(record.value());
            })
            .build();

    var exec = Executors.newSingleThreadExecutor();
    exec.execute(engine);

    // 종료 시: engine.close(); exec.shutdown();
  }
}

 

2. Feature

기능 설명  
스냅샷
초기 실행 시 현재 상태 스냅샷(로그가 오래되어 일부 없을 때 유용)
증분 스냅샷도 지원(런타임 트리거 가능)
 
필터링
스키마/테이블/컬럼 단위 include/exclude 필터
 
마스킹
특정 민감 컬럼 값 마스킹
 
모니터링
대부분 JMX로 상태/지표 모니터링
 
SMT 제공 라우팅/필터링/이벤트 평탄화 등 예: ExtractNewRecordState로 after만 남기기

 

표) SMT 클래스

더보기
목적 SMT 클래스 (value 기준) 메모
필드 제거/선택 ReplaceField$Value
화이트리스트/블랙리스트
필드 마스킹 MaskField$Value PII 가림
필드 추출 ExtractField$Value
중첩에서 특정 필드만
구조 평탄화 Flatten$Value nested → flat
값→키 승격 ValueToKey
특정 필드를 레코드 키로
토픽명 변경 RegexRouter
정규식으로 rename/합치기(N:1)
타임스탬프 라우팅 TimestampRouter 날짜별 토픽 구분
헤더/필드 추가 InsertField$Value/Key
처리시각/정적값 주입
Debezium 언랩 io.debezium.transforms.ExtractNewRecordState
after-only 페이로드로 단순화
Debezium outbox io.debezium.transforms.outbox.EventRouter Outbox 패턴 전용

 

예시) SMT - after-only로 언랩 + 토픽명 변경

더보기
{
  "transforms": "unwrap,route",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "true",
  "transforms.unwrap.delete.handling.mode": "drop",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "^(.*)$",
  "transforms.route.replacement": "$1.v1"
}

 

 

출처