当前位置: 首页 > 知识库问答 >
问题:

从不执行对流的终端调用

蒋培
2023-03-14

下面是我的通道使用者的accept()方法。为了保护无辜者,一些名字被改了。

@Override
public void accept(final NetChannel channel) {
    log.info("Consuming NetChannel of FullHttpRequest");
    try {
        // Our initial Stream is a single HTTP request containing our XML document.
        Stream requestStream = channel.in();

        requestStream.filter((x) -> {return true;})
                .dispatchOn(dispatcher)
                .map(new FooRequestFunction())                    // 1)
                .flatMap(new BarRequestStreamFunction())          // 2)
                .flatMap(new DownstreamRequestZipFunction())      // 3)
                .toList()                                         // 4)
                .onComplete(new ResponsesConsumer(channel));      // 5)

    } catch (Exception e) {
        log.error("Exception thrown during Channel processing", e);
    }
}

因此,foorequest包装了许多barrequests,每个都有一个关联的Classify请求和一个关联的Validate请求。我们希望1)转换为foorequest,2)将foorequest转换为一系列barrequest,3)为每个barrequest运行两个下游请求,4)将所有barrespession对象聚合为一个整体响应,5)向客户机发送一个响应。

我遇到问题的地方是tolist()方法,它似乎从来没有执行过。每次尝试涉及promise的内容时,似乎总是失败,这次也不例外。

FooRequestFunctionBarRequestStreamFunction相当简单,似乎运行良好。它们的方法签名是:

// FooRequestFunction
public FooRequest apply(final FullHttpRequest request);

和:

// BarRequestStreamFunction
public Stream<BarRequest> apply(FooRequest dsoRequests);

DownStreamRequestZipFunction如下所示:

@Override
public Stream apply(BarRequest t) { 
    Stream classifyRes = Streams
            .just(t)
            .flatMap(new ClassifyDownstreamRequestFunction());

    Stream validateRes = Streams
            .just(t)
            .flatMap(new ValidateDownstreamRequestFunction());

    return Streams.zip(classifyRes, validateRes, (tuple) -> {
        BarResponse response = new BarResponse();
        response.setClassifyRes(tuple.getT1());
        response.setValidateRes(tuple.getT2());
        return response;
    });
}
// ResponsesConsumer
public void accept(Promise<List<BarResponse>> responses)

它所做的是await()响应promise,然后将所有这些响应聚合到一个写回通道的XML文档中。我可以告诉执行从来没有达到这种方法,因为没有一个伐木火灾。这一切似乎都止步于.tolist()。

有人知道为什么这个设置似乎会执行tolist()或之后的任何事情吗?

编辑:好的,我有更多的信息。在为应用程序中的每个线程提供命名约定以使调试更容易之后,我可以看到“shared-1”,运行accept()方法的线程进入等待状态,然后保持在那里。这可能与底层调度程序是一个单线程的ringbuffer调度程序有关。

@Override
public void accept(final NetChannel channel) {
    log.info("Consuming NetChannel of FullHttpRequest");
    try {
        // Our initial Stream is a single HTTP request containing our XML document.
        Stream requestStream = channel.in();

        requestStream.filter((x) -> {return true;})
                .dispatchOn(dispatcher)
                .map(new FooRequestFunction())                    // 1)
                .flatMap(new BarRequestStreamFunction())          // 2)
                .flatMap(new DownstreamRequestZipFunction())      // 3)
                .reduce(new ArrayList(), (list,resp) -> {log.info("Reducing"); list.add(resp); return list;})                                        // 4)
                .consumeOn((x)->{log.info("Consume");}, (x)->{log.error("error");}, (x)->{log.info("Complete");}, dispatcher);      // 5)

    } catch (Exception e) {
        log.error("Exception thrown during Channel processing", e);
    }
}

在上面,我用reduce()调用替换了toList(),并将所有内容折叠成一个列表 。我可以很好地看到这个执行和日志记录。但是,无论我对最后一个调用做什么,在尝试consumeOn()、consumeOn()等之后,它都不会执行,也不会记录上面看到的最后一个调用。

在VisualVM中,我可以看到dispatcher线程都在与阻塞队列相关联的同一个对象监视器上等待-换句话说,它们都在等待工作的到来。就像尾部的consumeOn()调用被完全忽略一样。

