当前位置: 首页 > 知识库问答 >
问题:

子goroutine不从父通道接收消息

彭存
2023-03-14

我有一个服务器,它在收到请求时需要使用goroutines从不同的流中读取消息,将它们发送到父goroutine,父goroutine将消息聚合并将它们发送到客户端。所以会是这样的:

Client --> API --> handler --> worker1
                          |--> worker2
                          |--> worker3

我使用不同的通道在所有这些goroutine之间进行通信,工人可以写到聚合通道,处理程序(父goroutine)可以接收它并将其发送回客户机,所以一切都正常工作,几乎是:)

问题是我的工人没有收到由通道上的处理程序发送的消息,工人的goroutines从未停止,但我找不到他们为什么没有收到消息。

INFO[0010] Request.Context().Done triggered             
INFO[0010] **Sending true event to loglistenmgm: 0

我的处理程序:

func handler() {
        logStream := make(chan string)
        done := make(chan bool, 5)
        loglistenmgm := make(chan bool, numberOfPods)
        errorStream := make(chan error)

        for _, pod := range pods {
            go getOnePodLogs(logStream, errorStream, loglistenmgm, done)
        }

        go func() {
            for {
                select {
                case <-c.Request.Context().Done():
                    log.Info("Request.Context().Done triggered")
                    for i := 0; i < numberOfPods; i++ {
                        log.Info("**Sending true event to loglistenmgm: " + strconv.Itoa(i))
                        loglistenmgm <- true
                        log.Info("**After sending true event to loglistenmgm") // This gets printed so the true has been sent to the channel
                    }
                    done <- true
                    return
                case err := <-errorStream:
                    c.JSON(http.StatusInternalServerError, map[string]string{
                        "message": "Internal Server Error: " + err.Error(),
                    })
                    for i := 0; i < numberOfPods; i++ {
                        loglistenmgm <- true
                    }
                    done <- true
                    return
                }
            }
        }()

        isStreaming := c.Stream(func(w io.Writer) bool {
            for {
                select {
                case <-done:
                    c.SSEvent("end", "end") // This also works properly and it can print the "stream closed" which happens after this
                    return false
                case msg := <-logStream:
                    c.Render(-1, sse.Event{
                        Event: "message",
                        Data:  msg,
                    })
                    return true
                }
            }
        })
        if !isStreaming {
            log.Info("stream closed")
        }
}

我的工人:

func getOnePodLogs(logStream chan string, errorStream chan error, loglistenmgm chan bool, done chan bool) {
    stream, err := podLogRequest.Stream()
    defer stream.Close()

    if err != nil {
        log.Error(err.Error())
        errorStream <- err
        return
    }

    for {
        select {
        case <-loglistenmgm:
            log.Info(pod + "stop listenning to logs") // this log line never get triggered
            return
        default:
            buf := make([]byte, 1000)
            numBytes, err := stream.Read(buf)
            if numBytes == 0 {
                log.Info(pod + ": numBytes == 0 --> End of log")
                done <- true
                return
            }
            if err == io.EOF {
                log.Info("io.EOF")
                return
            }
            if err != nil {
                log.Error("Error getting stream.Read(buf)")
                log.Error(err)
                return
            }
            message := string(buf[:numBytes])
            logStream <- message // This works and I can receive the message on handler and it can pass it to the client
        }
    }

}

共有1个答案

郝哲茂
2023-03-14

由于https://stackoverflow.com/users/32880/jimb指出问题在于stream.read并阻止了select,所以我找到了问题并解决了它。

stream.read是频率较低的IO.readcloser,因此当我将true事件发送到loglistenmgm通道时,工作人员正在等待在stream.read上接收新消息,无法从loglistenmgm通道读取。我通过在处理程序关闭几分钟后向stream.read发送一些消息来确认这一点,然后工作人员可以从通道读取并退出。

我通过更改程序解决了这个问题,我没有在worker中创建,而是在处理程序中创建它们并将它们传递给worker,当处理程序完成后,我关闭所有流,这将触发numbytes==0并关闭worker。

 类似资料:
  • 问题内容: 我在Go中有一个返回两个值的函数。我想将其作为goroutine运行,但是我无法弄清楚创建接收两个值的通道的语法。有人能指出我正确的方向吗? 问题答案: 使用两个值的字段定义自定义类型,然后创建该类型的。 编辑:我还添加了一个使用多个通道而不是自定义类型的示例(在底部)。我不确定哪个更惯用。 例如: 然后 使用自定义类型的频道(Playground)的示例: 产生: LOREM,5 I

  • 我是新来的,想知道一些很基本的问题,我不能弄清楚。 为了发挥作用(对实际需要的抽象),我需要: 用常数固定的元素初始化字符串片断 遍历此切片并为每个元素运行一个goroutine 每个goroutine将花费一定的时间来处理元素(随机秒持续时间) 作业完成后,我希望goroutine将结果推送到一个通道 然后我需要捕获来自这个通道的所有结果(在一个从主goroutine调用的函数中),将它们附加到

  • 我正在构建一个带有操作系统托盘菜单的电子应用程序。 在主进程中,我希望听到对“About”菜单项的单击,然后通知渲染器进程,以便它可以相应地更新窗口的视图。 以下是渲染窗口和任务栏菜单时的主要流程部分: 下面是应该侦听通道消息的渲染器脚本: 当我点击托盘中的“关于”菜单时,我从主进程中获得console.log输出,但我从未在渲染器进程中收到console.log或警报。 我的应用程序只有一个窗口

  • 问题内容: 我正在尝试实现字数统计程序,但是第一步我遇到了一些问题。 这是我的代码: 这是我的输出: 谢谢 ! 问题答案: 当主goroutine退出时,程序终止,因此没有时间做任何事情。您需要封锁直到完成。通道可用于此目的:

  • 我试图对goroutines做一个循环,它接受一个接收字符串的通道,每次接收它时,它都应该将值追加到另一个字符串。只有在所有goroutine结束时(goroutine计数应该是传入的的长度),代码才应该继续。

  • 最后,在中有一个子组件,名为。这就是问题所在。我还将从获得的项传递到。因此,数据流是(连接)>(未连接)>(连接)。 连接到Redux。它使用redux中的一些操作来更新中的项。当我在中更新时,可以很好地接收更新后的数据,但是却什么都接收不到。中的项从未更新。 我发现问题出在将连接到Redux时。如果没有连接到redux,它也会接收更新的道具,就像一样。 组件如下。我已经试着尽可能地简化它们。 D