返回
Featured image of post Go Channel的深入理解

Go Channel的深入理解

go Channel

  • Do not communicate by sharing memory; instead, share memory by communicating

CSP并发模型

  • CSP通信顺序进程交谈循序程序,又被译为交换消息的循序程序(communicating sequential processes),它是一种用来描述并发性系统之间进行交互的模型。
  • go Channe是一种特殊的类型,是有特定类型的队列。是链接goroutine(协程)的通信机制,通过通信共享内存而不是通过共享内存而实现通信.
  • Channel 收发操作均遵循了先进先出的设计,具体规则如下:
    1. 先从 Channel 读取数据的 Goroutine 会先接收到数据;
    2. 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

channel 数据结构定义:

type hchan struct {
	// 队列中存储的数量
	qcount   uint           // total data in the queue
	//环形队列的大小(最大存储数量 )
	dataqsiz uint           // size of the circular queue
	// 存放环形队列的数据,数组
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	// 元素的大小
	elemsize uint16
	// 是否关闭的标识
	closed   uint32
	// 元素的类型(指向类型的元数据 )
	elemtype *_type // element type
	// 当前发送数据在环形队列的索引
	sendx    uint   // send index
	// 当前接受数据在环形队列的索引 
	recvx    uint   // receive index
	// 接收者等待队列(<-ch)阻塞在channel的协程队列
	recvq    waitq  // list of recv waiters
	// 发送者等待队列(ch<- data)阻塞在channel的协程队列
	sendq    waitq  // list of send waiters
	//锁保护hchan中的所有字段,以及几个
	//在这个通道上阻塞sudogs中的字段
	//保持这个锁时不要改变另一个G的状态
	//(特别是,不要准备一个G),因为这可能会死锁
	//栈收缩。
	lock mutex  // 保护hchan中的所有字段,保持协程的状态不被更改,避免造成栈收缩引起的死锁,使用互斥锁解决程序中可能存在的线程竞争问题是很常见的
}
发送者/接收者等待队列的结构:一个双向链表
type waitq struct {
	first *sudog
	last  *sudog
}
channel  sudog(等待队列)结构如下
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.
	//以下字段受hchan保护。锁的
  	//这个sudog正在阻塞。shrinkstack取决于
	//这是为涉及通道操作的sudogs。

	g *g  // 等待的协程协程
	next *sudog
	prev *sudog
	// 数据元素(可以指向堆栈),等待发送/接收的数据
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.
	//下面的字段永远不会并发访问。
   	//对于通道,waitlink只被g访问。
	//对于信号量,所有的字段(包括上面的字段)
	//只在持有semaRoot锁时访问。
	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	// 表示g被选择
	isSelect bool

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	//成功表示是否通过通道c通信
	// 成功了。 如果 goroutine 被唤醒是因为一个
	// 值通过通道 c 传递,如果被唤醒则返回 false
	// 因为 c 被关闭了
	success bool  // c 因关闭而唤醒

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
    // 等待的channel被唤醒
	c        *hchan // channel
}
  • 结构如图所示

channel 创建

  • channel 和 切片、map一样,需要使用make(chan type, int )才能使用,应为make()会调用makeChan()初始化
makech函数源码如下:
// 参数类型:创建chan的类型和环型缓冲区的数量
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	//判断环型缓冲区是否溢出
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		// 当队列或者元素大小为0时,定义无缓冲chan(同步chan)
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
        // Race 竞争检查利用这个地址来进行同步操作
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
        // 元素不包含指针时。一次分配 hchan 和 buf 的内存。
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
        // 定义带缓存的chan或者异步的chan
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)  // chan元素的大小
	c.elemtype = elem               // chan元素的类型
	c.dataqsiz = uint(size)        //  chan缓存区大小
	lockInit(&c.lock, lockRankHchan)  //初始化互斥锁

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}
  • channel创建过程:
  1. 编译检查、缓冲区大小检查,判断是否溢出
  2. 判断chan的类型
    1、当创建无缓冲chan时,调用mallocgc()在堆上为chan开辟hchanSize的buf缓存内存空间
    2、创建带缓冲的chan时,判断元素的类型是否为指针类型,若不是,则mallocgc()在堆上为chan和buf缓冲区数组开辟一段大小为 hchanSize+mem连续的内存空间。若是则调用mallocgc()在堆上分别为chan和buf缓冲区分配连续内存空间。

channel 发送数据与接收数据

channel 发送数据

  • chan <- data
chan发送数据源码如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 判断chan是否被初始化,向chan为nil的chan发送数据将会永久阻塞
	if c == nil {
		if !block {
			return false
		}
        // 使当前的groutine休眠
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}
     // 检查在没有获取锁的情况下会导致发送失败的非阻塞操作
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
    // 获得同步锁
	lock(&c.lock)
    // 当chan关闭时,释放锁,并panic
    // 向也关闭的chan发送消息,会引发panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
    // 如果接收队列中有等待的接收者,直接发送给接收者(有缓存区时,会绕过缓存区)
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
        // 没有接收者,当有缓存区时,将要发送的元素放入队列中
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)  // 获取缓存地址
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++ // 指向下一个存储的位置
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++ // 缓存数量相加
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}
    // 缓存区满了,将当前发送协程加入到等待send队列
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg() // 获取当前的g发送协程
	mysg := acquireSudog()// 创建sudog等待队列
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
    // 把当前的发送协程与等待队列绑定
	mysg.g = gp 
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
    // 加入到发送等待队列中
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
    // 发送协程被唤醒,解除等待队列的阻塞状态
    // 判断的等待队列是否在休眠
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg) // 释放等待队列
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
  • channel 发送数据总结
  1. 判断chan是否被初始化,向chan为nil的chan发送数据将会永久阻塞
  2. 检查在没有获取锁, 在没有获取锁的情况下会导致发送失败的非阻塞操作
  3. 检查chan是否关闭,向也关闭的chan发送消息,会引发panic
  4. 如果接收队列中有等待的接收者,直接发送给接收者(有缓存区时,会绕过缓存区)
  5. 没有接收者,当有缓存区时,将要发送的元素放入队列中
  6. 缓存区满了,将当前协程加入到send等待队列,并阻塞
  7. 当发送协程被唤醒,解除等待队列的阻塞状态,释放等待队列

