
2. Extract
On the hadoop102 node, create the installation directory /opt/module, then run the tar command to extract the Flink archive into it.
$ tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/
flink-1.13.0/
flink-1.13.0/log/
flink-1.13.0/LICENSE
flink-1.13.0/lib/
3. Start
Enter the extracted directory, run the startup script, and check the running processes.
$ cd flink-1.13.0/
$ bin/start-cluster.sh
Starting cluster.
Starting taskexecutor daemon on host hadoop102. (hadoop102 is the hostname)
$ jps
10369 StandaloneSessionClusterEntrypoint
10680 TaskManagerRunner
10717 Jps
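
Once the cluster is up, Flink also serves a web UI, by default on port 8081. As a quick check (this assumes the default rest.port in conf/flink-conf.yaml is unchanged), query the REST overview endpoint, or simply open http://hadoop102:8081 in a browser:

$ curl http://hadoop102:8081/overview

The response is a JSON summary of the cluster, including the number of TaskManagers and the total and available task slots.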
With the cluster in place, the following word-count program (BoundedStreamWordCount) reads a bounded stream of text from a file, splits each line into words, and counts each word:

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");

        // 3. Transform: split each line into (word, 1L) pairs
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // Java erases the lambda's generic types, so declare them explicitly
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Group by the word field
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream =
                wordAndOneTuple.keyBy(data -> data.f0);

        // 5. Sum the counts within each group
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        // 6. Print the result
        sum.print();

        // 7. Trigger execution
        env.execute();
    }
}
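
As a quick sanity check, suppose input/words.txt contains the following two lines (hypothetical sample data, not part of the original setup):

hello world
hello flink

Since sum runs on a keyed stream, the program prints a cumulative count each time a word arrives. The exact ordering and any "n>" subtask prefixes depend on the parallelism, but the output looks roughly like:

(hello,1)
(world,1)
(flink,1)
(hello,2)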
