GeeRPC-day5

Web 开发中,我们经常使用 HTTP 协议中的 HEAD、GET、POST 等方式发送请求,等待响应。

但 RPC 的消息格式与标准的 HTTP 协议并不兼容,在这种情况下,就需要一个协议的转换过程。

HTTP 协议的 CONNECT 方法恰好提供了这个能力,CONNECT 一般用于代理服务。

对 RPC 服务端而言,需要做的是将 HTTP 协议转换为 RPC 协议

对客户端来说,需要新增通过 HTTP CONNECT 请求创建连接的逻辑

所以加入http协议后的通信流程应该是这样的

  1. 客户端向 RPC 服务器发送 CONNECT 请求:CONNECT 10.0.0.1:9999/_geerpc_ HTTP/1.0
  2. RPC 服务器返回 HTTP 200 状态码表示连接建立:HTTP/1.0 200 Connected to Gee RPC
  3. 客户端通过创建好的连接发送 RPC 报文,先发送 Option,再发送请求报文,服务端处理 RPC 请求并响应

在服务端添加处理http请求的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
const (
connected = "200 Connected to Gee RPC"
defaultRPCPath = "/_geeprc_"
defaultDebugPath = "/debug/geerpc"
)

// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
//不是connect请求
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = io.WriteString(w, "405 must CONNECT\n")
return
}
//劫持连接对象
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
//回复状态码表示连接建立
_, _ = io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}

// HandleHTTP registers an HTTP handler for RPC messages on rpcPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func (server *Server) HandleHTTP() {
http.Handle(defaultRPCPath, server)
}

// HandleHTTP is a convenient approach for default server to register HTTP handlers
func HandleHTTP() {
DefaultServer.HandleHTTP()
}

ServeHTTP方法,传入一个http包下的空接口和http请求体来作为参数,内部方法均使用http包下的原生方法进行封装

defaultDebugPath 是为后续 DEBUG 页面预留的地址。

标准库http

在 Go 语言中处理 HTTP 请求是非常简单的一件事,Go 标准库中 http.Handle 的实现如下:

1
2
3
4
5
6
package http
// Handle registers the handler for the given pattern
// in the DefaultServeMux.
// The documentation for ServeMux explains how patterns are matched.
func Handle(pattern string, handler Handler)
{ DefaultServeMux.Handle(pattern, handler) }

第一个参数pattern是支持通配的字符串,我们固定传入 /_geeprc_

第二个参数Handler 是一个接口

1
2
3
type Handler interface {
ServeHTTP(w ResponseWriter, r *Request)
}

也就是说,只需要实现接口 Handler 内的方法ServeHTTP即可作为一个 HTTP Handler 处理 HTTP 请求。

在客户端添加http处理逻辑

客户端的事情很简单,只需要发起 CONNECT 请求,检查返回状态码即可成功建立连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// NewHTTPClient new a Client instance via HTTP as transport protocol
func NewHTTPClient(conn net.Conn, opt *Option) (*Client, error) {
//通过http connect请求建立连接后,后续的通信就交给newClient了
_, _ = io.WriteString(conn, fmt.Sprintf("CONNECT %s HTTP/1.0\n\n", defaultRPCPath))

// Require successful HTTP response
// before switching to RPC protocol.
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == connected {
return NewClient(conn, opt)
}
if err == nil {
err = errors.New("unexpected HTTP response: " + resp.Status)
}
return nil, err
}

// DialHTTP connects to an HTTP RPC server at the specified network address
// listening on the default HTTP RPC path.
func DialHTTP(network, address string, opts ...*Option) (*Client, error) {
return dialTimeout(NewHTTPClient, network, address, opts...)
}

为了简化调用,我们封装一个xdial方法,该方法会检测请求体的第一个参数

