Flink - End-to-End Exactly-Once语义
1. 什么是状态一致性
Flink跟其他的流计算引擎相比,最突出或者做的最好的就是状态的管理,根据对状态的学习,我们知道flink是有状态的流处理,内部每个算子任务都可以有自己的状态
对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。
一条数据不应该丢失,也不应该重复计算
在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。
2. 状态一致性分类
AT-MOST-ONCE(最多一次)
当任务故障时,最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的数据。At-most-once 语义的含义是最多处理一次事件。
AT-LEAST-ONCE(至少一次)
在大多数的真实应用场景,我们希望不丢失事件。这种类型的保障称为 at-least-once,意思是所有的事件都得到了处理,而一些事件还可能被处理多次。
EXACTLY-ONCE(精确一次)
恰好处理一次是最严格的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。
At most noce < At least once < Exactly once < End to End Exactly once
3. 端到端(end-to-end)状态一致性
经过前面的介绍我们知道,Flink内部实现的一致性精确一次,只是对内部算子的保证(主要是Transformation阶段)
对于Source如果依赖外部系统或者某些重设数据位置的机制(比如我们演示的累加Source回滚)以及数据写出外部Sink
精确一次很难保证和外部系统的一致性(也就是内部(主要是Transformation)保证一致性,外部不能保证不重复消费或输出)
那么,有没有什么方式可以让我们实现精确的和外部交互能够一条不多一条不少呢。
有,这个就叫做端到端的精确一次性语义实现。
其主要基于 两端提交的协议而实现。
3.1 端到端 exactly-once的实现
如果想要实现两端提交协议来完成和外部系统的精确一次语义,需要满足:
内部保证 —— 依赖checkpoint的机制保证,Flink已经做到
source 端 —— 发生故障时,可重设数据的读取位置(类似Kafka消费者可以基于offset重新消费,或者如我们自己实现的重设累加器基数)
sink 端 —— 需要满足
幂等写入 或者
事务写入 (依靠事务,也就是Sink出去的地方必须支持事务回滚机制)
基于如上3个条件,才能完成 外部和Flink在执行结果上(Sink)整体的一致性
也就是,Source可能会重复消费,但是不管怎么重复消费Source,可以保证的是Sink出去的数据不会多也不会少,正好精确一条(基于事务)
3.2 Checkpoint如何保证数据正确性
CheckPoint保证Flink内部的一致性,主要基于barrier机制。
Barrier插入数据中,可以精确的控制,当前CheckPoint的边界。不会因为barrier向后移动导致边界变化。
只要barrier生成,插入数据的那一刻起,CheckPoint的结果已经确定,只是各个operator依次执行快照而已。
3.3 Sink端保证数据不会重复写入到外部系统
3.3.1 幂等写入(Idempotent Writes)
幂等写操作是指:任意多次向一个系统写入数据,只对目标系统产生一次结果影响。
例如,重复向一个HashMap里插入同一个Key-Value二元对,第一次插入时这个HashMap发生变化,后续的插入操作不会改变HashMap的结果,这就是一个幂等写操作。
Cassandra、HBase和Redis这样的KV数据库一般经常用来作为Sink,用以实现端到端的Exactly-Once。
需要注意的是,并不是说一个KV数据库就百分百支持幂等写。幂等写对KV对有要求,那就是Key-Value必须是可确定性(Deterministic)计算的。
假如我们设计的Key是:name + curTimestamp,每次执行数据重发时,生成的Key都不相同,会产生多次结果,整个操作不是幂等的。因此,为了追求端到端的Exactly-Once,我们设计业务逻辑时要尽量使用确定性的计算逻辑和数据模型。
3.3.2 事务写入(Transactional Writes)如何确保的双端一致性语义
事务(Transaction)是数据库系统索要解决的最核心问题。Flink借鉴了数据库中的事务处理技术,同时结合自身的Checkpoint机制来保证Sink只对外部输出产生一次影响。
比如数据id从1开始,一直向后累加(后面的新数据id +1)
上一次双端一致性语义,确保了数据id5的位置,被Flink成功处理并成功的提交到sink后的外部系统。
那么,当前流程的数据从id6开始。
那么,当前Source位置收到了一个barrier,barrier处于id10和id11之间
表示前面的6,7,8,9,10 5个数据均是本次CheckPoint要处理的
当barrier开始后,所有的barrier前面的数据,均在Sink这里被预提交(MysqL的execute,但是没有commit)
当barrier走到sink端并完成CheckPoint后,才提交事务。
可能会产生如下情况:
1.Sink的CheckPoint成功,事务提交失败,那么事务回滚,回滚到上一次的CheckPoint
由于本次的所有数据6,7,8,9,10均是预提交的,那么回滚事务后,这些数据不会写出,sink外部系统最新状态依旧是数据id5写入完成
同时上一次的CheckPoint也是记录的id5完成处理,那么重启后继续从6开始处理
从6开始重新处理可能导致Source会重复消费从6开始的数据,比如6,7,8,9,10
但是保证了Sink端不会重复写出
2.Sink的CheckPoint不成功,那么事务不会提交,事务会回滚,CheckPoint也会回滚到上一次成功的CheckPoint中
那么,整体还是从id6开始继续处理
3. Sink的checkpoint成功,事务也提交成功,那么这是一次完美的配合。
那么,当前整个系统Flink的CheckPoint推到了id10处理完成。
Sink的外部系统也是记录的id10写入成功。
后面继续处理id11开始。
3.4 两阶段提交(Two-Phase-Commit,2PC)的实现
https://zh.wikipedia.org/wiki/%E4%BA%8C%E9%98%B6%E6%AE%B5%E6%8F%90%E4%BA%A4
对于每个 checkpoint,sink 任务会启动一个事务,并将接下来所有接收的数据添加到事务里
然后将这些数据写入外部 sink 系统,但不提交它们 —— 这时只是“预提交”
当它收到 checkpoint 完成的通知时,它才正式提交事务,实现结果的真正写入
这种方式真正实现了 exactly-once,它需要一个提供事务支持的外部 sink 系统。Flink 提供了 TwoPhaseCommitSinkFunction 接口。
3.5 两阶段递交对外部系统的约束
外部 sink 系统必须提供事务支持,或者 sink 任务必须能够模拟外部系统上的事务
在 checkpoint 的间隔期间里,必须能够开启一个事务并接受数据写入
在收到 checkpoint 完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失
sink 任务必须能够在进程失败后恢复事务
提交事务必须等待CheckPoint先完成
3.6 两阶段提交的简单总结
简单来说就是,两次检查点之间的数据,均不会提交出去,均是预提交状态,当checkpoint完成后,才会提交
3.7 Flink实现kafka的端对端exactly-once
Flink 1.4版本之前,支持Exactly Once语义,仅限于应用内部。
Flink 1.4版本之后,通过两阶段提交(TwoPhaseCommitSinkFunction)支持End-To-End Exactly Once,而且要求Kafka 0.11+。
利用TwoPhaseCommitSinkFunction是通用的管理方案,只要实现对应的接口,而且Sink的存储支持变乱提交,即可实现端到端的划一性语义。
4. 例子
4.1 Kafka来实现End-to-End Exactly-Once语义
/**
* Kafka Producer的容错-Kafka 0.9 and 0.10
* 如果Flink开启了checkpoint,针对FlinkKafkaProducer09 和FlinkKafkaProducer010 可以提供 at-least-once的语义,还需要配置下面两个参数
* •setLogFailuresOnly(false)
* •setFlushOnCheckpoint(true)
*
* 注意:建议修改kafka 生产者的重试次数
* retries【这个参数的值默认是0】
*
* Kafka Producer的容错-Kafka 0.11
* 如果Flink开启了checkpoint,针对FlinkKafkaProducer011 就可以提供 exactly-once的语义
* 但是需要选择具体的语义
* •Semantic.NONE
* •Semantic.AT_LEAST_ONCE【默认】
* •Semantic.EXACTLY_ONCE
*/
public class StreamKafkaSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//checkpoint配置
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStreamSource<String> text = env.socketTextStream("node1", 9999, '\n');
String topic = "test";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "node1:9092");
//设置事务超时时间,也可在kafka配置中设置
prop.setProperty("transaction.timeout.ms",60000*15+"");
//使用至少一次语义的形式
//val myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());
//使用支持仅一次语义的形式
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
text.addSink(myProducer);
env.execute("StreamingKafkaSinkScala");
}
}
MysqL来实现End-to-End Exactly-Once语义
* 自定义Source, 累加生成数据
* 自定义MysqLSource 实现双端提交
*/
public class MysqLEndToEndDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsstateBackend("file:///G:\\26-code\\Java\\flinkbase\\data\\output\\checkpoint"));
env.setParallelism(1);
DataStreamSource<Integer> source = env.addSource(new MySourceWithState());
source.addSink(new MysqLSinkWithExactlyOnce());
env.execute();
}
public static class MysqLSinkWithExactlyOnce extends TwoPhaseCommitSinkFunction<Integer, Connection, Void> implements SinkFunction
<Integer> {
public MysqLSinkWithExactlyOnce() {
super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
}
@Override
protected void invoke(Connection conn, Integer value, Context context) throws Exception {
PreparedStatement preparedStatement = conn.prepareStatement("INSERT INTO exactly_once VALUES(" + value + ")");
preparedStatement.execute();
if (value == 15) System.out.println(1 / 0);
}
@Override
protected Connection beginTransaction() throws Exception {
Class.forName("com.MysqL.jdbc.Driver");
// 创建数据库连接
String url = "jdbc:MysqL://node1:3306/flinkdemo?useUnicode=true&characterEncoding=utf-8&useSSL=false";
Connection conn = DriverManager.getConnection(url, "root", "123456");
conn.setAutoCommit(false); // 必须
return conn;
}
@Override
protected void preCommit(Connection transaction) throws Exception {
System.out.println("preCommit...");
}
@Override
protected void commit(Connection transaction) {
try {
transaction.commit();
} catch (sqlException throwables) {
throwables.printstacktrace();
}
}
@Override
protected void abort(Connection transaction) {
try {
transaction.rollback();
} catch (sqlException throwables) {
throwables.printstacktrace();
}
}
}
public static class MySourceWithState implements SourceFunction<Integer>, CheckpointedFunction {
private boolean isRun = true;
private ListState<Integer> state;
private int currentCounter = 0;
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 清除旧数据
this.state.clear();
// 添加新数据
this.state.add(this.currentCounter);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.state = context.getoperatorStateStore().getListState(new ListStateDescriptor<Integer>(
"operatorState",
Type@R_123_404[email protected](new TypeHint<Integer>() {})
));
for (int counter: this.state.get()) {
// 如果state为空, for循环无法进入
this.currentCounter = counter;
}
this.state.clear();
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (this.isRun) {
this.currentCounter++;
ctx.collect(this.currentCounter);
System.out.println("Source: " + this.currentCounter);
TimeUnit.SECONDS.sleep(1L);
// if (this.currentCounter == 10) {
// System.out.println("手抛异常:" + (1 / 0));
// }
}
}
@Override
public void cancel() {
this.isRun = false;
}
}
}
ProcessFunction API
1. 分层API
根据编程模型的学习,Flink的API分为三层:
sql/Table API
DataStream API/DataSet API
ProcessFunction(event,state,time)
之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳或者当前事件的事件时间。
基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink sql就是使用Process Function实现的。
2. ProcessFunction
ProcessFunction 是一个低阶的流处理操作,它可以访问流处理程序的基础构建模块:
事件 (event)(流元素)
状态 (state)(容错性,一致性,仅在keyed stream中)
定时器 (timers)(event time 和 processing time, 仅在keyed stream中)
Flink 提供了 8 个 Process Function:
keyedProcessFunction 可以看作是一个具有 keyed state 和 timers 访问权的 FlatMapFunction
通过 RuntimeContext 访问 keyed state 。
计时器允许应用程序对处理时间和事件时间中的更改作出响应。对 processElement(…) 函数的每次调用都获得一个 Context 对象,该对象可以访问元素的 event time timestamp 和 TimerService。
TimerService 可用于为将来的 event/process time 注册回调。当定时器的达到定时时间时,会调用 onTimer(…) 方法。
3. KeyedProcessFunction(重点)
KeyedProcessFunction作为ProcessFunction的扩展,在其onTimer(…)方法中提供对定时器对应key的访问。
KeyedProcessFunction 用来操作 KeyedStream。所有的 Process Function 都继承自 RichFunction 接口,所以都有:
open()
close()
getRuntimeContext()
等方法。而 KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法:
processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素 都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。
onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回 调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定 的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器 触发的时间信息(事件时间或者处理时间)。
4. TimerService 和 定时器(Timers)
Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前 watermark 的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前 key 的 processing time 的定时器。当processing time到达定时时间时,触发 timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在 keyed streams 上面使用。
5. 案例实现
需求
在服务器运维中,需要实时监控服务器机架的温度,如果一定时间内温度超过了一定阈值(100度),且后一次上报的温度超过了前一次上报的温度,需要触发告警(温度持续升高中)
开发步骤
初始化流计算运行环境
指定数据按照事件事件进行处理
指定并行度为1
接入socket数据源,获取数据
将获取到的数据转换成tuple
自定义ProcessFunction对象,集成KeyedProcessFunction抽象类
实现如下方法:
open
processElement
onTimer
打印输出
执行任务
实现代码
public class KeyedProcessFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Tuple2<Integer, Integer>> dataSource = env.socketTextStream("node1", 9999).map(new MapFunction
<String, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(String value) throws Exception {
String[] ss = value.split(" ");
return Tuple2.of(Integer.parseInt(ss[0]), Integer.parseInt(ss[1]));
}
});
dataSource.print("输入的数据>>>");
KeyedStream<Tuple2<Integer, Integer>, Tuple> keyedStream = dataSource.keyBy(0);
keyedStream.process(new MyKeyedProcessFunction()).print();
env.execute();
}
public static class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, Tuple2<Integer, Integer>, String> {
private ListState<Tuple2<Integer, Integer>> listState;
private Long timerTS = 0L;
@Override
public void processElement(Tuple2<Integer, Integer> value, Context ctx, Collector<String> out) throws Exception {
Tuple2<Integer, Integer> lastData = null;
for (Tuple2<Integer, Integer> data: listState.get()) {
lastData = data;
}
if (lastData == null) {
lastData = Tuple2.of(0, 0);
}
if (value.f1 > 100 && value.f1 > lastData.f1) {
System.out.println("增加state和注册窗口, last温度" + lastData.f1 + ", 当前温度: " + value.f1);
listState.add(Tuple2.of(value.f0, value.f1));
timerTS = ctx.timerService().currentProcessingTime() + 5000L;
ctx.timerService().registerProcessingTimeTimer(timerTS);
}else {
// 温度在下降, 温度相等就pass了不处理
if (value.f1 < lastData.f1) {
listState.clear(); // 清空list, 因为温度在下降了 等待下个温度开启timer
ctx.timerService().deleteProcessingTimeTimer(timerTS);
out.collect("温度下降啦");
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
if (IteratorUtils.toList(listState.get().iterator()).size() > 1) {
// 表明最少有2条数据了
out.collect("温度报警啦");
}
// 5秒后, 如果2条数据说明温度升高,如果1条数据表明5秒内没有升高温度 都清空state
// 我们的要求是在指定时间内完成温度升高的要求
listState.clear(); // 报警后清空状态
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor<Tuple2<Integer, Integer>> descriptor =
new ListStateDescriptor<Tuple2<Integer, Integer>>("list state", Type@R_123_404[email protected](new TypeHint<Tuple2<Integer, Integer>
>() {}));
listState = getRuntimeContext().getListState(descriptor);
}
}
综合案例实战 – 双流广播 + TOPN
1. 场景
海量实时数据流数据,关联广播状态流实时关联计算,得到结果输出
高数据吞吐下的滑动窗口
2. 需求:
* 1. 一个Source 每隔1段时间会生成汇率数据 包含货币类型(String),汇率(Integer)
* 2. 另一个会生成订单数据,订单数据中的价格,需要关联到汇率数据中的汇率乘以汇率得出汇率转换后的价格
* 时间戳(Long)
* 商品类别(String)
* 商品数量(Integer)
* 货币类型(String)
* 价格(Integer)
* 3. 需求1:得到汇率转换后的价格后写出到本地中StreamingFileSink, 汇率计算为价格乘以汇率即可
* 4. 需求2:每隔10秒统计前30秒钟,销售最热门的TOP5 商品类别的销售额
3.1 汇率生成器代码
/**
* 下面为汇率,订单包括字段,这里为了简单,我们将汇率定义为整数
* 时间戳(Long)
* 货币类型(String)
* 汇率(Integer)
*/
public class RateWriter implements ParallelSourceFunction<Map<String, Integer>> {
/**
* BEF:比利时法郎
* CNY:人民币
* DEM:德国马克
* EUR:欧元
* HKD:港币
* USD:美元
* ITL:意大利里拉
*/
public static final String[] HBDM = {"BEF", "CNY", "DEM", "EUR", "HKD", "USD", "ITL"};
private final Random r = new Random();
boolean running = true;
@Override
public void run(SourceContext<Map<String, Integer>> ctx) throws Exception {
while (running) {
Map<String, Integer> map = new HashMap<>();
Thread.sleep(r.nextInt(60) * 1000);
for (String moneyType: HBDM) {
map.put(moneyType, r.nextInt(20));
}
ctx.collect(map);
}
}
@Override
public void cancel() {
running = false;
}
}
3.2 订单数据生成器代码/**
* 下面为订单流,订单包括字段
* 时间戳(Long)
* 商品大类(String)
* 商品数量(Integer)
* 货币类型(String)
* 价格(Integer)
*/
public class OrderWriter implements ParallelSourceFunction<OrderBean> {
private final Random r = new Random();
boolean running = true;
private final List<String> catelogList = Arrays.asList(
"手机",
"电脑",
"笔记本",
"PC主机",
"显示器",
"游戏机",
"mp3播放器",
"mp4播放器",
"平板电脑",
"显卡",
"cpu",
"主板",
"硬盘",
"内存",
"机械键盘"
);
@Override
public void run(SourceContext<OrderBean> ctx) throws Exception {
while (running) {
Thread.sleep(r.nextInt(2500));
for (int i = 0; i < 10000; i++) {
ctx.collect(new OrderBean(
System.currentTimeMillis(),
catelogList.get(r.nextInt(catelogList.size())),
r.nextInt(5),
RateWriter.HBDM[r.nextInt(RateWriter.HBDM.length)],
r.nextInt(1000)));
}
}
}
@Override
public void cancel() {
running = false;
}
}
主核心逻辑代码
/**
* 1. 一个Source 每隔1段时间会生成汇率数据
* 2. 另一个会生成订单数据,订单数据中的价格,需要关联到汇率数据中的汇率乘以汇率得出汇率转换后的价格
* 3. 需求1:得到汇率转换后的价格后写出到本地中StreamingFileSik
* 4. 需求2:每隔10秒统计前30秒钟,销售最热门的TOP5 商品类别的销售额
*/
public class BigdataDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000L, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsstateBackend("file:///G:\\26-code\\Java\\flinkbase\\data\\output\\checkpoint"));
DataStreamSource<OrderBean> orderSource = env.addSource(new OrderWriter());
DataStreamSource<Map<String, Integer>> rateSource = env.addSource(new RateWriter());
// broadcast stream
broadcastStream<Map<String, Integer>> broadcastStream = rateSource.broadcast(new MapStateDescriptor<Integer, Map<String, Integer>>(
"broadcast state",
Type@R_123_404[email protected](new TypeHint<Integer>() {
}),
Type@R_123_404[email protected](new TypeHint<Map<String, Integer>>() {
})));
// connect
broadcastConnectedStream<OrderBean, Map<String, Integer>> broadcastConnectedStream = orderSource.connect(broadcastStream);
// process
SingleOutputStreamOperator<OrderWithPriceBean> broadcastResult = broadcastConnectedStream.process(new broadcastProcessFunction<OrderBean, Map<String, Integer>, OrderWithPriceBean>() {
final MapStateDescriptor<Integer, Map<String, Integer>> mapStateDescriptor = new MapStateDescriptor<Integer, Map<String, Integer>>(
"broadcast state",
Type@R_123_404[email protected](new TypeHint<Integer>() {
}),
Type@R_123_404[email protected](new TypeHint<Map<String, Integer>>() {
}));
@Override
public void processElement(OrderBean value, ReadOnlyContext ctx, Collector<OrderWithPriceBean> out) throws Exception {
ReadOnlybroadcastState<Integer, Map<String, Integer>> broadcastState = ctx.getbroadcastState(mapStateDescriptor);
Map<String, Integer> map = broadcastState.get(1);
if (map != null) {
Integer rate = map.get(value.getMoneyType());
out.collect(new OrderWithPriceBean(
value.getTs(),
value.getCatelog(),
value.getNumber(),
value.getMoneyType(),
value.getPrice() * rate));
}
}
@Override
public void processbroadcastElement(Map<String, Integer> value, Context ctx, Collector<OrderWithPriceBean> out) throws Exception {
broadcastState<Integer, Map<String, Integer>> broadcastState = ctx.getbroadcastState(mapStateDescriptor);
broadcastState.clear();
broadcastState.put(1, value);
}
});
// write to hdfs
broadcastResult.addSink(
StreamingFileSink.forRowFormat(
new Path("data/output/bigdata"), // 文件写出的路径
// 文件写出的序列化器和编码
new SimpleStringEncoder<OrderWithPriceBean>("UTF-8"))
// 桶分配策略
.withBucketAssigner(new BasePathBucketAssigner<OrderWithPriceBean>())
// 文件滚动(完成一次文件写出)的策略
.withRollingPolicy(
OnCheckpointRollingPolicy.build()
).build()
);
broadcastResult.print(">>>");
AllWindowedStream<OrderWithPriceBean, TimeWindow> windowedStream = broadcastResult.timeWindowAll(Time.seconds(30), Time.seconds(5));
SingleOutputStreamOperator<List<Tuple3<String, String, Long>>> TOPNResultList = windowedStream.apply(new AllWindowFunction<OrderWithPriceBean, List<Tuple3<String, String, Long>>, TimeWindow>() {
private final int TOPN = 5;
@Override
public void apply(TimeWindow window, Iterable<OrderWithPriceBean> values, Collector<List<Tuple3<String, String, Long>>> out) throws Exception {
final Map<String, Long> map = new HashMap<>();
Tuple2<String, Long> currentMax = Tuple2.of("", 0L);
List<Tuple3<String, String, Long>> resultList = new ArrayList<>();
for (OrderWithPriceBean data : values) {
Long lastMoney = map.getorDefault(data.getCatelog(), 0L);
map.put(data.getCatelog(), lastMoney + data.getPriceRated());
}
for (int i = 0; i < TOPN; i++) {
for (String key : map.keySet()) {
Long aLong = map.get(key);
if (aLong > currentMax.f1) {
currentMax = Tuple2.of(key, aLong);
}
}
resultList.add(Tuple3.of(
"window start: " + window.getStart() + ", window end: " + window.getEnd(),
currentMax.f0,
currentMax.f1
));
map.remove(currentMax.f0);
currentMax = Tuple2.of("", 0L);
}
out.collect(resultList);
}
});
SingleOutputStreamOperator<Tuple3<String, String, Long>> TOPNSingleElementResult = TOPNResultList.flatMap(new FlatMapFunction<List<Tuple3<String, String, Long>>, Tuple3<String, String, Long>>() {
@Override
public void flatMap(List<Tuple3<String, String, Long>> valueList, Collector<Tuple3<String, String, Long>> out) throws Exception {
for (Tuple3<String, String, Long> ele : valueList) {
out.collect(ele);
}
}
});
TOPNSingleElementResult.addSink(
StreamingFileSink.forRowFormat(
new Path("data/output/bigdata2"), // 文件写出的路径
// 文件写出的序列化器和编码
new SimpleStringEncoder<Tuple3<String, String, Long>>("UTF-8"))
// 桶分配策略
.withBucketAssigner(new BasePathBucketAssigner<Tuple3<String, String, Long>>())
// 文件滚动(完成一次文件写出)的策略
.withRollingPolicy(
OnCheckpointRollingPolicy.build()
).build()
);
TOPNSingleElementResult.print("window>>>");
env.execute();
}
}
综合案例实战 – 了解
1. Flink双流Join
双流Join是Flink面试的高频问题。一般情况下说明以下几点就可以了:
Join大体分类只有两种:Window Join和Interval Join。Window Join又可以根据Window的类型细分出3种:
Tumbling Window Join
Sliding Window Join
Session WidNow Join
Windows类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作;
interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理;
目前Stream join的结果是数据的笛卡尔积;
日常使用中的一些问题,数据延迟、window序列化相关。
1.1 双流JOIN与传统数据库表JOIN的区别
传统数据库表的JOIN是两张静态表的数据联接,在流上面是 动态表,双流JOIN的数据不断流入与传统数据库表的JOIN有如下3个核心区别:
左右两边的数据集合无穷 - 传统数据库左右两个表的数据集合是有限的,双流JOIN的数据会源源不断的流入;
JOIN的结果不断产生/更新 - 传统数据库表JOIN是一次执行产生最终结果后退出,双流JOIN会持续不断的产生新的结果。
查询计算的双边驱动 - 双流JOIN由于左右两边的流的速度不一样,会导致左边数据到来的时候右边数据还没有到来,或者右边数据到来的时候左边数据没有到来,所以在实现中要将左右两边的流数据进行保存,以保证JOIN的语义。在Blink中会以State的方式进行数据的存储。
1.2 场景描述
日常中很多人会购买外汇,外汇的汇率是实时变化的,用户在下单的同时,会按照最新的汇率计算成交价格。
1.3 需求说明
我们构建来2个数据流,一条为实时汇率,一条为订单流,两条流合并,订单价格*汇率计算出最终价格。
1.4 日志结构说明
下面为订单流,订单包括字段
时间戳(Long)
商品大类(String)
商品细目(Integer)
货币类型(String)
价格(Integer)
下面为汇率,订单包括字段,这里为了简单,我们将汇率定义为整数
时间戳(Long)
货币类型(String)
汇率(Integer)
1.5 实现步骤
创建订单流实时生成订单数据
创建汇率流实时生成汇率数据
将以上两个流进行合并,订单价格*汇率计算出最终价格
初始化流处理运行环境
接入订单流,消费订单数据,并将消费的订单数据转换成元祖对象
接入汇率流,消费汇率数据,并将消费的汇率数据转换成元祖对象
设置按照事件时间处理数据
设置并行度为1
将订单流和汇率流添加到环境
为订单流以及汇率流添加水印支持,延迟10ms
将订单流与汇率流进行合并,将计算的订单价格进行输出,格式如下
(1585099091127,D,1,BEF,688,1585099094066,BEF,14,9632)
(1585099094771,A,1,BEF,253,1585099094066,BEF,14,3542)
1.6 参考代码
实时生成订单数据,写入到kafka集群
/**
* 下面为订单流,订单包括字段
* 时间戳(Long)
* 商品大类(String)
* 商品细目(Integer)
* 货币类型(String)
* 价格(Integer)
*/
public class OrderWriter {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map prop = new HashMap();
prop.put("bootstrap.servers", "node01:9092");
prop.put("topic", "order");
ParameterTool parameterTool = ParameterTool.fromMap(prop);
DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
private Random r = new Random();
private static final long serialVersionUID = 1L;
boolean running = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while(running) {
Thread.sleep(r.nextInt(1500));
char catlog = (char) (65 + r.nextInt(5));
ctx.collect(String.format("%d,%s,%d,%s,%d", System.currentTimeMillis(), String.valueOf(catlog), r.nextInt(5), RateWriter.HBDM[r.nextInt(RateWriter.HBDM.length)], r.nextInt(1000)));
}
}
@Override
public void cancel() {
running = false;
}
});
DataStreamSink<String> airQualityVODataStreamSink = messageStream.addSink(new FlinkKafkaProducer<>(parameterTool.getrequired("bootstrap.servers"),
parameterTool.getrequired("topic"),
new SimpleStringSchema()));
messageStream.print();
env.execute("write order to kafka !!!");
}
}
实时生成汇率数据,写入kafka集群
/**
* 下面为汇率,订单包括字段,这里为了简单,我们将汇率定义为整数
* 时间戳(Long)
* 货币类型(String)
* 汇率(Integer)
*/
public class RateWriter {
/**
* BEF:比利时法郎
* CNY:人民币
* DEM:德国马克
* EUR:欧元
* HKD:港币
* USD:美元
* ITL:意大利里拉
*/
public static final String[] HBDM = {"BEF","CNY","DEM","EUR","HKD","USD","ITL"};
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map prop = new HashMap();
prop.put("bootstrap.servers", "node01:9092");
prop.put("topic", "rate");
ParameterTool parameterTool = ParameterTool.fromMap(prop);
DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
private Random r = new Random();
private static final long serialVersionUID = 1L;
boolean running = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while(running) {
Thread.sleep(r.nextInt(3) * 1000);
ctx.collect(String.format("%d,%s,%d", System.currentTimeMillis(), HBDM[r.nextInt(HBDM.length)], r.nextInt(20)));
}
}
@Override
public void cancel() {
running = false;
}
});
DataStreamSink<String> airQualityVODataStreamSink = messageStream.addSink(new FlinkKafkaProducer<>(parameterTool.getrequired("bootstrap.servers"),
parameterTool.getrequired("topic"),
new SimpleStringSchema()));
messageStream.print();
env.execute("write rate to kafka !!!");
}
}
将订单流与汇率流进行合并
/**
* 我们构建来2个数据流,一条为实时汇率,一条为订单流,两条流合并,订单价格*汇率计算出最终价格。
*/
public class RateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//初始化kafka链接参数
Map properties= new HashMap();
properties.put("bootstrap.servers", "node01:9092");
properties.put("group.id", "test");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("topicOrder", "order");
properties.put("topicRate", "rate");
ParameterTool parameterTool = ParameterTool.fromMap(properties);
//首先,我们再消费kafka数据流的时候,定义个一个匿名类来规定如何消费数据,这里我们将数据切分成元组。
FlinkKafkaConsumer consumerRate = new FlinkKafkaConsumer(parameterTool.getrequired("topicRate"), new DeserializationSchema() {
@Override
public Type@R_123_4045@ion getProducedType() {
return Type@R_123_404[email protected](new TypeHint<Tuple3<Long,String,Integer>>(){});
//return Type@R_123_404[email protected](new TypeHint<Tuple>(){});
}
@Override
public Tuple3<Long,String,Integer> deserialize(byte[] message) throws IOException {
String[] res = new String(message).split(",");
Long timestamp = Long.valueOf(res[0]);
String dm = res[1];
Integer value = Integer.valueOf(res[2]);
return Tuple3.of(timestamp,dm,value);
}
@Override
public boolean isEndOfStream(Object nextElement) {
return false;
}
}, parameterTool.getProperties());
FlinkKafkaConsumer consumerOrder = new FlinkKafkaConsumer(
parameterTool.getrequired("topicOrder"), new DeserializationSchema() {
@Override
public Type@R_123_4045@ion getProducedType() {
return Type@R_123_404[email protected](new TypeHint<Tuple5<Long,String,Integer,String,Integer>>(){});
}
@Override
public Tuple5<Long,String,Integer,String,Integer> deserialize(byte[] message) throws IOException {
//%d,%s,%d,%s,%d
String[] res = new String(message).split(",");
Long timestamp = Long.valueOf(res[0]);
String catlog = res[1];
Integer subcat = Integer.valueOf(res[2]);
String dm = res[3];
Integer value = Integer.valueOf(res[4]);
return Tuple5.of(timestamp,catlog,subcat,dm,value);
}
@Override
public boolean isEndOfStream(Object nextElement) {
return false;
}
}, parameterTool.getProperties());
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Tuple3<Long,String,Integer>> rateStream = env.addSource(consumerRate);
DataStream<Tuple5<Long,String,Integer,String,Integer>> oraderStream = env.addSource(consumerOrder);
long delay = 1000;
//然后为两个流添加事件时间。
DataStream<Tuple3<Long,String,Integer>> rateTimedStream = rateStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long,String,Integer>>(Time.milliseconds(delay)) {
@Override
public long extractTimestamp(Tuple3<Long, String, Integer> element) {
return (Long)element.getField(0);
}
});
DataStream<Tuple5<Long,String,Integer,String,Integer>> oraderTimedStream = oraderStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple5<Long,String,Integer,String,Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple5 value) {
return (Long)value.getField(0);
}
});
//接下来,就是将两条流合并起来,要再where和equalTo的两个方法里,设置连接条件,然后通过window设置时间窗口,通过apply方法将join的数据最后结果拼装起来。
DataStream<Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer>> joinedStream = oraderTimedStream.join(rateTimedStream).where(new KeySelector<Tuple5<Long,String,Integer,String,Integer>,String>(){
@Override
public String getKey(Tuple5<Long,String,Integer,String,Integer> value) throws Exception {
return value.getField(3).toString();
}
}).equalTo(new KeySelector<Tuple3<Long,String,Integer>,String>(){
@Override
public String getKey(Tuple3<Long,String,Integer> value) throws Exception {
return value.getField(1).toString();
}
}).window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new JoinFunction<Tuple5<Long,String,Integer,String,Integer>, Tuple3<Long,String,Integer>,Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer>>() {
@Override
public Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer> join( Tuple5<Long,String,Integer,String,Integer> first, Tuple3<Long,String,Integer>second) throws Exception {
Integer res = (Integer)second.getField(2)*(Integer)first.getField(4);
return Tuple9.of(first.f0,first.f1,first.f2,first.f3,first.f4,second.f0,second.f1,second.f2,res);
}
});
joinedStream.print();
env.execute("done!");
}
}
2. 热门销售排行TopN
2.1 场景描述
TopN 的需求场景不管是在离线计算还是实时计算都是比较常见的,例如电商中计算热门销售商品、广告计算中点击数前N的广告、搜索中计算搜索次数前N的搜索词。topN又分为全局topN、分组topN, 比喻说热门销售商品可以直接按照各个商品的销售总额排序,也可以先按照地域分组然后对各个地域下各个商品的销售总额排序。本篇以热门销售商品为例,实时统计每10min内各个地域维度下销售额top10的商品。
2.2 需求说明
本案例将实现一个“实时热门商品”的需求,我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔10秒输出最近30秒内点击量最多的前 N 个商品。
这里的数据依然使用上一个案例的订单及汇率数据进行演示
2.3 日志结构说明
下面为订单流,订单包括字段
时间戳(Long)
商品大类(String)
商品细目(Integer)
货币类型(String)
价格(Integer)
下面为汇率,订单包括字段,这里为了简单,我们将汇率定义为整数
时间戳(Long)
货币类型(String)
汇率(Integer)
2.4 实现步骤
创建订单流实时生成订单数据
创建汇率流实时生成汇率数据
将以上两个流进行合并,订单价格*汇率计算出最终价格
初始化流处理运行环境
接入订单流,消费订单数据,并将消费的订单数据转换成元祖对象
接入汇率流,消费汇率数据,并将消费的汇率数据转换成元祖对象
设置按照事件时间处理数据
设置并行度为1
将订单流和汇率流添加到环境
将订单流与汇率流进行合并,将计算的订单价格进行输出,格式如下
(1585099091127,D,1,BEF,688,1585099094066,BEF,14,9632)
(1585099094771,A,1,BEF,253,1585099094066,BEF,14,3542)
为合并流添加水印支持,抽取事件时间
使用窗口函数每10秒统计一次过去30秒的成交最高的外汇币种
使用TopN对过去30秒内的数据聚合后排序,取前N名
2.5 参考代码
public class HottopNDemo {
public static void main(String[] args) throws Exception {
//初始化运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//初始化kafka接入参数
Map properties= new HashMap();
properties.put("bootstrap.servers", "node01:9092");
properties.put("group.id", "test");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
//properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("topicOrder", "order");
properties.put("topicRate", "rate");
ParameterTool parameterTool = ParameterTool.fromMap(properties);
//实例化汇率流消费者对象,并将数据转换成元祖对象
FlinkKafkaConsumer consumerRate = new FlinkKafkaConsumer(
parameterTool.getrequired("topicRate"), new DeserializationSchema() {
@Override
public Type@R_123_4045@ion getProducedType() {
return Type@R_123_404[email protected](new TypeHint<Tuple3<Long,String,Integer>>(){});
//return Type@R_123_404[email protected](new TypeHint<Tuple>(){});
}
@Override
public Tuple3<Long,String,Integer> deserialize(byte[] message) throws IOException {
String[] res = new String(message).split(",");
Long timestamp = Long.valueOf(res[0]);
String dm = res[1];
Integer value = Integer.valueOf(res[2]);
return Tuple3.of(timestamp,dm,value);
}
@Override
public boolean isEndOfStream(Object nextElement) {
return false;
}
}, parameterTool.getProperties());
//实例化订单流消费者对象,并将数据转换成元祖对象
FlinkKafkaConsumer consumerOrder = new FlinkKafkaConsumer(
parameterTool.getrequired("topicOrder"), new DeserializationSchema() {
@Override
public Type@R_123_4045@ion getProducedType() {
return Type@R_123_404[email protected](new TypeHint<Tuple5<Long,String,Integer,String,Integer>>(){});
}
@Override
public Tuple5<Long,String,Integer,String,Integer> deserialize(byte[] message) throws IOException {
//%d,%s,%d,%s,%d
String[] res = new String(message).split(",");
Long timestamp = Long.valueOf(res[0]);
String catlog = res[1];
Integer subcat = Integer.valueOf(res[2]);
String dm = res[3];
Integer value = Integer.valueOf(res[4]);
return Tuple5.of(timestamp,catlog,subcat,dm,value);
}
@Override
public boolean isEndOfStream(Object nextElement) {
return false;
}
}, parameterTool.getProperties());
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
//将数据流添加到环境
DataStream<Tuple3<Long,String,Integer>> rateStream = env.addSource(consumerRate);
DataStream<Tuple5<Long,String,Integer,String,Integer>> oraderStream = env.addSource(consumerOrder);
long delay = 1000;
DataStream<Tuple3<Long,String,Integer>> rateTimedStream = rateStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long,String,Integer>>(Time.milliseconds(delay)) {
@Override
public long extractTimestamp(Tuple3<Long, String, Integer> element) {
return (Long)element.getField(0);
}
});
DataStream<Tuple5<Long,String,Integer,String,Integer>> oraderTimedStream = oraderStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple5<Long,String,Integer,String,Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple5 value) {
return (Long)value.getField(0);
}
});
//合并订单流和汇率流
DataStream<Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer>> joinedStream = oraderTimedStream.join(rateTimedStream).where(new KeySelector<Tuple5<Long,String,Integer,String,Integer>,String>(){
@Override
public String getKey(Tuple5<Long,String,Integer,String,Integer> value) throws Exception {
return value.getField(3).toString();
}
}).equalTo(new KeySelector<Tuple3<Long,String,Integer>,String>(){
@Override
public String getKey(Tuple3<Long,String,Integer> value) throws Exception {
return value.getField(1).toString();
}
}).window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new JoinFunction<Tuple5<Long,String,Integer,String,Integer>, Tuple3<Long,String,Integer>,Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer>>() {
@Override
public Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer> join( Tuple5<Long,String,Integer,String,Integer> first, Tuple3<Long,String,Integer>second) throws Exception {
Integer res = (Integer)second.getField(2)*(Integer)first.getField(4);
return Tuple9.of(first.f0,first.f1,first.f2,first.f3,first.f4,second.f0,second.f1,second.f2,res);
}
});
//将合并后的数据流添加水印支持
DataStream<Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer>> joinedTimedStream = joinedStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer> element) {
return element.f0;
}
});
//对数据流进行分流,并加入窗口函数,每隔10秒钟计算一下过去30秒的热销排行
DataStream<OrderView> windowedData = joinedTimedStream.keyBy(new KeySelector<Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer>,String>(){
@Override
public String getKey(Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer> value) throws Exception {
return value.f1+value.f2;
}
}).timeWindow(Time.seconds(30), Time.seconds(10))
.aggregate(new SumAgg(), new WindowResultFunction());
//获取热销排行topN
DataStream<String> topNHots = windowedData
.keyBy("windowEnd")
.process(new TopNHot(5));
topNHots.print();
env.execute("done!");
}
/**
* 定义聚合函数
*/
public static class SumAgg implements AggregateFunction<Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer>, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Tuple9<Long,String,Integer,String,Integer,Long,String,Integer,Integer> value, Long acc) {
return acc + value.f8;
}
@Override
public Long getResult(Long acc) {
return acc;
}
@Override
public Long merge(Long acc1, Long acc2) {
return acc1 + acc2;
}
}
/** 用于输出窗口的结果 */
//IN, OUT, KEY, W extends Window
public static class WindowResultFunction implements WindowFunction<Long, OrderView, String, TimeWindow> {
@Override
public void apply(
String key, // 窗口的主键
TimeWindow window, // 窗口
Iterable<Long> aggregateResult, // 聚合函数的结果
Collector<OrderView> collector // 输出类型为 OrderView
) throws Exception {
Long count = aggregateResult.iterator().next();
collector.collect(OrderView.of(key, window.getEnd(), count));
}
}
/**
* 定义订单视图
*/
public static class OrderView {
public String itemId; // 商品ID
public long windowEnd; // 窗口结束时间戳
public long allsum; // 商品的销售量
public static OrderView of(String itemId, long windowEnd, long allsum) {
OrderView result = new OrderView();
result.itemId = itemId;
result.windowEnd = windowEnd;
result.allsum = allsum;
return result;
}
}
/**
* 热门排行实现
*/
public static class TopNHot extends KeyedProcessFunction<Tuple, OrderView, String> {
private final int topSize;
public TopNHot(int topSize) {
this.topSize = topSize;
}
// 用于存储商品状态,待收齐同一个窗口的数据后,再触发 TopN 计算
private ListState<OrderView> orderState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 状态的注册
ListStateDescriptor<OrderView> itemsstateDesc = new ListStateDescriptor<>(
"orderState-state",
OrderView.class);
orderState = getRuntimeContext().getListState(itemsstateDesc);
}
@Override
public void processElement(
OrderView input,
Context context,
Collector<String> collector) throws Exception {
// 每条数据都保存到状态中
orderState.add(input);
// 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
context.timerService().registerEventTimeTimer(input.windowEnd + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 获取收到的所有商品销售量
List<OrderView> allItems = new ArrayList<>();
orderState.get().forEach(it -> allItems.add(it));
// 提前清除状态中的数据,释放空间
orderState.clear();
// 按照销售量从大到小排序
allItems.sort((x1, x2) -> (int) (x1.allsum - x2.allsum));
// 将排名信息格式化成 String, 便于打印
StringBuilder result = new StringBuilder();
result.append("====================================\n");
result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n");
for (int i = 0; i < topSize && i < allItems.size(); i++) {
OrderView currentItem = allItems.get(i);
// No1: 商品ID=12224 销售额=2413
result.append("No").append(i + 1).append(":")
.append(" 商品ID=").append(currentItem.itemId)
.append(" 销售额=").append(currentItem.allsum)
.append("\n");
}
result.append("====================================\n\n");
out.collect(result.toString());
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。