当前位置: 首页 > 知识库问答 >
问题:

重复的数据发送到事件机器pubsub用户

韦泳
2023-03-14

堆栈:Ruby 2.3。1,机架,薄型

简单websocket服务器:

require 'redis'
require 'em-hiredis'
require 'faye/websocket'
require 'json'

ws_channel = {}

App = lambda do |env|
$redis ||= EM::Hiredis.connect('redis://127.0.0.1:6379')

if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil,
                         headers: {'Access-Control-Allow-Origin' => '*'},
                         ping: 15
)

ws.on :open do |event|
  puts 'client connected'
  query_string = event.current_target.env['REQUEST_PATH'].gsub(/[^a-z0-9\-_\/]/, '')
  ws_channel[query_string] ||= EM::Channel.new

  pubsub = $redis.pubsub

  puts "subscribing to ws channel: ws:#{query_string}"
  sid = ws_channel[query_string].subscribe do |msg|
    puts "WS -> ws:#{query_string}/ #{sid} #{ws_channel[query_string]}"
    ws.send msg
  end

  puts "subscribing to redis: #{query_string}"
  pubsub.subscribe(query_string) do |msg|
    puts "REDIS -> ws:#{query_string}/"
    $redis.setex(query_string, 60, msg)
    ws_channel[query_string].push msg
  end

  EventMachine.add_periodic_timer(5) do
    ws.send ({ :ts => Time.now.to_i}.to_json) if ws
  end

  ws.on :close do |event|
    puts "client ##{query_string} disconnected"
    pubsub.unsubscribe(query_string) if pubsub
    ws_channel[query_string].unsubscribe(sid) if ws_channel[query_string]
    ws = nil
    pubsub = nil
  end
end

ws.rack_response
end
end

配置。ru:

require 'rubygems'
require 'bundler/setup'
require 'logger'
require File.expand_path('../app', __FILE__)
Faye::WebSocket.load_adapter('thin')

run App

要启动服务器,请执行以下操作:

bundle exec thin -p 9292 -R config.ru start

发行条件:

  1. 从同一IP建立到同一WS通道的多个连接(多个浏览器选项卡在同一台计算机上打开了相同的游戏)。
  2. 来自WS服务器的单个数据推送导致数据到达每个订阅服务器的次数与订阅用户的次数相同。
  3. 如果其中一个选项卡被刷新(到WS服务器的连接关闭并重新打开),随后的数据推送不会导致数据重复。
  4. 当建立了一个新的连接时,来自#2的场景会再次发生。

我的修复方法是在连接打开时取消订阅/重新订阅。因此:

pubsub = $redis.pubsub
pubsub.unsubscribe(query_string) if pubsub
pubsub = $redis.pubsub

但这带来了另一个问题:当一个选项卡关闭时,数据停止到达其他选项卡大约30秒。WS连接从未关闭,我可以在JS控制台中看到5秒的ping。

redis-cli $> PUBSUB NUMSUB <channel> 
  • 这只显示对频道的一个订阅,而不管有多少订阅者订阅了该频道

目标功能:

  1. 多个客户端(订户)从同一IP连接到同一通道
  2. 每个订阅服务器接收WS-server推送的数据的一个副本
  3. 客户端断开连接/新客户端连接不会导致其他客户端的任何服务中断

共有2个答案

景英杰
2023-03-14

编辑

我认为em hiredis实际上为Redis订阅回收了相同的连接和线程。。。我不确定情况是否如此,但如果是这样的话,那么下面的答案可能是过分的。

我仍然建议遵循建议的设计,因为它仍然可以节省大量资源。

原件:

我不确定我是否理解目标功能,但我相信代码中反映了一个设计问题。我也相信解决这个问题会产生正确的行为。

虽然这是由EM层抽象出来的,但每个Redis订阅客户端需要一个新线程和一个新TCP/IP连接(到Redis服务器)。自从我读了这篇文章的代码库后,事情可能发生了变化,但不知何故,我对此表示怀疑。。。

... 新线程(主要是Hibernate线程)不像新进程那样昂贵(我认为每个线程的成本略高于1Mib,主要用于堆栈分配),TCP/IP连接有限。

即使这不是一个问题(我假设Redis连接器的更新版本会解决这个问题),让应用程序从多个TCP/IP连接读取仍然有些资源浪费(?)应用程序已经拥有的数据的副本。

更好的设计将:

