一个同学在群里问的这个问题,刚好好久没写过window 的sql了,玩一玩
手上没有环境,一起从简了
输出直接到 console 窗口
-- flink window tvf calc pv&uv
create table if not exists datagen_source (
id int
,name string
,sex string
,age int
,birthday string
,proc_time as proctime()
) with (
'connector' = 'datagen'
,'rows-per-second' = '10000'
,'fields.id.kind' = 'random'
,'fields.id.min' = '1'
,'fields.id.max' = '2000000'
);
create table if not exists print_sink(
start_time string
,end_time string
,pv bigint
,uv bigint
) with (
'connector' = 'print'
);
insert into print_sink
select
date_format(window_start, 'HH:mm:ss')
, date_format(window_end, 'HH:mm:ss')
, count(id)
, count(distinct id)
FROM TABLE(
TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' SECOND ))
GROUP BY window_start, window_end
union all
select
date_format(window_start, 'HH:mm:ss')
, date_format(window_end, 'HH:mm:ss')
, count(id)
, count(distinct id)
FROM TABLE(
TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '20' SECOND ))
GROUP BY window_start, window_end
union all
select
date_format(window_start, 'HH:mm:ss')
, date_format(window_end, 'HH:mm:ss')
, count(id)
, count(distinct id)
FROM TABLE(
TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '30' SECOND ))
GROUP BY window_start, window_end
;
查看结果:
+I[10:45:00, 10:45:20, 20000, 19900] # 20 s +I[10:45:10, 10:45:20, 20000, 19913] # 10 s +I[10:45:00, 10:45:30, 120000, 116420] # 30 s +I[10:45:20, 10:45:30, 100000, 97497] +I[10:45:30, 10:45:40, 100000, 97558] +I[10:45:20, 10:45:40, 200000, 190314]
流图:

功能倒是实现了,有点麻烦的是,现在 sql api 没有 trigger,不能中途输出计算结果,几分钟的窗口结束的时候输出数据还可以,小时、天的窗口,要窗口结束才输出一次结果,那还不如跑离线
注: Window TVF 支持 GROUPING SETS、ROLLUP、CUBE
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。


