The following code shows how to use toDataStream
for different scenarios.
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.types.Row; import java.time.Instant; // POJO with mutable fields // since no fully assigning constructor is defined, the field order // is alphabetical [event_time, name, score] public static class User { public String name; public Integer score; public Instant event_time; } tableEnv.executesql( "CREATE TABLE GeneratedTable " + "(" + " name STRING," + " score INT," + " event_time TIMESTAMP_LTZ(3)," + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" + ")" + "WITH ('connector'='datagen')"); Table table = tableEnv.from("GeneratedTable"); // === EXAMPLE 1 === // use the default conversion to instances of Row // since `event_time` is a single rowtime attribute, it is inserted into the DataStream // Metadata and watermarks are propagated DataStream<Row> dataStream = tableEnv.toDataStream(table); // === EXAMPLE 2 === // a data type is extracted from class `User`, // the planner reorders fields and inserts implicit casts where possible to convert internal // data structures to the desired structured type // since `event_time` is a single rowtime attribute, it is inserted into the DataStream // Metadata and watermarks are propagated DataStream<User> dataStream = tableEnv.toDataStream(table, User.class); // data types can be extracted reflectively as above or explicitly defined DataStream<User> dataStream = tableEnv.toDataStream( table, DataTypes.STRUCTURED( User.class, DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.INT()), DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));
Note that only non-updating tables are supported by toDataStream
. Usually, time-based operations such as windows, interval joins, or the MATCH_RECOGNIZE
clause are a good fit for insert-only pipelines next to simple operations like projections and filters.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。