Golang源码分析系列之Channel底层实现

Golang中Channel是goroutine间重要通信的方式,是并发安全的,通道内的数据First In First Out,我们可以把通道想象成队列。这里面分析的源码基于go1.13版本。

channel数据结构

Channel底层数据结构是一个结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
type hchan struct {
qcount uint // 队列中元素个数
dataqsiz uint // 循环队列的大小
buf unsafe.Pointer // 指向循环队列
elemsize uint16 // 通道里面的元素大小
closed uint32 // 通道关闭的标志
elemtype *_type // 通道元素的类型
sendx uint // 待发送的索引,即循环队列中的队尾指针front
recvx uint // 待读取的索引,即循环队列中的队头指针rear
recvq waitq // 接收等待队列
sendq waitq // 发送等待队列
lock mutex // 互斥锁
}

hchan结构体中的buf指向一个数组,用来实现循环队列,sendx是循环队列的队尾指针,recvx是循环队列的队头指针。dataqsize是缓存型通道的大小,qcount是记录通道内元素个数。

循环队列一般使用空余单元法来解决队空和队满时候都存在font=rear带来的二义性问题,但这样会浪费一个单元。golang的channel中是通过增加qcount字段记录队列长度来解决二义性,一方面不会浪费一个存储单元,另一方面当使用len函数查看队列长度时候,可以直接返回qcount字段,一举两得。

hchan结构体中另一重要部分是recvq,sendq,分别存储了等待从通道中接收数据的goroutine,和等待发送数据到通道的goroutine。两者都是waitq类型。

waitq是一个结构体类型,waitq和sudog构成双向链表,其中sudog是链表元素的类型,waitq中first和last字段分别指向链表头部的sudog,链表尾部的sudog。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type waitq struct {
first *sudog
last *sudog
}

type sudog struct {
...
g *g // 当前阻塞的G
...
next *sudog
prev *sudog
elem unsafe.Pointer
...
}

hchan结构图如下:

channel的创建

在分析channel的创建代码之前,我们看下源码文件中最开始定义的两个常量;

1
2
3
4
5
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
...
)
  • maxAlgin用来设置内存最大对齐值,对应就是64位系统下cache line的大小。当结构体是8字节对齐时候,能够避免false share,提高读写速度
  • hchanSize用来设置chan大小,unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)),这个复杂公式用来计算离unsafe.Sizeof(hchan{})最近的8的倍数。假设hchan{}大小是13,hchanSize是16。

假设n代表unsafe.Sizeof(hchan{}),a代表maxAlign,c代表hchanSize,则上面hchanSize的计算公式可以抽象为:

c = n + ((-n) & (a - 1))

计算离8最近的倍数,只需将n补足与到8倍数的差值就可,c也可以用下面公式计算

c = n + (a - n%a)

感兴趣的可以证明在a为2的n的次幂时候,上面两个公式是相等的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 通道元素的大小不能超过64K
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}

// hchanSize大小不是maxAlign倍数,或者通道数据元素的对齐保证大于maxAlign
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 判断通道数据是否超过内存限制
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

var c *hchan
switch {
case mem == 0: // 无缓冲通道
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 当通道数据元素不含指针,hchan和buf内存空间调用mallocgc一次性分配完成
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// hchan和buf内存上布局是紧挨着的
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 当通道数据元素含指针时候,先创建hchan,然后给buf分配内存空间
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
...
return c
}

发送数据到channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 当通道为nil时候
if c == nil {
// 非阻塞模式下,直接返回false
if !block {
return false
}
// 调用gopark将当前Goroutine休眠,调用gopark时候,将传入unlockf设置为nil,当前Goroutine会一直休眠
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// 调试,不必关注
if debugChan {
print("chansend: chan=", c, "\n")
}
// 竞态检测,不必关注
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}

// 非阻塞模式下,不使用锁快速检查send操作
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)

