当前位置: 首页 > 工具软件 > Golang-Pool > 使用案例 >

golang_Pool: go实现协程池

鲜于星波
2023-12-01

初始化一个 Goroutine Pool 池,这个Pool维护了一个类似栈的FILO队列 ,里面存放负责处理任务的Worker,然后每有一个请求就开启一个协程。
之后

  1. 检查当前Worker队列中是否有空闲的Worker,如果有,取出执行当前的task;
  2. 没有空闲Worker,判断当前在运行的Worker是否已超过该Pool的容量,是 — 阻塞等待直至有Worker被放回Pool;否 — 新开一个Worker(goroutine)处理;
  3. 每个Worker执行完任务之后,放回Pool的队列中等待。

参考资料:《Go语言实战》7.2章 Pool

package main

import (
	"io"
	"log"
	"sync"
	"errors"
)

// Pool管理一组可以安全在多个goroutine间共享的资源
// 被管理的资源必须实现io.Closer接口
type Pool struct {
	m         sync.Mutex
	resources chan io.Closer
	factory   func() (io.Closer, error)
	closed    bool
}

// ErrPoolClosed表示请求了一个已经关闭的池
var ErrPoolClosed = errors.New("Pool has been closed.")

//New创建一个用来管理资源的池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("Size value too small.")
	}

	return &Pool{
		factory:   fn,
		resources: make(chan io.Closer, size),
	}, nil
}

//Acquire从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
	select {
	// 检查是否有空闲的资源
	case r, ok := <-p.resources:
		log.Println("Acquire:", "Shared Resource")
		if !ok {
			return nil, ErrPoolClosed
		}
		return r, nil

	//如果没有空闲资源可用,提供一个新资源
	default:
		log.Println("Acquire:", "New Resource")
		return p.factory()
	}
}

//Release将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
	//加锁,保证本操作和Close操作的安全
	p.m.Lock()
	defer p.m.Unlock()

	//如果池已经关闭,销毁这个资源
	if p.closed {
		r.Close()
		return
	}

	select {
	//将这个资源放入队列
	case p.resources <- r:
		log.Println("Release:", "In Queue")
	// 如果队列已满,则关闭这个资源
	default:
		log.Println("Release:", "Closing")
		r.Close()
	}
}

// Close会让资源池停止工作,并关闭所有现在的资源
func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	if p.closed {
		return
	}

	//将池关闭
	p.closed = true

	//清空通道里的资源之前,将通道关闭,防止死锁
	close(p.resources)

	//关闭资源
	for r := range p.resources {
		r.Close()
	}
}

 类似资料: