《Redis设计与实现》读书笔记

第二章 简单动态字符串

SDS还被用作AOF缓冲区

1
2
3
4
5
6
7
8
9
10
struct sdshdr {
  //buf数组中已使用字节的数量
  int len;

  // buf数组中未使用字节的数量
  int free;

  // 保存字符串,字符串的最后一个字节保存空字符'\0'
  char buf[];
}

2.2 SDS与C字符串区别

2.2.1 O(1)获取字符串长度

2.2.2 杜绝缓冲区溢出

1
char *strcat(char *dest, cont char *src)

C字符串不记录自身的长度,strcat函数假定dest已有足够多的内存。

当SDS API对SDS进行修改时,API会先检查SDS的空间是否满足修改所需。

减少修改字符串时带来的内存重分配

C字符串每次增长或者缩短,程序总要对保存这个C字符串的数组进行一次内存重分配。 但Redis作为数据库,数据被频繁修改。

SDS空间预分配

当SDS的API对一个SDS进行修改,且需要对SDS进行空间扩展。程序不仅为SDS分配所必要的空间,还会分配额外空间。 * 修改后SDS的len小于1MB,那么程序分配和len属性同样大小的未使用空间。 * 修改后SDS的len大于1MB,则额外分配1MB。

SDS惰性空间释放

当API需要缩短字符串,程序不会立即进行内存重分配。而使用free属性记录,以待将来使用。

2.2.4 二进制安全

buf属性为字节数组,保存一系列二进制数据。

2.2.5 兼容部分C字符串函数

sds->buf

第三章 链表

1
2
3
4
5
typedef struct listNode {
  struct listNode *prev;
  struct listNode *next;
  void *value;
} listNOde;
1
2
3
4
5
6
7
8
9
10
11
12
typedef struct list {
  ListNode *head;
  ListNOde *tail;
  // 节点数
  unsigned long len;
  // 节点复制函数
  void *(*dup)(void *ptr);
  // 节点释放函数
  void (*free)(void *ptr);
  // 节点值对比函数
  int (*match)(void *ptr, void *key)
} list;

第四章 字典

1
2
3
4
5
6
7
8
9
10
typedef struct dictht {
  // 哈希表数组
  dictEntry **table;
  // 哈希表大小
  unsigned long size;
  // 哈希表大小掩码, 用于计算索引值。等于size-1
  unsigned long sizemask;
  // 哈希表已有节点数
  unsigned long used
} dictht;
1
2
3
4
5
6
7
8
9
10
11
12
typedef struct dictEntry {
  void *key;

  union {
    void *val;
    uint64_t u64;
    int64_t s64;
  }

  // 解决键冲突的问题
  struct dictEntry *next;
} dictEntry;

4.1.3 字典

1
2
3
4
5
6
7
8
9
10
typedef struct dict {
  // 类型特定函数
  dictType *type;
  // 私有数据
  void *privdata;
  // 哈希表,一般只使用ht[0],ht[1]只在对ht[0]进行rehash才使用。
  dictht ht[2];
  // rehash索引。当值为-1,rehash不在进行。记录rehash的进度
  int trehashidx;
}
  • dictType结构保存用于特定类型键值对的函数。
  • privdata属性保存了需要传给那些类型特定函数的可选参数。
    1
    2
    3
    4
    5
    6
    7
    8
    
    typedef struct dictType {
      unsigned int (hashFunction)(const void *key);
      void *(keyDup)(void privdata, const void *key);
      void *(valDup)(void privdata, cont void *obj);
      int (keyCompare)(void privdata, const void *key1, const void *key2);
      void (keyDestructor)(void privdata, void *key);
      void (valDestructor)(void *privdata, void *obj);
    } dictType;

4.2 哈希算法

1
2
hash = dict->type->hashFunction(key);
index = hash & dict->ht[x].sizemask;

4.4 rehash

  • 为字典的ht[1]分配空间。
    • 扩容时的大小为第一个大于等于ht[0].used * 2 的 2^n.
    • 收缩时的大小为第一个大于等于ht[0].used的2^n。
  • 将ht[0]的所有键值对rehash到ht[1]。重新计算哈希值和索引。
  • 释放ht[0],将ht[1]设为ht[0],并为ht[1]新创建一个空白哈希表。