// 如果通道已关闭,再发送数据,发生恐慌
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// 从接收者队列recvq中取出一个接收者,接收者不为空情况下,直接将数据传递给该接收者
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 缓冲队列中的元素个数小于队列的大小
// 说明缓冲队列还有空间
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx) // qp指向循环数组中未使用的位置
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 将发送的数据写入到qp指向的循环数组中的位置
typedmemmove(c.elemtype, qp, ep)
c.sendx++ // 将send加一,相当于循环队列的front指针向前进1
if c.sendx == c.dataqsiz { //当循环队列最后一个元素已使用,此时循环队列将再次从0开始
c.sendx = 0
}
c.qcount++ // 队列中元素计数加1
unlock(&c.lock) // 释放锁
return true
}

if !block {
unlock(&c.lock)
return false
}

gp := getg() // 获取当前的G
mysg := acquireSudog() // 返回一个sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // 发送的数据
mysg.waitlink = nil
mysg.g = gp // 当前G,即发送者
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 将当前发送者入队sendq中
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) // 将当前goroutine放入waiting状态,并释放c.lock锁

// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer
KeepAlive(ep)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
// 无缓冲通道
racesync(c, sg)
} else {
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++ // 相当于循环队列的rear指针向前进1
if c.recvx == c.dataqsiz { // 队列数组中最后一个元素已读取,则再次从头开始读取
c.recvx = 0
}
c.sendx = c.recvx
}
}
if sg.elem != nil { // 复制数据到sg中
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 使goroutine变成runnable状态,唤醒goroutine
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}

// 返回缓存槽i位置的对应的指针
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// 将src值复制到dst
// 源码https://github.com/golang/go/blob/2bc8d90fa21e9547aeb0f0ae775107dc8e05dc0a/src/runtime/mbarrier.go#L156
func typedmemmove(typ *_type, dst, src unsafe.Pointer) {
if dst == src {
return
}
...
memmove(dst, src, typ.size)
...
}

从channel中读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 当通道为nil时候
if c == nil {
if !block { // 当非阻塞模式直接返回
return
}
// 一直阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
// 加锁锁
lock(&c.lock)
// 当通道已关闭,且通道缓冲没有元素时候,直接返回
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock) // 释放锁
if ep != nil {
typedmemclr(c.elemtype, ep) // 清空ep指向的内存
}
return true, false
}
// 从发送者队列中取出一个发送者,发送者不为空时候,将发送者数据传递给接收者
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

// 缓冲队列中有数据情况下,从缓存队列取出数据,传递给接收者
if c.qcount > 0 {
// qp指向循环队列数组中元素
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// 直接qp指向的数据复制到ep指向的地址
typedmemmove(c.elemtype, ep, qp)
}
// 清空qp指向内存的数据
typedmemclr(c.elemtype, qp)
c.recvx++ // 相当于循环队列中的rear加1
if c.recvx == c.dataqsiz { // 队列最后一个元素已读取出来,recvx指向0
c.recvx = 0
}
c.qcount-- // 队列中元素个数减1
unlock(&c.lock) // 释放锁
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// 复制队列中数据到接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 唤醒G
}

关闭channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func closechan(c *hchan) {
// 当关闭的通道是nil时候,直接恐慌
if c == nil {
panic(plainError("close of nil channel"))
}
// 加锁
lock(&c.lock)
// 通道已关闭,再次关闭直接恐慌
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
...
c.closed = 1 // 关闭标志closed置为1
var glist gList
// 将接收者添加到glist中
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 将发送者添加到glist中
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp) //
}
unlock(&c.lock)

// 循环glist,调用goready唤醒所有接收者和发送者
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}

总结

  1. channel规则:
操作 空Channel 已关闭Channel 活跃Channel
close(ch) panic panic 成功关闭
ch <-v 永远阻塞 panic 成功发送或阻塞
v,ok = <-ch 永远阻塞 不阻塞 成功接收或阻塞

注意:从空通道中写入或读取数据会永远阻塞,这会造成goroutine泄漏。

  1. 发送、接收数据以及关闭通道流程图:

golang通道发送、接收数据以及关闭通道流程图