Official repository: https://github.com/ververica/flink-cdc-connectors
Requires JDK 8 or later.
Maven dependency
<dependency>
    <groupId>com.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <!-- The dependency is available only for stable releases; a SNAPSHOT version must be built locally. -->
    <version>2.2-SNAPSHOT</version>
</dependency>
Java code
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort) // placeholder: replace with your MySQL port (an int)
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing every 3000 ms
        env.enableCheckpointing(3000);

        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // set 4 parallel source tasks
            .setParallelism(4)
            .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + binlog");
    }
}
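The job above prints each change event as a JSON string. As a rough sketch of how downstream code might tell inserts, updates, and deletes apart, the example below reads the Debezium envelope's "op" field using only the JDK. The class name ChangeEventOp, its helper methods, and the sample event are hypothetical and not part of the connector; the sample is heavily trimmed compared with a real event. The regular expression takes the first "op" key it finds, so it would misfire on a table that has its own column named "op"; a real job would more likely use a proper JSON parser.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of flink-cdc-connectors.
public class ChangeEventOp {
    // Matches the first "op" field with a single-letter value, e.g. "op":"c".
    private static final Pattern OP_FIELD =
        Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    // Returns the operation code of a change event, if one is present.
    static Optional<String> opCode(String json) {
        Matcher m = OP_FIELD.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Maps a Debezium operation code to a readable label.
    static String describe(String op) {
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Hypothetical, trimmed event; real events carry more fields.
        String sample = "{\"before\":null,\"after\":{\"id\":1},\"op\":\"c\"}";
        String op = opCode(sample).orElse("?");
        System.out.println(op + " -> " + describe(op)); // prints "c -> insert"
    }
}
```

In a Flink job, the same logic could sit inside a map or filter step applied to the stream before the print sink.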