GeeRPC-day2

完成一个高性能Client

DAY1我们只完成了服务端,客户端的代码都在main函数里实现,今天我们来将client完善一下

Call的设计

封装一个完整的远程函数调用

1
2
3
4
5
6
7
8
9
10
11
12
13
type Call struct {
Seq uint64
ServiceMethod string // 方法名
Args interface{} // 参数
Reply interface{} // 返回值
Error error // 错误信息
Done chan *Call // 用于异步回调
}
//信道,异步回调通知调用方
func (call *Call) done() {
//单向出信道,代表已经完成调用
call.Done <- call
}

Client结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
cc codec.Codec//DAY1完成的编码解码器,用于序列化要发送的数据
opt *Option//option,标识符,用于识别相应的二进制流
sending sync.Mutex // 互斥锁,保证消息依次发送
header codec.Header//消息头
mu sync.Mutex // protect following
seq uint64//一个请求拥有的唯一编号
pending map[uint64]*Call //存储未完成的请求
closing bool // user has called Close
shutdown bool // server has told us to stop
}

var _ io.Closer = (*Client)(nil)

var ErrShutdown = errors.New("connection is shut down")

// Close the connection
func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
if client.closing {
return ErrShutdown
}
client.closing = true
return client.cc.Close()
}

// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool {
client.mu.Lock()
defer client.mu.Unlock()
return !client.shutdown && !client.closing
}

为客户端设计功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//将参数 call 添加到 client.pending 中,并更新 client.seq
func (client *Client) registerCall(call *Call) (uint64, error) {
client.mu.Lock()
defer client.mu.Unlock()
if client.closing || client.shutdown {
return 0, ErrShutdown
}
call.Seq = client.seq
//在pending队列中添加此次调用的信息
client.pending[call.Seq] = call
client.seq++
return call.Seq, nil
}
//根据 seq,从 client.pending 中移除对应的 call,并返回
func (client *Client) removeCall(seq uint64) *Call {
client.mu.Lock()
defer client.mu.Unlock()
call := client.pending[seq]
delete(client.pending, seq)
return call
}
//服务端或客户端发生错误时调用,将 shutdown 设置为 true,且将错误信息通知所有 pending 状态的 call
func (client *Client) terminateCalls(err error) {
client.sending.Lock()
defer client.sending.Unlock()
client.mu.Lock()
defer client.mu.Unlock()
client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
}
}

接收响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (client *Client) receive() {
var err error
for err == nil {
var h codec.Header
if err = client.cc.ReadHeader(&h); err != nil {
break
}
call := client.removeCall(h.Seq)
switch {
//call 不存在,可能是请求没有发送完整,或者因为其他原因被取消,但是服务端仍旧处理了
case call == nil:
// it usually means that Write partially failed
// and call was already removed.
err = client.cc.ReadBody(nil)
//call 存在,但服务端处理出错,即 h.Error 不为空
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default:
//call 存在,服务端处理正常,那么需要从 body 中读取 Reply 的值
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// error occurs, so terminateCalls pending calls
client.terminateCalls(err)
}

发送信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (client *Client) send(call *Call) {
// make sure that the client will send a complete request
client.sending.Lock()
defer client.sending.Unlock()

// register this call.
seq, err := client.registerCall(call)
if err != nil {
call.Error = err
call.done()
return
}

// prepare request header
client.header.ServiceMethod = call.ServiceMethod
client.header.Seq = seq
client.header.Error = ""

// encode and send the request
if err := client.cc.Write(&client.header, call.Args); err != nil {
call := client.removeCall(seq)
// call may be nil, it usually means that Write partially failed,
// client has received the response and handled
if call != nil {
call.Error = err
call.done()
}
}
}

//GO是暴露给用户的接口,用于生成call实例,是一个异步接口
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10)
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}
call := &Call{
ServiceMethod: serviceMethod,
Args: args,
Reply: reply,
Done: done,
}
client.send(call)
return call
}


//Call方法是对 Go 的封装,用于发起调用,并阻塞等待 call.Done,是一个同步接口
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}

初始化client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
//获取编码解码器
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
// 与服务端协商option
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error: ", err)
_ = conn.Close()
return nil, err
}
return newClientCodec(f(conn), opt), nil
}
//创建client实例,启动一个协程来接收服务器响应
func newClientCodec(cc codec.Codec, opt *Option) *Client {
client := &Client{
seq: 1, // seq starts with 1, 0 means invalid call
cc: cc,
opt: opt,
pending: make(map[uint64]*Call),
}
//启动一个协程来接收响应
go client.receive()
return client
}

简化用户调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//用于生成option字段
func parseOptions(opts ...*Option) (*Option, error) {
// if opts is nil or pass nil as parameter
if len(opts) == 0 || opts[0] == nil {
return DefaultOption, nil
}
if len(opts) != 1 {
return nil, errors.New("number of options is more than 1")
}
opt := opts[0]
opt.MagicNumber = DefaultOption.MagicNumber
if opt.CodecType == "" {
opt.CodecType = DefaultOption.CodecType
}
return opt, nil
}

// 重写的Dial方法,内部封装了生成option和生成client实例的方法
func Dial(network, address string, opts ...*Option) (client *Client, err error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
//建立连接
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
// close the connection if client is nil
defer func() {
if client == nil {
_ = conn.Close()
}
}()
//生成client对象
return NewClient(conn, opt)
}

DAY2总结

今天实现了一个支持异步和并发的客户端,昨天的客户端太简单了,全程编码都在main函数中进行

几个重点

  1. 使用一个信道done来存储调用,用于异步回调通知调用方已经完成调用,当完成调用接收到返回值后,该call会出信道
  2. 我们重写了dial方法用于建立连接+生成client对象,此时客户端会生成一个协程来等待响应信息
  3. 客户端会使用call方法来生成一个call对象并发送,阻塞等待信道done,当等到了done,证明已经拿到了返回值

启动客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
log.SetFlags(0)
addr := make(chan string)
//和DAY1一样,启动服务器
go startServer(addr)
//使用我们重写的dial来建立连接,返回client对象
client, _ := geerpc.Dial("tcp", <-addr)
defer func() { _ = client.Close() }()

time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
args := fmt.Sprintf("geerpc req %d", i)
var reply string
//使用封装好的call方法发送信息获取返回值
if err := client.Call("Foo.Sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Println("reply:", reply)
}(i)
}
wg.Wait()
}

运行main函数结果如下


GeeRPC-day2
http://example.com/post/GeeRPC-day2.html
作者
SamuelZhou
发布于
2022年12月21日
许可协议