编辑:我向Bolt添加了一个。ack()(这要求我使用一个丰富的Bolt而不是基本的Bolt)并且遇到了同样的问题--没有任何信息告诉我Bolt正在处理元组。
如果有关系的话,我会在EC2实例上的CentOS映像上运行这个。如有任何帮助,不胜感激。
查看生成的Storm worker日志,我看到这一行:从以下内容读取分区信息:/helloworld spout/partition_0-->{“topic”:“helloworld”、“partition”:0、“topology”:{“id”:“
下面几行如下:
s.k.PartitionManager [INFO] Last commit offset from zookeeper: 0
s.k.PartitionManager [INFO] Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
s.k.PartitionManager [INFO] Starting Kafka ip-10-0-0-35.ec2.internal:0 from offset 0
s.k.ZkCoordinator [INFO] Task [1/1] Finished refreshing
s.k.ZkCoordinator [INFO] Task [1/1] Refreshing partition manager connections
s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=ip-10-0-0-35.ec2.internal:6667}}
工作日志的其余部分没有显示螺栓处理的消息的日志/打印。我不明白为什么螺栓似乎没有从Kafka集群得到任何信息。任何帮助都会很好。谢了。
private static KafkaSpout setupSpout() {
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "helloworld", "", "HelloWorld Spout");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.forceFromStart = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
return new KafkaSpout(spoutConfig);
}
构建拓扑并提交
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("Kafka Spout", setupSpout());
builder.setBolt("Echo Bolt", new SystemOutEchoBolt());
try {
System.setProperty("storm.jar", "/tmp/storm.jar");
StormSubmitter.submitTopology("Kafka-Storm test", new Config(), builder.createTopology());
} //catchExceptionsHere
}
螺栓
public class SystemOutEchoBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(SystemOutEchoBolt.class);
private OutputCollector m_collector;
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map _map, TopologyContext _conetxt, OutputCollector _collector) {
m_collector = _collector;
}
@Override
public void execute(Tuple _tuple) {
System.out.println("Printing tuple with toString(): " + _tuple.toString());
System.out.println("Printing tuple with getString(): " + _tuple.getString(0));
logger.info("Logging tuple with logger: " + _tuple.getString(0));
m_collector.ack(_tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer _declarer) {}
}
答案很简单。我从来没有告诉螺栓订阅哪条流。添加.shufflegrouping(“Kafka spout”);
修复了这个问题。
因此,如果您使用基于JUnit的单元测试,是否建议您运行一个小型模拟拓扑(?)并测试该拓扑下的(或)的隐含契约?或者,是否可以使用JUnit,但这意味着我们必须仔细模拟Bolt的生命周期(创建它、调用、嘲弄等)?在这种情况下,被测类(螺栓/喷口)有哪些一般的测试点需要考虑? 其他开发人员在创建正确的单元测试方面做了什么? 我注意到有一个拓扑测试API(参见:https://github.com/x
我对Apache Storm有一个奇怪的问题。我有一个Kafka连接到Kafka集群,里面有10条消息。 螺栓接收每条消息并正确处理,因为在Storm UI中,螺栓被列为“已确认”。然而,storm UI下面列出的喷口表示所有元组都失败了。 我相信这会导致喷口再次发出所有的信息。。。因此,我看到一个Storm螺栓打印出消息1-10,然后以相同的顺序一次又一次地打印出来。 我适当地调用了和方法,我只
在我的拓扑中,当元组从spout转移到bolt或从bolt转移到bolt时,我看到大约1-2 ms的延迟。我使用纳秒时间戳来计算延迟,因为整个拓扑运行在单个Worker中。拓扑是在集群中运行的,集群运行在具有生产能力的硬件中。 根据我的理解,在这种情况下,元组不需要序列化/反序列化,因为所有东西都在单个JVM中。我已经将大多数喷流和螺栓的并行性提示设置为5,并且喷流仅以每秒100的速率产生事件。我
来自Kafka喷泉的Storm--有条件消耗的溪流? 我如何获得这个bolt中数据中所有字段的模式,而不是基本上重新解析所有数据并重新创建它?
我正在本地开发一个Storm拓扑。我正在使用Storm 0.9.2孵化,并开发了一个简单的拓扑。当我使用LocalCluster()选项部署它时,它工作得很好,但它不会显示在我的Storm UI中,它只是执行而已。 当我定期部署它时,它会在我的Storm UI中显示拓扑结构,但当我单击它时,不会看到喷口或螺栓。 我还尝试了许多Storm启动项目中的示例WordCountTopology。同样的行为
我有一个非常简单的Storm螺栓,从Kafka喷口输入,应该只是写到标准输出。它扩展了BaseRichBolt。有关的两种方法是: