Go Etcd 开发简易的注册中心

Golang 2945 字

什么是 ETCD

Etcd 是 CoreOS 基于 Raft 开发的分布式 key-value 存储,可用于服务发现、共享配置以及一致性保障(如数据库选主、分布式锁等)。

在分布式系统中,如何管理节点间的状态一直是一个难题, etcd 像是专门为集群环境的服务发现和注册而设计,它提供了数据 TTL 失效、数据改变监视、多值、目录监听、分布式锁原子操作等功能,可以方便的跟踪并管理集群节点状态。

Etcd 具有以下特性:

  • 键值对存储: 将数据存储在分层组织的目录中,如同在标准文件系统中。
  • 监测变更:监测特定的键或目录进行更改,并对值的更改做出反应
  • 简单:curl可访问的用户API(HTTP + JSON)
  • 安全:可选的 SSL 客户端证书认证
  • 快速:单实例美妙1000次写速度,2000+读操作
  • 可靠:使用Raft算法保证一致性

基于 bitnami/etcd 快速搭建开发环境

BitNami是一个开源项目,该项目产生的开源软件包括安装 Web应用程序和解决方案堆栈,其提供了大量的开箱即用的 Docker 镜像,可以帮助我们快速搭建本地开发环境。

如果你想了解更多关于 bitnami 的信息,你可以访问他们的官方:https://bitnami.com/
$ curl -LO https://raw.githubusercontent.com/bitnami/containers/main/bitnami/etcd/docker-compose.yml

由于 BitNami 提供的 yaml 文件没有开放端口,所以需要进行修改

# 
version: '2'

services:
  etcd:
    image: docker.io/bitnami/etcd:3.5
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
    volumes:
      - etcd_data:/bitnami/etcd
    ports:
      - 2379:2379
      - 2380:2380

volumes:
  etcd_data:
    driver: local
$ docker-compose up

由于是 Etcd 速学 所以在文章中并不会对 etcd 的运维、集群、原理等问题进行讲解,本文的主要宗旨是快速入门,那么以当前的这个环境就可以开始我们的使用了。

Etcd Key-value 操作快速入门

Etcd 是支持 http 协议的,同时也提供了 etcdctl 命令行工具。它能提供一些简洁的命令,便于进行服务测试和修改数据库内容,而无需基于 HTTP API方式。etcdctl支持的命令大体上分为数据库操作和非数据库操作两类。这些操作跟 HTTP API 基本上是对应的。etcdctl 在两个不同的 etcd 版本下的行为方式也完全不同, 可通过环境变量设置所需要的版本

下面我们来进行新增和获取操作。

$   docker exec -it etcd-etcd-1 etcdctl put /user/101/name maksim
OK
$   docker exec -it etcd-etcd-1 etcdctl put /user/101/age 19
OK

上面的命令代表着使用 put 向 etcd 新增数据,在 http 的语义中代表着新增。

$   docker exec -it etcd-etcd-1 etcdctl get /user/101/name
/user/101/name
maksim

当我们想要获取的数据的时候,其实就是向 etcd 发送 get 请求,来获取key 的值,这里需要注意,如果想要通过 get 来后去 get /usr/101 所有的数据,需要加上参数 --prefix,否则是无法获取到数据的。

$   docker exec -it etcd-etcd-1 etcdctl get /user/101 --prefix
/user/101/age
19
/user/101/name
maksim

在 Go 中使用 etcd

$ go get go.etcd.io/etcd/clientv3

我们可以使用 etcd 官方提供的库来访问 etcd。

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main() {
    config := clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 10 * time.Second,
    }

    client, err := clientv3.New(config)
    if err != nil {
        fmt.Errorf(err.Error())
    }
    defer client.Close()

    kv := clientv3.NewKV(client)
    ctx := context.Background()
    getResponse, err := kv.Get(ctx, "/user/101/name")
    if err != nil {
        fmt.Errorf(err.Error())
    }
    fmt.Println(getResponse.Kvs)
}

etcd 的 sdk 使用起来非常简单,首先初始化 clientv3.Config ,由于我们使用的开箱即用的 etcd 所以没有过多配置,直接在 Endpoints 中输入我们的 IP 以及暴露的端口,DialTimeout 代表着超时时间。

下面就是初始化 clientv3 客户端,当我们需要访问 kv 存储时候,调用 clientv3.NewKV 实例化出一个新的 kv,剩下的操作就和我们使用 etcdctl 一样了。

Untitled

