go语言阻塞唤醒底层实现之sync_runtime_SemacquireMutex和runtime_Semrelease

  • sync_runtime_SemacquireMutex
    • semacquire1
      • cansemacquire
      • semaRoot 平衡树
  • sync_runtime_Semrelease

sync_runtime_SemacquireMutex 和 runtime_Semrelease这两个函数是在go的mutex中 分别是让当前goroutine沉睡和苏醒的方法。来看一下这两个方法的实现

sync_runtime_SemacquireMutex

先看一下官方的解释

// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
// If lifo is true, queue waiter at the head of wait queue.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

从注释看出来s 是一个地址,这个是后面为了分配到平衡树用的,lifo 是用来判断是放在头部,skipframes 是跳过tracing 。

然后这个实现是在runtime/sema.go中,是通过 go:linkname 编译的时候指过去的。

const (semaBlockProfile semaProfileFlags = 1 << iotasemaMutexProfile
)//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

semaBlockProfile 和 semaMutexProfile 从字面意思看应该是是否进行采样,应该是给pprof去使用的。
然后看一下semacquire1这个方法,主要是调用的这个方法。

semacquire1

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {// 获取当前的ggp := getg()if gp != gp.m.curg {throw("semacquire not on the G stack")}// Easy case. 根据信号量判断是否可以获取锁if cansemacquire(addr) {return}// Harder case://    increment waiter count//    try cansemacquire one more time, return if succeeded//  enqueue itself as a waiter//    sleep// (waiter descriptor is dequeued by signaler)// 获取一个sudog的链表s := acquireSudog()// 根据addr的地址去需要对应的树的节点 和初始化变量root := semtable.rootFor(addr)t0 := int64(0)s.releasetime = 0s.acquiretime = 0s.ticket = 0if profile&semaBlockProfile != 0 && blockprofilerate > 0 {t0 = cputicks()s.releasetime = -1}if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {if t0 == 0 {t0 = cputicks()}s.acquiretime = t0}// 进行加锁for {// 加锁 注意是底层线程也就是M lockWithRank(&root.lock, lockRankRoot)// Add ourselves to nwait to disable "easy case" in semrelease.// 等待的加一atomic.Xadd(&root.nwait, 1)// 判断是否能获得锁// Check cansemacquire to avoid missed wakeup.if cansemacquire(addr) {atomic.Xadd(&root.nwait, -1)unlock(&root.lock)break}// Any semrelease after the cansemacquire knows we're waiting// (we set nwait above), so go to sleep.// 加入队列root.queue(addr, s, lifo)// 调用gopark阻塞当前ggoparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)// 恢复运行 判断是否可以拿到信号量 if s.ticket != 0 || cansemacquire(addr) {break}}if s.releasetime > 0 {blockevent(s.releasetime-t0, 3+skipframes)}// 释放sudogreleaseSudog(s)
}

因为sodug之前在chan里面说过,这里主要比较重要的方法是cansemacquire,lock和unlock方法。

cansemacquire

这个就是根据信号量判断是否可以拿到锁,本质是一个CAS的操作

// 可以看出来这个就是CAS的操作
func cansemacquire(addr *uint32) bool {for {v := atomic.Load(addr)if v == 0 {return false}if atomic.Cas(addr, v, v-1) {return true}}
}

接下来就是go里面的底层的lock和unlock,需要注意的是这个基于底层线程的。需要注意的是上面使用的是lockWithRank,然后调用的是lock2。

