最近在学Flink时发现,由于java类型擦除的存在,导致Flink中使用Lambda表达式时,无法检测出泛型的类型,需要使用Flink类型暗示(type hint)机制才能解决。现在我们就深入剖析一下吧!
什么是Java泛型擦除
本文不介绍Java的泛型,对泛型不太了解的同学强烈推荐这篇博客:https://www.cnblogs.com/coprince/p/8603492.html
看两个例子:
(1)例1
List arrayList = new ArrayList();
arrayList.add("abc");
arrayList.add(12);
for(int i = 0; i< arrayList.size();i++){
String item = (String)arrayList.get(i);
System.out.println(item);
}
运行报错:
Exception in thread "main" java.lang.classCastException: java.lang.Integer cannot be cast to java.lang.String
因为我们没有指定泛型的类型,所以在List中可以存放任意类型的数据。上述代码先在List中添加了一个String类型的数据,后添加了一个Integer类型的数据,编译器不会提示任何错误,但运行时却报错了。
这是因为List以第一次添加的数据类型为准,即以String的方式使用,后面再添加Integer类型的数据,程序就崩溃了。为了在编译阶段解决类似的问题,我们可以在代码中执行泛型的类型:
List<String> arrayList = new ArrayList<String>();
//arrayList.add(100); 在编译阶段,编译器提示错误
(2)例2
List<String> stringArrayList = new ArrayList<String>();
List<Integer> integerArrayList = new ArrayList<Integer>();
Class classstringArrayList = stringArrayList.getClass();
Class classIntegerArrayList = integerArrayList.getClass();
System.out.println(classstringArrayList==classIntegerArrayList);
输出结果:true
通过上面的例子可以证明,在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦出,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的java类型擦除。
泛型擦除有两种方式,Java使用的是第一种方式,C++和C#使用的是第二种方式
它们也分别俗称“假”泛型和“真”泛型。导致程序在运行时对泛型类型没有感知,所以上述例子一的代码反编译后只剩下了List,实际上都是Class<? extends ArrayList>
的比较,导致例2输出的true
。
为什么Java要采用Code sharing机制进行类型擦除呢?有两点原因:一是Java泛型是到1.5版本才出现的特性,在此之前JVM已经在无泛型的条件下经历了较长时间的发展,如果采用Code specialization,就得对JVM的类型系统做伤筋动骨的改动,并且无法保证向前兼容性。二是Code specialization对每个泛型类型都生成不同的目标代码,如果有10个不同泛型的List,就要生成10份字节码,造成代码膨胀。
类型擦除对Flink的影响
来看一段简单的代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(word -> new Tuple2<>(word, 1));
mapDataStream.print();
env.execute();
程序运行报错,错误原因如下:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough @R_613_4045@ion for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type @R_613_4045@ion.
意思是说Tuple2
中的参数类型缺失,这很可能是因为lambda表达式不能提供足够的信息,使得无法自动检测出Tuple2
中的参数类型,建议我们使用匿名内部类代替。
我们换成匿名匿名内部类实现:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value,1);
}
});
mapDataStream.print();
env.execute();
为什么使用lambda表达式,JVM就无法自动检测出Tuple2
中的参数类型,而匿名内部类却可以?
Tuple2中是有两个泛型的,使用匿名内部类时,会被真正编译为class文件,在对象进入JVM和离开JVM的边界处进行类型的检查和转换,从而保证Tuple2
的参数类型能够正确的被检测出来。这种方式其实是静态语言的特性。
而Lambda表达式是在运行时调用invokedynamic指令,用以支持动态语言的方法调用。具体来说,它将调用点(CallSite)抽象成一个 Java 类,并且将原本由 Java 虚拟机控制的方法调用以及方法链接暴露给了应用程序。在运行过程中,每一条 invokedynamic 指令将捆绑一个调用点,并且会调用该调用点所链接的方法句柄。在第一次执行 invokedynamic 指令时,Java 虚拟机会调用该指令所对应的启动方法(BootStrap Method),来生成前面提到的调用点,并且将之绑定至该 invokedynamic 指令中。在之后的运行过程中,Java 虚拟机则会直接调用绑定的调用点所链接的方法句柄。亦即在第一次执行其逻辑时才会确定。但是,对象进入JVM后,就会进行类型擦除,导致没有足够的信息检测出Tuple2
中两个泛型的具体类型。
上面的说法可能让人有点模糊,需要懂得JVM invokedynamic的原理(哈哈,其实我也没有深入挖,有机会再补)。
为了克服类型擦除带来的问题,Flink类型系统中提供了类型暗示(type hint)机制。在map之后调用returns方法,就可以指定返回类型了。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(word -> new Tuple2<>(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
mapDataStream.print();
env.execute();
另外,对于确定的数据类型(即没有泛型的数据类型),可以随意在flink中使用lambda表达式。例如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<String> mapDataStream = dataStream.map(word -> word+"_1");
mapDataStream.print();
env.execute();
上述代码就正常执行。
参考资料
- https://www.cnblogs.com/coprince/p/8603492.html
- https://time.geekbang.org/column/article/12564
- https://time.geekbang.org/column/article/12574
- https://zhuanlan.zhihu.com/p/26389041
- https://blog.csdn.net/nazeniwaresakini/article/details/104220123
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。