2020年1月

基于 MySQL 5.7,这是程序员在囧途的课程笔记,如果需要的话可以关注一下这个网站,有很多 PHP 相关的学习资料,而且很接地气,没有任何复杂概念,博主个人很喜欢。

01 场景案例:实现 select 增加行号

很多时候我们在做一个项目,无论使用的语言是 PHP 还是 Java,不管是为了性能还是业务逻辑,很多时候遇到的瓶颈并不在语言上,而是在数据库上,当然还有一些其他第三方工具,在这里就不进行讲解了,这里的重点是 SQL 思维培养。

首先我们准备一下 SQL。

create table products
(
    p_id   bigint auto_increment comment '产品 id',
    p_name varchar(100)     not null comment '商品名称',
    p_type varchar(50)      not null comment '商品分类,为了演示方便直接使用中文',
    p_view bigint default 0 not null comment '点击量',
    constraint products_pk
        primary key (p_id)
) comment '产品表';

2023-03-22T10:59:48.png

我们现在假设按照点击量进行倒序排列。

select p_name, p_view, p_type
from products
order by p_view desc;

这样就可以按照点击量的顺序取出数据了,接下来我们来显示行号。如果使用过Access 或者 SQl Server 的话它们提供了显示行号的方法,在 MySQL中并没有这样的方法。我们只能够自己实现。

我们首先来引入一个会话变量,我们先来看一下什么是会话变量。

我们可以通过 set 来设置变量:

set @name := 'maksim';
select @name

2023-03-22T11:00:15.png

这样我们就可以在当前绘画中来获取变量了。

这里注意,会话变量只在当前会话有效,当你打开新的会发现获取不到变量的情况。

2023-03-22T11:00:32.png

我们可以利用变量进行累加操作这样就可以实现行号了。

# 设置变量
set @rownum = 0;

# 利用累加实现编号
select p_name, p_view, p_type, @rownum := @rownum + 1
from products
order by p_view desc;

2023-03-22T11:02:18.png

我们不可能每次查询都要重新设置一下@rownum,其实我们可以借助 IFNULL 函数来实现在一条语句中实现获取行号的操作。

  • IFNULL(expr1, expr2) expr1 不为 null,则返回值为 expr1,反之为 expr2
select p_name, p_view, p_type, IFNULL(@rownum := @rownum + 1, @rownum := 1) as rownum
from products
order by p_view desc;

但是现在还有一个问题,当我们再次执行获取数据的语句时候会出现下面的状况。

2023-03-22T11:01:14.png

解决这个问题有两种方案,首先是在每次执行前都重置会话变量,但是这样并没有什么意义。

还有另外一个解决方案,我们可以在关联表后面增加一条语句,对@rownum 进行初始化。

select p_name, p_view, p_type,IFNULL(@rownum := @rownum + 1, @rownum := 1) as rownum
from products,
     (select @rownum := 0) b
order by p_view desc;

这样我们每次执行的结果就都是正确的了。

02 场景案例:分组后在分组内排序、每个分组中取前N条

这个需求在我们做项目的时候经常会碰到,有的时候 SQL 写不出来就会用 ORM 框架反复折腾,这个需求需要配合我们的 SQL 来进行完成。在一些复杂系统中仅仅会一些 ORM 框架是没有用的。

我们依旧使用上节中的数据结构,首先我们取出一个分组后排序的数据。

如果显示 group by 错误,无法显示p_view,可以参考下方链接

一文带你了解mysql sql model的only_full_group_by模式含Error 1055问题分析_ShenLiang2025的博客-CSDN博客

select p_type, p_name, p_view
from products
group by p_type, p_name
order by p_view desc ;

2023-03-22T11:05:10.png

我们可以看到为了满足 p_view的倒排序,列表中类型出现了交叉,分组就没有了意义。为了让类型不交叉,我们只需要在增加 p_type 的排序就可以了。

2023-03-22T11:05:39.png

接下来,我们对其增加序号。

select IF(@back = p_type, @rownum := @rownum + 1, @rownum := 1) as row_num,
       p_type,
       p_name,
       p_view,

       @back := p_type
from (select p_type, p_name, p_view
      from products
      group by p_type, p_name
      order by p_type, p_view desc) a,
     (select @rownum := 0, @bak := '') b;

2023-03-22T11:06:18.png

到这里,你是不是就发现了什么,对,没错,其实这就是我们常见的排行榜。

接下里我们增加单独去两行的操作。

select row_num, p_type, p_name, p_view
from (select IF(@back = p_type, @rownum := @rownum + 1, @rownum := 1) as row_num,
             p_type,
             p_name,
             p_view,

             @back := p_type
      from (select p_type, p_name, p_view
            from products
            group by p_type, p_name
            order by p_type, p_view desc) a,
           (select @rownum := 0, @bak := '') b) c
where c.row_num <= 2

2023-03-22T11:06:56.png

03 场景案例:纯 SQL 也能实现辅助决策统计

如果你只会使用 ORM,那么很多复杂的功能根本做不了,今天我们来实现一个纯 SQL 计算商品的重要度,是否需要进行补货。

很多人认为我们开发 web 只需要增删改查就好了,但是其实如果我们要去做电商平台,或者后台系统,最常见的需求就是统计分析,仅仅是显示商品列表,编辑一下,就很低级。

在 products 表中有点击量这个字段,如果我们工具点击量来依次为依据来判断他为热销商品,那肯定是不对的,因为我们还要去看销售量,在这里我们需要增加一张 products_sales表,来代表销售。

在这里有一个细节,加入我们的商品有那么些,我们的销量就只有几个,所以说并不是所有的商品都有销量。我们不能光靠点击量和销售量当个为依据来进行辅助决策。所以在做后台应用的时候要复杂很多。

2023-03-22T11:08:04.png

2023-03-22T11:08:34.png

我们通过点击量和销售量来做一个评分,然后根据评分辅助决策。

现在我们写一个 SQL,根据分类显示出商品的名称、点击量和销售量情况。没有销售的设置为 0.

select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
from products as a
         left join products_sales b on a.p_id = b.p_id
group by a.p_type, a.p_name
order by a.p_type desc, a.p_view desc;

2023-03-22T11:08:52.png

接下来,我们来计算分类的平均销量。

select p_type, sum(sales) / count(*)
from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
      from products as a
               left join products_sales b on a.p_id = b.p_id
      group by a.p_type, a.p_name
      order by a.p_type desc, a.p_view desc) a
group by p_type

2023-03-22T11:09:03.png

如果需要控制小数点,我们可以增加 round。

