GeeRPC-LastDay

注册中心

如果我们引入注册中心,那么客户端和服务端都只需要感知注册中心的存在,而无需感知对方的存在。

更具体一些:

  1. 服务端启动后,向注册中心发送注册消息,注册中心得知该服务已经启动,处于可用状态。一般来说,服务端还需要定期向注册中心发送心跳,证明自己还活着。
  2. 客户端向注册中心询问,当前哪条服务是可用的,注册中心将可用的服务列表返回客户端。
  3. 客户端根据注册中心得到的服务列表,选择其中一个发起调用。

如果没有注册中心,就像 上一篇文章实现的一样,客户端需要硬编码服务端的地址,而且没有机制保证服务端是否处于可用状态。当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。

Gee Registry

我们来实现一个简单支持心跳保活的注册中心

先定义结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// GeeRegistry is a simple register center, provide following functions.
// add a server and receive heartbeat to keep it alive.
// returns all alive servers and delete dead servers sync simultaneously.
type GeeRegistry struct {
timeout time.Duration
mu sync.Mutex // 互斥锁,保证使用注册中心的顺序
servers map[string]*ServerItem
}

type ServerItem struct {
Addr string
start time.Time
}

const (
defaultPath = "/_geerpc_/registry"
defaultTimeout = time.Minute * 5
)

// New create a registry instance with timeout setting
func New(timeout time.Duration) *GeeRegistry {
return &GeeRegistry{
servers: make(map[string]*ServerItem),
timeout: timeout,
}
}

var DefaultGeeRegister = New(defaultTimeout)

为该对象编写基本的方法:添加服务实例+返回服务列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (r *GeeRegistry) putServer(addr string) {
r.mu.Lock()
defer r.mu.Unlock()
s := r.servers[addr]
if s == nil {
r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}
} else {
s.start = time.Now() // if exists, update start time to keep alive
}
}

func (r *GeeRegistry) aliveServers() []string {
r.mu.Lock()
defer r.mu.Unlock()
var alive []string
for addr, s := range r.servers {
if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) {
alive = append(alive, addr)
} else {
delete(r.servers, addr)
}
}
sort.Strings(alive)
return alive
}

对接HTTP请求

Get:返回所有可用的服务列表,通过自定义字段 X-Geerpc-Servers 承载。

Post:添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server 承载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Runs at /_geerpc_/registry
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case "GET":
// keep it simple, server is in req.Header
w.Header().Set("X-Geerpc-Servers", strings.Join(r.aliveServers(), ","))
case "POST":
// keep it simple, server is in req.Header
addr := req.Header.Get("X-Geerpc-Server")
if addr == "" {
w.WriteHeader(http.StatusInternalServerError)
return
}
r.putServer(addr)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}

// HandleHTTP registers an HTTP handler for GeeRegistry messages on registryPath
func (r *GeeRegistry) HandleHTTP(registryPath string) {
http.Handle(registryPath, r)
log.Println("rpc registry path:", registryPath)
}

func HandleHTTP() {
DefaultGeeRegister.HandleHTTP(defaultPath)
}

脉冲器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//定时向注册中心发送心跳
// Heartbeat send a heartbeat message every once in a while
// it's a helper function for a server to register or send heartbeat
func Heartbeat(registry, addr string, duration time.Duration) {
if duration == 0 {
// make sure there is enough time to send heart beat
// before it's removed from registry
//默认周期为比过期时间少1min
duration = defaultTimeout - time.Duration(1)*time.Minute
}
var err error
err = sendHeartbeat(registry, addr)
go func() {
//golang的周期性定时器,它会按照一个时间间隔往channel发送系统当前时间
t := time.NewTicker(duration)
for err == nil {
//C就是从通道内获取到的时间
<-t.C
err = sendHeartbeat(registry, addr)
}
}()
}

func sendHeartbeat(registry, addr string) error {
log.Println(addr, "send heart beat to registry", registry)
httpClient := &http.Client{}
req, _ := http.NewRequest("POST", registry, nil)
req.Header.Set("X-Geerpc-Server", addr)
if _, err := httpClient.Do(req); err != nil {
log.Println("rpc server: heart beat err:", err)
return err
}
return nil
}

完善服务发现

在上一篇博客中,我们实现了一个简易的服务发现接口

使用MultiServersDiscovery结构体来调用获取server的方法,其中加入了负载均衡的方式来分散热点

今天我们加入注册中心后来进一步完善

在xclient中完善服务发现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package xclient

type GeeRegistryDiscovery struct {
//嵌套了 MultiServersDiscovery,很多能力可以复用
*MultiServersDiscovery
registry string
timeout time.Duration
lastUpdate time.Time
}

