0%

http之grpc之zrpc

[TOC]

概述

zRPC 主要有以下几个模块组成:

  • discov: 服务发现模块,基于 etcd 实现服务发现功能
  • resolver: 服务注册模块,实现了 gRPC 的 resolver.Builder 接口并注册到 gRPC
  • interceptor: 拦截器,对请求和响应进行拦截处理
  • balancer: 负载均衡模块,实现了 p2c 负载均衡算法,并注册到 gRPC
  • client: zRPC 客户端,负责发起请求
  • server: zRPC 服务端,负责处理请求

其中 resolver 和 balancer 模块实现了 gRPC 开放的接口,实现了自定义的 resolver 和 balancer,拦截器模块是整个 zRPC 的功能重点。

resolver 模块

zRPC 中自定义了 resolver 模块,用来实现服务的注册功能。zRPC 底层依赖 gRPC,在 gRPC 中要想自定义 resolver 需要实现 resolver.Builder 接口:

server

当我们启动我们的 zRPC Server 的时候,调用 Start 方法,会向 etcd 中注册对应的服务地址:

1
2
3
4
5
6
7
8
func (ags keepAliveServer) Start(fn RegisterFn) error {
// 注册服务地址
if err := ags.registerEtcd(); err != nil {
return err
}
// 启动服务
return ags.Server.Start(fn)
}

put key value to etcd

1
2
// value 是当前服务的地址
_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

client

在 grpc.DialContext 函数中会调用 newCCResolverWrapper 函数

1
2
3
4
5
6
7
8
9
10
11
12
// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
// returns a ccResolverWrapper object which wraps the newly built resolver.
// rb 和 scheme 有关
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
// ....
// build
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
if err != nil {
return nil, err
}
return ccr, nil
}

build 接口

1
2
3
4
5
6
7
8
9
10
11
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}

resolver 接口

1
2
3
4
5
6
7
8
9
10
11
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}

实现接口

当我们启动 zRPC 客户端的时候,在 gRPC 内部会调用我们自定义 resolver 的 Build 方法,zRPC 通过在 Build 方法内调用执行了 resolver.ClientConn 的 UpdateState 方法,该方法会把服务地址注册到 gRPC 客户端内部:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
})
// 服务发现
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}

update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
// 向gRPC注册服务地址
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
// 监听
sub.AddListener(update)
update()
// 返回自定义的resolver.Resolver
return &nopResolver{cc: cc}, nil
}

在 discov 中,通过调用 load 方法从 etcd 中获取指定服务的所有地址:

并通过 watch 监听服务地址的变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *cluster) watch(cli EtcdClient, key string) {
rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
for {
select {
case wresp, ok := <-rch:
if !ok {
logx.Error("etcd monitor chan has been closed")
return
}
if wresp.Canceled {
logx.Error("etcd monitor chan has been canceled")
return
}
if wresp.Err() != nil {
logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
return
}
// 监听变化通知更新
c.handleWatchEvents(key, wresp.Events)
case <-c.done:
return
}
}
}

这部分主要介绍了 zRPC 中是如何自定义的 resolver,以及基于 etcd 的服务发现原理,通过这部分的介绍大家可以了解到 zRPC 内部服务注册发现的原理,源代码比较多只是粗略的从整个流程上进行了分析,如果大家对 zRPC 的源码比较感兴趣可以自行进行学习

balancer 模块

避免过载是负载均衡策略的一个重要指标,好的负载均衡算法能很好的平衡服务端资源。常用的负载均衡算法有轮训、随机、Hash、加权轮训等。但为了应对各种复杂的场景,简单的负载均衡算法往往表现的不够好,比如轮训算法当服务响应时间变长就很容易导致负载不再平衡, 因此 zRPC 中自定义了默认负载均衡算法 P2C(Power of Two Choices),和 resolver 类似,要想自定义 balancer 也需要实现 gRPC 定义的 balancer.Builder 接口,由于和 resolver 类似这里不再带大家一起分析如何自定义 balancer,感兴趣的朋友可以查看 gRPC 相关的文档来进行学习

注意,zRPC 是在客户端进行负载均衡,常见的还有通过 nginx 中间代理的方式

zRPC 框架中默认的负载均衡算法为 P2C,该算法的主要思想是:

  1. 从可用节点列表中做两次随机选择操作,得到节点 A、B
  2. 比较 A、B 两个节点,选出负载最低的节点作为被选中的节点

client

1
2
3
4
5
6
7
8
9
10
func NewClient(target string, opts ...ClientOption) (*client, error) {
var cli client
// WithBalancerName 指定 p2c.Name
opts = append([]ClientOption{WithDialOption(grpc.WithBalancerName(p2c.Name))}, opts...)
if err := cli.dial(target, opts...); err != nil {
return nil, err
}

return &cli, nil
}

pick

主要算法逻辑在 Pick 方法中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
p.lock.Lock()
defer p.lock.Unlock()

var chosen *subConn
switch len(p.conns) {
case 0:
return nil, nil, balancer.ErrNoSubConnAvailable
case 1:
chosen = p.choose(p.conns[0], nil)
case 2:
chosen = p.choose(p.conns[0], p.conns[1])
default:
var node1, node2 *subConn
for i := 0; i < pickTimes; i++ {
a := p.r.Intn(len(p.conns))
b := p.r.Intn(len(p.conns) - 1)
if b >= a {
b++
}
node1 = p.conns[a]
node2 = p.conns[b]
if node1.healthy() && node2.healthy() {
break
}
}

chosen = p.choose(node1, node2)
}

atomic.AddInt64(&chosen.inflight, 1)
atomic.AddInt64(&chosen.requests, 1)
return chosen.conn, p.buildDoneFunc(chosen), nil
}

choose 方法对随机选择出来的节点进行负载比较从而最终确定选择哪个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
start := int64(timex.Now())
if c2 == nil {
atomic.StoreInt64(&c1.pick, start)
return c1
}

if c1.load() > c2.load() {
c1, c2 = c2, c1
}

pick := atomic.LoadInt64(&c2.pick)
if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
return c2
} else {
atomic.StoreInt64(&c1.pick, start)
return c1
}
}

参考

企业级 RPC 框架 zRPC