The previous post, "Native concurrency: common use cases for goroutine, channel and select", introduced the CSP-based approach to concurrency.

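Besides CSP, Go also provides lower-level synchronization APIs through the sync and atomic packages. They are generally used where performance requirements are high.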

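Synchronization built on sync.Mutex can perform more than three times better than the equivalent built on a channel, though the exact ratio depends on the workload.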

In sync/mutex.go there is this comment:

// Package sync provides basic synchronization primitives such as mutual
// exclusion locks. Other than the Once and WaitGroup types, most are intended
// for use by low-level library routines. Higher-level synchronization is
// better done via channels and communication.
// Values containing the types defined in this package should not be copied.

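Roughly, the comment says that the sync package provides low-level concurrency primitives, generally meant for low-level libraries. For synchronization in higher-level business code, channels are still the better choice.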


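As the comment notes, values of these types should not be copied. If one needs to be used in several places, use a global variable or pass a pointer (create it with &, use it through *).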
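As a minimal sketch of the pointer approach, the hypothetical SafeCounter type below (not from the original post) embeds a Mutex and is only ever shared as a pointer, so every goroutine locks the same Mutex:

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCounter is a hypothetical type used only for illustration.
type SafeCounter struct {
	mu sync.Mutex
	n  int
}

// Inc uses a pointer receiver, so all callers lock the same Mutex.
func (c *SafeCounter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// Value reads the count under the lock.
func (c *SafeCounter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// countWithSharedPointer starts `workers` goroutines that share one *SafeCounter.
func countWithSharedPointer(workers int) int {
	c := &SafeCounter{} // created with &, shared as a pointer
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Inc()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countWithSharedPointer(100)) // prints 100
}
```

Declaring the methods on *SafeCounter rather than SafeCounter is the important design choice here: a value receiver would copy the struct, and the Mutex with it, on every call.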

With channels, only the goroutine that owns a data object (the one that received it from a channel) may change that object's state.
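The ownership idea can be sketched as follows. The function name sumByOwner is an assumption made for illustration: a single owner goroutine receives values from a channel and is the only one that ever touches the running total.

```go
package main

import "fmt"

// sumByOwner hands every value to one owner goroutine over a channel.
// Only that goroutine mutates total, so no lock is needed.
func sumByOwner(values []int) int {
	in := make(chan int)
	done := make(chan int)

	go func() {
		total := 0 // owned exclusively by this goroutine
		for v := range in {
			total += v
		}
		done <- total // hand the result back once the input is closed
	}()

	for _, v := range values {
		in <- v
	}
	close(in)
	return <-done
}

func main() {
	fmt.Println(sumByOwner([]int{1, 2, 3, 4})) // prints 10
}
```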

Mutual exclusion lock: Mutex

Mutex and RWMutex play roles similar to their Java counterparts, so let's focus on the API and the basic implementation.


    mu := sync.Mutex{}
    mu.Lock()       // acquire the mutex
    mu.Unlock()


// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema  uint32
}


The problem with copying a Mutex

Let's use some code to see what goes wrong when a sync.Mutex is copied.

var num int = 1
func testCopyMutex() {
	mu := sync.Mutex{}
	waitGroup := sync.WaitGroup{}
	waitGroup.Add(1)
	go func(copyMu sync.Mutex) {
		copyMu.Lock()
		num = 100
		fmt.Println("update num from sub-goroutine: ", num)
		time.Sleep(5 * time.Second)
		fmt.Println("read num from sub-goroutine: ", num)
		copyMu.Unlock()
		waitGroup.Done()
	}(mu)
	time.Sleep(time.Second)
	mu.Lock()
	num = 1
	fmt.Println("read num from main: ", num)
	mu.Unlock()
	waitGroup.Wait()
}

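In the code above, the child goroutine takes the lock, modifies num, waits 5s, prints num, and only then releases the lock. Meanwhile, the main goroutine tries to acquire the lock and then modifies num.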

We expect the same Mutex to be passed, so that num keeps the value set by the child goroutine until it releases the lock. The actual output is surprising:

update num from sub-goroutine:  100
read num from main:  1
read num from sub-goroutine:  1

As the output shows, the main goroutine can still access num while the child goroutine is holding its lock.

The problem is that the Mutex is passed by value. Changing the parameter to a pointer fixes it:


    go func(copyMu *sync.Mutex) {   // the parameter type is now a pointer
        copyMu.Lock()
        num = 100
        fmt.Println("update num from sub-goroutine: ", num)
        time.Sleep(5 * time.Second)
        fmt.Println("read num from sub-goroutine: ", num)
        copyMu.Unlock()
        waitGroup.Done()
    }(&mu)  // pass a pointer


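The reason is that once a Mutex is copied, the two copies carry separate state. The child goroutine locks its copy and the main goroutine never notices, because they are no longer working on the same data!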

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}
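The fast path is a single compare-and-swap on the state field, which is why a copy with its own state field behaves as an independent lock. The sketch below is a deliberately simplified, hypothetical spinLock that keeps only that CAS idea. It has no semaphore-based slow path and is not how sync.Mutex handles contention.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// spinLock is a hypothetical, simplified lock for illustration only.
type spinLock struct {
	state int32 // 0 = unlocked, 1 = locked
}

// Lock retries the CAS until it flips state from 0 to 1.
func (l *spinLock) Lock() {
	for !atomic.CompareAndSwapInt32(&l.state, 0, 1) {
		runtime.Gosched() // yield instead of parking on a semaphore
	}
}

// Unlock resets state so the next CAS can succeed.
func (l *spinLock) Unlock() {
	atomic.StoreInt32(&l.state, 0)
}

// countWithSpinLock increments a plain int from many goroutines under the lock.
func countWithSpinLock(workers int) int {
	var l spinLock
	var wg sync.WaitGroup
	n := 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.Lock()
			n++
			l.Unlock()
		}()
	}
	wg.Wait()
	return n
}

func main() {
	fmt.Println(countWithSpinLock(100)) // prints 100
}
```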

Read-write lock: RWMutex


    rmu := sync.RWMutex{}
    rmu.RLock()     // read lock
    rmu.RUnlock()
    rmu.Lock()      // write lock
    rmu.Unlock()
    rmu.RLocker().Lock()        // implemented via RLock
    rmu.RLocker().Unlock()


// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
// A RWMutex must not be copied after first use.
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}

As shown, the fields of RWMutex include a mutex (acquired when writing), semaphores for readers and writers, the reader count, and so on.

With RWMutex, a read lock does not block other reads, but it does block writes.
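A typical use is a read-mostly store. The sketch below is only an illustration, and the configStore type and its methods are assumptions rather than anything from the original post: Get takes the read lock so that readers can proceed together, while Set takes the write lock.

```go
package main

import (
	"fmt"
	"sync"
)

// configStore is a hypothetical read-mostly map guarded by an RWMutex.
type configStore struct {
	mu   sync.RWMutex
	data map[string]string
}

func newConfigStore() *configStore {
	return &configStore{data: make(map[string]string)}
}

// Get takes the read lock; concurrent Get calls do not block each other.
func (s *configStore) Get(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Set takes the write lock and excludes readers and other writers.
func (s *configStore) Set(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

func main() {
	s := newConfigStore()
	s.Set("mode", "prod")

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Get("mode") // readers run concurrently under RLock
		}()
	}
	wg.Wait()

	v, ok := s.Get("mode")
	fmt.Println(v, ok) // prints: prod true
}
```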

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}




  • Keep the locked region as small as possible
  • Always remember to unlock; writing defer unlock right after locking avoids forgetting it
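Both tips can be sketched together. The registry type below is hypothetical: the string normalization runs outside the lock, and the deferred Unlock is written immediately after Lock.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// registry is a hypothetical type used only for illustration.
type registry struct {
	mu    sync.Mutex
	names []string
}

// Add normalizes the input before locking, so the lock only covers the append.
func (r *registry) Add(raw string) {
	name := strings.ToLower(strings.TrimSpace(raw)) // no shared state touched yet

	r.mu.Lock()
	defer r.mu.Unlock() // written right after Lock so it cannot be forgotten
	r.names = append(r.names, name)
}

// Len reads the slice length under the lock.
func (r *registry) Len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.names)
}

func main() {
	r := &registry{}
	r.Add("  Alice ")
	fmt.Println(r.Len()) // prints 1
}
```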

Condition variable: Cond


Cond lets multiple goroutines wait for a condition; once the condition is met, a broadcast wakes them so they can continue.



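    cond := sync.NewCond(&sync.Mutex{}) // the argument is a sync.Locker
    go func() {
        cond.L.Lock()
        for !condition() { // condition() stands for the caller's own check
            cond.Wait() // usually called in a loop: check once, block (releasing the lock) if the condition does not hold, then recheck after being woken
        }
        // ... make use of condition ...
        cond.L.Unlock()
    }()
    cond.L.Lock()       // acquire the lock passed to the constructor
    cond.Broadcast()    // wake all waiting goroutines; each returns from Wait after re-acquiring the lock
    cond.Signal()       // wake a single waiter
    cond.L.Unlock()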

sync.Cond.Wait is generally used inside a for loop that repeatedly checks whether the condition holds.
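Here is a small runnable sketch of that pattern, under the assumption that the condition is a plain boolean named ready (the function waitForReady and its variables are made up for illustration). Each waiter rechecks ready in a loop, and a single Broadcast releases all of them.

```go
package main

import (
	"fmt"
	"sync"
)

// waitForReady starts n waiters, flips the condition, broadcasts,
// and returns how many waiters got past the wait loop.
func waitForReady(n int) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false
	woken := 0

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			cond.L.Lock()
			for !ready { // recheck after every wake-up
				cond.Wait() // releases the lock while blocked, re-acquires before returning
			}
			woken++ // the lock is held here
			cond.L.Unlock()
		}()
	}

	cond.L.Lock()
	ready = true     // change the condition while holding the lock
	cond.Broadcast() // wake every goroutine currently waiting
	cond.L.Unlock()

	wg.Wait()
	return woken
}

func main() {
	fmt.Println(waitForReady(5)) // prints 5
}
```

A waiter that starts only after the broadcast sees ready already set and never calls Wait, so the loop form is what keeps this free of lost wake-ups.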


// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
// A Cond must not be copied after first use.
type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker
}

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}


func (c *Cond) Wait() {
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}


func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

On a broadcast, every waiting goroutine is notified, resumes the logic inside Wait, and tries to re-acquire the lock.

Wait group: WaitGroup

When you need to wait for several goroutines to finish before continuing, use sync.WaitGroup. It is similar to Java's CountDownLatch.


    waitGroup := sync.WaitGroup{}
    waitGroup.Add(1)        // one goroutine to wait for
    go func() {
        waitGroup.Done()    // decrement the counter
    }()
    waitGroup.Wait()    // blocks until the counter reaches 0


type WaitGroup struct {
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	state1 [3]uint32
}

As shown, the core of WaitGroup is a counting state: the high 32 bits hold the counter and the low 32 bits hold the number of waiters.

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// Increment waiters count.
		//...
	}
}

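When waitGroup.Wait() is called, it loads state and returns only if the high 32 bits are 0 (that is, no tasks are still running). Otherwise it increments the waiter count in the low bits and waits for the counter to reach 0; the loop retries when that atomic update fails.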

As you would expect, calling Done subtracts 1 from the high bits, so I won't go into the details here.
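The state layout described above can be sketched with plain integer arithmetic. The helpers pack, unpack and addDelta are hypothetical names used only to illustrate the bit layout. They are not part of the sync package, and the sketch leaves out the atomics entirely.

```go
package main

import "fmt"

// pack puts the counter in the high 32 bits and the waiter count in the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// unpack splits a state word back into counter and waiter count.
func unpack(state uint64) (counter int32, waiters uint32) {
	return int32(state >> 32), uint32(state)
}

// addDelta changes only the counter half; Done corresponds to delta = -1.
// The unsigned addition wraps, which is what makes a negative delta work.
func addDelta(state uint64, delta int) uint64 {
	return state + uint64(delta)<<32
}

func main() {
	s := pack(2, 1)     // two tasks outstanding, one goroutine waiting
	s = addDelta(s, -1) // one task calls Done
	c, w := unpack(s)
	fmt.Println(c, w) // prints: 1 1
}
```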

Run only once: Once

As the name suggests, sync.Once guarantees that the function passed to it runs only once.

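Some high-concurrency scenarios call for exactly this: several goroutines run task A at the same time, whichever finishes first goes on to run task B, and the slower ones do not need to run it.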


    once := sync.Once{}
    once.Do(func() {
        fmt.Println("do the work that only need exec once")
    })


type Once struct {
	// done indicates whether the action has been performed.
	// It is first in the struct because it is used in the hot path.
	// The hot path is inlined at every call site.
	// Placing done first allows more compact instructions on some architectures (amd64/386),
	// and fewer instructions (to calculate offset) on other architectures.
	done uint32
	m    Mutex
}

Once is implemented as nothing more than a state value and a mutex.

func (o *Once) Do(f func()) {
	// Note: Here is an incorrect implementation of Do:
	//
	//	if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
	//		f()
	//	}
	//
	// Do guarantees that when it returns, f has finished.
	// This implementation would not implement that guarantee:
	// given two simultaneous calls, the winner of the cas would
	// call f, and the second would return immediately, without
	// waiting for the first's call to f to complete.
	// This is why the slow path falls back to a mutex, and why
	// the atomic.StoreUint32 must be delayed until after f returns.

	if atomic.LoadUint32(&o.done) == 0 {
		// Outlined slow-path to allow inlining of the fast-path.
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

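On the first call, f runs while the mutex is held, and done is set with an atomic store once f has returned. Later calls to Do see that done is already set and do not run f again.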
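A quick way to observe this behaviour is to call Do from many goroutines and count how often the function body runs. The helper runOnce below is a made-up name for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// runOnce calls once.Do from `callers` goroutines and reports
// how many times the function passed to Do actually ran.
func runOnce(callers int) int {
	var once sync.Once
	var wg sync.WaitGroup
	runs := 0
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(func() { runs++ }) // only the first caller executes this
		}()
	}
	wg.Wait()
	return runs
}

func main() {
	fmt.Println(runOnce(10)) // prints 1
}
```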


While reading the implementations above, you may have noticed that most of them rely on atomic to some degree, for example WaitGroup#Wait:

        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)

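Atomic operations can only synchronize a single integer variable or a single value of a custom type (through atomic.Value). They are best suited to scenarios that are very performance-sensitive, highly concurrent, and read-heavy.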

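A notable property of atomic operations: as concurrency grows, reads and writes of a shared variable implemented with atomic stay more stable in performance. Atomic reads in particular scale better and run faster than the read-write lock primitive in the sync package.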
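For a single integer, the lock can be replaced entirely. A minimal sketch, assuming a shared int64 counter (the function name countWithAtomic is made up for this example):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWithAtomic increments one shared int64 from many goroutines without a lock.
func countWithAtomic(workers int) int64 {
	var n int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&n, 1) // atomic read-modify-write
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&n) // atomic read
}

func main() {
	fmt.Println(countWithAtomic(1000)) // prints 1000
}
```

This only works because the shared state fits in one word. As soon as two fields must change together, a Mutex or a channel is the safer choice.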



Although they all live in the sync package, sync.WaitGroup, Map, and Pool sit at a higher level: they are mechanisms built on top of the three basic primitives Mutex, RWMutex, and Cond.

The Go team considers recursive (reentrant) locks a bad design, so Go does not support them.
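Because sync.Mutex is not reentrant, a method that already holds the lock must not call another method that locks it again. One common workaround, sketched here with a hypothetical account type, is to split each operation into a public method that locks and an unexported helper that assumes the lock is already held.

```go
package main

import (
	"fmt"
	"sync"
)

// account is a hypothetical type used only for illustration.
type account struct {
	mu      sync.Mutex
	balance int
}

// Deposit is the public entry point: it takes the lock exactly once.
func (a *account) Deposit(amount int) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.depositLocked(amount)
}

// depositLocked assumes a.mu is already held and never locks it again.
func (a *account) depositLocked(amount int) {
	a.balance += amount
}

// DepositTwice reuses the helper. Calling Deposit here instead would
// block forever on the second Lock, because the Mutex is not reentrant.
func (a *account) DepositTwice(amount int) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.depositLocked(amount)
	a.depositLocked(amount)
}

// Balance reads the balance under the lock.
func (a *account) Balance() int {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.balance
}

func main() {
	a := &account{}
	a.Deposit(5)
	a.DepositTwice(10)
	fmt.Println(a.Balance()) // prints 25
}
```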

