GeeRPC-day3

前面完成了客户端的功能,但是我们并没有实现结构体与服务的映射关系,今天我会补全这一部分功能

Golang的反射

反射将“接口类型的变量”转为了“反射的接口类型的变量”,比如实际上返回的是reflect.Value和reflect.Type的接口对象

获取变量内部信息

reflect提供了两种类型来进行访问接口变量的内容

类型 作用
reflect.ValueOf() 获取输入参数接口中的数据的值,如果为空则返回0 <- 注意是0
reflect.TypeOf() 动态获取输入参数接口中的值的类型,如果为空则返回nil <- 注意是nil

获取struct的内部信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type Student struct {
Id int
Name string
}

func (s Student) Hello(){
fmt.Println("我是一个学生")
}

func main() {
s := Student{Id: 1, Name: "咖啡色的羊驼"}

// 获取目标对象
t := reflect.TypeOf(s)
// .Name()可以获取去这个类型的名称
fmt.Println("这个类型的名称是", t.Name())

// 获取目标对象的值类型
v := reflect.ValueOf(s)
// .NumField()来获取其包含的字段的总数
for i := 0; i < t.NumField(); i++ {
// 从0开始获取Student所包含的key
key := t.Field(i)

// 通过interface方法来获取key所对应的值
value := v.Field(i).Interface()

fmt.Printf("第%d个字段是:%s:%v = %v \n", i+1, key.Name, key.Type, value)
}

// 通过.NumMethod()来获取Student里头的方法
for i:=0;i<t.NumMethod(); i++ {
m := t.Method(i)
fmt.Printf("第%d个方法是:%s:%v\n", i+1, m.Name, m.Type)
}

通过反射修改内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Student struct {
Id int
Name string
}
func main() {
s := &Student{Id: 1, Name: "咖啡色的羊驼"}
v := reflect.ValueOf(s)
// 修改值必须是指针类型否则不可行
if v.Kind() != reflect.Ptr {
fmt.Println("不是指针类型,没法进行修改操作")
return
}
// 获取指针所指向的元素
v = v.Elem()
// 获取目标key的Value的封装
name := v.FieldByName("Name")

if name.Kind() == reflect.String {
name.SetString("小学生")
}
fmt.Printf("%#v \n", *s)

创建方法结构体

众所周知,我们不可能通过硬编码的方式(switch+case语句)来识别调用的方法并创建实例

每暴露一个方法,就需要编写等量的代码。那有没有什么方式,能够将这个映射过程自动化呢?可以借助反射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type methodType struct {
method reflect.Method//方法本身
ArgType reflect.Type//第一个参数
ReplyType reflect.Type//第二个参数
numCalls uint64//统计方法调用次数
}

func (m *methodType) NumCalls() uint64 {
return atomic.LoadUint64(&m.numCalls)
}
//创建入参实例
func (m *methodType) newArgv() reflect.Value {
var argv reflect.Value
// arg may be a pointer type, or a value type
if m.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(m.ArgType.Elem())
} else {
argv = reflect.New(m.ArgType).Elem()
}
return argv
}
//创建返回值实例
func (m *methodType) newReplyv() reflect.Value {
// reply must be a pointer type
replyv := reflect.New(m.ReplyType.Elem())
switch m.ReplyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(m.ReplyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(m.ReplyType.Elem(), 0, 0))
}
return replyv
}

构建Service

1
2
3
4
5
6
type service struct {
name string //映射的方法结构体的名称
typ reflect.Type//结构体类型
rcvr reflect.Value//方法结构体实例本身
method map[string]*methodType//存储映射的符合条件的方法结构体
}

完成service的构造函数,入参是任何需要映射为服务的方法结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func newService(rcvr interface{}) *service {
//通过反射来构建service实例
s := new(service)
s.rcvr = reflect.ValueOf(rcvr)
s.name = reflect.Indirect(s.rcvr).Type().Name()
s.typ = reflect.TypeOf(rcvr)
if !ast.IsExported(s.name) {
log.Fatalf("rpc server: %s is not a valid service name", s.name)
}
s.registerMethods()
return s
}
//过滤出所有符合条件的方法
func (s *service) registerMethods() {
s.method = make(map[string]*methodType)
for i := 0; i < s.typ.NumMethod(); i++ {
method := s.typ.Method(i)
mType := method.Type
//三个入参,一个返回值
if mType.NumIn() != 3 || mType.NumOut() != 1 {
continue
}
if mType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
continue
}
argType, replyType := mType.In(1), mType.In(2)
if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {
continue
}
s.method[method.Name] = &methodType{
method: method,
ArgType: argType,
ReplyType: replyType,
}
log.Printf("rpc server: register %s.%s\n", s.name, method.Name)
}
}

func isExportedOrBuiltinType(t reflect.Type) bool {
return ast.IsExported(t.Name()) || t.PkgPath() == ""
}

方法的调用

1
2
3
4
5
6
7
8
9
10
func (s *service) call(m *methodType, argv, replyv reflect.Value) error {
atomic.AddUint64(&m.numCalls, 1)
f := m.method.Func
//通过反射调用方法,这里就是实际执行方法的地方!!
returnValues := f.Call([]reflect.Value{s.rcvr, argv, replyv})
if errInter := returnValues[0].Interface(); errInter != nil {
return errInter.(error)
}
return nil
}

将服务注册集成到服务端

之前的server是一个空结构体,现在我们将其补全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Server represents an RPC Server.
type Server struct {
//添加一个map,用于存储service名称和service实例
serviceMap sync.Map
}

