当前位置: 首页 > 面试题库 >

多个生产者,一个消费者:所有goroutine都在睡眠中-死锁

年运珧
2023-03-14
问题内容

在进行工作之前,我一直遵循一种检查通道中是否有东西的模式:

func consume(msg <-chan message) {
  for {
    if m, ok := <-msg; ok {
      fmt.Println("More messages:", m)
    } else {
      break
    }
  }
}

这是基于这部视频的。这是我的完整代码:

package main

import (
    "fmt"
    "strconv"
    "strings"
    "sync"
)

type message struct {
    body string
    code int
}

var markets []string = []string{"BTC", "ETH", "LTC"}

// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
    // for i := 0; i < n; i++ {
    var msgToSend = message{
        body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
        code: 1,
    }
    fmt.Println("Producing:", msgToSend)
    msg <- msgToSend
    // }
    wg.Done()
}

func receive(msg <-chan message, wg *sync.WaitGroup) {
    for {
        if m, ok := <-msg; ok {
            fmt.Println("Received:", m)
        } else {
            fmt.Println("Breaking from receiving")
            break
        }
    }
    wg.Done()
}

func main() {
    wg := sync.WaitGroup{}
    msgC := make(chan message, 100)
    defer func() {
        close(msgC)
    }()
    for ix, market := range markets {
        wg.Add(1)
        go produce(ix+1, market, msgC, &wg)
    }
    wg.Add(1)
    go receive(msgC, &wg)
    wg.Wait()
}

如果您尝试运行它,则在打印将要中断的消息之前,我们最终将陷入僵局。自上次以来,当chan中没有其他内容时,tbh才有意义,因此我们试图拉出该值,因此出现此错误。但是这样的模式是不可行的if m, ok := <- msg; ok。我如何使此代码起作用以及为什么会出现此死锁错误(大概此模式应该起作用?)。


问题答案:

鉴于您在一个频道上确实有多个作者,因此您会遇到一些挑战,因为在Go中执行此操作的简单方法通常是在一个频道上拥有一个作者,然后让该作者关闭频道。发送最后一个数据时的通道:

func produce(... args including channel) {
    defer close(ch)
    for stuff_to_produce {
        ch <- item
    }
}

这种模式具有很好的特性,无论您如何退出produce,通道都会关闭,从而指示生产已结束。

您没有使用这种模式,而是向多个goroutine传递了一个通道,每个goroutine都可以发送 一条
消息,因此您需要移动close(当然,也可以使用其他模式)。表达所需模式的最简单方法是:

func overall_produce(... args including channel ...) {
    var pg sync.WaitGroup
    defer close(ch)
    for stuff_to_produce {
        pg.Add(1)
        go produceInParallel(ch, &pg) // add more args if appropriate
    }
    pg.Wait()
}

pg计数器累计活跃的生产者。每个调用都必须使用pg.Done()来表明已完成ch。总制片人现在等待他们全部完成,那么
关闭的道路上走出通道。

(如果将内部produceInParallel函数编写为闭包,则无需显式传递ch并传递pg给它。也可以将其编写overallProducer为闭包。)

请注意,单个使用者的循环可能最好使用以下for ... range结构来表示:

func receive(msg <-chan message, wg *sync.WaitGroup) {
    for m := range msg {
        fmt.Println("Received:", m)
    }
    wg.Done()
}

(您提到了select向循环添加a的意图,以便在消息尚未准备好时可以执行其他一些计算。如果无法将该代码分解为独立的goroutine,则实际上您将需要更高级的m, ok := <-msg构造。)

还要注意,wgfor
receive(取决于您构造其他事物的方式可能是不必要pg的)与生产者的等待组非常独立。诚然,按照书面说明,只有在所有生产者都完成之后才能完成消费者的工作,但我们希望独立等待生产者完成,以便我们可以关闭整体生产者包装中的渠道。



 类似资料:
  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。

  • 问题内容: 给出以下简单的Go程序 我想知道是否有人可以启发我 谢谢 问题答案: 由于您从不关闭通道,因此范围循环将永远不会结束。 您不能在同一频道上发送结果。一种解决方案是使用其他解决方案。 您的程序可以这样修改:

  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个

  • 问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola

  • 问题内容: 对于我的要求之一,我必须创建N个工作程序例程,该例程将由一个监视程序监视。所有工作程序完成后,监视程序必须结束。我的代码以死锁结尾,请帮忙。 问题答案: 您的monitorWorker永不死。当所有工作人员完成时,它将继续等待cs。这将导致死锁,因为其他任何东西都不会继续发送给CS,因此wg永远不会达到0。一种可能的解决方法是,当所有工作程序完成时,让监视器关闭通道。如果for循环在m