当前位置: 首页 > 知识库问答 >
问题:

暴风不受控制元组乘法

颜黎昕
2023-03-14

我试图把Kafka数据通过Storm在hdfs和Hive。我在和HortonWorks合作。因此,我有以下结构,正如在许多教程(http://henning.kropponline.de/2015/01/24/hive-streaming-with-storm/)中看到的那样(稍加修改):

 TopologyBuilder builder = new TopologyBuilder();

 builder.setSpout("kafka-spout", kafkaSpout);

 builder.setBolt("hdfs-bolt", hdfsBolt).globalGrouping("kafka-spout");

 builder.setBolt("parse-bolt", new ParseBolt()).globalGrouping("kafka-spout");

 builder.setBolt("hive-bolt", hiveBolt).globalGrouping("parse-bolt");  

我将kafka-spout数据直接发送到hdfs-bolt,当我只使用hdfs-bolt时,它是工作的。当我添加parse-bolt来解析kafka-data并将其发送到hive-bolt时,整个系统会变得疯狂。即使我只是通过kafka发送一条消息,这个消息也会被kafka-spout复制无限次,并写入hdfs infinite。

如果parse-bolt中出现错误,hdfs-bolt不应该还正常工作吗?我对这个话题是新认识的,有人能看出一个简单的初学者的错误吗?我很感激任何建议。

共有1个答案

钱雅逸
2023-03-14

你在两个Bolt行刑结束时都在收留言吗?

当您从同一流从您的kafka-spout读取时,消息将锚定到同一spout但具有唯一的MessageID。因此,本质上即使您的parse-bolt元组失败,由于它锚定到同一个spout,它将在spout处被重播。这将导致另一个具有不同messageId但相同内容的元组为订阅它的所有bolt播放,在您的例子中是parse-bolt和hdfs-bolt。请记住,重播发生在Spout,因此从Spout订阅到该流的所有内容都将得到冗余消息。

 类似资料:
  • 我有一个EvaluationBolt(用于内存监视),我希望确保每个工作进程上运行一个执行器(在我的例子中,每个物理节点运行一个执行器,即supervisor.slots.ports只配置为端口6700)。在题目上我发现了这个问题: 干杯,孙铁麟

  • 问题内容: 有没有一种干净的方法可以阻止风暴,而又不使用“ kill XXX”杀死风暴,其中XXX是PID? 我运行“ storm kill topology- name”杀死拓扑,但是在那之后,有没有一种干净的方法来关闭worker,nimbus,supervisor和ui? 我没有从文档中找到与此命令相对应的任何命令:https : //github.com/nathanmarz/storm/

  • 我已经开始使用storm,所以我使用本教程创建简单的拓扑 我的嘴是这样的 我的螺栓是这样的

  • 在大多数情况下,我们推荐使用受控组件来实现表单。在受控组件中,表单数据由React组件处理。另外一个可选项是不受控组件,其表单数据由DOM元素本身处理。 不同于对每次状态处理都需要编写事件处理函数程序,在不受控组件中,你可以使用ref从DOM获得表单数据。 例如,在不受控组件中,以下代码可以输入名字: class NameForm extends React.Component { const

  • 利用 cocos2d 制作的一款休闲游戏。游戏中有九种不同的卡通水果,您可以拖动整行或整列上的水果,或交换屏幕中两个相邻水果的位置,一条直线上的三个或三个以上的相同水果将会消失,而您将得到分数。当您的分数逐步提高后,将会进入更高难度的关卡,接受更高难度的挑战。 游戏中允许使用一些道具。 [Code4App.com]