微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

flink-doris-connector flink1.13.1

 doris 官文: https://doris.apache.org/ecosystem/flink-doris-connector.html#how-to-use

  依赖

     <!--flink-doris-connector-->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <!--<artifactId>flink-doris-connector-1.14_2.12</artifactId>-->
            <artifactId>flink-doris-connector-1.13_2.12</artifactId>
            <!--<artifactId>flink-doris-connector-1.12_2.12</artifactId>-->
            <!--<artifactId>flink-doris-connector-1.11_2.12</artifactId>-->
            <version>1.0.3</version>
        </dependency>

 

source :API

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        properties.put("fenodes","192.168.18.51:8030");
        properties.put("username","root");
        properties.put("password","root");
        properties.put("table.identifier","test.top");

        DataStreamSource<List<?>> listDataStreamSource = env.addSource(new DorisSourceFunction(
                        new DorisstreamOptions(properties),
                        new SimpleListDeserializationSchema()
                )
        );
        listDataStreamSource.print();
        env.execute();
    }

sql:

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executesql("CREATE TABLE flink_doris (\n" +
                "    siteid INT,\n" +
                "    citycode SMALLINT,\n" +
                "    username STRING,\n" +
                "    pv BIGINT\n" +
                "    ) \n" +
                "    WITH (\n" +
                "      'connector' = 'doris',\n" +
                "      'fenodes' = 'hadoop1:8030',\n" +
                "      'table.identifier' = 'test_db.table1',\n" +
                "      'username' = 'test',\n" +
                "      'password' = 'test'\n" +
                ")\n");
  tableEnv.executesql("select * from flink_doris").print();

 

sink:

API:    JsonSink

 

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");
        env.fromElements("{\"siteid\": \"66\", \"citycode\": \"6\", \"username\": \"pengyuyan\",\"pv\": \"6\"}")
                .addSink(
                        DorisSink.sink(
                                DorisReadOptions.builder().build(),
                                DorisExecutionoptions.builder()
                                        .setBatchSize(3)
                                        .setBatchIntervalMs(0L)
                                        .setMaxRetries(3)
                                        .setStreamloadProp(pro).build(),
                                DorisOptions.builder()
                                        .setFenodes("hadoop1:8030")
                                        .setTableIdentifier("test_db.table1")
                                        .setUsername("test")
                                        .setPassword("test").build()
                        ));
//            .addSink(
//                DorisSink.sink(
//                        DorisOptions.builder()
//                                .setFenodes("hadoop1:8030")
//                                .setTableIdentifier("test_db.table1")
//                                .setUsername("test")
//                                .setPassword("test").build()
//                ));
        env.execute();
    }

 

RowData

 

 

public class DataStreamRowDataSinkDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        DataStream<RowData> source = env.fromElements("")
                .map(new MapFunction<String, RowData>() {
                    @Override
                    public RowData map(String value) throws Exception {
                        GenericRowData genericRowData = new GenericRowData(4);
                        genericRowData.setField(0, 88);
                        genericRowData.setField(1, new Short("8"));
                        genericRowData.setField(2, StringData.fromString("flink-stream"));
                        genericRowData.setField(3, 8L);
                        return genericRowData;
                    }
                });
        LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(32), new BigIntType()};
        String[] fields = {"siteid", "citycode", "username", "pv"};
        source.addSink(
                DorisSink.sink(
                        fields,
                        types,
                        DorisReadOptions.builder().build(),
                        DorisExecutionoptions.builder()
                                .setBatchSize(3)
                                .setBatchIntervalMs(0L)
                                .setMaxRetries(3)
                                .build(),
                        DorisOptions.builder()
                                .setFenodes("hadoop1:8030")
                                .setTableIdentifier("test_db.table1")
                                .setUsername("test")
                                .setPassword("test").build()
                ));
        env.execute();
    }
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