当前位置: 首页 > 知识库问答 >
问题:

如果无法访问接收器节点/主题,Kafka streams将关闭?

谷梁嘉悦
2023-03-14

我想测试使用处理器API的Kafka Streams从源代码读取和写入主题列表时的场景,一个或两个主题是不可访问的(失败测试:试图通过添加集群中不存在的1/2主题来模拟它)。

     topology.addSource("mysource","source_topic");
     topology.addProcessor("STREAM_PROCESSOR",()->new SourceProcessor(),"mysource");
     topology.addSink("SINK_1","TOPIC_1","STREAM_PROCESSOR");
     topology.addSink("SINK_2","TOPIC_2","STREAM_PROCESSOR");
     topology.addSink("SINK_3","TOPIC_3","STREAM_PROCESSOR"); // This topic is not present in cluster

      sourceContext.forward(eventName,eventMessage,To.child("sink1 or sink2 or sink3"));

我的理解是kafkaStreams应该为不存在的主题给出错误,并继续将记录转发到存在的主题1和主题2。

但我看到的行为是,它会产生以下错误:

     Exception in thread "StreamProcessor-56da56e4-4ab3-4ca3-bf48-b059558b689f-StreamThread-1" 
     org.apache.kafka.streams.errors.StreamsException: 
     task [0_0] Abort sending since an error caught with a previous record (timestamp 1592940025090) to topic "TOPIC_X" due to 
     org.apache.kafka.common.errors.TimeoutException: Topic "TOPIC_X" not present in metadata after 60000 ms.
     Timeout exception caught when sending record to topic "TOPIC_X". This might happen if the producer cannot send data to the
     Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the
     broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.

这是模拟不可达主题或主题不存在问题的正确方式吗?为什么即使在处理Streams和拓扑异常时,Kafka流也会因上述错误而关闭。如果某个接收器主题因某种原因不可用或不可访问,kafka流不应该关闭,对吗?。好心建议

关于上面的错误,我想在捕获StreamsException到错误主题时转发错误,但是kafkastreams会过早停止。

catch(StreamsException e)
{
    context.forward("","",Error_Topic)
}

这是预期的行为吗?

参考:https://docs.confluent.io/current/streams/developer-guide/manage-topics.html#user-主题这是否意味着kafkastreams拓扑中不允许不存在的主题作为接收节点。请确认。

共有1个答案

杨雪松
2023-03-14

根据设计,如果Kafka流不能写入水槽主题,它就会关闭。原因是默认情况下,Kafka Streams保证至少一次处理语义学,如果它不能将数据写入一个接收器主题,但将继续,至少一次处理将被违反,因为接收器主题中会有数据丢失。

有一个产品。例外处理程序可能有帮助的配置。它允许您在将数据写入输出主题时吞下某些异常。但是,请注意,这意味着相应主题的数据丢失。

 类似资料:
  • 工作节点1上的Curl对群集IP来说是最合适的(这是运行pod的节点) Curl在其他工作节点上也失败:

  • 我使用以下配置将lirc0绑定到容器中。 主机中/dev/lirc0的属性如下所示。 我使用lxc连接进入容器。容器中的属性与主机相同 但是无法打开。错误是“不允许操作”,我是容器中的主管(root)。 如何获得访问容器中的的权限?

  • 我使用他们的web UI在EMR上创建了一个AWS Spark2.2集群(这里是新手)。我知道我需要连接到主节点,以便开始发出pyspark命令来学习Spark。但是,当我尝试连接到主节点时,它给我一个错误。在浏览了internet之后,我发现使用可能有助于调试正在进行的操作,但我找不到任何有用的信息。下面是我的ssh调试日志。 有人能指出这里的问题是什么吗?编辑:我已经尝试过将端口22添加到安全

  • 我有四个Kafka流应用程序实例使用相同的应用程序id运行。所有输入主题都属于一个分区。为了实现可伸缩性,我通过一个具有多个分区的中间虚拟主题来传递它。我已经将< code>request.timeout.ms设置为4分钟。 Kafka 实例进入 ERROR 状态,而不会引发任何异常。很难弄清楚确切的问题是什么。有什么想法吗?

  • 问题内容: 我正在使用Jenkins Build Flow插件来实现并行化。Groovy DSL执行某些文件操作。即使该选项设置为在特定从属服务器上运行作业,但DSL仍在主服务器上运行。这不是故意的。 有人可以告诉我如何限制DSL在指定的从站上运行吗?即使有一种方法可以通过DSL访问从文件系统,也应该可以。 通常,我们如何使用Groovy从Jenkins主节点访问节点从节点上的文件? 工作空间位于