go协程池(goroutine)、go线程池(Thread)、go连接池

慕容耘豪
2023-12-01

一、go协程池(goroutine)

注意

Go语言中的goroutine虽然相对于系统线程来说比较轻量级(初始栈大小仅2KB),但是在高并发量下的goroutine频繁创建和销毁对于性能损耗以及GC来说压力也不小

package main

import (
	"errors"
	"fmt"
	"log"
	"sync"
	"sync/atomic"
	"time"
)

var (
	// ErrInvalidPoolCap return if pool size <= 0
	ErrInvalidPoolCap = errors.New("invalid pool cap")
	// ErrPoolAlreadyClosed put task but pool already closed
	ErrPoolAlreadyClosed = errors.New("pool already closed")
)

const (
	// RUNNING pool is running
	RUNNING = 1
	// STOPED pool is stoped
	STOPED = 0
)

// Task task to-do
type Task struct {
	Handler func(v ...interface{})
	Params  []interface{}
}

// Pool task pool
type Pool struct {
	capacity       uint64
	runningWorkers uint64
	state          int64
	taskC          chan *Task
	PanicHandler   func(interface{})
	sync.Mutex
}

// NewPool init pool
func NewPool(capacity uint64) (*Pool, error) {
	if capacity <= 0 {
		return nil, ErrInvalidPoolCap
	}
	return &Pool{
		capacity: capacity,
		state:    RUNNING,
		taskC:    make(chan *Task, capacity),
	}, nil
}

// GetCap get capacity
func (p *Pool) GetCap() uint64 {
	return p.capacity
}

// GetRunningWorkers get running workers
func (p *Pool) GetRunningWorkers() uint64 {
	return atomic.LoadUint64(&p.runningWorkers)
}

func (p *Pool) incRunning() {
	atomic.AddUint64(&p.runningWorkers, 1)
}

func (p *Pool) decRunning() {
	atomic.AddUint64(&p.runningWorkers, ^uint64(0))
}

// Put put a task to pool
func (p *Pool) Put(task *Task) error {

	if p.getState() == STOPED {
		return ErrPoolAlreadyClosed
	}

	// safe run worker
	p.Lock()
	if p.GetRunningWorkers() < p.GetCap() {
		p.run()
	}
	p.Unlock()

	// send task safe
	p.Lock()
	if p.state == RUNNING {
		p.taskC <- task
	}
	p.Unlock()

	return nil
}

func (p *Pool) run() {
	p.incRunning()

	go func() {
		defer func() {
			p.decRunning()
			if r := recover(); r != nil {
				if p.PanicHandler != nil {
					p.PanicHandler(r)
				} else {
					log.Printf("Worker panic: %s\n", r)
				}
			}
		}()

		for {
			select {
			case task, ok := <-p.taskC:
				if !ok {
					return
				}
				task.Handler(task.Params...)
			}
		}
	}()
}

func (p *Pool) getState() int64 {
	p.Lock()
	defer p.Unlock()

	return p.state
}

func (p *Pool) setState(state int64) {
	p.Lock()
	defer p.Unlock()

	p.state = state
}

// close safe
func (p *Pool) close() {
	p.Lock()
	defer p.Unlock()

	close(p.taskC)
}

// Close close pool graceful
func (p *Pool) Close() {

	if p.getState() == STOPED {
		return
	}

	p.setState(STOPED) // stop put task

	for len(p.taskC) > 0 { // wait all task be consumed
		time.Sleep(1e6) // reduce CPU load
	}

	p.close()
}

//https://github.com/wazsmwazsm/mortar
func main() {
	// 创建容量为 10 的任务池
	pool, err := NewPool(10)
	if err != nil {
		panic(err)
	}

	wg := new(sync.WaitGroup)

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		// 创建任务
		task := &Task{
			Handler: func(v ...interface{}) {
				wg.Done()
				fmt.Println(v)
			},
		}
		// 添加任务函数的参数
		task.Params = []interface{}{i, i * 2, "hello"}
		// 将任务放入任务池
		pool.Put(task)
	}
	wg.Add(1)
	// 再创建一个任务
	pool.Put(&Task{
		Handler: func(v ...interface{}) {
			wg.Done()
			fmt.Println(v)
		},
		Params: []interface{}{"hi!"}, // 也可以在创建任务时设置参数
	})

	wg.Wait()

	// 安全关闭任务池(保证已加入池中的任务被消费完)
	pool.Close()
	// 如果任务池已经关闭, Put() 方法会返回 ErrPoolAlreadyClosed 错误
	err = pool.Put(&Task{
		Handler: func(v ...interface{}) {},
	})
	if err != nil {
		fmt.Println(err) // print: pool already closed
	}
}

package main

import (
	"fmt"
	"time"
)

// 任务的属性应该是一个业务函数
type Task struct {
	f func() error // 函数名f, 无参,返回值为error
}

// 创建Task任务
func NewTask(arg_f func() error) *Task {
	task := Task{
		f: arg_f,
	}
	return &task
}

