声明:本系列博客是根据SGG的视频整理而成,非常适合大家入门学习。
AggregateFunction(主要用于增量计算)
// 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
// API
// IN: 输入元素类型
// ACC: 累加器类型
// OUT: 输出元素类型
// Contract for incremental (per-record) window aggregation in Flink.
// IN:  type of the input elements; ACC: type of the intermediate accumulator state; OUT: type of the final result.
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
// Creates a fresh accumulator, starting a new aggregation; initializes the iterated state.
ACC createAccumulator();
// Folds one input element into the accumulator and returns the updated accumulator.
ACC add(IN value, ACC accumulator);
// Merges two accumulators into one carrying the combined state (used e.g. for session-window merges).
ACC merge(ACC a, ACC b);
// Extracts the final aggregation result from the accumulator.
OUT getResult(ACC accumulator);
}
实例一
// 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的平均价值(AggregateFunction)
kafkaStream
        // Parse the raw JSON records from Kafka into Java beans.
        .process(new KafkaProcessFunction())
        // Assign event timestamps and generate bounded-out-of-orderness watermarks.
        .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
        // Group the stream by user.
        .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
        // Tumbling event-time window of the configured length.
        .timeWindow(Time.seconds(windowLengthSeconds))
        // Window function: average browsed product price per user within the window.
        // Accumulator layout: f0 = record count, f1 = price sum.
        .aggregate(new AggregateFunction<UserActionLog, Tuple2<Long, Long>, Double>() {
            // 1. A fresh accumulator: zero records seen, zero total price.
            @Override
            public Tuple2<Long, Long> createAccumulator() {
                return new Tuple2<>(0L, 0L);
            }

            // 2. Fold one record in: bump the count and add its product price to the sum.
            @Override
            public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> acc) {
                acc.f0 = acc.f0 + 1;
                acc.f1 = acc.f1 + value.getProductPrice();
                return acc;
            }

            // 3. Combine two partial accumulators component-wise.
            @Override
            public Tuple2<Long, Long> merge(Tuple2<Long, Long> left, Tuple2<Long, Long> right) {
                return new Tuple2<>(left.f0 + right.f0, left.f1 + right.f1);
            }

            // 4. Final result: average price = price sum / record count
            //    (windows only fire with at least one record, so f0 >= 1).
            @Override
            public Double getResult(Tuple2<Long, Long> acc) {
                return (double) acc.f1 / acc.f0;
            }
        })
        .print();
#结果
20.0
10.0
30.0
25.0
20.0
实例二
// Per-sensor average temperature over 15-second tumbling event-time windows.
val avgTempPerWindow: DataStream[(String, Double)] = sensorData
// project each reading to (sensor id, temperature)
.map(r => (r.id, r.temperature))
// group readings by sensor id
.keyBy(_._1)
// 15-second tumbling window
.timeWindow(Time.seconds(15))
// incremental aggregation with the custom AggregateFunction below
.aggregate(new AvgTempFunction)
// An AggregateFunction to compute the average temperature per sensor.
// The accumulator holds the sum of temperatures and an event count.
// Incrementally computes the average temperature per sensor.
// Accumulator layout: (sensor id, temperature sum, reading count).
class AvgTempFunction
  extends AggregateFunction[(String, Double),
    (String, Double, Int), (String, Double)] {

  // Start from an empty id, a zero sum and a zero count.
  override def createAccumulator(): (String, Double, Int) =
    ("", 0.0, 0)

  // Fold one reading in: keep its id, add the temperature, count it.
  override def add(in: (String, Double), acc: (String, Double, Int)): (String, Double, Int) =
    (in._1, acc._2 + in._2, acc._3 + 1)

  // Emit (sensor id, sum / count).
  override def getResult(acc: (String, Double, Int)): (String, Double) =
    (acc._1, acc._2 / acc._3)

  // Merge two partial accumulators by summing sums and counts.
  override def merge(a: (String, Double, Int), b: (String, Double, Int)): (String, Double, Int) =
    (a._1, a._2 + b._2, a._3 + b._3)
}
实例三
一,概述
Flink 的AggregateFunction是一个基于中间计算结果状态进行增量计算的函数。由于采用迭代计算方式,在窗口处理过程中不用缓存整个窗口的数据,所以执行效率比较高。
二,AggregateFunction接口类
AggregateFunction比ReduceFunction更加通用,它有三个参数:输入类型(IN),累加器类型(ACC)和输出类型(OUT)。
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
...............................
}
自定义聚合函数需要实现AggregateFunction接口类,它有四个接口实现方法:
a.创建一个新的累加器,启动一个新的聚合,负责迭代状态的初始化
ACC createAccumulator();
b.对于输入的每条数据,定义其与累加器进行聚合的具体实现
ACC add(IN value, ACC accumulator);
c.合并两个累加器,返回一个具有合并状态的累加器
ACC merge(ACC a, ACC b);
d.从累加器获取聚合的结果
OUT getResult(ACC accumulator);
三、代码实例
1.模拟场景:
从文件/socket读取数据,数据包含三个字段:商品ID,用户ID,访问类型(1.点击查看 2.收藏 3.购买),访问时间;这里每隔3秒对最近6秒内的数据进行汇总计算各个商品的“点击查看”访问量,也就是访问类型为1的数据。
这里自定义聚合函数MyCountAggregate对数据进行预聚合,自定义窗口函数MyCountwindowFunction2将聚合的数据封装成字符串,并加上窗口结束时间信息进行输出。
2.数据准备:
product1,user14,1,1586855115084 product2,user19,2,1586855116087 product2,user19,1,1586855116087 product3,user17,1,1586855117089 product1,user17,1,1586855118092 product2,user17,1,1586855119095 product3,user15,1,1586855120097 product1,user12,1,1586855121100 product2,user13,1,1586855122102 product3,user13,1,1586855123105 product1,user13,1,1586855124108 product2,user19,3,1586855116087 product2,user16,1,1586855125111 product1,user17,1,1586855136113 product1,user14,1,1586855127116 product2,user16,1,1586855128119 product2,user16,1,1586855129122 product3,user16,1,1586855130125 product2,user11,1,1586855131128 product1,user16,1,1586855132131 product2,user13,1,1586855133134 product3,user16,1,1586855134137 product3,user13,1,1586855135139 product2,user19,3,1586855116087 product1,user18,1,1586855136142 product2,user12,1,1586855137145 product1,user13,1,1586855138148 product3,user17,1,1586855139150
package com.hadoop.ljs.flink110.aggreagate;
import org.apache.flink.api.common.functions.AggregateFunction;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-04-15 22:00
* @version: v1.0
* @description: com.hadoop.ljs.flink110.aggreagate
* 输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。
*/
/**
 * Counts "click/view" events per key with a plain running total.
 * IN = ProductViewData, ACC = Long, OUT = Long.
 */