4.4 扩展与收缩

  • 没有执行BGSAVE或者BGREWRITEAOF,且哈希表负载因子大于等于1.
  • 正在执行BGSAVE或者BGREWRITEAOF,负载因子大于5.

load_factor = ht[0].used / ht[0].size

当负载因子小于0.1,进行收缩。

渐进式rehash

  • 为ht[1]分配空间。
  • 维持索引计算器变量rehashidx,设为0
  • 在rehash期间,每次对字典的添加、删除、查找或者更新操作,程序顺带将ht[0]哈希表在rehashidx索引上的所有键值对rehash到ht[1],将rehashidx增一。
  • 最终ht[0]的所有键值对都rehash到ht[1],将rehashidx属性设为-1.

在rehash过程,字典同时使用ht[0]和ht[1]。所以delete、find、update等需在两个ht上进行。新添加的字典的键值一律保存在ht[1]。

第五章 跳跃表 skiplist

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct zskiplistNode {
  struct zskiplistNode *backward;
  // 分值
  double score;
  // 成员对象
  rojb *obj;
  // 层
  struct zskiplistLevel {
    struct zskiplistNode *forward;
    // 跨度
    unsigned int span;
  } level[];
} zskiplistNode;
1
2
3
4
5
6
7
typedef struct zskiplist {
  structz skiplistNode *header, *tail;
  // 节点数量
  unsigned long length;
  // 表中层数最大的节点的层数
  int level
}
  • 层高1至32
  • 多个节点可以有相同的score,但obj需唯一。
  • 节点按照score大小进行排序,当score相同,按成员对象的大小排序。

第六章 整数集合 intset

1
2
3
4
5
6
7
8
typedef struct intset {
  // 编码方式
  uint32_t encoding;
  // 集合包含的元素数量
  uint32_t length;
  // 保存元素的数组,从小到大排列
  int8_t contents[];
}

6.2 升级

  • 根据新元素类型,重新分配空间。
  • 将现有的元素转换成新元素相同的类型,并放到正确的位置。
  • 添加新元素。

例子,现有int_16类型的数组[1,2,3],添加65535 位 | 0-15 | 16 - 31 | 32 - 47 | 48 - 127 元素 | 1 2 3 新分配空间

挪动1、2、3到正确位置 位 | 0-31 | 31 - 63 | 64 - 95 | 96 - 127 元素 | 1 2 3 新分配空间

添加新元素的时间复杂度为O(N)。

不能降级。

第七章 压缩列表 ziplist

7.1 构成

zlbytes | zltail | zllen | entry1 | entry2 | … | entryN | zlend

  • zlbytes, uint32_t, 记录整个压缩列表占用的字节数
  • zltail, uint32_t, 记录表尾节点到ziplist起始位置的距离。 lastEntry = head + zltail
  • zllen, uint16_t, 节点数量
  • entryX 节点
  • zlend, uint8_t, 特殊值0xFF

7.2 节点构成

previousentrylength | encoding | content

content保存字节数组或整数值。

7.2.1 previousentrylength

长度可以是1字节或5字节 * 前一节点的长度小于254字节 * 前一节点的长度大于等于254字节,则第一字节为0xFE,之后4字节保存前一节点的长度。

entry1 | entry2 | entry3 | entry4 entry3pointer = entry4pointer - entry4.previousentrylength

7.3 连锁更新

zlbytes | zltail | zllen | e1 | e2 | … | eN | zlend

e1至eN的节点长度都大于250个字节并且小于254字节。

插入newElement,长度大于254字节

zlbytes | zltail | zllen | newElement | e1 | e2 | … | eN | zlend

因为e1的previousentrylength属性长度仅为1字节,需要扩展为5字节。则新e1的长度大于254字节,e2也要重新分配。同理e3, e4…eN

最坏情况O(N^2)。需要对ziplist执行N次空间重分配操作,而每次空间重分配的最坏复杂度为O(N)。实际并不常见。

第八章 对象

字符串、列表、哈希、集合、有序集合

1
2
3
4
5
6
7
typedef struct redisObject {
  // 可作类型检查
  unsigned type:4
  unsigned encoding:4
  // 指向底层实现数据结构的指针
  void *prt;
}

