当前位置: 首页 > 知识库问答 >
问题:

使用Kafka喷口的Kafka Storm集成

云新知
2023-03-14

我用的是Kafka。请在下面找到测试程序。

我正在使用Storm 0.8.1。在Storm 0.8.2中存在多方案类。我会用那个。我只想知道早期版本是如何通过实例化String计划()类来工作的?我在哪里可以下载早期版本的Kafka喷口?但是我怀疑这是一个正确的选择,而不是在Storm 0.8.2上工作。???(困惑)

当我在暴风集群上运行代码(如下所示)时(即当我推我的拓扑时),我得到以下错误(当方案部分被注释时会发生这种情况,当然我会得到编译器错误,因为类在0.8.1中不存在):

java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
        at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme

在下面给出的代码中,您可以找到spoutConfig。scheme=新StringScheme();部分评论。如果我不注释那一行,我会得到编译器错误,这是很自然的,因为里面没有构造函数。另外,当我实例化MultiScheme时,我得到一个错误,因为我在0.8.1中没有这个类。

public class TestTopology {
    public static class PrinterBolt extends BaseBasicBolt {
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple.toString());
        }
    }

    public static void main(String [] args) throws Exception {
        List<HostPort> hosts = new ArrayList<HostPort>();
        hosts.add(new HostPort("127.0.0.1",9092));
        LocalCluster cluster = new LocalCluster();
        TopologyBuilder builder = new TopologyBuilder();
        SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
        spoutConfig.zkServers=ImmutableList.of("localhost");
        spoutConfig.zkPort=2181;
        //spoutConfig.scheme=new StringScheme();
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        builder.setSpout("spout",new KafkaSpout(spoutConfig));
        builder.setBolt("printer", new PrinterBolt())
                .shuffleGrouping("spout");
        Config config = new Config();

        cluster.submitTopology("kafka-test", config, builder.createTopology());

        Thread.sleep(600000);
    }

共有2个答案

乐正穆冉
2023-03-14

我们也有类似的问题。

我们的解决方案:

>

  • 打开pom.xml

    将范围从提供更改为

    如果您想了解更多关于依赖作用域的信息,请查看maven docu: Maven docu-依赖作用域

  • 段超
    2023-03-14

    我也有同样的问题。最终解决了这个问题,我将完整的运行示例放在github上。

    欢迎你来这里看看

    (单击storm kafka目录中的示例程序,该程序将帮助您启动并运行)。

     类似资料:
    • 这里可能发生了同样的事情:错误backtype.storm.util-Async循环死亡!BufferUnderFlowException:null,但我将添加一个完整的堆栈跟踪和一些更多的上下文。 Storm版本-9.3 Storm-Kafka版本-9.3 Kafka版本-0.8.2-beta 堆栈跟踪: Spout代码(注意,出于调试目的,我使用的是一个静态定义的分区映射,只有一个代理):

    • 我想知道是否有任何Kafka喷口支持安全的Kafka经纪人。apache storm的KafkaSpout不支持SSL Kafka。 下面提到的Kafka不接受SSL Kafka生产者/消费者支持的任何参数。 请让我知道有没有任何方法,我们可以实现安全的Kafka消息流处理与Storm拓扑。

    • 我用Kafka壶嘴来消费信息。但是,如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始?Kafka壶嘴给了我们从哪里消费的时间戳,但我怎么知道时间戳呢?

    • 我使用storm0.9.4和storm-kafka:0.9.0-wip16a-scala292作为从kafka0.7读取的依赖项。 我们的Kafka保留政策是7天。 我从经纪人的最新偏移量开始读取。

    • Spout被配置为从zookeeper读取最后的提交偏移量,并且在此场景中,该偏移量大于Kafka中最新的消息偏移量。我们也在研究为什么主题偏移被重置。 目前我们通过观察Storm日志中的范围外警告来解决这个问题,删除zookeeper偏移条目,然后重新部署拓扑。

    • 我无法找到正确集成Kafka和Apache Storm Trident的好文档。我试图查看相关的问题之前张贴在这里,但没有充分的信息。 这样,我就可以为我的拓扑生成流,如下面的代码所示 虽然我提供了并行性和我的分区,但是只有一个Kafka Spout的执行器在运行,因此我无法很好地扩展它。 有谁能指导我更好地将Apache Storm Trident(2.0.0)与Apache Kafka(1.0