我正在尝试使用Apache Kafka将事件从MySQL传输到Postgresql.尽管插入和更新工作正常,但我无法弄清楚如何从MysqL中删除记录并将此事件传输到Postgresql.
假设以下拓扑:
+-------------+
| |
| MysqL |
| |
+------+------+
|
|
|
+---------------v------------------+
| |
| Kafka Connect |
| (Debezium, JDBC connectors) |
| |
+---------------+------------------+
|
|
|
|
+-------v--------+
| |
| Postgresql |
| |
+----------------+
我正在使用以下docker图像;
> Apache-Zookeper
> Apache-Kafka
> Debezium/JDBC connectors
然后
# Start the application
export DEBEZIUM_VERSION=0.6
docker-compose up
# Start Postgresql connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
# Start MysqL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
docker-compose -f docker-compose-jdbc.yaml exec MysqL bash -c 'MysqL -u $MysqL_USER -p$MysqL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | [email protected] |
| 1003 | Edward | Walker | [email protected] |
| 1004 | Anne | Kretchmar | [email protected] |
+------+------------+-----------+-----------------------+
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | [email protected]
Walker | 1003 | Edward | [email protected]
Kretchmar | 1004 | Anne | [email protected]
(4 rows)
docker-compose -f docker-compose-jdbc.yaml exec MysqL bash -c 'MysqL -u $MysqL_USER -p$MysqL_PASSWORD inventory'
MysqL> delete from customers where id = 1004;
docker-compose -f docker-compose-jdbc.yaml exec MysqL bash -c 'MysqL -u $MysqL_USER -p$MysqL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | [email protected] |
| 1003 | Edward | Walker | [email protected] |
+------+------------+-----------+-----------------------+
虽然该记录已从MysqL中删除,但该条目仍会出现在Postgressql中
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | [email protected]
Walker | 1003 | Edward | [email protected]
Kretchmar | 1004 | Anne | [email protected]
(4 rows)
我知道支持软删除,是否可以从Postgressql中完全删除该特定条目(通过Apache-Kafka从MysqL流式传输del事件)?
编辑:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.MysqL.MysqLConnector",
"tasks.max": "1",
"database.hostname": "MysqL",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}
我也尝试设置“pk.mode”:“record_key”和“delete.enabled”:“true”(bug fix suggestion),但这种修改似乎不起作用.
解决方法:
Confluent JDBC接收器当前不支持删除.有一个待处理的拉取请求(您已经链接到它),但尚未合并.
目前,您可以自己构建基于该分支的JDBC接收器连接器,或者创建一个简单的自定义接收器连接器,它只通过在目标数据库上执行相应的DELETE语句来处理逻辑删除事件.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。