租约的使用和基本操作

在 Redis 中有 key 的过期时间,而在 etcd 对于过期的实现与 redis 不同,在 etcd 中 key 的过期利用 Lease(租约)来完成。

在 redis 中,我们可以在 set 缓存时直接指定到期时间,而在 etcd 中,我们首先需要创建 Lease ,然后再将 key 绑定到 Lease 上。

我们可以将 Lease 理解为现实世界中的”租房合同“,当我们的合同到期后我们就需要搬离,该房子继续使用,如果还想要住这个房子,那必须进行续约。

一个 Lease 可以关联若干个 key, 当租约到到期后,或者 Lease 被我们手动删除,key 就会随之过期无法继续使用。

lease 的 CURD

如果想要对某一个 key 设置过期时间,那么需要先创建一个租约,例如要设置一个 20 秒的租约:

$ docker exec -it etcd-etcd-1 etcdctl lease grant 20
lease 694d848998e3ec04 granted with TTL(20s)

etcd 一般还会作为服务发现,服务一旦注册到 etcd 后,如果地址宕机了,我们需要知道该地址是否可用,这个时候我们就可以基于租约来实现,如果在租约之内没有续约,那么就可以认为当前节点出现故障,需要从 etcd 中移除,避免其他服务对齐进行调用。

我们通过 lease timetolive 命令来查看当前租约的剩余时间。

$ docker exec -it etcd-etcd-1 etcdctl lease timetolive 694d848998e3ec04

lease 694d848998e3ec04 already expired

由于编写文章的时间关系,我们最开始创建的租约694d848998e3ec04 已经过期了,下面我们来进行连贯操作,先创建,然后直接查看租约。

$ docker exec -it etcd-etcd-1 etcdctl lease grant 10
lease 694d848998e3ec0d granted with TTL(10s)

$docker exec -it etcd-etcd-1 etcdctl lease timetolive 694d848998e3ec0d
lease 694d848998e3ec0d granted with TTL(10s), remaining(2s)

接下来,我们来通过 lease list 来查看当前 etcd 的租约列表:

# 新建租约
$ docker exec -it etcd-etcd-1 etcdctl lease grant 2000
lease 694d848998e3ec10 granted with TTL(2000s)
$ docker exec -it etcd-etcd-1 etcdctl lease list
found 1 leases
694d848998e3ec10

在上面的命令中我们创建了一个 2000 秒的租约,下面,我们使用 lease revoke 来删除租约。

$ docker exec -it etcd-etcd-1 etcdctl lease revoke 694d848998e3ec10
lease 694d848998e3ec10 revoked
$ docker exec -it etcd-etcd-1 etcdctl lease list
found 0 leases

在上面有描述在服务发现中如何处理异常节点中提到了续约其命令使用的就是 lease keep-alive

# 创建一个 20 秒的租约
$ docker exec -it etcd-etcd-1 etcdctl lease grant 200
lease 694d848998e3ec15 granted with TTL(200s)

# 执行续约
$ docker exec -it etcd-etcd-1 etcdctl lease keep-alive 694d848998e3ec15

lease 694d848998e3ec15 keepalived with TTL(200)

Lease 与 Key 的关联

key 与 lease 的绑定非常简单,只需要我们在创建 key-value 的时候在末尾加上 --lease xxx 即可,其中 xxx 就是我们的租约 id。

#新建租约
$ docker exec -it etcd-etcd-1 etcdctl lease grant 60
lease 694d848998e3ec19 granted with TTL(60s)

# 绑定租约
$ docker exec -it etcd-etcd-1 etcdctl put /user maksim --lease  694d848998e3ec19
OK

# get user
$ docker exec -it etcd-etcd-1 etcdctl get /user
/user
maksim

我们等待 60 秒后再去查看 /user。

$ docker exec -it etcd-etcd-1 etcdctl get /user

$ docker exec -it etcd-etcd-1 etcdctl get /user --prefix
/user/102/name

我们可以看到,我们新建的 /user 已经被删除了,只能查看到上一篇文章的内容。

如果想要查看当前租约下绑定了哪些 key,我们可以使用 lease timetolive xxx --keys 来进行查看。

# 新建租约
$ docker exec -it etcd-etcd-1 etcdctl lease grant 60
lease 694d848998e3ec25 granted with TTL(60s)

# PUT 数据并且绑定租约
$ docker exec -it etcd-etcd-1 etcdctl put /user maksim --lease 694d848998e3ec25
OK

