ants——Goroutine Pool阅读笔记

大规模 Goroutine 的瓶颈

golang 的 net/http 标准库,入口函数 ListenAndServe :

1
2
3
4
5
6
7
8
9
10
11
func (srv *Server) ListenAndServe() error {
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

看到最后那个 srv.Serve 调用了吗?这个 Serve 方法里面就是实际处理 http 请求的逻辑,我们再进入这个方法内部:

1
2
3
4
5
6
7
8
9
10
func (srv *Server) Serve(l net.Listener) error {
defer l.Close()
...
// 不断循环取出TCP连接
for {
rw, e := l.Accept()
...
go c.serve(ctx)
}
}

首先,这个方法的参数(l net.Listener) ,是一个 TCP 监听的封装,负责监听网络端口, rw, e := l.Accept() 则是一个阻塞操作,从网络端口取出一个新的 TCP 连接进行处理,最后 go c.serve(ctx) 就是最后真正去处理这个 http 请求的逻辑了,看到前面的 go 关键字了吗?

没错,这里启动了一个新的 goroutine 去执行处理逻辑,而且这是在一个无限循环体里面,所以意味着,每来一个请求它就会开一个 goroutine 去处理,相当任性粗暴啊…,不过有 Go 调度器背书,一般来说也没啥压力,然而,如果,我是说如果哈,突然一大波请求涌进来了,他来 10w 个请求你就要开给他 10w 个 goroutine,来 100w 个你就要老老实实开给他 100w 个,线程调度压力陡升,内存爆满。

如果实现一个类似JAVA线程池的协程池来实现资源复用,不仅不会出现这种状况,还可以大幅节约CPU资源,而ants就是基于go语言实现的协程池工具包

本文将以一个JAVA开发者+Golang初学者的角度,阅读源码并写下我的思考,我将一边完善本文,一边深入探索go语言

同步锁

ants中使用的锁,是基于CAS机制和指数回避算法实现的一种锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type spinLock uint32
const maxBackoff = 16

func (sl *spinLock) Lock() {
backoff := 1
//尝试抢锁,如果本次没有抢到锁,使用指数回避算法来让自己在之后的某个时间段再随机抢锁,一直自旋直到抢到锁
//拿锁,CAS将标志位设为1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {

for i := 0; i < backoff; i++ {
//runtime.Gosched() 会让当前goroutine让出CPU,好让其他的goroutine获得执行的机会。
//当一个goroutine发生阻塞,Go会自动地把与该goroutine处于同一系统线程的其他goroutines转移到另一个系统线程上去,以使这些goroutines不阻塞
runtime.Gosched()
}
//指数退避算法以指数方式重试请求(不断增加各次重试之间的等待时间,直到达到最大退避时间)
if backoff < maxBackoff {
backoff <<= 1
}
}
}
//解锁,原子操作将标志位设为0
func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
return new(spinLock)
}

协程池中的同步锁原理很简单,就是暴力地使用CAS修改锁的标志位,如果未修改成功,便尝试重新拿锁,每次重新拿锁之前需要进行等待

而等待时间是以指数正增长,超过阈值便停止

如何理解下面这段代码?

1
2
3
for i := 0; i < backoff; i++ {
runtime.Gosched()
}

runtime ,即进程权限调度包,可以直接操纵CPU对进程的行为,Gosched()方法就是让当前goroutine让出CPU,好让其他的goroutine获得执行的机会。

所以这段代码的意思就是,让该协程停止拿锁,让出此时间片给其他协程,直到该循环结束

Go会自动地把与该goroutine处于同一系统线程的其他goroutines转移到另一个系统线程上去,以使这些goroutines不阻塞

核心数据结构

Pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type Pool struct {
//池容量,表示ants最多能创建的goroutine数量。如果为负数,表示容量无限制;
capacity int32
//已经在运行的worker goroutine数量;
running int32
//记录池当前的状态,是否已关闭(CLOSED)
state int32
//阻塞等待的任务数量
waiting int32
//spinlock锁,用于并发安全的从worker队列中获取空闲worker
lock sync.Locker
//存放池中所有的worker,实现容器的接口,workerArray包含可用workers队列和过期workers队列,只会从可用workers队列中取可用worker
workers workerArray
//条件原语,pool为阻塞模式时,如果retrieveWorker函数获取不到可用worker并且没有达到池的最大阻塞数量,会一直阻塞直到被唤醒
cond *sync.Cond
//临时对象池 用于在retrieveWorker函数中 加速获取一个可用的worker
workerCache sync.Pool
//为1表示已经停止清理过期worker,结束运行purgePeriodically
heartbeatDone int32
//用于通知purgePeriodically结束运行
stopHeartbeat context.CancelFunc
//用于配置pool的相关参数
options *Options
}
type Options struct {//pool的配置, 源码使用了Option写法对配置进行可选初始化
ExpiryDuration time.Duration //pool清理过期的worker的时间间隔

PreAlloc bool //初始化时是否内存预分配

MaxBlockingTasks int //pool.Submit被阻塞的最大goroutine数量,0表示没限制

Nonblocking bool //Pool.Submit是否阻塞,为false时MaxBlockingTasks参数无效

PanicHandler func(interface{}) //worker发生panic时 调用此函数,如果为nil panic会继续向外层抛出

Logger Logger //日志记录器 可自定义,默认官方logger
}

goWorker

1
2
3
4
5
6
7
8
9
//对象结构
type goWorker struct {
//核心执行单元,它会绑定一个函数并执行,执行完后解绑并回到池内
pool *Pool //表明这个goWorker是哪个池的

task chan func() //存放该goWorker要执行的所有工作,外部会通过调用pool.Submit方法向chan中发送任务

recycleTime time.Time //worker队列,每次取一个goWorker执行task,执行完放回worker队列,会更新goWorker的此字段
}

goWorker队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type workerArray interface {
len() int //获取长度
isEmpty() bool //判断是否为空
insert(worker *goWorker) error//加入goWorker
detach() *goWorker //从队列中获取一个可用的goWorker
retrieveExpiry(duration time.Duration) []*goWorker //获取过期worker的数组切片
reset()//清空队列
}
//循环队列
type loopQueue struct {
items []*goWorker
expiry []*goWorker
head int
tail int
size int
isFull bool
}
//栈
type workerStack struct {
items []*goWorker
expiry []*goWorker
}

临时池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
workerCache   sync.Pool
//点进SDK sync包下,发现以下代码
type Pool struct {
noCopy noCopy

local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array

victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array

// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() any
}

sync.Pool 是 Golang 内置的对象池,可用于缓存临时对象,也可以生成对象,避免因频繁建立临时对象所带来的消耗以及对 GC 造成的压力。

其中New func() any是一个对象的构造函数,用户使用 Get 来从对象池中获取对象,使用 Put 将对象归还给对象池。

在ants中,构造函数长这样

1
2
3
4
5
6
7
p.workerCache.New = func() interface{} {
//返回一个goWorker
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}

所以临时池的的作用就是帮我们创建goworker并交给协程池的workerArray管理,承担了一个cache的角色

利用对象池的特性,实现对象复用

初始化pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//创建pool的核心代码
func NewPool(size int, options ...Option) (*Pool, error) {
opts := loadOptions(options...)//加载配置

if size <= 0 {//如果容量为负数,置为-1,表示为无限池
size = -1
}
//检查清理时间设置是否合法
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime //如果没设置 默认1s清理一次过期worker
}

if opts.Logger == nil {
opts.Logger = defaultLogger//没有设置,就用go官方的日志器
}

p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {//临时对象池,retrieveWorker函数会在一些情况下通过其获取可用的worker
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),//workerChanCap是一个函数,根据GOMAXPROCS数来决定task是无缓冲还是有缓冲
}
}
if p.options.PreAlloc { //如果要内存预分配,就使用队列的方式实现WorkerArray接口,此情况下的workers队列容量最大为size
if size == -1 {
return nil, ErrInvalidPreAllocSize
//设置为无限大就无序预分配了
}
p.workers = newWorkerArray(loopQueueType, size)//用预分配内存方式创建的pool不能通过Tune函数动态改变池的容量
} else { //否则用栈的实现方式(默认,workers队列容量无限制)
p.workers = newWorkerArray(stackType, 0)
}