>

  • 为应用程序中的所有事件创建全局Redis通道。此全局通道可用于跨进程发布应用程序范围的广播。

    为每个应用程序进程创建一个私有Redis通道。此通道可用于特定Websocket客户端之间的直接通信(使用其主机进程)。

    为每个WebSocket客户端分配一个进程唯一ID(可以是一个简单的分子)。

    与进程特定的UUID通道一起,此本地ID将允许每个Websocket连接具有唯一的全局标识符(进程UUID是“拥有”连接的进程的通道)。

    创建一个单一线程(每个应用程序进程),侦听两个通道(全局和专用通道),并将消息“分派”到进程中的最终目的地。

    “dispatcher”可能应该忽略来自自己进程的任何消息,以防止双重处理(这允许单进程应用程序避免使用Redis)。

    这个设计是我在Plezi框架中实现的,它的资源效率允许Plezi同时为大量客户服务。

    这里有一个使用Plezi的快速示例,因为我不太擅长Faye/EM应用编程接口。将此示例保存为config.ru文件,并使用iodine(或rackup)命令从shell运行:

    require 'plezi'
    require 'redis'
    # uncomment and update URL
    # ENV['PL_REDIS_URL'] = "redis://my.host:6389/0"
    
    class TimeAndEcho
      def index
        "return the client page using `:render` or as a simple string".freeze
      end
      def on_open
        # `id` is inherited from Plezi using a Controller module mixin
        puts "New connection with UUID (process+client): #{id}"
      end
      def on_message data
        # The data is sent to everyone EXCEPT self:
        broadcast :handle_message, data
        write "sent"
      end
      # broadcasting invokes a non-inherited method.. we will simply write the info
      def handle_message data
        # write is "inherited" using a module mixin when the class is used as a Websocket controller.
        write data
      end
    end
    
    Plezi.route '/', TimeAndEcho
    # Idione's timer is in milliseconds.
    Iodine.run_every(5000) do
      TimeAndEcho. broadcast(:handle_message, { :ts => Time.now.to_i}.to_json)
    end
    # Set the Rack application
    run Plezi.app
    

    在本例中,上述调度逻辑由Plezi框架使用MessageDispatch模块实现,该模块使用Redis和发布/子线程发送和接收消息。

    请注意,运行此示例需要一个POSIX系统(Linux/macOS/BSD)和ioid服务器。

    Idium实施了拟议的Websocket机架规范草案,出于性能原因,Plezi使用了这种原生Websocket设计。

  • 颜宸
    2023-03-14

    每个WS连接创建唯一的EM通道,并取消订阅ws.close上的特定程序,似乎可以完成这项工作:

    require 'redis'
    require 'em-hiredis'
    require 'faye/websocket'
    require 'json'
    
    App = lambda do |env|
    $redis ||= EM::Hiredis.connect('redis://127.0.0.1:6379')
    $pubsub ||= $redis.pubsub
    
    if Faye::WebSocket.websocket?(env)
    ws = Faye::WebSocket.new(env, nil,
                             headers: {'Access-Control-Allow-Origin' => '*'},
                             ping: 15
    )
    
    ws.on :open do |event|
      puts 'client connected'
    
      query_string = event.current_target.env['REQUEST_PATH'].gsub(/[^a-z0-9\-_\/]/, '')
      channel = EM::Channel.new
    
      puts "subscribing to ws channel: ws:#{query_string}"
      sid = channel.subscribe do |msg|
        puts "WS -> ws:#{query_string}/ #{sid} #{channel}"
        ws.send msg
      end
    
      puts "subscribing to redis: #{query_string}"
      subs = {}; r_callback = rand(Time.now.to_i)
      subs[r_callback] = Proc.new { |msg|
        puts "REDIS -> ws:#{query_string}/"
        $redis.setex(query_string, 60, msg)
        channel.push msg
      }
      $pubsub.subscribe(query_string, subs[r_callback])
      #puts $pubsub.inspect
    
      ws.on :close do |event|
        puts "client ##{query_string} disconnected"
        $pubsub.unsubscribe_proc(query_string, subs[r_callback]) if $pubsub
        puts "Unsubscribed proc: #{subs[r_callback]}"
        channel.unsubscribe(sid) if channel
        ws = nil
      end
    end
    
    ws.rack_response
    end
    end
    
     类似资料:
    • 你好,我想把数据从一个html页面发送到另一个html页面 这是我的index.html页面 这是newpage.html:

    • 我有这个问题,其中由于某种原因,我不能发送数据到另一个PHP脚本与POST通过API。 我已经检查了其他解决方案并清理了我的代码,但它仍然无法工作。 也许我错过了什么。 这是我的密码: 印刷品: 数组(0){} 我还测试了在PHP中解析Javascript读取中显示的解决方案,但它没有解决问题。我仍然没有在我的PHP脚本中获得POST数据。

    • 我想将数据从活动发送到片段,但我提交的片段中的数据为空。我不知道为什么。这是我发送数据并调用片段时的代码。 这是代码,当我检索数据发送: 有人能帮忙吗?

    • 我有问题通过php mail()函数发送邮件消息。我不确定这是不是代码的问题,因为我读到一些主机服务器不允许发送邮件,但我也试图发送这封邮件,当网站是在本地主机,但它仍然不工作-点击“发送”后,我看到信息:“您的邮件已发送”,但当我检查我的邮筒没有邮件(也是垃圾邮件)。 对我来说,代码看起来不错,但也许我错过了什么。我正在考虑的第二个选择是,我的本地主机也不允许发送邮件。 我的php conf图片

    • 为什么每个站点都解释说,在SSE中,客户端和服务器之间只有一个连接保持打开状态“在SSE中,客户端发送一个标准的HTTP请求,请求一个事件流,服务器最初以标准的HTTP响应进行响应,并保持连接打开” 然后,当服务器决定它可以向客户机发送数据时,我正在尝试实现SSE,我会看到每隔几秒钟发送一次fiddler请求 对我来说,这感觉像是长时间的轮询,没有一个连接保持打开。

    • 概述 客户端代码 概述 建立连接 open事件 message事件 error事件 自定义事件 close方法 数据格式 概述 data:数据栏 id:数据标识符 event栏:自定义信息类型 retry:最大间隔时间 服务器代码 参考链接 概述 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。比如,每当收到新的电子邮件,服务器就向浏览器发送一个“