当前位置: 首页 > 知识库问答 >
问题:

创建数千个Netty客户端,而无需创建数千个线程

姬念
2023-03-14

我使用Netty 4创建了一个相当直接的服务器。我已经能够将它扩展到处理数千个连接,而且它从未超过40个线程。

为了测试它,我还创建了一个创建数千个连接的测试客户端。不幸的是,这会创建和连接一样多的线程。我希望尽量减少客户端的线程。我已经看了很多帖子。许多示例显示了单个连接设置。这个和这个说在客户端之间共享NioEventLoopGroup,我这样做了。我得到的nioEventLoopGroup数量有限,但在其他地方每个连接都得到一个线程。我不是故意在管道中创建线程,也不知道可能是什么。

这是我的客户端代码设置的一个片段。根据我到目前为止的研究,它似乎应该保持固定的线程数。我应该做些什么来防止每个客户端连接一个线程吗?

主要的

final EventLoopGroup group = new NioEventLoopGroup();

for (int i=0; i<100; i++)){
    MockClient client = new MockClient(i, group);
    client.connect();
}

MockClient

public class MockClient implements Runnable {

    private final EventLoopGroup group;

    private int identity;

    public MockClient(int identity, final EventLoopGroup group) {
        this.identity = identity;
        this.group = group;
    }

    @Override
    public void run() {
        try {
            connect();
        } catch (Exception e) {}
    }

    public void connect() throws Exception{

        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new MockClientInitializer(identity, this));

        final Runnable that = this;
        // Start the connection attempt
        b.connect(config.getHost(), config.getPort()).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    Channel ch = future.sync().channel();
                } else {
                    //if the server is down, try again in a few seconds
                    future.channel().eventLoop().schedule(that, 15, TimeUnit.SECONDS); 
                }
            }
        });
    }
}

共有1个答案

卢磊
2023-03-14

正如我以前多次遇到的那样,详细解释这个问题让我思考了更多,我遇到了这个问题。如果其他人在创建数千个Netty客户端时遇到同样的问题,我想在这里提供它。

我的管道中有一条路径将创建一个超时任务来模拟客户端连接重新启动。事实证明,每当它收到来自服务器的“重新启动”信号(这种情况经常发生),直到每个连接都有一个线程,这个计时器任务就会为每个连接创建额外的线程。

处理程序

private final HashedWheelTimer timer;

@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {

    Packet packet = reboot();

    ChannelFutureListener closeHandler = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            RebootTimeoutTask timeoutTask = new RebootTimeoutTask(identity, client);
            timer.newTimeout(timeoutTask, SECONDS_FOR_REBOOT, TimeUnit.SECONDS);
        }
    };

    ctx.writeAndFlush(packet).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                future.channel().close().addListener(closeHandler);
            } else {
                future.channel().close();
            }
        }
    });

}

超时任务

public class RebootTimeoutTask implements TimerTask {

    public RebootTimeoutTask(...) {...}

    @Override
    public void run(Timeout timeout) throws Exception {
        client.connect(identity);
    }

}
 类似资料:
  • 问题内容: 我们正在构建一个eLearningMultipleChoice工具,成千上万的用户将完成我们的测试。我们已经有成千上万的其他研讨会的订阅者,因此很可能成千上万的人也将完成MC测试。 现在,我们需要跟踪每个用户回答的每个问题,花了他多长时间,是否正确(经过多次尝试之后),如果不是,他给出了哪个错误答案等等。确实有很多数据。 现在,我们将有成千上万的问题和成千上万的用户。由于每个问题至少要

  • 我正在尝试从另一台机器创建与基于java的套接字服务器的多个客户端连接。服务器和客户端都使用Netty 4进行NIO。在服务器端,我使用了boss和Worker group,它能够在单个linux盒上接收和服务器100000并发连接(在设置内核参数和ulimited之后)。 但是,我最终在客户端为每个连接创建了一个新线程,这导致了JVM线程限制异常。 有人能告诉我,我如何使用Netty从客户端创建

  • 如何恰当地做到这一点?也许我需要配置akka调度程序?

  • 语境 Spark2.0.1,spark-submit在集群模式下。我正在从HDFS中读取一个拼花地板文件: 我正在将保存到一个配置单元表中,该表包含按分组的50个桶: 现在,当我查看hdfs的hive仓库时,有一个名为的文件夹。它的内部是一堆文件。我希望有50个文件。但是没有,有3201个文件!!!!每个分区有64个文件,为什么?对于我保存为配置单元表的不同文件,每个分区有不同数量的文件。所有的文

  • 如果我只是想在Netty应用程序中不使用ByteBuf。我使用创建一个ByteBuf,那么我必须在函数末尾调用吗?

  • 创建客户端有两种方式,一种是直接使用特化的构造器函数,另一种是使用工厂构造器函数。 第一种方式返回的是具体的客户端结构体指针对象,第二种方式返回的是客户端接口对象。 使用特化的构造器函数创建客户端 特化的构造器函数有下面几个: func NewHTTPClient(uri ...string) (client *HTTPClient) func NewTCPClient(uri ...string