[TOC]
概述
Go语言设计团队的首任负责人Rob Pike对并发编程的一个建议是不要让计算通过共享内存来通讯,而应该让它们通过通讯来共享内存。 通道机制就是这种哲学的一个设计结果。
数据结构
runtime.hchan
表示 channel 结构体
1 | type hchan struct { |
qcount
— Channel 中的元素个数;dataqsiz
— Channel 中的循环队列的长度;buf
— Channel 的缓冲区数据指针;sendx
— Channel 的发送操作处理到的位置;recvx
— Channel 的接收操作处理到的位置;sendq
和recvq
存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表runtime.waitq
表示,链表中所有的元素都是runtime.sudog
结构:
创建管道
makechan 是 创建管道最终调用的函数。makechan64 先不考虑。
1 | func makechan(t *chantype, size int) *hchan { |
- 如果当前 Channel 中不存在缓冲区,那么就只会为
runtime.hchan
分配一段内存空间; - 如果当前 Channel 中存储的类型不是指针类型,就会为当前的 Channel 和底层的数组分配一块连续的内存空间;
- 在默认情况下会单独为
runtime.hchan
和缓冲区分配内存;
发送数据
当我们想要向 Channel 发送数据时,就需要使用 ch <- i
语句,编译器会将它解析成 OSEND
节点并在 cmd/compile/internal/gc.walkexpr
函数中转换成 runtime.chansend1
runtime.chansend1
只是调用了 runtime.chansend
并传入 Channel 和需要发送的数据。runtime.chansend
是向 Channel 中发送数据时最终会调用的函数,这个函数负责了发送数据的全部逻辑,如果我们在调用时将 block
参数设置成 true
,那么就表示当前发送操作是一个阻塞操作:
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止发生竞争条件。如果 Channel 已经关闭,那么向该 Channel 发送数据时就会报"send on closed channel"
错误并中止程序。
因为 runtime.chansend
函数的实现比较复杂,所以我们这里将该函数的执行过程分成以下的三个部分:
- 当存在等待的接收者时,通过
runtime.send
直接将数据发送给阻塞的接收者; - 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区;
- 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据;
直接发送
如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend
函数会从接收队列 recvq
中取出最先陷入等待的 Goroutine 并直接向它发送数据:
直接发送数据的过程
- 调用
runtime.sendDirect
函数将发送的数据直接拷贝到x = <-c
表达式中变量x
所在的内存地址上; - 调用
runtime.goready
将等待接收数据的 Goroutine 标记成可运行状态Grunnable
并把该 Goroutine 放到发送方所在的处理器的runnext
上等待执行,该处理器在下一次调度时就会立刻唤醒数据的接收方;
直接发送
1 | func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { |
缓冲区
如果当前 Channel 的缓冲区未满,向 Channel 发送的数据会存储在 Channel 中 sendx
索引所在的位置并将 sendx
索引加一,由于这里的 buf
是一个循环数组,所以当 sendx
等于 dataqsiz
时就会重新回到数组开始的位置。
阻塞发送
- 调用
runtime.getg
获取发送数据使用的 Goroutine; - 执行
runtime.acquireSudog
函数获取runtime.sudog
结构体并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在 Select 控制结构中和待发送数据的内存地址等; - 将刚刚创建并初始化的
runtime.sudog
加入发送等待队列,并设置到当前 Goroutine 的waiting
上,表示 Goroutine 正在等待该sudog
准备就绪; - 调用
runtime.goparkunlock
函数将当前的 Goroutine 陷入沉睡等待唤醒; - 被调度器唤醒后会执行一些收尾工作,将一些属性置零并且释放
runtime.sudog
结构体;
小结
我们在这里可以简单梳理和总结一下使用 ch <- i
表达式向 Channel 发送数据时遇到的几种情况:
- 如果当前 Channel 的
recvq
上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前的 Goroutine 并将其设置成下一个运行的 Goroutine; - 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们就会直接将数据直接存储到当前缓冲区
sendx
所在的位置上; - 如果不满足上面的两种情况,就会创建一个
runtime.sudog
结构并将其加入 Channel 的sendq
队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;
接受数据
我们接下来继续介绍 Channel 操作的另一方 — 数据的接收。Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:
1 | i <- ch |
接受数据
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
如果当前 Channel 已经被关闭并且缓冲区中不存在任何的数据,那么就会清除 ep
指针中的数据并立刻返回。
除了上述两种特殊情况,使用 runtime.chanrecv
从 Channel 接收数据时还包含以下三种不同情况:
- 当存在等待的发送者时,通过
runtime.recv
直接从阻塞的发送者或者缓冲区中获取数据; - 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据;
- 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据;
直接接收
当 Channel 的 sendq
队列中包含处于等待状态的 Goroutine 时,该函数会取出队列头等待的 Goroutine,处理的逻辑和发送时相差无几,只是发送数据时调用的是 runtime.send
函数,而接收数据时使用 runtime.recv
函数:
runtime.recv
函数的实现比较复杂:
1 | func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { |
- 如果 Channel 不存在缓冲区;
- 调用
runtime.recvDirect
函数会将 Channel 发送队列中 Goroutine 存储的elem
数据拷贝到目标内存地址中;
- 调用
- 如果 Channel 存在缓冲区;
- 将队列中的数据拷贝到接收方的内存地址;
- 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;
从发送队列中获取数据
Channel 在缓冲区已经没有空间并且发送队列中存在等待的 Goroutine 时,运行 <-ch
的执行过程 — 发送队列头的 runtime.sudog
结构中的元素会替换接收索引 recvx
所在位置的元素,原有的元素会被拷贝到接收数据的变量的内存空间上
阻塞接收
当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞操作,然而不是所有的接收操作都是阻塞的,与 select
语句结合使用时就可能会使用到非阻塞的接收操作:
1 | // 不阻塞,直接返回 |
小结
我们梳理一下从 Channel 中接收数据时可能会发生的五种情况:
- 如果 Channel 为空,那么就会直接调用
runtime.gopark
挂起当前 Goroutine; - 如果 Channel 已经关闭并且缓冲区没有任何数据,
runtime.chanrecv
函数会直接返回; - 如果 Channel 的
sendq
队列中存在挂起的 Goroutine,就会将recvx
索引所在的数据拷贝到接收变量所在的内存空间上并将sendq
队列中 Goroutine 的数据拷贝到缓冲区; - 如果 Channel 的缓冲区中包含数据就会直接读取
recvx
索引对应的数据; - 在默认情况下会挂起当前的 Goroutine,将
runtime.sudog
结构加入recvq
队列并陷入休眠等待调度器的唤醒;
我们总结一下从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时
关闭管道
编译器会将用于关闭管道的 close
关键字转换成 OCLOSE
节点以及 runtime.closechan
的函数调用。
当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接 panic
并抛出异常:
1 | func closechan(c *hchan) { |
该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready
触发调度。