ProcessFunction API
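The lowest-level API in Flink.
There are eight ProcessFunction variants:
- ProcessFunction: for a plain DataStream
- KeyedProcessFunction: for a KeyedStream, i.e. stream processing after keyBy
- CoProcessFunction: for two streams combined with connect
- ProcessJoinFunction: for join operations on streams (interval join)
- BroadcastProcessFunction: for broadcast streams
- KeyedBroadcastProcessFunction: for broadcast after keyBy
- ProcessWindowFunction: processes the full contents of a keyed window (it is a full-window function, not an incremental aggregation)
- ProcessAllWindowFunction: processes the full contents of a non-keyed window (windowAll)
They all extend AbstractRichFunction, which implements the RichFunction interface: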
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.functions;
import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;
/**
* An base interface for all rich user-defined functions. This class defines methods for
* the life cycle of the functions, as well as methods to access the context in which the functions
* are executed.
*/
@Public
public interface RichFunction extends Function {
/**
* Initialization method for the function. It is called before the actual working methods
* (like <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that
* are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
*
* <p>The configuration object passed to the function can be used for configuration and initialization.
* The configuration contains all parameters that were configured on the function in the program
* composition.
*
* <pre>{@code
* public class MyMapper extends FilterFunction<String> {
*
* private String searchString;
*
* public void open(Configuration parameters) {
* this.searchString = parameters.getString("foo");
* }
*
* public boolean filter(String value) {
* return value.equals(searchString);
* }
* }
* }</pre>
*
* <p>By default, this method does nothing.
*
* @param parameters The configuration containing the parameters attached to the contract.
*
* @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
* runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*
* @see org.apache.flink.configuration.Configuration
*/
void open(Configuration parameters) throws Exception;
/**
* Tear-down method for the user code. It is called after the last call to the main working methods
* (e.g. <i>map</i> or <i>join</i>). For functions that are part of an iteration, this method will
* be invoked after each iteration superstep.
*
* <p>This method can be used for clean up work.
*
* @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
* runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*/
void close() throws Exception;
// ------------------------------------------------------------------------
// Runtime context
// ------------------------------------------------------------------------
/**
* Gets the context that contains information about the UDF's runtime, such as the
* parallelism of the function, the subtask index of the function, or the name of
* the of the task that executes the function.
*
* <p>The RuntimeContext also gives access to the
* {@link org.apache.flink.api.common.accumulators.Accumulator}s and the
* {@link org.apache.flink.api.common.cache.DistributedCache}.
*
* @return The UDF's runtime context.
*/
RuntimeContext getRuntimeContext();
/**
* Gets a specialized version of the {@link RuntimeContext}, which has additional information
* about the iteration in which the function is executed. This IterationRuntimeContext is only
* available if the function is part of an iteration. Otherwise, this method throws an exception.
*
* @return The IterationRuntimeContext.
* @throws java.lang.IllegalStateException Thrown, if the function is not executed as part of an iteration.
*/
IterationRuntimeContext getIterationRuntimeContext();
/**
* Sets the function's runtime context. Called by the framework when creating a parallel instance of the function.
*
* @param t The runtime context.
*/
void setRuntimeContext(RuntimeContext t);
}
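The life cycle described in the Javadoc above (open once, then the working method per record, then close) can be sketched in plain Java. This is only a model, not Flink code: `LifecycleSketch`, `PrefixFilter`, and `run` are hypothetical names, and `run` merely stands in for what the runtime does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the rich-function life cycle. NOT Flink code; all names are hypothetical.
class LifecycleSketch {

    // Hypothetical stand-in for a rich filter function.
    static class PrefixFilter {
        final List<String> calls = new ArrayList<>(); // records the order of callbacks
        private String prefix;

        // One-time setup, called before any record is processed.
        void open() {
            calls.add("open");
            prefix = "a";
        }

        // The working method, called once per record.
        boolean filter(String value) {
            calls.add("filter");
            return value.startsWith(prefix);
        }

        // Clean-up, called after the last record.
        void close() {
            calls.add("close");
            prefix = null;
        }
    }

    // Stand-in for the runtime: open, process every record, close.
    static List<String> run(PrefixFilter f, List<String> input) {
        List<String> kept = new ArrayList<>();
        f.open();
        for (String value : input) {
            if (f.filter(value)) {
                kept.add(value);
            }
        }
        f.close();
        return kept;
    }

    public static void main(String[] args) {
        PrefixFilter f = new PrefixFilter();
        System.out.println(run(f, Arrays.asList("apple", "bob", "avocado")));
        System.out.println(f.calls);
    }
}
```

In a real job the function would extend one of Flink's rich function classes and the runtime would drive these calls; the sketch only shows the ordering.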
ProcessFunction is itself an abstract class. It adds two methods; in both, the out parameter is used to collect the output.
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
onTimer is called when a previously registered timer fires.
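The interplay between processElement and onTimer can be sketched with a plain-Java model of event-time timers. This is not Flink code: `TimerSketch`, `advanceWatermark`, and the 10 ms delay are assumptions made for illustration, and the model ignores keys (in Flink, timers are only available on keyed streams and are deduplicated per key and timestamp).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of event-time timers. NOT Flink code; all names are hypothetical.
class TimerSketch {
    // Registered timer timestamps. A set, so registering the same timestamp twice keeps one timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Stands in for Collector<O>: everything the function emits.
    final List<String> out = new ArrayList<>();

    // Analogue of processElement: emit the value and request a callback 10 ms later.
    void processElement(String value, long timestamp) {
        out.add("element:" + value);
        timers.add(timestamp + 10);
    }

    // Analogue of onTimer: called once for each timer that has become due.
    void onTimer(long timestamp) {
        out.add("timer:" + timestamp);
    }

    // Stand-in for the runtime advancing the watermark: fire all timers <= watermark, in order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    static List<String> demo() {
        TimerSketch f = new TimerSketch();
        f.processElement("a", 100); // registers timer 110
        f.processElement("b", 100); // same timestamp, still a single timer
        f.processElement("c", 105); // registers timer 115
        f.advanceWatermark(112);    // fires timer 110 only
        f.advanceWatermark(200);    // fires timer 115
        return f.out;
    }

    public static void main(String[] args) {
        // prints [element:a, element:b, element:c, timer:110, timer:115]
        System.out.println(demo());
    }
}
```

In an actual KeyedProcessFunction the registration would go through the timer service exposed by the Context, and the runtime, not user code, decides when onTimer runs.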
CoProcessFunction, for example, operates on two streams at once, with one callback per input:
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
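One reason to handle two inputs in a single function is that both callbacks can share state. The following plain-Java model illustrates that idea with a control stream that sets a threshold and a data stream that is filtered by it. It is not Flink code: `TwoInputSketch` and the threshold scenario are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a two-input function. NOT Flink code; all names are hypothetical.
class TwoInputSketch {
    // State shared by both callbacks.
    private int threshold = 0;
    // Stands in for Collector<OUT>.
    final List<Integer> out = new ArrayList<>();

    // Analogue of processElement1: input 1 carries control messages (a new threshold).
    void processElement1(int newThreshold) {
        threshold = newThreshold;
    }

    // Analogue of processElement2: input 2 carries data, filtered by the current threshold.
    void processElement2(int value) {
        if (value >= threshold) {
            out.add(value);
        }
    }

    static List<Integer> demo() {
        TwoInputSketch f = new TwoInputSketch();
        f.processElement2(1); // threshold is 0, so 1 is emitted
        f.processElement1(5); // raise the threshold to 5
        f.processElement2(3); // below the threshold, dropped
        f.processElement2(7); // emitted
        return f.out;
    }

    public static void main(String[] args) {
        // prints [1, 7]
        System.out.println(demo());
    }
}
```

The result depends on the order in which elements of the two inputs arrive, which is also true of a connected stream in a real job.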
ProcessJoinFunction, in turn, processes the results of a join; it is called once for each joined pair of elements.
public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
For details on usage, see the following blog posts:
https://xinchen.blog.csdn.net/article/details/109624375
https://blog.csdn.net/boling_cavalry/article/details/109645214?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242
An input stream can be created as follows:
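protected KeyedStream<Tuple2<String, Integer>, Tuple> buildStreamFromSocket(StreamExecutionEnvironment env, int port) {
    return env
        // listen on the port
        .socketTextStream("localhost", port)
        // convert a string such as "aaa,3" into a Tuple2 instance with f0="aaa", f1=3
        .map(new WordCountMap())
        // partition by the word as key (position-based keyBy is deprecated in newer Flink versions)
        .keyBy(0);
}
The streams can then be processed as follows: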
SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = stream1
    // connect the two streams
    .connect(stream2)
    // run the low-level process function; the processing logic is implemented in the subclass
    .process(new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
            logger.info("Processing element from stream 1: {}", value);
            out.collect(value);
        }
        @Override
        public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
            logger.info("Processing element from stream 2: {}", value);
            out.collect(value);
        }
    });
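A ProcessFunction can also emit records to a side output through ctx.output:
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(
                Integer value,
                Context ctx,
                Collector<Integer> out) throws Exception {
            // emit the record to the regular output
            out.collect(value);
            // emit the record to the side output
            ctx.output(outputTag, "sideout-" + String.valueOf(value));
        }
    });
// retrieve the side output stream by its tag
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);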
DataSet/DataStream API
TABLE API