当前位置: 首页 > 面试题库 >

如何将两个不同的Spout的输出发送到同一Bolt?

毋宪
2023-03-14
问题内容

我有两个卡夫卡喷口,我要将其值发送到同一螺栓。

可能吗 ?


问题答案:

是的,有可能:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");

您也可以使用任何其他分组。

更新:

为了区分使用者螺栓中的元组(即topic_1或topic_2),有两种可能性:

1)您可以使用操作员ID(如@ user-4870385所建议):

if(input.getSourceComponent().equalsIgnoreCase("topic_1")) {
    //do something
} else {
    //do something
}

2)您可以使用流名称(@zenbeni建议)。对于这种情况,两个喷口都需要声明命名流,而螺栓需要通过流名称连接到喷口:

public class MyKafkaSpout extends KafkaSpout {
  final String streamName;

  public MyKafkaSpout(String stream) {
    this.streamName = stream;
  }

  // other stuff omitted

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // compare KafkaSpout.declareOutputFields(...)
    declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
  }
}

构建拓扑,现在需要使用流名称:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");

MyBolt现在可以在流名称中区分输入元组:

// in my MyBolt.execute():
if(input.getSourceStreamId().equals("Topic1")) {
  // do something
} else {
  // do something
}

讨论:

虽然使用流名称的 第二种 方法更自然(根据@zenbeni),但 一种方法更灵活(IHMO)。流名称直接由spout /
bolt声明(即,在编写spout / bolt代码时);与此相反,当拓扑放在一起操作者ID分配(即,在时间喷口/螺栓 使用 )。

假设我们得到三个螺栓作为类文件(没有源代码)。前两个应该用作生产者,并且都用相同的名称声明输出流。如果第三个使用者按流区分输入元组,则此方法将无效。即使两个给定的生产者螺栓都声明了不同的输出流名称,预期的输入流名称也可能在使用者螺栓中进行了硬编码,并且可能不匹配。因此,它也不起作用。但是,如果使用者螺栓使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配期望的组件ID。

当然,有可能从给定的类继承(如果未声明final并进行覆盖declareOutputFields(...),以便分配自己的流名称。但是,这是要做的更多工作。



 类似资料:
  • 我是阿帕奇Storm的新手。 请救命!! 我的拓扑: TopologyBuilder builder=new TopologyBuilder(); 这里,我试图从事件流中获取长度为3的窗口中的第一个和最后一个事件。但是我得到的第一个和最后一个事件是一样的,因为KafkaSpout一次只发送一个元组。

  • 我需要我的logstash conf文件向一个kafka主题发送一条消息,以指示已处理的文档已发送到ElasticSearch。我已经准备好了logstash文件来构造发送到ElasticSearch的数据,但是我需要通过同一个logstash文件向kafka主题发布“是”或“否”消息。

  • 问题内容: 我的AngularJS应用程序中有一个页面,我想在其中包含相同的html部分,但具有不同的变量。如果我在我的主要这样做: 而看起来像 两者都会看起来像 我想这与以下事实有关:ng-includes也需要相同的名称。那么,如何将不同的变量发送到每个不同的include? 问题答案: 每次加载新的部分时,传递给的表达式都会求值。在这种情况下,您将值更改为两次,因此在加载两个部分时,当前值将

  • 我发送JSON文件(现在从邮递员)的身体是这样的: 控制器上的方法是: 我犯了个错误- 如何解决这个问题?

  • 嗨,我写了一个mapreduce作业,它一般解析XML文件。我能够解析XML文件并正确生成所有键值对。我有6个不同的键和相应的值。所以我并行运行6个不同的减速器。 现在我面临的问题是,reducer将两个不同的键 - 值对放在同一个文件中,其余4个键值放在单个文件中。因此,简而言之,在化简器输出的6个文件中,我得到了4个具有单键值对的文件和1个具有两个键值对的文件以及1个没有任何内容的文件。 我尝

  • 我们有两个不同的ASP.NET应用程序启用了Log4net日志记录。它们都有相同的log4net1.2.10.0版本。