golang源码解析之chan


导语在go语言中,chan 和 goroutine 是其并发模型CSP最重要体现,本文将基于1.14版本,深入源码,尽可能详细分析其内部实现原理。

一、为什么要使用chan

在并发线程中通信一般来说有两种模型:共享内存和消息传递。
常见的共享内存方式涉及到数据竞争这些问题,引入到锁、原子操作来解决。而基于消息传递的方式保证了不会产生数据竞争状态。
其中,实现消息传递有两种常见的类型:基于channel的消息传递和基于actor的消息传递。
而golang,就是基于channel的代表语言。erlang则是基于actor的代表语言。
在CSP(communicating sequential process)中,它将channel列为第一类对象,它不关注发送消息的实体,而是关注发送消息时使用的channel。golang则是基于这篇论文中的部分理论诞生的,也就是理论中的Process/channel:process和channel没有从属关系,process可以消费任意个channel,而channel也不关心具体是哪个process在使用它进行通信;process之间依据channel进行消息传递,形成一套有序阻塞和可预测的并发模型。对应到golang中,process就是goroutine,channel就是chan
备注CSP理论模型电子版链接:http://www.usingcsp.com/cspbook.pdf ,作者Tony Hoare

二、chan是怎样实现的

敲黑板:chan的实质是一个队列
如果你创建的是一个带缓冲的chan,chan就是一个循环队列,如果不带缓冲就是一个普通的队列。
src/runtime/chan.go中,定义了一个结构体:hchan,还实现了一些方法:makechan、chansend、chanrecv、closechan,我们所使用的chan的最主要的功能,包括chan的创建,向chan写数据,读chan中的数据,关闭chan,就是围绕这个结构体和这几个方法实现的。我们接下来的内容,也主要围绕它们展开。

hchan

源码如下:

type hchan struct {qcount   uint           // total data in the queuedataqsiz uint           // size of the circular queuebuf      unsafe.Pointer // points to an array of dataqsiz elementselemsize uint16closed   uint32elemtype *_type // element typesendx    uint   // send indexrecvx    uint   // receive indexrecvq    waitq  // list of recv waiterssendq    waitq  // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex
}

qcount:buf数组中已经放入的元素个数
dataqsize:buf数组长度,创建时调用make指定
buf:buf 数组
elemsize:buf数组中每个元素的大小
closed:chan是否关闭, 0代表没有关闭
elemtype:chan中元素的类型
sendx:buf数组中以发送的索引位置,用以构造循环队列
recvx:buf数组中已接收的索引位置,用以构造循环队列
recvq:等待接收的goroutine,当chan中buf无数据并且无sendq时但有goroutine等待消费时会产生,实质是包含goroutine及有关信息的sudog,多个recvq会形成链表,依然是FIFO的标准队列
sendq:等待发送的goroutine,当chan中buf数据写满时但仍然有goroutine等待写入时会产生,实质是包含goroutine及有关信息的sudog,多个sendq会形成链表,依然是FIFO的标准队列。
lock:锁,用以保证chan中数据的顺序通信

makechan

源码如下:

func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe. 校验数据类型大小,大于1<<16(65536)异常if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}//内存对齐(多平台兼容,降低维度提高速度,减少内存消耗),大于最大内次8字节时异常if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}//判断所需空间是否大于堆可分配的最大内存mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {//size为0,分配hchan结构体空间case mem == 0:// Queue or element size is zero.c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()//不包括指针,分配连续地址空间,包括hchan结构体+数据,将申请下来的地址首地址赋值给buf,便于GC回收,减小gc压力case elem.ptrdata == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)//包括指针,buf单独分配空间default:// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}

注意makechan 返回的是hchan指针,这也就是为什么chan是golang中的引用类型,传递的是指针而非值

chansend

