Search

CDC: 데이터 변경 수집 기술(with Debezium)

CDC(Change Data Capture)

데이터베이스에서 발생하는 변경사항(Insert / Update / Delete)을 감지해 다른 시스템으로 전달하는 기술
운영 DB와 분석 DB 실시간 동기화시 보통 CDC가 활용됨
기존 배치 ETL 방식은 매 정해진 시점에 전체 테이블을 스캔하거나 특정 조건으로 조회하여 데이터를 적재하는 구조라 실시간성이 떨어지고 불필요한 DB 부하가 발생함
CDC는 변경된 데이터만 이벤트 형태로 흘려보내므로 운영 DB 부하를 피하면서 실시간에 가까운 동기화 구현이 가능
크게 세가지 방식으로 나뉨
Query-based CDC
주기적으로 DB를 조회해 변경된 데이터를 가져와 타겟 데이터베이스로 전달하는 방식
실시간이 아님, 변경된 레코드를 정확하게 찾기 어려움, 소스 데이터베이스 조회 때문에 높은 부하 발생 등 현대에서는 잘 사용하지 않음
Trigger-based CDC
DB 내부 Trigger로 변경 이벤트 발생시 로그 테이블로 기록하는 방식
쉽게 구현 가능하고 실시간으로 기록됨
모든 변경이 DB 내부 트랜잭션에서 처리되서 운영 DB 성능에 직접적인 영향을 끼침
Log-based CDC
DB의 트랜잭션 로그(WAL, binlog, redo 등) 읽어 변경을 감지하는 방식
서비스 트랜잭션에 영향이 없고, 모든 변경사항을 정확하게 캡쳐 가능
로그 포맷이 DBMS 마다 달라서 파싱이 까다로움, 때문에 직접 구현하기 보다 Debezium 같은 CDC 솔루션 필요
데이터 플랫폼, 이벤트 기반 아키텍처에서 가장 많이 사용하는 방식으로 Debezium/Kafka 조합이 사실상 표준 기술

Kafka Connect

외부 시스템과 카프카 사이 데이터 이동을 표준화한 프레임워크
connect 없이 데이터 이동을 한다면 MySQL → Kafka → S3 구조일때
MySQL Connector 라이브러리 사용해 Producer 코드 작성하고, S3 업로드하는 Consumer 따로 개발
즉, 데이터 소스/타겟마다 새로운 코드를 작성해야한다
하지만 Kafka Connect를 사용하면, 개발자가 코드작성할 필요 없이 커넥터 설정만으로 데이터 파이프라인을 구성할 수 있음

Kafka Connect 아키텍처

Worker
Kafka Connect 프로세스 자체, 커넥터를 실행하는 런타임 환경
Connector
어떤 데이터를 어디서 어디로 이동시킬지 정의하는 단위
여러 설정, 실행 메서드 등 jar 패키지로 크게 두 타입 있음
Source Connector
외부 시스템 → Kafka 토픽, 프로듀서 역할
Sink Connector
Kafka 토픽 → 외부 시스템, 컨슈머 역할
Task
Connector가 실제로 일을 수행하는 실행 단위
하나의 커넥터는 여러 Task로 분할되어 병렬 처리됨
작동흐름
1.
Connector 설정을 Kafka Connect REST API로 등록
2.
Worker가 설정을 보고 Connector 인스턴스를 생성
3.
Task가 만들어져 데이터를 읽고/쓰는 작업 수행
4.
Kafka Connect는 각 토픽의 Offset, 상태, 오류 등을 Kafka 내부 topic에 따로 관리하여 내구성 보장

Debezium

Debezium은 Kafka Connect 위에서 동작하는 Log-based CDC 특화 Source Connector
Kafka Connect는 데이터 이동 플랫폼(런타임 엔진)이라면, 디비지움은 그 위에서 동작하는 DBMS 트랜잭션 로그 해석기(Log Reader) + CDC 이벤트 생성기라고 볼 수 있음

동작

각 DB의 트랜잭션 로그(WAL/binlog/redo log)를 읽고 Insert/Update/Delete 같은 변경 이벤트를 row 단위로 추출
이를 Kafka Connect Source Connector 프로토콜에 맞게 Kafka Topic에 Publish 하는 것
이벤트는 Schema + Payload 형태의 structured JSON/Avro 를 제공
테이블마다 하나의 Kafka Topic으로 매핑됨
[DB 변경][트랜잭션 로그] → Debezium → Kafka Connect → Kafka Topic(테이블 단위)
JSON
복사
로그에 포함된 정보
어떤 테이블이 변경되었는지
어떤 PK(row)인지
변경 전 값(before)
변경 후 값(after)
트랜잭션 commit 시각
operation type (c/u/d)
이벤트 예시와 각 필드
{ "schema": { ... }, "payload": { "before": {...}, "after": {...}, "source": {...}, "op": "c | u | d | r", "ts_ms": 1733188185000 } }
JSON
복사
필드
설명
before
변경 전 row 상태 (UPDATE/DELETE 에서 존재)
after
변경 후 row 상태 (INSERT/UPDATE에서 존재)
source
DB명, 테이블명, binlog 위치, transaction id 등 메타 정보
op
operation (c=create, u=update, d=delete, r=snapshot)
ts_ms
Kafka Connect가 이벤트를 생성한 timestamp(ms)

커넥터 설정

