一种错误的实现方式:这个下面就是一种错误的实现方式,首先定义数据源,其中有五个单词,然后进行数据处理,map里面实现了RichMapFunction接口,重写map方法,先定义一个计数器counter为0,每读一个单词,counter加一,就把这个单词变为counter进行输出,这里设置的并行度为1,输出结果是1,2,3,4,5
但是当我们将并行度设置为2的时候,输出变成了1,2,3,1,2 其实很好理解,就是Integer counter = 0;这句话我们执行了两次,因为并行度为2,导致我们最后是不同的并行度有不同的counter进行计数的!所以这样就不行
public class JavaCountApp {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> data = env.fromElements("flink", "hadoop", "spark", "storm", "linux");
data.map(new RichMapFunction<String, Integer>() {
Integer counter = 0;
@Override
public Integer map(String value) throws Exception {
counter++;
return counter;
}
}).setParallelism(1).print();
}
}
//并行度为1输出:1,2,3,4,5
//并行度为2输出:1,2,3,1,2
正确实现:
实现计数器有三个步骤:
代码运行步骤:
- 首先和上面的实现方法一样,map中实现RichMapFunction方法
- 第一步就是定义计数器
- 这里我们在open方法中进行注册计数器,每次注册完计数器看一下初始值进行输出(open方法有什么作用后面会说)
- 之后再map方法中计数器进行add(1)操作 并将值进行输出
- 最后我们在jobResult中获取到这个计数器
输出如下:
open counter: 0
map counter: 1
map counter: 2
map counter: 3
open counter: 0
map counter: 1
map counter: 2
num: 5
public class JavaCountApp {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> data = env.fromElements("flink", "hadoop", "spark", "storm", "linux");
DataSet<String> text = data.map(new RichMapFunction<String, String>() {
//step1:定义计数器
LongCounter counter = new LongCounter();
@Override
public void open(Configuration parameters) throws Exception {
//step2:注册计数器
getRuntimeContext().addAccumulator("ele-counts-java", counter);
//这里是为了调试进行输出每次注册计数器的初始值
System.out.println("open counter: "+counter.getLocalValue());
}
@Override
public String map(String value) throws Exception {
counter.add(1);
//查看计数器的值
System.out.println("map counter: "+counter.getLocalValue());
return value;
}
}).setParallelism(2);
String filePath = "C:\\Users\\77051\\IdeaProjects\\flinktrain01\\src\\main\\scala\\com\\imooc\\file\\sink1\\";
text.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE);
JobExecutionResult jobResult = env.execute("JavaCountApp");
//step3:获取计数器
Long num = jobResult.getAccumulatorResult("ele-counts-java");
System.out.println("num: "+num);
}
}
其实这里我们还是可以看到,在并行度为2的情况下,open方法同样执行了两次,每次注册计数器都为0,所以两个并行度都有各自的计数器在计算,只不过在最后加了起来!最后得到的结果为5,那么是在哪里加起来的呢?
实际上是在job执行execute方法中自动执行的!将所有的计数器进行相加!
这一部分是ExecutionGraph.java的方法,进行相加计数器
/**
* Merges all accumulator results from the tasks prevIoUsly executed in the Executions.
* @return The accumulator map
*/
public Map<String, OptionalFailure<Accumulator<?, ?>>> aggregateUserAccumulators() {
Map<String, OptionalFailure<Accumulator<?, ?>>> userAccumulators = new HashMap<>();
for (ExecutionVertex vertex : getAllExecutionVertices()) {
Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
if (next != null) {
AccumulatorHelper.mergeInto(userAccumulators, next);
}
}
return userAccumulators;
}
open方法:RichSinkFunction与RichMapFunction都继承了AbstractRichFunction, 而AbstractRichFunction是一个抽象类,它实现了RichFunction接口,而open方法便是在RichFunction中定义的。 open方法相当于在执行具体计算前做一些初始化操作, 会在实际方法调用过程前(例如map)执行,因此适用于做一些一次性的配置工作。 对于一些多并行度的操作,会在每一次并行计算前执行一次open操作。 其中configuration参数包含做一些配置和初始化参数,这些参数可以在自定义。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。