select p_type, round(sum(sales) / count(*), 0)
from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
      from products as a
               left join products_sales b on a.p_id = b.p_id
      group by a.p_type, a.p_name
      order by a.p_type desc, a.p_view desc) a
group by p_type

2023-03-22T11:09:16.png

其实这个地方还有问题,那就是我们需要剔除那些没有销量的商品。

select p_type, round(sum(sales) / count(*), 0) as sales_avg
from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
      from products as a
               left join products_sales b on a.p_id = b.p_id
      group by a.p_type, a.p_name
      order by a.p_type desc, a.p_view desc) a
where a.sales > 0
group by p_type

2023-03-22T11:09:27.png

接下来我们计算点击量的平均值。

select p_type, round(sum(p_view) / count(*)) as view_avg
from products
group by p_type

2023-03-22T11:09:55.png

再之后我们将两个 SQL 合并

select a.p_type, p_name, p_view, view_avg, sales, sales_avg
from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
      from products as a
               left join products_sales b on a.p_id = b.p_id
      group by a.p_type, a.p_name
      order by a.p_type desc, a.p_view desc) a,
     (select p_type, round(sum(p_view) / count(*)) as view_avg
      from products
      group by p_type) b,
     (select p_type, round(sum(sales) / count(*), 0) as sales_avg
      from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
            from products as a
                     left join products_sales b on a.p_id = b.p_id
            group by a.p_type, a.p_name
            order by a.p_type desc, a.p_view desc) a
      where a.sales > 0
      group by p_type) c
where a.p_type = b.p_type
  and a.p_type = c.p_type

既然数据我们都有了,就可以进行下一步计算评分了。

2023-03-22T11:10:04.png

我们通过点击量/点击量平均值+销量/销量平均值。

select a.p_type, p_name, p_view / view_avg, sales / sales_avg
from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
      from products as a
               left join products_sales b on a.p_id = b.p_id
      group by a.p_type, a.p_name
      order by a.p_type desc, a.p_view desc) a,
     (select p_type, round(sum(p_view) / count(*)) as view_avg
      from products
      group by p_type) b,
     (select p_type, round(sum(sales) / count(*), 0) as sales_avg
      from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
            from products as a
                     left join products_sales b on a.p_id = b.p_id
            group by a.p_type, a.p_name
            order by a.p_type desc, a.p_view desc) a
      where a.sales > 0
      group by p_type) c
where a.p_type = b.p_type
  and a.p_type = c.p_type

2023-03-22T11:10:25.png

select a.p_type, p_name, (p_view / view_avg + sales / sales_avg)
from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
      from products as a
               left join products_sales b on a.p_id = b.p_id
      group by a.p_type, a.p_name
      order by a.p_type desc, a.p_view desc) a,
     (select p_type, round(sum(p_view) / count(*)) as view_avg
      from products
      group by p_type) b,
     (select p_type, round(sum(sales) / count(*), 0) as sales_avg
      from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
            from products as a
                     left join products_sales b on a.p_id = b.p_id
            group by a.p_type, a.p_name
            order by a.p_type desc, a.p_view desc) a
      where a.sales > 0
      group by p_type) c
where a.p_type = b.p_type
  and a.p_type = c.p_type

这个时候问题出现了,明明是五花肉卖出去的更多,但是猪肉一个没卖出去,平分反倒更高了。所以我们需要对算法增加系数,也就是权重。

2023-03-22T11:10:39.png

select a.p_type, p_name, (p_view / view_avg) * 0.2 + (sales / sales_avg * 0.8)
from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
      from products as a
               left join products_sales b on a.p_id = b.p_id
      group by a.p_type, a.p_name
      order by a.p_type desc, a.p_view desc) a,
     (select p_type, round(sum(p_view) / count(*)) as view_avg
      from products
      group by p_type) b,
     (select p_type, round(sum(sales) / count(*), 0) as sales_avg
      from (select p_type, a.p_name, a.p_view, IFNULL(b.p_sales, 0) as sales
            from products as a
                     left join products_sales b on a.p_id = b.p_id
            group by a.p_type, a.p_name
            order by a.p_type desc, a.p_view desc) a
      where a.sales > 0
      group by p_type) c
where a.p_type = b.p_type
  and a.p_type = c.p_type

2023-03-22T11:11:59.png

这样,运营人员拿到商品就可以分析出哪些商品是需要赶紧补货的。

套接字地址格式

在使用套接字时,首先要解决通信双方寻址的问题。我们需要套接字的地址建立连接,就像打电话时首先需要查找电话簿,找到你想要联系的那个人,你才可以建立连接,开始交流。接下来,我们重点讨论套接字的地址格式。

通用套接字地址格式

/* POSIX.1g 规范规定了地址族为 2 字节的值.  */
typedef unsigned short int sa_family_t;
/* 描述通用套接字地址  */
struct sockaddr{
    sa_family_t sa_family;  /* 地址族.  16-bit*/
    char sa_data[14];   /* 具体的地址值 112-bit */
}; 

在这个结构体里,第一个字段是地址族,它表示使用什么样的方式对地址进行解释和保存,好比电话簿里的手机格式,或者是固话格式,这两种格式的长度和含义都是不同的。地址族在 glibc 里的定义非常多,常用的有以下几种:

  • AF_LOCAL:表示的是本地地址,对应的是 Unix 套接字,这种情况一般用于本地 socket 通信,很多情况下也可以写成 AF_UNIX、AF_FILE;
  • AF_INET:因特网使用的 IPv4 地址;
  • AF_INET6:因特网使用的 IPv6 地址。

这里的 AF_ 表示的含义是 Address Family,但是很多情况下,我们也会看到以 PF_ 表示的宏,比如 PF_INET、PF_INET6 等,实际上 PF_ 的意思是 Protocol Family,也就是协议族的意思。我们用 AF_xxx 这样的值来初始化 socket 地址,用 PF_xxx 这样的值来初始化 socket。我们在 <sys/socket.h> 头文件中可以清晰地看到,这两个值本身就是一一对应的。

/* 各种地址族的宏定义  */
#define AF_UNSPEC PF_UNSPEC
#define AF_LOCAL  PF_LOCAL
#define AF_UNIX   PF_UNIX
#define AF_FILE   PF_FILE
#define AF_INET   PF_INET
#define AF_AX25   PF_AX25
#define AF_IPX    PF_IPX
#define AF_APPLETALK  PF_APPLETALK
#define AF_NETROM PF_NETROM
#define AF_BRIDGE PF_BRIDGE
#define AF_ATMPVC PF_ATMPVC
#define AF_X25    PF_X25
#define AF_INET6  PF_INET6

