当前位置: 首页 > 知识库问答 >
问题:

Netty UDP服务器中不并发执行的线程

沃博裕
2023-03-14
ExecutorService threadPool = Executors.newCachedThreadPool();

然后是数据报通道、pipelineFactory和Bootstrap:

int workerCount = 10;
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory();

ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory);
bootStrap.setPipelineFactory(pipelineFactory);
bootStrap.bind(new InetSocketAddress(host, port));

在pipelineFactory中,getPipeline()添加自定义处理程序。

就像中所说的:UDP消息的多线程处理

然后我根据这些条目修改了一些代码。现在创建线程池的条件是:

int corePoolSize = 5;
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576);

和ExecutionHandler的pipelineFactory:

ExecutionHandler executionHandler = new ExecutionHandler(threadPool);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler);

getPipeline()添加处理程序,如所描述的:

public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory {

    private ExecutionHandler executionHandler = null;

    public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) { 
        this.executionHandler = executionHandler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {

        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addFirst("ExecutorHandler", executionHandler);

        // Here the custom handlers are added
        pipeline.addLast( ... )
    }

但它们不同时处理。messageReceived()下的处理必须在一个线程上完成,下一个线程才能处理下一个消息。我从不同的客户机向服务器发送了大量的消息,而我得到的日志并不是交错的。我还尝试在messageReceived()中使用thread.sleep()来确认前面的内容。

我是不是漏掉了什么?有没有办法用Netty实现真正的多线程UDP服务器?如何让不同的线程并发执行messageReceived()?

共有1个答案

伯和蔼
2023-03-14

根据我的经验和我对使用UDP的Netty的理解,只有一个线程处理UDP消息进行解码是正常的。由于UDP是无会话的,因此只有一个线程可以在一个UDP端口上接收数据并对其进行解码。

一旦解码了数据并将其包装到缓冲区或特定java对象中,就可以将该对象放入将处理它的线程池(执行处理程序->您的业务处理程序)。然后,一旦将先前解码的数据释放到执行处理程序中,就可以解码UDP端口上即将出现的新数据。

创建NioDatagramChannelFactory时可以指定的池线程仅在侦听多个端口上的数据时才使用。每个端口只有一个线程是有意义的。即使在该构造函数中指定了100个工作者,如果配置了一个UDP端口,也只会使用一个工作者。

 类似资料:
  • 这种方法的Java博士说 如果需要,最多等待给定的时间完成计算,然后检索其结果(如果可用)。 参数: 超时等待的最长时间 unit超时参数的时间单位 根据我的理解,我们对强加了一个超时,我们提交给,这样,我的将在指定的时间(超时)过去后中断 但是根据下面的代码,似乎超过了超时时间(2秒),我真的很难理解这一点。谁能给我指一下正确的路吗?

  • 欢迎阅读我的Java8并发教程的第一部分。这份指南将会以简单易懂的代码示例来教给你如何在Java8中进行并发编程。这是一系列教程中的第一部分。在接下来的15分钟,你将会学会如何通过线程,任务(tasks)和 exector services来并行执行代码。 并发在Java5中首次被引入并在后续的版本中不断得到增强。在这篇文章中介绍的大部分概念同样适用于以前的Java版本。不过我的代码示例聚焦于Ja

  • 我确信这两个列表都不是空的,并且正在调用,但是没有调用order execution run方法....

  • 我有一个数据库结果,每一个调用创建500条记录500条,然后下一个500条,然后下一个 我需要运行一个记录每个不同线程执行特定任务的程序 我举的例子如下 ExecutorService executor=Executors.newFixedThreadPool(10); 我的问题是,在完成当前executer服务之前,it需要获得接下来的500个用户并尝试开始处理,我需要停止该操作,直到处理了前5

  • 不同于顺序服务器,并发服务器 就要能在一个时间为多个客户端提供服务。 例如,一个聊天服务器可能服务一个特定的客户端数小时 ──在停止为这个客户端服务之前服务器不能等待, 除非是在等待一下个客户端到来之前的间隙才能等待。 这需要在我们的流程图中做一个重要的更改: 我们将提供服务从 守护进程移至它自己的服务进程。 然而,因为每个子进程都继承所有打开的文件(套接字被像文件一样处理), 新进程不仅继承“a

  • 我有一个Cucumber测试套件,由Jenkins在一个服务器池上执行,该服务器池有时在服务器池中并发运行。 当服务器a上的测试运行更改了数据库存储的设置时,我遇到了竞争情况,这会导致服务器B上的测试运行失败,因为它正在寻找处于不同状态的设置 有没有办法将cucumber tests或Jenkins配置为在服务器B上运行场景,等待服务器a上运行的场景完成后再继续?