go Channe是一种特殊的类型,是有特定类型的队列。是链接goroutine(协程)的通信机制,通过通信共享内存而不是通过共享内存而实现通信.
Channel 收发操作均遵循了先进先出的设计,具体规则如下:
先从 Channel 读取数据的 Goroutine 会先接收到数据;
先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;
channel 数据结构定义:
type hchan struct {
// 队列中存储的数量
qcount uint // total data in the queue
//环形队列的大小(最大存储数量 )
dataqsiz uint // size of the circular queue
// 存放环形队列的数据,数组
buf unsafe.Pointer // points to an array of dataqsiz elements
// 元素的大小
elemsize uint16
// 是否关闭的标识
closed uint32
// 元素的类型(指向类型的元数据 )
elemtype *_type // element type
// 当前发送数据在环形队列的索引
sendx uint // send index
// 当前接受数据在环形队列的索引
recvx uint // receive index
// 接收者等待队列(<-ch)阻塞在channel的协程队列
recvq waitq // list of recv waiters
// 发送者等待队列(ch<- data)阻塞在channel的协程队列
sendq waitq // list of send waiters
//锁保护hchan中的所有字段,以及几个
//在这个通道上阻塞sudogs中的字段
//保持这个锁时不要改变另一个G的状态
//(特别是,不要准备一个G),因为这可能会死锁
//栈收缩。
lock mutex // 保护hchan中的所有字段,保持协程的状态不被更改,避免造成栈收缩引起的死锁,使用互斥锁解决程序中可能存在的线程竞争问题是很常见的
}
发送者/接收者等待队列的结构:一个双向链表
type waitq struct {
first *sudog
last *sudog
}
channel sudog(等待队列)结构如下
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
//以下字段受hchan保护。锁的
//这个sudog正在阻塞。shrinkstack取决于
//这是为涉及通道操作的sudogs。
g *g // 等待的协程协程
next *sudog
prev *sudog
// 数据元素(可以指向堆栈),等待发送/接收的数据
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
//下面的字段永远不会并发访问。
//对于通道,waitlink只被g访问。
//对于信号量,所有的字段(包括上面的字段)
//只在持有semaRoot锁时访问。
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
// 表示g被选择
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
//成功表示是否通过通道c通信
// 成功了。 如果 goroutine 被唤醒是因为一个
// 值通过通道 c 传递,如果被唤醒则返回 false
// 因为 c 被关闭了
success bool // c 因关闭而唤醒
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
// 等待的channel被唤醒
c *hchan // channel
}
结构如图所示
channel 创建
channel 和 切片、map一样,需要使用make(chan type, int )才能使用,应为make()会调用makeChan()初始化
makech函数源码如下:
// 参数类型:创建chan的类型和环型缓冲区的数量
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//判断环型缓冲区是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
// 当队列或者元素大小为0时,定义无缓冲chan(同步chan)
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// Race 竞争检查利用这个地址来进行同步操作
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
// 元素不包含指针时。一次分配 hchan 和 buf 的内存。
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
// 定义带缓存的chan或者异步的chan
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size) // chan元素的大小
c.elemtype = elem // chan元素的类型
c.dataqsiz = uint(size) // chan缓存区大小
lockInit(&c.lock, lockRankHchan) //初始化互斥锁
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
chan发送数据源码如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 判断chan是否被初始化,向chan为nil的chan发送数据将会永久阻塞
if c == nil {
if !block {
return false
}
// 使当前的groutine休眠
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
// 检查在没有获取锁的情况下会导致发送失败的非阻塞操作
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 获得同步锁
lock(&c.lock)
// 当chan关闭时,释放锁,并panic
// 向也关闭的chan发送消息,会引发panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果接收队列中有等待的接收者,直接发送给接收者(有缓存区时,会绕过缓存区)
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// 没有接收者,当有缓存区时,将要发送的元素放入队列中
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // 获取缓存地址
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++ // 指向下一个存储的位置
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++ // 缓存数量相加
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 缓存区满了,将当前发送协程加入到等待send队列
// Block on the channel. Some receiver will complete our operation for us.
gp := getg() // 获取当前的g发送协程
mysg := acquireSudog()// 创建sudog等待队列
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
// 把当前的发送协程与等待队列绑定
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 加入到发送等待队列中
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 发送协程被唤醒,解除等待队列的阻塞状态
// 判断的等待队列是否在休眠
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg) // 释放等待队列
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
channel 发送数据总结
判断chan是否被初始化,向chan为nil的chan发送数据将会永久阻塞
检查在没有获取锁, 在没有获取锁的情况下会导致发送失败的非阻塞操作
检查chan是否关闭,向也关闭的chan发送消息,会引发panic
如果接收队列中有等待的接收者,直接发送给接收者(有缓存区时,会绕过缓存区)
没有接收者,当有缓存区时,将要发送的元素放入队列中
缓存区满了,将当前协程加入到send等待队列,并阻塞
当发送协程被唤醒,解除等待队列的阻塞状态,释放等待队列
channel 接收数据
<- data
chan 接收数据源码如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// 判断chan是否初始化,若没有初始化,接收channel数据将阻塞
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 检查chan是否为空,是否关闭
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
if atomic.Load(&c.closed) == 0 { // chan关闭,就返回
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) { // 如果chan为空
// The channel is irreversibly closed and empty.
// // channel 不可逆的关闭了且为空
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// chan 关闭了,清理缓冲区
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 找到一个等待的发件人。如果缓冲区大小为 0,则直接从发送方接收值。否则,从队列的头部接收
// 并将发送者的值添加到队列的尾部(两者都映射到
// 相同的缓冲区槽,因为队列已满)
// 如果是无缓冲队列,直接从发送方取值
// 如果是待缓冲的区,就从缓冲区头部获取值,并将发送着的值保存在缓冲区后
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 没有发送的协程,但是缓冲区有元素,直接获取缓冲区头部的值
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 当没有发送数据的的协程,且缓冲区值,就将接收的协程放入等待队列中
// no sender available: block on this channel.
gp := getg() // 获取当前接收协程
mysg := acquireSudog() // 创建等待队列
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
// 将接送写协程与等待队列绑定
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 放入在协程的等待队列中
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
//当接收协程被唤醒时,解除阻塞状态
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg) // 释放等待队列内存
return true, success
}