///register,将注册的service存入map中
func (server *Server) Register(rcvr interface{}) error {
s := newService(rcvr)
if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup {
return errors.New("rpc: service already defined: " + s.name)
}
return nil
}

// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//通过 ServiceMethod 从 serviceMap 中找到对应的 service
func (server *Server) findService(serviceMethod string) (svc *service, mtype *methodType, err error) {
dot := strings.LastIndex(serviceMethod, ".")
if dot < 0 {
err = errors.New("rpc server: service/method request ill-formed: " + serviceMethod)
return
}
serviceName, methodName := serviceMethod[:dot], serviceMethod[dot+1:]
//从map中找到service
svci, ok := server.serviceMap.Load(serviceName)
if !ok {
err = errors.New("rpc server: can't find service " + serviceName)
return
}
svc = svci.(*service)
//找到methodType
mtype = svc.method[methodName]
if mtype == nil {
err = errors.New("rpc server: can't find method " + methodName)
}
return
}

因为 ServiceMethod 的构成是 “Service.Method”,因此先将其分割成 2 部分,第一部分是 Service 的名称,第二部分即方法名。

先在 serviceMap 中找到对应的 service 实例,再从 service 实例的 method 中,找到对应的 methodType。

补全readRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// request stores all information of a call
type request struct {
h *codec.Header // header of request
argv, replyv reflect.Value // argv and replyv of request
//和前几天的不同,我们添加了以下字段,代表调用的方法名和service实例
mtype *methodType
svc *service
}

func (server *Server) readRequest(cc codec.Codec) (*request, error) {
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}
req := &request{h: h}
req.svc, req.mtype, err = server.findService(h.ServiceMethod)
if err != nil {
return req, err
}
//创建实例
req.argv = req.mtype.newArgv()
req.replyv = req.mtype.newReplyv()

// make sure that argvi is a pointer, ReadBody need a pointer as parameter
argvi := req.argv.Interface()
if req.argv.Type().Kind() != reflect.Ptr {
argvi = req.argv.Addr().Interface()
}
if err = cc.ReadBody(argvi); err != nil {
log.Println("rpc server: read body err:", err)
return req, err
}
return req, nil
}

readRequest 方法中最重要的部分,即通过 newArgv()newReplyv() 两个方法创建出两个入参实例

然后通过 cc.ReadBody() 将请求报文反序列化为第一个入参 argv

1
2
3
4
5
6
7
8
9
10
11
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()
//上文的call方法
err := req.svc.call(req.mtype, req.argv, req.replyv)
if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}

handleRequest 的实现非常简单,通过 req.svc.call 完成方法调用,将 replyv 传递给 sendResponse 完成序列化即可。

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//定义结构体 Foo实现的方法
type Foo int

type Args struct{ Num1, Num2 int }
//RPC的方法调用必须为导出方法,即在golang中函数名必须为大写
func (f Foo) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}


func _assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assertion failed: "+msg, v...))
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//测试 newService 和 call 方法
func TestNewService(t *testing.T) {
var foo Foo
//服务注册
s := newService(&foo)
_assert(len(s.method) == 1, "wrong service Method, expect 1, but got %d", len(s.method))
mType := s.method["Sum"]
_assert(mType != nil, "wrong Method, Sum shouldn't nil")
}
//服务调用
func TestMethodType_Call(t *testing.T) {
var foo Foo
s := newService(&foo)
mType := s.method["Sum"]

argv := mType.newArgv()
replyv := mType.newReplyv()
argv.Set(reflect.ValueOf(Args{Num1: 1, Num2: 3}))
err := s.call(mType, argv, replyv)
_assert(err == nil && *replyv.Interface().(*int) == 4 && mType.NumCalls() == 1, "failed to call Foo.Sum")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//注册foo并启动server
func startServer(addr chan string) {
var foo Foo
if err := geerpc.Register(&foo); err != nil {
log.Fatal("register error:", err)
}
// pick a free port
l, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
addr <- l.Addr().String()
geerpc.Accept(l)
}

main函数,发送RPC请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
log.SetFlags(0)
addr := make(chan string)
go startServer(addr)
client, _ := geerpc.Dial("tcp", <-addr)
defer func() { _ = client.Close() }()

time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
args := &Args{Num1: i, Num2: i * i}
var reply int
if err := client.Call("Foo.Sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Printf("%d + %d = %d", args.Num1, args.Num2, reply)
}(i)
}
wg.Wait()
}

最终结果

总结

今天我们完成了服务注册的相关代码,现在发起一个调用涉及到三个结构体

  1. request:这里包含了一个请求的所有信息,包括请求头,入参和返回值,方法结构体指针,service实例
  2. 方法结构体:包含了实际要执行的方法函数,入参和返回值,当注册Service时程序会检测该方法的返回值和参数是否合法,合法才会创建方法结构体
  3. Service:用于注册,当一个方法注册后,就会对应一个service实例,内部有一个map存储名称与方法结构体的映射

当调用被server端反序列化后,会首先解析头部的字符串,看是否能在map中找到已注册的service实例,并从中找到方法结构体,若找到,程序通过反射创建入参和返回值的实例,记住,通过反射创建,我们就无需知道实际的类型。通过反射,可以获得方法结构体中的函数,也就是实际要执行的程序代码,同样使用反射包下的Call函数来进行调用。

最后即可将返回值和头部序列化写回客户端


GeeRPC-day3
http://example.com/post/GeeRPC-day3.html
作者
SamuelZhou
发布于
2022年12月22日
许可协议