然而,我在激活时不断得到错误,然后监视拓扑。下面是我实现的Storm拓扑的源代码:
BrokerHosts hosts = new ZkHosts("127.0.0.1:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/kafkastorm/", "bytes" + UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.zkServers = Arrays.asList("127.0.0.1");
spoutConfig.zkPort = 2181;
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("bytes", kafkaSpout);
builder.setBolt("byteSize", new KafkaByteProcessingBolt()).shuffleGrouping("bytes");
StormTopology topology = builder.createTopology();
Config config = new Config();
StormSubmitter.submitTopology("topology", config, topology);
但是,在执行bin/storm监视器
时,我不断得到的错误消息如下:
Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223)
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159)
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at org.apache.storm.command.monitor.main(Unknown Source)
然而,通过检查工作者的日志(worker.log文件),我得出结论认为KafkaSpout在open()方法上失败。
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_101]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_101]
... 5 more
从错误“java.lang.noClassDefounderRror:org/apache/curator/retrypolicy”中可以看出curator-client.jar丢失了。
请检查以下链接是否对您有帮助?
https://github.com/abhinavg6/kafka-storm-conscomp/issues/1
我使用的是Storm-kafka-1.1.1-plus和storm 1.1.1。并使用BaseRichBolt、一个KafkaSpout和两个bolts bolt-A、Bolt-B进行配置。元组被锚定在bolt-A中,一旦Bolt-B确认,它将被视为成功处理的元组,并将被提交。但是,问题是由于某种原因,一些失败的消息在KafKaspout中被复制了。 例如: KafkaSpout在处理它时发出了1
我使用一个带有默认KafkaSpout的Storm拓扑(带有Storm 0.10.0),从一个Kafka主题获取JSON数据并对其进行处理。 知道为什么会这样吗?我能提供任何有助于调试此问题的额外信息吗?
嗨,我是新来的斯托姆和Kafka。我使用的是storm 1.0.1和kafka 0.10.0,我们有一个kafkaspout可以接收来自kafka主题的java bean。我花了几个小时来寻找正确的方法。发现很少文章是有用的,但没有一个方法为我工作到目前为止。 KafKaProducer: } Kyro串行器:
2016-07-05 03:59:25.042 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Tick,ID:{},[30] 2016-07-05 03:59:25.946 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Metrics_Tick,ID:{},[60] 我的测试
我对Kafka和斯托姆有意见。我现在不确定是我正在设置的KafkaSpout配置有问题,还是我没有正确地进行处理。 我在我的Kafka主题上排了50个条目,但我的喷口发出了1300多个(而且还在计数)元组。此外,the Spout报道说,几乎所有人都“失败了”。拓扑实际上并没有失败,它正在成功地写入数据库,但我只是不知道为什么它明显地重播所有内容(如果它正在这样做的话) 最大的问题是: 下面是我如