当前位置: 首页 > 知识库问答 >
问题:

群集顶点中的SockJS连接。x环境

东门子昂
2023-03-14

vertx应用程序在两个EC2实例上的Docker容器中运行,并且是集群的。

集群是通过hazelcast aws插件实现的,应用程序的启动如下:

docker run --name ... -p ... \
--network ... \
-v ... \
-d ... \
-c 'exec java \
-Dvertx.eventBus.options.setClustered=true \
-Dvertx.eventBus.options.setClusterPort=15701 \
-jar ... -conf ... \
-cluster'

没有以编程方式设置任何与群集相关的内容。

客户端在第一个请求上打开一个套接字并将其用于未来的类似请求。
每个请求将:

  1. 通过向事件总线发布消息来启动与服务器的异步请求
  2. 在事件总线上注册一个使用者,该事件总线将处理上述结果,并将引用传递给它应该将结果发送到的套接字连接

由于vertx在集群时默认进行循环,并且有两个实例,这意味着任何实例都会获得其他消息(来自上面的1),并使仅连接到一个实例的客户端接收到所有预期响应的一半。

我想这是因为,即使注册的使用者有一个对套接字对象的引用,它也不能使用它,因为它是在不同的节点/Web服务器上创建的。

这是正确的吗?有没有一种方法可以将100%的消息发送到客户端,只连接到一个节点,而不引入RabbitMQ之类的东西?

这是SockJS处理程序代码:

SockJSHandler sockJSHandler = SockJSHandler.create(vertx, new SockJSHandlerOptions());
sockJSHandler.socketHandler(socket -> {
    SecurityService securityService = (SecurityService) ServiceFactory.getService(SecurityService.class);
    if (securityService.socketHeadersSecurity(socket)) {
        socket.handler(socketMessage -> {
            try {
                LOGGER.trace("socketMessage: " + socketMessage);
                Socket socket = Json.decodeValue(socketMessage.toString(), Socket.class);
                Report report = socket.getReport();
                if (report != null) {
                    Account accountRequest = socket.getAccount();
                    Account accountDatabase = accountRequest == null ? null
                            : ((AccountService) ServiceFactory.getService(AccountService.class)).getById(accountRequest.getId());
                    Response result = securityService.socketReportSecurity(accountRequest, accountDatabase, report) ?
                            ((ReportService) ServiceFactory.getService(ReportService.class)).createOrUpdateReport(report, accountDatabase)
                            : new Response(Response.unauthorized);
                    if (Response.success.equals(result.getResponse())) {
                        //register a consumer
                        String consumerName = "report.result." + Timestamp.from(ClockFactory.getClock().instant());
                        vertx.eventBus().consumer(consumerName, message -> {
                            Response executionResult;
                            if ("success".equals(message.body())) {
                                try {
                                    Path csvFile = Paths.get(config.getString(Config.reportPath.getConfigName(), Config.reportPath.getDefaultValue())
                                            + "/" + ((Report) result.getPayload()).getId() + ".csv");
                                    executionResult = new Response(new JsonObject().put("csv", new String(Files.readAllBytes(csvFile))));
                                } catch (IOException ioEx) {
                                    executionResult = new Response(new Validator("Failed to read file.", ioEx.getMessage(), null, null));
                                    LOGGER.error("Failed to read file.", ioEx);
                                }
                            } else {
                                executionResult = new Response(new Validator("Report execution failed", (String)message.body(), null, null));
                            }
                            //send second message to client
                            socket.write(Json.encode(executionResult));
                            vertx.eventBus().consumer(consumerName).unregister();
                        });
                        //order report execution
                        vertx.eventBus().send("report.request", new JsonObject()
                                .put("reportId", ((Report) result.getPayload()).getId())
                                .put("consumerName", consumerName));
                    }
                    //send first message to client
                    socket.write(Json.encode(result));
                } else {
                    LOGGER.info("Insufficient data sent over socket: " + socketMessage.toString());
                    socket.end();
                }
            } catch (DecodeException dEx) {
                LOGGER.error("Error decoding message.", dEx);
                socket.end();
            }
        });
    } else {
        LOGGER.info("Illegal socket connection attempt from: " + socket.remoteAddress());
        socket.end();
    }
});
mainRouter.route("/html" target="_blank">websocket/*").handler(sockJSHandler);

有趣的是,当在localhost上运行两个集群节点时,客户端将获得100%的结果。

编辑:这不是SockJS,而是配置问题。

共有1个答案

郜光明
2023-03-14

由于vertx在集群时默认进行循环,并且有两个实例,这意味着任何实例都会获得其他消息(来自上面的1),并使仅连接到一个实例的客户端接收到所有预期响应的一半。

这个假设只是部分正确。Vert. x确实循环,是的,但这意味着每个实例将获得一半的连接,而不是一半的消息。

一旦建立连接,其所有消息将到达单个实例。

因此:

这是正确的吗?有没有一种方法可以将100%的消息发送到客户端,只连接到一个节点,而不引入RabbitMQ之类的东西?

已经发生了。

 类似资料:
  • 关于这个设置,我有几个问题: > 我的应用程序(Spring/Hibernate)每个用户有一个不同的数据库。所以这里的问题是数据源(使用spring和hibernate来实现持久性)是在Tomcat级别创建的。因此,无论我做什么连接池都是在服务器级别。 根据集群配置,Tomcat实例将创建它们自己的连接池。

  • 我按照以下说明设置了一个多节点kafka集群。现在,如何连接到动物园管理员?在JAVA中,只连接一个来自生产者/消费者端的动物园管理员可以吗?或者有办法连接所有的动物园管理员节点吗? 设置多节点阿帕奇动物园守护者集群 在集群的每个节点上,将以下行添加到文件kafka/config/zookeeper.properties中 在群集的每个节点上,在由 dataDir 属性表示的文件夹中创建一个名为

  • 使用节点。js集群支持,我希望主进程生成添加到Redis队列的数据项。然后,我想运行多个读取Redis队列的工作进程。当然,只有一个工作进程应该使用从队列中检索到的数据项。 为了让我开始学习,您能建议从节点包支持或原始Redis命令的角度来完成这项工作吗?让我强调一下,消费者是节点中独立的进程。js集群环境,我们可以在其中调整竞争从单个Redis队列读取的工作进程的数量,以调整整体系统性能。

  • 我想将配置文件中的多个Cassandraendpoint提供给我的Java应用程序。 例如:cassandra主机:“主机1,主机2” 我尝试了< code > addContactPoints(host),但它不起作用。如果其中一个Cassandra节点关闭,我不希望我的应用程序关闭。

  • 我一直在试验Vert. x的高可用性功能来测试水平可扩展性和弹性。我有一个基于Hazelcast的几个节点的集群。我正在通过HTTP应用编程接口在任何节点上创建顶点。Verticle在创建时设置了标志。 如果我有< code>n个节点< code>Nn加载了HA-verticles,并且如果我添加了一个额外的节点,则没有从新节点上的< code>Nn节点迁移的vertices,因此负载将会平衡。有

  • 我使用他们的web UI在EMR上创建了一个AWS Spark2.2集群(这里是新手)。我知道我需要连接到主节点,以便开始发出pyspark命令来学习Spark。但是,当我尝试连接到主节点时,它给我一个错误。在浏览了internet之后,我发现使用可能有助于调试正在进行的操作,但我找不到任何有用的信息。下面是我的ssh调试日志。 有人能指出这里的问题是什么吗?编辑:我已经尝试过将端口22添加到安全