const defaultUpdateTimeout = time.Second * 10

func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {
if timeout == 0 {
timeout = defaultUpdateTimeout
}
d := &GeeRegistryDiscovery{
MultiServersDiscovery: NewMultiServerDiscovery(make([]string, 0)),
registry: registerAddr,
timeout: timeout,
}
return d
}
  1. registry 即注册中心的地址
  2. timeout 服务列表的过期时间
  3. lastUpdate 是代表最后从注册中心更新服务列表的时间,默认 10s 过期,即 10s 之后,需要从注册中心更新新的列表。

重写方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (d *GeeRegistryDiscovery) Update(servers []string) error {
d.mu.Lock()
defer d.mu.Unlock()
d.servers = servers
d.lastUpdate = time.Now()
return nil
}

func (d *GeeRegistryDiscovery) Refresh() error {
d.mu.Lock()
defer d.mu.Unlock()
if d.lastUpdate.Add(d.timeout).After(time.Now()) {
return nil
}
log.Println("rpc registry: refresh servers from registry", d.registry)
resp, err := http.Get(d.registry)
if err != nil {
log.Println("rpc registry refresh err:", err)
return err
}
servers := strings.Split(resp.Header.Get("X-Geerpc-Servers"), ",")
d.servers = make([]string, 0, len(servers))
for _, server := range servers {
if strings.TrimSpace(server) != "" {
d.servers = append(d.servers, strings.TrimSpace(server))
}
}
d.lastUpdate = time.Now()
return nil
}

Get 和 GetAll 与 MultiServersDiscovery

唯一的不同在于,GeeRegistryDiscovery 需要先调用 Refresh 确保服务列表没有过期。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {
if err := d.Refresh(); err != nil {
return "", err
}
return d.MultiServersDiscovery.Get(mode)
}

func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {
if err := d.Refresh(); err != nil {
return nil, err
}
return d.MultiServersDiscovery.GetAll()
}

总结

今天完成了注册中心,那么一个RPC框架就算是完成了,虽然代码量很少,但是能够提供最基本的服务

registry是一个结构体,存储了每个服务端的地址和过期时间

对于服务端来说,服务端启动时便需要在注册中心里进行注册,在确定过期时间后便开始为客户端提供服务

当然我们也编写了脉冲器来进行心跳保活,脉冲器会定时为注册中心发送保活HTTP报文来确保该服务未过期

对于客户端来说,我们需要向注册中心询问获取可用的服务名单

DEMO

添加函数 startRegistry

修改 startServer,添加调用注册中心的 Heartbeat 方法的逻辑,定期向注册中心发送心跳保活

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func startRegistry(wg *sync.WaitGroup) {
l, _ := net.Listen("tcp", ":9999")
registry.HandleHTTP()
wg.Done()
_ = http.Serve(l, nil)
}

func startServer(registryAddr string, wg *sync.WaitGroup) {
var foo Foo
l, _ := net.Listen("tcp", ":0")
server := geerpc.NewServer()
_ = server.Register(&foo)
registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)
wg.Done()
server.Accept(l)
}

将 call 和 broadcast 的 MultiServersDiscovery 替换为 GeeRegistryDiscovery

前者需要硬编码服务列表,后者不需要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func call(registry string) {
d := xclient.NewGeeRegistryDiscovery(registry, 0)
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
foo(xc, context.Background(), "call", "Foo.Sum", &Args{Num1: i, Num2: i * i})
}(i)
}
wg.Wait()
}

func broadcast(registry string) {
d := xclient.NewGeeRegistryDiscovery(registry, 0)
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
foo(xc, context.Background(), "broadcast", "Foo.Sum", &Args{Num1: i, Num2: i * i})
// expect 2 - 5 timeout
ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
foo(xc, ctx, "broadcast", "Foo.Sleep", &Args{Num1: i, Num2: i * i})
}(i)
}
wg.Wait()
}

最后在 main 函数中,将所有的逻辑串联起来,确保注册中心启动后,再启动 RPC 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
log.SetFlags(0)
registryAddr := "http://localhost:9999/_geerpc_/registry"
var wg sync.WaitGroup
wg.Add(1)
go startRegistry(&wg)
wg.Wait()

time.Sleep(time.Second)
wg.Add(2)
go startServer(registryAddr, &wg)
go startServer(registryAddr, &wg)
wg.Wait()

time.Sleep(time.Second)
call(registryAddr)
broadcast(registryAddr)
}

运行结果


GeeRPC-LastDay
http://example.com/post/GeeRPC-LastDay.html
作者
SamuelZhou
发布于
2023年1月10日
许可协议