type的类型 * REDISSTRING * REDISLIST * REDISHASH * REDISSET * REDIS_ZSET

encoding * REDISENCODINGINT * REDISENCODINGEMBSTR (一次分配的SDS) * REDISENCODINGRAW (SDS) * REDISENCODINGHT (字典) * REDISENCODINGLINKEDLIST * REDISENCODINGZIPLIST * REDISENCODINGINTSET * REDISENCODINGSKIPLIST

type和encoding * REDISSTRING | REDISENCODINGINT * REDISSTRING | REDISENCODINGEMBSTR * REDISSTRING | REDISENCODINGRAW * REDISLIST | REDISENCODINGZIPLIST * REDISLIST | REDISENCODINGLINKEDLIST * REDISHASH | REDISENCODINGZIPLIST * REDISHASH | REDISENCODINGHT * REDISSET | REDISENCODINGINTSET * REDISSET | REDISENCODINGHT * REDISZSET | REDISENCODINGZIPLIST * REDISZSET | REDISENCODING_SKIPLIST

8.2 字符串对象

  • int,保存整数值。
  • raw,值的长度大于39字节。调用两次内存分配来创建redisObject和sdshdr。
  • embstr,值的长度小于等于39字节。调用一次内存分配,redisObject和sdshdr是一块连续的空间。释放也是一次。任何修改操作都会变成raw。

long double类型的浮点数也是作为字符串值来保存。

8.3 列表对象

  • ziplist。所有字符串元素长度小于64字节,且元素数量小于512个。
  • linkedlist

8.4 哈希对象

  • ziplist。 所有键和值的字符串长度都小于64字节,键值对数量小于512个。
  • hashtable

ziplist格式。同一键值对紧挨一起,键在前,值在后。 zlbytes | zltail | zlen | k1 | v1 | k2 | v2 | zlend

8.5 集合对象

  • intset。所有元素都是整数,元素数量小于512.
  • hashtable。字典的键为字符串,值为NULL。

8.6 有序集合

  • ziplist。元素member在前,元素的score在后。 按score从小到大排列。 元素数量小于128个,所有元素成员的长度都小于64字节。
  • skiplist。同时包含字典和跳跃表。
1
2
3
4
typedef struct zset {
  zskiplist *zsl;
  dict *dict;
} zset;

zsl跳跃表按分值从小到大保存了所有集合的元素。跳跃表节点的object属性保存了元素的成员,跳跃表节点的score属性则保存了元素的分值。如果只使用跳跃表,查找成员分值的复杂度O(logN)

dict字典的key是成员,value是分值。可以O(1)找到给定成员的score。如果只使用字典,ZRANK、ZRANGE需要对所有值进行排序, O(N * logN)时间复杂度,并需要额外O(N)空间保存排序后的元素。

8.8 内存回收

1
2
3
4
typedef struct redisObject {
  // 引用计数
  int refcount;
}

8.9 对象共享

Redis在初始化服务器时,创建从0到9999的字符串。

8.10 空转时长

1
2
3
4
typedef struct redisObject {
  // 最后一次被访问的时间
  unsigned lru:22
}

第九章 数据库

1
2
3
4
5
6
7
struct redisServer {
  // 一个数组,保存服务器的所有数据库
  // db[0] | db[1] | db[2] | ...
  redisDB *db;
  // 数据库数量
  int dbnum;
}
1
2
3
4
5
typedef struct redisClient {
  // 正在使用的数据库
  redisDb *db

} redisClient;

redisDb

1
2
3
4
5
6
7
typedef struct redisDb {
  // 数据库键空间,保存数据库的所有键值对
  dict *dict;

  // 过期字典, 保存着键的过期时间,unix timestamp
  dict *expires;
}

9.5 过期键删除策略

  • 定时删除。创建一个timer,让timer在键的过期时间立即删除。占CPU,且Redis的时间事件是无序链表,查找需要O(N)
  • 惰性删除。每次获取键时,检查是否过期。
  • 定期删除。每隔一段时间。activeExpireCycle函数。
    • 函数每次运行时,从一定数量的数据库,随机取出一些键进行检查。
    • 全局变量current_db记录检查进度。
    • 全部数据库都检查后,current_db重置为0.

