微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink_热门页面浏览数

先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行 log ,统计在一段时间内用户访问每一个 url 的次数,然后排序输出显示。 具体做法为: 每隔 5 秒,输出最近 10 分钟内访问量最多的前 N 个 URL 。可以看出,这个需求与之前“实时热门商品统计”非常类似。 输入数据:

 

数据示例:
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Map

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

//定义输入数据样例类
case class ApacheLogEvent(ip: String,
                          userId:String,
                          timestamp: Long,
                          method: String,
                          url: String )

//窗口聚合结果样例类
case class PageViewCount(url: String,
                         windowEnd: Long,
                         count: Long )

object HotPagesNetworkFlow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[String] = env.readTextFile("D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\HotItemsAnalysis\\src\\main\\resources\\apache.log")
    val dataStream: DataStream[ApacheLogEvent] = inputStream.map(data => {
      val arr: Array[String] = data.split(" ")
      //对时间时间进行转换,得到时间戳
      val simpleDateFormat = new SimpleDateFormat("dd/MM/yy:HH:mm:ss")
      val ts = simpleDateFormat.parse(arr(3)).getTime
      ApacheLogEvent(arr(0), arr(1), ts, arr(5), arr(6))
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {  //waterMark 1S
      override def extractTimestamp(element: ApacheLogEvent): Long = element.timestamp
    }).filter(data => {
      val pattern = "^((?!\\.(css|js)$).)*$".r //过略掉.css|.js结尾的文件
      (pattern findFirstIn data.url).nonEmpty
    })

    // 进行开窗聚合,以及排序输出
    val aggStream: DataStream[PageViewCount] = dataStream
      .filter(_.method == "GET") //代表访问
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(new OutputTag[ApacheLogEvent]("late"))
      .aggregate(new PageCountAgg(), new PageViewCountwindowResult())

    val resultStream: DataStream[String] = aggStream
      .keyBy(_.windowEnd)
      .process(new TopNHotPages(5))
    resultStream.print()
    env.execute("hot pages job")

  }

}

class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

class PageViewCountwindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
    out.collect(PageViewCount(key, window.getEnd, input.iterator.next()))
  }
}

class TopNHotPages(topNsize: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {
  lazy val pageViewCountMapState: MapState[String, Long] = getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("pageViewCount-map", classOf[String], classOf[Long]))

  override def processElement(value: PageViewCount, ctx: KeyedProcessFunction[Long, PageViewCount, String]#Context, out: Collector[String]): Unit = {
    pageViewCountMapState.put(value.url, value.count)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)  //排序输出的定时器
    // 另外注册一个定时器,1分钟之后触发,这时窗口已经彻底关闭,不再有聚合结果输出,可以清空状态
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 60000L)  //清空状态的定时器
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // 判断定时器触发的时间,已经是结束窗口时间1分钟之后,触发状态清空
    if (timestamp == ctx.getCurrentKey + 60000L){
      pageViewCountMapState.clear()
      return
    }

    val allPageViewCounts: ListBuffer[(String, Long)] = ListBuffer()
    val iter = pageViewCountMapState.entries().iterator()
    while (iter.hasNext){
      val entry: Map.Entry[String, Long] = iter.next()
      allPageViewCounts += ((entry.getKey, entry.getValue))
    }

    val sortedPageViewCounts: ListBuffer[(String, Long)] = allPageViewCounts.sortBy(_._2)(Ordering.Long.reverse).take(topNsize)

    // 将排名信息格式化成String,便于打印输出可视化展示
    val result: StringBuilder = new StringBuilder
    result.append("窗口结束时间:").append(new Timestamp(timestamp -1)).append("\n")

    // 遍历结果列表中的每个ItemViewCount,输出到一行
    for (i <- sortedPageViewCounts.indices){
      val currentItemViewCount = sortedPageViewCounts(i)
      result.append("NO").append(i + 1).append(": \t")
        .append("页面URL = ").append(currentItemViewCount._1).append("\t")
        .append("热门度 = ").append(currentItemViewCount._2).append("\n")
    }

    result.append("\n==================================\n\n")
    Thread.sleep(1000)
    out.collect(result.toString())

  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