sockaddr 是一个通用的地址结构,通用的意思是适用于多种地址族。为什么定义这么一个通用地址结构呢,这个放在后面讲。

IPv4 套接字格式地址

接下来,看一下常用的 IPv4 地址族的结构:

/* IPV4 套接字地址,32bit 值.  */
typedef uint32_t in_addr_t;
struct in_addr
  {
    in_addr_t s_addr;
  };
  
/* 描述 IPV4 的套接字地址格式  */
struct sockaddr_in
  {
    sa_family_t sin_family; /* 16-bit */
    in_port_t sin_port;     /* 端口口  16-bit*/
    struct in_addr sin_addr;    /* Internet address. 32-bit */
 
 
    /* 这里仅仅用作占位符,不做实际用处  */
    unsigned char sin_zero[8];
  };

我们对这个结构体稍作解读,首先可以发现和 sockaddr 一样,都有一个 16-bit 的 sin_family 字段,对于 IPv4 来说这个值就是 AF_INET。

接下来是端口号,我们可以看到端口号最多是 16-bit,也就是说最大支持 2 的 16 次方,这个数字是 65536,所以我们应该知道支持寻址的端口号最多就是 65535。关于端口,我在前面的章节也提到过,这里重点阐述一下保留端口。所谓保留端口就是大家约定俗成的,已经被对应服务广为使用的端口,比如 ftp 的 21 端口,ssh 的 22 端口,http 的 80 端口等。一般而言,大于 5000 的端口可以作为我们自己应用程序的端口使用。

下面是 glibc 定义的保留端口。

/* Standard well-known ports.  */
enum
  {
    IPPORT_ECHO = 7,    /* Echo service.  */
    IPPORT_DISCARD = 9,   /* Discard transmissions service.  */
    IPPORT_SYSTAT = 11,   /* System status service.  */
    IPPORT_DAYTIME = 13,  /* Time of day service.  */
    IPPORT_NETSTAT = 15,  /* Network status service.  */
    IPPORT_FTP = 21,    /* File Transfer Protocol.  */
    IPPORT_TELNET = 23,   /* Telnet protocol.  */
    IPPORT_SMTP = 25,   /* Simple Mail Transfer Protocol.  */
    IPPORT_TIMESERVER = 37, /* Timeserver service.  */
    IPPORT_NAMESERVER = 42, /* Domain Name Service.  */
    IPPORT_WHOIS = 43,    /* Internet Whois service.  */
    IPPORT_MTP = 57,
 
 
 
 
    IPPORT_TFTP = 69,   /* Trivial File Transfer Protocol.  */
    IPPORT_RJE = 77,
    IPPORT_FINGER = 79,   /* Finger service.  */
    IPPORT_TTYLINK = 87,
    IPPORT_SUPDUP = 95,   /* SUPDUP protocol.  */
 
 
    IPPORT_EXECSERVER = 512,  /* execd service.  */
    IPPORT_LOGINSERVER = 513, /* rlogind service.  */
    IPPORT_CMDSERVER = 514,
    IPPORT_EFSSERVER = 520,
 
 
    /* UDP ports.  */
    IPPORT_BIFFUDP = 512,
    IPPORT_WHOSERVER = 513,
    IPPORT_ROUTESERVER = 520,
 
 
    /* Ports less than this value are reserved for privileged processes.  */
    IPPORT_RESERVED = 1024,
 
 
    /* Ports greater this value are reserved for (non-privileged) servers.  */
    IPPORT_USERRESERVED = 5000

实际的 IPv4 地址是一个 32-bit 的字段,可以想象最多支持的地址数就是 2 的 32 次方,大约是 42 亿,应该说这个数字在设计之初还是非常巨大的,无奈互联网蓬勃发展,全球接入的设备越来越多,这个数字渐渐显得不太够用了,于是大家所熟知的 IPv6 就隆重登场了。

IPv6 套接字地址格式
我们再看看 IPv6 的地址结构:

struct sockaddr_in6
  {
    sa_family_t sin6_family; /* 16-bit */
    in_port_t sin6_port;  /* 传输端口号 # 16-bit */
    uint32_t sin6_flowinfo; /* IPv6 流控信息 32-bit*/
    struct in6_addr sin6_addr;  /* IPv6 地址 128-bit */
    uint32_t sin6_scope_id; /* IPv6 域 ID 32-bit */
  };
  

整个结构体长度是 28 个字节,其中流控信息和域 IP 先不用管,这两个字段,一个在 glibc 的官网上根本没出现,另一个是当前未使用的字段。这里的地址族显然应该是 AF_INET6,端口同 IPv4 地址一样,关键的地址从 32 位升级到 128 位,这个数字就大到恐怖了,完全解决了寻址数字不够的问题。

请注意,以上无论 IPv4 还是 IPv6 的地址格式都是因特网套接字的格式,还有一种本地套接字格式,用来做为本地进程间的通信, 也就是前面提到的 AF_LOCAL。

struct sockaddr_un {
    unsigned short sun_family; /* 固定为 AF_LOCAL */
    char sun_path[108];   /* 路径名 */
};

几种套接字地址格式比较

这几种地址的比较见下图,IPv4 和 IPv6 套接字地址结构的长度是固定的,而本地地址结构的长度是可变的。

在这篇文章中,我们将介绍利用 Confid 配合 Etcd 来完成建议配置中心。

随着服务越来越多每个服务都是一个独立的项目,在项目中可能会涉及到数据库和一些配置,如果这些配置一旦发生改变了那我们就可能到服务器上进行修改,然后重启服务,如果服务只有一两个,那么没有必要使用配置中心。

如果服务有几十个,甚至上百个,我们就需要有一个配置中心来帮助我们管理配置文件,配置中心产生的原因,其实就是随着服务节点的增多而出现的,为了增加我们的可维护性专门有一个配置中心,统一配置。

能够完成这项工作的软件有很多,Apollo 等等,可视化肯定要比自己手工写配置中心要好一些,如果想要使用轻量级的,完全可以利用 etcd 来完成。

confd 从 etcd 中拉取数据

confd 是一款高可用统一配置管理工具,源代码是用 Go 编写的,其地址如下:

https://github.com/kelseyhightower/confd

它可以监听 etcd 中的内容,然后根据模板来生成配置文件,在这里我们使用 docker 来构建 confd,我们现将源代码下载到本地。

https://codeload.github.com/kelseyhightower/confd/tar.gz/refs/tags/v0.16.0

,下载完成后解压到工程目录下。

FROM golang:1.12-alpine as confd
ADD ./confd-0.16.0/ /go/src/github.com/kelseyhightower/confd
RUN export CGO_ENABLED=0 && \
    sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories && \
    apk update && \
    apk add --no-cache bzip2 make && \
    cd /go/src/github.com/kelseyhightower/confd && \
    go install github.com/kelseyhightower/confd
ENTRYPOINT ["/go/bin/confd"]

构建镜像:

docker build -t confd:my .

我们在 etcd 里面插入两个值

$ docker exec -it etcd-etcd-1 etcdctl put /config_center/mysql/user root
$ docker exec -it etcd-etcd-1 etcdctl put /config_center/mysql/pass 123123

接下来我们利用 confd 生成配置文件,首先在本地创建一个文件夹叫做 confdfiles ,里面分别创建两个文件夹 conf.d和 templates

$ mkdir -p confdfiles/{conf.d,templates,dest}

其中 conf.d 存放配置文件,templates 方模板文件。

先编写 confd 的配置文件

[template]
src = "config.conf.tmpl"
dest = "/etc/confd/dest/config.conf"
keys = [
    "/config_center/mysql/user",
    "/config_center/mysql/root"
]
  • src 代表模板地址
  • dest 代表获取配置后生成到哪里
  • keys 代表要从 etcd 读取的 key

编写模板文件,conf.d 会根据模板文件生成最终的配置文件。

[config]
database_user = {{getv "/config_center/mysql/user"}}
database_pass = {{getv "/config_center/mysql/pass"}}

其中的 getv 代表着从 key 中取值。

注意:在编写配置文件的时候一定要注意,不能写错!

我们来试一下启动容器获取配置

$ docker run -it --rm --name confd \
 -v $PWD/confdfiles:/etc/confd confd:my \
 -onetime -backend etcdv3 -node http://192.168.124.4:2379
[config]
database_user = root
database_pass = 123123

http 服务读取配置文件

我们先编写一个简单的 httpserver,其主要功能是展示配置文件中的db_user和db_pass。

package main

import (
    "flag"
    "gopkg.in/ini.v1"
    "log"
    "net/http"
    "strconv"
)

func main() {
    port := flag.Int("p", 0, "服务端口")
    flag.Parse()
    if *port == 0 {
        log.Fatalln("请指定端口")
    }

    cfg, err := ini.Load("my.ini")
    if err != nil {
        log.Fatalln(err)
    }
    http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
        dbUser := cfg.Section("db").Key("db_user").Value()
        dbPass := cfg.Section("db").Key("db_pass").Value()
        writer.Write([]byte("<h1>" + dbUser + "</h1>"))
        writer.Write([]byte("<h1>" + dbPass + "</h1>"))
    })
    http.ListenAndServe(":"+strconv.Itoa(*port), nil)

}