channel 接收数据

  • <- data
chan 接收数据源码如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
    // 判断chan是否初始化,若没有初始化,接收channel数据将阻塞
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
    // 检查chan是否为空,是否关闭
	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {  // chan关闭,就返回
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) { // 如果chan为空
			// The channel is irreversibly closed and empty.
            // // channel 不可逆的关闭了且为空
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
    // chan 关闭了,清理缓冲区
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
    //  找到一个等待的发件人。如果缓冲区大小为 0,则直接从发送方接收值。否则,从队列的头部接收 
    // 并将发送者的值添加到队列的尾部(两者都映射到 
    // 相同的缓冲区槽,因为队列已满)
      // 如果是无缓冲队列,直接从发送方取值
      // 如果是待缓冲的区,就从缓冲区头部获取值,并将发送着的值保存在缓冲区后
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
    // 没有发送的协程,但是缓冲区有元素,直接获取缓冲区头部的值
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}
    // 当没有发送数据的的协程,且缓冲区值,就将接收的协程放入等待队列中
	// no sender available: block on this channel.
	gp := getg()  // 获取当前接收协程
	mysg := acquireSudog() // 创建等待队列
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg 
    // 将接送写协程与等待队列绑定
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
    // 放入在协程的等待队列中
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    //当接收协程被唤醒时,解除阻塞状态
	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg) // 释放等待队列内存
	return true, success
}

  • channel 发送数据总结
  1. 判断chan是否初始化,若没有初始化,接收channel数据将阻塞
  2. 检查chan是否为空,是否关闭
  3. 当有发送协程,如果是无缓冲队列,直接从发送方取值,如果是待缓冲的区,就从缓冲区头部获取值,并将发送着的值保存在缓冲区后
  4. 当没有发送协程,但是有缓冲区有元素,直接获取缓冲区头部的值
  5. 当没有发送数据的的协程,且缓冲区值,就将接收的协程放入等待队列中
  6. 当接收协程被唤醒时,解除阻塞状态,释放等待队列内存

channel 关闭

  • close(chan)
chan 关闭源码如下:
    func closechan(c *hchan) {
        // 判断chan是否初始化,没有初始化,关闭没有初始化的chan,直接panic
    	if c == nil {
    		panic(plainError("close of nil channel"))
    	}
        // 判断chan是否也被关闭,关闭也关闭的chan,也会发送panic
    	lock(&c.lock)
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("close of closed channel"))
    	}
    
    	if raceenabled {
    		callerpc := getcallerpc()
    		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
    		racerelease(c.raceaddr())
    	}
    
    	c.closed = 1
    
    	var glist gList
        // 释放所有的接收chan,并将所有的接收队列加入到待清除队列 glist 中
    	// release all readers
    	for {
    		sg := c.recvq.dequeue()
    		if sg == nil {
    			break
    		}
    		if sg.elem != nil {
    			typedmemclr(c.elemtype, sg.elem)
    			sg.elem = nil
    		}
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		gp := sg.g
    		gp.param = unsafe.Pointer(sg)
    		sg.success = false
    		if raceenabled {
    			raceacquireg(gp, c.raceaddr())
    		}
    		glist.push(gp)
    	}
        // 释放所有的发送chan,发送者的等待队列 sendq 中的 sudog 放入待清除队列 glist 中
    	// release all writers (they will panic)
    	for {
    		sg := c.sendq.dequeue()
    		if sg == nil {
    			break
    		}
    		sg.elem = nil
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		gp := sg.g
    		gp.param = unsafe.Pointer(sg)
    		sg.success = false
    		if raceenabled {
    			raceacquireg(gp, c.raceaddr())
    		}
    		glist.push(gp)
    	}
    	unlock(&c.lock)
    最后会为所有被阻塞的 goroutine 调用 goready 触发调度。将所有 glist 中的 
    goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态,等待调度器的调度。
    	// Ready all Gs now that we've dropped the channel lock.
    	for !glist.empty() {
    		gp := glist.pop()
    		gp.schedlink = 0
    		goready(gp, 3)
    	}
    }
  • channel 关闭总结
  1. 判断chan是否初始化,没有初始化,关闭没有初始化的chan,直接panic
  2. 判断chan是否也被关闭,关闭也关闭的chan,也会发送panic
  3. 先释放所有的接收chan,并将所有的接收队列加入到待清除队列 glist 中
  4. 释放所有的发送chan,发送者的等待队列 sendq 中的 sudog 放入待清除队列 glist 中
  5. 最后会为所有被阻塞的 goroutine 调用 goready 触发调度。将所有 glist 中的goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态,等待调度器的调度。
Licensed under CC BY-NC-SA 4.0
Built with Hugo
Theme Stack designed by Jimmy