
Flink from Beginner to Project Practice, Part 2 — Flink Starter Programs

Flink Environment Preparation

Development Tools and Environment Requirements

IntelliJ IDEA is the recommended IDE (Eclipse carries a risk of plugin incompatibilities).

The only requirements are Maven 3.0.4 and a Java 8.x (or higher) installation.

Maven Dependency Coordinates

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>

Flink Starter Programs

Flink Batch Processing

package cn.tedu.flinktest;
 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
 
public class BatchWC {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the source file
        DataSource<String> textFile = env.readTextFile("E:\\words.txt");
        // 3. Read each line and split it into words
        textFile.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] values = s.toLowerCase().split(" ");
                for (String value: values){
                    if (value.length()>0){
                        collector.collect(new Tuple2<String, Integer>(value,1));
                    }
                }
            }
        })
                // 4. Group by word
                .groupBy(0)
                // 5. Sum the counts
                .sum(1)
                // 6. Print (sink)
                .print();
    }
}
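The flatMap → groupBy(0) → sum(1) pipeline above is, conceptually, just per-word counting. A plain-Java sketch of the same logic (no Flink required; the class, method, and sample inputs are made up for illustration):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {
    // Mirrors the flatMap + groupBy + sum steps: lowercase, split on spaces,
    // skip empty tokens, and accumulate a count per word.
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split(" ")) {
                if (word.length() > 0) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = wordCount(List.of("Hello Flink", "hello world"));
        System.out.println(counts.get("hello")); // 2
        System.out.println(counts.get("flink")); // 1
    }
}
```

In the Flink version, the same accumulation is distributed: groupBy(0) partitions the tuples by word, and sum(1) adds up the 1s within each group.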

 

Flink Stream Processing

package cn.tedu.flinktest;
 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
 
 
public class StreamWC {
    public static void main(String[] args) throws Exception {
        // 1. Read the port and other parameters from args
        int port = 0;
        try {
            ParameterTool tool = ParameterTool.fromArgs(args);
            port = tool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified, using default 9999");
            port = 9999;
        }
        // 2. Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 3. Obtain the data source
        DataStreamSource<String> source = env.socketTextStream("localhost", port);
        // 4. Split each line into words
        source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] values = s.toLowerCase().split(" ");
                for (String value: values){
                    if (value.length()>0){
                        collector.collect(new Tuple2<String, Integer>(value,1));
                    }
                }
            }
        })
                // 5. Group by word
                .keyBy(0)
                // 6. Flush results every 5 seconds
                .timeWindow(Time.seconds(5))
                // 7. Sum the counts
                .sum(1)
                // 8. Print (sink)
                .print()
                // 9. Set the parallelism
                .setParallelism(1);
        // 10. Submit the job
        env.execute("StreamWC");
    }
}
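Step 1 above relies on ParameterTool and falls back to port 9999 when no --port argument is supplied. The same fallback can be sketched in plain Java (a hypothetical helper for illustration, not Flink's implementation):

```java
public class PortArg {
    // Scans args for "--port <value>"; returns the fallback when the flag
    // is absent or its value is not a valid integer.
    static int portOrDefault(String[] args, int fallback) {
        for (int i = 0; i < args.length - 1; i++) {
            if ("--port".equals(args[i])) {
                try {
                    return Integer.parseInt(args[i + 1]);
                } catch (NumberFormatException e) {
                    return fallback;
                }
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        System.out.println(portOrDefault(new String[]{"--port", "8888"}, 9999)); // 8888
        System.out.println(portOrDefault(new String[]{}, 9999));                 // 9999
    }
}
```

To feed the running job, start a socket server on the chosen port (for example with netcat: nc -lk 9999) before launching the program.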

 

Summary of the Development Process

Flink programs, whether batch or streaming, look like regular programs that transform collections of data. Every program consists of the same basic parts:

Obtain an execution environment

There are three ways:

getExecutionEnvironment()
 
createLocalEnvironment()
 
createRemoteEnvironment(String host, int port, String... jarFiles) // (rarely used)

Load/create the initial data (source)

Batch: DataSet<String> text = env.readTextFile("file:///path/to/file");

Streaming: DataStreamSource<String> source = env.socketTextStream("localhost", port);

And others (see the chapter on sources).

Specify transformations on the data (transformation)

Write the processing logic according to business requirements. This is the focus of later chapters.

Specify where to put the results (sink)

writeAsText(String path)
 
print()

Trigger program execution (execute)

Flink does not execute the code line by line as it is written; instead it evaluates lazily: the job only runs when execute() is finally called. This is what allows Flink to handle more complex applications.

env.execute()
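This lazy-evaluation idea can be illustrated without Flink: declaring a transformation only records it, and nothing runs until an explicit execute call. A toy sketch (the LazyPipeline class is invented for illustration, not Flink's internals):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class LazyPipeline {
    private final List<UnaryOperator<Integer>> steps = new ArrayList<>();

    // Declaring a transformation only records it; nothing is computed yet.
    LazyPipeline map(UnaryOperator<Integer> step) {
        steps.add(step);
        return this;
    }

    // Only execute() actually runs the recorded transformations, in order.
    int execute(int input) {
        int value = input;
        for (UnaryOperator<Integer> step : steps) {
            value = step.apply(value);
        }
        return value;
    }

    public static void main(String[] args) {
        LazyPipeline p = new LazyPipeline()
                .map(x -> x + 1)   // recorded, not run
                .map(x -> x * 10); // recorded, not run
        System.out.println(p.execute(2)); // runs both steps now: (2 + 1) * 10 = 30
    }
}
```

Building the full plan before running it is what lets Flink see the whole job at once and schedule it efficiently, rather than executing operators one at a time.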

 

 

 