func lockWithRank(l *mutex, rank lockRank) {lock2(l)
}
func lock2(l *mutex) {// 获取当前ggp := getg()if gp.m.locks < 0 {throw("runtime·lock: lock count")}gp.m.locks++// Speculative grab for lock.// 判断是否可以获取锁if atomic.Casuintptr(&l.key, 0, locked) {return}//创建线程锁 调用的是// pthread_mutex_init// pthread_cond_initsemacreate(gp.m)// On uniprocessor's, no point spinning.// On multiprocessors, spin for ACTIVE_SPIN attempts.// 这个是判断是不是单核 如果是多核那么自旋 如果是单核 直接沉睡等待唤醒spin := 0if ncpu > 1 {spin = active_spin}
Loop:for i := 0; ; i++ {// 尝试能不能加锁v := atomic.Loaduintptr(&l.key)if v&locked == 0 {// Unlocked. Try to lock.if atomic.Casuintptr(&l.key, v, v|locked) {return}i = 0}// 自旋if i < spin {procyield(active_spin_cnt)} else if i < spin+passive_spin {osyield()} else {// Someone else has it.// l->waitm points to a linked list of M's waiting// for this lock, chained through m->nextwaitm.// Queue this M.// 这边nextwaitm是一个链表 然后先l.key里面的其他M的地址// 放到nextwaitm中 然后自己当前的地址存到l.key// 这样解锁的时候根据l.key能找到需要恢复的Mfor {gp.m.nextwaitm = muintptr(v &^ locked)if atomic.Casuintptr(&l.key, v, uintptr(unsafe.Pointer(gp.m))|locked) {break}v = atomic.Loaduintptr(&l.key)if v&locked == 0 {continue Loop}}// 说明上面的操作成功了 那么当前的M可以沉睡了 if v&locked != 0 {// Queued. Wait.//调用semasleep(-1)i = 0}}}
}

看一下semasleep这个方法

//go:nosplit
func semasleep(ns int64) int32 {var start int64if ns >= 0 {start = nanotime()}// 先把底层的m加锁mp := getg().mpthread_mutex_lock(&mp.mutex)for {// 尝试的次数if mp.count > 0 {mp.count--pthread_mutex_unlock(&mp.mutex)return 0}// 等待的时间if ns >= 0 {spent := nanotime() - startif spent >= ns {pthread_mutex_unlock(&mp.mutex)return -1}var t timespect.setNsec(ns - spent)err := pthread_cond_timedwait_relative_np(&mp.cond, &mp.mutex, &t)if err == _ETIMEDOUT {pthread_mutex_unlock(&mp.mutex)return -1}} else {// 释放锁并且等待唤醒pthread_cond_wait(&mp.cond, &mp.mutex)}}
}

所以底层的线程加锁就是CAS尝试获取锁,然后通过pthread_cond_wait等待唤醒,并且把自己的地址存在共享变量上面,等待唤醒。

然后看一下解锁的方法,这里是unlockWithRank

func unlockWithRank(l *mutex) {unlock2(l)
}
// We might not be holding a p in this code.
//
//go:nowritebarrier
func unlock2(l *mutex) {gp := getg()var mp *mfor {// 获取共享变量v := atomic.Loaduintptr(&l.key)// 如果是locked 说明没有被存入其他M的内存地址 那么释放后跳出去if v == locked {if atomic.Casuintptr(&l.key, locked, 0) {break}} else {// Other M's are waiting for the lock.// Dequeue an M.// 说明这个key存了其他m的地址  获取到然后唤醒mp = muintptr(v &^ locked).ptr()if atomic.Casuintptr(&l.key, v, uintptr(mp.nextwaitm)) {// Dequeued an M.  Wake it.// 其实就是调用pthread_cond_signal 进行唤醒semawakeup(mp)break}}}gp.m.locks--if gp.m.locks < 0 {throw("runtime·unlock: lock count")}if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstackgp.stackguard0 = stackPreempt}
}
//go:nosplit
func semawakeup(mp *m) {// 加锁pthread_mutex_lock(&mp.mutex)mp.count++if mp.count > 0 {// 唤醒其他等待的线程pthread_cond_signal(&mp.cond)}// 解锁pthread_mutex_unlock(&mp.mutex)
}

semaRoot 平衡树

看一下这个平衡树,可以看出semTable是一个251的数组。然后semaRoot的成员treap 就是所有等待的go。

// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and
// BenchmarkSemTable/OneAddrCollision/* for a benchmark that exercises this.
type semaRoot struct {lock  mutextreap *sudog // root of balanced tree of unique waiters.nwait uint32 // Number of waiters. Read w/o the lock.
}var semtable semTable// Prime to not correlate with any user patterns.
const semTabSize = 251type semTable [semTabSize]struct {root semaRootpad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}func (t *semTable) rootFor(addr *uint32) *semaRoot {return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

sync_runtime_Semrelease

这个和上面一样也是通过go:linkname 编译期间关联上。还是一样先看看官方实现.这个如果上面的加锁可以看懂,这个也是类似的。

func semrelease1(addr *uint32, handoff bool, skipframes int) {// 获取当前地址的平衡树root := semtable.rootFor(addr)// 加1 让后续加锁方直接走easy逻辑atomic.Xadd(addr, 1)// Easy case: no waiters?// This check must happen after the xadd, to avoid a missed wakeup// (see loop in semacquire).// 如果没有等待者直接返回if atomic.Load(&root.nwait) == 0 {return}// Harder case: search for a waiter and wake it.// 加锁lockWithRank(&root.lock, lockRankRoot)// 加锁再判断一次if atomic.Load(&root.nwait) == 0 {// The count is already consumed by another goroutine,// so no need to wake up another goroutine.unlock(&root.lock)return}// 从这个地址获取等待队列s, t0 := root.dequeue(addr)if s != nil {atomic.Xadd(&root.nwait, -1)}unlock(&root.lock)if s != nil { // May be slow or even yield, so unlock firstacquiretime := s.acquiretimeif acquiretime != 0 {mutexevent(t0-acquiretime, 3+skipframes)}if s.ticket != 0 {throw("corrupted semaphore ticket")}if handoff && cansemacquire(addr) {s.ticket = 1}// 调用goready 恢复readyWithTime(s, 5+skipframes)if s.ticket == 1 && getg().m.locks == 0 {// Direct G handoff// readyWithTime has added the waiter G as runnext in the// current P; we now call the scheduler so that we start running// the waiter G immediately.// Note that waiter inherits our time slice: this is desirable// to avoid having a highly contended semaphore hog the P// indefinitely. goyield is like Gosched, but it emits a// "preempted" trace event instead and, more importantly, puts// the current G on the local runq instead of the global one.// We only do this in the starving regime (handoff=true), as in// the non-starving case it is possible for a different waiter// to acquire the semaphore while we are yielding/scheduling,// and this would be wasteful. We wait instead to enter starving// regime, and then we start to do direct handoffs of ticket and// P.// See issue 33747 for discussion.// 恢复运行 再调度一次goyield()}}
}

【go语言阻塞唤醒底层实现之sync_runtime_SemacquireMutex和runtime_Semrelease】相关推荐

  1. 【go语言阻塞唤醒底层实现之gopark和goready】

    go语言之阻塞唤醒底层实现 gopark mcall park_m execute gogo goready ready runqput wakep 看过channel和mutex的实现都知道,对于c ...

  2. java线程阻塞唤醒的四种方式

    java在多线程情况下,经常会使用到线程的阻塞与唤醒,这里就为大家简单介绍一下以下几种阻塞/唤醒方式与区别,不做详细的介绍与代码分析 suspend与resume Java废弃 suspend() 去 ...

  3. 深度解密Go语言之channel底层实现

    并发模型并发与并行 大家都知道著名的摩尔定律.1965 年,时任仙童公司的 Gordon Moore 发表文章,预测在未来十年,半导体芯片上的晶体管和电阻数量将每年增加一倍:1975 年,Moore ...

  4. 进程线程的调度阻塞唤醒

    1Cpu线程调度 1一个CPU最多可以运行一个进程或者一个线程,如果是双核的CPU最多可运行 两个进程或两个线程, 操作系统是多任务操作系统,他不止同时运行两个任务,可能有很多个,如word文档,QQ ...

  5. c语言指针底层实现,C语言二级指针底层实现

    C语言中,Pointers to Pointers,即二级指针. 一级指针和二级指针的值都是指向一个内存单元: 一级指针指向的内存单元存放的是源变量的值, 二级指针指向的内存单元存放的是一级指针的地址 ...

  6. 操作系统(二 )| 进程管理初探(前趋图、程序执行、进程的定义特征基本状态,进程的创建终止,阻塞唤醒,挂起激活)

    文章目录 前趋图和程序执行 前趋图 程序的顺序执行 程序的并发执行 进程的定义和特征 进程的基本状态 就绪状态 运行状态 阻塞状态 挂起状态 进程控制块(PCB) 作用: 块中信息: PCB的组织方式 ...

  7. Go语言channel的底层

    底层数据结构需要看源码,版本为 go 1.9.2: type hchan struct {// chan 里元素数量qcount uint// chan 底层循环数组的长度dataqsiz uint/ ...

  8. c语言二级指针实现队列,C语言二级指针底层实现

    C语言中,Pointers to Pointers,即二级指针. 一级指针和二级指针的值都是指向一个内存单元: 一级指针指向的内存单元存放的是源变量的值, 二级指针指向的内存单元存放的是一级指针的地址 ...

  9. java底层语言_JAVA语言思维的底层基础

    现实世界中的物体有两种存在方式:1.静止     2.运动 静止的特征---〉属性----〉属性的名称---〉属性值得类型----〉属性值 java中的数据---〉变量---〉变量的名称---〉变量的 ...

最新文章

  1. 免费ASP,PHP空间
  2. 【 Linux 】记录下第一次使linux系统遇到的问题(系统安装、配置查看、搜狗输入法安装)
  3. 使用JDK 13查看TLS配置
  4. python中字符集
  5. *SCM-MANAGER独立部署方式
  6. qnap raid5升级raid6_QNAP TS-419P组建RAID5后重建Transmission!
  7. Android支付实践(一)之支付宝支付详解与demo
  8. 区块链与java的应用开发_用 Java 开发一个区块链
  9. Linux 测试端口是否 ping 的方法
  10. mysql中DateTime、Date、Time、TimeStamp区别
  11. 实木地板被机器人弄成坑_射阳县羽毛球木地板走在行业前端
  12. 纬地服务器找不带计算机,纬地V6.9升级启动解决方法大全
  13. java怎么输出英文字母表_Java程序设计(八)----输出英文字母表、希腊字母表
  14. MySQL索引原理以及查询优化
  15. linux卸载杀毒软件clama,centos 6 安装clamav杀毒软件查毒
  16. python实现税后工资_python税后工资计算器
  17. 流媒体 3——彩色数字图像基础
  18. 最让男人受不了的40种极品女人!
  19. STM32使用光敏传感器计算光照度Lux,而不是仅仅打印个电压值或者电阻值
  20. 3dsmax 制作u型长方体

热门文章

  1. SkeyeVSS安全生产风险监测预警系统
  2. 阿里巴巴java开发手册(2020版)
  3. 应届生校招找工作完整流程总结
  4. 数据可视化——Davinci
  5. 南信大python期末试卷_南信大 软件工程期末试卷
  6. 请用python写一段绘制网络拓扑图的程序
  7. python分析财务数据用什么软件_求助公司想要做一套财务数据分析系统,用什么工具比较好?...
  8. 汇编习题之某机指令字长16位,共有单地址指令和双地址指令两类,若每个地址字段均为5位,且双地址指令已用了X条,问单地址指令最多可以有多少条?
  9. 矩阵求逆的c#代码实现
  10. matlab对外部导入的数据进行三维曲面绘制