当前位置: 首页 > 知识库问答 >
问题:

socket.io和节点集群:将事件分派到套接字

狄玉书
2023-03-14

我有一个nodejs集群服务器,它使用mongo changestream侦听器,通过socket.io向客户端发送数据。

< code>{ userId: 'aaa ',socketId: 'bbb' }

用于存储此数据的redis客户端在主进程中初始化。mongo变更流是在主进程中创建的。

当变更流看到新文档时,它会将该文档作为消息发送到子流程。当子进程收到消息时,它可以从文档中检索 userId。使用 userId,可以从 redis 检索客户端连接的套接字 ID。

我遇到的问题是在从 redis 检索到套接字 Id 后尝试发出消息。我正在创建一个包含套接字 Id 的套接字处理程序对象。当我使用此套接字 Id 发出套接字消息时,如下所示:

io.sockets.to(userSocketId)
          .emit("confirmOrder", "Your order is being processed!")

我收到一个错误:

(node:31804)UnhandledPromiseRejectionWarning:错误:客户端在新的ClientClosedError处关闭(/Users/a 99999999/code/*/node _ modules/@ node-redis/client/dist/lib/errors . js:24:9)

错误来自redis,起源于上面写的套接字发射行。^^

下面是来自工作进程的更多代码:

const pubClient = createClient({ host: "127.0.0.1", port: 6379 }),
    subClient = pubClient.duplicate();
  io.adapter(createAdapter(pubClient, subClient));
  setupWorker(io);

  io.on("connection", (socket) => {
    const socketId = socket.id;

    socket.emit("connection", "SERVER: you are connected");

    socket.on("userConnect", (user) => {
      let { userId } = user;
      userConnectClient
        .HSET(userId, { userId, socketId })
        .catch((err) => console.log("ERROR: ", err));
    });
  });

  process.on("message", async ({ type, data }) => {
    switch (type) {
      case "dispatch:order":
        let { order } = JSON.parse(data);
        const socketsHandler = await createSocketsHandler(order);

        const userSocketId = socketsHandler.user.socketId;
        io.sockets
          .to(userSocketId)
          .emit("confirmOrder", "Your order is being processed!");
        break;
    }
  });

  async function createSocketsHandler(order) {
    let { userId } = order;
    let user = await userConnectClient
      .HGETALL(userId)
      .catch((err) => console.log(err));
    return {
      user: user,
    };
  }

在这一点上我暂时被难住了。目前正在试验io对象,并试图找到更好的工具来监控redis。感谢任何帮助/问题!谢谢大家!

共有1个答案

容修贤
2023-03-14

从那时起,我就意识到redis客户端工作不正常的原因。我正在使用redis的发布者和订户客户端。问题是,我在服务器的工作进程中创建redis客户端。因此,每当服务器向redis发出命令时,它都无法正确执行,因为每个工作进程都有一对客户端,我认为这不是正确的实现。

这是通过在我的集群服务器代码之外创建redis客户端来解决的。; P我的服务器现在可以在主进程和工作进程中正确订阅redis客户端了!

 类似资料:
  • 如有任何帮助,不胜感激。

  • 我使用他们的web UI在EMR上创建了一个AWS Spark2.2集群(这里是新手)。我知道我需要连接到主节点,以便开始发出pyspark命令来学习Spark。但是,当我尝试连接到主节点时,它给我一个错误。在浏览了internet之后,我发现使用可能有助于调试正在进行的操作,但我找不到任何有用的信息。下面是我的ssh调试日志。 有人能指出这里的问题是什么吗?编辑:我已经尝试过将端口22添加到安全

  • 我不知道如何通过N连接到AWS的ElastiCache Redisode.js.我已经成功地通过node_redisNPM连接到主主机(001),但是我无法使用ioredis的集群能力,因为显然ElastiCache没有实现CLUSTER命令。 我认为必须有另一种方法,但用于节点的AWS SDK只有用于管理ElastiCache的命令,而不是用于实际连接到ElastiCache的命令。 如果不使用

  • 我按照以下说明设置了一个多节点kafka集群。现在,如何连接到动物园管理员?在JAVA中,只连接一个来自生产者/消费者端的动物园管理员可以吗?或者有办法连接所有的动物园管理员节点吗? 设置多节点阿帕奇动物园守护者集群 在集群的每个节点上,将以下行添加到文件kafka/config/zookeeper.properties中 在群集的每个节点上,在由 dataDir 属性表示的文件夹中创建一个名为

  • 我的第二个问题是:是否需要?我将来可能会添加更多的节点。

  • 我有 2 个 docker 容器运行我的 Web 应用程序和机器学习应用程序,都使用 h2o。最初,我既调用 h2o.init() 又指向同一个 IP:PORT,因此初始化了一个具有一个节点的 h2o 集群。 考虑到我已经训练了一个模型,现在我正在训练第二个模型。在此训练过程中,如果web应用程序调用h2o集群(例如,从第一个模型请求预测),它将终止训练过程(错误消息如下),这是无意的。我尝试为每