最近flink 因为 其吞吐量 ,exactly once 特性 比较热门 ,尤其是 flink sql 的易于管理 和 复用的特点 ,都使得大数据团队最近更加喜欢选择flink 进行数据处理 分析等,其他的优势就不一一对比了,下面记录一下我用flink sql遇见的一个小问题。
flink sql的底层解析用的是apache calcite , hive sql 也用的calcite解析,因此 flinksql 的大致原理和我们常见的sql差不多。outer join的话 ,就是查出有结果的话 就返回结果,没有结果的话将没有结果的字段用null 补齐。
针对官方给出的flinksql的例子 进行演示
两个数据流分别是:
val orderA: DataStream[Order] = env.fromCollection(Seq(
Order(1L, "beer", 3),
Order(1L, "diaper", 4),
Order(1L, "rubber", 2)))
val orderB: DataStream[Order] = env.fromCollection(Seq(
Order(2L, "pen", 3),
Order(1L, "rubber", 3),
Order(4L, "beer", 1)))
注册成表:
// convert DataStream to Table
var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
// register DataStream as Table
tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
查询:
val result = tEnv.sqlQuery(
s"SELECT a.`user`,OrderB.product,a.amount FROM $tableA AS a " +
s"FULL OUTER JOIN OrderB ON a.`user` = OrderB.`user`")
结果如下:
1> (true,null,pen,null)
2> (true,1,null,2)
4> (true,null,beer,null)
2> (true,1,null,4)
2> (false,1,null,2)
2> (true,1,rubber,2)
2> (false,1,null,4)
2> (true,1,rubber,4)
2> (true,1,rubber,3)
但是如果指定结果输出类型是 case class的话 只会输出join 结果 ,因为 case class 不支持null 类型 ,这里 我指定为row 作为输出类型:
result.toRetractStream[Row].print()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。