当前位置: 首页 > 知识库问答 >
问题:

Storm-Kafka喷口慢慢消费

刘琨
2023-03-14

我只是在试用这里提到的kafka-storm喷口https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka,我使用的配置如下所述。

    BrokerHosts brokerHosts = KafkaConfig.StaticHosts.fromHostString(
            ImmutableList.of("localhost"), 1);
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, // list of Kafka
            "test", // topic to read from
            "/kafkastorm", // the root path in Zookeeper for the spout to
            "discovery"); // an id for this consumer for storing the
                            // consumer offsets in Zookeeper
    spoutConfig.scheme = new StringScheme();
    spoutConfig.stateUpdateIntervalMs = 1000;


    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    TridentTopology topology = new TridentTopology();
    InetSocketAddress inetSocketAddress = new InetSocketAddress(
            "localhost", 6379);
    TridentState wordsCount = topology
            .newStream(SPOUT_FIRST, kafkaSpout)
            .parallelismHint(1)
            .each(new Fields("str"), new TestSplit(), new Fields("words"))
            .groupBy(new Fields("words"))
            .persistentAggregate(
                    RedisState.transactional(inetSocketAddress),
                    new Count(), new Fields("counts")).parallelismHint(100);

    Config conf = new Config();
    conf.setMaxTaskParallelism(200);
    // conf.setDebug( true );
    // conf.setMaxSpoutPending(20);

    // This topology can only be run as local because it is a toy example
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("symbolCounter", conf, topology.build());

但是,上面的喷子从Kafka主题中获取消息的速度大约是每秒7000条,但我预计每秒大约有50000条消息。我尝试了在spoutConfig中增加提取缓冲区大小的各种选项,但没有看到任何结果。

有没有人面临类似的问题,他无法通过storm以制作人制作信息的速度获取Kafka的主题?

共有1个答案

杜嘉木
2023-03-14

我将config中的“topology.spout.max.batch.size”值更新为64*1024值,然后Storm处理变得更快。

 类似资料:
  • 我使用storm0.9.4和storm-kafka:0.9.0-wip16a-scala292作为从kafka0.7读取的依赖项。 我们的Kafka保留政策是7天。 我从经纪人的最新偏移量开始读取。

  • 这里可能发生了同样的事情:错误backtype.storm.util-Async循环死亡!BufferUnderFlowException:null,但我将添加一个完整的堆栈跟踪和一些更多的上下文。 Storm版本-9.3 Storm-Kafka版本-9.3 Kafka版本-0.8.2-beta 堆栈跟踪: Spout代码(注意,出于调试目的,我使用的是一个静态定义的分区映射,只有一个代理):

  • 我想知道是否有任何Kafka喷口支持安全的Kafka经纪人。apache storm的KafkaSpout不支持SSL Kafka。 下面提到的Kafka不接受SSL Kafka生产者/消费者支持的任何参数。 请让我知道有没有任何方法,我们可以实现安全的Kafka消息流处理与Storm拓扑。

  • 我用的是Kafka。请在下面找到测试程序。 我正在使用Storm 0.8.1。在Storm 0.8.2中存在多方案类。我会用那个。我只想知道早期版本是如何通过实例化String计划()类来工作的?我在哪里可以下载早期版本的Kafka喷口?但是我怀疑这是一个正确的选择,而不是在Storm 0.8.2上工作。???(困惑) 当我在暴风集群上运行代码(如下所示)时(即当我推我的拓扑时),我得到以下错误(

  • 编辑:我向Bolt添加了一个。ack()(这要求我使用一个丰富的Bolt而不是基本的Bolt)并且遇到了同样的问题--没有任何信息告诉我Bolt正在处理元组。 如果有关系的话,我会在EC2实例上的CentOS映像上运行这个。如有任何帮助,不胜感激。 查看生成的Storm worker日志,我看到这一行: 下面几行如下: 工作日志的其余部分没有显示螺栓处理的消息的日志/打印。我不明白为什么螺栓似乎没