GeeRPC day1

序列化协议的制定

Header:存储除返回值和参数外的信息

1
2
3
4
5
type Header struct {
ServiceMethod string // 调用方法名
Seq uint64 // 请求ID,用于区分不同请求
Error string
}

option:放在头部前,协商确定编码方式

1
2
3
4
5
6
7
8
9
10
11
const MagicNumber = 0x3bef5c

type Option struct {
MagicNumber int // MagicNumber marks this's a geerpc request
CodecType codec.Type // client may choose different Codec to encode body
}

var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
}

编解码接口

1
2
3
4
5
6
type Codec interface {
io.Closer
ReadHeader(*Header) error
ReadBody(interface{}) error
Write(*Header, interface{}) error
}

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type NewCodecFunc func(io.ReadWriteCloser) Codec

type Type string

const (
GobType Type = "application/gob"
JsonType Type = "application/json" // not implemented
)
var NewCodecFuncMap map[Type]NewCodecFunc
//只演示GobCodec的实现
func init() {
//使用Map来定义多种编解码器,只需实现接口再加在这里就可以了
NewCodecFuncMap = make(map[Type]NewCodecFunc)
NewCodecFuncMap[GobType] = NewGobCodec
}

自定义序列化器GobCodec

gob是Golang包自带的一个数据结构序列化的编码/解码工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
type GobCodec struct {
conn io.ReadWriteCloser//socket链接实例
buf *bufio.Writer//缓冲区
dec *gob.Decoder//解码器
enc *gob.Encoder//编码器
}

var _ Codec = (*GobCodec)(nil)
//初始化函数
func NewGobCodec(conn io.ReadWriteCloser) Codec {
buf := bufio.NewWriter(conn)
return &GobCodec{
conn: conn,
buf: buf,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
}
}
//实现Codec接口的方法
func (c *GobCodec) ReadHeader(h *Header) error {
return c.dec.Decode(h)
}

func (c *GobCodec) ReadBody(body interface{}) error {
return c.dec.Decode(body)
}
//编码并发送到缓冲区
func (c *GobCodec) Write(h *Header, body interface{}) (err error) {
defer func() {
_ = c.buf.Flush()
if err != nil {
_ = c.Close()
}
}()
if err := c.enc.Encode(h); err != nil {
log.Println("rpc codec: gob error encoding header:", err)
return err
}
if err := c.enc.Encode(body); err != nil {
log.Println("rpc codec: gob error encoding body:", err)
return err
}
return nil
}

func (c *GobCodec) Close() error {
return c.conn.Close()
}

服务提供

处理连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Server represents an RPC Server.
type Server struct{}

// NewServer returns a new Server.
func NewServer() *Server {
return &Server{}
}

// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()

//轮询监听新连接,并创建一个协程处理新连接
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Println("rpc server: accept error:", err)
return
}
go server.ServeConn(conn)
}
}

func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//通过反序列化得到option实例,验证基本信息是否正确
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() { _ = conn.Close() }()
var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
log.Println("rpc server: options error: ", err)
return
}
//验证魔数
if opt.MagicNumber != MagicNumber {
log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
return
}
//通过type得到对应的解码器
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
log.Printf("rpc server: invalid codec type %s", opt.CodecType)
return
}
//将处理过程交给解码器
server.serveCodec(f(conn))
}

提供服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// invalidRequest is a placeholder for response argv when error occurs
var invalidRequest = struct{}{}

func (server *Server) serveCodec(cc codec.Codec) {
//使用互斥锁来保证逐个回复
sending := new(sync.Mutex) // make sure to send a complete response
wg := new(sync.WaitGroup) // wait until all request are handled
for {
//读取请求
req, err := server.readRequest(cc)
if err != nil {
if req == nil {
break // it's not possible to recover, so close the connection
}
req.h.Error = err.Error()
//回复请求
server.sendResponse(cc, req.h, invalidRequest, sending)
continue
}
wg.Add(1)
//处理请求,使用协程并发处理
go server.handleRequest(cc, req, sending, wg)
}
wg.Wait()
_ = cc.Close()
}

服务端的所有方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// request stores all information of a call
type request struct {
h *codec.Header // header of request
argv, replyv reflect.Value // argv and replyv of request
}
//读取头部信息
func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
var h codec.Header
//读取头部
if err := cc.ReadHeader(&h); err != nil {
if err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println("rpc server: read header error:", err)
}
return nil, err
}
return &h, nil
}
//读取请求
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
//先读取到头部
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}
req := &request{h: h}
//假定输入的参数是字符串
// TODO: now we don't know the type of request argv
// day 1, just suppose it's string
req.argv = reflect.New(reflect.TypeOf(""))
if err = cc.ReadBody(req.argv.Interface()); err != nil {
log.Println("rpc server: read argv err:", err)
}
return req, nil
}
//发送请求
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {
//拿到互斥锁
sending.Lock()
defer sending.Unlock()
//写回请求
if err := cc.Write(h, body); err != nil {
log.Println("rpc server: write response error:", err)
}
}
//处理请求
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
// TODO, should call registered rpc methods to get the right replyv
// day 1, just print argv and send a hello message
defer wg.Done()
log.Println(req.h, req.argv.Elem())
req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
//写回请求
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}

DAY1总结

RPC的一大核心便是序列化和反序列化

用户与服务端约定双方使用的协议,我们使用option+header+body的形式组建一个简单且安全的二进制流

程序使用option中的魔数来截取需要的二进制流,使用header来获取调用的方法名。

第一天我们没有实现body内包装对象或者其他数据,而是使用一个空接口来填充

对于RPC的服务提供方,总体的处理流程可以泛化为四步:

  1. 处理连接:反序列化获得option,截取到需要的二进制流
  2. 读取请求:读取header
  3. 处理请求:在这里我们使用字符串来代替body中的参数和返回值
  4. 发送请求:使用同步锁来发送缓冲区的数据

总的来说,DAY1我们实现了一个最简单的单机RPC模型

通过GO自带的GOB包实现了简单的序列化和反序列化

服务启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//选一个端口启动服务
func startServer(addr chan string) {
// pick a free port
l, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
addr <- l.Addr().String()
//与服务器建立连接
geerpc.Accept(l)
}

func main() {
log.SetFlags(0)
addr := make(chan string)
go startServer(addr)

// in fact, following code is like a simple geerpc client
conn, _ := net.Dial("tcp", <-addr)
defer func() { _ = conn.Close() }()

time.Sleep(time.Second)
// send options
_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
cc := codec.NewGobCodec(conn)
// 发送信息,获取返回值
for i := 0; i < 5; i++ {
h := &codec.Header{
ServiceMethod: "Foo.Sum",
Seq: uint64(i),
}
_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
_ = cc.ReadHeader(h)
var reply string
_ = cc.ReadBody(&reply)
log.Println("reply:", reply)
}
}

运行main函数结果如下


GeeRPC day1
http://example.com/post/GeeRPC-day1.html
作者
SamuelZhou
发布于
2022年12月20日
许可协议