本篇要解决的问题:
- Flink是使用Java的序列化方式吗?
- Java序列化方式有什么问题?
- Java中是用Class描述类型,Flink也是用Class描述吗?
- 请解释以下java类型擦除。
- Flink中为什么使用Lambda表达式实现flatMap需要通过returns指定类型呢?
- new ArrayList
()和new ArrayList (){}的区别是什么?
类型和序列化
类型和序列化是每一个计算引擎非常重要的模块。不管是Hadoop MapReduce、Spark还是Flink。以前,我们写单机Java程序的时候,几乎都不会太关注这一点。只要会使用类型、创建出来对象就可以了,序列化更只是做一个了解。
但大数据计算引擎,为了提高计算性能,需要尤其注意类型和序列化。特别是对于一些新手,写代码的时候,一碰到这方面的错误/异常,就不知道如何处理。大数据计算引擎都是分布式计算,分布式计算中存储和传输是非常重要的,每一个对象都是数据,都需要进行存储和传输,不管是存储在内存还是磁盘。所以,只要涉及到大数据的计算引擎,我们都需要将类型和序列化结合在一起来分析。
每一套计算引擎或者存储引擎,都有自己的类型系统。计算框架通过类型系统构建出来了属于自己独特的数据世界。Apache Flink也不例外,它有自己的类型系统,和自己的序列化系统。简单、易用、结构清晰的类型系统能够保障我们编写Flink代码,就像使用Java语言编写普通程序一样简单,高效、高性能序列化系统能够保障我们编写的Flink程序能够用最快地速度进行存储和传输。
Flink目前的主要开发语言是Java和Scala。作为框架的设计者,一定会优先考虑基于这两种语言来编写程序的易用性。可以明显感知到的是,我们使用Flink引擎不用像之前的Hadoop MapReduce程序一样,输入输出必须得使用MapReduce框架自带的类型和序列化。这对于开发人员来说,大大减轻了负担。所以很多开发人员几乎感知不到Flink类型系统的存在。但如果涉及到一些复杂场景,特别是引入一些特殊自定义数据类型或者使用其他的序列化方式,我们不了解Flink类型系统和序列化机制,碰见问题时一头雾水。
Flink类型系统
Flink为什么要自己实现序列化框架
目前,绝大多数的大数据计算框架都是基于JVM实现的,为了快速地计算数据,需要将数据加载到内存中进行处理。当大量数据需要加载到内存中时,如果使用Java序列化方式来存储对象,占用的空间会较大降低存储传输效率。
例如:一个只包含布尔类型的对象需要占用16个字节的内存:对象头要占8个字节、boolean属性占用1个字节、对齐填充还要占用7个字节。
Java序列化方式存储对象存储密度是很低的。也是基于此,Flink框架实现了自己的内存管理系统,在Flink自定义内存池分配和回收内存,然后将自己实现的序列化对象存储在内存块中。
Java生态系统中有挺多的序列化框架,例如:Kryo、Avro、ProtoBuf等。Flink自己实现了一套序列化系统可以让我们编写程序的时候,尽快地发现问题,更加节省内存空间,并直接进行二进制数据的处理。
Flink类型系统
基于Java和Scala语言,Flink实现了一套自己的一套类型系统,它支持很多种类的类型:
- 基本类型
- 数组类型
- 复合类型
- 辅助类型
- 通用类型
对于我们创建的任意的一个POJO类型,看起来它是一个普通的Java Bean,在Java中,使用Class可以用来描述该类型。但其实在Flink引擎中,它被描述为PojoTypeInfo,而PojoTypeInfo是Type@R_487_4045@ion的子类。
Type@R_487_4045@ion
Type@R_487_4045@ion是Flink类型系统的核心类。Flink使用Type@R_487_4045@ion来描述所有Flink支持的数据类型,就像Java中的Class类型一样。每一种Flink支持的数据类型都对应的是Type@R_487_4045@ion的子类。
例如:POJO类型对应的是PojoTypeInfo、基础数据类型数组对应的是BasicArrayTypeInfo、Map类型对应的是MapTypeInfo、值类型对应的是ValueTypeInfo。
除了对类型地描述之外,Type@R_487_4045@ion还提供了序列化的支撑。在Type@R_487_4045@ion中有一个方法:createSerializer方法,
它用来创建序列化器,序列化器中定义了一系列的方法。其中,通过serialize和deserialize方法,可以将指定类型进行序列化。并且,Flink的这些序列化器会以稠密的方式来将对象写入到内存中。
Flink中也提供了非常丰富的序列化器。
在我们基于Flink类型系统支持的数据类型进行编程时,Flink在运行时会推断出数据类型的信息,程序员在基于Flink编程时,几乎是不需要关心类型和序列化的。
类型与Lambda表达式支持
java类型擦除
在编译时,编译器能够从Java源代码中读取到完整的类型信息,并强制执行类型的约束,但生成class字节码时,会将参数化类型信息删除。这就是类型擦除。
注意:泛型只是在编译器编译时能够理解该类型。但编译后执行时,泛型是会被擦除掉的。(但不总是这样J)
例如:如果在Java代码中使用的是ArrayList
public static <T> boolean hasItems(T [] items, T item){
for (T i : items){
if(i.equals(item)){
return true;
}
}
return false;
}
以上是一段Java的泛型方法。但在编译后,编译器会将未绑定类型的T擦除掉,替换为Object。也就是变成下面的样子:
public static Object boolean hasItems(Object [] items, Object item){
for (Object i : items){
if(i.equals(item)){
return true;
}
}
return false;
}
泛型只是能够防止在运行时出现类型错误。
Lambda表达式的支持
下面我用非常简单的一段Flink代码来演示Flink对Lambda表达式的支持。
public class FlinkTypeDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<List<String>> phoneTypeDS = env.addSource(new SourceFunction<List<String>>() {
private Boolean isCancel = false;
public void run(SourceContext<List<String>> sourceContext) throws Exception {
while (!isCancel) {
List<String> phoneTypeList = new ArrayList<String>();
phoneTypeList.add("iphone");
phoneTypeList.add("xiaomi");
phoneTypeList.add("meizu");
sourceContext.collect(phoneTypeList);
TimeUnit.SECONDS.sleep(3);
}
}
public void cancel() {
isCancel = true;
}
});
SingleOutputStreamOperator<String> eachTypeDS = phoneTypeDS.flatMap((val, out) -> {
val.stream().forEach(t -> out.collect(t));
});
eachTypeDS.print();
env.execute("GenericTypeTest");
}
}
上面代码起来没有什么问题。但运行时会出现以下异常。
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkTypeDemo.java:37)' Could not be determined automatically, due to type erasure. You can give type @R_487_4045@ion hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
Flink以非常友好的方式提示我们了:
Could not be determined automatically, due to type erasure. You can give type @R_487_4045@ion hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
就是因为Java的类型擦除,所以Flink根本无法推断出来该flatMap要输出的类型是什么。
所以,Flink让我们指定该类型的Type@R_487_4045@ion,给它一个提示(TypeHint)。
指定Type@R_487_4045@ion或者TypeHint
创建Type@R_487_4045@ion:
phoneTypeDS.flatMap((List<String> val, Collector<String> out) -> {
val.stream().forEach(t -> out.collect(t));
})
.returns(Type@R_487_404[email protected](String.class))
.print();
创建TypeHint:
phoneTypeDS.flatMap((List<String> val, Collector<String> out) -> {
val.stream().forEach(t -> out.collect(t));
})
.returns(new TypeHint<String>() {})
.print();
大家可能会问,第二种方式不是也会进行类型擦除吗?
嘿嘿!注意TypeHint后面的那对花括号,千万别走开,继续往下看。
匿名内部类开启后门
SingleOutputStreamOperator<String> eachTypeDS = phoneTypeDS.flatMap(new FlatMapFunction<List<String>, String>() {
@Override
public void flatMap(List<String> value, Collector<String> out) throws Exception {
value.stream().forEach(v -> out.collect(v));
}
});
用匿名内部类的方式实现flatMap,我们惊奇地发现,程序居然可以成功运行。说明Flink在运行时确实正确地读取到了类型信息。
为什么使用匿名内部类就可以呢?我们在匿名内部类中也使用了泛型啊。奇怪!
有一个概念需要大家记住:泛型类型可以用于匿名类。javac编译器遇到匿名类时,会在字节码中创建数据结构(注意该结构只在运行时可用),这个数据结构中就包含了实际的泛型参数信息。所以,使用匿名类实例化对象是不会进行类型擦除的。
ArrayList<String> strings1 = new ArrayList<String>();
ArrayList<String> strings2 = new ArrayList<String>() {};
上面两行代码看起来没啥区别,但其实,第一行代码会进行编码擦除,而第二行代码是不会进行编码擦除的。
总结
Flink是使用Java的序列化方式吗?
不是。Flink自己实现了一套类型系统和序列化系统。
Java序列化方式有什么问题?
Java序列化方式对象存储很稀疏,除了存储数据本身外,还需要存储Class header数据、对齐填充数据。针对海量数据,会占用较多额外空间降低存储传输效率。
Java中是用Class描述类型,Flink也是用Class描述吗?
不是。Flink是基于Type@R_487_4045@ion描述类型信息,针对每一种类型Flink都提供了自己的Type@R_487_4045@ion,例如:PojoTypeInfo、ValueTypeInfo等等。
请解释一下java类型擦除。
Java中使用泛型只是在编译时进行强制类型检查,避免因为类型问题出现Runtime异常。而在运行过程中,泛型类型是会被擦除掉的。
Flink中为什么使用Lambda表达式实现flatMap需要通过returns指定类型呢?
flatMap有一个泛型参数Colletor
new ArrayList<String>()和new ArrayList<String>(){}的区别是什么?
使用起来是一样的。但后者是匿名内部类,在运行时会保留泛型信息。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。