我们修改之前的confd 的配置文件

[template]
src = "config.ini.tmpl"
dest = "/etc/confd/dest/config.ini"
keys = [
    "/config_center/mysql/user",
    "/config_center/mysql/pass"
]

这里需要注意,我们变更里配置,对应的文件名也需要进行改变 config.ini.tmpl

[db]
db_user = {{getv "/config_center/mysql/user"}}
db_pass = {{getv "/config_center/mysql/pass"}}

启动应用

$ go run main.go -p 2000 -c ./confdfiles/dest/config.ini

我们可以看到在浏览器中已经展示出了当前的值!

当配置发生变化后的简单重载策略,监听

在上面我们使用 -interval 来设置定期扫描,etcd 是支持 watch 的,我们只需要使用过 -watch 参数即可开启监听功能。

 $ docker run -it -d --rm --name confd \
 -v $PWD/confdfiles:/etc/confd confd:my \
 -watch -backend etcdv3 -node http://192.168.124.4:2379

在 confd 中提供了一个功能,当配置发生变化后可以出发 hook——reload_cmd 配置。

[template]
src = "config.ini.tmpl"
dest = "/etc/confd/dest/config.ini"
keys = [
    "/config_center/mysql/user",
    "/config_center/mysql/pass"
]
reload_cmd = "curl http://192.168.124.4:20001/reload"

我在应用中开放一个接口用来更新配置。

http.HandleFunc("/reload", func(writer http.ResponseWriter, request *http.Request) {
        newCfg, err := ini.Load(*config)
        if err != nil {
            log.Fatalln(err)
        }
        cfg = newCfg
    })

这个接口用来重新加载配置文件,并且将新的配置复制给老的配置。这种方式很简单,但是如果服务比较多那就会比较复杂了。

当我们修改配置文件后,容器会异常退出,这是因为在当前容器中并没有 curl 命令,所以我们需要修改 Dockerfile 并重新 build 镜像。

FROM golang:1.12-alpine as confd
ADD ./confd-0.16.0/ /go/src/github.com/kelseyhightower/confd
RUN export CGO_ENABLED=0 && \
    sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories && \
    apk update && \
    apk add --no-cache bzip2 make curl && \
    cd /go/src/github.com/kelseyhightower/confd && \
    go install github.com/kelseyhightower/confd
ENTRYPOINT ["/go/bin/confd"]
docker build confd:v2 -t .
 $ docker run -it -d --rm --name confd \
 -v $PWD/confdfiles:/etc/confd confd:v2 \
 -watch -backend etcdv3 -node http://192.168.124.4:2379

当配置被更新后就会重新加载配置了。

线上千万别用这种方式!!!!!

监控文件变化

上面我们用 confd 监听变化出发重新加载配置文件,在这里我们让服务自己监控文件,如果文件发生了变化,进行重启服务。

最简单的方案是使用 md5,如果上一次的文件 md5和现在的 MD5不一致则说明文件被更改了,就出发更新操作.

func getFileMd5(path string) (string, error) {
    file, err := os.Open(path)
    if err != nil {
        return "", err
    }
    hash := md5.New()
    if _, err = io.Copy(hash, file); err != nil {
        return "", err
    }
    hashInBytes := hash.Sum(nil)[:16]
    return hex.EncodeToString(hashInBytes), nil
}

这个函数就是去获取 md5值的。

go func() {
        fileMd5, err := getFileMd5(*config)
        if err != nil {
            log.Println(err)
            return
        }
        for {
            newMd5, err := getFileMd5(*config)
            if err != nil {
                log.Println(err)
                break
            }
            if strings.Compare(newMd5, fileMd5) != 0 {
                fileMd5 = newMd5
                fmt.Println("文件变更了!")

                # 重启服务....
            }
            time.Sleep(time.Second * 2)
        }
    }()

