박종훈 기술블로그

Debezium 에서 이벤트 필터링 및 데이터 수정하기 (with SMT, Single Message Transformations)

Debezium 에서 이벤트 필터링 및 데이터 수정하기

최근 Debezium 으로 DB 변경사항 캡처하기 글과 Debezium Sink Connector로 DB 변경사항을 다른 DB에 자동으로 동기화하는 방법 글을 작성하였다.

여기서 조금 더 나아가서 이벤트를 필터링 하거나, 데이터를 수정할 수 있는 방법에 대해서 알아보겠다.

물론 Application에서 처리를 한다면 더 높은 자유도가 있을 것이다. 하지만, 이 글을 작성하면서 어플리케이션의 변경은 최소화 하자는 생각을 가지고 테스트를 진행하였기 때문에, Connector 자체적으로 처리하는 방법에 대해서 소개한다.

SMT

SMT는 Single Message Transformations 의 약자이다. SMT를 사용하여 소스 커넥터가 Kafka에 기록되기 전에나, 싱크 커넥터로 전송되기 전 메시지를 변환할 수 있다.

시나리오

다음과 같은 시나리오를 생각해본다.

A DB는 한국, B DB는 미국에 있다고 해보자. A DB에서 customer 테이블에 대해 변경된 사항을 B DB으로 전송 하고, B DB에서 customer 테이블에 대해 변경된 사항을 A DB로 전송 해서 서로의 싱크를 맞춘다.

이 때, 테이블의 변경 사항이 유저에 의해 발생 된 것인지 된 것인지, 변경사항 싱크를 위해 발생된 것인지 알지 못한다면 계속 업데이트 되는 상황이 발생될 것이다.

이걸 어떻게 해결할 수 있을까?

해결방향

만약 데이터가 cdc sync 에 의해 변경된 것이라면, source connector 에서 무시하도록 하고 그런 것이 아니라면 sink connector 를 통해 데이터를 업데이트 한다.

이를 위해 테이블에 is_cdc_sync 라는 컬럼을 추가한다. bit(1) 로 설정하였다. 해당 컬럼이 0 이라면 사용자에 의한 업데이트라고 가정하고, 1 이라면 cdc sync 에 의한 업데이트라고 가정한다.

시스템 구성 및 커넥터 설정

SMT를 사용하기 위해서는 connect에 ENABLE_DEBEZIUM_SCRIPTING 환경변수를 주입해줘야 한다.

connect:
  image: quay.io/debezium/connect:3.2.1.Final
  container_name: kafka_connect
  ports:
    - "8083:8083"
  depends_on:
    - kafka-1
    - kafka-2
    - kafka-3
    - mysql
  environment:
    BOOTSTRAP_SERVERS: "kafka-1:29092,kafka-2:29093,kafka-3:29094"
    GROUP_ID: 1
    CONFIG_STORAGE_TOPIC: my_connect_configs
    OFFSET_STORAGE_TOPIC: my_connect_offsets
    STATUS_STORAGE_TOPIC: my_connect_statuses
    ADVERTISED_HOST_NAME: connect
    ENABLE_DEBEZIUM_SCRIPTING: true
  networks:
    - kafka-net

그리고 connect 를 bash로 접근하여 groovy 관련 jar를 추가해준다. debezium-connector-mysql 와 debezium-connector-jdbc 에 아래와 같이 jar를 다운로드 하여 추가해주었다.

  • groovy 외에도 다른 스크립트 언어도 지원하지만, 이 글에서는 groovy를 사용한다.
cd ~/connect/debezium-connector-mysql
curl -O https://repo1.maven.org/maven2/org/apache/groovy/groovy/4.0.28/groovy-4.0.28.jar
curl -O https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/4.0.28/groovy-jsr223-4.0.28.jar

