Doris official documentation: https://doris.apache.org/ecosystem/flink-doris-connector.html#how-to-use
Dependencies
<!-- flink-doris-connector: pick the artifactId matching your Flink version; Flink 1.13 with Scala 2.12 is used here -->
<dependency>
    <groupId>org.apache.doris</groupId>
    <!--<artifactId>flink-doris-connector-1.14_2.12</artifactId>-->
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <!--<artifactId>flink-doris-connector-1.12_2.12</artifactId>-->
    <!--<artifactId>flink-doris-connector-1.11_2.12</artifactId>-->
    <version>1.0.3</version>
</dependency>
Source: DataStream API
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Doris FE address, credentials, and the source table
    Properties properties = new Properties();
    properties.put("fenodes", "192.168.18.51:8030");
    properties.put("username", "root");
    properties.put("password", "root");
    properties.put("table.identifier", "test.top");

    // Each record arrives as a List of field values
    DataStreamSource<List<?>> listDataStreamSource = env.addSource(
            new DorisSourceFunction(
                    new DorisStreamOptions(properties),
                    new SimpleListDeserializationSchema()
            )
    );
    listDataStreamSource.print();

    env.execute();
}
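If only some columns or rows are needed, the read can be narrowed through the same Properties object before the DorisStreamOptions is constructed. A minimal sketch, assuming the connector's documented doris.read.field and doris.filter.query read options (the column list and predicate here are illustrative):

    // Optional read options: project columns and push a filter down to Doris
    properties.put("doris.read.field", "siteid,username");
    properties.put("doris.filter.query", "siteid = 66");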
Source: SQL
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.executeSql("CREATE TABLE flink_doris (\n" +
        "  siteid INT,\n" +
        "  citycode SMALLINT,\n" +
        "  username STRING,\n" +
        "  pv BIGINT\n" +
        ")\n" +
        "WITH (\n" +
        "  'connector' = 'doris',\n" +
        "  'fenodes' = 'hadoop1:8030',\n" +
        "  'table.identifier' = 'test_db.table1',\n" +
        "  'username' = 'test',\n" +
        "  'password' = 'test'\n" +
        ")");
tableEnv.executeSql("select * from flink_doris").print();
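The registered table also works in the other direction; a minimal sketch of writing through the same definition (the literal row is made up for illustration):

    // Sink through the same 'flink_doris' table; values are illustrative
    tableEnv.executeSql("INSERT INTO flink_doris VALUES (66, CAST(6 AS SMALLINT), 'flink-sql', 6)");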
Sink:
API: JSON string sink
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Stream-load properties: send each batch as a JSON array instead of the default CSV
    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("strip_outer_array", "true");

    env.fromElements("{\"siteid\": \"66\", \"citycode\": \"6\", \"username\": \"pengyuyan\",\"pv\": \"6\"}")
            .addSink(
                    DorisSink.sink(
                            DorisReadOptions.builder().build(),
                            DorisExecutionOptions.builder()
                                    .setBatchSize(3)          // flush after every 3 records
                                    .setBatchIntervalMs(0L)   // rely on batch size rather than a timer
                                    .setMaxRetries(3)
                                    .setStreamLoadProp(pro).build(),
                            DorisOptions.builder()
                                    .setFenodes("hadoop1:8030")
                                    .setTableIdentifier("test_db.table1")
                                    .setUsername("test")
                                    .setPassword("test").build()
                    ));
    // The shorter overload below uses default read/execution options:
    // .addSink(
    //         DorisSink.sink(
    //                 DorisOptions.builder()
    //                         .setFenodes("hadoop1:8030")
    //                         .setTableIdentifier("test_db.table1")
    //                         .setUsername("test")
    //                         .setPassword("test").build()
    //         ));

    env.execute();
}
API: RowData sink
public class DataStreamRowDataSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Build one RowData record; field indexes must match 'fields'/'types' below
        DataStream<RowData> source = env.fromElements("")
                .map(new MapFunction<String, RowData>() {
                    @Override
                    public RowData map(String value) throws Exception {
                        GenericRowData genericRowData = new GenericRowData(4);
                        genericRowData.setField(0, 88);
                        genericRowData.setField(1, (short) 8);
                        genericRowData.setField(2, StringData.fromString("flink-stream"));
                        genericRowData.setField(3, 8L);
                        return genericRowData;
                    }
                });

        // Column types and names of the target Doris table, in order
        LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(32), new BigIntType()};
        String[] fields = {"siteid", "citycode", "username", "pv"};

        source.addSink(
                DorisSink.sink(
                        fields,
                        types,
                        DorisReadOptions.builder().build(),
                        DorisExecutionOptions.builder()
                                .setBatchSize(3)
                                .setBatchIntervalMs(0L)
                                .setMaxRetries(3)
                                .build(),
                        DorisOptions.builder()
                                .setFenodes("hadoop1:8030")
                                .setTableIdentifier("test_db.table1")
                                .setUsername("test")
                                .setPassword("test").build()
                ));

        env.execute();
    }
}
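Note that the indexes set on GenericRowData, the fields array, and the types array must all describe the same column order as the target Doris table; a mismatch is not a compile error and only surfaces at runtime when the stream load runs.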