# 查看租约下的 key
$ docker exec -it etcd-etcd-1 etcdctl lease timetolive 694d848998e3ec25 --key
lease 694d848998e3ec25 granted with TTL(60s), remaining(28s), attached keys([/user])

服务注册

在最开始,我们提到过 etcd 的应用场景就包括了服务注册于发现,注册和发现是两个部分,在这里,我们先讲服务注册,服务注册的原理其实很简单,我们可以使用一些开源的库直接来完成,也可以利用手工的方式将节点信息写入到 etcd 当中。

简单注册

如果是点对点的调用方式,那就是客户端直接调用服务端,有了注册中心后是先去访问 etcd 或得到服务端的节点信息,节点信息主要包括:ip地址,端口号。

接下来,我们来实现服务的注册,在 etcd 中我们存储如下数据:

/services/product1/product
/services/users1/userinfo
  • services 代表着服务
  • product1 和 user1 代表节点,后面可能会存在 product2、product3 我们以此来做负载均衡
  • product 和 userinfo 是服务

我们来编写 product 服务

package main

import (
    "github.com/gorilla/mux"
    "net/http"
)

func main() {
    r := mux.NewRouter()
    r.HandleFunc("/product/{id:\\d+}", func(writer http.ResponseWriter, request *http.Request) {
        vars := mux.Vars(request)
        str := "get proudct ById: " + vars["id"]
        writer.Write([]byte(str))
    })
    http.ListenAndServe(":8081", r)
}

Untitled

这里只是简单提供了一个 product 接口,效果如上图。

接下来我们实现 service.go,其中主要包含两个方法

  • NewService: 实例化 service
  • Register:向 etcd 注册服务
package util

import (
    "context"
    "go.etcd.io/etcd/clientv3"
    "log"
    "time"
)

type Service struct {
    client *clientv3.Client
}

func NewService() *Service {
    config := clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 10 * time.Second,
    }

    client, err := clientv3.New(config)
    if err != nil {
        log.Fatal(err)
    }
    return &Service{client: client}
}

// 注册服务
func (s *Service) Register(id string, name string, address string) error {
    kv := clientv3.NewKV(s.client)
    prefix := "/services/"
    ctx := context.Background()
    _, err := kv.Put(ctx, prefix+id+"/"+name, address)
    return err
}

代码很简单,用到的都是我们上面学到的内容。

  1. 首先创建 client
  2. 创建 kv
  3. 向 etcd 中写入数据

接下来改造 main 函数。

package main

import (
    "etcd/util"
    "github.com/gorilla/mux"
    "net/http"
)

func main() {
    r := mux.NewRouter()
    r.HandleFunc("/product/{id:\\d+}", func(writer http.ResponseWriter, request *http.Request) {
        vars := mux.Vars(request)
        str := "get proudct ById: " + vars["id"]
        writer.Write([]byte(str))
    })
    s := util.NewService()
    s.Register("p1", "productservice", "127.0.0.1:8081")
    http.ListenAndServe(":8081", r)
}

我们先简陋处理,后续再添加协程以及信号监听。

2023-04-04T12:49:35.png

运行后,我们可以看到节点信息已经被写入到 etcd 中了其流程如下:

2023-04-04T12:49:06.png

优雅启动

2023-04-04T12:49:51.png

我们可以看到在 ide 中,我们最后两步目前是报黄了,没有去处理异常。当服务出现异常我们都是需要进行处理的,这里先初步的设计。

在优雅启动中,我们分成了 4 步。

第一步:我们首先为了以后维护方便,将写死的值变成一个变量。

