当前位置: 首页 > 知识库问答 >
问题:

Flink进程中的TimerException

微生宝
2023-03-14

我们正在使用几个操作符运行Flink作业,包括map、windowing、flatMap(),作业失败,出现以下错误-只是想知道是什么原因导致此错误:

2021-05-27 07:00:07,023 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Collect Inventory (1/1)#0 (34d81cf2e59f350886f93a1e0f734d38) switched from RUNNING to FAILED with failure cause: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
 at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1282)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1258)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1397)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1386)
 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
 at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.RuntimeException: Assigned key must not be null!}
 ... 12 more
Caused by: java.lang.RuntimeException: Assigned key must not be null!
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
 at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
 at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35)
 at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:48)
 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577)
 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
 at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1395)
 ... 11 more
Caused by: java.lang.NullPointerException: Assigned key must not be null!
 at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
 at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51)
 at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63)
 at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35)
 at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 ... 22 more
2021-05-27 07:00:07,023 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Triggering cancellation of task code Collect Inventory (1/1)#0 (34d81cf2e59f350886f93a1e0f734d38).
{eventTime:2021-05-27T07:00:05.878Z, batchId:4a0c09ad-1e28-4f74-a19a-8e1422c5bc5a, serialDetail:null, inventoryCount:{"location": "tmobile01", "sku": "190198496225", "state": "LOST", "inventoryStatus": "Adjusted-Out", "inventoryType": "ACC", "sellableFlag": false, "version": 1, "updateTimestamp": 2021-05-27T07:00:07.118Z, "globalQuantity": -1, "localQuantity": -1, "eventId": null, "movementSource": "", "locationType": "Store"

共有1个答案

童琪
2023-03-14

您似乎正在使用keyBy操作,提取的密钥为null。你不能在Flink中输入null。

 类似资料:
  • 问题内容: 我试图使用读取文件: 我收到以下错误: 我正在使用flink版本1.3.2,java版本“ 1.8.0_91” 问题答案: 错误“ java.lang.NoSuchMethodError”的可能原因之一是当我们使用的flink版本与系统上安装的版本不同时。对我来说,我有Flink 1.3.2,我使用的版本是1.1.1。因此,我将pom文件更新为相同的版本。

  • 我们有一个包含事务的非键控数据流和一个包含规则的广播流。事实上,我们希望根据上次看到的规则处理事务。如果我们最后看到的规则是每日,我们必须将当前事务添加到每日事务列表中。此外,如果dailyTrnsList的大小大于阈值,则必须清除列表并将事务写入数据库。如果最后看到的规则是temp,我们也会做同样的事情。 代码如下: 我们的问题是编写一种容错方法。我们不知道如何使用ListState来解决我们的

  • 问题内容: 在我阅读的问题中,我们建议在进程上使用线程,因为线程速度更快。我决定使用程序的线程来编辑Wikipedia中某个类别的文章。该程序获取要编辑的文章列表,然后将文章划分为10个线程。这样一来,我每分钟进行6-7次编辑,这与我没有使用线程的速度相同。当我启动程序的多个实例并为每个实例指定要处理的类别时,我看到每个进程每分钟可以进行6-7次编辑(我用5个进程进行了测试)。 为什么我的流程更快

  • null 其中lambda1、2等是条件检查函数,例如 但不知什么原因对我不起作用,也许还有其他方法?正如我从文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html)中了解到的,OutputTag用于创建标记为tag的附加消息。还是我错了?

  • 我正在使用ApacheFlink-1.1.3进行实时流数据分析。我的系统由用于消息队列的Kafka集群、从Kafka分区读取消息并对其进行分析的Flink集群组成,最后我希望将生成的数据转储到Ignite缓存中。对于系统,我使用IgniteSink类将数据放入ignite缓存。版本如下: Flink1.1.3, Kafka2.10,点燃2.0.0 当我试图在flink cluster上运行作业时,

  • 主要内容:1.算子状态概述,2.算子状态 编程案例,3.状态持久化和状态后端,4.状态编程总结1.算子状态概述 1.1 算子状态分类 算子状态: 列表状态, 联合列表状态, 广播状态 ListState, UnionListState, BroadcastState 1.2 状态分析 列表状态: 与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以