微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink的安装部署

Flink的安装部署

首先打开终端 ,在/home/xxx/ 下执行命令获取安装包:

wget https://labfile.oss.aliyuncs.com/courses/3423/flink-1.10.0-bin-scala_2.11.tar
tar -xvf flink-1.10.0-bin-scala_2.11.tar

将其解压到该目录下。
如果你学习过 Spark 的话,那么你应该会觉得接下来的内容似曾相识。在 Flink 中有三种部署模式,分别是 Standalone、Yarn 模式和 Kubernetes。在本次实验中我们的重点是 Standalone 模式,Yarn 和 Kubernetes 只需要了解即可。

Standalone
这里的 Standalone 和 Spark 中的 Standalone 的含义是一样的,就是 Flink 自己来负责资源调度,不依赖于其它工具。

搭建步骤如下:

  1. 将解压好的安装包复制到 /opt/目录下
sudo cp -r ~/flink-1.10.0 /opt/
  1. 进入到 /opt/flink-1.10.0目录下,启动集群
sudo cd /opt/flink-1.10.0
sudo bin/start-cluster.sh

至此,我们已经搭建了一个最简单的 Standalone 模式的集群,也就是伪分布式或者说是单节点的集群。双击桌面的 Firefox 浏览器,打开 localhost:8081 就会看到下面的界面:

在这里插入图片描述

受限于我们的云主机环境,在实验课程中只能搭建伪分布式的集群。如果需要搭建多台机器组成的 Standalone 模式集群也并不难,只需要在以上步骤的基础上做简单配置即可。

假设要有三台机器,主机名分别为 bigdata1、bigdata2、bigdata3,接下来只需要:

在 bigdata1 的 Flink 安装包中的 conf/flink-conf.yaml 文件添加以下内容

jobmanager.rpc.address:bigdata1
copy
在 bigdata1 中的 Flink 安装包中的 conf/slaves 文件添加以下内容

bigdata2
bigdata3
copy
将 bigdata1 中的 Flink 安装包分别拷贝到 bigdata2 和 bigdata3 的相同目录下,然后在 bigdata1 中执行 bin/start-cluster.sh 启动集群。

注意:以上三个步骤的前提是三台机器上都安装了相同的版本的 jdk 和 scala,并且配置了 ssh 免密登录

Yarn 模式
和 Spark 类似,Flink 同样支持以 Yarn 模式部署 Flink 任务,但是 Hadoop 的版本必须在 2.2 以上。

Yarn 模式的部署分为两种,分别是 Session-Cluster 和 Pre-Job-Cluster。

Session-Cluster

Session-Cluster 的特点是在提交任务之前,需要先去 Yarn 启动一个 Flink 集群,启动成功之后通过 Flink run 命令往 Flink 集群中提交任务。当 Job 执行完之后,集群并不会关闭,等待下个 Job 提交。这样做的好处是 Job 启动的时间变短,适合规模小执行时间短的作业。

Pre-Job-Cluster

Pre-Job-Cluster 的特点是一个 Job 对应一个集群,每提交一个 Job,会根据其自身情况单独向 Yarn 申请资源。这样做的好处是各个 Job 使用的资源之间独立,一个作业失败并不会影响到其它作业的正常运行,适合规模大运行时间长的任务。

Kubernetes 模式
随着信息化技术的发展,容器化技术也越来越流行。你可能没有听过 Kubernetes,但是你一定听过 Docker。是的,Kubernetes 和 Docker 类似,也是容器化技术的一种。虽然 Flink 支持基于 Kubernetes 部署,但是在业界好像并没有听到过有哪家企业在使用这种部署模式,所以大家只需要了解即可,不用做过多研究。

Flink的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shiyanlou</groupId>
    <artifactId>FlinkLearning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
          <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

BatchWordCount.scala 中的代码如下:

package com.shiyanlou.wc

import org.apache.flink.api.scala._

object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 从文本读取数据
    val inputPath = "/home/shiyanlou/words.txt"
    val inputDS: DataSet[String] = env.readTextFile(inputPath)
    // 计算逻辑
    val wordCountDS: AggregateDataSet[(String, Int)] = inputDS
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    // 打印输出
    wordCountDS.print()
  }
}

Flink 流处理 WordCount
在 StreamWordCount.scala 中加入如下代码
package com.shiyanlou.wc

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //  监控Socket数据
    val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
    // 导入隐式转换
    import org.apache.flink.api.scala._
    // 计算逻辑
    val dataStream: DataStream[(String, Int)] = textDstream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // 设置并行度
    dataStream.print().setParallelism(1)

    // 执行
    env.execute("Socket stream word count")
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