当前位置: 首页 > 文档资料 > Go 中文文档 >

5.13. 并发

优质
小牛编辑
131浏览
2023-12-01

5.13. 并发

5.13.1. 交流来分享

并发编程是很大的主题,此处只够讲 Go 方面的要点。

很多环境的并发编程变得困难出自于实现正确读写共享变量的微妙性。Go 鼓励一种不一样的方式,这里,共享变量在信道是传递,并且事实上,从来未被独立的执行序列所共享。每一特定时间只有一个够程在存取该值。从设计上数据竞争就不会发生。为鼓励这种思考方式我们把它缩减成一句口号:

> 别靠共享内存来通信,要靠通信来分享内存!

此方式可扯的太远。例如,引用计数最好靠整型变量外加一个互斥。但最为高层方式,使用信道控制存取可以更容易写成清楚正确的程序。

思考这种模型一种方式是考虑在单一 CPU 上运行的典型的单线程程序,不需用到同步原语。现在多运行一份;也同样不需同步。现在让两者通信;如果通信是同步者,还是不需其它的同步。例如,Unix 的管道完美的适合这个模型。尽管 Go 的并行方式源自 Hoare 的通信顺序进程(CSP),它也可视为泛化的类型安全的 Unix 管道。

5.13.2. Goroutines(Go程)

它们叫做够程, 是因为现有的术语 -- 线程,协程和进程等 -- 传达的含义不够精确。够程的模式很简单:它是在同一地址空间和其它够程并列执行的函数。它轻盈,只比分配堆栈空间多费一点儿。因为堆栈开始时很小,所以它们很便宜,只在需要时分配(和释放)堆库存。

够程在多个 OS 线程间复用,所以如果某个需要阻塞,例如在等待IO,其它的可继续执行。它们的设计隐藏了许多线程生成和管理的复杂性。

在某个函数或方法前加上 go 键字则在新够程中执行此调用。当调用完成,够程安静的退出。(效果类似 Unix shell 的 & -- 在后台执行命令。)

  go list.Sort()  // run list.Sort in parallel; don't wait for it.

函数字面在实施够程上很顺手:

  func Announce(message string, delay int64) {
      go func() {
          time.Sleep(delay)
          fmt.Println(message)
      }()  // Note the parentheses - must call the function.
  }

在 Go 里,函数字面是闭包:实现保证函数引用的变量可以活到它们不在用了。

这些例子没什么用处,因为函数无法通知其结束。那需用到信道。

5.13.3. Channels(信道)

类型映射,信道是引用类型,使用 make 分配。如果提供了可选的整型参量,它会设置信道的缓冲大小。默认是0,即无缓冲的或同步的信道。

  ci := make(chan int)            // unbuffered channel of integers
  cj := make(chan int, 0)         // unbuffered channel of integers
  cs := make(chan *os.File, 100)  // buffered channel of pointers to Files

信道结合了通信 -- 即值的交换 -- 和同步 -- 确保两个计算(够程)处于某个已知状态。

信道有很多惯用语。先从一个开始。上节我们启动了个后台的排序。信道使启动够程能等待排序完成。

  c := make(chan int)  // Allocate a channel.
  // Start the sort in a goroutine; when it completes, signal on the channel.
  go func() {
      list.Sort()
      c <- 1  // Send a signal; value does not matter. 
  }()
  doSomethingForAWhile()
  <-c   // Wait for sort to finish; discard sent value.

接收者阻塞到有数据可以接收。如果信道是非缓冲的,发送者阻塞到接收者收到其值。如果信道有缓冲,发送者只需阻塞到值拷贝到缓冲里;如果缓冲满,则等待直到某个接收者取走一值。

一个缓冲信道可以用作信号灯,比如用来限速。下例中,到来请求传递给 handle,来发送一值到信道,处理请求,以及才信道接收值。信道的容量决定了可同时调用 process 的数量。

  var sem = make(chan int, MaxOutstanding)

  func handle(r *Request) {
      sem <- 1    // Wait for active queue to drain.
      process(r)  // May take a long time.
      <-sem       // Done; enable next request to run.
  }

  func Serve(queue chan *Request) {
      for {
          req := <-queue
          go handle(req)  // Don't wait for handle to finish.
      }
  }

