大规模 Goroutine 的瓶颈 golang 的 net/http 标准库,入口函数 ListenAndServe :
1 2 3 4 5 6 7 8 9 10 11 func (srv *Server) ListenAndServe() error { addr := srv.Addr if addr == "" { addr = ":http" } ln, err := net.Listen("tcp" , addr) if err != nil { return err } return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}) }
看到最后那个 srv.Serve 调用了吗?这个 Serve 方法里面就是实际处理 http 请求的逻辑,我们再进入这个方法内部:
1 2 3 4 5 6 7 8 9 10 func (srv *Server) Serve(l net.Listener) error { defer l.Close() ... for { rw, e := l.Accept() ... go c.serve(ctx) } }
首先,这个方法的参数(l net.Listener) ,是一个 TCP 监听的封装,负责监听网络端口, rw, e := l.Accept()
则是一个阻塞操作,从网络端口取出一个新的 TCP 连接进行处理,最后 go c.serve(ctx)
就是最后真正去处理这个 http 请求的逻辑了,看到前面的 go 关键字了吗?
没错,这里启动了一个新的 goroutine 去执行处理逻辑,而且这是在一个无限循环体里面,所以意味着,每来一个请求它就会开一个 goroutine 去处理,相当任性粗暴啊…,不过有 Go 调度器背书,一般来说也没啥压力,然而,如果,我是说如果哈,突然一大波请求涌进来了,他来 10w 个请求你就要开给他 10w 个 goroutine,来 100w 个你就要老老实实开给他 100w 个,线程调度压力陡升,内存爆满。
如果实现一个类似JAVA线程池的协程池来实现资源复用,不仅不会出现这种状况,还可以大幅节约CPU资源,而ants就是基于go语言实现的协程池工具包
本文将以一个JAVA开发者+Golang初学者的角度,阅读源码并写下我的思考,我将一边完善本文,一边深入探索go语言
同步锁 ants中使用的锁,是基于CAS机制和指数回避算法实现的一种锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 type spinLock uint32 const maxBackoff = 16 func (sl *spinLock) Lock() { backoff := 1 for !atomic.CompareAndSwapUint32((*uint32 )(sl), 0 , 1 ) { for i := 0 ; i < backoff; i++ { runtime.Gosched() } if backoff < maxBackoff { backoff <<= 1 } } }func (sl *spinLock) Unlock() { atomic.StoreUint32((*uint32 )(sl), 0 ) }func NewSpinLock () sync.Locker { return new (spinLock) }
协程池中的同步锁原理很简单,就是暴力地使用CAS修改锁的标志位,如果未修改成功,便尝试重新拿锁,每次重新拿锁之前需要进行等待
而等待时间是以指数正增长,超过阈值便停止
如何理解下面这段代码?
1 2 3 for i := 0 ; i < backoff; i++ { runtime.Gosched() }
runtime ,即进程权限调度包,可以直接操纵CPU对进程的行为,Gosched()方法就是让当前goroutine让出CPU,好让其他的goroutine获得执行的机会。
所以这段代码的意思就是,让该协程停止拿锁,让出此时间片 给其他协程,直到该循环结束
Go会自动地把与该goroutine处于同一系统线程的其他goroutines转移到另一个系统线程上去,以使这些goroutines不阻塞
核心数据结构 Pool 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 type Pool struct { capacity int32 running int32 state int32 waiting int32 lock sync.Locker workers workerArray cond *sync.Cond workerCache sync.Pool heartbeatDone int32 stopHeartbeat context.CancelFunc options *Options }type Options struct { ExpiryDuration time.Duration PreAlloc bool MaxBlockingTasks int Nonblocking bool PanicHandler func (interface {}) Logger Logger }
goWorker 1 2 3 4 5 6 7 8 9 type goWorker struct { pool *Pool task chan func () recycleTime time.Time }
goWorker队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type workerArray interface { len () int isEmpty() bool insert(worker *goWorker) error detach() *goWorker retrieveExpiry(duration time.Duration) []*goWorker reset() }type loopQueue struct { items []*goWorker expiry []*goWorker head int tail int size int isFull bool }type workerStack struct { items []*goWorker expiry []*goWorker }
临时池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 workerCache sync.Pooltype Pool struct { noCopy noCopy local unsafe.Pointer localSize uintptr victim unsafe.Pointer victimSize uintptr New func () any }
sync.Pool 是 Golang 内置的对象池 ,可用于缓存临时对象,也可以生成对象,避免因频繁建立临时对象所带来的消耗以及对 GC 造成的压力。
其中New func() any
是一个对象的构造函数,用户使用 Get
来从对象池中获取对象,使用 Put
将对象归还给对象池。
在ants中,构造函数长这样
1 2 3 4 5 6 7 p.workerCache.New = func () interface {} { return &goWorker{ pool: p, task: make (chan func () , workerChanCap), } }
所以临时池的的作用就是帮我们创建goworker并交给协程池的workerArray管理,承担了一个cache的角色
利用对象池的特性,实现对象复用
初始化pool 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 func NewPool (size int , options ...Option) (*Pool, error ) { opts := loadOptions(options...) if size <= 0 { size = -1 } if expiry := opts.ExpiryDuration; expiry < 0 { return nil , ErrInvalidPoolExpiry } else if expiry == 0 { opts.ExpiryDuration = DefaultCleanIntervalTime } if opts.Logger == nil { opts.Logger = defaultLogger } p := &Pool{ capacity: int32 (size), lock: internal.NewSpinLock(), options: opts, } p.workerCache.New = func () interface {} { return &goWorker{ pool: p, task: make (chan func () , workerChanCap), } } if p.options.PreAlloc { if size == -1 { return nil , ErrInvalidPreAllocSize } p.workers = newWorkerArray(loopQueueType, size) } else { p.workers = newWorkerArray(stackType, 0 ) } p.cond = sync.NewCond(p.lock) var ctx context.Context ctx, p.stopHeartbeat = context.WithCancel(context.Background()) go p.purgePeriodically(ctx) return p, nil }
初始化pool之前会对参数做一些校验,比如定期清理时间的设定是否合法,是否启用官方的日志。
初始化pool后,随即创建临时池
创建后判断是否需要内存预分配,若不需要就使用栈的方式来实现WorkerArray
最后创建一个额外的goroutine分配给清道夫函数
队列方法 显而易见,每个worker队列的实现类都拥有两个数组切片
items:存储被使用的goWorker
expiry:存储已经过期的goWorker
以循环队列为例,看几个关键方法
可用数组中拿取一个可用的goWorker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (wq *loopQueue) detach() *goWorker { if wq.isEmpty() { return nil } w := wq.items[wq.head] wq.items[wq.head] = nil wq.head++ if wq.head == wq.size { wq.head = 0 } wq.isFull = false return w }
获取过期goWorker的数组切片 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (wq *loopQueue) retrieveExpiry(duration time.Duration) []*goWorker { expiryTime := time.Now().Add(-duration) index := wq.binarySearch(expiryTime) if index == -1 { return nil } wq.expiry = wq.expiry[:0 ] if wq.head <= index { wq.expiry = append (wq.expiry, wq.items[wq.head:index+1 ]...) for i := wq.head; i < index+1 ; i++ { wq.items[i] = nil } } else { wq.expiry = append (wq.expiry, wq.items[0 :index+1 ]...) wq.expiry = append (wq.expiry, wq.items[wq.head:]...) for i := 0 ; i < index+1 ; i++ { wq.items[i] = nil } for i := wq.head; i < wq.size; i++ { wq.items[i] = nil } } head := (index + 1 ) % wq.size wq.head = head if len (wq.expiry) > 0 { wq.isFull = false } return wq.expiry }
由上文的revertWorker
源码可知,对于循环队列来说,队头存放的就是recycleTime最旧的goWorker,队尾就是最新的goWorker
这类似于LRU,对吧?所以直接使用二分法找出最接近过期时间的goWoker,便可以找出所有已过期的goWorker
向队列中插入goWorker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (wq *loopQueue) insert(worker *goWorker) error { if wq.size == 0 { return errQueueIsReleased } if wq.isFull { return errQueueIsFull } wq.items[wq.tail] = worker wq.tail++ if wq.tail == wq.size { wq.tail = 0 } if wq.tail == wq.head { wq.isFull = true } return nil }
如何获取一个goWorker并执行 显而易见,每个goWorker其实就是对submit的任务进行了封装,并设置了过期时间,使用LRU来淘汰长时间不使用的goWoker
也许有一些goWorker绑定了任务,但是只会执行一次,所以得定时进行淘汰回收资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 func (p *Pool) retrieveWorker() (w *goWorker) { spawnWorker := func () { w = p.workerCache.Get().(*goWorker) w.run() } p.lock.Lock() w = p.workers.detach()if w != nil { p.lock.Unlock() } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { p.lock.Unlock() spawnWorker() } else { if p.options.Nonblocking { p.lock.Unlock() return } retry: if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { p.lock.Unlock() return } p.addWaiting(1 ) p.cond.Wait() p.addWaiting(-1 ) if p.IsClosed() { p.lock.Unlock() return } var nw int if nw = p.Running(); nw == 0 { p.lock.Unlock() spawnWorker() return } if w = p.workers.detach(); w == nil { if nw < p.Cap() { p.lock.Unlock() spawnWorker() return } goto retry } p.lock.Unlock() }return }
这个方法里有很多条件语句的嵌套,读起来甚是费力,也许是go语言没有括号的原因,JAVA看多了就不习惯
我们可以捋顺逻辑了:
尝试获取worker:正常获取到则返回,若无worker,证明此时刚刚初始化,就由临时池创建并返回。
未获取到worker:说明此时已经无空闲goroutine,执行下面的逻辑
若为非阻塞模式,直接返回空值
若为阻塞模式,使用死循环等待获取goroutine。这里使用了goto来实现,若当前阻塞协程数量超过阈值,直接退出。若获取不到或者无已经启用的goroutine就从临时池里获取
给goWorker绑定函数 1 2 3 4 5 6 7 8 9 10 11 12 func (p *Pool) Submit(task func () ) error { if p.IsClosed() { return ErrPoolClosed } var w *goWorker if w = p.retrieveWorker(); w == nil { return ErrPoolOverload } w.task <- task return nil }
让该goWorker执行工作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (w *goWorker) run() { w.pool.addRunning(1 ) go func () { defer func () { w.pool.addRunning(-1 ) w.pool.workerCache.Put(w) if p := recover (); p != nil { if ph := w.pool.options.PanicHandler; ph != nil { ph(p) } else { w.pool.options.Logger.Printf("worker exits from a panic: %v\n" , p) var buf [4096 ]byte n := runtime.Stack(buf[:], false ) w.pool.options.Logger.Printf("worker exits from panic: %s\n" , string (buf[:n])) } } w.pool.cond.Signal() }() for f := range w.task { if f == nil { return } f() if ok := w.pool.revertWorker(w); !ok { return } } }() }
将goWorker放回池中 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (p *Pool) revertWorker(worker *goWorker) bool { if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { p.cond.Broadcast() return false } worker.recycleTime = time.Now() p.lock.Lock() if p.IsClosed() { p.lock.Unlock() return false } err := p.workers.insert(worker) if err != nil { p.lock.Unlock() return false } p.cond.Signal() p.lock.Unlock() return true }
清道夫函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (p *Pool) purgePeriodically(ctx context.Context) { heartbeat := time.NewTicker(p.options.ExpiryDuration) defer func () { heartbeat.Stop() atomic.StoreInt32(&p.heartbeatDone, 1 ) }() for { select { case <-heartbeat.C: case <-ctx.Done(): return } if p.IsClosed() { break } p.lock.Lock() expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) p.lock.Unlock() for i := range expiredWorkers { expiredWorkers[i].task <- nil expiredWorkers[i] = nil } if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0 ) { p.cond.Broadcast() } } }
清道夫函数和队列实现了协程的高效复用,只有拿到已绑定函数的goWorker对象才能创建goroutine来执行函数
避免了高并发频繁创建goroutine给GC带来压力
goWorker放回队列后便会以LRU算法后移,过期的goWorker对象会被清道夫函数解除绑定的函数,等待新任务的到来