func main() {

    serviceId := "p1"
    serviceName := "productservice"
    serviceAddr := "127.0.0.1"
    servicePort := 8080
    r := mux.NewRouter()
    r.HandleFunc("/product/{id:\\d+}", func(writer http.ResponseWriter, request *http.Request) {
        vars := mux.Vars(request)
        str := "get proudct ById: " + vars["id"]
        writer.Write([]byte(str))
    })
    s := util.NewService()
    s.Register(serviceId, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
    http.ListenAndServe(":"+strconv.Itoa(servicePort), r)
}

第二步:协程运行

go func() {
        s.Register(serviceId, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
        http.ListenAndServe(":"+strconv.Itoa(servicePort), r)
}()

第三步:监听信号

go func() {
        sig := make(chan os.Signal)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
}()

第四步:处理错误和信号处理

package main

import (
    "etcd/util"
    "fmt"
    "github.com/gorilla/mux"
    "log"
    "net/http"
    "os"
    "os/signal"
    "strconv"
    "syscall"
)

func main() {

    serviceId := "p1"
    serviceName := "productservice"
    serviceAddr := "127.0.0.1"
    servicePort := 8080
    r := mux.NewRouter()
    r.HandleFunc("/product/{id:\\d+}", func(writer http.ResponseWriter, request *http.Request) {
        vars := mux.Vars(request)
        str := "get proudct ById: " + vars["id"]
        writer.Write([]byte(str))
    })
    s := util.NewService()

    errChan := make(chan error)

    go func() {
        err := http.ListenAndServe(":"+strconv.Itoa(servicePort), r)
        if err != nil {
            errChan <- err
        }
        err = s.Register(serviceId, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
        if err != nil {
            errChan <- err
        }

    }()

    go func() {
        sig := make(chan os.Signal)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
        errChan <- fmt.Errorf("%s", <-sig)
    }()
    getErr := <-errChan

    log.Fatalln("err:", getErr)
}

最终的代码就是这样的,我们新建了一个 errChan 去处理错误,如果我们开启了数据库连接,redis 连接就可以在这里进行资源回收了。

在这里,我们实现 http 服务的优雅关闭。

func main () {   
  .... 
    serv := &http.Server{
        Addr:              ":"+strconv.Itoa(servicePort),
        Handler:           r,
    }
    go func() {
        err := serv.ListenAndServe()
        if err != nil {
            errChan <- err
        }
        err = s.Register(serviceId, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
        if err != nil {
            errChan <- err
        }

    }()

    go func() {
        sig := make(chan os.Signal)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
        errChan <- fmt.Errorf("%s", <-sig)
    }()
    getErr := <-errChan
    err := serv.Shutdown(context.Background())
  ...
}

服务关闭时反注册

当我们的服务发生问题异常关闭,或者是接收到信号后需要进行退出操作,这个时候我们需要将该节点的信息从 etcd 中删除。

// UnRegister 反注册服务
func (s *Service) UnRegister(id string) error {
    kv := clientv3.NewKV(s.client)
    prefix := "/services/" + id
    ctx := context.Background()
    _, err := kv.Delete(ctx, prefix, clientv3.WithPrefix())
    return err
}

接下来我们在服务关闭之前调用反注册。

err := s.UnRegister(serviceId)
if err != nil {
        log.Fatalln("err:", err)
}
err = serv.Shutdown(context.Background())

关闭应用程序前:

2023-04-04T12:50:34.png

关闭应用程序后

2023-04-04T12:50:45.png

设置租约

在注册服务中现在创建的数据是不会过期的,但是在生产环境中如果服务假死无法提供服务的时候,在注册中心中我们就需要将其摘掉。

// Register 注册服务
func (s *Service) Register(id string, name string, address string) error {
    kv := clientv3.NewKV(s.client)
    prefix := "/services/"
    ctx := context.Background()
    leaseRes, err := clientv3.NewLease(s.client).Grant(ctx, 20)
    if err != nil {
        return err
    }
    _, err = kv.Put(ctx, prefix+id+"/"+name, address, clientv3.WithLease(leaseRes.ID))
    return err
}

这样一来,当过了 20 秒后还没有续期,就说明当前服务已经出现问题了。

2023-04-04T14:18:19.png
2023-04-04T14:18:28.png

定期续租

设置租约后我们修妖解决续租的问题。

// Register 注册服务
func (s *Service) Register(id string, name string, address string) error {
    kv := clientv3.NewKV(s.client)
    prefix := "/services/"
    ctx := context.Background()
    fmt.Println(prefix)
    lease := clientv3.NewLease(s.client)
    leaseRes, err := lease.Grant(ctx, 20)
    fmt.Println(leaseRes)
    if err != nil {
        return err
    }
    _, err = kv.Put(ctx, prefix+id+"/"+name, address, clientv3.WithLease(leaseRes.ID))
    if err != nil {
        return err
    }
    keepliveRes, err := lease.KeepAlive(ctx, leaseRes.ID)
    if err != nil {
        return err
    }
    go lisKeepAlive(keepliveRes)
    return nil
}

func lisKeepAlive(res <-chan *clientv3.LeaseKeepAliveResponse) {
    for {
        select {
        case ret := <-res:
            if ret != nil {
                //续租成功
                fmt.Println("续租成功")
            }
        }
    }
}

我们每个十秒查看一次 etcd 中的数据。

2023-04-04T14:18:49.png

到这里,我们最基本的服务注册就完成了。

服务发现

获取服务数据

服务发现的原理非常简单,其实就是按照我们的定义好的 key 去取值。我们在这一小节现直接将值取出来。先编写客户端。

package util

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "log"
    "time"
)

type Client struct {
    client *clientv3.Client
}

func NewClient() *Client {
    config := clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 10 * time.Second,
    }

    client, err := clientv3.New(config)
    if err != nil {
        log.Fatal(err)
    }
    return &Client{client: client}
}

func (c *Client) GetService() {
    kv := clientv3.NewKV(c.client)
    res, _ := kv.Get(context.Background(), "/services", clientv3.WithPrefix())
    for _, item := range res.Kvs {
        fmt.Println(string(item.Key))
    }
}

我们在 GetService 中现将数据打印到控制台。

编写 main 函数。

package main

import "etcd/util"

func main() {
    client := util.NewClient()
    client.GetService()
}

2023-04-04T14:19:12.png

接下来我们继续完善获取,我们先新建一个服务信息结构体。

type ServiceInfo struct {
    ServiceId   string
    ServiceName string
    ServiceAddr string
}

func (s *Client) parseService(key []byte, value []byte) {
    reg := regexp.MustCompile("/services/(\\w+)/(\\w+)")
    if reg.Match(key) {
        idAndName := reg.FindSubmatch(key)
        id := idAndName[0]
        name := idAndName[2]
        s.Services = append(s.Services, &ServiceInfo{ServiceId: string(id), ServiceName: string(name), ServiceAddr: string(value)})
    }
}

然后在 client 中增加 []*Serviceinfo 的切片,因为我们不能每一次都去 etcd 中获取数据,这样会增加 etcd 的压力,所以我们需要将信息缓存到内存中。parseService 函数用来讲字符串解析成结构体。

type Client struct {
    client   *clientv3.Client
    Services []*ServiceInfo
}

最后我们在 GetService 函数中增加转换结构体的逻辑。

func (c *Client) GetService() {
    kv := clientv3.NewKV(c.client)
    res, _ := kv.Get(context.Background(), "/services", clientv3.WithPrefix())
    for _, item := range res.Kvs {
        c.parseService(item.Key, item.Value)
    }
}

简单修改一下 main 函数

func main() {
    client := util.NewClient()
    client.GetService()
    for _, service := range client.Services {
        fmt.Println(service)
    }
}

这个时候我们启动两个 product 服务。

2023-04-04T14:19:34.png
2023-04-04T14:19:41.png

根据服务名获取地址和优雅的调用机制

我们的客户端可以直接使用 httpclient 直接调用我们的服务,但是为了更良好的凤凰钻杆,我们可以先设计一个结构。

type Endpoint func (ctx context.Context, request interface{}) (reqponse interface{}, err error)
type EncodeReqeustFunc func(context.Context, *http.Request, interface{}) error
  • Endpoint 代表端口,其实就是一个如何去执行服务调用的基本函数,最终的调用就是通过它来进行调用的。
  • EncodeRequestFunc 也是一个函数,是用户自定义的,用来决定到底怎么传参,调用具体哪个 path。

我们先来修改 GetService 方法。


func (c *Client) LoadService() {
    kv := clientv3.NewKV(c.client)
    res, _ := kv.Get(context.Background(), "/services", clientv3.WithPrefix())
    for _, item := range res.Kvs {
        c.parseService(item.Key, item.Value)
    }
}

func (c *Client) GetService(name string, method string, encodeFunc EncodeRequestFunc) Endpoint {
    for _, service := range c.Services {
        if service.ServiceName == name {
            return func(ctx context.Context, requestParams interface{}) (req interface{}, err error) {
                httpClient := &http.DefaultClient
                httpRequest, err := http.NewRequest(method, "http://"+service.ServiceAddr, nil)
                err = encodeFunc(ctx, httpRequest, requestParams)
                if err != nil {
                    return nil, err
                }
                res, err := (*httpClient).Do(httpRequest)
                defer res.Body.Close()
                if err != nil {
                    return nil, err
                }
                body, err := ioutil.ReadAll(res.Body)
                if err != nil {
                    return nil, err
                }
                return string(body), nil
            }
        }
    }
    return nil
}

我们将原来的 GetService 修改为 LoadService 用来加载 service。

GetService 其实就是查找服务地址,并将服务地址,如何调用封装成一个闭包,返回给用户,然后用户在需要调用的地方进行调用。

没有看懂代码没关系,我们来看一下main 中是如何调用的

type ProdRequestParams struct {
    ProductId int
}

func main() {
    client := util.NewClient()
    client.LoadService()

    for _, service := range client.Services {
        fmt.Println(service)
    }
    endpoint := client.GetService("productservice", "GET", func(ctx context.Context, request *http.Request, i interface{}) error {
        params := i.(ProdRequestParams)
        request.URL.Path += "/product/" + strconv.Itoa(params.ProductId)
        return nil
    })
    res, err := endpoint(context.Background(), ProdRequestParams{ProductId: 106})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(res)
}

我们首先定义了一个 ProdRequestParams 来代表请求参数,其中包含饿了一个 ProductId 的字段。

重点是这一段:

    endpoint := client.GetService("productservice", "GET", func(ctx context.Context, request *http.Request, i interface{}) error {
        params := i.(ProdRequestParams)
        request.URL.Path += "/product/" + strconv.Itoa(params.ProductId)
        return nil
    })

这里面定义了请求的参数如何组织,以及请求的地址,并且通过 GetService 获取了一个执行服务的闭包函数。

    res, err := endpoint(context.Background(), ProdRequestParams{ProductId: 106})
    if err != nil {
        log.Fatal(err)
    }

真正发起调用的地方是这里,在前面的几个步骤,其实都是在构建函数,我们执行一下。

➜  etcd go run client-demo.go
&{/services/p2/productservice productservice 127.0.0.1:8082}
get proudct ById: 106
  • Endpoint 代表端口,其实就是一个如何去执行服务调用的基本函数
  • EncodeRequestFunc 也是一个函数,是用户自定义的,用来决定到底怎么传参,调用具体哪个 path

同时注册多个服务到 ETCD

在之前我们注册了一个服务,如果需要负载均衡目前的办法是手动修改 ServiceID,这样的办法其实很 low,需要大量的手工操作,例如:

go run p1.go -name prudctservice -id=p1 -p8081

其中 -id 指定的就是我们的 ServiceID,其实我们可以用 id 生成工具来帮助我们进行生成,这样可以减少手工的操作。

go get -u github.com/satori/go.uuid
serviceId := uuid.NewV4().String()

这样我们看一下 etcd 中的值:

$  docker exec -it etcd-etcd-1 etcdctl get / --prefix
/services/7ade2d76-b681-41cc-9ea1-55dccabbccf4/productservice
127.0.0.1:8082

修改完 uuid 后还需要对 loadService 进行修改。

func (s *Client) parseService(key []byte, value []byte) {
    reg := regexp.MustCompile("/services/(\\w{8}(-\\w{4}){3}-\\w{12})/(\\w+)")
    if reg.Match(key) {
        idAndName := reg.FindSubmatch(key)
        id := idAndName[1]
        name := idAndName[3]
        s.Services = append(s.Services, &ServiceInfo{ServiceId: string(id), ServiceName: string(name), ServiceAddr: string(value)})
    }
}

负载均衡

package util

import (
    "math/rand"
    "time"
)

type LoadBalance struct {
    Servers []*ServiceInfo
}

func NewLoadBalance(services []*ServiceInfo) *LoadBalance {
    return &LoadBalance{Servers: services}
}

func (l *LoadBalance) GetByRand(name string) *ServiceInfo {
    tmp := make([]*ServiceInfo, 0)
    for _, service := range l.Servers {
        if service.ServiceName == name {
            tmp = append(tmp, service)
        }
    }
    if len(tmp) == 0 {
        return nil
    }

    rand.Seed(time.Now().UnixNano())
    max := len(tmp) - 1
    if max == 0 {
        return l.Servers[0]
    }
    i := rand.Intn(max)
    return l.Servers[i]
}

在负载均衡结构体重存储了所有的服务列表,这个服务列表是在获取服务的时候放进去的,然后在需要获取服务的地方 调用 getByRand 来获取服务,这里采用了随机算法,随机从数组中取值。

maksim
Maksim(一笑,吡罗),PHPer,Goper
OωO
开启隐私评论,您的评论仅作者和评论双方可见
评论 ( 1 )
  1. maksim 博主大人


    如果你在运行时候遇到 grpc 报错,可以选择更新 grpc 的依赖。

    ```shell
    go get -u -x google.golang.org/[email protected]
    ```

    2022年11月11日回复