public class MyCountAggregate implements AggregateFunction<ProductViewData, Long, Long> {

    /** A fresh window starts with a count of zero. */
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    /** Every incoming record contributes exactly one to the count. */
    @Override
    public Long add(ProductViewData value, Long accumulator) {
        return accumulator + 1L;
    }

    /** The count itself is the window result. */
    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    /** Combine two partial counts by addition. */
    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}
package com.hadoop.ljs.flink110.aggreagate;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-04-15 21:56
* @version: v1.0
* @description: com.hadoop.ljs.flink110.aggreagate
* *自定义窗口函数,封装成字符串
* *第一个参数是上面MyCountAggregate的输出,就是商品的访问量统计
* * 第二个参数 输出 这里为了演示 简单输出字符串
* * 第三个就是 窗口类 能获取窗口结束时间
*/
public class MyCountwindowFunction2 implements WindowFunction<Long, String, String, TimeWindow> {

    /**
     * Formats the pre-aggregated view count of one product as output strings.
     * The input iterable holds exactly one element: the count that
     * MyCountAggregate produced for this key and window.
     */
    @Override
    public void apply(String productId, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
        // Emit the window end time first, then the per-product count.
        out.collect("----------------窗口时间:" + window.getEnd());
        out.collect("商品ID: " + productId + " 浏览量: " + input.iterator().next());
    }
}
package com.hadoop.ljs.flink110.aggreagate;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-04-14 11:28
* @version: v1.0
* @description: com.hadoop.ljs.flink110.aggreagate
* 自定义聚合函数类和窗口类,进行商品访问量的统计,根据滑动时间窗口,按照访问量排序输出
*/
/**
 * Counts per-product "click/view" events over a sliding event-time window
 * (6s length, 3s slide) using MyCountAggregate for incremental pre-aggregation
 * and MyCountwindowFunction2 to attach the window end time to the output.
 */
public class AggregateFunctionMain2 {
    /** Sliding window size in milliseconds. */
    public static int windowSize = 6000;
    /** Slide interval in milliseconds. */
    public static int windowSlider = 3000;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        /*DataStream<String> sourceData = senv.socketTextStream("localhost",9000);*/
        // Read the records from a file; a socket source (above) works the same way.
        DataStream<String> sourceData = senv.readTextFile("D:\\projectData\\ProductViewData2.txt");
        // Parse CSV lines: productId,userId,operationType,timestamp.
        DataStream<ProductViewData> productViewData = sourceData.map(new MapFunction<String, ProductViewData>() {
            @Override
            public ProductViewData map(String value) throws Exception {
                String[] record = value.split(",");
                return new ProductViewData(record[0], record[1], Long.valueOf(record[2]), Long.valueOf(record[3]));
            }
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ProductViewData>() {
            @Override
            public long extractAscendingTimestamp(ProductViewData element) {
                return element.timestamp;
            }
        });
        // Keep only "click/view" events (operationType == 1).
        DataStream<String> productViewCount = productViewData.filter(new FilterFunction<ProductViewData>() {
            @Override
            public boolean filter(ProductViewData value) throws Exception {
                // Return the comparison directly instead of if/else with boolean literals.
                return value.operationType == 1;
            }
        }).keyBy(new KeySelector<ProductViewData, String>() {
            @Override
            public String getKey(ProductViewData value) throws Exception {
                return value.productId;
            }
        // Sliding event-time window: 6s length, 3s slide.
        }).timeWindow(Time.milliseconds(windowSize), Time.milliseconds(windowSlider))
        // Pre-aggregate counts per window, then wrap them with window metadata.
        .aggregate(new MyCountAggregate(), new MyCountwindowFunction2());
        // Print results to stdout.
        productViewCount.print();
        senv.execute("AggregateFunctionMain2");
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。