我们开启一个协程去不断的监听配置的变化,如果发生变化了,就开启平滑重启,逻辑很麻烦,我们利用第三方的库来实现。

有兴趣的可以去看一下这个库:github.com/jpillora/overseer

什么是 ETCD

Etcd 是 CoreOS 基于 Raft 开发的分布式 key-value 存储,可用于服务发现、共享配置以及一致性保障(如数据库选主、分布式锁等)。

在分布式系统中,如何管理节点间的状态一直是一个难题, etcd 像是专门为集群环境的服务发现和注册而设计,它提供了数据 TTL 失效、数据改变监视、多值、目录监听、分布式锁原子操作等功能,可以方便的跟踪并管理集群节点状态。

Etcd 具有以下特性:

  • 键值对存储: 将数据存储在分层组织的目录中,如同在标准文件系统中。
  • 监测变更:监测特定的键或目录进行更改,并对值的更改做出反应
  • 简单:curl可访问的用户API(HTTP + JSON)
  • 安全:可选的 SSL 客户端证书认证
  • 快速:单实例美妙1000次写速度,2000+读操作
  • 可靠:使用Raft算法保证一致性

基于 bitnami/etcd 快速搭建开发环境

BitNami是一个开源项目,该项目产生的开源软件包括安装 Web应用程序和解决方案堆栈,其提供了大量的开箱即用的 Docker 镜像,可以帮助我们快速搭建本地开发环境。

如果你想了解更多关于 bitnami 的信息,你可以访问他们的官方:https://bitnami.com/
$ curl -LO https://raw.githubusercontent.com/bitnami/containers/main/bitnami/etcd/docker-compose.yml

由于 BitNami 提供的 yaml 文件没有开放端口,所以需要进行修改

# 
version: '2'

services:
  etcd:
    image: docker.io/bitnami/etcd:3.5
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
    volumes:
      - etcd_data:/bitnami/etcd
    ports:
      - 2379:2379
      - 2380:2380

volumes:
  etcd_data:
    driver: local
$ docker-compose up

由于是 Etcd 速学 所以在文章中并不会对 etcd 的运维、集群、原理等问题进行讲解,本文的主要宗旨是快速入门,那么以当前的这个环境就可以开始我们的使用了。

Etcd Key-value 操作快速入门

Etcd 是支持 http 协议的,同时也提供了 etcdctl 命令行工具。它能提供一些简洁的命令,便于进行服务测试和修改数据库内容,而无需基于 HTTP API方式。etcdctl支持的命令大体上分为数据库操作和非数据库操作两类。这些操作跟 HTTP API 基本上是对应的。etcdctl 在两个不同的 etcd 版本下的行为方式也完全不同, 可通过环境变量设置所需要的版本

下面我们来进行新增和获取操作。

$   docker exec -it etcd-etcd-1 etcdctl put /user/101/name maksim
OK
$   docker exec -it etcd-etcd-1 etcdctl put /user/101/age 19
OK

上面的命令代表着使用 put 向 etcd 新增数据,在 http 的语义中代表着新增。

$   docker exec -it etcd-etcd-1 etcdctl get /user/101/name
/user/101/name
maksim

当我们想要获取的数据的时候,其实就是向 etcd 发送 get 请求,来获取key 的值,这里需要注意,如果想要通过 get 来后去 get /usr/101 所有的数据,需要加上参数 --prefix,否则是无法获取到数据的。

$   docker exec -it etcd-etcd-1 etcdctl get /user/101 --prefix
/user/101/age
19
/user/101/name
maksim

在 Go 中使用 etcd

$ go get go.etcd.io/etcd/clientv3

我们可以使用 etcd 官方提供的库来访问 etcd。

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main() {
    config := clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 10 * time.Second,
    }

    client, err := clientv3.New(config)
    if err != nil {
        fmt.Errorf(err.Error())
    }
    defer client.Close()

    kv := clientv3.NewKV(client)
    ctx := context.Background()
    getResponse, err := kv.Get(ctx, "/user/101/name")
    if err != nil {
        fmt.Errorf(err.Error())
    }
    fmt.Println(getResponse.Kvs)
}

etcd 的 sdk 使用起来非常简单,首先初始化 clientv3.Config ,由于我们使用的开箱即用的 etcd 所以没有过多配置,直接在 Endpoints 中输入我们的 IP 以及暴露的端口,DialTimeout 代表着超时时间。

下面就是初始化 clientv3 客户端,当我们需要访问 kv 存储时候,调用 clientv3.NewKV 实例化出一个新的 kv,剩下的操作就和我们使用 etcdctl 一样了。

Untitled

租约的使用和基本操作

在 Redis 中有 key 的过期时间,而在 etcd 对于过期的实现与 redis 不同,在 etcd 中 key 的过期利用 Lease(租约)来完成。

在 redis 中,我们可以在 set 缓存时直接指定到期时间,而在 etcd 中,我们首先需要创建 Lease ,然后再将 key 绑定到 Lease 上。

我们可以将 Lease 理解为现实世界中的”租房合同“,当我们的合同到期后我们就需要搬离,该房子继续使用,如果还想要住这个房子,那必须进行续约。

一个 Lease 可以关联若干个 key, 当租约到到期后,或者 Lease 被我们手动删除,key 就会随之过期无法继续使用。

lease 的 CURD

如果想要对某一个 key 设置过期时间,那么需要先创建一个租约,例如要设置一个 20 秒的租约:

$ docker exec -it etcd-etcd-1 etcdctl lease grant 20
lease 694d848998e3ec04 granted with TTL(20s)

etcd 一般还会作为服务发现,服务一旦注册到 etcd 后,如果地址宕机了,我们需要知道该地址是否可用,这个时候我们就可以基于租约来实现,如果在租约之内没有续约,那么就可以认为当前节点出现故障,需要从 etcd 中移除,避免其他服务对齐进行调用。

我们通过 lease timetolive 命令来查看当前租约的剩余时间。

$ docker exec -it etcd-etcd-1 etcdctl lease timetolive 694d848998e3ec04

lease 694d848998e3ec04 already expired

由于编写文章的时间关系,我们最开始创建的租约694d848998e3ec04 已经过期了,下面我们来进行连贯操作,先创建,然后直接查看租约。

$ docker exec -it etcd-etcd-1 etcdctl lease grant 10
lease 694d848998e3ec0d granted with TTL(10s)

$docker exec -it etcd-etcd-1 etcdctl lease timetolive 694d848998e3ec0d
lease 694d848998e3ec0d granted with TTL(10s), remaining(2s)

接下来,我们来通过 lease list 来查看当前 etcd 的租约列表:

