文章目录

  • 互斥锁 Mutex
    • 拷贝使用 Mutex 的问题
  • 读写锁 RWMutex
  • 条件变量 Cond
  • 等待组 WaitGroup
  • 仅执行一次 Once
  • 原子操作
  • 其他

上一篇 《原生并发 goroutine channel 和 select 常见使用场景》 介绍了基于 CSP 模型的并发方式。

除了 CSP,Go 通过 sync 包以及 atomic 包还提供了一些更底层的同步 API,一般用于性能要求比较高的场景。

sync.Mutex 实现的同步机制的性能要比 channel 实现的高出三倍多。

在 sync/mutex.go 中,有这么一段注释:

// Package sync provides basic synchronization primitives such as mutual
// exclusion locks. Other than the Once and WaitGroup types, most are intended
// for use by low-level library routines. Higher-level synchronization is
// better done via channels and communication.
//
// Values containing the types defined in this package should not be copied.

注释的大概意思是,sync 包提供的是底层并发原语,一般给底层库用的,如果是上层业务同步,最好还是使用 channel。

还有一点,在日常编码中,不要使用拷贝的并发包对象。首先拷贝的对象,和原本不是一个内容,状态不一致;其次如果被拷贝的锁对象已经是锁定状态,可能会导致副本也是锁定状态,超出预期。

如果需要多处使用,可以使用全局变量或者指针传递(& 创建,* 使用)。

只有拥有数据对象所有权(从 channel 接收到该数据)的 Goroutine 才可以对该数据对象进行状态变更。

互斥锁 Mutex

Mutex 和 RMMutex 的作用和 Java 里的类似,主要来看下 API 和基本实现。

使用:

    mu := sync.Mutex{}mu.Lock()       //加互斥锁mu.Unlock()

实现:

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {state int32sema  uint32
}

实现很简单,一个是状态,另一个是信号量。

拷贝使用 Mutex 的问题

来通过代码看一下拷贝使用 sync.Mutex 的问题。

var num int = 1
func testCopyMutex() {mu := sync.Mutex{}waitGroup := sync.WaitGroup{}waitGroup.Add(1)go func(copyMu sync.Mutex) {copyMu.Lock()num = 100fmt.Println("update num from sub-goroutine: ", num)time.Sleep(5 * time.Second)fmt.Println("read num from sub-goroutine: ", num)copyMu.Unlock()waitGroup.Done()}(mu)time.Sleep(time.Second)mu.Lock()num = 1fmt.Println("read num from main: ", num)mu.Unlock()waitGroup.Wait()
}

上面的代码中,我们子 goroutine 里先加锁,然后修改了 num,在等待 5s 后,输出了 num 的值,然后才释放锁;在这期间,主 goroutine 里会尝试获取锁,然后修改 num。

我们期望的是传递的同一个 Mutex,那子 goroutine 里在释放锁之前,num 都是它修改后的值,但允许的结果却让人意外:

update num from sub-goroutine:  100
read num from main:  1
read num from sub-goroutine:  1

可以看到,在真正运行时,子 goroutine 加锁的时间内,主 goroutine 居然也可以访问到 num。

问题的原因就在于传递的是 Mutex 的值。

如果改成传递指针,结果就会符合预期:

    go func(copyMu *sync.Mutex) {   //参数类型加 *copyMu.Lock()num = 100fmt.Println("update num from sub-goroutine: ", num)time.Sleep(5 * time.Second)fmt.Println("read num from sub-goroutine: ", num)copyMu.Unlock()waitGroup.Done()}(&mu)  //传递指针

为什么传递值会有问题呢?

原因在于,Mutex 复制后,状态分离。子 goroutine 对副本加锁,主 goroutine 感知不到,因为它们使用的不是同一份数据了!

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {// Fast path: grab unlocked mutex.if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {if race.Enabled {race.Acquire(unsafe.Pointer(m))}return}// Slow path (outlined so that the fast path can be inlined)m.lockSlow()
}

读写锁 RWMutex

使用:

    rmu := sync.RWMutex{}rmu.RLock()     //读锁rmu.RUnlock()rmu.Lock()      //写锁rmu.Unlock()rmu.RLocker().Lock()        //通过 RLock 实现rmu.RLocker().Unlock()

实现:

// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {w           Mutex  // held if there are pending writerswriterSem   uint32 // semaphore for writers to wait for completing readersreaderSem   uint32 // semaphore for readers to wait for completing writersreaderCount int32  // number of pending readersreaderWait  int32  // number of departing readers
}

