当前位置: 首页 > 知识库问答 >
问题:

Kafka流处理器上下文中的周期性NPE

唐茂实
2023-03-14

使用kafka-stream0.10.0.0,我在转发消息时定期在StreamTask中看到空指针异常。它在10%到50%的调用之间变化。NPE发生在这个方法中:

public <K, V> void forward(K key, V value) {
    ProcessorNode thisNode = currNode;
    try {
        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
            currNode = childNode;
            childNode.process(key, value);
        }
    } finally {
        currNode = thisNode;
    }
}

似乎在某些情况下,thisNode字段为空。知道是什么导致了这种情况吗?堆栈跟踪在下面。

[ERROR] 2016-08-21 14:50:39.288 [StreamThread-1] StreamedMetricMeter - Forwarding failed
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336) ~[kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) ~[kafka-streams-0.10.0.0.jar:?]
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.forward(AbstractStreamedMetricProcessor.java:552) [classes/:?]
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:89) [classes/:?]
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:1) [classes/:?]
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.process(AbstractStreamedMetricProcessor.java:166) [classes/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) [kafka-streams-0.10.0.0.jar:?]

共有1个答案

郑正文
2023-03-14

问题是,我的处理器供应商每次调用都返回相同的处理器实例。反过来,Kafka Streams引擎试图创建多个处理器实例,我毫无疑问创建了多线程转储程序。同样粗心的人请注意。。。。处理器供应商。get()应该在每次调用时返回处理器的新实例。

 类似资料:
  • 对于传入记录,我需要验证值,并且基于结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用context.forward()转发相同的错误。可以使用本链接中提供的DSL来完成 现在,调用者再次需要检查并根据键来区分接收器主题。我使用processorAPI是因为我需要use头。 编辑: 当条件为false时,如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记

  • 我正在Kafka流中的处理器节点上工作。对于一个简单的代码,我编写如下代码只是为了过滤用户ID,这是在kafka流中处理处理器节点的正确方法吗? 但是,下面的代码没有编译,抛出了一个错误:

  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

  • 本文向大家介绍缺陷报告的生命周期(处理流程)?相关面试题,主要包含被问及缺陷报告的生命周期(处理流程)?时的应答技巧和注意事项,需要的朋友参考一下 激活、待确认、已解决、待确认、重新激活、已关闭

  • 问题内容: 我有一个奇怪的问题,我想使用上下文处理器添加全局查询。这是我通过以下方法做到的: 这样在我的应用中创建了一个processor.py: 并在我的setting.py结尾处添加了以下内容: 最后,我通过以下观点: 在我的index.html模板上: 最后是我的网址: 我的foos显示没有问题,但是我的media_url和其他上下文消失了。可能是什么问题 问题答案: 当你指定时: 在设置文

  • 本文向大家介绍8085微处理器中不同机器周期的比较,包括了8085微处理器中不同机器周期的比较的使用技巧和注意事项,需要的朋友参考一下 到目前为止,我们已经遇到了OF,MR,MW,IOR和IOW机器周期。8085中其他可能的机器周期是BI(总线空闲)和INA(中断应答)机器周期。现在,下表列出了一些机器周期之间的差异。 OF和MR之间的区别 1. OF的情况下的T状态数为4,MR的情况下的T状态数