我想做的是拥有一组生产者goroutine(其中一些可能完成或可能不完成)和一个消费者例程。问题在于括号中的警告-我们不知道将返回答案的总数。
所以我想做的是:
package main
import (
"fmt"
"math/rand"
)
func producer(c chan int) {
// May or may not produce.
success := rand.Float32() > 0.5
if success {
c <- rand.Int()
}
}
func main() {
c := make(chan int, 10)
for i := 0; i < 10; i++ {
go producer(c, signal)
}
// If we include a close, then that's WRONG. Chan will be closed
// but a producer will try to write to it. Runtime error.
close(c)
// If we don't close, then that's WRONG. All goroutines will
// deadlock, since the range keyword will look for a close.
for num := range c {
fmt.Printf("Producer produced: %d\n", num)
}
fmt.Println("All done.")
}
所以问题是,如果我关闭它是错误的,如果我没有关闭-它仍然是错误的(请参见代码中的注释)。
现在,解决方案将是一个带外信号通道,所有生产者都将其写入:
package main
import (
"fmt"
"math/rand"
)
func producer(c chan int, signal chan bool) {
success := rand.Float32() > 0.5
if success {
c <- rand.Int()
}
signal <- true
}
func main() {
c := make(chan int, 10)
signal := make(chan bool, 10)
for i := 0; i < 10; i++ {
go producer(c, signal)
}
// This is basically a 'join'.
num_done := 0
for num_done < 10 {
<- signal
num_done++
}
close(c)
for num := range c {
fmt.Printf("Producer produced: %d\n", num)
}
fmt.Println("All done.")
}
这完全可以满足我的需求!但是在我看来,这似乎是满口的。我的问题是:是否有任何习语/技巧可以让我以更简单的方式做类似的事情?
我在这里查看了一下:http :
//golang.org/doc/codewalk/sharemem/
似乎complete
chan(在开头main
)已在一定范围内使用,但从未关闭。我不明白如何。
如果有人有任何见解,我将不胜感激。干杯!
编辑:fls0815有答案,并且还回答了近距离通道范围如何工作的问题。
我上面的代码修改为可以工作(在fls0815之前提供的代码之前完成):
package main
import (
"fmt"
"math/rand"
"sync"
)
var wg_prod sync.WaitGroup
var wg_cons sync.WaitGroup
func producer(c chan int) {
success := rand.Float32() > 0.5
if success {
c <- rand.Int()
}
wg_prod.Done()
}
func main() {
c := make(chan int, 10)
wg_prod.Add(10)
for i := 0; i < 10; i++ {
go producer(c)
}
wg_cons.Add(1)
go func() {
for num := range c {
fmt.Printf("Producer produced: %d\n", num)
}
wg_cons.Done()
} ()
wg_prod.Wait()
close(c)
wg_cons.Wait()
fmt.Println("All done.")
}
只有生产者应该关闭渠道。您可以通过range
在创建生产商之后调用在结果通道上进行迭代()的消费者来实现您的目标。在您的主线程中,您等待(请参阅sync.WaitGroup
),直到您的消费者/生产者完成工作为止。生产者完成后,您关闭生成的通道,这将迫使您的消费者退出(range
在通道关闭且没有剩余缓冲项目时退出)。
示例代码:
package main
import (
"log"
"sync"
"time"
"math/rand"
"runtime"
)
func consumer() {
defer consumer_wg.Done()
for item := range resultingChannel {
log.Println("Consumed:", item)
}
}
func producer() {
defer producer_wg.Done()
success := rand.Float32() > 0.5
if success {
resultingChannel <- rand.Int()
}
}
var resultingChannel = make(chan int)
var producer_wg sync.WaitGroup
var consumer_wg sync.WaitGroup
func main() {
rand.Seed(time.Now().Unix())
for c := 0; c < runtime.NumCPU(); c++ {
producer_wg.Add(1)
go producer()
}
for c := 0; c < runtime.NumCPU(); c++ {
consumer_wg.Add(1)
go consumer()
}
producer_wg.Wait()
close(resultingChannel)
consumer_wg.Wait()
}
我将close
-statement放入主函数的原因是因为我们有多个生产者。在上面的示例中,关闭一个生产者中的通道会导致您已经遇到的问题(在封闭的通道上写;原因是可能还有一位生产者留下来仍在生产数据)。仅当没有生产者时才应关闭通道(因此,我建议仅由生产者关闭通道)。这就是Go中构建通道的方式。在这里,您会找到有关关闭频道的更多信息。
与sharemem示例相关:AFAICS通过一次又一次地对资源进行排队(从挂起->完整->挂起->完整…依次类推),使该示例无休止地运行。这就是main-
func末尾的迭代。它接收完成的资源,并使用Resource.Sleep()将它们重新排队以待处理。当没有完成的资源时,它将等待并阻止新资源完成。因此,由于通道一直在使用中,因此无需关闭通道。
我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了但我不能使用它...我可以看到消息被使用但这不是希望的... 生产者代码 ProducerTX.Properties 消费者 感谢你的帮助..谢谢
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职
我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?