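Why Flink's OutputTag needs a trailing "{}"
Conclusion
The conclusion first: an OutputTag can be created with or without the trailing {}, depending on which constructor you call.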
// case 1: no {}, throws an exception at runtime
OutputTag<String> stringOutputTg = new OutputTag<String>("a");
// case 2: with {}, which creates an anonymous class that extends OutputTag
OutputTag<String> stringOutputTg = new OutputTag<String>("a"){};
// case 3: no {}, but the type information is passed explicitly, so this also works
OutputTag<String> stringOutputTg = new OutputTag<String>("a", TypeInformation.of(String.class));
Why case 1 fails
Case 2 creates an instance of an anonymous class that extends OutputTag, while case 1 creates an instance of OutputTag itself. The problem lies in the constructor:
private final String id;
private final TypeInformation<T> typeInfo;

public OutputTag(String id) {
    Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
    Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
    this.id = id;
    try {
        // The key line: T is inferred from the runtime class of this object
        this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
    } catch (InvalidTypesException e) {
        throw new InvalidTypesException(
                "Could not determine TypeInformation for the OutputTag type. "
                        + "The most common reason is forgetting to make the OutputTag an anonymous inner class. "
                        + "It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.",
                e);
    }
}
The exception thrown by case 1
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Could not determine TypeInformation for the OutputTag type. The most common reason is forgetting to make the OutputTag an anonymous inner class. It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.
at org.apache.flink.util.OutputTag.<init>(OutputTag.java:68)
at com.chouc.flink.OutputTg.main(OutputTg.java:52)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.util.OutputTag Could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1371)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:811)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:787)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:774)
at org.apache.flink.util.OutputTag.<init>(OutputTag.java:66)
... 1 more
The code that raises the error
private static Type getParameterType(
        Class<?> baseClass, List<Type> typeHierarchy, Class<?> clazz, int pos) {
    if (typeHierarchy != null) {
        typeHierarchy.add(clazz);
    }
    Type[] interfaceTypes = clazz.getGenericInterfaces();
    // search in interfaces for base class
    for (Type t : interfaceTypes) {
        Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos);
        if (parameter != null) {
            return parameter;
        }
    }
    // The key part: the type argument is looked up on the generic superclass
    // search in superclass for base class
    Type t = clazz.getGenericSuperclass();
    Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos);
    if (parameter != null) {
        return parameter;
    }
    throw new InvalidTypesException(
            "The types of the interface "
                    + baseClass.getName()
                    + " Could not be inferred. "
                    + "Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point");
}
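The superclass lookup above can be illustrated without Flink. The following is a minimal sketch, not Flink's implementation: `Tag`, `StringTag`, `SpecialStringTag`, and `findTypeArgument` are hypothetical names made up for this illustration, and the helper only walks the superclass chain (it skips the interface search that the real method also performs).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class SuperclassLookupSketch {
    // Hypothetical stand-in for OutputTag<T>; not a Flink class.
    static class Tag<T> {}

    // Named subclass that fixes T = String in its extends clause.
    static class StringTag extends Tag<String> {}

    // Subclass of a subclass: the type argument is recorded one level up.
    static class SpecialStringTag extends StringTag {}

    /**
     * Walks up the superclass chain of clazz looking for a parameterized
     * reference to baseClass and returns its type argument at index pos.
     * Returns null when no such reference exists.
     */
    static Type findTypeArgument(Class<?> baseClass, Class<?> clazz, int pos) {
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            Type superType = c.getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) superType;
                if (p.getRawType() == baseClass) {
                    return p.getActualTypeArguments()[pos];
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Tag<String> direct = new Tag<String>();
        Tag<String> anonymous = new Tag<String>() {};

        // Direct instance: the superclass of Tag is Object, so nothing is found.
        System.out.println(findTypeArgument(Tag.class, direct.getClass(), 0));      // null
        // Anonymous subclass: its generic superclass is recorded as Tag<String>.
        System.out.println(findTypeArgument(Tag.class, anonymous.getClass(), 0));   // class java.lang.String
        System.out.println(findTypeArgument(Tag.class, StringTag.class, 0));        // class java.lang.String
        System.out.println(findTypeArgument(Tag.class, SpecialStringTag.class, 0)); // class java.lang.String
    }
}
```

The `null` result for the direct instance corresponds to the point where the Flink method falls through to `throw new InvalidTypesException(...)`.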
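Reason
typeInfo is a generic field that must be initialized when the object is constructed, and its type parameter is the type argument of the current class. When only id is passed and typeInfo is not, the constructor has to work out that type argument by reflection. However, a class cannot read its own type argument at runtime; only the type arguments declared on its superclass can be read, through java.lang.Class#getGenericSuperclass.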
Example
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class OutputTg<T> {
    String a;
    T b;

    public OutputTg(String a) throws InstantiationException, IllegalAccessException {
        this.a = a;
        System.out.println(this);
        // For an anonymous subclass this is OutputTg<String>; for a direct instance it is Object
        Type genericSuperclass = this.getClass().getGenericSuperclass();
        System.out.println(genericSuperclass);
        ParameterizedType baseClassChild = (ParameterizedType) genericSuperclass;
        System.out.println(baseClassChild);
        Type actualTypeArgument = baseClassChild.getActualTypeArguments()[0];
        System.out.println(actualTypeArgument);
        Class<?> bClass = (Class<?>) actualTypeArgument;
        System.out.println(bClass);
        // Create an instance of the recovered type (requires a public no-arg constructor)
        this.b = (T) bClass.newInstance();
    }

    public static void main(String[] args) {
        OutputTg<String> stringOutputTg = null;
        try {
            // Fails: getGenericSuperclass() returns Object, so the cast to ParameterizedType throws ClassCastException
            // stringOutputTg = new OutputTg<String>("a");
            // Works: the anonymous subclass records OutputTg<String> as its generic superclass
            stringOutputTg = new OutputTg<String>("a") {};
        } catch (InstantiationException e) {
            throw new RuntimeException(e);
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
        System.out.println(stringOutputTg.a);
        System.out.println(stringOutputTg.b);
    }
}
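The Flink error message also says that generic type variables cannot be used with OutputTags. A small sketch of why adding {} does not help in that situation, again using a hypothetical `Tag` class and a made-up `genericTag` helper rather than any Flink API: when the anonymous subclass is created where T is still a placeholder, the generic superclass records the type variable itself, not a concrete class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class TypeVariableSketch {
    // Hypothetical stand-in for OutputTag<T>; not a Flink class.
    static class Tag<T> {
        // Returns whatever the subclass recorded as the argument for T.
        // Only valid on a subclass instance; a direct Tag instance has Object as its superclass.
        Type capturedType() {
            ParameterizedType p = (ParameterizedType) getClass().getGenericSuperclass();
            return p.getActualTypeArguments()[0];
        }
    }

    // T is only a placeholder here, so the anonymous class extends Tag<T>, not Tag<String>.
    static <T> Tag<T> genericTag() {
        return new Tag<T>() {};
    }

    public static void main(String[] args) {
        Tag<String> concreteTag = new Tag<String>() {};
        Tag<String> variableTag = TypeVariableSketch.<String>genericTag();

        Type concrete = concreteTag.capturedType();
        Type variable = variableTag.capturedType();

        System.out.println(concrete);                         // class java.lang.String
        System.out.println(variable);                         // T
        System.out.println(variable instanceof TypeVariable); // true
    }
}
```

Even though the caller asked for `<String>`, the second tag only knows about `T`, which is why a concrete type has to be visible at the place where the anonymous class is written.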
Why case 3 works
private final String id;
private final TypeInformation<T> typeInfo;

public OutputTag(String id, TypeInformation<T> typeInfo) {
    Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
    Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
    this.id = id;
    this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
}
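With the reason above in mind, case 2 and case 3 become clear. Case 2 extends OutputTag, so the generic type can be read directly through java.lang.Class#getGenericSuperclass. Case 3 passes the TypeInformation in explicitly, so the constructor never has to infer it by reflection.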
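The two constructor styles can be put side by side in a small stand-alone sketch. This is an illustration under stated assumptions, not Flink code: `Tag` is a hypothetical class, `Class<T>` plays the role that `TypeInformation<T>` plays in OutputTag, and the `IllegalStateException` stands in for Flink's `InvalidTypesException`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TwoConstructorSketch {
    // Hypothetical stand-in for OutputTag<T>; Class<T> stands in for TypeInformation<T>.
    static class Tag<T> {
        final String id;
        final Type type;

        // Reflective path (case 1 and case 2): needs a subclass that records T.
        Tag(String id) {
            this.id = id;
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException(
                        "Cannot infer the type of tag '" + id + "'; add {} or pass the type explicitly.");
            }
            this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        // Explicit path (case 3): no reflection, so no subclass is needed.
        Tag(String id, Class<T> type) {
            this.id = id;
            this.type = type;
        }
    }

    public static void main(String[] args) {
        Tag<String> case2 = new Tag<String>("a") {};
        Tag<String> case3 = new Tag<>("a", String.class);
        System.out.println(case2.type); // class java.lang.String
        System.out.println(case3.type); // class java.lang.String

        try {
            new Tag<String>("a"); // case 1: nothing records T, so inference fails
        } catch (IllegalStateException e) {
            System.out.println("case 1 failed: " + e.getMessage());
        }
    }
}
```

Case 2 and case 3 end up with the same type, one recovered from the anonymous subclass and one handed in by the caller.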