聚合函数(Aggregate Functions)
- 用户自定义聚合函数(user-defined Aggregate Functions,UDAGGs) 可以把一个表中的数据,聚合成一个标量值;
- 用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实现的
- AggregationFunction要求必须实现的方法:
– createAccumulator()
– accumulate()
– getValue()
- AggregateFunction 的工作原理如下:
1、首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构; 可以通过调用 createAccumulator() 方法创建空累加器; 2、随后,对每个输入行调用函数的 accumulate() 方法来更新累加器; 3、处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果
表聚合函数(Table Aggregate Functions)
- 用户定义的表聚合函数(user-defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表;
- 用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的;
- AggregationFunction 要求必须实现的方法:
– createAccumulator()
– accumulate() – emitValue()
- TableAggregateFunction 的工作原理如下:
1、首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 createAccumulator() 方法可以创建空累加器。
2、随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。
3、处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果。
package com.congge.table.api.udf;
import com.congge.source.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;
public class TestAggregateFunction {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String path = "E:\\code-self\\flink_study\\src\\main\\resources\\sensor.txt";
// 1. 读取数据
DataStreamSource<String> inputStream = env.readTextFile(path);
// 2. 转换成POJO
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 3. 将流转换成表
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
// 4. 自定义聚合函数,求当前传感器的平均温度值
// table API
AvgTemp avgTemp = new AvgTemp();
// 需要在环境中注册UDF
tableEnv.registerFunction("avgTemp", avgTemp);
Table resultTable = sensorTable
.groupBy("id")
.aggregate( "avgTemp(temp) as avgtemp" )
.select("id, avgtemp");
// sql
tableEnv.createTemporaryView("sensor", sensorTable);
Table resultsqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) " +
" from sensor group by id");
// 打印输出
tableEnv.toRetractStream(resultTable, Row.class).print("result");
tableEnv.toRetractStream(resultsqlTable, Row.class).print("sql");
env.execute();
}
// 实现自定义的AggregateFunction
public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>>{
@Override
public Double getValue(Tuple2<Double, Integer> accumulator) {
return accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0, 0);
}
// 必须实现一个accumulate方法,来数据之后更新状态
public void accumulate( Tuple2<Double, Integer> accumulator, Double temp ){
accumulator.f0 += temp;
accumulator.f1 += 1;
}
}
}
本例的需求是,通过 Flink Table Api读取原始文件数据,然后通过自定义统计聚合函数,将读取到的数据进行聚合统计输出到控制台,运行上面的代码,观察输出效果
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。