p.cond = sync.NewCond(p.lock)//初始化条件原语

var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)
//用额外的协程去启动p.purgePeriodically(ctx),即清道夫函数,用于定期检查释放池中过期的workers,ctx用于pool控制该goroutine什么时候结束
return p, nil
}

初始化pool之前会对参数做一些校验,比如定期清理时间的设定是否合法,是否启用官方的日志。

初始化pool后,随即创建临时池

创建后判断是否需要内存预分配,若不需要就使用栈的方式来实现WorkerArray

最后创建一个额外的goroutine分配给清道夫函数

队列方法

显而易见,每个worker队列的实现类都拥有两个数组切片

  1. items:存储被使用的goWorker
  2. expiry:存储已经过期的goWorker

以循环队列为例,看几个关键方法

可用数组中拿取一个可用的goWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//从可用数组中获取一个可用的goWorker
func (wq *loopQueue) detach() *goWorker {
if wq.isEmpty() {
return nil
}
//队头元素出队,类似于poll()
w := wq.items[wq.head]
wq.items[wq.head] = nil
//对头指针后移
wq.head++
if wq.head == wq.size {
wq.head = 0
}
wq.isFull = false

return w
}

获取过期goWorker的数组切片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//获取过期goWorker的数组切片,用于清道夫函数
func (wq *loopQueue) retrieveExpiry(duration time.Duration) []*goWorker {
expiryTime := time.Now().Add(-duration)
//二分查找获取已经过期的goWoker的索引
index := wq.binarySearch(expiryTime)
if index == -1 {
return nil
}
wq.expiry = wq.expiry[:0]
//将已经过期的所有goWorker加入队列并从原队列中移除
if wq.head <= index {
wq.expiry = append(wq.expiry, wq.items[wq.head:index+1]...)
for i := wq.head; i < index+1; i++ {
wq.items[i] = nil
}
} else {
wq.expiry = append(wq.expiry, wq.items[0:index+1]...)
wq.expiry = append(wq.expiry, wq.items[wq.head:]...)
for i := 0; i < index+1; i++ {
wq.items[i] = nil
}
for i := wq.head; i < wq.size; i++ {
wq.items[i] = nil
}
}
head := (index + 1) % wq.size
wq.head = head
if len(wq.expiry) > 0 {
wq.isFull = false
}
return wq.expiry
}

由上文的revertWorker源码可知,对于循环队列来说,队头存放的就是recycleTime最旧的goWorker,队尾就是最新的goWorker

这类似于LRU,对吧?所以直接使用二分法找出最接近过期时间的goWoker,便可以找出所有已过期的goWorker

向队列中插入goWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//放入goWorker
func (wq *loopQueue) insert(worker *goWorker) error {
if wq.size == 0 {
return errQueueIsReleased
}
if wq.isFull {
return errQueueIsFull
}
//移动队尾指针
wq.items[wq.tail] = worker
wq.tail++
if wq.tail == wq.size {
wq.tail = 0
}
if wq.tail == wq.head {
wq.isFull = true
}
return nil
}

如何获取一个goWorker并执行

显而易见,每个goWorker其实就是对submit的任务进行了封装,并设置了过期时间,使用LRU来淘汰长时间不使用的goWoker

也许有一些goWorker绑定了任务,但是只会执行一次,所以得定时进行淘汰回收资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (p *Pool) retrieveWorker() (w *goWorker) { //获取一个可用的worker
//从临时对象池中获取一个新生成的worker
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
}
p.lock.Lock()

w = p.workers.detach()
if w != nil { //尝试从队列中获取worker
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
//如果工作队列为空并且池未满,就从临时对象池中获取
p.lock.Unlock()
spawnWorker()
} else
{
if p.options.Nonblocking { //如果池是非阻塞模式,则直接返回nil切片
p.lock.Unlock()
return
}
retry: //如果池是阻塞模式,这种情况只能阻塞直到获取一个可用的worker
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { //如果当前阻塞goroutine数量 >= 设置的最大阻塞数,直接返回
p.lock.Unlock()
return
}
//否则阻塞直到收到通知有可用的worker
p.addWaiting(1)
p.cond.Wait()
p.addWaiting(-1)

if p.IsClosed() { //被唤醒后如果池已经关闭则直接结束
p.lock.Unlock()
return
}

var nw int
if nw = p.Running(); nw == 0 {
p.lock.Unlock()
spawnWorker() //如果是被清道夫唤醒则从临时对象池中获取worker
return
}
if w = p.workers.detach(); w == nil { //如果正常从worker队列中没获取到
if nw < p.Cap() { //如果当前运行数量小于容量就从临时对象池获取
p.lock.Unlock()
spawnWorker()
return
}
goto retry //否则重试之前操作
}
p.lock.Unlock() //正常从worker队列中获取到了则返回
}
return
}

