微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

聊聊Java类型擦除、Flink中使用Lambda表达式丢失信息和Flink类型暗示机制

@H_404_0@

最近在学Flink时发现,由于java类型擦除的存在,导致Flink中使用Lambda表达式时,无法检测出泛型的类型,需要使用Flink类型暗示(type hint)机制才能解决。现在我们就深入剖析一下吧!

什么是Java泛型擦除

本文不介绍Java的泛型,对泛型不太了解的同学强烈推荐这篇博客https://www.cnblogs.com/coprince/p/8603492.html

看两个例子:

(1)例1

List arrayList = new ArrayList();
arrayList.add("abc");
arrayList.add(12);

for(int i = 0; i< arrayList.size();i++){
	String item = (String)arrayList.get(i);
	System.out.println(item);
}

运行报错:

Exception in thread "main" java.lang.classCastException: java.lang.Integer cannot be cast to java.lang.String

因为我们没有指定泛型的类型,所以在List中可以存放任意类型的数据。上述代码先在List中添加一个String类型的数据,后添加一个Integer类型的数据,编译器不会提示任何错误,但运行时却报错了。

这是因为List以第一次添加的数据类型为准,即以String的方式使用,后面再添加Integer类型的数据,程序就崩溃了。为了在编译阶段解决类似的问题,我们可以在代码中执行泛型的类型:

List<String> arrayList = new ArrayList<String>();
//arrayList.add(100); 在编译阶段,编译器提示错误

(2)例2

List<String> stringArrayList = new ArrayList<String>();
List<Integer> integerArrayList = new ArrayList<Integer>();

Class classstringArrayList = stringArrayList.getClass();
Class classIntegerArrayList = integerArrayList.getClass();

System.out.println(classstringArrayList==classIntegerArrayList);

输出结果:true

通过上面的例子可以证明,在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦出,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的java类型擦除

泛型擦除有两种方式,Java使用的是第一种方式,C++和C#使用的是第二种方式

  • 方式一:Code sharing。对同一个原始类型下的泛型类型只生成同一份目标代码
  • 方式二:Code specialization。对每一个泛型类型都生成不同的目标代码

它们也分别俗称“假”泛型和“真”泛型。导致程序在运行时对泛型类型没有感知,所以上述例子一的代码反编译后只剩下了List,实际上都是Class<? extends ArrayList>的比较,导致例2输出true

为什么Java要采用Code sharing机制进行类型擦除呢?有两点原因:一是Java泛型是到1.5版本才出现的特性,在此之前JVM已经在无泛型的条件下经历了较长时间的发展,如果采用Code specialization,就得对JVM的类型系统做伤筋动骨的改动,并且无法保证向前兼容性。二是Code specialization对每个泛型类型都生成不同的目标代码,如果有10个不同泛型的List,就要生成10份字节码,造成代码膨胀。

类型擦除对Flink的影响

来看一段简单的代码

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(word -> new Tuple2<>(word, 1));
mapDataStream.print();
env.execute();

程序运行报错,错误原因如下:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough @R_613_4045@ion for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type @R_613_4045@ion.

意思是说Tuple2中的参数类型缺失,这很可能是因为lambda表达式不能提供足够的信息,使得无法自动检测出Tuple2中的参数类型,建议我们使用匿名内部类代替。

我们换成匿名匿名内部类实现:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        return new Tuple2<>(value,1);
    }
});

mapDataStream.print();
env.execute();

上述代码成功执行并打印输出

为什么使用lambda表达式,JVM就无法自动检测出Tuple2中的参数类型,而匿名内部类却可以?

Tuple2中是有两个泛型的,使用匿名内部类时,会被真正编译为class文件,在对象进入JVM和离开JVM的边界处进行类型的检查和转换,从而保证Tuple2的参数类型能够正确的被检测出来。这种方式其实是静态语言的特性。

而Lambda表达式是在运行时调用invokedynamic指令,用以支持动态语言的方法调用。具体来说,它将调用点(CallSite)抽象成一个 Java 类,并且将原本由 Java 虚拟机控制的方法调用以及方法链接暴露给了应用程序。在运行过程中,每一条 invokedynamic 指令将捆绑一个调用点,并且会调用调用点所链接方法句柄。在第一次执行 invokedynamic 指令时,Java 虚拟机会调用该指令所对应的启动方法(BootStrap Method),来生成前面提到的调用点,并且将之绑定至该 invokedynamic 指令中。在之后的运行过程中,Java 虚拟机则会直接调用绑定的调用点所链接方法句柄。亦即在第一次执行其逻辑时才会确定。但是,对象进入JVM后,就会进行类型擦除,导致没有足够的信息检测出Tuple2中两个泛型的具体类型。

上面的说法可能让人有点模糊,需要懂得JVM invokedynamic的原理(哈哈,其实我也没有深入挖,有机会再补)。

为了克服类型擦除带来的问题,Flink类型系统中提供了类型暗示(type hint)机制。在map之后调用returns方法,就可以指定返回类型了。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(word -> new Tuple2<>(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
mapDataStream.print();
env.execute();

另外,对于确定的数据类型(即没有泛型的数据类型),可以随意在flink中使用lambda表达式。例如:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<String> mapDataStream = dataStream.map(word -> word+"_1");

mapDataStream.print();
env.execute();

上述代码就正常执行。

参考资料

  1. https://www.cnblogs.com/coprince/p/8603492.html
  2. https://time.geekbang.org/column/article/12564
  3. https://time.geekbang.org/column/article/12574
  4. https://zhuanlan.zhihu.com/p/26389041
  5. https://blog.csdn.net/nazeniwaresakini/article/details/104220123

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