如果是http就使用我们写好的方法,如果是其他就调用go net包下的dial方法来处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// XDial calls different functions to connect to a RPC server
// according the first parameter rpcAddr.
// rpcAddr is a general format (protocol@addr) to represent a rpc server
// eg, http@10.0.0.1:7001, tcp@10.0.0.1:9999, unix@/tmp/geerpc.sock
func XDial(rpcAddr string, opts ...*Option) (*Client, error) {
parts := strings.Split(rpcAddr, "@")
if len(parts) != 2 {
return nil, fmt.Errorf("rpc client err: wrong format '%s', expect protocol@addr", rpcAddr)
}
protocol, addr := parts[0], parts[1]
switch protocol {
case "http":
return DialHTTP("tcp", addr, opts...)
default:
// tcp, unix or other transport protocol
return Dial(protocol, addr, opts...)
}
}

测试

实现一个debug界面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package geerpc

import (
"fmt"
"html/template"
"net/http"
)

const debugText = `<html>
<body>
<title>GeeRPC Services</title>
{{range .}}
<hr>
Service {{.Name}}
<hr>
<table>
<th align=center>Method</th><th align=center>Calls</th>
{{range $name, $mtype := .Method}}
<tr>
<td align=left font=fixed>{{$name}}({{$mtype.ArgType}}, {{$mtype.ReplyType}}) error</td>
<td align=center>{{$mtype.NumCalls}}</td>
</tr>
{{end}}
</table>
{{end}}
</body>
</html>`

var debug = template.Must(template.New("RPC debug").Parse(debugText))

type debugHTTP struct {
*Server
}

type debugService struct {
Name string
Method map[string]*methodType
}

// Runs at /debug/geerpc
//这个就是上文提到的,实现serveHTTP即可作为一个handler处理http请求
func (server debugHTTP) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Build a sorted version of the data.
var services []debugService
server.serviceMap.Range(func(namei, svci interface{}) bool {
svc := svci.(*service)
services = append(services, debugService{
Name: namei.(string),
Method: svc.method,
})
return true
})
err := debug.Execute(w, services)
if err != nil {
_, _ = fmt.Fprintln(w, "rpc: error executing template:", err.Error())
}
}

通过上述代码我们将得到一个debugHTTP 实例,里面包含了HTML报文

将 debugHTTP 实例绑定到地址 /debug/geerpc

1
2
3
4
5
func (server *Server) HandleHTTP() {
http.Handle(defaultRPCPath, server)
http.Handle(defaultDebugPath, debugHTTP{server})
log.Println("rpc server debug path:", defaultDebugPath)
}

启动服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Foo int

type Args struct{ Num1, Num2 int }

func (f Foo) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}

func startServer(addrCh chan string) {
var foo Foo
l, _ := net.Listen("tcp", ":9999")
_ = geerpc.Register(&foo)
//将 startServer 中的 geerpc.Accept() 替换为了 geerpc.HandleHTTP()
geerpc.HandleHTTP()
addrCh <- l.Addr().String()
_ = http.Serve(l, nil)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func call(addrCh chan string) {
//客户端将 Dial 替换为 DialHTTP,其余地方没有发生改变。
client, _ := geerpc.DialHTTP("tcp", <-addrCh)
defer func() { _ = client.Close() }()

time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
args := &Args{Num1: i, Num2: i * i}
var reply int
if err := client.Call(context.Background(), "Foo.Sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Printf("%d + %d = %d", args.Num1, args.Num2, reply)
}(i)
}
wg.Wait()
}

func main() {
log.SetFlags(0)
ch := make(chan string)
go call(ch)
startServer(ch)
}

此时我们如果在浏览器中访问 localhost:9999/debug/geerpc,将会看到

总结

今天我们自己使用http包的原生方法自制了双方处理http请求的逻辑

对于服务端来说,只接收CONNECT的请求,并且会劫持该http的tcp连接来生成con对象,从而建立连接

debugHTTP持Server对象,实现handler接口,处理函数里用持有的Server做一些debug相关的统计,这时候可以通过HTTP请求获取到对应的Server的一些调用状态。

看起来更像是让client可以通过HTTP CONNECT方法来创建RPC C/S之间的连接,这个对于RPC框架的使用者是不是透明的,使用这个RPC框架,调用的时候还是创建Client,然后调用CallMethod,还是这种函数的调用方式。


GeeRPC-day5
http://example.com/post/GeeRPC-day5.html
作者
SamuelZhou
发布于
2022年12月27日
许可协议