当前位置: 首页 > 知识库问答 >
问题:

Apache Storm 1.1.0未运行,无法从客户端sessionid读取其他数据

娄飞鸾
2023-03-14

我正在Apache Storm 1.1.0中运行一个简单的Hello World应用程序。应用程序有一个随机整数喷口和一个打印元组输出的螺栓。但不知何故,我无法让它在我的windows系统上工作。

public static void runTopology() {
    //String filePath = "./src/main/resources/operations.txt";
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("randomNumberSpout", new RandomIntSpout());
    builder.setBolt("printingBolt", new PrintingBolt()).shuffleGrouping("randomNumberSpout");

    Config config = new Config();
    config.setDebug(true);
    LocalCluster cluster = new LocalCluster();
    try{
        cluster.submitTopology("Test", config, builder.createTopology());
    }finally{
        cluster.shutdown();
    }
}
public class PrintingBolt extends BaseBasicBolt {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        System.out.println("Printing Tupple!!!!");
        System.out.println(tuple);
        System.out.println("Tupple processed " + tuple.getInteger(1));
        basicOutputCollector.emit(new Values(tuple.getInteger(1)));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("TestOutput"));

    }
}
public class RandomIntSpout extends BaseRichSpout {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    private Random random;
    private SpoutOutputCollector outputCollector;

    /*@Override
    public void open(Map<String,Object> map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }*/

    public void nextTuple() {
        Utils.sleep(1000);
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            random = new Random();
            outputCollector = collector;
    }
}

我可以提供其余的代码以及,但我不认为这将是必要的。如果需要请在评论中提到,我也会提供。

当我尝试运行应用程序时,我会得到以下错误。

10620[main]信息O.A.S.S.O.A.Z.Zookeeper-启动客户端连接,connectstring=localhost:2000/storm sessiontimeout=20000>watcher=org.apache.storm.shade.org.apache.curator.connectionstate@31b0f02 10625[main-sendthread(0:0:0:0:0:0:0:0:1:2000)]信息O.A.S.S.O.A.Z.ClientCNXN-打开到服务器的套接字连接将不尝试使用SASL(未知错误)10627[Main-SendThread(0:0:0:0:0:0:0:1:2000)]信息O.A.S.S.O.A.Z.ClientCNXN-建立到>0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:1:2000的套接字连接,启动会话10627[NIOServerCXN.工厂:0.0.0.0/0.0.0.0:2000]信息O.A.S.S.O.A.Z.S.NIOServerCNXN工厂-接受的套接字连接来自/>0:0:0:0:0:0:0:1:56905 10628[NIOServerCXN工厂:0.0.0.0/0.0.0.0:2000]信息O.A.S.S.O.A.Z.S.ZOOKEEPERServer-客户端试图在/>0:0:0:0:0:0:0:0:1:1:56905 10631[Main-SendThread(0:0:0:0:0:0:0:1:2000)]建立新会话信息O.A.S.S.O.A.Z.ClientCNXN-服务器上完成会话建立>0:0:0:0:0:0:0:0:0:0:1:2000,sessionid=0x16A8E5ABD97000D,协商超时=20000 10631[SyncThread:0]信息O.A.S.S.O.A.Z.S.ZookeePerServer-建立的会话0x16A8E5ABD97000D,客户端协商超时20000/>0:0:0:0:0:0:0:0:1:56905

我不明白为什么客户端套接字是关闭的,为什么会话是关闭的?我不能让它工作。请帮忙。

共有1个答案

刘高驰
2023-03-14

我想你可能需要在这里多睡一觉

try{
        cluster.submitTopology("Test", config, builder.createTopology());
        //Sleep here
    }finally{
        cluster.shutdown();
    }

当前您正在提交拓扑,并立即关闭。除非您Rest一下,否则您的拓扑没有机会运行。

 类似资料:
  • 为方便测试,我们以RPC中的例子来实现服务端,具体请看文档RPC章节。 纯原生异步 public static function mainServerCreate(ServerManager $server,EventRegister $register): void { // TODO: Implement mainServerCreate() method.

  • 我构建了一个实现Selenium驱动程序的ASP.NET web应用程序。我的问题是所有的测试都在本地机器上运行良好,但是在部署并作为客户端登录到我的网站并开始测试之后,驱动程序既不在服务器机器上也不在客户端机器上启动或做出任何反应 日志文件的一些 #软件:Microsoft Internet Information Services 8.5#版本:1.0#日期:2017-08-15 02:24:

  • 您可以get get client-go的一个版本,例如get get k8s.io/client-go/1.4/...或者获取k8s.io/client-go/1.4/kubernetes。 现在当我这么做的时候。这是输出- 警告:“k8s.io/client-go/1.4/...”匹配的无包无法加载包:包。:/users/shubhadeeproychowdhury/projects/Go/s

  • 反应阿波罗客户端应用程序。我试图使用readQuery从缓存中读取数据,但无法读取缓存字段。 这里有一个纲要: 组件调用,它执行我的查询并缓存结果。查询返回类型。一切都很好,开发工具显示包含一个名为getPost的字段和该帖子。 有一个子组件,它会在单击时删除评论。它调用并删除评论。查询返回类型。 Post有一个注释数组,现在我需要在缓存中更新它的数组并删除已删除的注释。我使用突变中的函数来获取缓

  • 我有一个Tcp客户端,连接到一个旧的主机(52年),发送和接收来自它的请求和响应。 这是我的客户机的核心连接部分, 我试图用Netty重写下面的文章。通过使用以下教程作为参考。 http://tutorials.jenkov.com/netty/netty-tcp-client.html 我面临的问题是我能够连接到服务器,但不能从中读写。我正在使用一个来执行读写操作。 这是我的脾气暴躁的客户 处理

  • 我正在尝试通过 tcp 连接远程执行程序,我想在客户端之间实时共享标准输出和标准输出 我有以下没有错误处理的测试服务器:p我知道,目前我无法执行带有参数的程序,但这很容易处理:) 你看,我尝试与 c.Write() 共享标准输出,但这不起作用。 我认为cmd.Stdin的另一个问题将与Stdout的问题相同。此时我没有实现任何标准函数。 有人能给我一个关于这个函数的提示或示例代码吗?