9.7 AOF、RDB对过期键的处理

  • 生成RDB,会进行检查,过期键不会保存。
  • 载入RDB。
    • 主服务器,过期键会忽略
    • 从服务器,载入,不会忽略。
  • AOF写入。当键被清理,程序向AOF文件追加一条DEL命令
  • AOF重写。已过期的键不会写入。

主从复制 - 主服务器删除一个过期键,向所有从服务器发送DEL命令 - 从服务器在执行客户端的读命令时,即使碰到过期键也不会清理,并正常返回。主服务器统一控制过期键的清理。

第10章 RDB持久化

10.1 RDB文件的创建与写入

  • SAVE。 阻塞Redis进程,期间服务器不能处理任何命令。
  • BGSAVE。fork一个子进程,由子进程创建RDB。

RDB文件的载入工作是在服务器的启动时。

如果服务器开启了AOF,优先使用AOF文件还原数据。

10.1.2 BGSAVE命令执行时的服务器状态

  • BGSAVE执行期间,客户端发起的SAVE会被拒绝。
  • 客户端发起的BGSAVE也会被拒绝。
  • BGREWRITEAOF和BGSAVE不能同时执行。性能方面考虑。

10.2 自动间隔保存

  • save 900 1,900秒内,至少1次修改
  • save 300 10,300秒内,至少10次修改
  • save 60 10000,60秒内,至少10000次修改
1
2
3
4
5
6
7
8
9
10
11
12
13
struct redisServer {
  //记录了保存条件的数组
  struct saveparam *saveparams;
  // 修改计数器,距上一次SAVE后,由多少次修改
  long long dirty
  // 上一次保存时间, unix timestap
  time_t last_save
}

struct saveparam {
  time_t seconds;
  int changes;
}

10.3 RDB文件结构

全大写单词标示常量,全小写单词标示变量和数据

REDIS | dbversion | database0 | database1 | … | databaseN | EOF | checksum

database部分

SELECTDB | dbnumber | keyvalue_pairs

keyvaluepair部分

不带过期时间

TYPE | key |value

带过期时间

EXPIRETIME_MS | ms | TYPE | key | value

第11章 AOF持久化

保存服务器执行的命令。

1
2
3
4
struct redisServer {
  // AOF 缓冲区
  sds aof_buf;
}

AOF文件的写入与同步

  • always。效率最低
  • everysec。距离超过一秒钟。
  • no。同步由操作系统决定。

11.2 AOF文件的载入

  • 创建一个不带网络连接的fake client
  • 从AOF文件分析,读出一条命令
  • fake client执行命令
  • 重复上面两步

11.3 AOF重写

解决AOF文件体积膨胀问题。 Redis创建一个新的AOF文件替代原文件。

AOF文件重写不需要对现有的AOF文件进行任何读取。

从数据库中读取键的现在的值,然后用一条命令记录键值对。

11.3.2 AOF后台重写

AOF重写程序放在子进程。

问题: 在子进程重写期间,新的命令可能对现有的数据库状态进行修改。

解决 * 执行客户端发来的命令 * 将执行后的写命令追加到AOF缓冲区 * 将执行后的写命令追加到AOF重写缓冲区

当子进程完成AOF重写后,向父进程发送信号。父进程 * 将AOF重写缓冲区的内容写入到新AOF文件 * 用新文件覆盖旧文件

第12章 事件

文件事件。服务器通过套接字与客户端连接,文件事件就是服务器对套接字的抽象。 使用I/O多路复用

时间事件。所有时间事件放在一个无序链表time_events

  • id, 全局唯一ID
  • when, unix timestamp
  • timeProc, 处理函数

第13章 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
struct redisServer {
  // 链表,保存了所有客户端状态
  // 新连接的客户端放到链表末尾
  list *clients
}