可以看到,RWMutex 的成员有一个互斥锁(用于在写入时获取),读写者的信号量,读者数量等。

RWMutext 读不阻塞读,但会阻塞写。

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {if race.Enabled {_ = rw.w.staterace.Disable()}// First, resolve competition with other writers.rw.w.Lock()// Announce to readers there is a pending writer.r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders// Wait for active readers.if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {runtime_SemacquireMutex(&rw.writerSem, false, 0)}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(&rw.readerSem))race.Acquire(unsafe.Pointer(&rw.writerSem))}
}

在调用写锁时,就是获取其中的互斥锁。

再多废话一句:读写锁适合并发量级比较大,且读的次数大于写的情况。

互斥锁和读写锁的注意点:

  • 减少锁的范围
  • 千千万万记得 unlock,可以早点写 defer unlock,避免忘记

条件变量 Cond

sync.Cond,在需要“等待某个条件成立”的场景下使用,使用的很少。

支持多个 goroutine 等待某个条件,等条件允许后,广播唤醒这些 goroutine 执行。

比起轮训互斥的状态,条件变量消耗资源更小,实现也更简单。

使用:

    cond := sync.NewCond(&sync.Mutex{}) 参数为 sync.Locker 接口类型go func() {//    cond.L.Lock()//    for !condition() {cond.Wait()      //等待,一般放在循环里,查询一次,不满足就阻塞(释放锁),等被唤醒后,再检查下条件//    }//    ... make use of condition ...//    cond.L.Unlock()}()cond.L.Lock()   //获取构造传入的锁cond.Broadcast()    //通知所有等待的 goroutine,从 Wait 返回,重新获取锁cond.Signal()   //通知一个cond.L.Unlock()

sync.Cond.Wait 一般结合 for 循环使用,反复检查条件是否满足。

实现:

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {noCopy noCopy// L is held while observing or changing the conditionL Lockernotify  notifyListchecker copyChecker
}// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {return &Cond{L: l}
}

可以看到,条件主要由一个锁和一个等待唤醒的队列组成。

func (c *Cond) Wait() {c.checker.check()t := runtime_notifyListAdd(&c.notify)c.L.Unlock()runtime_notifyListWait(&c.notify, t)c.L.Lock()
}

在等待一个条件时,会先加入等待队列,然后释放这个条件的锁。

func (c *Cond) Broadcast() {c.checker.check()runtime_notifyListNotifyAll(&c.notify)
}

广播时,会通知所有等待的 goroutine 恢复执行 Wait 里的逻辑,重新申请获取锁。

等待组 WaitGroup

在需要等待多个 goroutine 完成任务后继续执行的场景,可以使用 sync.WaitGroup,和 Java 的 CountDownLaunch 类似。

使用:

    waitGroup := sync.WaitGroup{}waitGroup.Add(1)        //需要等待数为 1go func() {waitGroup.Done()    //减去需要等待数}()waitGroup.Wait()    //等待数为 0 才继续执行,循环检查

实现:

type WaitGroup struct {noCopy noCopy// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.// 64-bit atomic operations require 64-bit alignment, but 32-bit// compilers do not ensure it. So we allocate 12 bytes and then use// the aligned 8 bytes in them as state, and the other 4 as storage// for the sema.state1 [3]uint32
}

