当前位置: 首页 > 面试题库 >

使用集群将Socket.IO扩展到多个Node.js进程

戚俊人
2023-03-14
问题内容

可以说我在 四个 工作进程(伪)上具有以下内容:

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {

  socket.on('join', function(rooms) {
    rooms.forEach(function(room) {
      socket.join(room);
    });
  });

  socket.on('leave', function(rooms) {
    rooms.forEach(function(room) {
      socket.leave(room);
    });
  });

});

// Emit a message every second
function send() {
  io.sockets.in('room').emit('data', 'howdy');
}

setInterval(send, 1000);

在浏览器上…

// on the client
socket = io.connect();
socket.emit('join', ['room']);

socket.on('data', function(data){
  console.log(data);
});

问题: 由于四个独立的工作进程发送消息,因此我每秒收到 四条 消息。

如何确保邮件仅发送一次?


问题答案:

编辑: 在Socket.IO 1.0+中,现在可以使用更简单的Redis适配器模块,而不是通过多个Redis客户端设置存储。

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

下面显示的示例看起来更像这样:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));
  io.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

如果您有一个主节点需要发布到其他Socket.IO进程,但本身不接受套接字连接,请使用socket.io-
emitter
而不是socket.io-
redis

如果无法扩展,请使用来运行Node应用程序DEBUG=*。Socket.IO现在实现调试,该调试还将打印出Redis适配器调试消息。输出示例:

socket.io:server initializing namespace / +0ms
socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms
socket.io:server attaching client serving req handler +2ms
socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms
socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms
socket.io-redis ignore same uid +0ms

如果您的主进程和子进程都显示相同的解析器消息,则您的应用程序正在正确扩展。

如果您是从单个工作人员中解雇,则设置应该不会有问题。您正在执行的操作是从所有四个工作人员发出的,并且由于Redis的发布/订阅,消息不是重复的,而是按您要求应用html" target="_blank">程序执行的那样写了四次。这是Redis的简单示意图:

Client  <--  Worker 1 emit -->  Redis
Client  <--  Worker 2  <----------|
Client  <--  Worker 3  <----------|
Client  <--  Worker 4  <----------|

如您所见,当您从某个工作程序发出时,它会将发布内容发布到Redis,并将从其他已订阅Redis数据库的工作程序中镜像出来。这也意味着您可以使用连接同一实例的多个套接字服务器,并且一台服务器上的发射将在所有连接的服务器上触发。

使用群集时,当客户端连接时,它将连接到您的四个工作线程之一,而不是全部四个。这也意味着您从该工作人员发出的任何内容只会显示给客户端一次。因此,是的,应用程序正在扩展,但是您的操作方式是,您要从所有四个工作程序中释放资源,而Redis数据库则使它像在单个工作程序上调用了四次一样。如果客户端实际上连接到您的所有四个套接字实例,则它们每秒将接收十六条消息,而不是四条。

套接字处理的类型取决于您将要拥有的应用程序的类型。如果您要分别处理客户端,那么应该没有问题,因为连接事件将仅对每个客户端一个工作人员触发。如果需要全局“心跳”,则可以在主进程中使用套接字处理程序。由于工作人员在主进程死亡时死亡,因此您应抵消主进程的连接负载,并让子进程处理连接。这是一个例子:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.sockets.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  io.sockets.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

在该示例中,有五个Socket.IO实例,一个是主实例,四个是子实例。主服务器从不调用listen()因此该进程没有连接开销。但是,如果您在主进程上调用发射,它将被发布到Redis,并且四个工作进程将在其客户端上执行发射。这抵消了对工作人员的连接负载,如果工作人员死亡,则主应用程序中的主要应用程序逻辑将保持不变。

请注意,使用Redis,所有发射,即使是在名称空间或房间中,也会由其他工作进程处理,就像您从该进程触发了发射一样。换句话说,如果您有两个带有一个Redis实例的Socket.IO实例,则emit()在第一个工作线程中调用套接字将向其客户端发送数据,而第二个工作线程将执行与您从该工作线程调用emit相同的操作。



 类似资料:
  • Kubernetes是一个高度开放可扩展的架构,可以通过自定义资源类型CRD来定义自己的类型,还可以自己来扩展API服务,用户的使用方式跟Kubernetes的原生对象无异。

  • 扩展说明 当有多个服务提供方时,将多个服务提供方组织成一个集群,并伪装成一个提供方。 扩展接口 org.apache.dubbo.rpc.cluster.Cluster 扩展配置 <dubbo:protocol cluster="xxx" /> <!-- 缺省值配置,如果<dubbo:protocol>没有配置cluster时,使用此配置 --> <dubbo:provider cluster="

  • 本文档提供一个可扩展、高可用的 Seafile 集群架构。这种架构主要是面向较大规模的集群环境,可以通过增加更多的服务器来提升服务性能。如果您只需要高可用特性,请参考3节点高可用集群文档。 架构" class="reference-link"> 架构 Seafile集群方案采用了3层架构: 负载均衡层:将接入的流量分配到 seafile 服务器上。并且可以通过部署多个负载均衡器来实现高可用。 Se

  • 我目前正在一个小型集群(3个节点,32个CPU和128 GB Ram)上使用线性回归(Spark ML)中的基准测试来评估Spark 2.1.0。我只测量了参数计算的时间(不包括启动、数据加载、……)并识别了以下行为。对于小的数据集,0.1MIO-3MIO数据点,测量的时间并没有真正增加,停留在大约40秒。只有对于较大的数据集,如300个Mio数据点,处理时间才会达到200秒。因此,集群似乎根本无

  • 我有一个Spring启动应用程序,它使用来自 Kafka 集群中某个主题(例如 topic1)的消息。这就是我的代码目前的样子。 现在我想从另一个Kafka集群中的不同主题开始消费。一种方法是为此创建另一个bean。但是有更好的方法吗?

  • 我将express与socket.io一起使用,将express-session与Express-socket.io-session一起使用,这有助于将会话连接到我的套接字实例。 Deploy函数运行诸如会话、路由等内容。 POST http://localhost:8080/socket.io/?eio=3&transport=polling&t=lz9ey8p 404(未找到)