typedef struct redisClient {
  // 套接字的描述符,伪客户端为-1
  int fd;
  // 默认情况,没有名字
  robj *name;
  // 标志
  int flags;
  //输入缓冲区,保存客户端发送的命令,不超过1GB
  sds querybuf;

  // 服务器对querybuf的请求进行分析,生成argv和argc
  // 一个数组,每项时一个字符串对象,argv[0]是要执行的命令,如SET
  robj **argv
  // 记录argv的长度
  int argc

  // 命令的实现,通过argv[0]查找命令表
  struct redisCommand *cmd

  // 输出缓冲区, REDIS_REPLY_CHUNK_BYTES默认大小16*1024
  char buf[REDIS_REPLY_CHUNK_BYTES];
  // 记录buf数组已使用的字节数
  int bufpos
  // 当buf数组已经用完,使用可变大小缓冲区
  list *reply

  // 身份验证,0为未通过,1为通过
  int authenticated;

  // 创建客户端的事件
  time_t ctime;
  // 最后一次进行交互,可计算客户端空转idle时间
  time_t lastinteraction;
  // 记录输出缓冲区第一次到达软性限制的时间
  time_t obuf_soft_limit_reched_time;
}

13.2.2 关闭普通客户端

  • 客户端进程退出
  • 客户端发送了不符合协议格式的命令
  • 客户端设置了timeout,空转时间超过设置。
  • 客户端发送的请求的大小超过了输入缓冲区限制。
  • 输出缓冲区溢出。

第14章

读取命令请求

  • 读取套接字中的命令,保存到客户端状态的输入缓冲区
  • 分析输入缓冲区,提取命令参数和个数,保存到argv和argc属性
  • 调用命令执行器

命令执行器的后续工作

  • 如果开启了慢查询日志功能,添加新记录
  • 刚执行的命令的耗时,添加到redisCommand的milliseconds属性,并且calls属性加一。
  • 如果开启了AOF,写入到AOF缓冲区。
  • 服务器将命令传播给所有从服务器。

14.2 serverCron函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
struct redisServer {
  // serverCron每100毫秒更新
  // 当前unix timestamp,秒级
  time_t unixtime;
  // 当前unix timestamp,毫秒级
  long long mstime

  // 每10秒更新一次的时钟缓存,用于计算键的空转时长
  unsigned lruclock:22

  // 上一次抽样时间
  long long ops_sec_last_samle_time;
  // 上一次抽样时,服务器已执行的命令数
  long long ops_sec_last_sample_ops;
  // 循环数组,默认大小16,每项记录一次抽样结果
  long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES];
  // ops_sec_amples的索引值
  int ops_sec_idx;

  //已使用内存峰值
  size_t stat_peak_memory;

  //如果为1,有BGREWRITEAOF命令被延迟了
  int aof_rewrite_scheduled;

  pid_t rdb_child_pid;
  pid_t aof_child_pid;
}

typedef struct redisObject {
  // 计算一个键的空转时长
  // 服务器的lruclock - 对象的lru
  unsigned lru:22
}

检查持久化操作

  1. 服务器没有在执行持久化操作
  2. 有BGREWRITEAOF被延迟?
    • 是,执行BGSAVE
    • 否,自动保存的条件已经满足?
    • 是,执行BGSAVE
    • 否, AOF重写的条件已经满足?
      • 是,执行BGREWRITEAOF
      • 否,不作动作

第15章 复制

15.1 旧版复制

客户端向从服务器发送SLAVEOF命令,要求从服务器复制主服务器。

  • 从服务器向主服务器发送SYNC
  • 主服务器执行BGSAVE,生成RDB文件,并用一个缓冲区记录从现在开始的所有命令
  • 主服务器将RDB文件发给从服务器,从服务器加载RDB文件。
  • 主服务器将缓冲区的命令发送给从服务器,从服务器执行。

问题:断线后重连,需重新SYNC。

15.3 新版复制

使用PSYNC代替SYNC * 完整重同步 * 部分重同步。主服务器可以将断开期间的写命令发送给从服务器。

复制偏移量

  • 主服务器每次向从服务器传播N个字节,将自己的复制偏移量的值加上N。
  • 从服务器每次接收到N个字节,将自己的复制偏移量加N。
  • 如果主从一致,两者的复制偏移量总是相同的。

复制积压缓冲区

主服务器命令传播时,也会将命令写入到复制积压缓冲区

  • 如果offset之后的数据仍然存在于复制积压缓冲区, 主从执行部分重同步。
  • 否则,执行完整重同步。