可以看到,WaitGroup 核心就是一个计数的 state,高位 32 位为数量,低位 32 位为等待的数量。

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyrace.Disable()}for {state := atomic.LoadUint64(statep)v := int32(state >> 32)w := uint32(state)if v == 0 {// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// Increment waiters count.//...}
}

当调用 waitGroup.Wait() 时,会去循环检查 state,只有当高 32 位为 0(即当前执行的任务为 0)时才会返回,否则就会增加低为然后继续循环。

可想而知,调用 Done 就是高位减 1,就暂不赘述了。

仅执行一次 Once

见字如面,sync.Once 用于保证传入的函数只执行一次。

在有些高并发的场景下,可能会有这种需求:多个 goroutine 同时执行任务 A,哪个先跑完就去执行任务 B,跑得慢的不需要执行。

使用:

    once := sync.Once{}once.Do(func() {fmt.Println("do the work that only need exec once")})

实现也很简单:

type Once struct {// done indicates whether the action has been performed.// It is first in the struct because it is used in the hot path.// The hot path is inlined at every call site.// Placing done first allows more compact instructions on some architectures (amd64/386),// and fewer instructions (to calculate offset) on other architectures.done uint32m    Mutex
}

Once 的实现就是一个状态值和一个互斥锁。

func (o *Once) Do(f func()) {// Note: Here is an incorrect implementation of Do:////  if atomic.CompareAndSwapUint32(&o.done, 0, 1) {//      f()//  }//// Do guarantees that when it returns, f has finished.// This implementation would not implement that guarantee:// given two simultaneous calls, the winner of the cas would// call f, and the second would return immediately, without// waiting for the first's call to f to complete.// This is why the slow path falls back to a mutex, and why// the atomic.StoreUint32 must be delayed until after f returns.if atomic.LoadUint32(&o.done) == 0 {// Outlined slow-path to allow inlining of the fast-path.o.doSlow(f)}
}
func (o *Once) doSlow(f func()) {o.m.Lock()defer o.m.Unlock()if o.done == 0 {defer atomic.StoreUint32(&o.done, 1)f()}
}

当首次执行时,会通过原子操作修改其中的 done 状态(这个过程需要获取互斥锁)。后面再执行 Do,发现状态不对,就不会执行了。

原子操作

在前面看并发包的一些实现时,发现多多少少都是使用 atomic 进行实现,比如 WaitGroup#Wait:

        state := atomic.LoadUint64(statep)v := int32(state >> 32)w := uint32(state)

atomic 原子操作,只能同步一个整型变量或自定义类型变量,更适合一些对性能十分敏感、并发量较大且读多写少的场合。

原子操作由底层硬件直接提供支持,是一种硬件实现的指令级的“事务”
atomic 原子操作的特性:随着并发量提升,使用 atomic 实现的共享变量的并发读写性能表现更为稳定,尤其是原子读操作,和 sync 包中的读写锁原语比起来,atomic 表现出了更好的伸缩性和高性能

无论整型变量和自定义类型变量,atomic的操作实质上针对的都是字长长度的指针。在64位cpu上就是8个字节。因为CPU通过数据总线,一次从内存中最多只能获取一个字长的信息。所以atomic的限制也是一个字长。

其他

虽然都在 sync 包中,但 sync.WaitGroup,Map,Pool 层级更高一些,是基于 Mutex、RWMutex 和 Cond 这三个基本原语之上实现的机制。

Go 团队认为递归锁或可重入锁是一个不好的语法,所以不支持。

Golang 基础:底层并发原语 Mutex RWMutex Cond WaitGroup Once等使用和基本实现相关推荐

  1. 深入 Go 并发原语 — Channel 底层实现

    作为 Go 并发原语的第一篇文章,一定绕不开 Go 的并发哲学.从 Tony Hoare 写的 Communicating Sequential Processes 这篇文章说起,这篇经典论文算是 G ...

  2. 视频教程-桫哥-GOlang基础-Go语言实战:并发爬虫-Go语言

    桫哥-GOlang基础-Go语言实战:并发爬虫 多年互联网从业经验: 有丰富的的企业网站.手游.APP开发经验: 曾担任上海益盟软件技术股份有限公司项目经理及产品经理: 参与项目有益盟私募工厂.睿妙影 ...

  3. golang基础面试题总结

    golang基础面试题总结 前言:由于正在准备之后的实习面试,故总结了一部分golang语言基础的问题,回答全为自己组织的语言,若有错各位大佬可及时指出,大家共同进步,谢谢. 1.go中怎样实现安全读 ...

  4. GoLang之使用sync.pool和sync.cond

    GoLang之使用sync.pool和sync.cond 目录 GoLang之使用sync.pool和sync.cond 1.sync.Pool结构体 1.1sync.Pool结构体 1.2Put方法 ...

  5. [Tour of Go] Golang基础

    Golang基础 看官网文档做的笔记.厌倦了每次捡起Go都要重看文档了. 官网的 Tutorials 大部分是一些使用Go进行开发的简单流程示例,我个人感觉是,按照按需自取的原则,稍微看一下,敲一下, ...

  6. Golang基础知识点整理

    Golang基础知识点整理 Go语言strconv包实现字符串和数值类型的相互转换 1 Slice 1.1 定义 1.2 切片的底层原理 1.3 切片的创建方式 1.3.1 从已有的数组或切片生成新的 ...

  7. Golang的高并发

    Golang的高并发 Golang的调度器有三个核心的元素: 物理处理器 逻辑处理器 goroutine 物理处理器更接近cpu核的概念,主要包括 调度线程来运行. 分配逻辑处理器的基础, 每个物理处 ...

  8. go每日新闻(2022-03-07)——Go 原生并发原语和最佳实践

    每日一谚:If you don't understand the data, you don't understand the problem. go中文网每日资讯--2022-03-07 一.Go语 ...

  9. Golang 基础学习

    Golang 基础学习 使用vscode开发配置国内源 dep包管理器 变量 常量 基本数据类型 判断数据类型 基本数据类型转换 整型 浮点型 布尔类型 字符串(两种) array数组与切片 数组定义 ...

最新文章

  1. 总分的公式计算机,装机模拟器各配件跑分及计算公式分享 3DMark分数怎么算 3DMark分数计算公式_游侠网...
  2. IExtensibleObjectExtensibleHttpApplication的性能问题
  3. 射雕三部曲的优美片段
  4. 学习响应式BootStrap来写融职教育网站,Bootsrtap第三天nav布局
  5. java 重定向 https_使用简单身份验证从HTTP重定向到HTTPS
  6. idea 单独引入jar_Intellij IDEA 添加jar包的三种方式
  7. idea 快捷删除移动_21个极大提高开发效率的VS Code快捷键
  8. Rust 第一章 简介
  9. Mac 下配置XAMPP
  10. OCR之Tesseract使用
  11. Unity Android 使用UnityWebRequest Post 数据后,没有获得服务器返回的数据
  12. MySQL 修改字段类型或长度
  13. AndroidStudio 抓包工具Profiler使用
  14. 分享几篇有关DO-178和GJB5000对比的论文
  15. 联想云计算机终端,联想智能云教室系统 V1.3.20.1109_C201105 最新官网版本
  16. 是时候更换你的无线路由器(wifi暴力破解)
  17. java推送微信消息换行_微信公众号开发:回复文本消息换行的三种方式
  18. 中国驾照在美国各州开车的规定
  19. 石子合并——最经典的dp问题
  20. zookeeper下载以及zkClient的使用

热门文章

  1. 基于PyQt5实现查看本地图片功能
  2. oracle tsn文件,tsn-12560 tsn-00530 一个很棘手的问题,查了metalink也没有解决方案
  3. Python主要用来做什么 它的应用大全有哪些
  4. 计算机狐狸标志的程序,Firefox 推出新 logo,这只狐狸长这样子
  5. KIBANA用户手册(一) DISCOVER-时间
  6. C++之begin()和end()分析(C艹11)
  7. VPS国内各网点一键测试脚本
  8. 码教授|信息流的那点事:为什么信息流如此流行
  9. 定义银行账户类Account,有属性:卡号cid,余额balance,所属用户Customer 银行账户类Account有方法: (1)getInfo(),返回String类型,返回卡的详细信息
  10. easyui SWFUpload