当前位置: 首页 > 知识库问答 >
问题:

在Apache storm中提交字数拓扑时出错

秦鸿羽
2023-03-14

这是我尝试运行的基本wordcount拓扑。但我收到的错误是“信息组织”。阿帕奇。暴风雨动物园管理员。服务器SessionTrackerImpl-SessionTrackerImpl已退出循环!'。有人能帮我吗??

当我移除集群时。shutdown(),在我按下cntrl c之前,tweets会一直出现。wordcount也不会显示##

     import java.util.Arrays;
     import backtype.storm.Config;
     import backtype.storm.LocalCluster;
     import backtype.storm.topology.TopologyBuilder;
     import backtype.storm.tuple.Fields;
     public class TwitterHashtagStorm {

    public static void main(String[] args) throws Exception {
    String consumerKey = "************";
    String consumerSecret = "***************";
    String accessToken = "**********";
    String accessTokenSecret = "***********";
    String[] keyWords = {"apple"};
    Config config = new Config();
    config.setDebug(true);
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
            consumerSecret, accessToken, accessTokenSecret, keyWords));
    builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
            .shuffleGrouping("twitter-spout");
    builder.setBolt("twitter-hashtag-counter-bolt",
            new HashtagCounterBolt()).fieldsGrouping(
            "twitter-hashtag-reader-bolt", new Fields("hashtag"));
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("TwitterHashtagStorm", config,
            builder.createTopology());
    Thread.sleep(10000);
    cluster.shutdown();
}
}

共有1个答案

孙成益
2023-03-14

10秒(10000毫秒)可能不足以建立推特连接,也不足以让推特进入您的拓扑结构。您应该将睡眠呼叫设置为更长的时间(至少几分钟)。

至于显示工作计数,HashTagCounter是否将计数打印出来?如果是这样,打印输出可能会丢失在Storm的日志消息中。尝试设置config。setDebud(false)(减少日志消息并让您有机会看到计数)或重写HashTagCounter以将消息发送到与您运行Storm的控制台分离的另一个位置(消息代理、本地套接字接收器等)。

 类似资料:
  • 当我站在上面提交拓扑时,我在storm jar中遇到了问题 我写 打出来的 没有找到Storm指令 我想知道怎么了? Storm罐之间的区别是什么。。。。和 都在提交拓扑?

  • 我一直在使用IDE(Eclipse)测试Storm拓扑的远程提交。我成功地将简单的storm拓扑上传到了远程storm集群,但奇怪的是,当我检查storm UI以确定远程提交的拓扑是否正常工作时,我在UI中看到了just_acker bolt,但其他bolt和spout却不在那里。之后,我从命令行手动提交了拓扑,并再次检查了Storm UI,它正常工作,没有问题。我一直在找原因,但没有找到。我在下

  • 我正在设计一个ApacheStorm拓扑(使用streamparse),它由一个喷口(ApacheKafka喷口)和一个具有并行性的螺栓构建 螺栓分批读取信息。如果批量成功完成,我手动提交apache kafka偏移。 当mysql上的螺栓插入失败时,我不会在Kafka中提交偏移量,但是一些消息已经在喷口发送到螺栓的消息队列中。 应该删除队列中已经存在的消息,因为我无法在不丢失先前失败消息的情况下

  • 我试图在storm Clustre上部署一个简单的字数拓扑。我使用kafka作为输入(kafka Spout)。这是我得到的错误 java.lang.noClassDefFoundError:无法初始化类org.apache.log4j.log4jLogger.getLogger(logger.java:39)在kafka.utils.logging$class.logger(logger.sca

  • 问题内容: 我在使用IDE向生产集群提交拓扑时遇到了一个问题,而如果我在命令行中使用command 执行同样的事情,它的运行就像天堂一样。我从githublink看到了同样的例子。 为了提交拓扑,我正在使用这些行集 请建议我这是否是运行的正确方法? 问题答案: 很好找到解决方案。当我们运行“ storm jar”时,它将在提交的jar中触发storm.jar的属性标志。因此,如果我们要以编程方式提

  • > 灵光升起 StormUI启动 我使用的两个工人都起来了 Zookeper已启动 我和暴风一起跑 Storm罐myjar.jar MyClass Nimbus提交拓扑 该拓扑的日志文件不会出现在workers中。 我在supervisor.log上的worker中有以下日志: 所以我确信我与nimbus有连接问题,但是worker中的属性文件是: 错误在哪里,我如何修复它? 谢了!