我们创建了一个通用的 workerPool 包,根据业务所需的并发性使用 worker 来处理任务。一起来看下目录结构:
workerpool
├── pool.go
├── task.go
└── worker.go
workerpool 目录在项目的根目录下。Task 是需要处理单个工作单元;Worker 是一个简单的 worker 函数,用于执行任务;而 Pool 用于创建、管理 workers。
先看下 Task 代码:
// workerpool/task.go
package workerpool
import (
"fmt"
)
type Task struct {
Err error
Data interface{}
f func(interface{}) error
}
func NewTask(f func(interface{}) error, data interface{}) *Task {
return &Task{f: f, Data: data}
}
func process(workerID int, task *Task) {
fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
task.Err = task.f(task.Data)
}
Task 是一个简单的结构体,保存处理任务所需要的一切数据。创建 task 时,传递了 Data 和待执行函数 f,process() 函数会处理任务。处理任务时,将 Data 作为参数传递给函数 f,并将执行结果保存在 Task.Err 里。
我们来看下 Worker 是如何处理任务的:
// workerpool/worker.go
package workerpool
import (
"fmt"
"sync"
)
// Worker handles all the work
type Worker struct {
ID int
taskChan chan *Task
}
// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
return &Worker{
ID: ID,
taskChan: channel,
}
}
// Start starts the worker
func (wr *Worker) Start(wg *sync.WaitGroup) {
fmt.Printf("Starting worker %d\n", wr.ID)
wg.Add(1)
go func() {
defer wg.Done()
for task := range wr.taskChan {
process(wr.ID, task)
}
}()
}
我们创建了一个小巧的 Worker 结构体,包含 worker ID 和 一个保存待处理任务的 channel。在 Start() 方法里,使用 for range 从 taskChan 读取任务并处理。可以想象的到,多个 worker 可以并发地执行任务。
我们通过实现 Task 和 Worker 来处理任务,但是好像还缺点什么东西,谁负责生成这些 worker 并将任务发送给它们?答案是:Worker Pool。
// workerpoo/pool.go
package workerpool
import (
"fmt"
"sync"
"time"
)
// Pool is the worker pool
type Pool struct {
Tasks []*Task
concurrency int
collector chan *Task
wg sync.WaitGroup
}
// NewPool initializes a new pool with the given tasks and
// at the given concurrency.
func NewPool(tasks []*Task, concurrency int) *Pool {
return &Pool{
Tasks: tasks,
concurrency: concurrency,
collector: make(chan *Task, 1000),
}
}
// Run runs all work within the pool and blocks until it's
// finished.
func (p *Pool) Run() {
for i := 1; i <= p.concurrency; i++ {
worker := NewWorker(p.collector, i)
worker.Start(&p.wg)
}
for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
close(p.collector)
p.wg.Wait()
}
上面的代码,pool 保存了所有待处理的任务,并且生成与 concurrency 数量一致的 goroutine,用于并发地处理任务。workers 之间共享缓存 channel – collector。
所以,当我们把这个工作池跑起来时,可以生成满足所需数量的 worker,workers 之间共享 collector channel。接着,使用 for range 读取 tasks,并将读取到的 task 写入 collector 里。我们使用 sync.WaitGroup 实现协程之间的同步。现在我们有了一个很好的解决方案,一起来测试下。
// main.go
package main
import (
"fmt"
"time"
"github.com/Joker666/goworkerpool/workerpool"
)
func main() {
var allTask []*workerpool.Task
for i := 1; i <= 100; i++ {
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Task %d processed\n", taskID)
return nil
}, i)
allTask = append(allTask, task)
}
pool := workerpool.NewPool(allTask, 5)
pool.Run()
}
上面的代码,创建了 100 个任务并且使用 5 个并发处理这些任务。
输出如下:
Worker 3 processes task 98
Task 92 processed
Worker 2 processes task 99
Task 98 processed
Worker 5 processes task 100
Task 99 processed
Task 100 processed
Took ===============> 2.0056295s
处理 100 个任务花费了 2s,如何我们将并发数提高到 10,我们会看到处理完所有任务只需要大约 1s。
我们通过实现 workerPool 构建了一个健壮的解决方案,具有并发性、错误处理、数据处理等功能。这是个通用的包,不耦合具体的实现。我们可以使用它来解决一些大问题。
实际上,我们还可以进一步扩展上面的解决方案,以便 worker 可以在后台等待我们投递新的任务并处理。为此,代码需要做一些修改,Task 结构体保持不变,但是需要小改下 Worker,看下面代码:
// workerpool/worker.go
// Worker handles all the work
type Worker struct {
ID int
taskChan chan *Task
quit chan bool
}
// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
return &Worker{
ID: ID,
taskChan: channel,
quit: make(chan bool),
}
}
....
// StartBackground starts the worker in background waiting
func (wr *Worker) StartBackground() {
fmt.Printf("Starting worker %d\n", wr.ID)
for {
select {
case task := <-wr.taskChan:
process(wr.ID, task)
case <-wr.quit:
return
}
}
}
// Stop quits the worker
func (wr *Worker) Stop() {
fmt.Printf("Closing worker %d\n", wr.ID)
go func() {
wr.quit <- true
}()
}
Worker 结构体新加 quit channel,并且新加了两个方法。StartBackgorund() 在 for 循环里使用 select-case 从 taskChan 队列读取任务并处理,如果从 quit 读取到结束信号就立即返回。Stop() 方法负责往 quit 写入结束信号。
添加完这两个新的方法之后,我们来修改下 Pool:
// workerpool/pool.go
type Pool struct {
Tasks []*Task
Workers []*Worker
concurrency int
collector chan *Task
runBackground chan bool
wg sync.WaitGroup
}
// AddTask adds a task to the pool
func (p *Pool) AddTask(task *Task) {
p.collector <- task
}
// RunBackground runs the pool in background
func (p *Pool) RunBackground() {
go func() {
for {
fmt.Print("⌛ Waiting for tasks to come in ...\n")
time.Sleep(10 * time.Second)
}
}()
for i := 1; i <= p.concurrency; i++ {
worker := NewWorker(p.collector, i)
p.Workers = append(p.Workers, worker)
go worker.StartBackground()
}
for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
p.runBackground = make(chan bool)
<-p.runBackground
}
// Stop stops background workers
func (p *Pool) Stop() {
for i := range p.Workers {
p.Workers[i].Stop()
}
p.runBackground <- true
}
Pool 结构体添加了两个成员:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于维持 pool 存活状态。
添加了三个新的方法,AddTask() 方法用于往 collector 添加任务;RunBackground() 方法衍生出一个无限运行的 goroutine,以便 pool 维持存活状态,因为 runBackground 信道是空,读取空的 channel 会阻塞,所以 pool 能维持运行状态。接着,在协程里面启动 worker;Stop() 方法用于停止 worker,并且给 runBackground 发送停止信号以便结束 RunBackground() 方法。
我们来看下具体是如何工作的。
如果是在现实的业务场景中,pool 将会与 HTTP 服务器一块运行并消耗任务。我们通过 for 无限循环模拟这种这种场景,如果满足某一条件,pool 将会停止。
// main.go
...
pool := workerpool.NewPool(allTask, 5)
go func() {
for {
taskID := rand.Intn(100) + 20
if taskID%7 == 0 {
pool.Stop()
}
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Task %d processed\n", taskID)
return nil
}, taskID)
pool.AddTask(task)
}
}()
pool.RunBackground()
当执行上面的代码时,我们就会看到有随机的 task 被投递到后台运行的 workers,其中某一个 worker 会读取到任务并完成处理。当满足某一条件时,程序便会停止退出。
以上文章来源于Golang来啦 ,作者Seekload