第16章 Sentinel

当server1下线超时,Sentinel的故障转移: * 挑选server1的一个从服务器,将它升级为主服务器。 * 向server1的所有从服务器发送新的复制命令,让它们变成新主的从。 * 继续监视server1,在它重新上线时,将它设为新主的从。

启动并初始化Sentinel

只是一个运行在特殊模式的Redis服务器,不使用数据库,所以不会载入RDB、AOF。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
struct sentinelState{
  // 当前纪元,用于实现故障转移
  uint64_t current_epoch

  // 这个Sentinel建设的所有主服务器
  // 键是服务器名
  // 值指向sentinelRedisInstance结构的指针
  dict *masters

  // 是否进入TILT模式
  int tilt;

  // 正在执行的脚本数
  int running_sripts;
  // 进入TITLT模式的时间
  mstime_t tilt_start_time;
  // 最后一次执行时间处理器的时间
  mstime_t previous_time;

  // FIFO队列,所有需要执行的用户脚本
  list *scripts_queue;
} sentinel;

typedef struct sentinelRedisInstance {
  // 标识,记录实例的类型,实例的状态
  int flags
  // 实例名字,格式ip:port
  char *name
  // 实例运行ID
  char *runid
  // 配置纪元
  uint64_t config_epoch;
  // 实例的地址
  sentinelAddr *addr

  // 实例无响应多少毫秒判断为主观下线
  mstime_t down_after_period;
  // 判断这个实例为客观下线所需的票数
  int quorum;
  // 在故障转移时,可以同时对新主进行同步的从服务器数
  int parallel_syncs;
  // 刷新故障迁移的最大时限
  mstime_t failover_timeout;

  // 主服务器的所有从服务器
  // 键是ip:port
  // 值指向sentinelRedisInstance结构的指针
  dict *slaves 
  // 监视该主服务器的所有Sentinel
  // 键是ip:port
  // 值指向sentinelRedisInstance结构的指针
  dict *sentinels  
} sentinelRedisInstance;

typedef struct sentinelAddr {
  char *ip;
  int port;
} seninelAddr;

初始化Seninel的最后一步,创建连向被监视主服务器的网络连接,称为主服务器的客户端。会创建两个异步网络连接 * 命令连接。发命令,收命令回复。 * 订阅连接。订阅主服务器的sentinel:hello频道

Sentinel每十秒一次向被监视的主服务器发INFO命令,主服务器返回信息包含: * 服务器本身的信息,运行ID, role域记录的角色。 * 所有从服务器的信息。包含ip、port。用这些信息更新主服务器实例结构的slaves字典。

Seninel也会于从服务器建立命令连接和订阅连接。

对于监视同一个服务器的多个Sentinel,一个Sentinel发送的信息会被其他Sentinel接收到。从sentinel:hello消息时,可以提取出消息中的Sentinel IP, port, 运行ID。因此可以发现其他Sentinel,并更新为主服务器创建的实例结构的sentinels字典。

16.6 检测主观下线

每秒一次的频率向与它建立了命令连接的实例(主、从、其他Sentinel)发送PING命令。正常返回PONG

16.7 客观下线

当Sentinel将一个主服务器判断为主观下线后,为了确认,向同样监视这个主服务器的其他Sentinel进行询问。超过一定比例。

16.8 选举领头Sentinel

当一个主服务器被判断为客观下线,监视这个服务器的各Sentinel会选举出一个领头Sentinel。由领头Sentinel对下线服务器进行故障转移。 * 都有资格。 * 每轮投票过后,configuration epoch加一。 * 在一个纪元内,每个只有一次投票机会。 * 每个发现主服务器进入客观下线的Sentinel都会要求其他Sentinel投自己一票。 * Sentinel设置当轮领头Sentinel的规则是先到先得。 * 如果某个Sentinel票数超过半数,则成为领头Sentinel。 * 如果在给定时限内,没有选出。则重新投票。

16.9 故障转移

  • 选一个从,设为新主
  • 其他从,改为复制新主
  • 监视旧主,上线将它设为从

新主的选择,按以下规则从上往下。 * 最高优先级 * 偏移量最大 * 运行ID最小的服务器。