Debezium 커넥터는 Kafka Connect REST API에 JSON을 PUT/POST해서 등록
설정 파일 하나로 파이프라인을 정의할 수 있어 간편함
설정 예시
{ "name": "mysql-cdc-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "app-mysql.example.internal", "database.port": "3306", "database.user": "${CDC_DB_USER}", "database.password": "${CDC_DB_PASSWORD}", "database.server.id": "11001", "database.server.name": "dev.cdc-app", "topic.prefix": "dev.cdc-app", "database.include.list": "APP_DB,WAREHOUSE", "table.include.list": "APP_DB.PRODUCT,APP_DB.ORDER,WAREHOUSE.STOCK", "database.history.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}", "database.history.kafka.topic": "dev.cdc-app.history", "schema.history.internal.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}", "schema.history.internal.kafka.topic": "dev.cdc-app.schema-history", "schema.history.internal.skip.unparseable.ddl": "true", "include.schema.changes": "false", "time.precision.mode": "connect", "database.history.store.only.monitored.tables.ddl": "true", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields.prefix": "cdc_meta_", "transforms.unwrap.add.fields": "op,table", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "errors.log.include.messages": "true", "errors.retry.timeout": "30000", "errors.retry.delay.max.ms": "10000", "snapshot.locking.mode": "none", "snapshot.mode": "initial", "database.history.kafka.recovery.attempts": "4", "provide.transaction.metadata": "false", "decimal.handling.mode": "double", "include.schema.comments": "false", "database.history.kafka.recovery.poll.interval.ms": "100", "database.history.skip.unparseable.ddl": "true", "heartbeat.topics.prefix": "dev.cdc-app.heartbeat", "database.history.kafka.query.timeout.ms": "3000", "include.query": "false", "heartbeat.interval.ms": "10000", "database.ssl.mode": "disabled", "topic.creation.default.cleanup.policy": "delete", "topic.creation.default.replication.factor": "3", "topic.creation.default.partitions": "1" }
JSON
복사
주요 설정 항목 설명
항목
설명
예시 값
connector.class
사용할 Debezium 커넥터 종류
io.debezium.connector.mysql.MySqlConnector
tasks.max
병렬 실행할 Task 수
1
database.hostname
CDC 대상 DB 호스트
mysql.example.internal
database.port
DB 포트
3306
database.user / database.password
binlog 읽기용 계정
${CDC_USER}
database.server.id
MySQL replication client ID (유일해야 함)
11001
database.server.name
Debezium 인스턴스를 식별하는 이름 (Topic prefix 역할)
dev.cdc-app
topic.prefix
생성할 Topic prefix
dev.cdc-app
database.include.list
캡처할 DB 목록
APP_DB,WAREHOUSE
table.include.list
캡처할 테이블 목록
APP_DB.PRODUCT,APP_DB.ORDER
snapshot.mode
초기 스냅샷 동작 방식
initial
transforms
적용할 SMT 체인 목록
changes,unwrap 등
key.converter / value.converter
직렬화 포맷(JSON/Avro)
JsonConverter
*.schemas.enable
schema 포함 여부
false

SMT(Single Message Transform)

SMT는 Kafka Connect에서 메시지를 단건 단위로 변환하는 경량 모듈
코드 수정없이 설정만으로 간단한 메시지 transform이 가능
before/after 구조 → after만 남기기 (flatten)
tombstone 메시지 제거
변경된 컬럼만 추출
특정 헤더 값을 payload로 이동
민감한 컬럼 masking
topic명 변경
schema 제거
특정 필드 삭제 또는 rename

체인 사용 예시

"config": { ... "transforms": "changes,unwrap,moveHeadersToValue", "transforms.changes.type": "io.debezium.transforms.ExtractChangedRecordState", "transforms.changes.header.changed.name": "ChangedFields", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields.prefix": "cdc_meta_", "transforms.unwrap.add.fields": "op,table", "transforms.moveHeadersToValue.type": "io.debezium.transforms.HeaderToValue", "transforms.moveHeadersToValue.headers": "ChangedFields", "transforms.moveHeadersToValue.fields": "cdc_changed_fields", "transforms.moveHeadersToValue.operation": "move", ... } }
JSON
복사
SMT 체인은 transforms에 지정한 순서대로 실행됨
changes → unwrap → moveHeadersToValue
ExtractChangedRecordState (changes.*)
UPDATE 이벤트에서 어떤 컬럼이 변경되었는지를 헤더로 추출
before/after 비교해 변경된 컬럼명을 헤더에 ChangedFields 이름으로 추가
ChangedFields: ["status_code", "review_count"]
JSON
복사
ExtractNewRecordState (unwrap.*)
Debezium CDC 이벤트의 구조를 after 중심의 flat JSON 으로 변환
기본 이벤트 구조는 위에서 본 것처럼 다음과 같음
{ "schema": {...}, "payload": { "before": {...}, "after": {...}, "source": {...}, "op": "u", "ts_ms": 1234567 } }
JSON
복사
이 구조가 페이로드가 상당히 많아지고 복잡하기 때문에 unwrap을 통해 after 만 flat처리함
{ "product_id": 1002, "status_code": "40", "review_count": 27, "cdc_meta_op": "u", "cdc_meta_table": "PRODUCT" }
JSON
복사
이 단계를 통해 컨슈머가 바로 쓰기 좋은 단일 객체가 됨
HeaderToValue (moveHeadersToValue.*)
ChangedFields 헤더 값을 payload의 필드로 이동시키기
헤더로 추출한 ChangedFields 값을 위에 지정한것과 같이 cdc_changed_fields 라는 이름의 새 필드로 만들어 넣고 헤더에서는 삭제
최종적으로 아래와 같은 형태가 된다
{ "product_id": 1002, "status_code": "40", "review_count": 27, "cdc_meta_op": "u", "cdc_meta_table": "PRODUCT", "cdc_changed_fields": ["review_count", "status_code"] // 새로 생김 }
JSON
복사