当前位置: 首页 > 面试题库 >

如何使用频道广播消息

司寇飞航
2023-03-14
问题内容

我是新手,我正在尝试创建一个简单的聊天服务器,客户端可以在其中向所有连接的客户端广播消息。

在我的服务器中,我有一个goroutine(无限循环),它接受连接,并且所有连接都由一个通道接收。

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
        }
}()

然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过迭代通道来广播到所有连接。

for c := range ch {
    conn.Write(msg)
}

但是,我无法广播,因为(我认为通过阅读文档)通道需要在迭代之前关闭。我不确定何时应该关闭频道,因为我要不断接受新的连接,而关闭频道将不允许我这样做。如果有人可以帮助我,或者提供更好的方法向所有连接的客户端广播消息,将不胜感激。


问题答案:

您正在执行的是扇出模式,也就是说,多个端点正在侦听单个输入源。这种模式的结果是,只要输入源中有消息,这些侦听器中只有一个能够获取消息。唯一的例外是close渠道。close所有的听众都将认识到这一点,因此是“广播”。

但是您想要做的是广播从连接读取的消息,因此我们可以执行以下操作:

何时知道听众人数

让每个工作人员收听专用广播频道,并将消息从主频道分发到每个专用广播频道。

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := <-w.source
                // do something with msg
            case <-quit: // will explain this in the last section
                return
            }
        }
    }()
}

然后我们可能会有很多工人:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

然后启动我们的监听器:

go func() {
for {
    conn, _ := listener.Accept()
    ch <- conn
    }
}()

和调度员:

go func() {
    for {
        msg := <- ch
        for _, worker := workers {
            worker.source <- msg
        }
    }
}()

未知的听众人数

在这种情况下,上面给出的解决方案仍然有效。唯一的区别是,每当需要新工作人员时,都需要创建一个新工作人员,将其启动,然后将其推入workers切片。但是此方法需要一个线程安全的切片,该切片周围需要一个锁。一种实现可能如下所示:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

每当您想开始工作时:

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

您的调度员将更改为:

go func() {
    for {
        msg := <- ch
        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
    }
}()

遗言:永远不要离开悬空的goroutine

好的做法之一是:永远不要离开悬空的goroutine。因此,当您听完后,需要关闭所有触发的goroutine。这将通过以下quit渠道进行worker

首先,我们需要创建一个全局quit信令通道:

globalQuit := make(chan struct{})

每当我们创建一个worker时,我们都会为其分配globalQuit通道作为其退出信号:

worker.quit = globalQuit

然后,当我们要关闭所有工作程序时,我们只需执行以下操作:

close(globalQuit)

由于close所有侦听的goroutine都可以识别(这是您理解的重点),因此将返回所有goroutine。记住也要关闭调度程序,但我会留给您:)



 类似资料:
  • 我正在使用Laravel-Redis-Socketio-LaravelEcho进行实时通知。到目前为止,我能够广播到公共频道,但坚持与私人频道。 我的bootstrap.js: 拉威尔回声服务器。json: 在我的notification类中,我将broadcastOn()设置为: 我的客户是: 在公共频道中一切正常,但在私有频道中,启动laravel-echo-server后,命令行显示以下内容

  • 我想广播消息本地到许多应用程序。对于这一点,我认为UDP套接字是最好的IPC,纠正我,如果我是Worwn。 并倾听: 问题是我必须像这样通过IP192.168.1.255,但在实际场景中可能没有eth0接口,只有环回。那我怎么才能做到这一点呢?

  • 我重用了一个snipe命令代码来执行这个fetch命令,但这并不是我的问题。 我正试图从一个频道获取一条消息,并将其发布到指定的频道中,例如:在X中抓取消息,并将其发布到Y中。如果这有意义,那么到目前为止,我所拥有的只有: 非常感谢您的帮助! ps:到现在为止,它从它运行的命令的通道中获取消息。如果我在X通道中发送了一条消息,并在X通道中运行该命令,它将在X通道中获取消息。我的目标是尝试从一个频道

  • 我已经为我的应用程序设置了Pusher和Laravel Echo,以便在某些事件触发时通知用户。 我已经测试过,看看设置是否通过在“公共频道”上播放而工作,并成功地看到这是工作。 以下是活动本身: 公共频道: app/resources/assets/js/bootstrap。js: 和Laravel回声注册:(它是在我的主布局的头部部分,事件射击视图延伸。) 现在,这种设置适用于公共频道广播,但

  • 我正在尝试使用webrtc和socket通过getUserMedia()获取音频。io将其发送到服务器(socket.io支持音频、视频、二进制数据),然后服务器将其广播到所有连接的客户端。问题是,当流到达连接的客户端时,它会转换为JSON对象,而不是媒体流对象。所以我无法发送音频,我也尝试了套接字。io流模块,但我认为未成功。您能帮我正确捕获音频流并将其发送到所有连接的客户端吗。 下面是发送数据