我用的是Kafka。请在下面找到测试程序。
我正在使用Storm 0.8.1。在Storm 0.8.2中存在多方案类。我会用那个。我只想知道早期版本是如何通过实例化String计划()类来工作的?我在哪里可以下载早期版本的Kafka喷口?但是我怀疑这是一个正确的选择,而不是在Storm 0.8.2上工作。???(困惑)
当我在暴风集群上运行代码(如下所示)时(即当我推我的拓扑时),我得到以下错误(当方案部分被注释时会发生这种情况,当然我会得到编译器错误,因为类在0.8.1中不存在):
java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
在下面给出的代码中,您可以找到spoutConfig。scheme=新StringScheme();部分评论。如果我不注释那一行,我会得到编译器错误,这是很自然的,因为里面没有构造函数。另外,当我实例化MultiScheme时,我得到一个错误,因为我在0.8.1中没有这个类。
public class TestTopology {
public static class PrinterBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.toString());
}
}
public static void main(String [] args) throws Exception {
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("127.0.0.1",9092));
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
spoutConfig.zkServers=ImmutableList.of("localhost");
spoutConfig.zkPort=2181;
//spoutConfig.scheme=new StringScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout",new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("spout");
Config config = new Config();
cluster.submitTopology("kafka-test", config, builder.createTopology());
Thread.sleep(600000);
}
我也有同样的问题。最终解决了这个问题,我将完整的运行示例放在github上。
欢迎你来这里看看
(单击storm kafka目录中的示例程序,该程序将帮助您启动并运行)。
这里可能发生了同样的事情:错误backtype.storm.util-Async循环死亡!BufferUnderFlowException:null,但我将添加一个完整的堆栈跟踪和一些更多的上下文。 Storm版本-9.3 Storm-Kafka版本-9.3 Kafka版本-0.8.2-beta 堆栈跟踪: Spout代码(注意,出于调试目的,我使用的是一个静态定义的分区映射,只有一个代理):
我想知道是否有任何Kafka喷口支持安全的Kafka经纪人。apache storm的KafkaSpout不支持SSL Kafka。 下面提到的Kafka不接受SSL Kafka生产者/消费者支持的任何参数。 请让我知道有没有任何方法,我们可以实现安全的Kafka消息流处理与Storm拓扑。
我用Kafka壶嘴来消费信息。但是,如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始?Kafka壶嘴给了我们从哪里消费的时间戳,但我怎么知道时间戳呢?
我使用storm0.9.4和storm-kafka:0.9.0-wip16a-scala292作为从kafka0.7读取的依赖项。 我们的Kafka保留政策是7天。 我从经纪人的最新偏移量开始读取。
Spout被配置为从zookeeper读取最后的提交偏移量,并且在此场景中,该偏移量大于Kafka中最新的消息偏移量。我们也在研究为什么主题偏移被重置。 目前我们通过观察Storm日志中的范围外警告来解决这个问题,删除zookeeper偏移条目,然后重新部署拓扑。
我无法找到正确集成Kafka和Apache Storm Trident的好文档。我试图查看相关的问题之前张贴在这里,但没有充分的信息。 这样,我就可以为我的拓扑生成流,如下面的代码所示 虽然我提供了并行性和我的分区,但是只有一个Kafka Spout的执行器在运行,因此我无法很好地扩展它。 有谁能指导我更好地将Apache Storm Trident(2.0.0)与Apache Kafka(1.0