这个方法里有很多条件语句的嵌套,读起来甚是费力,也许是go语言没有括号的原因,JAVA看多了就不习惯

我们可以捋顺逻辑了:

  1. 尝试获取worker:正常获取到则返回,若无worker,证明此时刚刚初始化,就由临时池创建并返回。
  2. 未获取到worker:说明此时已经无空闲goroutine,执行下面的逻辑
  3. 若为非阻塞模式,直接返回空值
  4. 若为阻塞模式,使用死循环等待获取goroutine。这里使用了goto来实现,若当前阻塞协程数量超过阈值,直接退出。若获取不到或者无已经启用的goroutine就从临时池里获取

给goWorker绑定函数

1
2
3
4
5
6
7
8
9
10
11
12
func (p *Pool) Submit(task func()) error {
if p.IsClosed() {
return ErrPoolClosed
}
var w *goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
//绑定函数
w.task <- task
return nil
}

让该goWorker执行工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (w *goWorker) run() {
w.pool.addRunning(1)
go func() {
//defer延迟函数,在return前执行,用于释放资源
defer func() {
w.pool.addRunning(-1)
//将w放回对象池
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
var buf [4096]byte
n := runtime.Stack(buf[:], false)
w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
}
}
// Call Signal() here in case there are goroutines waiting for available workers.
w.pool.cond.Signal()
}()
//执行绑定的函数
for f := range w.task {
if f == nil {
return
}
f()
//放回原队列
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}

将goWorker放回池中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (p *Pool) revertWorker(worker *goWorker) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
p.cond.Broadcast()
return false
}
//将时间设置为最新
worker.recycleTime = time.Now()
p.lock.Lock()

// To avoid memory leaks, add a double check in the lock scope.
if p.IsClosed() {
p.lock.Unlock()
return false
}
//往队列里插入goWoker
err := p.workers.insert(worker)
if err != nil {
p.lock.Unlock()
return false
}

// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
p.cond.Signal()
p.lock.Unlock()
return true
}

清道夫函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (p *Pool) purgePeriodically(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration) //定义一个断续器,根据配置的时间定期向通道发送清除信号
defer func() {
heartbeat.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1)
}()

for {
select {
case <-heartbeat.C://接收到清除信号后去执行清理任务
case <-ctx.Done()://又或者ReleaseTimeout函数向该管道发送了信号,停止执行清理任务
return
}
//执行清理任务前 先检查池是否关闭
if p.IsClosed() {
break
}

p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) //获取池中过期的workers,回收时间在time.now()-p.options.ExpiryDuration之前的worker就是过期worker
p.lock.Unlock()

for i := range expiredWorkers {
expiredWorkers[i].task <- nil //让过期worker解除已经绑定的函数
expiredWorkers[i] = nil //释放过期worker
}
// 因为有可能所有的worker都被清理了或者开发者调用了Tune函数扩大了pool的容量,但仍然有goroutine被p.cond.Wait()阻塞,此时就可唤醒全部goroutine去抢夺worker
if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
p.cond.Broadcast()
}
}
}

清道夫函数和队列实现了协程的高效复用,只有拿到已绑定函数的goWorker对象才能创建goroutine来执行函数

避免了高并发频繁创建goroutine给GC带来压力

goWorker放回队列后便会以LRU算法后移,过期的goWorker对象会被清道夫函数解除绑定的函数,等待新任务的到来


ants——Goroutine Pool阅读笔记
http://example.com/post/ants——Goroutine-Pool阅读笔记.html
作者
SamuelZhou
发布于
2022年11月30日
许可协议