当前位置: 首页 > 知识库问答 >
问题:

vertx内的多线程

查修谨
2023-03-14

我是vert的新手。x、 我在尝试垂直。x“NetServer”功能。http://vertx.io/core_manual_java.html#writing-tcp服务器和客户端,它的工作方式很有魅力。

然而,我也读到“verticle实例严格来说是单线程的。

如果您创建一个简单的TCP服务器并部署它的单个实例,那么该服务器的所有处理程序始终在同一个事件循环(线程)上执行。”

目前,对于我的实现,我希望接收TCP字节流,然后触发另一个组件。但这不应该是Verticle的“start”方法中的阻塞调用。那么,在start方法中编写执行器是一种好的做法吗?或者vertx是否自动处理此类情况。

这里有一个片段

public class TCPListener extends Verticle {

    public void start(){

        NetServer server = vertx.createNetServer();

        server.connectHandler(new Handler<NetSocket>() {
            public void handle(NetSocket sock) {
                container.logger().info("A client has connected");
                sock.dataHandler(new Handler<Buffer>() {
                    public void handle(Buffer buffer) {
                        container.logger().info("I received " + buffer.length() + " bytes of data");

                        container.logger().info("I received " + new String(buffer.getBytes()));
                        //Trigger another component here. SHould be done in a sperate thread. 
                        //The previous call should be returned . No need to wait for component response.
                    }
                });
            }
        }).listen(1234, "host");
    }
}

应该有什么机制使其成为非阻塞调用

共有3个答案

白翔
2023-03-14

正如duffymo提到的那样,创建线程违背了使用vertx的目的。最好的方法是将消息写入eventbus并创建一个新的处理程序来监听来自eventbus的消息。更新代码以展示这一点。将消息写入“next.topic”主题,并注册一个处理程序来读取来自“next.topic”主题的消息。

public class TCPListener extends Verticle {

public void start(){

    NetServer server = vertx.createNetServer();

    server.connectHandler(new Handler<NetSocket>() {
        public void handle(NetSocket sock) {
            container.logger().info("A client has connected");
            sock.dataHandler(new Handler<Buffer>() {
                public void handle(Buffer buffer) {
                    String recvMesg = new String(buffer.getBytes());
                    container.logger().info("I received " + buffer.length() + " bytes of data");

                    container.logger().info("I received " + recvMesg);
                    //Writing received message to event bus
                    vertx.eventBus().send("next.topic", recvMesg);
                }
            });
        }
    }).listen(1234, "host");

    //Registering new handler listening to "next.topic" topic on event bus
    vertx.eventBus().registerHandler("next.topic", new Handler<Message<String>() {
       public void handle(Message<String> mesg) {
           container.logger.info("Received message: "+mesg.body());
       }
    };

}
}
卢德惠
2023-03-14

最灵活的方法是创建ExecutorService并使用它处理请求。这带来了对工作线程模型的细粒度控制(固定或可变的线程数,应该在单个线程上串行执行哪些工作,等等)。

修改后的示例可能如下所示:

public class TCPListener extends Verticle {

    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    public void start(){

        NetServer server = vertx.createNetServer();

        server.connectHandler(new Handler<NetSocket>() {
            public void handle(final NetSocket sock) { // <-- Note 'final' here
                container.logger().info("A client has connected");
                sock.dataHandler(new Handler<Buffer>() {
                    public void handle(final Buffer buffer) { // <-- Note 'final' here

                        //Trigger another component here. SHould be done in a sperate thread. 
                        //The previous call should be returned . No need to wait for component response.
                        executor.submit(new Runnable() {

                            public void run() {
                                //It's okay to read buffer data here
                                //and use sock.write() if necessary
                                container.logger().info("I received " + buffer.length() + " bytes of data");
                                container.logger().info("I received " + new String(buffer.getBytes()));
                            }
                        }
                    }
                });
            }
        }).listen(1234, "host");
    }
}
广晔
2023-03-14

我认为这不是vert的发展方向。十、

更好的方法是正确使用事件总线而不是执行器。让工作人员在总线上响应事件,进行处理,并在总线完成时向总线发出信号。

创建线程违背了使用vert的目的。十、

 类似资料:
  • 我在健康检查中使用ClusterHealthCheck过程,并且在日志中看到Vertx Thread被阻止的异常。我需要将健康检查作为阻塞代码执行吗? 下面是我用来创建健康检查的代码, 低于例外 我查看了ClusterHealthCheck类,它看起来很简单,从集群管理器获取分区服务并获取集群状态,但不确定为什么需要5秒钟以上才能完成。 我使用的是vertx 4.0.3、hazelcast 4.0

  • 在Vertx官方文件中,我阅读了以下段落 在关于Reactor的文章中: 据我所知,如果我写错了,请纠正我,Vertx的工作方式如下: 当我们为阻塞代码提供一个处理程序时,vertx将线程池中的一个线程(不是事件循环)放入该代码中,该线程等于内核数,例如我有4个内核,事件循环检查每次迭代后该线程的状态,如果它准备好执行与该阻塞代码相关的处理程序,当所有4个内核都忙时,vertx将任务放入队列,稍后

  • 问题内容: 我正在努力加快某些过程的执行速度,这些过程将大量记录(大多数是几百万个)发布到Elasticsearch。在我的C#代码中,我已经使用Dataflow实现了一个多线程解决方案,如下所示: 然后我要实现的发送批量请求调用: 我的问题 ,你 是对的实用性存在的数据流管道的一部分的锁内执行额外的线程。 这个可以吗?我可以在性能,执行,缓存/内存丢失等方面看到任何潜在的问题吗? 任何见识都会很

  • 我有两个问题与Vertx线程模型有关。文件提到: 一个顶点。默认情况下,x实例维护N个事件循环线程(其中N默认为core*2) 对于许多现代应用程序所需的并发级别,阻塞方法无法扩展 Vertx还提供了线程池相关的功能,以处理使用服务器资源的任务,这些资源需要长时间的事件处理(工作线程)。 好的,所以我们知道线程在所需内存(例如堆栈)和上下文切换方面有开销。 Vertx线程没有被阻塞(如果正确使用)

  • 我们使用Vertx 3.0.0。 情况: 我们有一个带Rest处理器的主垂直面。 REST处理程序通过事件总线调用worker,如下所示: 我们有工人垂直,这就是我们执行垂直的方式: 这里是Worker实现: 如果我多次请求REST服务,那么所有消息都会在Worker中按顺序处理。为什么多线程在这里不起作用?worker多线程选项的目的是什么(文档中不清楚它到底是如何工作的)? 顺便说一句,如果我

  • 问题内容: Vert.x是用于在JVM上构建响应式应用程序的工具包。 我想对基于JVM的自动可伸缩RESTful后端API使用vertx。 到目前为止,我从文档中发现,默认情况下,它需要计算机中的内核数,假设您有N个内核并为每个内核创建N个线程,每个线程是一个事件总线,每个线程包含vertx实例。问题是,Vertx如何控制实例数量?基于负载压力? 关于控制通过给定线程运行的Verticles数量的