使用范围
ProcessWindowFunction 作用于经过 keyBy 分组（keyed/grouped）并划分了窗口（windowed）的数据流上。
代码
package com.yy.Channel
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.util.Random
/**
 * Input record: a single user action reported through a marketing channel.
 *
 * @param userId    identifier of the user who performed the action
 * @param behavior  kind of action, e.g. "view" or "install"
 * @param channel   channel the action came through, e.g. "appstore"
 * @param timestamp event time of the action; used as the event-time timestamp by the job
 */
case class MarketUserBehavior(
    userId: String,
    behavior: String,
    channel: String,
    timestamp: Long
)
/**
 * Output record: number of events seen for one (channel, behavior) pair within one window.
 *
 * @param windowStart window start, rendered as a string
 * @param windowEnd   window end, rendered as a string
 * @param channel     channel part of the grouping key
 * @param behavior    behavior part of the grouping key
 * @param count       number of events in the window for this key
 */
case class MarketViewCount(
    windowStart: String,
    windowEnd: String,
    channel: String,
    behavior: String,
    count: Long
)
/**
 * Test data source that emits randomly generated [[MarketUserBehavior]] events until cancelled.
 *
 * Each event carries:
 *  - a fresh random UUID as `userId`,
 *  - a `behavior` drawn at random from `behaviorSet`,
 *  - a `channel` drawn at random from `channelSet`,
 *  - `System.currentTimeMillis()` as `timestamp`.
 *
 * The source sleeps 50 ms between events.
 */
class SimulatedSource() extends RichSourceFunction[MarketUserBehavior] {
  // Set to false by cancel() and polled by the loop in run(). Flink invokes cancel() from a
  // different thread than run(), so the flag must be volatile for the loop to observe it.
  @volatile var running = true
  val behaviorSet = Seq("view", "download", "install", "uninstall")
  val channelSet = Seq("appstore", "weibo", "wechat", "tieba")
  val rand = Random

  /** Emits events into `ctx` until `cancel()` is called (or `maxCount` events were emitted). */
  override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
    // Upper bound on emitted events; Long.MaxValue makes the source effectively unbounded.
    val maxCount = Long.MaxValue
    var count = 0L
    while (running && count < maxCount) {
      val id = java.util.UUID.randomUUID().toString
      val behavior = behaviorSet(rand.nextInt(behaviorSet.size))
      val channel = channelSet(rand.nextInt(channelSet.size))
      val ts = System.currentTimeMillis()
      ctx.collect(MarketUserBehavior(id, behavior, channel, ts))
      count += 1
      // Throttle emission so the printed output stays readable.
      Thread.sleep(50L)
    }
  }

  /** Requests the emit loop in `run()` to stop after its current iteration. */
  override def cancel(): Unit = running = false
}
/**
 * Streaming job that counts, for each (channel, behavior) pair, how many events fall into each
 * sliding event-time window.
 *
 * Events come from [[SimulatedSource]], and "uninstall" events are discarded. Each window is
 * 1 day long and slides every 5 seconds. One [[MarketViewCount]] per key and window is printed.
 */
object AppMarketByChannel {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Sliding window geometry: window length and slide interval.
    val windowSize = Time.days(1)
    val windowSlide = Time.seconds(5)

    // Event time is read from `timestamp`; assignAscendingTimestamps assumes those timestamps
    // never decrease within the stream.
    val counts = env
      .addSource(new SimulatedSource)
      .assignAscendingTimestamps(_.timestamp)
      .filter(event => event.behavior != "uninstall")
      .keyBy(event => (event.channel, event.behavior))
      .timeWindow(windowSize, windowSlide)
      .process(new MarketCountByChannel())

    counts.print()
    env.execute("任务名称")
  }
}
/**
 * Window function that emits one [[MarketViewCount]] per (channel, behavior) key and window.
 *
 * The count is the number of elements buffered for the key in that window. Window bounds are
 * written as the decimal string form of the window's start/end timestamps.
 */
class MarketCountByChannel extends ProcessWindowFunction[MarketUserBehavior, MarketViewCount, (String, String), TimeWindow]() {
  override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketViewCount]): Unit = {
    // The key was built as (channel, behavior) by the upstream keyBy.
    val channel = key._1
    val behavior = key._2
    val window = context.window
    out.collect(
      MarketViewCount(
        windowStart = window.getStart.toString,
        windowEnd = window.getEnd.toString,
        channel = channel,
        behavior = behavior,
        count = elements.size.toLong
      )
    )
  }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。