源码如下:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {//检测chan是否为空,为空报错,所以往一个nil的chan中写数据,程序会异常退出报错if c == nil {//如果是非阻塞的,返回false,不会触发if !block {return false}//如果是阻塞的goroutine停止gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}//开启竞争检测if raceenabled {racereadpc(c.raceaddr(), callerpc, funcPC(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation.//如果size = 0 或者 缓冲满了,返回false,不会触发block传入时值为trueif !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}//chan加锁lock(&c.lock)//往关闭了的chan写数据,直接panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}
//看接收者是否为空,如果为空,说明buf一定为空,直接取接受者队列队首sudog,把数据发给它并且释放锁。if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}
//如果buf还有空位,将数据写入buf数组中if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// Block on the channel. Some receiver will complete our operation for us.//获取当前goroutinegp := getg()//创建sudogmysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.//sudog赋值mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil//将sudog加入sendq链表中c.sendq.enqueue(mysg)//将当前goroutine陷入沉睡gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.KeepAlive(ep)//再次唤醒,说明数据已经发送出去了,写入buf,或者被接收者消费// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif gp.param == nil {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}gp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)return true
}

send函数

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {//不会触发,默认为falseif raceenabled {if c.dataqsiz == 0 {racesync(c, sg)} else {// Pretend we go through the buffer, even though// we copy directly. Note that we need to increment// the head/tail locations only when raceenabled.qp := chanbuf(c, c.recvx)raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}}//数据没问题,直接调用sendDirect,将数据拷贝到目标内存地址if sg.elem != nil {sendDirect(c.elemtype, sg, ep)sg.elem = nil}//获取该goroutinegp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}//将该goroutine放入到p的runnext中,等待下次直接调度goready(gp, skip+1)
}

chanrecv

源码如下:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}//如果从nil的chan中读数据,报错,程序退出if c == nil {if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not ready for receiving, we observe that the// channel is not closed. Each of these observations is a single word-sized read// (first c.sendq.first or c.qcount, and second c.closed).// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.//// The order of operations is important here: reversing the operations can lead to// incorrect behavior when racing with a close.//不会触发if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 {return}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)//向已经关闭的chan读数据,如果buf为空,返回false不会报错和panic,如果buf不为空,仍然能读取到数据if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}//如果发送者队列不为空,存在两种情况,第一种是不带buf,直接赋值,第二种是buf已满,这个时候需要取出buf中recvx位置数据,交给当前goroutine消费,并且把发送者队列队首数据写入buf中if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}
//buf中有数据,从buf中拿数据if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}if !block {unlock(&c.lock)return false, false}// no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nilc.recvq.enqueue(mysg)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}

