如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加入超时处理的能力
客户端创建连接超时 我们将超时设定放在option中,ConnectTimeout 默认值为 10s,HandleTimeout 默认值为 0,即不设限。
1 2 3 4 5 6 7 8 9 10 11 12 type Option struct { MagicNumber int CodecType codec.Type ConnectTimeout time.Duration HandleTimeout time.Duration }var DefaultOption = &Option{ MagicNumber: MagicNumber, CodecType: codec.GobType, ConnectTimeout: time.Second * 10 , }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 type clientResult struct { client *Client err error }type newClientFunc func (conn net.Conn, opt *Option) (client *Client, err error )func dialTimeout (f newClientFunc, network, address string , opts ...*Option) (client *Client, err error ) { opt, err := parseOptions(opts...) if err != nil { return nil , err } conn, err := net.DialTimeout(network, address, opt.ConnectTimeout) if err != nil { return nil , err } defer func () { if err != nil { _ = conn.Close() } }() ch := make (chan clientResult) go func () { client, err := f(conn, opt) ch <- clientResult{client: client, err: err} }() if opt.ConnectTimeout == 0 { result := <-ch return result.client, result.err } select { case <-time.After(opt.ConnectTimeout): return nil , fmt.Errorf("rpc client: connect timeout: expect within %s" , opt.ConnectTimeout) case result := <-ch: return result.client, result.err } }func Dial (network, address string , opts ...*Option) (*Client, error ) { return dialTimeout(NewClient, network, address, opts...) }
在DAY2的时候,我们重写了Dial方法,封装了net包下的dial方法,并包含了初始化option和client实例的代码,而今天我们再封装一层,实现了一个超时处理的外壳 dialTimeout
,这个壳将 NewClient 作为入参,在 2 个地方添加了超时处理的机制。
将 net.Dial
替换为 net.DialTimeout
,如果连接创建超时,将返回错误。
使用子协程执行 NewClient,执行完成后则通过信道 ch 发送结果,如果 time.After()
信道先接收到消息,则说明 NewClient 执行超时,返回错误。
客户端发送请求超时 Client.Call
的超时处理机制,使用 context 包实现,控制权交给用户,控制更为灵活、
1 2 3 4 5 6 7 8 9 10 11 func (client *Client) Call(ctx context.Context, serviceMethod string , args, reply interface {}) error { call := client.Go(serviceMethod, args, reply, make (chan *Call, 1 )) select { case <-ctx.Done(): client.removeCall(call.Seq) return errors.New("rpc client: call failed: " + ctx.Err().Error()) case call := <-call.Done: return call.Error } }
用户可以使用 context.WithTimeout
创建具备超时检测能力的 context 对象来控制
1 2 3 ctx, _ := context.WithTimeout(context.Background(), time.Second)var reply int err := client.Call(ctx, "Foo.Sum" , &Args{1 , 2 }, &reply)
服务端处理超时 这一部分的实现与客户端很接近,使用 time.After()
结合 select+chan
完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) { defer wg.Done() called := make (chan struct {}) sent := make (chan struct {}) go func () { err := req.svc.call(req.mtype, req.argv, req.replyv) called <- struct {}{} if err != nil { req.h.Error = err.Error() server.sendResponse(cc, req.h, invalidRequest, sending) sent <- struct {}{} return } server.sendResponse(cc, req.h, req.replyv.Interface(), sending) sent <- struct {}{} }() if timeout == 0 { <-called <-sent return } select { case <-time.After(timeout): req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s" , timeout) server.sendResponse(cc, req.h, invalidRequest, sending) case <-called: <-sent } }
记住,整个 handleRequest函数中,不管是哪种情况,sendResponse永远只会被调用一次
总结 DAY4完成的事情很简单,大致就是为之前已经写好的方法封装一层超时处理逻辑,让实际调用时感知不到代码量的提升
在Golang网络编程中,经常要遇到设置超时的需求,而Go语言提供了time.After实现超时控制的相关内容
time.After()
表示time.Duration
长的时候后返回一条time.Time类型的通道消息,相当于是实现了计时器的功能
而一般time.After()会配合case语句进行超时逻辑处理,当time.After()
信道先接收到消息,证明已超时,则执行超时逻辑处理