cd ~/connect/debezium-connector-jdbc
curl -O https://repo1.maven.org/maven2/org/apache/groovy/groovy/4.0.28/groovy-4.0.28.jar
curl -O https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/4.0.28/groovy-jsr223-4.0.28.jar

필터링 (Source Connector)

[Note] 시스템 구성 및 커넥터 설정 을 마치고 이 섹션을 참고한다.

참고: Debezium - Message Filtering

아래와 같이 커넥터를 업데이트 한다.

curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/mysql-inventory-connector -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "debeziumrootpassword",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.server.name": "mysql-server-1",
    "database.include.list": "mydb",
    "table.include.list": "mydb.customers",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-1:29092,kafka-2:29093,kafka-3:29094",
    "schema.history.internal.kafka.topic": "dbhistory.inventory",
    "include.schema.changes": true,
    "transforms": "filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.condition": "value.after.is_cdc_sync == 0"
  }'

아래 4줄이 핵심 파트이다.

"transforms": "filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.condition": "value.after.is_cdc_sync == 0"

이렇게 커넥터 설정이 업데이트가 되었다면 value.after.is_cdc_sync == 0 일 때만 kafka topic에 produce 를 한다.

데이터 수정 (Sink Connector)

[Note] 시스템 구성 및 커넥터 설정 을 마치고 이 섹션을 참고한다.

아래와 같이 커넥터를 업데이트 한다.

curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/my-debezium-sink-connector/config -d '{
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql2:3306/mydb?useSSL=false&allowPublicKeyRetrieval=true",
    "connection.username": "mysqluser",
    "connection.password": "mysqlpw",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "schema.evolution": "basic",
    "use.time.zone": "UTC",
    "topics": "dbserver1.mydb.customers",
    "table.name.format": "customers",
    "transforms": "unwrap,removeField,insertField,cast",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.removeField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.removeField.exclude": "is_cdc_sync",
    "transforms.insertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertField.static.field": "is_cdc_sync",
    "transforms.insertField.static.value": 1,
    "transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.cast.spec": "is_cdc_sync:int8"
  }'

아래 transforms 부분이 핵심 파트이다.

"transforms": "unwrap,removeField,insertField,cast",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.removeField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.removeField.exclude": "is_cdc_sync",
"transforms.insertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertField.static.field": "is_cdc_sync",
"transforms.insertField.static.value": 1,
"transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.cast.spec": "is_cdc_sync:int8"

unwrap 과정을 통해 레코드를 추출하고, is_cdc_sync 를 강제로 1로 덮어씌운다. (삭제 후 재추가 과정을 통해) kafka connector 설정에서는 1으로 넣어도 문자열로 변환되기 때문에, Cast를 해줘야 한다는 부분도 주의할 포인트이다.

이를 통해 cdc sync 로 발생된 데이터 변경건 일 경우 is_cdc_sync1로 기록되도록 한다. 이제 이 변경 사항이 cdc 과정중 캡처되겠지만, 이후 필터링에 걸려 topic에 produce 되지는 않게 되었다.

데이터를 확인해보면 실제로 값이 1 인 상태로 동기화 되고 있는 것을 볼 수 있다.

단점

Application에서 사용자가 업데이트 할 경우에는 is_cdc_sync 컬럼을 0 으로 업데이트 해줘야 한다. 만약 사용자가 업데이트 했음에도 is_cdc_sync 컬럼의 값이 1인 상태라면 Source Connector 의 필터링을 통과하지 못할 것이다.

이런식으로 Application 에서 신경쓰는게 번거롭다면 trigger를 사용해보는것도 하나의 방법이 될 것이다.

마무리

Debezium 에서 SMT를 사용한 이벤트 필터링 및 데이터 수정 방법에 대해서 알아보았다.

해당 부분들을 어플리케이션에서 직접 제어하는 할 수도 있겠지만, 커넥터 레벨에서 처리할 수 있다는 포인트도 상황에 따라 유용하게 쓸 수 있을 것 같다.