go并发编程

互斥锁

mutex实现了Locker接口

1
2
3
4
type Locker interface {
Lock()
Unlock()
}

加锁,解锁,简洁明了,Go的一贯风格

1
2
3
4
5
6
7
// mutex 是一种互斥锁
// mutex 的零值是未加锁的互斥锁
// mutex 在第一次使用之后不能进行复制
type Mutex struct {
state int32 //状态位,内部实现时把该变量分成四份
sema uint32 //信号量,用来控制等待的goroutine 的阻塞,休眠,唤醒
}

状态位

  1. Locked:表示该Mutex是否已被锁定,0-没有锁定,1-已被锁定。
  2. Woken:表示是否有协程已被唤醒,0-没有协程唤醒,1-已有协程唤醒,正在加锁过程中。
  3. Starving:表示该Mutex是否处于饥饿状态, 0-没有饥饿,1-饥饿状态,说明有协程阻塞了超过1ms。
  4. Waiter:表示阻塞等待锁的协程个数,协程解锁时根据此值来判断是否需要释放信号量。

协程之间抢锁实际上是抢给Locked赋值的权利,能给Locked域置1,就说明抢锁成功。

抢不到的话就阻塞等待Mutex.sema信号量,一旦持有锁的协程解锁,等待的协程会依次被唤醒。

Woken 和 Starving 主要用于控制协程间的抢锁

加锁

golang通过自旋的方式来修改状态位,加锁时,如果当前Locked位为1,说明该锁当前由其他协程持有,尝试加锁的协程并不是马上转入阻塞,而是会持续的探测Locked位是否变为0,自旋时间很短,但如果在自旋过程中发现锁已被释放,那么协程可以立即获取锁。

加锁时程序会自动判断是否可以自旋,以下情况不能自旋,会直接阻塞

  1. 自旋次数超过4次
  2. 单核CPU
  3. GOMAXPROCS()将处理器设置为1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 加锁
// 如果锁已经被使用,调用goroutine阻塞,直到锁可用
func (m *Mutex) Lock() {
// 快速路径:没有竞争直接获取到锁,修改状态位为加锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
// 开启-race之后会进行判断,正常情况可忽略
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 慢路径(以便快速路径可以内联)
m.lockSlow()
}

normal模式(非公平锁)

默认情况下,Mutex的模式为normal。

该模式下,协程如果加锁不成功不会立即转入阻塞,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁,直到自旋超过一定次数

starvation模式(公平锁)

饥饿模式是 1.9 版本中引入的优化,目的是保证互斥锁的公平性,防止协程长时间饥饿得不到调度

饥饿模式下,直接 unlock 把锁交给等待队列中排在第一位的 goroutine (等待时间最长),同时,新进来的 goroutine不会参与抢锁也不会进入自旋,会直接进入阻塞队列的尾部。这样很好的解决了老的 goroutine一直抢不到锁的情况。
触发条件:当一个 goroutine 等待锁时间超过 1 毫秒时,或者当前队列只剩下一个 goroutine 的时候,Mutex 切换到饥饿模式。

channel信道

用于不同的协程单元goroutine之间的同步通信

可以把 Channel 理解成是一个单向的管道,具有 FIFO 特性

Channel 是有容量限制的

  1. 当容量是 0 时,称为无缓冲 Channel。发送和接收只有一方就绪时,就绪方会被阻塞直到另一方也就绪。
  2. 当容量大于 0 时,称为有缓冲 Channel。当传输中的元素个数超过容量时,发送方将会被阻塞直到有可用的缓冲空间出现;当传输中的元素个数为 0 时,消费方将会被阻塞直到缓冲空间出现新的数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列长度,即可以存放的元素个数
buf unsafe.Pointer // 环形队列缓冲区
elemsize uint16 // 每个元素的大小
closed uint32 // 标识关闭状态
elemtype *_type // 元素类型
sendx uint // 队列下标,指示元素写入时存放到队列中的位置
recvx uint // 队列下标,指示元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex // 互斥锁,chan不允许并发读写
}

显而易见,channel是一个由双指针定义的环形队列,并且带有两个分别等待读写的G队列,并发特性简单,使用同步锁来进行互斥读写,并且一种管道只能传递一种类型的值

等待队列

从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞;向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞

被阻塞的goroutine将会挂在channel的等待队列中,等待被唤醒

因读阻塞的goroutine会被向channel写数据的goroutine唤醒
因写阻塞的goroutine会被从channel读数据的goroutine唤醒

写入消息:

读取消息:

通道实现生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//单向写通道
func producer(out chan <- int) {
for i := 0; i < 10; i++ {
fmt.Println("producer send", i * i)
out <- i * i
}
close(out)
}
//单向读通道
func consumer(in <- chan int) {
for num := range in {
fmt.Println("consumer recv", num)
}
}

