微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

The proctime attribute ‘rowtime‘ must not replace an existing field.

故障代碼如下:


// *************************************************************************
//     USER DATA TYPES
// *************************************************************************

/*
 * Simple POJO.
 */


import java.sql.Timestamp;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Simple POJO.
 */
public class OrderStream
{
    public int id;
    public Long user;
    public String product;
    public int amount;
    public Long rowtime;

    public OrderStream()
    {
    }

    public OrderStream(int id,Long user,String product,int amount,Long rowtime)
    {
        this.id=id;
        this.user = user;
        this.product = product;
        this.amount = amount;
        this.rowtime = rowtime;
    }

    @Override
    public String toString()
    {
        return "Order{" +
                "id="+id+
                ", user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                ", ts=" + rowtime +
                '}';
    }
}



DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
                new OrderStream(1, 1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00
                new OrderStream(2, 1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00
                new OrderStream(3, 3L, "rubber", 2, 1505527800L),//2017-09-16 10:10:00
                new OrderStream(4, 3L, "rubber", 2, 1505527800L),//2017-09-16 10:10:00
                new OrderStream(5, 1L, "diaper", 4, 1505528400L),//2017-09-16 10:20:00
                new OrderStream(6, 1L, "diaper", 4, 1505528400L)//2017-09-16 10:20:00
        ));




        tEnv.createTemporaryView("Orders", orderA,$("id"),$("user"), $("product"), $("amount"), $("rowtime").proctime());

完整報錯如下:

Exception in thread "main" org.apache.flink.table.api.ValidationException: The proctime attribute 'rowtime' must not replace an existing field.
	at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.validateProctimeDoesNotReplaceField(FieldInfoUtils.java:606)
	at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.createProctimeFieldInfo(FieldInfoUtils.java:599)
	at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.visit(FieldInfoUtils.java:564)
	at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.visit(FieldInfoUtils.java:536)
	at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
	at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
	at org.apache.flink.table.typeutils.FieldInfoUtils.lambda$extractFieldInfosByNameReference$7(FieldInfoUtils.java:431)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndcopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfosByNameReference(FieldInfoUtils.java:432)
	at org.apache.flink.table.typeutils.FieldInfoUtils.extractField@R_556_4045@ion(FieldInfoUtils.java:266)
	at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:233)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lambda$asQueryOperation$0(StreamTableEnvironmentImpl.java:384)
	at java.util.Optional.map(Optional.java:215)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.java:383)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:230)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.createTemporaryView(StreamTableEnvironmentImpl.java:262)
	at distinctAggregation4.main(distinctAggregation4.java:35)

The proctime attribute 'rowtime' must not replace an existing field.

原因與解決方案:

原因解決方案
使用proctime,那麼列名不能是OrderStream中的tEnv.createTemporaryView("Orders", orderA,$("id"),$("user"), $("product"), $("amount"), $("rowtime1").proctime());
使用rowtime,,列名可以是OrderStream中的.
tEnv.createTemporaryView("Orders", orderA,$("id"),$("user"), $("product"), $("amount"), $("rowtime").rowtime());

 

 

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