当前位置: 首页 > 知识库问答 >
问题:

如何使用netty在单独的线程池中执行业务逻辑处理程序

罗星洲
2023-03-14

我有一个需要执行一些业务逻辑的处理程序,我希望它在单独的线程池中执行,以不阻塞io事件循环。我已将DefaultEventExecutorGroup添加到管道中,如http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.htmljavadoc和http://netty.io/wiki/new-and-noteworthy-in-4.0.html#no-more-executionhandler---its-in-the-corewiki中指定的:

ch.pipeline().addLast(new DefaultEventExecutorGroup(10), new ServerHandler());

只是为了测试目的,我的服务器处理程序只是将当前线程置于Hibernate状态 5 秒钟:

protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {
    System.out.println("Starting.");

    try {
        Thread.currentThread().sleep(5000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    System.out.println("Finished.");        
}

但显然业务逻辑仍然是同步执行的:

Starting.
Finished.
Starting.
Finished.
Starting.
Finished.

我错过了什么?

共有2个答案

喻增
2023-03-14

因为Netty处理由同一个EventExecutor从同一套接字发送的请求,所以您可以启动多个客户端,并查看结果。

吴靖
2023-03-14

如果您的目标不是阻止IO事件循环-您做得对。但是由于特定于网络,您的处理程序将始终附加到EventExecutorGroup的同一线程,因此您上面描述的行为是预期的。

如果您想在阻塞操作到达后立即并行执行阻塞操作,则需要使用另一种方式 - 单独的线程池计算器。喜欢这个:

ch.pipeline().addLast(new ServerHandler(blockingThreadPool));

其中,阻塞线程池是常规线程池计算器

例如:

ExecutorService blockingThreadPool = Executors.newFixedThreadPool(10);

现在,在您的逻辑处理程序中,您可以如下所示向该执行器提交阻塞任务:

protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {

    blockingIOProcessor.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Starting.");

            try {
                Thread.currentThread().sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println("Finished.");
        }
    });

}

如果需要,还可以将上下文传递给这个runnable,以便在处理完成时返回响应。

 类似资料:
  • 问题内容: 您能解释一下Netty如何使用线程池工作吗?我是否正确理解,线程池有两种:老板线程和工人线程。老板用于执行I / O,而worker用于调用用户回调(messageReceived)来处理数据? 问题答案: 这是来自NioServerSocketChannelFactory文档 一个ServerSocketChannelFactory,它创建一个基于NIO的服务器端ServerSock

  • 我使用Spark 2.1.1。 我使用结构化流从2个Kafka分区读取消息。我正在向Spark Standalone集群提交我的应用程序,其中有一个工人和两个执行者(每个2个核心)。 我想要这样的功能,来自每个Kafka分区的消息应该由每个单独的执行器独立处理。但现在正在发生的是,执行器分别读取和映射分区数据,但在映射之后,形成的无边界表被普遍使用,并且具有来自两个分区的数据。 当我对表运行结构化

  • 我目前正在使用Android Room在Android上存储一些小数据。理论上,我使用allowMainThreadQueries()应该没有问题,因为我的所有数据都很小,但为了将来证明我的程序,我尝试将所有调用移动到一个单独的线程中。到目前为止,我在AppConfig中有一个静态处理程序,该类在应用程序初始启动时初始化,在应用程序范围内可见: DbHandler是一个扩展Handler的自定义类

  • 情况:考虑到telnet客户端 这是Netty 4! 通道处理程序在一段时间内阻塞线程,而不是echo回复消息(就像telnet服务器演示所做的那样)(在现实世界中,像JDBC或JSch,...). 这实际上是可行的:我正在用测试它,线程将被阻塞(并且waitis),直到3秒钟后返回。 然而,这意味着我正在用一个不相关的任务阻塞Netty的事件循环工作组的线程。 因此,我更改了频道注册并应用了自定

  • 问题内容: 现在我想在进入for循环之前集中所有任务,但是当我运行此程序时,for循环会在此之前执行并引发此异常: 问题答案: 一种工作方式是,当您调用它时,它等待所有任务完成: 执行给定的任务,并在所有任务完成时返回保存其状态和结果的期货列表。Future.isDone()对于返回列表的每个元素为true。 请注意,已完成的任务可能已正常终止或引发了异常而终止 。如果在进行此操作时修改了给定的集

  • 我试图使用Netty 4.0.8创建一个相当简单的WebSocket服务器。我有基本的握手设置和工作。但是从一个单独的线程发送的消息似乎没有传到客户端。 客户机/服务器交互的工作方式是,客户机启动连接,然后通过WebSocket发送初始消息(“hello”)。服务器立即响应。此消息通过Chrome开发工具传递并可见。写入此消息后,我将存储在中。此初始化如下: 然后添加,如下所示: 在单独的线程(在