func main() {
// 有缓冲
ch := make(chan int, 5)

go producer(ch)
consumer(ch)
}

通道模拟读写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
var value int
//单向读通道
func readGo(in <- chan int, idx int) {
for {
num := <- in
fmt.Println("Id", idx, "Read", num)
time.Sleep(time.Millisecond * 300)
}
}
//单向写通道
func writeGo(out chan <- int, idx int) {
for {
// 生成随机数
num := rand.Intn(1000)
out <- num
fmt.Println("Id", idx, "Write", num)
time.Sleep(time.Millisecond * 300)
}
}

func main() {
// 随机数种子
rand.Seed(time.Now().UnixNano())
//无缓冲区通道
ch := make(chan int)

// 5个读协程 5个写协程
for i := 0; i < 5; i++ {
go readGo(ch, i)
}

for i := 0; i < 5; i++ {
go writeGo(ch, i)
}

//<- quit
for {
;
}
}

WaitGroup以组为单位等待

WaitGroup 直译是“等待组”,翻译成大白话就是等待一组协程完成任务。如果没有完成,就阻塞。

我们要计算100万个数的和,并对这个和求根号。常规的思路肯定是先一个 for 循环计算总和,再开根号,但是这样效率很低。

我们可以起1000个协程,每个协程计算1000个数的和,然后再对这些和求和,最后再开个根号,计算根号的时候,需要等所有并发的协程都计算完才行,WaitGroup 就是解决等所有并发协程完成计算的问题的

WaitGroup 的用法很简单。标准库中的 WaitGroup 只有三个方法

1
2
3
func (wg *WaitGroup) Add(delta int)//用来设置 WaitGroup 的计数值,delta 可正可负
func (wg *WaitGroup) Done()//用来将 WaitGroup 的计数值减一,其实就是调用了 Add(-1)
func (wg *WaitGroup) Wait()//阻塞等待,直到 WaitGroup 的计数值变成0,进入下一步

使用计算100万个数之和来举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"math"
"sync"
)

// 计算1000个数的和
func compute(m *sync.Mutex, wg *sync.WaitGroup, s, e int, count *int) {
sum := 0
for i := s; i < e; i++ {
sum += i
}
m.Lock()
*count += sum
m.Unlock()
//每个协程执行结束最后都会执行一次 Done() 函数,表示当前协程完成
wg.Done()
}

func main() {

var m sync.Mutex
//声明wg
var wg sync.WaitGroup

var count int
//因为这里开了1000个协程,所以执行 Add(1000)
wg.Add(1000)
for i := 0; i < 1000; i++ {
go compute(&m, &wg, i*1000+1, (i+1)*1000+1, &count)
}
//执行 Wait() 函数,等待这1000个协程组完成任务
wg.Wait()
fmt.Println(math.Sqrt(float64(count)))
return
}

实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type WaitGroup struct {
noCopy noCopy

// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}

state1[3]

state1 是一个长度为3的 int32 类型数组

在64位系统下定义如下:

  1. 计数值:协程组中运行的协程数量
  2. waiter数:等待者个数,一个wg.wait()对应一个等待者
  3. 信号量

在32位系统下,如果 state1 不是64位对齐,定义如下:

v2-3980f8c6669e5890932173b059cbe434_r

什么是64bit对齐?

在后面 Add 或者 Wait 的操作中,类似这样的操作: atomic.AddUint64(statep, uint64(delta)<<32)

这些都是64位的原子操作,这些操作是对64位的内存块直接操作的。所以像这种64位的原子操作,都要求64位对齐。

64位对齐是要求数据的起始地址是64的倍数。在64位的系统中没有问题,但是在32位的系统中,内存块最小是4个字节,只能保证32位对齐,有可能会出现 state1 的起始地址不是64的倍数,而是32的倍数

这种情况下,将 state1[0] 当成信号量,就能保证 state1[1-2] 是64位对齐。

add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 高32bit是计数值v,所以把delta左移32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter count

if v > 0 || w == 0 {
return
}

// 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
// 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}


// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()

for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter的数量
if v == 0 {
// 如果计数值为0, 所有任务已经完成,调用这个方法的goroutine不必再等待,继续执行它后面的逻辑即可
return
}
// 如果计数值大于0,说明此时还有任务没完成,把waiter数量加1。期间可能有并发调用Wait的情况,所以最外层使用了一个for循环
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠等待
runtime_Semacquire(semap)
// 被唤醒,不再阻塞,返回
return
}
}
}

go并发编程
http://example.com/post/go并发编程.html
作者
SamuelZhou
发布于
2022年12月13日
许可协议