
Flink Code Reading: The API

ProcessFunction API

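This is the lowest-level API.

There are eight ProcessFunction variants:

  • ProcessFunction: for a plain DataStream
  • KeyedProcessFunction: for a KeyedStream, i.e. processing after keyBy
  • CoProcessFunction: for two streams combined with connect
  • ProcessJoinFunction: for join operations on streams
  • BroadcastProcessFunction: for broadcast streams
  • KeyedBroadcastProcessFunction: for a keyed stream (after keyBy) connected to a broadcast stream
  • ProcessWindowFunction: for keyed windows; it receives all elements of a window at once rather than aggregating incrementally
  • ProcessAllWindowFunction: the same full-window processing for non-keyed windows (windowAll)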

These classes extend AbstractRichFunction (directly or through a base class), which implements the RichFunction interface:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;

/**
 * An base interface for all rich user-defined functions. This class defines methods for
 * the life cycle of the functions, as well as methods to access the context in which the functions
 * are executed.
 */
@Public
public interface RichFunction extends Function {

	/**
	 * Initialization method for the function. It is called before the actual working methods
	 * (like <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that
	 * are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
	 *
	 * <p>The configuration object passed to the function can be used for configuration and initialization.
	 * The configuration contains all parameters that were configured on the function in the program
	 * composition.
	 *
	 * <pre>{@code
	 * public class MyMapper extends FilterFunction<String> {
	 *
	 *     private String searchString;
	 *
	 *     public void open(Configuration parameters) {
	 *         this.searchString = parameters.getString("foo");
	 *     }
	 *
	 *     public boolean filter(String value) {
	 *         return value.equals(searchString);
	 *     }
	 * }
	 * }</pre>
	 *
	 * <p>By default, this method does nothing.
	 *
	 * @param parameters The configuration containing the parameters attached to the contract.
	 *
	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
	 *                   decide whether to retry the task execution.
	 *
	 * @see org.apache.flink.configuration.Configuration
	 */
	void open(Configuration parameters) throws Exception;

	/**
	 * Tear-down method for the user code. It is called after the last call to the main working methods
	 * (e.g. <i>map</i> or <i>join</i>). For functions that  are part of an iteration, this method will
	 * be invoked after each iteration superstep.
	 *
	 * <p>This method can be used for clean up work.
	 *
	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
	 *                   decide whether to retry the task execution.
	 */
	void close() throws Exception;

	// ------------------------------------------------------------------------
	//  Runtime context
	// ------------------------------------------------------------------------

	/**
	 * Gets the context that contains information about the UDF's runtime, such as the
	 * parallelism of the function, the subtask index of the function, or the name of
	 * the task that executes the function.
	 *
	 * <p>The RuntimeContext also gives access to the
	 * {@link org.apache.flink.api.common.accumulators.Accumulator}s and the
	 * {@link org.apache.flink.api.common.cache.DistributedCache}.
	 *
	 * @return The UDF's runtime context.
	 */
	RuntimeContext getRuntimeContext();

	/**
	 * Gets a specialized version of the {@link RuntimeContext}, which has additional information
	 * about the iteration in which the function is executed. This IterationRuntimeContext is only
	 * available if the function is part of an iteration. Otherwise, this method throws an exception.
	 *
	 * @return The IterationRuntimeContext.
	 * @throws java.lang.IllegalStateException Thrown, if the function is not executed as part of an iteration.
	 */
	IterationRuntimeContext getIterationRuntimeContext();

	/**
	 * Sets the function's runtime context. Called by the framework when creating a parallel instance of the function.
	 *
	 * @param t The runtime context.
	 */
	void setRuntimeContext(RuntimeContext t);
}
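The life cycle described in the Javadoc above (open once, then the working method for each record, then close) can be modeled without Flink. The following is only a plain-JDK sketch: MiniRichFilter, PrefixFilter and LifecycleDemo are hypothetical names and not Flink classes, and the real runtime does considerably more (runtime context, fail-over handling).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a rich function; NOT part of Flink.
interface MiniRichFilter {
    void open(Map<String, String> parameters); // one-time setup
    boolean filter(String value);              // working method, called per record
    void close();                              // tear-down
}

// Example implementation that records the order in which it is called.
final class PrefixFilter implements MiniRichFilter {
    final List<String> calls = new ArrayList<>();
    private String prefix;

    @Override
    public void open(Map<String, String> parameters) {
        calls.add("open");
        prefix = parameters.getOrDefault("prefix", "");
    }

    @Override
    public boolean filter(String value) {
        calls.add("filter");
        return value.startsWith(prefix);
    }

    @Override
    public void close() {
        calls.add("close");
    }
}

final class LifecycleDemo {
    // Toy driver: open() once, filter() per record, then close().
    // Calling close() in a finally block is a choice of this sketch.
    static List<String> run(MiniRichFilter fn, Map<String, String> params, List<String> input) {
        List<String> kept = new ArrayList<>();
        fn.open(params);
        try {
            for (String record : input) {
                if (fn.filter(record)) {
                    kept.add(record);
                }
            }
        } finally {
            fn.close();
        }
        return kept;
    }

    public static void main(String[] args) {
        PrefixFilter fn = new PrefixFilter();
        List<String> kept = run(fn, Map.of("prefix", "a"), List.of("apple", "banana", "avocado"));
        System.out.println(kept);     // records that start with "a"
        System.out.println(fn.calls); // order of the life-cycle calls
    }
}
```

Setup that is expensive or depends on configuration belongs in open rather than in the constructor, because the function object is created once and then shipped to each parallel instance.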

ProcessFunction is also an abstract class. It adds two methods; in both of them, out collects the output records.


	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

onTimer is called when a registered timer fires.
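To make the link between registering a timer and the onTimer callback concrete, here is a small Flink-free model of event-time timers. It assumes the usual event-time rule that a timer fires once the watermark reaches its timestamp. MiniTimerService and its methods are hypothetical names for illustration, not Flink's TimerService, and keys are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.LongConsumer;

// Hypothetical, simplified model of event-time timers; NOT Flink's TimerService.
final class MiniTimerService {
    private final TreeSet<Long> timers = new TreeSet<>(); // sorted and de-duplicated
    private final LongConsumer onTimer;                    // plays the role of onTimer(timestamp, ...)
    private long watermark = Long.MIN_VALUE;

    MiniTimerService(LongConsumer onTimer) {
        this.onTimer = onTimer;
    }

    // Registering the same timestamp twice keeps a single timer.
    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advancing the watermark fires every timer with timestamp <= watermark, in order.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer.accept(timers.pollFirst());
        }
    }

    public static void main(String[] args) {
        List<Long> fired = new ArrayList<>();
        MiniTimerService service = new MiniTimerService(ts -> fired.add(ts));
        service.registerEventTimeTimer(2000L);
        service.registerEventTimeTimer(1000L);
        service.registerEventTimeTimer(1000L); // duplicate, kept only once
        service.advanceWatermark(1500L);       // fires the timer at 1000
        service.advanceWatermark(2500L);       // fires the timer at 2000
        System.out.println(fired);
    }
}
```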

CoProcessFunction, for example, supports operating on two streams at the same time, with one callback per input:

	public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
	public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
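One reason to handle two inputs in a single function is that both callbacks run on the same instance and can therefore share state. The sketch below models that idea with plain JDK classes. MiniCoFunction, ThresholdFilter and CoFunctionDemo are hypothetical names, the threshold scenario is made up for illustration, and a Consumer stands in for Flink's Collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical two-input function; mirrors the shape of CoProcessFunction but is NOT Flink code.
abstract class MiniCoFunction<IN1, IN2, OUT> {
    abstract void processElement1(IN1 value, Consumer<OUT> out);
    abstract void processElement2(IN2 value, Consumer<OUT> out);
}

// Input 1 updates a threshold; input 2 carries values that are emitted only if they exceed it.
final class ThresholdFilter extends MiniCoFunction<Integer, Integer, String> {
    private int threshold = 0; // state shared by both callbacks

    @Override
    void processElement1(Integer newThreshold, Consumer<String> out) {
        threshold = newThreshold;
    }

    @Override
    void processElement2(Integer value, Consumer<String> out) {
        if (value > threshold) {
            out.accept("pass:" + value);
        }
    }
}

final class CoFunctionDemo {
    static List<String> run() {
        List<String> output = new ArrayList<>();
        ThresholdFilter fn = new ThresholdFilter();
        // Records from the two inputs arrive interleaved; each goes to its own callback.
        fn.processElement2(5, output::add);  // 5 > 0, emitted
        fn.processElement1(10, output::add); // threshold becomes 10
        fn.processElement2(7, output::add);  // 7 <= 10, dropped
        fn.processElement2(12, output::add); // 12 > 10, emitted
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a real job the shared field would normally be Flink managed state rather than a plain member variable, so that it survives failures.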

ProcessJoinFunction handles the pairs of elements produced by a join.

	public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
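In Flink, ProcessJoinFunction is the function passed to an interval join, where it is invoked once per matched (left, right) pair. The following plain-JDK sketch models that matching rule under simplifying assumptions: keys are ignored, both bounds are treated as inclusive, and all names (IntervalJoinModel, Event, PairHandler) are hypothetical rather than Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of an interval join; NOT Flink code and without keys or state cleanup.
final class IntervalJoinModel {

    // A record with an event-time timestamp.
    static final class Event {
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for ProcessJoinFunction.processElement(left, right, ctx, out).
    interface PairHandler {
        void processElement(Event left, Event right, Consumer<String> out);
    }

    // Calls the handler for every pair with left.ts + lower <= right.ts <= left.ts + upper.
    static List<String> join(List<Event> lefts, List<Event> rights,
                             long lower, long upper, PairHandler handler) {
        List<String> out = new ArrayList<>();
        for (Event left : lefts) {
            for (Event right : rights) {
                long diff = right.timestamp - left.timestamp;
                if (diff >= lower && diff <= upper) {
                    handler.processElement(left, right, out::add);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> lefts = List.of(new Event("a", 10L));
        List<Event> rights = List.of(new Event("x", 8L), new Event("y", 12L), new Event("z", 20L));
        // Only x and y fall into the window [10 - 2, 10 + 2] around a.
        System.out.println(join(lefts, rights, -2L, 2L,
                (l, r, out) -> out.accept(l.name + "-" + r.name)));
    }
}
```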

For concrete usage, see the following blog posts:
https://xinchen.blog.csdn.net/article/details/109624375
https://blog.csdn.net/boling_cavalry/article/details/109645214?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242

The following builds an input stream:

    protected KeyedStream<Tuple2<String, Integer>, Tuple> buildStreamFromSocket(StreamExecutionEnvironment env, int port) {
        return env
                // listen on the port
                .socketTextStream("localhost", port)
                // convert a string such as "aaa,3" into a Tuple2 with f0="aaa", f1=3
                .map(new WordCountMap())
                // partition by the word as the key
                .keyBy(0);
    }

The following processes the two streams:

        SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = stream1
                // connect the two streams
                .connect(stream2)
                // run the low-level process function; the processing logic lives in the anonymous subclass
                .process(new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                    @Override
                    public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
                        logger.info("processing element from stream 1: {}", value);
                        out.collect(value);
                    }

                    @Override
                    public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
                        logger.info("processing element from stream 2: {}", value);
                        out.collect(value);
                    }
                });

The following writes to both the regular output and a side output:

DataStream<Integer> input = ...;

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = input
  .process(new ProcessFunction<Integer, Integer>() {

      @Override
      public void processElement(
          Integer value,
          Context ctx,
          Collector<Integer> out) throws Exception {
        // emit the record to the regular output
        out.collect(value);

        // emit the record to the side output
        ctx.output(outputTag, "sideout-" + String.valueOf(value));
      }
    });
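In Flink, the side output is then read from the resulting stream with getSideOutput(outputTag). How one processing step can feed a main output and a tagged side output at the same time can be modeled with plain collections. The sketch below is Flink-free: MiniOutputs and SideOutputDemo are hypothetical names, and a String id stands in for an OutputTag.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a main output plus tagged side outputs; NOT Flink's OutputTag machinery.
final class MiniOutputs {
    final List<Integer> main = new ArrayList<>();
    private final Map<String, List<String>> side = new HashMap<>();

    // Plays the role of out.collect(value).
    void collect(Integer value) {
        main.add(value);
    }

    // Plays the role of ctx.output(outputTag, value).
    void output(String tagId, String value) {
        side.computeIfAbsent(tagId, id -> new ArrayList<>()).add(value);
    }

    // Returns the records emitted under the given tag, or an empty list if there are none.
    List<String> getSideOutput(String tagId) {
        return side.getOrDefault(tagId, List.of());
    }
}

final class SideOutputDemo {
    static MiniOutputs run(List<Integer> input) {
        MiniOutputs outputs = new MiniOutputs();
        for (Integer value : input) {
            outputs.collect(value);                            // regular output
            outputs.output("side-output", "sideout-" + value); // side output
        }
        return outputs;
    }

    public static void main(String[] args) {
        MiniOutputs outputs = run(List.of(1, 2));
        System.out.println(outputs.main);
        System.out.println(outputs.getSideOutput("side-output"));
    }
}
```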

DataSet/DataStream API

TABLE API
