微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink:简单上手

简介@H_404_1@

官网:https://flink.apache.org/

image-20210831145112869

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

Apache Flink 是一个框架分布式处理引擎,用于对无界和有界数据流进行状态计算。

流数据更真实地反映了我们的生活方式

传统的数据架构是基于有限数据集的

我们的目标:低延迟、高吞吐、结果的准确性和良好的容错性

传统数据处理架构:事务处理

image-20210831151845002

分析处理架构:将数据从业务数据库复制到数仓,再进行分析和查询

image-20210831152248048

有状态的流式处理

@H_404_53@

流处理的演变:

lambda架构:用两套系统,同时保证低延迟和结果准确

image-20210831152942613

fink

image-20210831153235369

Flink的主要特点

  • 事件驱动(Event-driven)

image-20210831153510640

  • 基于流的世界观

    在flink的世界观中,一切都是由流组成的,离线数据是有界的流;

    实时数据是一个没有界限的流:这就是所谓的有界流和无界流

image-20210831153955065

  • 分层API

    越顶层越抽象,表达含义越简明,使用越方便

    越底层越具体,表达能力越丰富,使用越灵活

image-20210831155538615

Flink的其他特点

支持事件时间(event-time)和处理时间(processing-time) 语义

• 精确一次(exactly-once)的状态一致性保证

• 低延迟,每秒处理数百万个事件,毫秒级延迟

• 与众多常用存储系统的连接

• 高可用,动态扩展,实现7*24小时全天候运行

  • 流(stream)和微批(micro-batching)

image-20210831160139833

  • 数据模型

spark 采用 RDD 模型,spark streaming 的 DStream 实际上也就是一组组小批 数据 RDD 的集合

flink 基本数据模型是数据流,以及事件(Event)序列

  • 运行时架构

spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个

flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

简单上手@H_404_1@

批处理

下面提供一个例子:使用flink读取文件并计算每一个单词出现次数

pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <!--2.12指的是scala版本-->
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

测试:

//批处理word count
public class WordCount {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "D:\\project\\flink-demo\\src\\main\\resources\\hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        //对数据集进行处理,按空格分词展开,转换成(word,1)二元组进行统计
        DataSet<Tuple2<String, Integer>> dataSet = inputDataSet.flatMap(new MyFlatMap())
                .groupBy(0)//按照第一个位置的word分组
                .sum(1);//将第二个位置上的数据求和

        dataSet.print();
    }

}

public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = value.split(" ");
        //遍历所有word,包成二元组
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}

hello.txt:

hello word
hello flink
hello java
hello scala
hello spring
how are you
are you ok

输出结果:

image-20210831191626564

流处理

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        //创建流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "D:\\project\\flink-demo\\src\\main\\resources\\hello.txt";
        DataStream<String> dataStream = env.readTextFile(inputPath);
        //基于数据流进行转换计算
        DataStream<Tuple2<String, Integer>> resultStream = dataStream.flatMap(new MyFlatMap())
                .keyBy(0).sum(1);

        resultStream.print();
        //执行任务
        env.execute();
    }
}

输出结果:

image-20210831192949254

输出结果前面的数字:表示当前并行执行的线程编号,认并行度跟自己电脑有有关,当然也可以自己设置并行度。

env.setParallelism(4);

image-20210831193311253

流式数据源测试

linux上安装nc

yum install -y nc
nc -lk 7777

代码:

    public static void main(String[] args) throws Exception {
        //创建流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
//        String inputPath = "D:\\project\\flink-demo\\src\\main\\resources\\hello.txt";
//        DataStream<String> dataStream = env.readTextFile(inputPath);

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        DataStream<String> dataStream = env.socketTextStream(host, port);
        //基于数据流进行转换计算
        DataStream<Tuple2<String, Integer>> resultStream = dataStream.flatMap(new MyFlatMap())
                .keyBy(0).sum(1);

        resultStream.print();
        //执行任务
        env.execute();
    }

设置命令行参数:

image-20210831194805584

启动方法后,我们在nc控制台输入的数据,flink都能进行实时的流式计算

image-20210831195051079

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