
1. 信道是golang中的顶级公民



信道分为两种: 无缓冲和有缓冲信道(先入先出)。


无缓冲信道: 若没有反向的goroutine在做动作, 当前goroutine会阻塞;
有缓冲信道: goroutine 直接面对的是缓冲队列, 队列满则写阻塞, 队列空则读阻塞。

一个陷阱: 信道被关闭后, 原来的goroutine阻塞状态不会维系, 能从信道读取到零值。


for range可以用于信道 :
一直从指定信道中值, 没有数据会阻塞, 直到信道关闭会自动退出循环。

var ch chan int = make(chan int, 10) go func() { 	for i := 0; i < 20; i++ {  		ch <- i 	} 	close(ch) }()  time.Sleep(time.Second * 2) for ele := range ch { 	fmt.Println(ele) }  output: 0,1,2,3,4...19 



2. 信道channel实现思路大盘点


        type hchan struct {         	qcount   uint           // 队列中已有的缓存元素的数量         	dataqsiz uint           // 环形队列的容量         	buf      unsafe.Pointer // 环形队列的地址         	elemsize uint16         	closed   uint32        // 标记是否关闭,初始化为0,一旦close(ch)为1         	elemtype *_type // 元素类型         	sendx    uint   // 待发送的元素索引         	recvx    uint   // 待接受元素索引         	recvq    waitq  // 阻塞等待的读goroutine队列         	sendq    waitq  // 阻塞等待的写gotoutine队列                   	// lock protects all fields in hchan, as well as several         	// fields in sudogs blocked on this channel.         	//         	// Do not change another G's status while holding this lock         	// (in particular, do not ready a G), as this can deadlock         	// with stack shrinking.         	lock mutex         }              type waitq struct {           first *sudog           last *sudog       } 


2.1 静态全局解读


① 环形队列buf (buf、dataqsize、sendx、recvx 圈定了一个有固定长度,由读/写指针控制队列数据的环形队列),从这看出队列是以链表实现。

② 存放阻塞写G和阻塞读G的队列sendqrecvq, recvq、sendq存放的不是当前通信的goroutine, 而是因读写信道而阻塞的goroutine:

  • 如果 qcount <dataqsiz(队列未满),sendq就为空(写就不会阻塞);
  • 如果 qcount > 0 (队列不为空),recvq就为空(读就不会阻塞)。

一旦解除阻塞,读/写动作会给到先进入阻塞队列的goroutine,也就是 recvq、sendq也是先进先出。

2.2 动态解读demo


第一阶段: 写入0到9这个10个元素

  1. goroutine在写数据之后会获取锁,以确保安全地修改信道底层的hchan结构体;
  2. 向环形队列buf入队enqueue元素,实际是将原始数据拷贝进环形队列buf的待插入位置sendx
  3. 入队操作完成,释放锁。



① 基于写goroutine创建sudog, 并将其放进sendq队列中;

② 调用gopark函数,让调度器P终止该goroutine执行。

调度器P将该goroutine状态改为waiting, 并从调度器P挂载的runQueue中移除,调度器P重新出队一个G交给OS线程来执行,这就是上下文切换,G被阻塞了而不是OS线程。



第三阶段: 读前10个元素(解除写阻塞)

  1. for range chan: 读goroutine从buf中出队元素: 将信道元素拷贝到目标接收区;
  2. 写goroutine从sendq中出队,因为现在信道不满,写不会阻塞;
  3. 调度器P调用goready, 将写goroutine状态变为runnable,并移入runQueue。


体现了写信道--> 写阻塞---> 被唤醒的过程

     // 这一部分是写数据, 从这里也可以看出是点对点的覆写,原buf内队列元素不用移动, 只用关注sendx                 if c.qcount < c.dataqsiz {  // 信道未满,则写不会阻塞=>senq为空	                 qp := chanbuf(c, c.sendx)   // chanbuf(c, i) 返回的是信道buf中待插入的位置指针                 typedmemmove(c.elemtype, qp, ep)                   c.sendx++                 if c.sendx == c.dataqsiz {                      c.sendx = 0                 }                 c.qcount++                 return true         }         if !block {       // 用于select case结构中,不阻塞select case的选择逻辑                 unlock(&c.lock)                 return false         }    // 这二部分是: 构建sudog,放进写阻塞队列,阻塞当前写gooroutine的执行         // Block on the channel. Some receiver will complete our operation for us.         gp := getg()     // 获取当前的goroutine  https://go.dev/src/runtime/HACKING         mysg := acquireSudog()   // sudog是等待队列sendq中的元素,封装了goroutine         mysg.releasetime = 0         if t0 != 0 {                 mysg.releasetime = -1         }         // No stack splits between assigning elem and enqueuing mysg         // on gp.waiting where copystack can find it.         mysg.elem = ep         mysg.waitlink = nil         mysg.g = gp         mysg.isSelect = false         mysg.c = c         gp.waiting = mysg         gp.param = nil         c.sendq.enqueue(mysg)  // 当前goroutine压栈sendq         // Signal to anyone trying to shrink our stack that we're about         // to park on a channel. The window between when this G's status         // changes and when we set gp.activeStackChans is not safe for         // stack shrinking.         gp.parkingOnChan.Store(true)         reason := waitReasonChanSend          gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)   // 这里是阻塞函数     	         KeepAlive(ep)  // 这三部分: 调度器唤醒了当前goroutine         // someone woke us up.           if mysg != gp.waiting {                 throw("G waiting list is corrupted")         }         gp.waiting = nil         gp.activeStackChans = false         closed := !mysg.success         gp.param = nil         if mysg.releasetime > 0 {                 blockevent(mysg.releasetime-t0, 2)         }         mysg.c = nil         releaseSudog(mysg)         if closed {     // 已经关闭了,再写数据会panic              if c.closed == 0 {                  throw("chansend: spurious wakeup")              }             panic(plainError("send on closed channel"))         }         return true 


getg 获取当前的goroutine,sudog是goroutine的封装,表征一个因读写信道而阻塞的G,

typedmemmove(c.elemtype, qp, ep): 写数据到信道buf,由两个指针来完成拷贝覆写。

  //  typedmemmove copies a value of type typ to dst from src.     func typedmemmove(typ *abi.Type, dst, src unsafe.Pointer) {     	if dst == src {     		return     	}     	if writeBarrier.enabled && typ.Pointers() {     		// This always copies a full value of type typ so it's safe     		// to pass typ along as an optimization. See the comment on     		// bulkBarrierPreWrite.     		bulkBarrierPreWrite(uintptr(dst), uintptr(src), typ.PtrBytes, typ)     	}     	// There's a race here: if some other goroutine can write to     	// src, it may change some pointer in src after we've     	// performed the write barrier but before we perform the     	// memory copy. This safe because the write performed by that     	// other goroutine must also be accompanied by a write     	// barrier, so at worst we've unnecessarily greyed the old     	// pointer that was in src.     	memmove(dst, src, typ.Size_)     	if goexperiment.CgoCheck2 {     		cgoCheckMemmove2(typ, dst, src, 0, typ.Size_)     	}     } 

③ 我们看上面源码的第三部分, 唤醒了阻塞的写goroutine, 但是这里貌似没有将写goroutine携带的值传递给信道或对端。


// 发现sendq有阻塞的写G,则读取,并使用该写G携带的数据填充数据 // Just found waiting sender with not closed.     if sg := c.sendq.dequeue(); sg != nil {     // Found a waiting sender. If buffer is size 0, receive value     // directly from sender. Otherwise, receive from head of queue     // and add sender's value to the tail of the queue (both map to     // the same buffer slot because the queue is full).     recv(c, sg, ep, func() { unlock(&c.lock) }, 3)     return true, true } if c.qcount > 0 {  // 如果sendq队里没有阻塞G, 则直接从队列中读值     // Receive directly from queue }  ---  {     // Queue is full. Take the item at the     // head of the queue. Make the sender enqueue     // its item at the tail of the queue. Since the     // queue is full, those are both the same slot.     qp := chanbuf(c, c.recvx)  // 拿到buf中待接受元素指针     if raceenabled {             racenotify(c, c.recvx, nil)             racenotify(c, c.recvx, sg)     }     // copy data from queue to receiver     if ep != nil {             typedmemmove(c.elemtype, ep, qp)  // 将buf中待接收元素qp拷贝到目标指针ep     }     // copy data from sender to queue     typedmemmove(c.elemtype, qp, sg.elem)  //  将阻塞sendq队列中出站的sudog携带的值写入到待插入指针。     c.recvx++     if c.recvx == c.dataqsiz {             c.recvx = 0     }     c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz }          



② 读取第一个元素,解除写阻塞: sendq写G队列会出队第一个sudog, 将其携带的元素填充进buf待插入指针sendx,因为此时sendx=recvx,故第二次typedmemmove(c.elemtype, qp, sg.elem)是合理的。

如果sendq队列没有阻塞G, 则直接从buf中读取值。

3. 不要使用共享内存来通信,而是使用通信来共享内存

常见的后端java C#标配使用共享内存来通信, 比如 mutex、lock 关键词:

golang 推荐使用通信来共享内存, 这个是怎么理解的呢?

你要想使用某块内存数据, 并不是直接共享给你, 而是给你一个信道作为访问的接口, 并且你得到的是目标数据的拷贝,由此形成的信道访问为通信方式;

而原始的目标数据的生命周期由产生这个数据的G来决定, 它甚至不用care自己是不是要被其他G获知,因此体现了解耦并发编程参与方的作用。


4. 信道的实践指南

4.1 无缓冲信道


    c := make(chan int)  // Allocate a channel.     // Start the sort in a goroutine; when it completes, signal on the channel.     go func() {         list.Sort()         c <- 1  // Send a signal; value does not matter.     }()     doSomethingForAWhile()     <-c   // Wait for sort to finish; discard sent value. 

4.2 有缓冲信道

基础实践: 信号量、限流能力


var sem = make(chan int, MaxOutstanding)   func Serve(queue chan *Request) {     for req := range queue {         req:= req         sem <- 1            go func() {   // 只会开启MaxOutstanding个并发协程             process(req)             <-sem         }()     } } 

sem 提供了限制服务端并发处理请求的信号量
queue 提供了一个客户端请求队列,起媒介/解耦的作用


多路复用是网络编程中一个耳熟能详的概念,nginx redis等高性能web、内存kv都用到了这个技术 。


我们针对上面的服务端,编写客户端请求, 独立的客户端请求被服务端Serve收敛之后, Serve就起到了多路复用的概念,在Request定义resultChan信道,就给每个客户端请求提供了独立获取请求结果的能力, 这便是一种解多路复用。

    type Request struct {         args        []int         f           func([]int) int         resultChan  chan int     }     request := &Request{[]int{3, 4, 5}, nil, make(chan int)}      func SendReq(req *Request){         // Send request         clientRequests <- request         // Wait for response.         fmt.Printf("answer: %dn", <-request.resultChan)     } 


    // 定义在服务端的处理handler     func sum(a []int) (s int) {         for _, v := range a {             s += v         }         return     }      func process(req *Request) {        req.f = sum        req.resultChan <- req.f(req.args)     } 




    var numCPU = runtime.NumCPU() // number of CPU cores      func (v Vector) DoAll(u Vector) {         c := make(chan int, numCPU)  // Buffering optional but sensible.         for i := 0; i < numCPU; i++ {             go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)         }                  for i := 0; i < numCPU; i++ {             <-c    // wait for one task to complete         }         // All done.     } 



最后给出了信道的常规实践, 解读了一些常规姿势的上层思路来源。

