GeeRPC-day6

负载均衡

假设有多个服务实例,每个实例提供相同的功能,为了提高整个系统的吞吐量,每个实例部署在不同的机器上。

客户端可以选择任意一个实例进行调用,获取想要的结果。

对于 RPC 框架来说,一般可以使用以下几种策略:

  1. 随机选择(Random) - 从服务列表中随机选择一个
  2. 轮询(Round Robin) - 依次调度不同的服务器,每次调度执行 i = (i + 1)%n
  3. 加权轮询(Weight Round Robin) - 在轮询算法的基础上,为每个服务实例设置一个权重,高性能的机器赋予更高的权重,也可以根据服务实例的当前的负载情况做动态的调整,例如考虑最近5分钟部署服务器的 CPU、内存消耗情况。
  4. 哈希/一致性哈希策略 - 依据请求的某些特征,计算一个 hash 值,根据 hash 值将请求发送到对应的机器。一致性 hash 还可以解决服务实例动态添加情况下,调度抖动的问题。参考Redis集群的部署策略

服务发现基础模块

服务发现与通信是解耦的,所以我们新建一个包来构建该模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package xclient

import (
"errors"
"math"
"math/rand"
"sync"
"time"
)
//代表不同的负载均衡策略
type SelectMode int

//为了方便,我们目前只选择随机和轮询的负载均衡策略
const (
RandomSelect SelectMode = iota // select randomly
RoundRobinSelect // select using Robbin algorithm
)
//服务发现的基本接口
type Discovery interface {
Refresh() error // 从注册中心更新服务列表
Update(servers []string) error//手动更新服务列表
Get(mode SelectMode) (string, error)//根据负载均衡策略选择一个服务实例
GetAll() ([]string, error)//返回所有的服务实例
}

我们实现一个不需要注册中心,服务列表由手工维护的服务发现的结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// MultiServersDiscovery is a discovery for multi servers without a registry center
// user provides the server addresses explicitly instead
type MultiServersDiscovery struct {
r *rand.Rand // 随机数实例,使用时间戳作为随机数种子
mu sync.RWMutex // 同步锁保证顺序
servers []string
index int // 记录轮询到的位置
}

// NewMultiServerDiscovery creates a MultiServersDiscovery instance
func NewMultiServerDiscovery(servers []string) *MultiServersDiscovery {
d := &MultiServersDiscovery{
servers: servers,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
//为了避免每次从 0 开始,初始化时随机设定一个值
d.index = d.r.Intn(math.MaxInt32 - 1)
return d
}

实现服务发现接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
var _ Discovery = (*MultiServersDiscovery)(nil)

// 因为今天没有实现注册中心,所以我们不需要该方法
func (d *MultiServersDiscovery) Refresh() error {
return nil
}

// Update the servers of discovery dynamically if needed
func (d *MultiServersDiscovery) Update(servers []string) error {
d.mu.Lock()
defer d.mu.Unlock()
d.servers = servers
return nil
}

// 通过负载均衡策略获取一个服务
func (d *MultiServersDiscovery) Get(mode SelectMode) (string, error) {
d.mu.Lock()
defer d.mu.Unlock()
n := len(d.servers)
if n == 0 {
return "", errors.New("rpc discovery: no available servers")
}
switch mode {
case RandomSelect:
return d.servers[d.r.Intn(n)], nil
case RoundRobinSelect:
s := d.servers[d.index%n] // servers could be updated, so mode n to ensure safety
d.index = (d.index + 1) % n
return s, nil
default:
return "", errors.New("rpc discovery: not supported select mode")
}
}

// returns all servers in discovery
func (d *MultiServersDiscovery) GetAll() ([]string, error) {
d.mu.RLock()
defer d.mu.RUnlock()
// return a copy of d.servers
servers := make([]string, len(d.servers), len(d.servers))
copy(servers, d.servers)
return servers, nil
}

实现支持负载均衡的客户端

创建负载均衡客户端的结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package xclient

import (
"context"
. "geerpc"
"io"
"reflect"
"sync"
)

type XClient struct {
d Discovery
mode SelectMode
opt *Option
mu sync.Mutex // protect following
clients map[string]*Client
}

var _ io.Closer = (*XClient)(nil)
//XClient的构造函数需要传入三个参数:服务发现实例 Discovery、负载均衡模式 SelectMode 以及协议选项 Option。
func NewXClient(d Discovery, mode SelectMode, opt *Option) *XClient {
return &XClient{d: d, mode: mode, opt: opt, clients: make(map[string]*Client)}
}

func (xc *XClient) Close() error {
xc.mu.Lock()
defer xc.mu.Unlock()
for key, client := range xc.clients {
// I have no idea how to deal with error, just ignore it.
_ = client.Close()
delete(xc.clients, key)
}
return nil
}

为了尽量地复用已经创建好的 Socket 连接,使用 clients 保存创建成功的 Client 实例,并提供 Close 方法在结束后,关闭已经建立的连接。

通过负载均衡策略获取client对象以及发起方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (xc *XClient) dial(rpcAddr string) (*Client, error) {
xc.mu.Lock()
defer xc.mu.Unlock()
client, ok := xc.clients[rpcAddr]
if ok && !client.IsAvailable() {
_ = client.Close()
delete(xc.clients, rpcAddr)
client = nil
}
if client == nil {
var err error
//昨天引入http而实现的Xdial方法,根据协议来调用不同的dail方法
client, err = XDial(rpcAddr, xc.opt)
if err != nil {
return nil, err
}
xc.clients[rpcAddr] = client
}
return client, nil
}

func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {
client, err := xc.dial(rpcAddr)
if err != nil {
return err
}
return client.Call(ctx, serviceMethod, args, reply)
}

// Call invokes the named function, waits for it to complete,
// and returns its error status.
// xc will choose a proper server.
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
rpcAddr, err := xc.d.Get(xc.mode)
if err != nil {
return err
}
return xc.call(rpcAddr, ctx, serviceMethod, args, reply)
}