我在这里做错了什么?我有什么不明白的?

 @Bean
    public NetServer httpServer(
            final Environment env,
            final MetricRegistry metrics,
            final ChannelConsumer consumer) throws InterruptedException {

        NetServer server = new TcpServerSpec(
                NettyTcpServer.class)
                .env(env)
                .options(
                        new NettyServerSocketOptions()
                            .pipelineConfigurer((ChannelPipeline pipeline) -> pipeline
                                .addLast(new HttpServerCodec())
                                .addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH))))
                .consume(consumer)
                .get();

        server.start().await();

        return server;
    }

没有为此配置调度程序,它似乎在暗中使用LMAX中断程序,而不是NettyEventLoopDispatcher。尚不清楚如何设置NettyEventLoopDispatcher并将其用作替代调度程序。

共有1个答案

史阳晖
2023-03-14

consumeon()调用是多余的,因为您已经使用了dispatcher(除非您的“实际”代码使用了与本例不同的内容)。在编写输出时,它将在内部切换到NettyEventLoopDispatcher

我做了一个快速检查,以确保该流在tcpserver之外工作,并且按预期工作:

@Test
public void testStream() throws InterruptedException {
    Stream<String> s1 = Streams.just("Hello World!");

    s1.filter(s -> s.startsWith("Hello"))
      .dispatchOn(Environment.sharedDispatcher())
      .map(s -> s.toUpperCase())
      .flatMap(s -> Streams.just(s, s))
      .flatMap(s -> Streams.just(s, s))
      .reduce(new ArrayList<>(), (l, s) -> {
          l.add(s);
          return l;
      })
      .consume(l -> {
          System.out.println("thread: " + Thread.currentThread() + ", l: " + l);
      });

    Thread.sleep(500);
}

只要注释掉.dispatchon()调用,就可以知道流是否能在Netty线程上工作。

 类似资料:
  • 我知道有很多关于这个的线索,但是没有一个对我有用。以下是我试图做的: Javac并从我的java代码中运行一个文件。它适用于视窗系统,但我想让它也适用于UNIX。代码如下: 问题是,在UNIX系统上,它的行为“不可预测”,例如: 打开图像,但 它什么也没做。没有按摩。 我非常感谢您的任何意见。 更新--------------------------------------------------

  • 问题内容: 这里的目标是在新的shell中运行新的python文件,并在现有的shell中运行现有的python文件。说我有两个文件,aaa.py和bbb.py。为了简单起见,aaa.py所做的只是… …并且可以说bbb.py确实是… 现在的目标是在终端1中运行aaa.py,并使其在终端2中启动bbb.py。我希望存在类似下面的命令,但无法弄清楚。 问题答案: 通常,无法从shell进行此操作。您

  • 在我的客厅里,我有一台苹果迷你电脑,既是HTPC,也是家庭自动化服务器。它用于自动化的软件是Shion,这是一款免费的家庭自动化应用,支持苹果脚本。在同一台苹果迷你电脑上,我运行着Apache,并建立了一个可以发送命令的界面。(不管怎样,这个界面是使用jQuery Mobile构建的。) 我遇到的问题是,在终端和AppleScript编辑器中工作正常的AppleScripts正在Apache错误日

  • 我对Java比较陌生,我完成了教程,其中解释了如何创建和运行JAR文件的示例。您能告诉我在命令行中运行JAR应该做些什么吗? 我创建了JAR文件: 此Jar文件已复制到桌面 我希望命令:java-jar06_02.jar应该执行文件。 在IntelliJ终端中,我切换到桌面目录(文件所在)并运行:java-jar 06\u 02。罐子 这会导致错误: 错误:发生JNI错误,请检查您的安装并在线程“

  • 关于通过Java向终端发送命令,我遇到了一个非常令人困惑的问题。我有这个密码: 第一个命令是这个"useradd user-p密码-d /home/ftp/test/-s /bin/false",第二个应该是这个回显用户名:new_password|chpasswd,第一个命令工作没有任何问题,并创建了我定义的用户"服务器"变量,但当我试图执行第二个命令来改变用户传递此命令可能永远不会发生,输出为

  • 问题内容: 我正在使用Python启动程序。 在某些情况下,程序可能会冻结。这是我无法控制的。我从命令行启动时唯一可以做的就是迅速杀死程序。 有什么办法可以模仿吗?我正在用来启动程序。 问题答案: 请查看该模块上的文档以了解更多信息:http : //docs.python.org/2/library/subprocess.html