微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

FlinkCDC问题集锦

1.  MysqL的账号权限问题

Caused by: java.sql.sqlSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation

解决方案:https://www.cnblogs.com/30go/p/15808632.html

 

2. 指定StartupOptions.latest() 会出现检查点保存空指针异常问题

这个场景是从MysqL到Kafka,设置了ck,MysqL的读取设置的是 StartupOptions.latest()

报错异常如下:

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 4 for operator Source: Custom Source (1/1)#3. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:697)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:618)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:583)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:309)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1013)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:997)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailBox(StreamTask.java:921)
    ... 10 more
Caused by: java.lang.NullPointerException
    at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.snapshotOffsetState(DebeziumSourceFunction.java:262)
    at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.snapshotState(DebeziumSourceFunction.java:240)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
    ... 20 more

按道理说,从最新的记录开始,如果没有最新的,那么应该是等待状态的,所以这里不知道为啥会报异常。

把 StartupOptions.latest() 改为StartupOptions.earliest() 就正常了。 啥时候空了再慢慢翻源码吧。。。

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