我有一个服务器,它在收到请求时需要使用goroutines从不同的流中读取消息,将它们发送到父goroutine,父goroutine将消息聚合并将它们发送到客户端。所以会是这样的:
Client --> API --> handler --> worker1
|--> worker2
|--> worker3
我使用不同的通道在所有这些goroutine之间进行通信,工人可以写到聚合通道,处理程序(父goroutine)可以接收它并将其发送回客户机,所以一切都正常工作,几乎是:)
问题是我的工人没有收到由通道上的处理程序发送的消息,工人的goroutines从未停止,但我找不到他们为什么没有收到消息。
INFO[0010] Request.Context().Done triggered
INFO[0010] **Sending true event to loglistenmgm: 0
我的处理程序:
func handler() {
logStream := make(chan string)
done := make(chan bool, 5)
loglistenmgm := make(chan bool, numberOfPods)
errorStream := make(chan error)
for _, pod := range pods {
go getOnePodLogs(logStream, errorStream, loglistenmgm, done)
}
go func() {
for {
select {
case <-c.Request.Context().Done():
log.Info("Request.Context().Done triggered")
for i := 0; i < numberOfPods; i++ {
log.Info("**Sending true event to loglistenmgm: " + strconv.Itoa(i))
loglistenmgm <- true
log.Info("**After sending true event to loglistenmgm") // This gets printed so the true has been sent to the channel
}
done <- true
return
case err := <-errorStream:
c.JSON(http.StatusInternalServerError, map[string]string{
"message": "Internal Server Error: " + err.Error(),
})
for i := 0; i < numberOfPods; i++ {
loglistenmgm <- true
}
done <- true
return
}
}
}()
isStreaming := c.Stream(func(w io.Writer) bool {
for {
select {
case <-done:
c.SSEvent("end", "end") // This also works properly and it can print the "stream closed" which happens after this
return false
case msg := <-logStream:
c.Render(-1, sse.Event{
Event: "message",
Data: msg,
})
return true
}
}
})
if !isStreaming {
log.Info("stream closed")
}
}
我的工人:
func getOnePodLogs(logStream chan string, errorStream chan error, loglistenmgm chan bool, done chan bool) {
stream, err := podLogRequest.Stream()
defer stream.Close()
if err != nil {
log.Error(err.Error())
errorStream <- err
return
}
for {
select {
case <-loglistenmgm:
log.Info(pod + "stop listenning to logs") // this log line never get triggered
return
default:
buf := make([]byte, 1000)
numBytes, err := stream.Read(buf)
if numBytes == 0 {
log.Info(pod + ": numBytes == 0 --> End of log")
done <- true
return
}
if err == io.EOF {
log.Info("io.EOF")
return
}
if err != nil {
log.Error("Error getting stream.Read(buf)")
log.Error(err)
return
}
message := string(buf[:numBytes])
logStream <- message // This works and I can receive the message on handler and it can pass it to the client
}
}
}
由于https://stackoverflow.com/users/32880/jimb指出问题在于stream.read
并阻止了select
,所以我找到了问题并解决了它。
stream.read
是频率较低的IO.readcloser
,因此当我将true
事件发送到loglistenmgm
通道时,工作人员正在等待在stream.read
上接收新消息,无法从loglistenmgm
通道读取。我通过在处理程序关闭几分钟后向stream.read
发送一些消息来确认这一点,然后工作人员可以从通道读取并退出。
我通过更改程序解决了这个问题,我没有在worker中创建流
,而是在处理程序中创建它们并将它们传递给worker,当处理程序完成后,我关闭所有流,这将触发numbytes==0
并关闭worker。
问题内容: 我在Go中有一个返回两个值的函数。我想将其作为goroutine运行,但是我无法弄清楚创建接收两个值的通道的语法。有人能指出我正确的方向吗? 问题答案: 使用两个值的字段定义自定义类型,然后创建该类型的。 编辑:我还添加了一个使用多个通道而不是自定义类型的示例(在底部)。我不确定哪个更惯用。 例如: 然后 使用自定义类型的频道(Playground)的示例: 产生: LOREM,5 I
我是新来的,想知道一些很基本的问题,我不能弄清楚。 为了发挥作用(对实际需要的抽象),我需要: 用常数固定的元素初始化字符串片断 遍历此切片并为每个元素运行一个goroutine 每个goroutine将花费一定的时间来处理元素(随机秒持续时间) 作业完成后,我希望goroutine将结果推送到一个通道 然后我需要捕获来自这个通道的所有结果(在一个从主goroutine调用的函数中),将它们附加到
我正在构建一个带有操作系统托盘菜单的电子应用程序。 在主进程中,我希望听到对“About”菜单项的单击,然后通知渲染器进程,以便它可以相应地更新窗口的视图。 以下是渲染窗口和任务栏菜单时的主要流程部分: 下面是应该侦听通道消息的渲染器脚本: 当我点击托盘中的“关于”菜单时,我从主进程中获得console.log输出,但我从未在渲染器进程中收到console.log或警报。 我的应用程序只有一个窗口
问题内容: 我正在尝试实现字数统计程序,但是第一步我遇到了一些问题。 这是我的代码: 这是我的输出: 谢谢 ! 问题答案: 当主goroutine退出时,程序终止,因此没有时间做任何事情。您需要封锁直到完成。通道可用于此目的:
我试图对goroutines做一个循环,它接受一个接收字符串的通道,每次接收它时,它都应该将值追加到另一个字符串。只有在所有goroutine结束时(goroutine计数应该是传入的的长度),代码才应该继续。
最后,在中有一个子组件,名为。这就是问题所在。我还将从获得的项传递到。因此,数据流是(连接)>(未连接)>(连接)。 连接到Redux。它使用redux中的一些操作来更新中的项。当我在中更新时,可以很好地接收更新后的数据,但是却什么都接收不到。中的项从未更新。 我发现问题出在将连接到Redux时。如果没有连接到redux,它也会接收更新的道具,就像一样。 组件如下。我已经试着尽可能地简化它们。 D