微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

JAVA整合FlinkCDC

 

官方地址:https://github.com/ververica/flink-cdc-connectors

 

版本至少jdk8

 

maven

<dependency>
  <groupId>com.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-MysqL-cdc</artifactId>
  <!-- the dependency is available only for stable releases. -->
  <version>2.2-SNAPSHOT</version>
</dependency>

 

 

java代码

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.MysqL.source.MysqLSource;

public class MysqLSourceExample {
  public static void main(String[] args) throws Exception {
    MysqLSource<String> MysqLSource = MysqLSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env
      .fromSource(MysqLSource, WatermarkStrategy.Nowatermarks(), "MysqL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MysqL Snapshot + binlog");
  }
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