文章目录
Flink集群搭建和使用
local 本地测试
idea运行
idea上运行
flink集群搭建
1、standallone cluster
1、准备工作
有jdk,节点间免密
2、上传解压
tar -zxvf flink-1.11.0-bin-scala_2.11.tgz
配置环境变量,过于基础不写了
然后生效
source /etc/profile
3、修改配置文件
#修改conf下的flink-conf.yaml
vim conf/flink-conf.yaml
#需要改的内容如下:
jobmanager.rpc.address: master 主节点ip地址
#修改workers
vim conf/workers
修改如下:
增加从节点 node1 node2
(把localhost改为node1,node2)
#修改masters
vim conf/masters
改成主节点ip
#同步到所有节点`pwd`是当前路径看清楚了
scp -r flink-1.11.0/ node1:`pwd`
4、启动集群
start-cluster.sh
http://master:8081 访问web界面
提交任务 – 将代码打包
1、在web页面提交任务
打开web界面后左边会有个Submit New job的点开然后上传jar包
2、web提交和flink命令提交任务一样,在shell里输入下面命令
flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar
3、rpc方式提交任务(远程命令提交,直接在idea里打包idea里运行)
package com.liu.core
import org.apache.flink.streaming.api.scala._
/**
* @ Author : ld
* @ Description :
* @ Date : 2021/11/23 18:57
* @ Version : 1.0
*/
object WordCountRPC {
def main(args: Array[String]): Unit = {
//创建flink的环境
//注意下面参数设置
val env = StreamExecutionEnvironment.createRemoteEnvironment("master", 45189, "F:\\ideaProject\\liubigdata12\\Flink\\target\\Flink-1.0-SNAPSHOT.jar")
//设置并行度
// env.setParallelism(2)
//读取socket数据
//nc -lk 8888
env.socketTextStream("master",8888)
//把单词拆分
.flatMap(_.split(","))
//转换成kv格式
.map((_,1))
//按单词分组
.keyBy(_._1)
//统计单词数量
.sum(1)
//打印结果
.print()
//启动flink
env.execute()
}
}
2.flink on yarn 只需要部署一个节点
1、配置HADOOP_CONF_DIR
vim /etc/profile
#添加如下
export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/
#jar包
flink-shaded-hadoop-2-uber-2.6.5-10.0
flink和spark一样都是粗粒度资源申请
flink启动方式
1、yarn-session
在yarn里面启动一个flink集群 jobManager
先启动hadoop
yarn-session.sh -jm 1024m -tm 1096m
提交任务 任务提交的是偶根据并行度动态申请taskmanager
1、在web页面提交任务
2、同flink命令提交任务
flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar
3、rpc方式提交任务
RPC模式结果
2、直接提交任务到yarn
直接提交到yarn不会生成端口号,通过master:8088界面查看任务,点击后面的ApplactionMaster跳转到Flink界面
flink run -m yarn-cluster -yjm 1024m -ytm 1096m -c
com.shujia.flink.core.Demo1WordCount flink-1.0.jar
#杀掉yarn上的任务,如果之前有任务每释放就执行,id不同别直接复制,没有就忽略
yarn application -kill application_1599820991153_0005
yarn-session先在yarn中启动一个jobMansager ,所有的任务共享一个jobmanager(提交任务更快,任务之间共享jobmanager , 相互有影响) 直接提交任务模型,为每一个任务启动一个joibmanager(每一个任务独立jobmanager , 任务运行稳定)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。