# 新建租约
$ docker exec -it etcd-etcd-1 etcdctl lease grant 2000
lease 694d848998e3ec10 granted with TTL(2000s)
$ docker exec -it etcd-etcd-1 etcdctl lease list
found 1 leases
694d848998e3ec10

在上面的命令中我们创建了一个 2000 秒的租约,下面,我们使用 lease revoke 来删除租约。

$ docker exec -it etcd-etcd-1 etcdctl lease revoke 694d848998e3ec10
lease 694d848998e3ec10 revoked
$ docker exec -it etcd-etcd-1 etcdctl lease list
found 0 leases

在上面有描述在服务发现中如何处理异常节点中提到了续约其命令使用的就是 lease keep-alive

# 创建一个 20 秒的租约
$ docker exec -it etcd-etcd-1 etcdctl lease grant 200
lease 694d848998e3ec15 granted with TTL(200s)

# 执行续约
$ docker exec -it etcd-etcd-1 etcdctl lease keep-alive 694d848998e3ec15

lease 694d848998e3ec15 keepalived with TTL(200)

Lease 与 Key 的关联

key 与 lease 的绑定非常简单,只需要我们在创建 key-value 的时候在末尾加上 --lease xxx 即可,其中 xxx 就是我们的租约 id。

#新建租约
$ docker exec -it etcd-etcd-1 etcdctl lease grant 60
lease 694d848998e3ec19 granted with TTL(60s)

# 绑定租约
$ docker exec -it etcd-etcd-1 etcdctl put /user maksim --lease  694d848998e3ec19
OK

# get user
$ docker exec -it etcd-etcd-1 etcdctl get /user
/user
maksim

我们等待 60 秒后再去查看 /user。

$ docker exec -it etcd-etcd-1 etcdctl get /user

$ docker exec -it etcd-etcd-1 etcdctl get /user --prefix
/user/102/name

我们可以看到,我们新建的 /user 已经被删除了,只能查看到上一篇文章的内容。

如果想要查看当前租约下绑定了哪些 key,我们可以使用 lease timetolive xxx --keys 来进行查看。

# 新建租约
$ docker exec -it etcd-etcd-1 etcdctl lease grant 60
lease 694d848998e3ec25 granted with TTL(60s)

# PUT 数据并且绑定租约
$ docker exec -it etcd-etcd-1 etcdctl put /user maksim --lease 694d848998e3ec25
OK

# 查看租约下的 key
$ docker exec -it etcd-etcd-1 etcdctl lease timetolive 694d848998e3ec25 --key
lease 694d848998e3ec25 granted with TTL(60s), remaining(28s), attached keys([/user])

服务注册

在最开始,我们提到过 etcd 的应用场景就包括了服务注册于发现,注册和发现是两个部分,在这里,我们先讲服务注册,服务注册的原理其实很简单,我们可以使用一些开源的库直接来完成,也可以利用手工的方式将节点信息写入到 etcd 当中。

简单注册

如果是点对点的调用方式,那就是客户端直接调用服务端,有了注册中心后是先去访问 etcd 或得到服务端的节点信息,节点信息主要包括:ip地址,端口号。

接下来,我们来实现服务的注册,在 etcd 中我们存储如下数据:

/services/product1/product
/services/users1/userinfo
  • services 代表着服务
  • product1 和 user1 代表节点,后面可能会存在 product2、product3 我们以此来做负载均衡
  • product 和 userinfo 是服务

我们来编写 product 服务

package main

import (
    "github.com/gorilla/mux"
    "net/http"
)

func main() {
    r := mux.NewRouter()
    r.HandleFunc("/product/{id:\\d+}", func(writer http.ResponseWriter, request *http.Request) {
        vars := mux.Vars(request)
        str := "get proudct ById: " + vars["id"]
        writer.Write([]byte(str))
    })
    http.ListenAndServe(":8081", r)
}

Untitled

这里只是简单提供了一个 product 接口,效果如上图。

接下来我们实现 service.go,其中主要包含两个方法

  • NewService: 实例化 service
  • Register:向 etcd 注册服务
package util

import (
    "context"
    "go.etcd.io/etcd/clientv3"
    "log"
    "time"
)

type Service struct {
    client *clientv3.Client
}

func NewService() *Service {
    config := clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 10 * time.Second,
    }

    client, err := clientv3.New(config)
    if err != nil {
        log.Fatal(err)
    }
    return &Service{client: client}
}

// 注册服务
func (s *Service) Register(id string, name string, address string) error {
    kv := clientv3.NewKV(s.client)
    prefix := "/services/"
    ctx := context.Background()
    _, err := kv.Put(ctx, prefix+id+"/"+name, address)
    return err
}

代码很简单,用到的都是我们上面学到的内容。

  1. 首先创建 client
  2. 创建 kv
  3. 向 etcd 中写入数据

接下来改造 main 函数。

package main

import (
    "etcd/util"
    "github.com/gorilla/mux"
    "net/http"
)

func main() {
    r := mux.NewRouter()
    r.HandleFunc("/product/{id:\\d+}", func(writer http.ResponseWriter, request *http.Request) {
        vars := mux.Vars(request)
        str := "get proudct ById: " + vars["id"]
        writer.Write([]byte(str))
    })
    s := util.NewService()
    s.Register("p1", "productservice", "127.0.0.1:8081")
    http.ListenAndServe(":8081", r)
}

我们先简陋处理,后续再添加协程以及信号监听。

2023-04-04T12:49:35.png

运行后,我们可以看到节点信息已经被写入到 etcd 中了其流程如下:

2023-04-04T12:49:06.png

优雅启动

2023-04-04T12:49:51.png

我们可以看到在 ide 中,我们最后两步目前是报黄了,没有去处理异常。当服务出现异常我们都是需要进行处理的,这里先初步的设计。

在优雅启动中,我们分成了 4 步。

第一步:我们首先为了以后维护方便,将写死的值变成一个变量。

