GeeRPC-day4

如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加入超时处理的能力

客户端创建连接超时

我们将超时设定放在option中,ConnectTimeout 默认值为 10s,HandleTimeout 默认值为 0,即不设限。

1
2
3
4
5
6
7
8
9
10
11
12
type Option struct {
MagicNumber int // MagicNumber marks this's a geerpc request
CodecType codec.Type // client may choose different Codec to encode body
ConnectTimeout time.Duration // 0 means no limit
HandleTimeout time.Duration
}

var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
ConnectTimeout: time.Second * 10,
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type clientResult struct {
client *Client
err error
}

type newClientFunc func(conn net.Conn, opt *Option) (client *Client, err error)

func dialTimeout(f newClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
if err != nil {
return nil, err
}
// close the connection if client is nil
defer func() {
if err != nil {
_ = conn.Close()
}
}()
//使用信道来记录client对象和超时信息
ch := make(chan clientResult)
go func() {
client, err := f(conn, opt)
ch <- clientResult{client: client, err: err}
}()
if opt.ConnectTimeout == 0 {
result := <-ch
return result.client, result.err
}
//信道+select
select {
//处理超时
case <-time.After(opt.ConnectTimeout):
return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
case result := <-ch:
return result.client, result.err
}
}
//
func Dial(network, address string, opts ...*Option) (*Client, error) {
return dialTimeout(NewClient, network, address, opts...)
}

在DAY2的时候,我们重写了Dial方法,封装了net包下的dial方法,并包含了初始化option和client实例的代码,而今天我们再封装一层,实现了一个超时处理的外壳 dialTimeout,这个壳将 NewClient 作为入参,在 2 个地方添加了超时处理的机制。

  1. net.Dial 替换为 net.DialTimeout,如果连接创建超时,将返回错误。
  2. 使用子协程执行 NewClient,执行完成后则通过信道 ch 发送结果,如果 time.After() 信道先接收到消息,则说明 NewClient 执行超时,返回错误。

客户端发送请求超时

Client.Call 的超时处理机制,使用 context 包实现,控制权交给用户,控制更为灵活、

1
2
3
4
5
6
7
8
9
10
11
//给入参添加了一个context对象
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))
select {
case <-ctx.Done():
client.removeCall(call.Seq)
return errors.New("rpc client: call failed: " + ctx.Err().Error())
case call := <-call.Done:
return call.Error
}
}

用户可以使用 context.WithTimeout 创建具备超时检测能力的 context 对象来控制

1
2
3
ctx, _ := context.WithTimeout(context.Background(), time.Second)
var reply int
err := client.Call(ctx, "Foo.Sum", &Args{1, 2}, &reply)

服务端处理超时

这一部分的实现与客户端很接近,使用 time.After() 结合 select+chan 完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
defer wg.Done()
called := make(chan struct{})
sent := make(chan struct{})
//启动一个协程来进行方法调用和发送返回值
go func() {
err := req.svc.call(req.mtype, req.argv, req.replyv)
called <- struct{}{}
if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
sent <- struct{}{}
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
sent <- struct{}{}
}()
if timeout == 0 {
<-called
<-sent
return
}
select {
//处理超时的逻辑
case <-time.After(timeout):
req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
server.sendResponse(cc, req.h, invalidRequest, sending)
case <-called:
<-sent
}
}

记住,整个 handleRequest函数中,不管是哪种情况,sendResponse永远只会被调用一次

总结

DAY4完成的事情很简单,大致就是为之前已经写好的方法封装一层超时处理逻辑,让实际调用时感知不到代码量的提升

在Golang网络编程中,经常要遇到设置超时的需求,而Go语言提供了time.After实现超时控制的相关内容

time.After()表示time.Duration长的时候后返回一条time.Time类型的通道消息,相当于是实现了计时器的功能

而一般time.After()会配合case语句进行超时逻辑处理,time.After() 信道先接收到消息,证明已超时,则执行超时逻辑处理


GeeRPC-day4
http://example.com/post/GeeRPC-day4.html
作者
SamuelZhou
发布于
2022年12月23日
许可协议