Flink Async I/O: Implementation Principles (English Original)

Reposted from: https://docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/edit#

Asynchronous I/O Design and Implementation

Motivation

I/O access is, in most cases, a time-consuming process, which makes the TPS of a single operator much lower than that of in-memory computing. This matters particularly for streaming jobs, where low latency is a major concern for users. Starting multiple threads is one way to handle this problem, but the drawbacks are obvious: the programming model becomes more complicated for end users, who have to implement a threading model inside the operator and must also take care to coordinate it with checkpointing.


Scenario

Consider an ML streaming job whose data flow has to fetch data from HBase, a data collection with billions of records, and then compute against it. The bottleneck of this job is the operator that accesses HBase. HBase has been highly optimized and achieves very high QPS across the whole cluster. Even so, the TPS of each subtask cannot be very high because of slow I/O operations.

AsyncFunction

AsyncFunction works as the user function in AsyncWaitOperator. AsyncWaitOperator looks like the StreamFlatMap operator and has open(), processElement(StreamRecord<IN> record), and processWatermark(Watermark mark).

A concrete user AsyncFunction has to override asyncInvoke(IN input, AsyncCollector<OUT> collector) to supply the code that starts an async operation.

Interface

import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;

public interface AsyncFunction<IN, OUT> extends Function, Serializable {
  /**
   * Trigger an async operation for each stream input.
   * The AsyncCollector should be registered with the async client.
   *
   * @param input Stream Input
   * @param collector AsyncCollector
   */
  void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}

Async Resource

An async resource is a client or connection used to carry out async operations. Taking fetching results from HBase as an example, the async resource could be an HBase connection pool.

Users can place the async resource as a member variable inside AsyncFunction. If it is not serializable, declaring it transient is an option.
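
The transient approach can be illustrated with plain Java serialization, which is how a function object is typically shipped to workers. This is a sketch under stated assumptions, not Flink code. FakeClient is a hypothetical stand-in for a non-serializable HBase connection pool. LookupFunction is a hypothetical function that rebuilds the resource in readObject(), which matches the "initialize it while reading object" comment in the example at the end of this document.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical client standing in for an HBase connection pool; deliberately NOT Serializable.
final class FakeClient {
}

// Hypothetical function that keeps its async resource in a transient field.
// Java serialization skips the field, and readObject() rebuilds it on the receiving side.
final class LookupFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    final String tableName;      // serializable configuration, shipped with the function
    transient FakeClient client; // skipped by serialization; without 'transient', writeObject would fail

    LookupFunction(String tableName) {
        this.tableName = tableName;
        this.client = new FakeClient();
    }

    // Invoked by Java serialization when the object is read back.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();          // restores tableName; client is still null at this point
        this.client = new FakeClient();  // re-create the resource for this copy
    }

    // Serializes and deserializes a function, mimicking how it is shipped to a worker.
    static LookupFunction roundTrip(LookupFunction f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LookupFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After the round trip, the copy has the same tableName and a fresh client. If the transient keyword were removed, writeObject would throw NotSerializableException because FakeClient does not implement Serializable.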

Interaction with AsyncWaitOperator

Each input stream record of AsyncWaitOperator is processed by AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> cb). The AsyncCollector is then appended to AsyncCollectorBuffer. AsyncCollector and AsyncCollectorBuffer are covered later.

AsyncCollector

AsyncCollector is created by AsyncWaitOperator and passed into AsyncFunction, where it should be added to the user's callback. Its role is to receive results or errors from user code and to notify AsyncCollectorBuffer to emit the results.

The only methods intended for users are the collect() overloads. They should be called when the async operation completes or throws an error.

import java.util.List;

public class AsyncCollector<OUT> {
  private List<OUT> result;
  private Throwable error;
  private AsyncCollectorBuffer<OUT> buffer;

  /**
   * Set result
   * @param result A list of results.
   */
  public void collect(List<OUT> result) { 
    this.result = result;
    buffer.mark(this);
  }

  /**
   * Set error
   * @param error A Throwable object.
   */
  public void collect(Throwable error) {
    this.error = error;
    buffer.mark(this);
  }

  /**
   * Get result. Throw RuntimeException while encountering an error.
   * @return A List of result.
   * @throws RuntimeException RuntimeException wrapping errors from user codes.
   */
  public List<OUT> getResult() throws RuntimeException { ... }
}

How is it used

Before calling AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> collector), AsyncWaitOperator tries to get an AsyncCollector instance from AsyncCollectorBuffer. The instance is then passed into the user's callback function. If the buffer is full, AsyncWaitOperator waits until some of the ongoing callbacks have finished.

Once the async operation is done, AsyncCollector.collect() takes the results or the error, and AsyncCollectorBuffer is notified.

AsyncCollector is implemented by FLINK.
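
The flow above can be sketched without Flink. In this flow, asyncInvoke() only starts the operation and the client's callback later calls collect(). SimpleCollector is a hypothetical, simplified stand-in for AsyncCollector that replaces the buffer notification with a CountDownLatch. UpperCaseLookup, with its thread pool, is a hypothetical stand-in for a user function backed by an async client.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for AsyncCollector: stores a result or an error and
// signals completion (the real collector calls buffer.mark(this) instead).
final class SimpleCollector<OUT> {
    private volatile List<OUT> result;
    private volatile Throwable error;
    private final CountDownLatch done = new CountDownLatch(1);

    void collect(List<OUT> result) { this.result = result; done.countDown(); }

    void collect(Throwable error) { this.error = error; done.countDown(); }

    // Waits until collect() was called, then returns the result or rethrows the error wrapped.
    List<OUT> getResult() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (error != null) throw new RuntimeException(error);
        return result;
    }
}

// Hypothetical user function: asyncInvoke() starts the work and returns at once;
// the completion callback, running on a client thread, hands the outcome to the collector.
final class UpperCaseLookup {
    private final ExecutorService clientThreads = Executors.newFixedThreadPool(2); // stands in for an async client

    void asyncInvoke(String key, SimpleCollector<String> collector) {
        CompletableFuture
            .supplyAsync(() -> {
                if (key.isEmpty()) throw new IllegalArgumentException("empty key");
                return key.toUpperCase();
            }, clientThreads)
            .whenComplete((value, err) -> {
                if (err != null) collector.collect(err);
                else collector.collect(Collections.singletonList(value));
            });
    }

    void close() { clientThreads.shutdown(); }
}
```

collect() runs on the client's thread, not on the thread that called asyncInvoke(). For that reason the collector has to be safe to call across threads. The sketch uses volatile fields and a latch for that purpose.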

AsyncCollectorBuffer

AsyncCollectorBuffer keeps all AsyncCollectors and emits results to the next nodes.

When AsyncCollector.collect() is called, a mark is placed in AsyncCollectorBuffer to indicate that the AsyncCollector has finished. A working thread named Emitter is also signalled once an AsyncCollector gets its results. The Emitter then tries to emit results according to the ordered or unordered setting.

For simplicity, the following text refers to an AsyncCollector in the AsyncCollectorBuffer as a task.

Ordered and Unordered

Depending on the user configuration, the order of output elements may or may not be guaranteed. If it is not guaranteed, an AsyncCollector that arrived later but finished first may be emitted earlier.

Emitter Thread

The Emitter Thread waits for finished AsyncCollectors. When it is signalled, it processes the tasks in the buffer as follows:

  • Ordered Mode

If the first task in the buffer is finished, the Emitter collects its results and proceeds to the second task. If the first task is not finished yet, the Emitter waits again.

  • Unordered Mode

Check all finished tasks in the buffer, and collect results from those tasks that precede the oldest watermark in the buffer.
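
The two emission rules can be made concrete with a single-threaded sketch of one Emitter pass. BufferEntry and EmitterSketch are hypothetical names. Locking, signalling and the real AsyncCollector are omitted, and watermarks are treated as entries that are already finished. The unordered pass also applies this document's watermark rule: a watermark leaves the buffer only after every task in front of it has been emitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Hypothetical buffer entry: either an in-flight task (an AsyncCollector) or a watermark.
final class BufferEntry {
    final String label;
    final boolean isWatermark;
    boolean finished; // for tasks: set once collect() would have been called

    private BufferEntry(String label, boolean isWatermark) {
        this.label = label;
        this.isWatermark = isWatermark;
        this.finished = isWatermark; // a watermark never waits on I/O
    }

    static BufferEntry task(String label) { return new BufferEntry(label, false); }

    static BufferEntry watermark(long ts) { return new BufferEntry("W" + ts, true); }
}

// One Emitter pass over the buffer; 'out' stands in for the downstream output.
final class EmitterSketch {
    final LinkedList<BufferEntry> buffer = new LinkedList<>();
    final List<String> out = new ArrayList<>();

    // Ordered mode: emit from the head only while the head entry is finished.
    void emitOrdered() {
        while (!buffer.isEmpty() && buffer.getFirst().finished) {
            out.add(buffer.removeFirst().label);
        }
    }

    // Unordered mode: emit finished tasks that precede the oldest watermark;
    // release that watermark once nothing is left in front of it, then repeat.
    void emitUnordered() {
        while (true) {
            Iterator<BufferEntry> it = buffer.iterator();
            while (it.hasNext()) {
                BufferEntry e = it.next();
                if (e.isWatermark) break;
                if (e.finished) {
                    out.add(e.label);
                    it.remove();
                }
            }
            if (!buffer.isEmpty() && buffer.getFirst().isWatermark) {
                out.add(buffer.removeFirst().label);
            } else {
                return;
            }
        }
    }
}
```

Suppose the buffer holds tasks a and b, watermark W10 and task c, and only b and c have finished. The unordered pass emits b but holds c behind W10. The ordered pass emits nothing until a finishes.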

The Emitter Thread and the Task Thread access the buffer exclusively of each other by acquiring and releasing the checkpoint lock of StreamTask. The Emitter Thread also:

  • Signals the Task Thread when all tasks have finished, to notify it that all data has been processed and the operator can be closed.

  • Signals the Task Thread after removing tasks from the buffer.

  • Propagates exceptions to the Task Thread.

Task Thread

The Task Thread accesses AsyncCollectorBuffer exclusively of the Emitter Thread.

It gets a new AsyncCollector and adds it to the buffer, waiting while the buffer is full.

Watermark

All watermarks are also kept in AsyncCollectorBuffer. A watermark is emitted only after all AsyncCollectors that came before it have been emitted.

Interface

public interface AsyncCollectorBuffer<IN, OUT> {
  /**
   * Add an AsyncCollector into the buffer.
   *
   * @param collector AsyncCollector
   * @throws Exception InterruptedException or exceptions from AsyncCollector.
   */
  void add(AsyncCollector<OUT> collector) throws Exception;

  /**
   * Notify the Emitter Thread that an AsyncCollector has completed.
   *
   * @param collector Completed AsyncCollector
   * @throws Exception InterruptedException.
   */
  void mark(AsyncCollector<OUT> collector) throws Exception;

  /**
   * Caller will wait here if buffer is not empty, meaning that not all tasks have returned yet.
   *
   * @throws Exception InterruptedException or Exceptions from AsyncCollector.
   */
  void waitEmpty() throws Exception;
}
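
The following sketch combines the Emitter Thread and Task Thread responsibilities for the ordered mode. Where the design uses StreamTask's checkpoint lock, it uses a plain Java monitor. OrderedBuffer and BufferedCollector are hypothetical names, and add() here both creates and enqueues a collector. To keep the sketch short, exception propagation and the unordered mode are left out.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simplified collector with a back-reference to its buffer.
final class BufferedCollector<OUT> {
    private final OrderedBuffer<OUT> buffer;
    List<OUT> result; // guarded by buffer.lock
    boolean done;     // guarded by buffer.lock

    BufferedCollector(OrderedBuffer<OUT> buffer) { this.buffer = buffer; }

    void collect(List<OUT> result) { buffer.mark(this, result); }
}

// Hypothetical ordered buffer; 'lock' plays the role of StreamTask's checkpoint lock.
final class OrderedBuffer<OUT> {
    final Object lock = new Object();
    private final ArrayDeque<BufferedCollector<OUT>> queue = new ArrayDeque<>();
    private final int capacity;
    private final Consumer<OUT> downstream;
    private final Thread emitter;
    private volatile boolean running = true;

    OrderedBuffer(int capacity, Consumer<OUT> downstream) {
        this.capacity = capacity;
        this.downstream = downstream;
        this.emitter = new Thread(this::emitLoop, "Emitter");
        this.emitter.setDaemon(true);
        this.emitter.start(); // fields above are set before start(), so the thread sees them
    }

    // Task Thread: create and enqueue a collector, waiting while the buffer is full.
    BufferedCollector<OUT> add() throws InterruptedException {
        synchronized (lock) {
            while (queue.size() >= capacity) lock.wait();
            BufferedCollector<OUT> c = new BufferedCollector<>(this);
            queue.addLast(c);
            return c;
        }
    }

    // Called from the async client's thread via collect(): record the result, wake the Emitter.
    void mark(BufferedCollector<OUT> c, List<OUT> result) {
        synchronized (lock) {
            c.result = result;
            c.done = true;
            lock.notifyAll();
        }
    }

    // Task Thread: block until every task has been emitted (e.g. before closing the operator).
    void waitEmpty() throws InterruptedException {
        synchronized (lock) {
            while (!queue.isEmpty()) lock.wait();
        }
    }

    void shutdown() {
        running = false;
        emitter.interrupt();
    }

    // Emitter Thread: emit from the head while the head is finished, then signal and wait.
    private void emitLoop() {
        synchronized (lock) {
            try {
                while (running) {
                    boolean removed = false;
                    while (!queue.isEmpty() && queue.peekFirst().done) {
                        for (OUT value : queue.pollFirst().result) downstream.accept(value);
                        removed = true;
                    }
                    if (removed) lock.notifyAll(); // wake a Task Thread blocked in add() or waitEmpty()
                    lock.wait();                   // releases the lock until mark() notifies
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown() interrupts the Emitter
            }
        }
    }
}
```

Every wait sits in a loop that re-checks its condition. notifyAll() is used because Task Threads and the Emitter Thread wait on the same monitor, so a single notify() could wake the wrong one.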

State, Failover and Checkpoint

A new operator, named AsyncWaitOperator<IN, OUT>, is added to FLINK streaming. This operator buffers all AsyncCollectors and sends the processed data to the following operators.

State and Checkpoint

All input StreamRecords are kept in state. AsyncWaitOperator does not write each input stream record into state one by one during processing. Instead, when it snapshots the operator state, it puts all input stream records currently in AsyncCollectorBuffer into state. The state is cleared before those records are persisted.

When all barriers have arrived at the operator, the checkpoint can be carried out immediately.

Failover

When restoring the operator's state, the operator scans all elements in the state and obtains an AsyncCollector for each element. It then calls AsyncFunction.asyncInvoke() and inserts the collectors back into AsyncCollectorBuffer.
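
The snapshot and restore steps can be sketched as follows, with a plain list standing in for operator state. AsyncOperatorSketch and its methods are hypothetical. The sketch shows two things. First, inputs are copied into state only when a snapshot is taken. Second, restoring re-runs asyncInvoke() for every saved input, which rebuilds the buffer.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical operator skeleton tracking the input of every in-flight task.
final class AsyncOperatorSketch<IN> {
    private final Map<Long, IN> inFlight = new LinkedHashMap<>(); // task id -> input, in arrival order
    private final Consumer<IN> asyncInvoke; // stands in for AsyncFunction.asyncInvoke()
    private long nextId = 0;

    AsyncOperatorSketch(Consumer<IN> asyncInvoke) { this.asyncInvoke = asyncInvoke; }

    // Nothing is written to state here; the input only lives in the buffer.
    long processElement(IN input) {
        long id = nextId++;
        inFlight.put(id, input);
        asyncInvoke.accept(input);
        return id;
    }

    // A finished task leaves the buffer (its results would be emitted downstream).
    void complete(long id) { inFlight.remove(id); }

    // Snapshot: clear the previous contents, then persist every input still in the buffer.
    List<IN> snapshotState(List<IN> state) {
        state.clear();
        state.addAll(inFlight.values());
        return state;
    }

    // Failover: re-issue each persisted input, which rebuilds the buffer.
    void restoreState(List<IN> state) {
        for (IN input : state) processElement(input);
    }
}
```

In the test below, input "a" completes before the snapshot. Only "b" and "c" are persisted, and only they are re-invoked when a fresh operator instance restores from that state.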

API

The current DataStream class in FLINK is not modified. AsyncDataStream handles adding AsyncWaitOperator.

AsyncDataStream

AsyncDataStream provides two methods to add AsyncWaitOperator with AsyncFunction into FLINK streaming job.

public class AsyncDataStream {
  /**
   * Add an AsyncWaitOperator. The output stream records may be reordered relative to the input.
   *
   * @param in Input DataStream
   * @param func AsyncFunction
   * @return A new DataStream.
   */
  public static <IN, OUT> DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func) { ... }

  /**
   * Add an AsyncWaitOperator. The order of output stream records is guaranteed to be the same as the input.
   *
   * @param in Input DataStream
   * @param func AsyncFunction
   * @return A new DataStream.
   */
  public static <IN, OUT> DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func) { ... }
}

Error Handling

Exceptions from user code, including errors passed to AsyncCollector.collect(Throwable), can be propagated into framework code, causing task failover.

Notes

Async Resource Sharing

Async resources (such as HBase connections or Netty connections) can be shared among different slots (task workers) in the same TaskManager (that is, the same JVM). To do this, make the connection static so that all threads in the same process share the same instance.

In that case, pay attention to thread safety when using those resources.
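
One way to make the shared connection static and still create it lazily is the initialization-on-demand holder idiom, sketched below. SharedClient and SharedClientFunction are hypothetical names. The JVM initializes the nested Holder class once, on first use. As a result, the client is created exactly once per class loader, and the creation itself is thread safe. The client's own methods must still be safe for concurrent use.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical shared client; counts how many instances were created in this JVM.
final class SharedClient {
    static final AtomicInteger CREATED = new AtomicInteger();

    SharedClient() { CREATED.incrementAndGet(); }
}

// Hypothetical function; each slot would hold its own instance, but all share one client.
final class SharedClientFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    // Initialized by the JVM on first access to Holder.INSTANCE, exactly once.
    private static final class Holder {
        static final SharedClient INSTANCE = new SharedClient();
    }

    SharedClient client() { return Holder.INSTANCE; }
}
```

Nothing about the client is serialized, because the static field is not part of the function's instance state. Each deserialized copy therefore finds the same JVM-wide client.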

Example

For callback

public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // initialize it while reading object
  transient Connection connection;

  @Override
  public void asyncInvoke(String val, AsyncCollector<String> c) throws Exception {
    Get get = new Get(Bytes.toBytes(val));
    Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));
    // UserCallback is from user’s async client.
    ((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));
  }
}

// create data stream
public void createHBaseasynctestStream(StreamExecutionEnvironment env) {
  DataStream<String> source = getDataStream(env);
  DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
  stream.print();
}

For ListenableFuture

import java.util.ArrayList;
import java.util.List;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ListenableFuture;

public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // initialize it while reading object
  transient Connection connection;

  @Override
  public void asyncInvoke(String val, AsyncCollector<String> c) throws Exception {
    Get get = new Get(Bytes.toBytes(val));
    Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));

    ListenableFuture<Result> future = ht.asyncGet(get);
    Futures.addCallback(future,
      new FutureCallback<Result>() {
        @Override public void onSuccess(Result result) {
          List<String> ret = new ArrayList<>();
          ret.add(result.get(...));
          c.collect(ret);
        }

        @Override public void onFailure(Throwable t) {
          c.collect(t);
        }
      },
      MoreExecutors.newDirectExecutorService()
    );
  }
}