func main() {

    serviceId := "p1"
    serviceName := "productservice"
    serviceAddr := "127.0.0.1"
    servicePort := 8080
    r := mux.NewRouter()
    r.HandleFunc("/product/{id:\\d+}", func(writer http.ResponseWriter, request *http.Request) {
        vars := mux.Vars(request)
        str := "get proudct ById: " + vars["id"]
        writer.Write([]byte(str))
    })
    s := util.NewService()
    s.Register(serviceId, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
    http.ListenAndServe(":"+strconv.Itoa(servicePort), r)
}

第二步:协程运行

go func() {
        s.Register(serviceId, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
        http.ListenAndServe(":"+strconv.Itoa(servicePort), r)
}()

第三步:监听信号

go func() {
        sig := make(chan os.Signal)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
}()

第四步:处理错误和信号处理

package main

import (
    "etcd/util"
    "fmt"
    "github.com/gorilla/mux"
    "log"
    "net/http"
    "os"
    "os/signal"
    "strconv"
    "syscall"
)

func main() {

    serviceId := "p1"
    serviceName := "productservice"
    serviceAddr := "127.0.0.1"
    servicePort := 8080
    r := mux.NewRouter()
    r.HandleFunc("/product/{id:\\d+}", func(writer http.ResponseWriter, request *http.Request) {
        vars := mux.Vars(request)
        str := "get proudct ById: " + vars["id"]
        writer.Write([]byte(str))
    })
    s := util.NewService()

    errChan := make(chan error)

    go func() {
        err := http.ListenAndServe(":"+strconv.Itoa(servicePort), r)
        if err != nil {
            errChan <- err
        }
        err = s.Register(serviceId, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
        if err != nil {
            errChan <- err
        }

    }()

    go func() {
        sig := make(chan os.Signal)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
        errChan <- fmt.Errorf("%s", <-sig)
    }()
    getErr := <-errChan

    log.Fatalln("err:", getErr)
}

最终的代码就是这样的,我们新建了一个 errChan 去处理错误,如果我们开启了数据库连接,redis 连接就可以在这里进行资源回收了。

在这里,我们实现 http 服务的优雅关闭。

func main () {   
  .... 
    serv := &http.Server{
        Addr:              ":"+strconv.Itoa(servicePort),
        Handler:           r,
    }
    go func() {
        err := serv.ListenAndServe()
        if err != nil {
            errChan <- err
        }
        err = s.Register(serviceId, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
        if err != nil {
            errChan <- err
        }

    }()

    go func() {
        sig := make(chan os.Signal)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
        errChan <- fmt.Errorf("%s", <-sig)
    }()
    getErr := <-errChan
    err := serv.Shutdown(context.Background())
  ...
}

服务关闭时反注册

当我们的服务发生问题异常关闭,或者是接收到信号后需要进行退出操作,这个时候我们需要将该节点的信息从 etcd 中删除。

// UnRegister 反注册服务
func (s *Service) UnRegister(id string) error {
    kv := clientv3.NewKV(s.client)
    prefix := "/services/" + id
    ctx := context.Background()
    _, err := kv.Delete(ctx, prefix, clientv3.WithPrefix())
    return err
}

接下来我们在服务关闭之前调用反注册。

err := s.UnRegister(serviceId)
if err != nil {
        log.Fatalln("err:", err)
}
err = serv.Shutdown(context.Background())

关闭应用程序前:

2023-04-04T12:50:34.png

关闭应用程序后

2023-04-04T12:50:45.png

设置租约

在注册服务中现在创建的数据是不会过期的,但是在生产环境中如果服务假死无法提供服务的时候,在注册中心中我们就需要将其摘掉。

// Register 注册服务
func (s *Service) Register(id string, name string, address string) error {
    kv := clientv3.NewKV(s.client)
    prefix := "/services/"
    ctx := context.Background()
    leaseRes, err := clientv3.NewLease(s.client).Grant(ctx, 20)
    if err != nil {
        return err
    }
    _, err = kv.Put(ctx, prefix+id+"/"+name, address, clientv3.WithLease(leaseRes.ID))
    return err
}

这样一来,当过了 20 秒后还没有续期,就说明当前服务已经出现问题了。

2023-04-04T14:18:19.png
2023-04-04T14:18:28.png

定期续租

设置租约后我们修妖解决续租的问题。

// Register 注册服务
func (s *Service) Register(id string, name string, address string) error {
    kv := clientv3.NewKV(s.client)
    prefix := "/services/"
    ctx := context.Background()
    fmt.Println(prefix)
    lease := clientv3.NewLease(s.client)
    leaseRes, err := lease.Grant(ctx, 20)
    fmt.Println(leaseRes)
    if err != nil {
        return err
    }
    _, err = kv.Put(ctx, prefix+id+"/"+name, address, clientv3.WithLease(leaseRes.ID))
    if err != nil {
        return err
    }
    keepliveRes, err := lease.KeepAlive(ctx, leaseRes.ID)
    if err != nil {
        return err
    }
    go lisKeepAlive(keepliveRes)
    return nil
}

func lisKeepAlive(res <-chan *clientv3.LeaseKeepAliveResponse) {
    for {
        select {
        case ret := <-res:
            if ret != nil {
                //续租成功
                fmt.Println("续租成功")
            }
        }
    }
}

我们每个十秒查看一次 etcd 中的数据。

2023-04-04T14:18:49.png

到这里,我们最基本的服务注册就完成了。

服务发现

获取服务数据

服务发现的原理非常简单,其实就是按照我们的定义好的 key 去取值。我们在这一小节现直接将值取出来。先编写客户端。

package util

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "log"
    "time"
)

type Client struct {
    client *clientv3.Client
}

func NewClient() *Client {
    config := clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 10 * time.Second,
    }

    client, err := clientv3.New(config)
    if err != nil {
        log.Fatal(err)
    }
    return &Client{client: client}
}

func (c *Client) GetService() {
    kv := clientv3.NewKV(c.client)
    res, _ := kv.Get(context.Background(), "/services", clientv3.WithPrefix())
    for _, item := range res.Kvs {
        fmt.Println(string(item.Key))
    }
}

我们在 GetService 中现将数据打印到控制台。

编写 main 函数。

package main

import "etcd/util"

func main() {
    client := util.NewClient()
    client.GetService()
}

2023-04-04T14:19:12.png

接下来我们继续完善获取,我们先新建一个服务信息结构体。

type ServiceInfo struct {
    ServiceId   string
    ServiceName string
    ServiceAddr string
}

func (s *Client) parseService(key []byte, value []byte) {
    reg := regexp.MustCompile("/services/(\\w+)/(\\w+)")
    if reg.Match(key) {
        idAndName := reg.FindSubmatch(key)
        id := idAndName[0]
        name := idAndName[2]
        s.Services = append(s.Services, &ServiceInfo{ServiceId: string(id), ServiceName: string(name), ServiceAddr: string(value)})
    }
}

然后在 client 中增加 []*Serviceinfo 的切片,因为我们不能每一次都去 etcd 中获取数据,这样会增加 etcd 的压力,所以我们需要将信息缓存到内存中。parseService 函数用来讲字符串解析成结构体。

type Client struct {
    client   *clientv3.Client
    Services []*ServiceInfo
}

最后我们在 GetService 函数中增加转换结构体的逻辑。

func (c *Client) GetService() {
    kv := clientv3.NewKV(c.client)
    res, _ := kv.Get(context.Background(), "/services", clientv3.WithPrefix())
    for _, item := range res.Kvs {
        c.parseService(item.Key, item.Value)
    }
}

简单修改一下 main 函数

func main() {
    client := util.NewClient()
    client.GetService()
    for _, service := range client.Services {
        fmt.Println(service)
    }
}

这个时候我们启动两个 product 服务。

2023-04-04T14:19:34.png
2023-04-04T14:19:41.png

根据服务名获取地址和优雅的调用机制

我们的客户端可以直接使用 httpclient 直接调用我们的服务,但是为了更良好的凤凰钻杆,我们可以先设计一个结构。

type Endpoint func (ctx context.Context, request interface{}) (reqponse interface{}, err error)
type EncodeReqeustFunc func(context.Context, *http.Request, interface{}) error
  • Endpoint 代表端口,其实就是一个如何去执行服务调用的基本函数,最终的调用就是通过它来进行调用的。
  • EncodeRequestFunc 也是一个函数,是用户自定义的,用来决定到底怎么传参,调用具体哪个 path。

我们先来修改 GetService 方法。


func (c *Client) LoadService() {
    kv := clientv3.NewKV(c.client)
    res, _ := kv.Get(context.Background(), "/services", clientv3.WithPrefix())
    for _, item := range res.Kvs {
        c.parseService(item.Key, item.Value)
    }
}

func (c *Client) GetService(name string, method string, encodeFunc EncodeRequestFunc) Endpoint {
    for _, service := range c.Services {
        if service.ServiceName == name {
            return func(ctx context.Context, requestParams interface{}) (req interface{}, err error) {
                httpClient := &http.DefaultClient
                httpRequest, err := http.NewRequest(method, "http://"+service.ServiceAddr, nil)
                err = encodeFunc(ctx, httpRequest, requestParams)
                if err != nil {
                    return nil, err
                }
                res, err := (*httpClient).Do(httpRequest)
                defer res.Body.Close()
                if err != nil {
                    return nil, err
                }
                body, err := ioutil.ReadAll(res.Body)
                if err != nil {
                    return nil, err
                }
                return string(body), nil
            }
        }
    }
    return nil
}

我们将原来的 GetService 修改为 LoadService 用来加载 service。

GetService 其实就是查找服务地址,并将服务地址,如何调用封装成一个闭包,返回给用户,然后用户在需要调用的地方进行调用。

没有看懂代码没关系,我们来看一下main 中是如何调用的

type ProdRequestParams struct {
    ProductId int
}

func main() {
    client := util.NewClient()
    client.LoadService()

    for _, service := range client.Services {
        fmt.Println(service)
    }
    endpoint := client.GetService("productservice", "GET", func(ctx context.Context, request *http.Request, i interface{}) error {
        params := i.(ProdRequestParams)
        request.URL.Path += "/product/" + strconv.Itoa(params.ProductId)
        return nil
    })
    res, err := endpoint(context.Background(), ProdRequestParams{ProductId: 106})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(res)
}

我们首先定义了一个 ProdRequestParams 来代表请求参数,其中包含饿了一个 ProductId 的字段。

重点是这一段:

    endpoint := client.GetService("productservice", "GET", func(ctx context.Context, request *http.Request, i interface{}) error {
        params := i.(ProdRequestParams)
        request.URL.Path += "/product/" + strconv.Itoa(params.ProductId)
        return nil
    })

这里面定义了请求的参数如何组织,以及请求的地址,并且通过 GetService 获取了一个执行服务的闭包函数。

    res, err := endpoint(context.Background(), ProdRequestParams{ProductId: 106})
    if err != nil {
        log.Fatal(err)
    }

真正发起调用的地方是这里,在前面的几个步骤,其实都是在构建函数,我们执行一下。

➜  etcd go run client-demo.go
&{/services/p2/productservice productservice 127.0.0.1:8082}
get proudct ById: 106
  • Endpoint 代表端口,其实就是一个如何去执行服务调用的基本函数
  • EncodeRequestFunc 也是一个函数,是用户自定义的,用来决定到底怎么传参,调用具体哪个 path

同时注册多个服务到 ETCD

在之前我们注册了一个服务,如果需要负载均衡目前的办法是手动修改 ServiceID,这样的办法其实很 low,需要大量的手工操作,例如:

go run p1.go -name prudctservice -id=p1 -p8081

其中 -id 指定的就是我们的 ServiceID,其实我们可以用 id 生成工具来帮助我们进行生成,这样可以减少手工的操作。

go get -u github.com/satori/go.uuid
serviceId := uuid.NewV4().String()

这样我们看一下 etcd 中的值:

$  docker exec -it etcd-etcd-1 etcdctl get / --prefix
/services/7ade2d76-b681-41cc-9ea1-55dccabbccf4/productservice
127.0.0.1:8082

修改完 uuid 后还需要对 loadService 进行修改。

func (s *Client) parseService(key []byte, value []byte) {
    reg := regexp.MustCompile("/services/(\\w{8}(-\\w{4}){3}-\\w{12})/(\\w+)")
    if reg.Match(key) {
        idAndName := reg.FindSubmatch(key)
        id := idAndName[1]
        name := idAndName[3]
        s.Services = append(s.Services, &ServiceInfo{ServiceId: string(id), ServiceName: string(name), ServiceAddr: string(value)})
    }
}

负载均衡

package util

import (
    "math/rand"
    "time"
)

type LoadBalance struct {
    Servers []*ServiceInfo
}

func NewLoadBalance(services []*ServiceInfo) *LoadBalance {
    return &LoadBalance{Servers: services}
}

func (l *LoadBalance) GetByRand(name string) *ServiceInfo {
    tmp := make([]*ServiceInfo, 0)
    for _, service := range l.Servers {
        if service.ServiceName == name {
            tmp = append(tmp, service)
        }
    }
    if len(tmp) == 0 {
        return nil
    }

    rand.Seed(time.Now().UnixNano())
    max := len(tmp) - 1
    if max == 0 {
        return l.Servers[0]
    }
    i := rand.Intn(max)
    return l.Servers[i]
}

在负载均衡结构体重存储了所有的服务列表,这个服务列表是在获取服务的时候放进去的,然后在需要获取服务的地方 调用 getByRand 来获取服务,这里采用了随机算法,随机从数组中取值。