同样的概念,我们可以启动一定数量的 handle 够程,全都读取请求信道。够程的数量限制同时调用 process 的数量。此 Serve 函数也接受一个信道来告知它退出;够程启动后会接收阻塞在此信道。

  func handle(queue chan *Request) {
      for r := range queue {
          process(r)
      }
  }

  func Serve(clientRequests chan *clientRequests, quit chan bool) {
      // Start handlers
      for i := 0; i < MaxOutstanding; i++ {
          go handle(clientRequests)
      }
      <-quit  // Wait to be told to exit.
  }

5.13.4. Channels of channels(信道的信道)

Go 的一个最重要的特色是信道作为一等值可以被分配被传递,正如其它的值。此特色常用来实现安全并行的分路器。

上节的例子里,handle 是个理想化的请求经手者,但我们并未定义其经手的类型。如果此类型包括一个可回发的信道,每个客户都可提供自身的回答途径。下面是类型 Request 的语义定义。

  type Request struct {
      args        []int
      f           func([]int) int
      resultChan  chan int
  }

客户提供一个函数及其参量,以及在请求物件里的一个信道,用来接收答案。

  func sum(a []int) (s int) {
      for _, v := range a {
          s += v
      }
      return
  }

  request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
  // Send request
  clientRequests <- request
  // Wait for response.
  fmt.Printf("answer: %d\n", <-request.resultChan)

服务器端,经手函数是唯一需要改变的。

  func handle(queue chan *Request) {
      for req := range queue {
          req.resultChan <- req.f(req.args)
      }
  }

当然还要很多工作使其实际,但此代码是个速率限制、并行、非阻塞的 RPC 系统的框架,而且看不到一个互斥。

5.13.5. 并发

这些概念的另一应用是在多 CPU 核上并发计算。如果一个运算可以分解为独立片段,则可并发,用一信道通知每个片段的结束。

比如我们有个很花时间的运算执行在一列项上,每个项的运算值都是独立的,如下例:

  type Vector []float64

  // Apply the operation to v[i], v[i+1] ... up to v[n-1].
  func (v Vector) DoSome(i, n int, u Vector, c chan int) {
      for ; i < n; i++ {
          v[i] += u.Op(v[i])
      }
      c <- 1    // signal that this piece is done
  }

我们在循环里单独启动片段,每个 CPU 一个。 它们谁先完成都没关系;我们只是启动全部够程前清空信道,再数数结束通知即可。

  const NCPU = 4  // number of CPU cores

  func (v Vector) DoAll(u Vector) {
      c := make(chan int, NCPU)  // Buffering optional but sensible.
      for i := 0; i < NCPU; i++ {
          go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c)
      }
      // Drain the channel.
      for i := 0; i < NCPU; i++ {
          <-c    // wait for one task to complete
      }
      // All done.
  }

现在的 gc 实现(6g 等)不会默认的并发此代码。它只投入一个核给用户层的运算。任意多的够程可以阻塞在系统调用上,但默认的每个时刻只有一个可以执行用户层的代码。它本该更聪明些,某天它会变聪明,但那之前如果你要并发 CPU 就必须告知运行态你要同时执行代码的够程的数量。有两种相关的办法,或者你把环境变量GOMAXPROCS 设为你要用到的核数(默认1);或者导入 runtime 包调用 runtime.GOMAXPROCS(NCPU)。再提一遍,此要求会随着调度及运行态的进步而退休。

5.13.6. 漏水缓冲

并发编程的工具也可用来使非并发的概念更容易表达。下例是从某个RPC 包里提取的。客户够程循环接收数据自某源,可能是网络。为免分配释放缓冲,它保有一个自由列,并由一个缓冲的信道代表。如果信道空,则新缓冲被分配。当消息缓冲好时,它在 serverChan 上发给服务器。

  var freeList = make(chan *Buffer, 100)
  var serverChan = make(chan *Buffer)

  func client() {
      for {
          b, ok := <-freeList  // grab a buffer if available
          if !ok {              // if not, allocate a new one
              b = new(Buffer)
          }
          load(b)              // read next message from the net
          serverChan <- b      // send to server
      }
  }

服务器循环读消息自客户,处理,返回缓冲到自由列。

  func server() {
      for {
          b := <-serverChan    // wait for work
          process(b)
          _ = freeList <- b    // reuse buffer if room
      }
  }

客户无阻的从 freeList 得到一个缓冲,如还没有则客户分配个新的缓冲。服务器无阻的发送给 freeList,放 b 回自由列,除非列满,此时缓冲掉到地板上被垃圾收集器回收。(发送操作赋值给空白标识使其 无阻但会忽略操作是否成功。)此实现仅用几行就打造了个漏水缓冲,靠缓冲信道和垃圾收集器记账。