// MultiServersDiscovery is a discovery for multiple servers that works
// without a registry center: the user provides the server addresses
// explicitly instead.
type MultiServersDiscovery struct {
	r       *rand.Rand   // random number generator; the constructor seeds it from the current timestamp
	mu      sync.RWMutex // serializes access to the discovery state
	servers []string     // server addresses to select from
	index   int          // position reached by round-robin selection
}
// NewMultiServerDiscovery creates a MultiServersDiscovery instance funcNewMultiServerDiscovery(servers []string) *MultiServersDiscovery { d := &MultiServersDiscovery{ servers: servers, r: rand.New(rand.NewSource(time.Now().UnixNano())), } //为了避免每次从 0 开始,初始化时随机设定一个值 d.index = d.r.Intn(math.MaxInt32 - 1) return d }
// Update the servers of discovery dynamically if needed func(d *MultiServersDiscovery) Update(servers []string) error { d.mu.Lock() defer d.mu.Unlock() d.servers = servers returnnil }
// 通过负载均衡策略获取一个服务 func(d *MultiServersDiscovery) Get(mode SelectMode) (string, error) { d.mu.Lock() defer d.mu.Unlock() n := len(d.servers) if n == 0 { return"", errors.New("rpc discovery: no available servers") } switch mode { case RandomSelect: return d.servers[d.r.Intn(n)], nil case RoundRobinSelect: s := d.servers[d.index%n] // servers could be updated, so mode n to ensure safety d.index = (d.index + 1) % n return s, nil default: return"", errors.New("rpc discovery: not supported select mode") } }
// returns all servers in discovery func(d *MultiServersDiscovery) GetAll() ([]string, error) { d.mu.RLock() defer d.mu.RUnlock() // return a copy of d.servers servers := make([]string, len(d.servers), len(d.servers)) copy(servers, d.servers) return servers, nil }
func(xc *XClient) Close() error { xc.mu.Lock() defer xc.mu.Unlock() for key, client := range xc.clients { // I have no idea how to deal with error, just ignore it. _ = client.Close() delete(xc.clients, key) } returnnil }
为了尽量复用已经创建好的 Socket 连接,使用 clients 保存创建成功的 Client 实例,并提供 Close 方法,在使用结束后关闭已经建立的连接。
// Broadcast invokes the named function for every server registered in discovery func(xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply interface{}) error { servers, err := xc.d.GetAll() if err != nil { return err } var wg sync.WaitGroup var mu sync.Mutex // protect e and replyDone var e error replyDone := reply == nil// if reply is nil, don't need to set value ctx, cancel := context.WithCancel(ctx) //for循环获取所有的server实例,并发起请求 for _, rpcAddr := range servers { wg.Add(1) gofunc(rpcAddr string) { defer wg.Done() var clonedReply interface{} if reply != nil { clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface() } err := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply) mu.Lock() if err != nil && e == nil { e = err cancel() // if any call failed, cancel unfinished calls } if err == nil && !replyDone { reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem()) replyDone = true } mu.Unlock() }(rpcAddr) } wg.Wait() return e }