微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

flink ProcessWindowFunction使用举例



使用范围


ProcessWindowFunction 作用于先经过 keyBy 分组(keyed)、再进行开窗(windowed)的数据流上。

 

代码

package com.yy.Channel

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

/**
 * Input record: one user-behavior event attributed to a channel.
 *
 * @param userId    identifier of the user the event belongs to
 * @param behavior  name of the behavior carried by the event
 * @param channel   name of the channel carried by the event
 * @param timestamp event timestamp (SimulatedSource fills it with System.currentTimeMillis())
 */
case class MarketUserBehavior(
    userId: String,
    behavior: String,
    channel: String,
    timestamp: Long
)

/**
 * Output record: the number of events counted for one (channel, behavior) pair in one window.
 *
 * @param windowStart window start, as a string
 * @param windowEnd   window end, as a string
 * @param channel     channel part of the grouping key
 * @param behavior    behavior part of the grouping key
 * @param count       number of events counted
 */
case class MarketViewCount(
    windowStart: String,
    windowEnd: String,
    channel: String,
    behavior: String,
    count: Long
)


// 自定义 测试 数据源
/**
 * Test data source that emits randomly generated [[MarketUserBehavior]] events.
 *
 * Every event carries a random UUID as user id, a behavior picked at random from
 * `behaviorSet`, a channel picked at random from `channelSet`, and the current
 * wall-clock time (`System.currentTimeMillis()`) as its timestamp. After each
 * emitted event the loop sleeps 50 ms. Emission stops once `cancel()` has been called.
 */
class SimulatedSource() extends RichSourceFunction[MarketUserBehavior] {
    // Cleared by cancel() and polled by the run() loop. Marked @volatile so that a
    // cancel() issued from another thread is seen by the loop (the flag pattern
    // recommended by the Flink SourceFunction documentation).
    @volatile var running = true
    val behaviorSet = Seq("view", "download", "install", "uninstall")
    val channelSet = Seq("appstore", "weibo", "wechat", "tieba")
    val rand = Random

    /**
     * Emits events through `ctx` until the source is cancelled.
     *
     * @param ctx source context used to collect the generated events
     */
    // Fixed: the nested type is `SourceFunction.SourceContext` (capital S); the
    // lowercase spelling does not exist, so the override could not compile.
    override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
        // Upper bound on emitted events; Long.MaxValue makes it effectively unbounded.
        val maxCount = Long.MaxValue
        var count = 0L

        while (running && count < maxCount) {
            val id = java.util.UUID.randomUUID().toString
            val behavior = behaviorSet(rand.nextInt(behaviorSet.size))
            val channel = channelSet(rand.nextInt(channelSet.size))
            val ts = System.currentTimeMillis()

            ctx.collect(MarketUserBehavior(id, behavior, channel, ts))
            count += 1
            // Throttle the simulated stream.
            Thread.sleep(50L)
        }
    }

    /** Requests termination of the `run` loop by clearing the `running` flag. */
    override def cancel(): Unit = running = false
}

/**
 * Groups events by (channel, behavior) and counts the events of each group per window.
 */
object AppMarketByChannel {
    /**
     * Builds the streaming job and runs it.
     *
     * Pipeline: SimulatedSource -> ascending event-time timestamps taken from
     * `timestamp` -> drop "uninstall" events -> key by (channel, behavior) ->
     * time window of size 1 day sliding every 5 seconds -> MarketCountByChannel -> print.
     *
     * @param args command-line arguments (not used)
     */
    def main(args: Array[String]): Unit = {
        // Environment: parallelism 1, event-time semantics.
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Simulated input with timestamps extracted from each event.
        val events = streamEnv
            .addSource(new SimulatedSource)
            .assignAscendingTimestamps(event => event.timestamp)

        // Everything except "uninstall" events takes part in the count.
        val retained = events.filter(event => event.behavior != "uninstall")
        val byChannelAndBehavior = retained.keyBy(event => (event.channel, event.behavior))
        val windowed = byChannelAndBehavior.timeWindow(Time.days(1), Time.seconds(5))
        val counts = windowed.process(new MarketCountByChannel())

        counts.print()

        streamEnv.execute("任务名称")
    }

}


/**
 * Window function that turns all events of one (channel, behavior) key in one window
 * into a single [[MarketViewCount]].
 */
class MarketCountByChannel extends ProcessWindowFunction[MarketUserBehavior, MarketViewCount, (String, String), TimeWindow]() {
    /**
     * Emits one record holding the window bounds, the key parts and the element count.
     *
     * @param key      grouping key as (channel, behavior)
     * @param context  gives access to the window being evaluated
     * @param elements events assigned to this key and window
     * @param out      collector receiving the single result record
     */
    override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketViewCount]): Unit = {
        val window = context.window
        // Window bounds are emitted as the string form of getStart / getEnd.
        val result = MarketViewCount(
            windowStart = window.getStart.toString,
            windowEnd = window.getEnd.toString,
            channel = key._1,
            behavior = key._2,
            count = elements.size
        )
        out.collect(result)
    }
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