
Flink: implementing a trigger that fires on a time interval or a message count

Windows are an important feature of Flink, and they are often used together with triggers.

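Flink's built-in triggers include roughly the following:

CountTrigger: fires once the number of elements reaches a given count

ContinuousEventTimeTrigger: fires repeatedly at a given interval, based on event time
ContinuousProcessingTimeTrigger: fires repeatedly at a given interval, based on processing time

ProcessingTimeTrigger: the default trigger for processing-time windows; fires when the window ends
EventTimeTrigger: the default trigger for event-time windows; fires when the watermark passes the end of the window

NeverTrigger: the default trigger of global windows; never fires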

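However, there is no built-in trigger that uses a time interval and an element count together as firing conditions, so I implemented one myself (modeled on ProcessingTimeTrigger and CountTrigger).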

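First, the code that generates the test data. After each message it sleeps for a randomly chosen 500 ms or 1500 ms, so that both the count condition and the time condition get a chance to fire.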

while (true) {
  // Build one test record: id, creation time and a random amount such as "3.7"
  val map = Map("id" -> i, "createTime" -> getCreateTime(), "amt" -> (MathUtil.random.nextInt(10) + "." + MathUtil.random.nextInt(10)))
  val jsonObject: JSONObject = new JSONObject(map)
  println(jsonObject.toString())
  // Send the record to the Kafka topic "current_day"
  val msg = new ProducerRecord[String, String]("current_day", jsonObject.toString())
  producer.send(msg)
  producer.flush()
  // Sleep 1500 ms or 500 ms, chosen at random
  if (MathUtil.random.nextBoolean()) {
    Thread.sleep(1500)
  } else {
    Thread.sleep(500)
  }
  i = i + 1
}
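To see why both conditions can fire with these sleep times, here is a small back-of-the-envelope sketch in plain Java (no Flink involved). It assumes the trigger settings used later in this post (10 elements, 10 seconds), assumes the interval timer is registered when the first message of a round is processed, and ignores the time spent in send and flush. The class and method names are made up for illustration.

```java
public class SleepBudget {
    // Values taken from the generator above and the trigger configuration in this post.
    static final long SHORT_SLEEP_MS = 500;
    static final long LONG_SLEEP_MS = 1500;
    static final int MAX_COUNT = 10;
    static final long INTERVAL_MS = 10_000;

    /** Time between the 1st and the MAX_COUNT-th message when `longSleeps` of the gaps are long. */
    static long spanMs(int longSleeps) {
        int gaps = MAX_COUNT - 1; // 10 messages are separated by 9 sleeps
        return longSleeps * LONG_SLEEP_MS + (gaps - longSleeps) * SHORT_SLEEP_MS;
    }

    /** Largest number of long sleeps for which the 10th message still arrives before the timer. */
    static int maxLongSleepsForCountFire() {
        int best = -1;
        for (int k = 0; k <= MAX_COUNT - 1; k++) {
            if (spanMs(k) < INTERVAL_MS) {
                best = k;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        for (int k = 0; k <= MAX_COUNT - 1; k++) {
            String winner = spanMs(k) < INTERVAL_MS ? "count" : "time";
            System.out.println(k + " long sleeps -> " + spanMs(k) + " ms -> " + winner + " fires first");
        }
    }
}
```

Under these assumptions the span ranges from 4500 ms to 13500 ms, so a round with few long sleeps ends in a count-based firing and a round with many long sleeps ends in a time-based firing.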

Next, the window code that uses the trigger:

val stream = env.addSource(kafkaSource)
  .map(s => {
    s
  })
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60)))
  .trigger(CountAndTimeTrigger.of(10, Time.seconds(10)))
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      // Count the elements passed to this invocation
      val count = elements.size
      logger.info("this trigger have : {} item", count)
    }
  })

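The code is simple: it defines a 60-second window, uses the custom trigger to fire after every 10 elements or every 10 seconds, and the window function logs the number of elements in the window.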

Now the code of the custom trigger, CountAndTimeTrigger:

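import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CountAndTimeTrigger: fires when either an element count or a time interval is reached.
 * The count condition is tracked with a counter kept in state.
 * The time condition uses Flink's timer service: a processing-time timer is registered and fires later.
 *
 * @param <W> the window type
 */
public class CountAndTimeTrigger<W extends Window> extends Trigger<Object, W> {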
    // static, so the logger is not part of the trigger's serialized state
    private static final Logger logger = LoggerFactory.getLogger(CountAndTimeTrigger.class);
    // Number of elements that causes a firing
    private final long size;
    // Interval in milliseconds that causes a firing
    private final long interval;
    private static final long serialVersionUID = 1L;
    // Element counter
    private final ReducingStateDescriptor<Long> countStateDesc =
            new ReducingStateDescriptor<>("count", new ReduceSum(), LongSerializer.INSTANCE);
    // Time state: holds the timestamp of the next time-based firing
    private final ReducingStateDescriptor<Long> timeStateDesc =
            new ReducingStateDescriptor<>("fire-interval", new ReduceMin(), LongSerializer.INSTANCE);

    public CountAndTimeTrigger(long size, long interval) {
        this.size = size;
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // No need to register a timer for the end of the window: it fires automatically
//        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        // count
        ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
        // interval
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
        // Every element increments the counter
        count.add(1L);
        if (count.get() >= size) {
            logger.info("countTrigger triggered, count : {}", count.get());
            // Count condition met: reset the counter first
            count.clear();
            // Also delete the pending interval timer, unless it coincides with the end of the window.
            // The null check avoids a NullPointerException when no timer is registered (e.g. size == 1).
            Long pending = fireTimestamp.get();
            if (pending != null && pending != window.maxTimestamp()) {
                ctx.deleteProcessingTimeTimer(pending);
            }
            fireTimestamp.clear();
            // FIRE triggers the computation
            return TriggerResult.FIRE;
        }

        // After a firing, the timer for the next time-based firing is only registered
        // once the next element arrives
        long now = ctx.getCurrentProcessingTime();
        if (fireTimestamp.get() == null) {
            long nextFireTimestamp = now + interval;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {

        // count
        ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
        // interval
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);

        // time trigger and window end
        if (time == window.maxTimestamp()) {
            logger.info("window close : {}", time);
            // Window end: reset the counter and the time state
            count.clear();
            // The interval timer may already have fired and been cleared, so guard against null
            Long pending = fireTimestamp.get();
            if (pending != null) {
                ctx.deleteProcessingTimeTimer(pending);
            }
            fireTimestamp.clear();
            return TriggerResult.FIRE_AND_PURGE;
        } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
            logger.info("timeTrigger trigger, time : {}", time);
            // Interval timer fired: reset the counter and the time state
            count.clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

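    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // Release the per-window state and any pending interval timer when the window is discarded
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
        Long pending = fireTimestamp.get();
        if (pending != null) {
            ctx.deleteProcessingTimeTimer(pending);
        }
        fireTimestamp.clear();
        ctx.getPartitionedState(countStateDesc).clear();
    }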

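    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(countStateDesc);
        ctx.mergePartitionedState(timeStateDesc);
        // Re-register a timer for the merged window at the earliest pending fire time
        Long nextFireTimestamp = ctx.getPartitionedState(timeStateDesc).get();
        if (nextFireTimestamp != null) {
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }
    }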

    @Override
    public String toString() {
        return "CountAndTimeTrigger(maxCount: " + size + ", interval: " + interval + ")";
    }

    public static <W extends Window> CountAndTimeTrigger<W> of(long maxCount, Time interval) {
        return new CountAndTimeTrigger<>(maxCount, interval.toMilliseconds());
    }

    /**
     * Sums two counts; also used when state is merged
     */
    private static class ReduceSum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    /**
     * Keeps the earlier of two timestamps; also used when state is merged
     */
    private static class ReduceMin implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) {
            return Math.min(value1, value2);
        }
    }
}

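The core idea: onElement is called for every incoming element and maintains the counter, firing as soon as the count condition is met. onProcessingTime is called by Flink's timer service and implements the time-based firing.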

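Both the count-based and the time-based firing reset the counter and the time state, so that counting and timing start over when the next element arrives.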
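The interplay of the two conditions can be replayed without Flink. The following is a minimal sketch in plain Java, not the trigger itself: it feeds a list of arrival times through the same count-or-interval rules and records when a firing would happen. The class name, the method name and the sample arrival times are made up for illustration, and the end of the window is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

public class CountOrTimePolicy {
    /**
     * Replays element arrival times (ascending, in ms) through a count-or-interval policy
     * and returns the firings as strings such as "count@2000" or "time@15000".
     */
    static List<String> simulate(long[] arrivalsMs, long maxCount, long intervalMs) {
        List<String> firings = new ArrayList<>();
        long count = 0;
        Long nextFire = null; // pending timer, null when none is registered

        for (long t : arrivalsMs) {
            // A pending timer that is due fires before this element is handled.
            if (nextFire != null && nextFire <= t) {
                firings.add("time@" + nextFire);
                count = 0;
                nextFire = null;
            }
            count++;
            if (count >= maxCount) {
                // Count condition met: fire, reset the counter and drop the pending timer.
                firings.add("count@" + t);
                count = 0;
                nextFire = null;
            } else if (nextFire == null) {
                // First element after a firing: register the next time-based firing.
                nextFire = t + intervalMs;
            }
        }
        // A timer still pending after the last element fires on its own.
        if (nextFire != null) {
            firings.add("time@" + nextFire);
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1000, 2000, 5000, 20000};
        // prints [count@2000, time@15000, time@30000]
        System.out.println(simulate(arrivals, 3, 10_000));
    }
}
```

With a count of 3 and an interval of 10 seconds, the first three elements produce a count-based firing. The element at 5000 registers a timer for 15000, which fires before the element at 20000 is handled, and that element in turn registers the timer for 30000.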

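One point needs special attention: at the end of the window, onProcessingTime is called automatically. The trigger logic must handle this case, otherwise the complete data of the window cannot be obtained.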

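// time trigger and window end
if (time == window.maxTimestamp()) {
    logger.info("window close : {}", time);
    // Window end: reset the counter and the time state
    count.clear();
    // Guard against null: the interval timer may already have fired and been cleared
    if (fireTimestamp.get() != null) {
        ctx.deleteProcessingTimeTimer(fireTimestamp.get());
    }
    fireTimestamp.clear();
    return TriggerResult.FIRE_AND_PURGE;
}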

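The check above matches when the timer time equals the window's maxTimestamp, which is the window end minus 1. Timestamps have millisecond precision, so a 10-second window covering [0, 10000) has 9999 as its last timestamp.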
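The "end minus 1" arithmetic can be checked with a few lines of plain Java. This is a sketch that assumes tumbling windows without offset and non-negative timestamps; the helper names are made up for illustration and are not Flink API. The sample timestamp 1573281756496 is the timer time that appears in the run log of this post, and the window size is the 60 seconds used in the job.

```java
public class WindowBounds {
    /** Start of the tumbling window that contains timestampMs (no offset, non-negative timestamps). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    /** Last millisecond that still belongs to the window: end - 1. */
    static long maxTimestamp(long timestampMs, long sizeMs) {
        return windowEnd(timestampMs, sizeMs) - 1;
    }

    public static void main(String[] args) {
        long ts = 1573281756496L; // timer time taken from the run log
        long size = 60_000L;      // 60-second window, as in the job above
        System.out.println("start = " + windowStart(ts, size));
        System.out.println("end   = " + windowEnd(ts, size));
        System.out.println("max   = " + maxTimestamp(ts, size));
    }
}
```

For this timestamp the last value is 1573281779999, which is the value logged as "window close" at the end of that minute.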

The result of a run:

 

It starts with a window end: "14:42:00,002 INFO - window close : 1573281719999"

followed by two count-based firings: "14:42:10,015 INFO - countTrigger triggered, count : 10" and "14:42:19,063 INFO - countTrigger triggered, count : 10"

then a time-based firing: "14:42:36,499 INFO - timeTrigger trigger, time : 1573281756496"

and finally the next window end: "14:43:00,002 INFO - window close : 1573281779999"

Done.

 


 
