当前位置: 首页 > 工具软件 > goworker > 使用案例 >

go开启多个子协程干活、worker

江英华
2023-12-01

一 目标

go启动多个协程,每个协程做一个任务,实现并行做任务。

二 code示例

1. 简单go知识介绍

go的并发通过goroutine来处理;
func(){}()是go里的匿名函数,{}后面的()是传参数的。如func(a int){…}(100),传了个参数值100;
go func(){}开启1个子协程;
for+ go func开启多个子协程;
go的sync.WaitGroup控制并发的流程;
go func()里+defer捕获子协程内部有发生的panic。因为主进程的defer无法捕获子协程中的panic异常,所有go func()中一般有defer()处理子协程内发生的panic。

2. sync.WaitGroup

是一个struct,有三个方法:
var wkCount sync.WaitGroup

  • wkCount.Add(delta int):增加需要等待的任务数
  • wkCount.Done():任务数减1。放在任务做完的时候,执行。
  • wkCount.Wait():会将程序阻塞,直到Done()将任务数降至0,既所有任务做完。只要有任务没做完,便锁住程序。

3. 看code及输出,理解

// Exception 异常处理
func Exception(funcName string) {
	if p := recover(); p != nil {
		fmt.Println("[msg: recover panic: %v] [err: %v]", funcName, p)
		debug.PrintStack()
	}
}

func testWaitGroup(workers int) { // 主进程
	defer Exception("test WaitGroup")

	var wkCount sync.WaitGroup

	// 增加需要等待的任务数
	wkCount.Add(workers)

	for i := 0; i < workers; i++ {
		go func() {
			// go func(goroutine方式):开启一个子协程   但主进程的defer无法捕获子协程中的panic异常,所有go func()中一般有defer()处理子协程内发生的panic
			// go func()+defer一般用在:多次远程调用,每一次新开一个协程去执行,无需等待结果即可循环下一次????
			// defer捕获子协程内发生的panic
			defer func() {
				// 任务数降1个 当捕获子协程异常时,说明1个子协程(process)已经执行结束了
				wkCount.Done()
				fmt.Println("333") // 3
				if p := recover(); p != nil {
					fmt.Println("[msg: recovered panic] [err: %v]", p)
					debug.PrintStack()
				}
				fmt.Println("444") // 4
			}()

			// 核心处理操作  子协程
			fmt.Println("222 process:", i+10) // 2
		}()

		fmt.Println("111") // 执行顺序:1
	}
	wkCount.Wait()          // 等待所有协程执行完毕 只要有任务在阻塞着,便锁住程序
	fmt.Println("wait 最后后") // 5
}

func main() {
	workers := 3
	testWaitGroup(workers)
}

启1个的输出:
111
222 process: 10
333
444
wait 最后后

启3个的输出(空行我自己添加的):
111
111
111

222 process: 13
333
444

222 process: 13
333
444

222 process: 13
333
444

wait 最后后

go worker

代码示例

worker层:

type EntPushWorker struct {
	logger xxx
	xxx
}

func (w *EntPushWorker) Test2worker() {
	fmt.Println("abc")
}

func (w *EntPushWorker) Test2worker() {
	fmt.Println("def")
}

func (w *EntPushWorker) Test3worker() {
	fmt.Println("xyz")
}

main.go:

package main

import (
	"sync"

	"xxx/xxx/worker"
)

func main() {
	wg := sync.WaitGroup{}

	var entPushWorker worker.EntPushWorker
	// 下面开了3个任务,并行执行
	wg.Add(1)
	go func() {
		defer wg.Done()
		entPushWorker.Test1worker() // 可以写init
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		entPushWorker.Test2worker()
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		entPushWorker.Test3worker()
	}()

	wg.Wait() // 直到所有的执行完了,才会wg.wait
	fmt.Println("done!")
}

输出:

并发执行,谁先执行完,谁先输出
xyz
abc
def

写定时任务

定时器:每隔5秒输出一个时间

func (w *EntPushWorker) Test1worker() {
	// fmt.Println("abc")
	var ch chan int
	//定时任务
	ticker := time.NewTicker(time.Second * 5)
	go func() {
		for range ticker.C {
			fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
		}
		ch <- 1
	}()
	<-ch
}

输出:
2022-09-16 11:05:55
2022-09-16 11:06:00
2022-09-16 11:06:05
2022-09-16 11:06:10
2022-09-16 11:06:15
2022-09-16 11:06:20
 类似资料: