
Understanding readTextFile in Flink 1.10.0


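I have recently been learning Flink and, as an exercise, set out to write a simple custom source that watches files. I soon ran into a question: inside my own source, how do I decide which partition (subtask) reads which file? Anyone who has used Spark knows that the input is divided into splits and each partition simply reads its own input split. So how does Flink compute input splits? And if a custom source needs to run in parallel, how should that be implemented?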

With these questions in mind, I went through the Flink 1.10.0 source code to see how the readTextFile operator is implemented.

First, the following code demonstrates one point:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> test = env.socketTextStream("localhost", 8888);
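System.out.println("test source parallel:\t" + test.getParallelism()); // the parallelism of test is 1
test.print(); // lines read from the socket are printed by different subtasks
env.execute(); // nothing is printed until the job is actually executed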

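This simple code shows two things:

  1. print is a parallel sink: even when paired with a source of parallelism 1, it writes its output in parallel
  2. the getParallelism method returns the parallelism of a DataStream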
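
To make the first point concrete, here is a minimal plain-Java sketch (not Flink code) of how records from one source subtask could end up on several sink subtasks. It assumes the records are distributed round-robin, which to my understanding is what Flink does by default when the upstream and downstream parallelism differ. The class name RoundRobinSketch, the method assign, and the starting channel are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of round-robin distribution; hypothetical names, not Flink code. */
class RoundRobinSketch {

    /** Returns, for each record, the 1-based index of the sink subtask that would receive it. */
    static List<Integer> assign(int recordCount, int sinkParallelism, int firstChannel) {
        List<Integer> targets = new ArrayList<>();
        int channel = firstChannel;
        for (int i = 0; i < recordCount; i++) {
            targets.add(channel + 1);                  // print() prefixes lines with a 1-based subtask index
            channel = (channel + 1) % sinkParallelism; // move on to the next sink subtask
        }
        return targets;
    }

    public static void main(String[] args) {
        // One source subtask, four sink subtasks, five records; the start channel is arbitrary.
        System.out.println(assign(5, 4, 2)); // prints [3, 4, 1, 2, 3]
    }
}
```

Each record goes to the next subtask in turn, which is why consecutive socket lines show up under different "N>" prefixes.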

Now let's walk through the source code of readTextFile in Flink.

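Stepping through readTextFile in IDEA, the first few overloads mostly validate arguments and fill in default parameters; the method that is finally called is createFileInput. Its code is as follows: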

    private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
                                                        TypeInformation<OUT> typeInfo,
                                                        String sourceName,
                                                        FileProcessingMode monitoringMode,
                                                        long interval) {

        // Validate the arguments
        Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
        Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
        Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
        Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");

        Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
                interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
            "The path monitoring interval cannot be less than " +
                    ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");

        // The function class that builds the input splits
        ContinuousFileMonitoringFunction<OUT> monitoringFunction =
            new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

        // The operator class that actually reads the input splits
        ContinuousFileReaderOperator<OUT> reader =
            new ContinuousFileReaderOperator<>(inputFormat);

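        /*
         * This is the same addSource call we would make through env.addSource, except
         * that it is immediately followed by a call to transform.
         * This is the key point of the whole walkthrough: monitoringFunction is only
         * responsible for building the input splits,
         * so at this step the parallelism of the source is still 1.
         *
         * Only the operator added by transform reads the contents of those splits, and
         * only it runs with the parallelism taken from the configuration.
         */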
        SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
                .transform("Split Reader: " + sourceName, typeInfo, reader);

        return new DataStreamSource<>(source);
    }

To check the parallelism before transform is called, we can test with the following code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String inputPath = "/Users/xxx/test/flink_test";

TextInputFormat format = new TextInputFormat(new Path(inputPath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
format.setFilePath(inputPath);


FileProcessingMode monitoringMode = FileProcessingMode.PROCESS_ONCE;

ContinuousFileMonitoringFunction<String> function = new ContinuousFileMonitoringFunction<>(format, monitoringMode, 12, -1);

// Build only up to addSource; do not call transform afterwards
DataStreamSource<TimestampedFileInputSplit> test = env.addSource(function, "test");
System.out.println("test source parallel:\t" + test.getParallelism());
test.print();


env.execute("user_defind_source");

The result is:

test source parallel: 1
10> [8] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 14 + 7
1> [0] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 0 + 7
8> [6] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 0 + 7
6> [5] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 35 + 4
5> [4] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 28 + 7
12> [10] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 28 + 7
11> [9] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 21 + 7
9> [7] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 7 + 7
4> [3] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 21 + 7
3> [2] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 14 + 7
2> [1] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 7 + 7

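As the output shows, without the call to transform only the input splits are built; no file content is read. The rule for building the splits is readable enough in the source: they are computed from the parallelism and the file lengths.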
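
The split rule can be sketched in plain Java. This is a simplified model based on my reading of FileInputFormat.createInputSplits, not the actual Flink code: it ignores block size and the minimum split size, the class and method names are hypothetical, and the file lengths of 39 and 35 bytes are inferred from the offsets in the output above.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of length-based file splitting; hypothetical names, not Flink code. */
class SplitSketch {

    /** One split: index of the file, start offset, and length in bytes. */
    static final class Split {
        final int file;
        final long start;
        final long length;

        Split(int file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }
    }

    static List<Split> computeSplits(long[] fileLengths, int minNumSplits) {
        long total = 0;
        for (long len : fileLengths) {
            total += len;
        }
        // Split size is the total length divided by the requested number of splits, rounded up.
        long splitSize = total / minNumSplits + (total % minNumSplits == 0 ? 0 : 1);
        // Assumption: the last split of a file may be up to 10% larger than the others.
        long maxBytesForLastSplit = (long) (splitSize * 1.1f);

        List<Split> splits = new ArrayList<>();
        for (int f = 0; f < fileLengths.length; f++) {
            long remaining = fileLengths[f];
            long position = 0;
            while (remaining > maxBytesForLastSplit) {
                splits.add(new Split(f, position, splitSize));
                position += splitSize;
                remaining -= splitSize;
            }
            if (remaining > 0) {
                splits.add(new Split(f, position, remaining)); // whatever is left becomes the last split
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        // File 0 stands for "out" (39 bytes), file 1 for "word.txt" (35 bytes); parallelism is 12.
        List<Split> splits = computeSplits(new long[] {39, 35}, 12);
        for (int i = 0; i < splits.size(); i++) {
            Split s = splits.get(i);
            System.out.println("[" + i + "] file" + s.file + " : " + s.start + " + " + s.length);
        }
    }
}
```

With 74 bytes in total and 12 requested splits, the split size is ceil(74 / 12) = 7. That gives six splits for the 39-byte file and five for the 35-byte file, which matches the 11 splits in the output above.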

Now let's look at the ContinuousFileReaderOperator class.

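    /**
     *  In its open method, this class obtains Flink's RuntimeContext via getRuntimeContext.
     *  The RuntimeContext carries the index of the current subtask.
     *  The class also contains an inner class, SplitReader, which extends Thread;
     *  its run method does the actual reading of the input splits.
     */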
    @Override
    public void open() throws Exception {
        super.open();

        checkState(this.reader == null, "The reader is already initialized.");
        checkState(this.serializer != null, "The serializer has not been set. " +
            "Probably the setOutputType() was not called. Please report it.");

        // Hand Flink's RuntimeContext to the input format
        this.format.setRuntimeContext(getRuntimeContext());
        this.format.configure(new Configuration());
        this.checkpointLock = getContainingTask().getCheckpointLock();

        // set the reader context based on the time characteristic
        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
        this.readerContext = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            checkpointLock,
            getContainingTask().getStreamStatusMaintainer(),
            output,
            watermarkInterval,
            -1);

        // and initialize the split reading thread
        this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);
        this.restoredReaderState = null;
        this.reader.start();
    }


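        /**
         *  The run method of the SplitReader class.
         *  Debugging readTextFile shows the order of execution: the open() method above
         *  runs first, and it starts the thread whose run method is shown below.
         *  run first loops and waits until the first input split arrives; then it starts reading.
         *  While the first split is being read, more splits can still be added to the queue from outside.
         *  this.pendingSplits is the queue that holds the input splits; the class exposes a
         *  method for adding splits to it from outside.
         */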
        @Override
        public void run() {
            try {

                Counter completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");
                this.format.openInputFormat();

                while (this.isRunning) {

                    synchronized (checkpointLock) {

                        if (currentSplit == null) {
                            currentSplit = this.pendingSplits.poll();

                            // if the list of pending splits is empty (currentSplit == null) then:
                            //   1) if close() was called on the operator then exit the while loop
                            //   2) if not wait 50 ms and try again to fetch a new split to read

                            // In other words: if there is no pending input split, wait 50 ms
                            // and then run the while loop again from the top.
                            if (currentSplit == null) {
                                if (this.shouldClose) {
                                    isRunning = false;
                                } else {
                                    checkpointLock.wait(50);
                                }
                                continue;
                            }
                        }

                        if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
                            // recovering after a node failure with an input
                            // format that supports resetting the offset
                            ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).
                                reopen(currentSplit, currentSplit.getSplitState());
                        } else {
                            // we either have a new split, or we recovered from a node
                            // failure but the input format does not support resetting the offset.
                            this.format.open(currentSplit);
                        }

                        // reset the restored state to null for the next iteration
                        this.currentSplit.resetSplitState();
                        this.isSplitOpen = true;
                    }

                    LOG.debug("Reading split: " + currentSplit);

                    try {
                        // Read the records and hand them to the source context
                        OT nextElement = serializer.createInstance();
                        while (!format.reachedEnd()) {
                            synchronized (checkpointLock) {
                                nextElement = format.nextRecord(nextElement);
                                if (nextElement != null) {
                                    readerContext.collect(nextElement);
                                } else {
                                    break;
                                }
                            }
                        }
                        completedSplitsCounter.inc();

                    } finally {
                        // close and prepare for the next iteration
                        synchronized (checkpointLock) {
                            this.format.close();
                            this.isSplitOpen = false;
                            this.currentSplit = null;
                        }
                    }
                }

            } catch (Throwable e) {

                getContainingTask().handleAsyncException("Caught exception when processing split: " + currentSplit, e);

            } finally {
                synchronized (checkpointLock) {
                    LOG.debug("Reader terminated, and exiting...");

                    try {
                        this.format.closeInputFormat();
                    } catch (IOException e) {
                        getContainingTask().handleAsyncException(
                            "Caught exception from " + this.format.getClass().getName() + ".closeInputFormat() : " + e.getMessage(), e);
                    }
                    this.isSplitOpen = false;
                    this.currentSplit = null;
                    this.isRunning = false;

                    checkpointLock.notifyAll();
                }
            }
        }
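
Stripped of checkpointing and error handling, the polling pattern in run can be reduced to a small plain-Java sketch. It is only an illustration under simplifying assumptions: splits are plain strings, "reading" a split just records its name, and the names SplitReaderSketch, addSplit, requestClose and drain are hypothetical rather than Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Minimal model of a reader thread polling a queue of pending splits; not Flink code. */
class SplitReaderSketch extends Thread {
    private final Object lock = new Object();
    private final Queue<String> pendingSplits = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();
    private boolean shouldClose = false;

    /** Called from outside the thread to hand over another split. */
    void addSplit(String split) {
        synchronized (lock) {
            pendingSplits.add(split);
        }
    }

    /** Asks the reader to stop once the queue has been drained. */
    void requestClose() {
        synchronized (lock) {
            shouldClose = true;
        }
    }

    @Override
    public void run() {
        boolean running = true;
        while (running) {
            String current;
            synchronized (lock) {
                current = pendingSplits.poll();
                if (current == null) {
                    if (shouldClose) {
                        running = false;       // nothing left and close was requested
                    } else {
                        try {
                            lock.wait(50);     // no split yet: wait 50 ms, then poll again
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            running = false;
                        }
                    }
                    continue;
                }
            }
            synchronized (lock) {
                processed.add(current);        // stands in for opening and reading the split
            }
        }
    }

    /** Feeds the given splits to a fresh reader and returns them in processing order. */
    static List<String> drain(String... splits) {
        SplitReaderSketch reader = new SplitReaderSketch();
        reader.start();
        for (String s : splits) {
            reader.addSplit(s);
        }
        reader.requestClose();
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (reader.lock) {
            return new ArrayList<>(reader.processed);
        }
    }

    public static void main(String[] args) {
        System.out.println(drain("split-0", "split-1")); // prints [split-0, split-1]
    }
}
```

As in the original loop, the reader does not exit as soon as close is requested: it keeps polling until the queue is empty, so splits that were already queued are still read.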
