我试图用goroutines写一个简单的工作池。
work_channel
?码:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
sleepMs := rand.Intn(1000)
fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
func main() {
var work_channel = make(chan string)
var results_channel = make(chan string)
// create goroutine per item in work_channel
go func() {
var c = 0
var wg sync.WaitGroup
for work := range work_channel {
wg.Add(1)
go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
c++
}
wg.Wait()
fmt.Println("closing results channel")
close(results_channel)
}()
// add work to the work_channel
go func() {
for c := 'a'; c < 'z'; c++ {
work_channel <- fmt.Sprintf("%c", c)
}
close(work_channel)
fmt.Println("sent work to work_channel")
}()
for x := range results_channel {
fmt.Printf("result: %s\n", x)
}
}
从任何意义上讲,您的解决方案都不是工作程序goroutine池:您的代码不会限制并发的goroutine,也不会“重用”
goroutines(它总是在收到新作业时启动新的goroutine)。
如在Bruteforce MD5 Password
Cracker上
所发布的,您可以使用生产者-消费者模式。您可能有一个指定的
生产者 goroutine,它将生成作业(要执行/计算的事情)并将其发送到 作业 频道。您可能有一个固定的 使用者 goroutine
池(例如其中的5个),它们会在传送作业的通道上循环,并且每个执行/完成接收到的作业。
该 制片人 够程可以简单地关闭jobs
时生成频道的所有作业,并发送正确的信令 消费者 没有更多的就业机会将到来。for ... range
通道上的构造处理“关闭”事件并正确终止。请注意,关闭通道之前发送的所有作业仍将被发送。
这将导致干净的设计,将导致固定数量(但任意)的goroutine,并且它将始终使用100%CPU(如果goroutine的数量大于CPU内核的数量)。它还具有的优点是它可以被“节流”与信道容量(缓冲信道)的数量和适当选择
消费者 够程。
请注意,这种具有指定生产者goroutine的模型不是强制性的。您可能也有多个goroutine来产生作业,但是您也必须同步它们,以便仅jobs
在所有生产者goroutine完成产生作业后才关闭通道-
否则在jobs
已关闭的通道上尝试发送另一个作业会导致运行时恐慌。通常,生产作业的价格便宜,并且可以比执行的速度快得多,因此,这种模型可以在1个goroutine中生产它们,而同时又消耗/执行许多工作,因此在实践中是很好的。
处理结果:
如果作业有结果,则可以选择在其上可以传送结果(“发送回”)的指定 结果
渠道,也可以选择在作业完成/完成时在使用者中处理结果。后者甚至可以通过具有处理结果的“回调”功能来实现。重要的是结果是否可以独立处理,还是需要合并(例如map-
reduce框架)或汇总。
如果使用results
通道,则还需要一个goroutine来接收来自该通道的值,以防止使用者被阻塞(如果的缓冲区results
将被填充,则会发生)。
results
通道string
我将创建一个包装器类型来容纳任何其他信息,而不是将简单的值作为作业和结果发送,因此它更加灵活:
type Job struct {
Id int
Work string
Result string
}
请注意,该Job
结构还会包装结果,因此当我们发回结果时,它也包含原始内容Job
作为上下文- 通常非常有用
。还要注意,仅*Job
在通道上发送指针()而不是Job
值是有利可图的,因此无需制作Job
s的“无数”副本,并且Job
struct值的大小也变得无关紧要。
这是生产者-消费者的样子:
我将使用2个sync.WaitGroup
值,它们的作用如下:
var wg, wg2 sync.WaitGroup
生产者负责生成要执行的作业:
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
完成后(没有更多的工作),该jobs
通道将关闭,这表明消费者将没有更多的工作到达。
请注意,produce()
将jobs
通道视为“ 仅发送” ,因为这是生产者仅需要执行的操作:在该通道上 发送 作业( 关闭
该通道之后,但在“ 仅发送” 通道上也允许这样做)。生产者的意外接收将是编译时错误(在编译时及早发现)。
消费者的责任是在可以接收工作的同时接收工作并执行它们:
func consume(id int, jobs <-chan *Job, results chan<- *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
results <- job
}
}
注意consume()
将jobs
通道视为 只接收 ; 消费者只需要从中 接收 。类似地, 仅为 消费者 发送results
频道。
__
还要注意,由于存在多个使用者goroutine, 因此无法*
在此处关闭results
通道,只有第一次尝试关闭该通道才会成功,而其他尝试会导致运行时出现恐慌!在所有使用者goroutine结束之后,可以(必须)关闭通道,因为这样我们可以确保不会在该通道上发送任何进一步的值(结果)。
*results``results
我们有需要分析的结果:
func analyze(results <-chan *Job) {
defer wg2.Done()
for job := range results {
fmt.Printf("result: %s\n", job.Result)
}
}
如您所见,只要结果可能出现(直到results
关闭通道),它也会收到结果。results
分析仪的通道 仅接收 。
请注意通道类型的使用:只要足够,就在编译时仅使用 单向 通道类型来及早发现并防止错误。如果确实需要 双向, 则仅使用 双向 通道类型。
这就是将所有这些粘合在一起的方式:
func main() {
jobs := make(chan *Job, 100) // Buffered channel
results := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs, results)
}
// Start producing
go produce(jobs)
// Start analyzing:
wg2.Add(1)
go analyze(results)
wg.Wait() // Wait all consumers to finish processing jobs
// All jobs are processed, no more values will be sent on results:
close(results)
wg2.Wait() // Wait analyzer to analyze all results
}
输出示例:
这是示例输出:
如您所见,在将所有作业排入队列之前,结果即将到来并得到分析:
worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms
在Go Playground上尝试完整的应用程序。
results
频道如果我们不使用results
通道,但是消费者goroutines立即处理结果(在我们的情况下将其打印),代码将大大简化。在这种情况下,我们不需要2个sync.WaitGroup
值(只需要第二个就可以等待分析器完成)。
没有results
渠道,完整的解决方案是这样的:
var wg sync.WaitGroup
type Job struct {
Id int
Work string
}
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
func consume(id int, jobs <-chan *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
}
}
func main() {
jobs := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs)
}
// Start producing
go produce(jobs)
wg.Wait() // Wait all consumers to finish processing jobs
}
输出与results
通道的输出“类似” (但执行/完成顺序当然是随机的)。
在Go Playground上尝试使用此变体。
问题内容: 在http://marcio.io/2015/07/handling-1-million-requests-per-minute-with- golang/ 提供的示例中,很多地方都引用了该示例。 分派服务完许多工作后,工人池(chan chan工作)会不会耗尽?因为从信道和信道工作拉出第一类型后没有被补充被调用的第一次?还是我想念/误读了什么?如何为WorkerPool补充可用的工作
在这个例子中,我们来看一下如何使用gorouotine和channel来实现工作池。 package main import "fmt" import "time" // 我们将在worker函数里面运行几个并行实例,这个函数从jobs通道 // 里面接受任务,然后把运行结果发送到results通道。每个job我们 // 都休眠一会儿,来模拟一个耗时任务。 func worker(id int,
如何从线程池中找到60%(或N%)的线程可用性?这背后的逻辑是什么? 父线程使用线程池线程生成多个网址,并等待所有子线程完成。 代码如下所示 父线程 子线程 用于跨线程通信的对象数据 在上述代码中,所需的线程硬编码为: 这种硬编码会导致线程池不足吗?如果线程池中没有可用的线程,会发生什么?如何在托管服务器的线程池中查找可用线程的总数? 谢谢。
我正在使用Hystrix库处理网络调用。这是我用于线程池配置的内容 我正在使用公共的HytrixCommandKey和HytrixThreadPoolKey创建一个包含1000个HytrixCommand对象的加载。但是线程池在服务前100个线程之后执行回退逻辑,而不是使用threadpool队列。这可以通过将threadpool,coreSize设置为1000来解决。但我使用的是java,将co
Eclipse 3.0作业API是否使用任何内部线程池来执行作业?还是每次计划作业(即将启动)时都会创建一个新线程? 如果它不使用任何线程池,是否可以通过某种方式将作业与Java的ExecutorService一起使用,以便调度的作业将重用执行器池中的现有线程? 如果不是,那么最后一个问题是,是否有机会在Eclipse进度视图中提供进度反馈(就像我在Jobs IProgressMonitor中所做
我有一个应用程序调用插件DLL。其中一些调用是从辅助线程(即,不是UI线程)执行的,并且可能会弹出一个带有MessageBox的对话框。根据这个(http://www.codeproject.com/articles/121226/messagebox-and-worker-threads),有效的UI线程被切换到调用MessageBox的线程。这会使应用程序“崩溃”,因为消息泵停止接收消息。有什