第17章

集群通过分片sharding来进行数据共享、复制和故障转移。设置cluster-enbaled配置为yes

1
2
3
4
127.0.0.1:7000> CLUSTER MEET 127.0.0.1 7001
OK
127.0.0.1:7000> CLUSTER MEET 127.0.0.1 7002
OK

集群数据结构

每个节点都是用一个clusterNode结构记录自己的状态,并为集群的所有其他节点都创建一个clusterNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct clusterNode{
  // 节点创建时间
  mstime_t ctime;
  // 节点名字
  char name[REDIS_CLUSTER_NAMELEN];
  // 节点标识
  int flags;
  // 当前的配置纪元
  uint64_t configEpoch;

  char ip[REDIS_IP_STR_LEN];
  int port;

  // 保存节点所需的相关信息
  clusterLink *link;
}

typedef struct clusterLink {
  // 连接的创建时间
  mstime_t ctime;
  // TCP套接字描述符
  int fd;
  // 输出缓冲区,保存着等待发送给其他节点的信息
  sds sndbuf;
  // 输入缓冲区,保存从其他节点接收到的信息。
  sds rcvbuf;
  // 也这个节点相关联的节点
  struct clusterNode *node;
} clusterLink;

每个节点都保存着一个clusterState结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct clusterState {
  // 指向当前节点
  clusterNode *myself;
  // 配置纪元
  uint64_t currentEpoch;
  // 集群当前的状态,上线还是下线
  int state;
  // 集群中至少处理着一个槽的节点的数量
  int size;
  // 集群节点名单
  // 键为节点的名字
  // 值指向clusterNode
  dict *nodes;
}

17.2 槽指派

集群分为16384(2048 * 8)个槽slot,每个节点可以处理0个或最多16384个。

1
2
3
4
5
6
127.0.0.1:7000> CLUSTER ADDSLOTS 0 1 2 3 4 ... 5000
OK
127.0.0.1:7001> CLUSTER ADDSLOTS 5001 5002 ... 10000 
OK
127.0.0.1:7002> CLUSTER ADDSLOTS 10001 10002 ... 16383
OK
1
2
3
4
5
struct clusterNode {
  // 二进制位数组,索引i上的二进制位为1代表负责处理槽i
  unsigned char slots[16384/8];
  int numslots;
}
1
2
3
4
5
typedef struct clusterState {
  // slots[i]指向NULL,说明槽i未指派
  // slots[i]指向一个clusterNode结构,表示槽i已经指派
  clusterNode *slots[16384];
}

如果节点只使用clusterNode.slots数组来记录槽的指派信息,求槽i被指派给了哪个节点? 程序需要遍历clusterState.nodes, 检查clusterNode.slots. O(N),N为node数

如果只使用clusterState.slots的话,找到节点A负责的槽。需要遍历clusterState.slots,O(16384)

1
2
3
4
5
def CLUSTER_ADDSLOTS(*all_input_slots):
  for i in all_input_slots:
    if clusterState.slots[i] != NULL
      reply_erro()
      return

17.3 在集群中执行命令

  • 客户端向节点发送命令
  • 如果键所在的槽正好就是当前节点,执行命令。
  • 槽没有指派给当前节点, 返回MOVED错误。 MOVED <slot> <ip>:<port>
  • 客户端根据MOVED错误提供的信息,转向正确的节点。
1
2
def slot_number(key)
  return CRC16(key) & 16383

节点与单机数据库的区别。节点只使用0号数据库。

1
2
3
4
typedef struct clusterState {
  //score是槽号,member是键
  zskiplist *slotstokeys
}
可以方便第对属于某个或某些槽的所有数据库键进行批量操作。

17.4 重新分片

Redis集群管理软件redis-trib * 目标节点准备导入slot的键值对。redis-trib对目标发送CLUSTER SETSLOT <slot> IMPORTING <source_id> * 源节点准备迁移slot的键值对。redis-trib对源节点发送CLUSTER SETSLOT <slot> MIGRATING <target_id> * 获取count个键值对。redis-trib向源节点发送CLUSTER GETKEYSINSLOT <slot> <count> * 对上一步的所有键,遍历,redis-trib向源节点发送MIGRATE <target_ip> <target_port> <key_name> 0 <timeout> * redis-trib向集群任一节点发送CLUSTER SETSLOT <slot> NODE <target_id>,会通过消息发送到整个集群。

