我有两个卡夫卡喷口,我要将其值发送到同一螺栓。
可能吗 ?
是的,有可能:
TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");
您也可以使用任何其他分组。
更新:
为了区分使用者螺栓中的元组(即topic_1或topic_2),有两种可能性:
1)您可以使用操作员ID(如@ user-4870385所建议):
if(input.getSourceComponent().equalsIgnoreCase("topic_1")) {
//do something
} else {
//do something
}
2)您可以使用流名称(@zenbeni建议)。对于这种情况,两个喷口都需要声明命名流,而螺栓需要通过流名称连接到喷口:
public class MyKafkaSpout extends KafkaSpout {
final String streamName;
public MyKafkaSpout(String stream) {
this.streamName = stream;
}
// other stuff omitted
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// compare KafkaSpout.declareOutputFields(...)
declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
}
}
构建拓扑,现在需要使用流名称:
TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");
MyBolt
现在可以在流名称中区分输入元组:
// in my MyBolt.execute():
if(input.getSourceStreamId().equals("Topic1")) {
// do something
} else {
// do something
}
讨论:
虽然使用流名称的 第二种 方法更自然(根据@zenbeni),但 第 一种方法更灵活(IHMO)。流名称直接由spout /
bolt声明(即,在编写spout / bolt代码时);与此相反,当拓扑放在一起操作者ID分配(即,在时间喷口/螺栓 使用 )。
假设我们得到三个螺栓作为类文件(没有源代码)。前两个应该用作生产者,并且都用相同的名称声明输出流。如果第三个使用者按流区分输入元组,则此方法将无效。即使两个给定的生产者螺栓都声明了不同的输出流名称,预期的输入流名称也可能在使用者螺栓中进行了硬编码,并且可能不匹配。因此,它也不起作用。但是,如果使用者螺栓使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配期望的组件ID。
当然,有可能从给定的类继承(如果未声明final
并进行覆盖declareOutputFields(...)
,以便分配自己的流名称。但是,这是要做的更多工作。
我是阿帕奇Storm的新手。 请救命!! 我的拓扑: TopologyBuilder builder=new TopologyBuilder(); 这里,我试图从事件流中获取长度为3的窗口中的第一个和最后一个事件。但是我得到的第一个和最后一个事件是一样的,因为KafkaSpout一次只发送一个元组。
我需要我的logstash conf文件向一个kafka主题发送一条消息,以指示已处理的文档已发送到ElasticSearch。我已经准备好了logstash文件来构造发送到ElasticSearch的数据,但是我需要通过同一个logstash文件向kafka主题发布“是”或“否”消息。
问题内容: 我的AngularJS应用程序中有一个页面,我想在其中包含相同的html部分,但具有不同的变量。如果我在我的主要这样做: 而看起来像 两者都会看起来像 我想这与以下事实有关:ng-includes也需要相同的名称。那么,如何将不同的变量发送到每个不同的include? 问题答案: 每次加载新的部分时,传递给的表达式都会求值。在这种情况下,您将值更改为两次,因此在加载两个部分时,当前值将
我发送JSON文件(现在从邮递员)的身体是这样的: 控制器上的方法是: 我犯了个错误- 如何解决这个问题?
嗨,我写了一个mapreduce作业,它一般解析XML文件。我能够解析XML文件并正确生成所有键值对。我有6个不同的键和相应的值。所以我并行运行6个不同的减速器。 现在我面临的问题是,reducer将两个不同的键 - 值对放在同一个文件中,其余4个键值放在单个文件中。因此,简而言之,在化简器输出的6个文件中,我得到了4个具有单键值对的文件和1个具有两个键值对的文件以及1个没有任何内容的文件。 我尝
我们有两个不同的ASP.NET应用程序启用了Log4net日志记录。它们都有相同的log4net1.2.10.0版本。