当前位置: 首页 > 知识库问答 >
问题:

Kafka·斯普特在Storm拓扑上读了两次信息

巫马越彬
2023-03-14

我试图模拟流流量使用KafkaStorm。我使用KafkaSpout阅读来自一个主题的消息,该主题由一个生产者发送,该生产者阅读这些推文并将其发送到一个主题。我的问题是,在拓扑消耗了本主题中的所有推文发送后,它会继续阅读本主题中的消息两次。如何阻止KafkaSpout阅读两次?复制因子设置为1)

共有1个答案

岳志义
2023-03-14

在我看来配置不错。

也许问题是双重包装。确保在execute中只对每个元组进行一次确认。

如评论中所述,请考虑升级到较新的Kafka版本,以及切换到Storon-kafka-Client

也可能会让你的生活变得简单一点:考虑扩展<代码> BaseBaseBase<代码>,而不是<代码> BaseRixByth。如果运行execute未引发错误,则code>BaseBasicBolt会自动为您确认元组。如果要使元组失败,可以抛出FailedExceptionBaseRichBolt仅当您希望执行更复杂的确认时才应使用,例如,在确认之前聚合内存中许多execute调用的元组。

 类似资料:
  • 我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?

  • 8台机器一直在使用。每一个都有22个核心和512 GB的RAM。但是,我们的代码运行得真的很慢。传输600万个数据需要10分钟才能完成。 60个文件中的10 MB在一秒钟内传输到HDFS。我们正在努力优化我们的代码,但很明显我们做了一些非常错误的事情。 对于蜂巢表,我们有64个桶。 在HDFS喷口;.setmaxextending(50000); 在蜂巢喷口选项;.WithTxNsperBatch

  • 如何为storm拓扑提供自定义配置?例如,如果我构建了一个连接到MySQL集群的拓扑,并且我希望能够更改需要连接到哪些服务器而不需要重新编译,我将如何做到这一点?我更喜欢使用配置文件,但我担心文件本身没有部署到集群中,因此它不会运行(除非我对集群工作方式的理解有缺陷)。到目前为止,我所看到的在运行时将配置选项传递到storm拓扑的唯一方法是通过命令行参数,但当您获得大量参数时,这将是混乱的。 有一

  • 因此,在下面的配置中,当我们将Spring Boot容器扩展到10个JVM时,事件的数量随机多于发布的数量,例如,如果发布了320000条消息,那么事件有时会达到320500条等等。。 更新以下更改现在似乎工作正常,我将在消费者中添加重复检查,以防止消费者失败的情况

  • 我是阿帕奇Storm的新手。我已经在intellij中用java创建了一个storm项目,它成功地创建了一个本地集群,并将拓扑提交给它,然后在本地运行。我想在亚马逊EC2上运行这个Storm项目。我跟踪了https://github.com/nathanmarz/storm-deploy/wiki链接。跟随链接成功发射了2个主管,1个动物园管理员和1个灵光。现在我想在服务器上运行我的拓扑。这是我在

  • 我读了很多和Storm有关的网站。但我仍然无法将拓扑结构完美地映射到Storm集群中。 请帮助我理解这一点。 在Storm集群中有这样的术语 null null null 所有这些都要用Storm集群来映射。我已经在一个项目里工作了。所以我知道拓扑结构。