当前位置: 首页 > 知识库问答 >
问题:

Storm消息失败

昌勇锐
2023-03-14

共有1个答案

沈国安
2023-03-14

您的问题与StormTopology中KafkaSpout对背压的处理有关。

您可以通过在拓扑配置中设置maxSpoutPending值来处理KafkaSpout的背压,

Config config = new Config();
config.setMaxSpoutPending(200); 
config.setMessageTimeoutSecs(100);

StormSubmitter.submitTopology("testtopology", config, builder.createTopology());

maxSpoutPending是给定时间拓扑中可以挂起确认的元组数。设置此属性将使KafkaSpout不再使用来自Kafka的任何数据,除非未确认的元组计数小于maxSpoutPending值。

 类似资料:
  • 我们试图在Storm拓扑中对性能进行基准测试。我们正在摄取大约1000/秒的消息到Kafka主题。当我们在KafkaSpout中max.spout.pendind=2000时,我们在Storm UI中不会看到任何失败的消息,但是当我们将max.spout.pendind值降低到500或100时,我们会在Storm UI中的spout中看到许多失败的消息。我的理解是,如果我们保持低max.spout

  • 我正在尝试构建一个Flink作业,该作业将从Kafka源读取数据并进行一系列处理,包括很少的REST调用,然后最终进入另一个Kafka主题。 我试图解决的问题是消息重试。如果REST API中存在瞬时错误怎么办?如何像Storm支持的方式那样,对这些消息进行基于指数退避的重试? 我有两种方法可以考虑 使用TimerService,但如果发生故障,状态将开始不受控制地扩展。 将失败的消息写入不同的K

  • 如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取

  • 我正在尝试与RESTful API接口,该API在post中接受应用程序/x-protobuf对象。 .原型示例对象: 使用请求,我可以将此消息发布到服务器并收到200。 例如 当我以SerializeToString()格式查看此有效负载时,它显示类似于 b'\n\t\n\x03foo\x10\x01' 作为健全性检查,然后我可以创建一个新的消息对象和。将其上的ParseFromString()

  • 我想使用rabbitMq队列中Storm喷口中的消息。 现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息。 Spring AMQP提供了从队列读取消息的机制(创建监听器或使用注释@RabbitListner)。 问题是我可以让一个侦听器从队列中读取消息。但是,我如何将此消息发送到Storm群上运行的Storm喷口? 拓扑将启动一个集群,但在我的spout的nextTup