当前位置: 首页 > 知识库问答 >
问题:

StormKafka-喷口不能正常工作

哈泰
2023-03-14

一般:我是一个想在Storm/Kafka/Flink/MS Azure SA/Spark上运行一些性能测试(WordCount)的学生。我想使用Kafka经纪人作为输入源。

我使用了Storm-Starter项目中的WordCount示例,并添加了Kafka作为喷口:

    public class WordCountKafkaTopology {
    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) {

        String zkIp = "localhost";

        String topicName = "perfTest";

        List<String> nimbus_seeds = new ArrayList<String>();
        nimbus_seeds.add("localhost");

        String zookeeperHost = zkIp +":2181";

        ZkHosts zkHosts = new ZkHosts(zookeeperHost);

        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, "/" + topicName, topicName);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);


        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("kafkaPerfTestSpout", kafkaSpout, 8);

        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("kafkaPerfTestSpout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config config = new Config();

        config.setMaxTaskParallelism(5);
        config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
        config.put(Config.NIMBUS_SEEDS, nimbus_seeds);
        config.put(Config.NIMBUS_THRIFT_PORT, 6627);
        config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
        config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp));

        try {
            StormSubmitter.submitTopology("my-kafka-topology", config, builder.createTopology());
        } catch (Exception e) {
            throw new IllegalStateException("Couldn't initialize the topology", e);
        }
    }

}

我使用kafka-console-producer生成一些消息。我希望有人能帮助我。我是编程Storm的新手...

共有1个答案

皇甫敏达
2023-03-14

正在删除“config.put(config.topology_tick_tuple_freq_secs,2);”完成任务了!

 类似资料:
  • 在我的storm拓扑(有2个喷口和1个bolt)中,其中一个kafka喷口使用者的偏移量正在前进,但MSG没有通过kafka喷口发送到bolt。我可以在storm ui中看到,对于那个特定的喷口,发出和传送的消息是0。所以,我的问题是为什么消费者在前进,我可以看到消费者从zookeeper客户端的抵消逐渐增加。

  • 我正在用Apache Storm 1.1.2和Kafka0.11在Java9中构建一个Spring应用程序 我注意到,在高负载(每秒2500条消息)下,Kafka喷口有一个非常高的滞后。Kafka喷口有一个平行性提示3。滞后几乎等于喷口提交的偏移。 这个滞后设置了拓扑每秒可以摄取的最大消息量的上限,这并不是很大。有人知道解决这个问题的办法吗? 更新:我还注意到,即使有10个工作者和4个并行性提示,

  • 我正试图在按下某个按钮时弹出一个警报对话框。我首先使用了Android Developer的示例代码而不是'这不起作用,所以我根据在这个站点上发现的情况进行了更改,但是现在我的程序在按下按钮后被迫停止。 就你的知识而言,这是在第二个不同于主要的活动中完成的。不确定这是否重要.... ‘ 碰撞日志:“03-25 19:34:24.373:E/AndroidRuntime(18828):致命异常:ma

  • 2,错误{org.apache.directory.server.LDAP.ldapserver}-ERR_171无法将LDAP服务(10,389)绑定到服务注册表。java.net.BindException:已在使用的地址 请帮忙谢谢 --------提示------------------- JAVA_HOME环境变量设置为/opt/java CARBON_HOME环境变量设置为/mnt/1

  • 我正在做一个类似生存的游戏,我有两种类型的碰撞,一种是玩家的敌人,另一种是敌人身上的子弹。我也有一个健康栏,由于某些原因,在picbox被移除后,健康仍然下降,就像敌人与玩家互动一样。 这是子弹碰撞代码的一个块(所有8个方向的所有代码都是相同的) 这是敌方与玩家碰撞的暗号

  • 我正在使用Storm 1.1.2和Kafka 0.11构建一个Java Spring应用程序,将在Docker容器中启动。 我的拓扑中的所有东西都按计划工作,但在Kafka的高负载下,Kafka滞后会随着时间的推移越来越大。 我的KafKaspoutConfig: 那么我的拓扑结构如下