runner 包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用 runner 包来终止程序。
当开发需要调度后台处理任务的程序的时候,这种模式会很有用。
这个程序可能会作为 cron 作业执行,或者在基于定时任务的云环境(如 iron.io)里执行。让我们来看一下 runner 包里的 runner.go 代码文件,如代码清单 7-1 所示。
代码清单 7-1 runner/runner.go
// Gabriel Aszalos 协助完成了这个示例
// runner 包管理处理任务的运行和生命周期
package runner
import (
"errors"
"os"
"os/signal"
"time"
)
// Runner 在给定的超时时间内执行一组任务,
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {
// interrupt 通道报告从操作系统
// 发送的信号
interrupt chan os.Signal
// complete 通道报告处理任务已经完成
complete chan error
// timeout 报告处理任务已经超时
timeout <-chan time.Time
// tasks 持有一组以索引顺序依次执行的
// 函数
tasks []func(int)
}
// ErrTimeout 会在任务执行超时时返回
var ErrTimeout = errors.New("received timeout")
// ErrInterrupt 会在接收到操作系统的事件时返回
var ErrInterrupt = errors.New("received interrupt")
// New 返回一个新的准备使用的 Runner
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}
// Add 将一个任务附加到 Runner 上。这个任务是接收int 类型的 ID 作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
// Start 执行所有任务,并监视通道事件
func (r *Runner) Start() error {
// 我们希望接收所有中断信号
signal.Notify(r.interrupt, os.Interrupt)
// 用不同的 goroutine 执行不同的任务
go func() {
r.complete <- r.run()
}()
select {
// 当任务处理完成时发出的信号
case err := <-r.complete:
return err
// 当任务处理程序运行超时时发出的信号
case <-r.timeout:
return ErrTimeout
}
}
// run 执行每一个已注册的任务
func (r *Runner) run() error {
for id, task := range r.tasks {
// 检测操作系统的中断信号
if r.gotInterrupt() {
return ErrInterrupt
}
// 执行已注册的任务
task(id)
}
return nil
}
// gotInterrupt 验证是否接收到了中断信号
func (r *Runner) gotInterrupt() bool {
select {
// 当中断事件被触发时发出的信号
case <-r.interrupt:
// 停止接收后续的任何信号
signal.Stop(r.interrupt)
return true
// 继续正常运行
default:
return false
}
}
代码清单 7-1 中的程序展示了依据调度运行的无人值守的面向任务的程序,及其所使用的并发模式。在设计上,可支持以下终止点:
书中对代码进行了详细解释,不过实在太过啰嗦了,我们只要多看几遍代码就可以看明白了,这里不再赘述。
读者若想详细了解,请查看原书。
测试类
package main
import (
"learn/runner"
"log"
"os"
"time"
)
// timeout 规定了必须在多少秒内处理完成
const timeout = 3 * time.Second
// main 是程序的入口
func main() {
log.Println("Starting work.")
// 为本次执行分配超时时间
r := runner.New(timeout)
// 加入要执行的任务
r.Add(createTask(), createTask(), createTask())
// 执行任务并处理结果
if err := r.Start(); err != nil {
switch err {
case runner.ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case runner.ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
// createTask 返回一个根据 id 休眠指定秒数的示例任务
func createTask() func(int) {
return func(id int) {
log.Printf("Processor - Task #%d.", id)
time.Sleep(time.Duration(id) * time.Second)
}
}