微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink自定义aggregate聚合函数的步骤

第一步:将dataStream转换城windowedStream


        // 从kafka读取数据
        val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
            .map(data => {
                val dataArray = data.split(",")
                UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
            })
            .assignAscendingTimestamps(_.timestamp * 1000L)
        
        // 对数据进行窗口聚合处理
        val aggStream: DataStream[ItemViewCount] = inputStream
            .filter(_.behavior == "pv") // 过滤出pv数据
            .keyBy(_.itemId)
            .timeWindow(Time.hours(1), Time.minutes(5)) // 开窗进行统计
            .aggregate(new CountAgg(), new WindowCountResult()) // 聚合出当前商品在时间窗口内的统计数量

第二步:自定义聚合函数

// 自定义的预聚合函数,来一条数据就加一
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
    //add方法为累加器累加的方法,这里为最简单的+1操作
    override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
    //初始化累加值
    override def createAccumulator(): Long = 0L
    //最后返回那个值,这里为accumulator
    override def getResult(accumulator: Long): Long = accumulator
    //分区处理的归并操作,这里将所有并处理的结果相加
    override def merge(a: Long, b: Long): Long = a + b
}

第三部:自定义窗口函数

// 自定义window function
class WindowCountResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
    //Long类型的Key为上一步的自定义累加器的返回值
    //Window为差给你扣类型,第一步中的没窗口类型,TimeWindow
    //input为接收的数据类型,此处为Long类型的迭代器
    //out为此方法返回的类型,此处为ItemViewCount样例类对象的集合
    override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
        //调用ItemViewCount样例类对象的构造器,依次构造出ItemViewCount样例类并返回
        out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))
    }
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