recv

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {//不带buf情况if c.dataqsiz == 0 {if raceenabled {racesync(c, sg)}if ep != nil {// copy data from senderrecvDirect(c.elemtype, sg, ep)}} else {//buf已满情况// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.qp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)}// copy data from queue to receiverif ep != nil {typedmemmove(c.elemtype, ep, qp)}// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}

closechan

源码如下:

func closechan(c *hchan) {//关闭一个nil的chan,直接panicif c == nil {panic(plainError("close of nil channel"))}lock(&c.lock)//关闭一个已经关闭了的chan,直接panicif c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, funcPC(closechan))racerelease(c.raceaddr())}//将closed置为非零c.closed = 1var glist gList//清理所有的数据,包括recvq,sendq// release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}
}

golang源码解析之chan相关推荐

  1. 【golang源码分析】chan底层原理——附带读写用户队列的环形缓冲区

    1 环形缓冲区 1.1 环形缓冲区结构 环形缓冲区通常有一个读指针和一个写指针.读指针指向环形缓冲区中可读的数据,写指针指向环形缓冲区中可写的缓冲区.通过移动读指针和写指针就可以实现缓冲区的数据读取和 ...

  2. Go源码解析——Channel篇

    channel.map.slice作为golang的核心三剑客,对于使用golang作为主语言完成开发工作的程序猿来说是非常重要的.了解其设计和源码是使用的基础,因此笔者本专题会对这三种数据结构的源码 ...

  3. loraserver 源码解析 (四) lora-gateway-bridge

    lora-gateway-bridge  负责接收 gateway 通过 udp 发送的 packet-forwarder 数据 然后通过 MQTT broker 将报文转发给 LoRa Server ...

  4. Colly源码解析——结合例子分析底层实现

    通过<Colly源码解析--框架>分析,我们可以知道Colly执行的主要流程.本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现.(转载请指明出于break ...

  5. golang源码分析-启动过程概述

    golang源码分析-启动过程概述 golang语言作为根据CSP模型实现的一种强类型的语言,本文主要就是通过简单的实例来分析一下golang语言的启动流程,为深入了解与学习做铺垫. golang代码 ...

  6. kube-proxy源码解析

    kube-proxy源码解析 ipvs相对于iptables模式具备较高的性能与稳定性, 本文讲以此模式的源 码解析为主,如果想去了解iptables模式的原理,可以去参考其实现,架构上无差别. ku ...

  7. Cilium创建pod network源码解析

    01 Overview 我们生产K8s使用容器网络插件 Cilium 来创建 Pod network,下发 eBPF 程序实现 service 负载均衡来替换 kube-proxy,并且使用 BGP ...

  8. Kubernetes学习笔记之Calico CNI Plugin源码解析(二)

    女主宣言 今天小编继续为大家分享Kubernetes Calico CNI Plugin学习笔记,希望能对大家有所帮助. PS:丰富的一线技术.多元化的表现形式,尽在"360云计算" ...

  9. gin 源码解析 - 详解http请求在gin中的流转过程

    本篇文章是 gin 源码分析系列的第二篇,这篇文章我们主要弄清一个问题:一个请求通过 net/http 的 socket 接收到请求后, 是如何回到 gin 中处理逻辑的? 我们仍然以 net/htt ...

  10. loraserver 源码解析 (六) lora-app-server

    目录 下载源码 升级 npm 安装一些必要的依赖库 pq_trgm extension run 调用 handleDataDownPayloads 开启一个Goroutine  G1 run再调用 s ...

最新文章

  1. 华为热设计工程师待遇_华为给985毕业生开2万元的工资,是侮辱价?还是不自量力?...
  2. 微软最新启动了一个 I'm 活动
  3. Bootstrap简介--目前最受欢迎的前端框架(一)
  4. ASP.NET知识点:母版页的路径问题
  5. JSON数据与JavaScript对象转换
  6. 初识莫队——小Z的袜子
  7. 值得收藏的JSP连接mysql数据库的例子
  8. 《视频直播技术详解》系列之七:现代播放器原理
  9. hive insert报错return code 1 from org.apache.hadoop.hive.ql.exec.StatsTask (state=08S01,code=1)
  10. 598. 范围求和 II
  11. 十大建筑中的数学之美
  12. SpringBoot面试题第一弹
  13. 致openGauss社区用户的一封信
  14. 【持续更新】一些常用的网站分享(智能教育装备、智能机器人行业)
  15. 输入表重建工具ImportREC
  16. JAVA练习题---银行账户管理系统
  17. Java 细胞分裂问题
  18. 鸡得呼吸道病会易发啥病 鸡喂什么药预防打喷嚏
  19. 2011年北邮各组组线
  20. 硬件MSB最高位优先、LSB最低位优先的CRC计算原理详细解释和程序,正算反算成功等效,DS18B20和HTU31D传感器CRC

热门文章

  1. 2019Java面试题
  2. 全国计算机将文件属性隐藏,一键玩转隐藏属性文件
  3. 笑声的音效素材,几百个你想要的都在这
  4. Jzoj4699 Password
  5. Mastermind游戏
  6. Android 渠道抽成,内容为主,渠道为辅,国内Android商店何时才能调整分成比?
  7. 【SpringBoot+Mybatis】bootstrap/sematic UI与pagehelper实现分页
  8. 专硕计算机考研英语一还是二,学硕只会考英语一?专硕只会考英语二?
  9. 论文笔记SKEP: Sentiment Knowledge Enhanced Pre-training for Sentiment Analysis
  10. 婚宴座位图html5,婚宴座位图模版欣赏【婚礼纪】