微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink 1.12 之后的窗口聚合

提示文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

 

文章目录

前言

一、默认时间语义的变化

二、水印策略和时间戳方法变化


前言

Flink 1.12 版本之后,窗口聚合操作的变化


提示:以下是本篇文章正文内容,下面案例可供参考

一、认时间语义的变化

Flink 1.12 将认的时间语义从处理时间转为事件时间,并将设置时间语义的API设置为 @Deprecated 状态

@Deprecated
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = (TimeCharacteristic)Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
       this.getConfig().setAutoWatermarkInterval(0L);
    } else {
       this.getConfig().setAutoWatermarkInterval(200L);
    }
}

二、水印策略和时间戳方法变化

源码中 assignTimestampsAndWatermarks 方法的变化,原本 AssignerWithPeriodicWatermarks 入参构造方式被标记 @Deprecated,采用新的对象 WatermarkStrategy 来构造,并且新增了 withIdleness(Duration idleTimeout) 方法解决因为上游某个并行算子无数据而导致的下游无法继续进行的情况,例如,从kafka消费时,共有两个分区,并且做10s的窗口聚合操作,而其中一个分区因为无数据进入,导致水位线比另外一个分区晚了15s,导致上一个时间窗口一直无法关闭,使窗口聚合算子无法继续往下走,无法输出从另一个分区消费数据聚合的结果。withIdleness(Duration idleTimeout) 设置可以在 idleTimeOut 时间之后,将这个无数据分区设置为空闲状态,不在等待这个分区下来的水位线。

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy) {
        WatermarkStrategy<T> cleanedStrategy = (WatermarkStrategy)this.clean(watermarkStrategy);
        int inputParallelism = this.getTransformation().getParallelism();
        TimestampsAndWatermarksTransformation<T> transformation = new TimestampsAndWatermarksTransformation("Timestamps/Watermarks", inputParallelism, this.getTransformation(), cleanedStrategy);
        this.getExecutionEnvironment().addOperator(transformation);
        return new SingleOutputStreamOperator(this.getExecutionEnvironment(), transformation);
}

/** 
* @deprecated 
*/
@Deprecated
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
        AssignerWithPeriodicWatermarks<T> cleanedAssigner = (AssignerWithPeriodicWatermarks)this.clean(timestampAndWatermarkAssigner);
        WatermarkStrategy<T> wms = new Strategy(cleanedAssigner);
        return this.assignTimestampsAndWatermarks((WatermarkStrategy)wms);
}
DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        })
                // flink 1.12 中水印策略与时间戳获取
                .assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // 在数据一段时间内没有到底,特别是向kafka多个分区中,某些分区数据已经是最新的,某些分区迟迟没有最新数据到底
                        // 根据 flink 下游水位规则,会取最小值,因此容易使下游算子不能继续往下走,因此引入了超时机制,一段时间内没有数据就将把这个上有sink标记为空闲
                        // 不在等待这个sink的水位标记到达
                        .withIdleness(Duration.ofSeconds(10))
                .withTimestampAssigner(new SerializableTimestampAssigner<SensorReading>() {
                    @Override
                    public long extractTimestamp(SensorReading sensorReading, long l) {
                        return sensorReading.getTimestamp() * 1000;
                    }
                }));

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