我们有一个Storm拓扑,其中我们配置了一个喷口和两个螺栓。Spout连续地从DB查询数据,并将其发送到first bolt进行某些处理。第一个bolt进行一些处理并将元组发送给第二个bolt,第二个bolt调用第三方web服务并发送数据。所以,经过一段时间后,最后一个bolt没有得到任何元组,如果我们重新启动拓扑,它工作得很好。这里只有最后一个螺栓有问题。其他喷口和第一螺栓运行良好,我不使用顶进框架。在本例中,我只配置了一个工作人员`。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("messageListenrSpout", new MessageListenerSpout(), 1);
builder.setBolt("processorBolt", new ProcessorBolt(), 20).shuffleGrouping("messageListenrSpout");
builder.setBolt("notifierBolt", new NotifierBolt(),40).shuffleGrouping("processorBolt");
Config conf = new Config();
conf.put(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 10000);
//conf.setMessageTimeoutSecs(600);
conf.setDebug(true);
StormSubmitter.submitTopology(TOPOLOGY, conf, builder.createTopology());
很可能是元组的积压造成了超时。尝试增加第二个bolt的并行性提示,因为听起来,一个人的处理时间比第一个bolt长得多(这就是为什么第二个bolt会有一个积压)。如果您正在集群上运行此拓扑,请查看Storm UI以了解详细信息。
我有一个简单的python脚本,它使用Google pubsub来检测Google云存储中的新文件。该脚本只需将新消息添加到队列中,另一个线程将在队列中处理这些消息: 这里,简单地将消息添加到队列中: 我遇到的问题是,过了一段时间(可能几天),订阅服务器停止接收新文件通知。如果我停止并重新启动脚本,它会一次获得所有通知。 我想知道是否有其他人也有类似的问题和/或可以建议解决问题的方法(可能通过打印
我在Azure中有一个服务总线队列和一个服务总线队列触发函数。当我第一次发布函数并向服务总线队列发送消息时,函数被触发并正常运行。 但是如果我不去处理它,并且在大约1个小时内不向队列发送任何消息,然后我发送了一条消息,那么函数就不会被触发。我必须通过按'run'在门户中再次手动运行该函数,否则我必须重新将其发布到Azure。 我如何保持它运行,这样我就不必每一个小时左右重新启动它???我的应用程序
问题内容: 这可能是一个重复的问题,但我没有找到想要的东西。我在UI活动中调用AsyncTask, 在doInBackground中调用需要时间的方法。如果一段时间后没有返回数据,我想中断该线程。以下是我尝试执行此操作的代码。 但这并不能在30秒后停止任务,事实上,这花费了更多时间。我也尝试过,但这也不起作用。 谁能告诉我该怎么做或如何在doInBackground中使用isCancelled()
我使用的是spring boot 2.2.4版本,spring-kafka 2.4.2版本 我的场景是以下一个: 所以我写了folloqing代码 生产者微服务 spring kafka配置: 在制作人方面所有的工作都很好。我能创造话题和发送信息。 消费者微服务 动态侦听器类 当我在生产者端发送消息时,我可以看到以下日志: 在消费者方面,我没有看到任何信息。我只看到下面的指纹: 谁能告诉我我错在哪
我已将flinkkafkaconsumer作为源添加到我的streamexecutionenvironment中。我想在特定时间内没有收到新消息时关闭/阻止flink使用数据(类似于kafka polltime)。目前它正在无限期运行,并阻止执行移动到下一步(验证消息)。请建议是否有任何解决方法。 注意:我从反序列化中尝试了endofstream,但它无法工作,因为流实际上是不确定的。 提前谢谢。