> 本系列每篇文章都是从一些实际的 case 出发,分析一些生产环境中经常会遇到的问题,抛砖引玉,以帮助小伙伴们解决一些实际问题。本文介绍 Flink 时间以及时区问题,分析了在天级别的窗口时会遇到的时区问题,如果对小伙伴有帮助的话,欢迎点赞 + 再看~
本文主要分为两部分:
第一部分(第 1 - 3 节)的分析主要针对 flink,分析了 flink 天级别窗口的中存在的时区问题以及解决方案。
第二部分(第 4 节)的分析可以作为所有时区问题的分析思路,主要以解决方案中的时区偏移量为什么是加 8 小时为案例做了通用的深度解析。
为了让读者能对本文探讨的问题有一个大致了解,本文先给出问题 sql,以及解决方案。后文给出详细的分析~
## 1.问题以及解决方案
### 问题 sql
sql 很简单,用来统计当天累计 uv。
```sql
--------------- 伪代码 ---------------
INSERT INTO
kafka_sink_table
SELECT
-- 窗口开始时间
CAST(
TUMBLE_START(proctime, INTERVAL '1' DAY) AS bigint
) AS window_start,
-- 当前记录处理的时间
cast(max(proctime) AS BIGINT) AS current_ts,
-- 每个桶内的 uv
count(disTINCT id) AS part_daily_full_uv
FROM
kafka_source_table
GROUP BY
mod(id, bucket_number),
-- bucket_number 为常数,根据具体场景指定具体数值
TUMBLE(proctime, INTERVAL '1' DAY)
--------------- 伪代码 ---------------
```
你是否能一眼看出这个 sql 所存在的问题?(PS:数据源以及数据汇时区都为东八区)
**没错,天级别窗口所存在的时区问题,即这段代码统计的不是楼主所在东八区一整天数据的 uv,这段代码统计的一整天的范围在东八区是第一天早 8 点至第二天早 8 点。**
### 解决方案
楼主目前所处时区为东八区,解决方案如下:
```sql
--------------- 伪代码 ---------------
CREATE VIEW view_table AS
SELECT
id,
-- 通过注入时间解决
-- 加上东八区的时间偏移量,设置注入时间为时间戳列
CAST(CURRENT_TIMESTAMP AS BIGINT) * 1000 + 8 * 60 * 60 * 1000 as ingest_time
FROM
source_table;
INSERT INTO
target_table
SELECT
CAST(
TUMBLE_START(ingest_time, INTERVAL '1' DAY) AS bigint
) AS window_start,
cast(max(ingest_time) AS BIGINT) - 8 * 3600 * 1000 AS current_ts,
count(disTINCT id) AS part_daily_full_uv
FROM
view_table
GROUP BY
mod(id, 1024),
-- 根据注入时间划分天级别窗口
TUMBLE(ingest_time, INTERVAL '1' DAY)
--------------- 伪代码 ---------------
```
通过上述方案,就可以将统计的数据时间范围调整为东八区的今日 0 点至明日 0 点。下文详细说明整个需求场景以及解决方案的实现和分析过程。
## 2.需求场景以及实现方案
### 需求场景
coming,需求场景比较简单,就是消费上游的一个埋点日志数据源,根据埋点中的 id 统计当天 0 点至当前时刻的累计 uv,按照分钟级别产出到下游 OLAP 引擎中进行简单的聚合,最后在 BI 看板进行展示,没有任何维度字段(感动到哭版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。