Flink的安装部署
首先打开终端 ,在/home/xxx/ 下执行命令获取安装包:
wget https://labfile.oss.aliyuncs.com/courses/3423/flink-1.10.0-bin-scala_2.11.tar
tar -xvf flink-1.10.0-bin-scala_2.11.tar
将其解压到该目录下。
如果你学习过 Spark 的话,那么你应该会觉得接下来的内容似曾相识。在 Flink 中有三种部署模式,分别是 Standalone、Yarn 模式和 Kubernetes。在本次实验中我们的重点是 Standalone 模式,Yarn 和 Kubernetes 只需要了解即可。
Standalone
这里的 Standalone 和 Spark 中的 Standalone 的含义是一样的,就是 Flink 自己来负责资源调度,不依赖于其它工具。
搭建步骤如下:
- 将解压好的安装包复制到 /opt/目录下
sudo cp -r ~/flink-1.10.0 /opt/
- 进入到 /opt/flink-1.10.0目录下,启动集群
sudo cd /opt/flink-1.10.0
sudo bin/start-cluster.sh
至此,我们已经搭建了一个最简单的 Standalone 模式的集群,也就是伪分布式或者说是单节点的集群。双击桌面的 Firefox 浏览器,打开 localhost:8081 就会看到下面的界面:
受限于我们的云主机环境,在实验课程中只能搭建伪分布式的集群。如果需要搭建多台机器组成的 Standalone 模式集群也并不难,只需要在以上步骤的基础上做简单配置即可。
假设要有三台机器,主机名分别为 bigdata1、bigdata2、bigdata3,接下来只需要:
在 bigdata1 的 Flink 安装包中的 conf/flink-conf.yaml 文件中添加以下内容:
jobmanager.rpc.address:bigdata1
copy
在 bigdata1 中的 Flink 安装包中的 conf/slaves 文件中添加以下内容:
bigdata2
bigdata3
copy
将 bigdata1 中的 Flink 安装包分别拷贝到 bigdata2 和 bigdata3 的相同目录下,然后在 bigdata1 中执行 bin/start-cluster.sh 启动集群。
注意:以上三个步骤的前提是三台机器上都安装了相同的版本的 jdk 和 scala,并且配置了 ssh 免密登录。
Yarn 模式
和 Spark 类似,Flink 同样支持以 Yarn 模式部署 Flink 任务,但是 Hadoop 的版本必须在 2.2 以上。
Yarn 模式的部署分为两种,分别是 Session-Cluster 和 Pre-Job-Cluster。
Session-Cluster
Session-Cluster 的特点是在提交任务之前,需要先去 Yarn 启动一个 Flink 集群,启动成功之后通过 Flink run 命令往 Flink 集群中提交任务。当 Job 执行完之后,集群并不会关闭,等待下个 Job 提交。这样做的好处是 Job 启动的时间变短,适合规模小执行时间短的作业。
Pre-Job-Cluster
Pre-Job-Cluster 的特点是一个 Job 对应一个集群,每提交一个 Job,会根据其自身情况单独向 Yarn 申请资源。这样做的好处是各个 Job 使用的资源之间独立,一个作业失败并不会影响到其它作业的正常运行,适合规模大运行时间长的任务。
Kubernetes 模式
随着信息化技术的发展,容器化技术也越来越流行。你可能没有听过 Kubernetes,但是你一定听过 Docker。是的,Kubernetes 和 Docker 类似,也是容器化技术的一种。虽然 Flink 支持基于 Kubernetes 部署,但是在业界好像并没有听到过有哪家企业在使用这种部署模式,所以大家只需要了解即可,不用做过多研究。
Flink的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.shiyanlou</groupId>
<artifactId>FlinkLearning</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将 Scala 代码编译成 class 文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
BatchWordCount.scala 中的代码如下:
package com.shiyanlou.wc
import org.apache.flink.api.scala._
object BatchWordCount {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 从文本读取数据
val inputPath = "/home/shiyanlou/words.txt"
val inputDS: DataSet[String] = env.readTextFile(inputPath)
// 计算逻辑
val wordCountDS: AggregateDataSet[(String, Int)] = inputDS
.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0)
.sum(1)
// 打印输出
wordCountDS.print()
}
}
Flink 流处理 WordCount
在 StreamWordCount.scala 中加入如下代码:
package com.shiyanlou.wc
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 监控Socket数据
val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
// 导入隐式转换
import org.apache.flink.api.scala._
// 计算逻辑
val dataStream: DataStream[(String, Int)] = textDstream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
// 设置并行度
dataStream.print().setParallelism(1)
// 执行
env.execute("Socket stream word count")
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。