我们将复用 Client 的能力封装在方法 dial 中,dial 的处理逻辑是这样的

  1. 检查 xc.clients 是否有缓存的 Client,如果有,检查是否是可用状态,如果可用则返回缓存的 Client,如果不可用,则从缓存中删除。
  2. 如果步骤 1) 没有返回缓存的 Client,则说明需要创建新的 Client,缓存并返回。

添加广播功能

Broadcast 将请求广播到所有的服务实例,如果任意一个实例发生错误,则返回其中一个错误;

如果调用成功,则返回其中一个的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Broadcast invokes the named function for every server registered in discovery
func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply interface{}) error {
servers, err := xc.d.GetAll()
if err != nil {
return err
}
var wg sync.WaitGroup
var mu sync.Mutex // protect e and replyDone
var e error
replyDone := reply == nil // if reply is nil, don't need to set value
ctx, cancel := context.WithCancel(ctx)
//for循环获取所有的server实例,并发起请求
for _, rpcAddr := range servers {
wg.Add(1)
go func(rpcAddr string) {
defer wg.Done()
var clonedReply interface{}
if reply != nil {
clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
}
err := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply)
mu.Lock()
if err != nil && e == nil {
e = err
cancel() // if any call failed, cancel unfinished calls
}
if err == nil && !replyDone {
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())
replyDone = true
}
mu.Unlock()
}(rpcAddr)
}
wg.Wait()
return e
}
  1. 为了提升性能,请求是并发的。
  2. 并发情况下需要使用互斥锁保证 error 和 reply 能被正确赋值。
  3. 借助 context.WithCancel 确保有错误发生时,快速失败。

总结

当我们提供多个服务实例的时候,为了保证分布式系统的服务高效提供,就需要设计负载均衡策略来保证热点被均匀分散。今天我们使用了两种策略,一是轮询,这种策略在OS的进程调度和IO多路复用中很常见,就是在一个列表中依据下标轮转循环调度,保证热点被分散。另一种是随机策略,我们使用计算机当前的时间作为随机数种子来随机生成索引。

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"context"
"geerpc"
"geerpc/xclient"
"log"
"net"
"sync"
"time"
)

type Foo int

type Args struct{ Num1, Num2 int }

func (f Foo) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}
//测试广播超时是否可以正确处理
func (f Foo) Sleep(args Args, reply *int) error {
time.Sleep(time.Second * time.Duration(args.Num1))
*reply = args.Num1 + args.Num2
return nil
}

func startServer(addrCh chan string) {
var foo Foo
l, _ := net.Listen("tcp", ":0")
server := geerpc.NewServer()
_ = server.Register(&foo)
addrCh <- l.Addr().String()
server.Accept(l)
}

封装一个方法,根据调用参数选择相应的方法,这里是实现RPC的核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func foo(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, args *Args) {
var reply int
var err error
switch typ {
case "call":
err = xc.Call(ctx, serviceMethod, args, &reply)
case "broadcast":
err = xc.Broadcast(ctx, serviceMethod, args, &reply)
}
if err != nil {
log.Printf("%s %s error: %v", typ, serviceMethod, err)
} else {
log.Printf("%s %s success: %d + %d = %d", typ, serviceMethod, args.Num1, args.Num2, reply)
}
}

具体的调用逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//调用单个服务实例
func call(addr1, addr2 string) {
d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
foo(xc, context.Background(), "call", "Foo.Sum", &Args{Num1: i, Num2: i * i})
}(i)
}
wg.Wait()
}
//调用所有服务实例
func broadcast(addr1, addr2 string) {
d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
foo(xc, context.Background(), "broadcast", "Foo.Sum", &Args{Num1: i, Num2: i * i})
// expect 2 - 5 timeout
ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
foo(xc, ctx, "broadcast", "Foo.Sleep", &Args{Num1: i, Num2: i * i})
}(i)
}
wg.Wait()
}


func main() {
log.SetFlags(0)
ch1 := make(chan string)
ch2 := make(chan string)
// 启动两个服务实例
go startServer(ch1)
go startServer(ch2)

addr1 := <-ch1
addr2 := <-ch2

time.Sleep(time.Second)
call(addr1, addr2)
broadcast(addr1, addr2)
}

结果如上图,我们发现广播调用是为所有的服务实例发起调用,所以会有10次,其中有三次超时了,得到了正确处理


GeeRPC-day6
http://example.com/post/GeeRPC-day6.html
作者
SamuelZhou
发布于
2022年12月28日
许可协议