17.5 ASK错误

正进行重新分片 * 客户端向源节点发送键key的命令 * key还在源节点,直接执行。 * key已在目标节点, 返回ASK错误,只因客户端转向目标节点。 * 客户端接到ASK错误,向目标节点发送ASKING命令,之后重发原命令。

节点正在导入槽i,但导入未完成,槽i还没有指派给这个节点。将返回MOVED错误,带ASKING可以让目标节点破例一次。

MOVED与ASK区别 * MOVED永久 * ASK只是一次。

17.6 复制和故障转移

CLUSTER REPLICATE <node_id>, 运行节点成为node_id节点的从。

1
2
3
4
5
6
7
8
9
struct clusterNode {
  // 指向主节点
  struct clusterNode *slaveof;

  // 正在复制这个节点的从节点数
  int numslaves;
  // 数组,每一项指向clusterNode结构
  struct clusterNode **slaves;
}

故障检测

每个节点都会定期地向集群所有节点发送PING消息。

半数以上负责处理槽的主节点将某个主节点x报告为疑似下线,那么这个主节点将标记为FAIL。并向集群广播该节点FAIL。

第18章 发布与订阅

1
2
3
4
5
6
SUBSCRIBE "news.it"
PUBLISH "news.it" "hello"

PSUBSCRIBE "news2.*"
PUBLISH "news2.it1" "hello"
PUBLISH "news2.it2" "hello"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct redisServer {
  // key是订阅的频道名
  // value是链表,记录客户端
  dict *pubsub_channels;

  // 模式订阅链表
  // 每个节点包含一个pubsubPattern结构
  // PUBLISH是需要遍历该链表
  list *pubsub_patterns
}

typedef struct pubsubPattern {
  redisClient *client;
  robj *pattern;
}

第19章 事务

1
2
3
4
5
6
7
8
9
10
11
12
redis> MULTI
ok
redis> SET "name" "v1"
QUEUED
redis> GET "name"
QUEUED
redis> SET "author" "v2"
QUEUED
redis> EXEC
1) OK
2) "v1"
3) OK

事务实现

MULTI命令将client的flags属性加上REDIS_MULTI

  • 如果客户端发送的是EXEC、DISCARD、WATCH、MULTI,则立即执行
  • 否则,放入事务队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct redisClient {
  // 事务状态
  multiState mstate;
}

typedef struct multiState {
  // 事务队列, FIFO顺序
  multiCmd *commands;
  // 已入队列命令数
  int count;
} multiState;

typedef struct multiCmd {
  robj **argv;
  int argc;
  struct redisCommand *cmd;
}

19.2 WATCH命令实现

WATCH是一个乐观锁,在EXEC执行前,监视任意数量的键。在EXEC执行时,检查被监视的键是否有被修改,如果有的话,拒绝执行事务。

1
2
3
4
5
6
7
8
redis> WATCH "name"
OK
redis> MULTI
OK
redis> SET "name" "peter"
QUEUED
redis> EXEC
(nil)
1
2
3
4
5
typedef struct redisDb {
  // key是键
  // value是链表,客户端
  dict *watched_keys;
}

对数据库进行修改的命令,执行后会调用touchWatchKey,打开客户端的REDISDIRTYCAS标识,表示客户端的事物安全性已被破坏。

1
2
3
4
def touchWatchKey(db, key):
  if key in db.watched_keys:
    for client in db.watched_keys[key]:
      client.flags |= REDIS_DIRTY_CAS

判断事务是否安全 * 客户端向服务器发送EXEC命令 * 客户端的REDISDIRTYCAS标识是否已经打开 + 是,拒绝执行事务 + 否,执行事务

ACID * 原子性。不支持事务回滚,即使队列中间的命令出错,后面的也会执行。 * 一致性。数据符合数据库的定义和要求,没有包括非法数据。 * 隔离性。单线程。 * 持久性。只有AOF下appendfsync=always时才持久。RDB不能保证数据第一时间写入硬盘。