// Task绑定业务方法
func (task *Task) Execute() {
	task.f() // 调用任务中已经绑定好的业务方法
}

// ------------------------------------------------
type Pool struct {
	EntryChannel chan *Task // 对外的Task入口
	JobsChannel  chan *Task // 内部的Task队列
	workerNum    int        // 协程池中最大的woker数量
}

// 创建Pool
func NewPool(cap int) *Pool {
	pool := Pool{
		EntryChannel: make(chan *Task),
		JobsChannel:  make(chan *Task),
		workerNum:    cap,
	}
	return &pool
}

// Pool绑定干活的方法
func (pool *Pool) worker(workID int) {
	// worker工作 : 永久从JobsChannel取任务 然后执行任务
	for task := range pool.JobsChannel {
		task.Execute()
		fmt.Println("work ID ", workID, " has executed")
	}
}

// Pool绑定协程池工作方法
func (pool *Pool) run() {
	// 定义worker数量
	for i := 0; i < pool.workerNum; i++ {
		go pool.worker(i)
	}

	// 从EntryChannel去任务,发送给JobsChannel
	for task := range pool.EntryChannel {
		pool.JobsChannel <- task // 添加task优先级排序逻辑
	}
}

// ------------------------------------------------
func main() {
	// 创建一些任务
	task := NewTask(func() error { // 匿名函数
		fmt.Println(time.Now())
		return nil
	})

	// 创建协程池
	pool := NewPool(4)

	// 创建多任务,抛给协程池
	go func() { // 开启新的协程,防止阻塞
		for {
			pool.EntryChannel <- task
		}
	}()

	// 启动协程池
	pool.run()
}

package main

import (
	"io/ioutil"
	"net/http"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: Something CPU heavy with payload

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
		}
		defer r.Body.Close()

		// Funnel this work into our pool. This call is synchronous and will
		// block until the job is completed.
		result := pool.Process(input)

		w.Write(result.([]byte))
	})

	http.ListenAndServe(":8080", nil)
}

package main

import (
	"fmt"
	"sync"
	"time"
)

// 每个协程都会运行该函数。
// 注意,WaitGroup 必须通过指针传递给函数。
func worker(id int, wg *sync.WaitGroup) {
	fmt.Printf("Worker %d starting\n", id)

	// 睡眠一秒钟,以此来模拟耗时的任务。
	time.Sleep(time.Second)
	fmt.Printf("Worker %d done\n", id)

	// 通知 WaitGroup ,当前协程的工作已经完成。
	wg.Done()
}

func main() {

	// 这个 WaitGroup 被用于等待该函数开启的所有协程。
	var wg sync.WaitGroup

	// 开启几个协程,并为其递增 WaitGroup 的计数器。
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go worker(i, &wg)
	}

	// 阻塞,直到 WaitGroup 计数器恢复为 0,即所有协程的工作都已经完成。
	wg.Wait()
}

总结

1、限制并发的goroutine数量;
2、复用goroutine,减轻runtime调度压力,提升程序性能;
3、规避过多的goroutine侵占系统资源(CPU&内存)。

线程包含:内核级线程模型、系统级线程模型、用户级线程模型、混合型线程模型。

二、 go线程池(Thread)

package main

import (
	"fmt"
	"time"
)

// 这里是 worker,我们将并发执行多个 worker。
// worker 将从 `jobs` 通道接收任务,并且通过 `results` 发送对应的结果。
// 我们将让每个任务间隔 1s 来模仿一个耗时的任务。
func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Println("worker", id, "processing job", j)
		time.Sleep(time.Second)
		results <- j * 2
	}
}

func main() {

	// 为了使用 worker 线程池并且收集他们的结果,我们需要 2 个通道。
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// 这里启动了 3 个 worker,初始是阻塞的,因为还没有传递任务。
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}

	// 这里我们发送 9 个 `jobs`,然后 `close` 这些通道
	// 来表示这些就是所有的任务了。
	for j := 1; j <= 9; j++ {
		jobs <- j
	}
	close(jobs)

	// 最后,我们收集所有这些任务的返回值。
	for a := 1; a <= 9; a++ {
		<-results
	}
}

三、go连接池

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync/atomic"

	pool "github.com/jolestar/go-commons-pool"
)

func Example_simple() {
	type myPoolObject struct {
		s string
	}
	v := uint64(0)
	factory := pool.NewPooledObjectFactorySimple(
		func(context.Context) (interface{}, error) {
			return &myPoolObject{
					s: strconv.FormatUint(atomic.AddUint64(&v, 1), 10),
				},
				nil
		})
	ctx := context.Background()
	p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)
	obj, err := p.BorrowObject(ctx)
	if err != nil {
		panic(err)
	}
	o := obj.(*myPoolObject)
	fmt.Println(o.s)
	err = p.ReturnObject(ctx, obj)
	if err != nil {
		panic(err)
	}
	// Output: 1
}

参考:github.com/jolestar/go-commons-pool
参考:github.com/Jeffail/tunny

 类似资料: