- 版本:第2位版本号为奇数是非稳定版,偶数才是稳定版本
- https://download.redis.io/releases/
redis7新特性
- multi-AOF: 7之前的版本AOF只有一个文件,现在有多个处于同一目录的AOF文件
- RDB文件格式更新,不兼容老版本的RDB格式
- redis function: 用来替代lua脚本的,暂时用处不多
- ACL: 更细粒度的ACL,能通过selector选择命令集合,对命令集合进行鉴权
10大数据类型
- string
- hash
- list:双端队列
- set:底层是intset或hashtable
- sorted set:每个元素关联一个double类型的分数都set,根据分数排序。
- geo
- hyperloglog:基数统计,使用极少存储空间来统计大量不重复元素的个数
- bitmap
- bitfield:位域,支持范围操作比特位
- stream:redis版MQ
zset
- zrange 是根据从0开始的索引查询,zrangeByScore是根据分数查询
- zadd除了能添加,还能更新,其中GT表示只能更新为更大的分数,如果提供更小的分数则不更新,LT则相反,GT和LT不影响添加
- zadd返回值是新增元素的个数,如果元素已存在则是修改,返回0,CH可以将返回值改为已修改元素的个数
- zmpop: redis7新增,接收多个zset key,弹出第一个非空zset的n个元素, 默认1个,需要指定MIN还是MAX,弹出分数最小或最大的
bitmap
- 用于存储二值类型的数据,以字节为单位,底层是string,可以用strlen获取字节数
hyperloglog
- pfadd返回1表示hll的统计个数增加了,返回0表示不变,pfmerge合并2个hll。底层是string类型
geo
- 添加经纬度和值,底层是zset,经纬度会被编码成一个52位的整数,对应zset中的分数
- geohash返回的是52位整数转换后的字符串,相同前缀的geohash表示在附近
- geosearch: redis6新命令,用于替代georadius,查询附近的地点,可以根据成员名称或经纬度来查询,可以返回附近地点名称、距离、经纬度
list
- blpop: 一个客户端可以同时监听多个list,返回从左往右第一个不为空的list中的list名称和其中的一个元素;多个客户端可以同时监听一个list,获取元素的优先级取决于该命令请求的顺序,先请求的先获取到,再次请求优先级会重置,其它客户端会先消费到;当一个客户端同时监听多个空list,这些list通过事务或lua脚本同时写入元素时,客户端先获取到第一个写入的list的一个元素
stream
- 和list的区别:list每条消息只能保存一个值,而stream能保存多个键值对。
- 和pubsub的区别,pubsub的消息不会持久化,消费者断线会丢失消息,无ack机制,消费出错无法重新消费。stream会持久化消息,有ack
- xadd : 向stream添加一条内容为一个或多个键值对的消息,不存在则创建,如果指定了NOMKSTREAM,则不创建新的stream。*表示自动生成ID,ID由2部分组成,-分割,如果只指定一个整数n,生成的ID是n-0,ID必须单调递增。自动生成ID时,第一部分是时间戳,第2部分是系统时间小于等于当前stream最大时间戳时,递增的值,用于防止时钟回拨或者主从切换时主从系统时间不一致。支持添加消息的同时指定淘汰策略,相当于同时执行xadd和xtrim
xtrim
- 淘汰老消息,可以通过剩余个数或ID来淘汰,返回淘汰了多少条消息
- 根据剩余个数淘汰:剩下多少个最新的消息,其他老消息都淘汰,如果指定的个数大于stream消息总数,则不淘汰任何消息
- 根据ID淘汰:小于该ID的消息都淘汰,等于该ID的不淘汰
- 不指定和"="表示精确淘汰,
- 近似淘汰: "~"表示近似淘汰,淘汰的消息会小于等于精确淘汰的个数。近似淘汰时,还可以指定最大淘汰个数,默认是100*宏节点个数,0表示不限制最大淘汰个数。
xrange
- 根据起始和结束ID范围查询stream,默认闭区间,-和+分别表示最小ID和最大ID
- 可以通过COUNT指定查询最大消息数,返回原本结果集中的前n条消息
- redis6.2新特性:通过"(“和”)"指定开区间,之前的版本需要手动将开区间边界增加后继续查询,来实现开区间
- 因为ID分为2部分,如果只使用第1部分num,起始边界表示num-0,结束边界表示num-[第2部分最大值]
- 开始和结束ID相同时,表示等值查询,如果只指定ID第一部分,则仍是范围查询
xdel
- 根据ID删除消息,返回删除的消息个数,stream底层是radix tree存储,多条消息存储在同一区块中,删除是标记删除,当区块中所有消息都标记为删除时,才会真正删除区块
- radix tree用于优化空间
- https://ivanzz1001.github.io/records/post/data-structure/2018/11/18/ds-radix-tree
xlen
- 获取stream中消息个数,空stream和不存在的key都返回0,需要通过exists或type查询key是否存在,允许空stream存在,其他数据类型(list,set,map,zset)元素为空时,key会自动删除
xread
- 查询大于指定ID的一个或多个steam中的消息,需要为每个stream指定起始ID,可以指定每个stream查询的最大条数COUNT
- 一般用于迭代查询,从0开始,每次查询上一次返回的最大ID,返回nil时迭代结束
- 和xrange的区别:xrange只能查询单个stream,可以指定结束ID
- 只指定ID的第一部分n,等价于n-0
- 可以通过BLOCK指定阻塞读取,超时时间的单位是ms,0表示无限等待,多个客户端阻塞读取时,新消息会分别发给所有阻塞读取的客户端,它们会收到同样的消息
- 读取最新消息:将ID指定为" " ,注意:只能第一次读取时使用 ",注意:只能第一次读取时使用 ",注意:只能第一次读取时使用,后续读取不能用 ,而是用上一次返回的最大消息 I D ,如果用 ,而是用上一次返回的最大消息ID,如果用 ,而是用上一次返回的最大消息ID,如果用,会漏掉2次读取之间的所有消息。
xgroup create
- 创建消费者组,并且关联一个stream,指定消费者组起始消息ID,可以指定具体ID或$,后者表示从最新消息开始消费
- 如果消费者组已经和该stream关联,报错:busy group。如果stream不存在,报错
- 可以执行多次xgroup create为同一个消费者组关联多个stream。
- $表示创建该关联关系之后的所有消息能被组中消费者消费到,而不是第一次xreadgroup之后的消息
xreadgroup
- 消费者组消费,和xread的区别:1、多了消费组名称和消费者名称,同一消费组不会消费相同消息。2、也可以指定多个stream和多个起始ID,但必须是消费组关联的stream的子集,真正的起始ID是消费者起始ID和消费组的起始ID的较小值。比如消费组指定起始ID为n1,消费者起始ID为n2,起始ID是min(n1,n2),n2=0表示从n1消费,n1是$且n2是>时,表示消费尚未消费的消息
- $和>的区别:前者用于创建消费组,表示比最新消息ID更大的;后者用于xreadgroup,表示消费尚未消费的消息。前者不能用于循环消费,否则会丢失读取间隔中投递的消息
- id=0用于重新消费未ack的消息,id=">"表示消费未投递过的消息
xack
- 确认某条ID在某个stream的消息在消费组已被消费
- xreadgroup消费的消息ID会保存到pending entry list(PEL)中,xack确认后消息ID才回从PEL删除
- 未确认的消息可以通过xreadgroup指定id=0读取到
xpending
用于查询PEL,有2种形式:
- 只指定消费组和stream:查询统计信息,PEL种id的个数,最小和最大id,以及其中的id对应的消费者名称和id个数。
- 指定开始和结束id、COUNT:查询详细信息,每个未消费消息的id、对应消费者、距离上次投递的毫秒时间、投递次数。还可以只查询某个消费者未消费的消息。可以指定IDLE n,表示只查询距离上次消费的毫秒时间大于n毫秒的pending消息,用于查询长时间未投递的消息,然后使用XCLAIM
- 支持迭代查询:每次使用上一次返回的最大ID和COUNT查一部分
- 使用xreadgroup,指定ID=0可以重新消费PEL中的消息,并增加PEL中重新投递次数
- 删除PEL:只能通过xack,xdel或xtrim只能删除消息内容,,PEL仍保存了消息ID、消息所属的消费者等信息,但通过xreadgroup查询时,只能查到ID,内容为nil,因此xreadgroup消费消息时,需要判断消息体是否为空。
xclaim
- 用于修改PEL中消息所属的消费者,当某个消费者宕机无法恢复时,让其他消费者消费它的PEL消息
- 需要指定消费组、stream名称、消费者名称、最小空闲时间、一个或多个消息ID,默认会返回空闲时间大于指定时间的消息ID和内容
- 不在PEL或在PEL已删除的消息,不会返回
- xclaim会重置空闲时间,递增投递次数,xreadgroup也会,如果xclaim指定了JUSTID,那么只返回消息ID,不递增投递次数,但会重置空闲时间
- xclaim的其他选项用于AOF使用
- 因为PEL是stream和消费组级别的,所以不能xclaim其他消费组的消息
- 空闲时间的作用:多个消费者同时对一条消息xclaim,只有一个会成功,因为其中一个成功xclaim后重置了空闲时间,其他消费者设置了最小空闲时间,从而防止重复xclaim
- 可以直接在每个消费者通过xpeding+xclaim来消费同组其他消费者空闲时间过长的消息,然后xack
- 6.2新特性:xclaim已经删除的消息,不会返回该消息,并自动从PEL中删除
源码阅读环境
- make CFLAGS=“-g -O0” -j8, 禁用编译器优化,用于源码阅读环境
- daemonize no
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "(gdb) Launch",
"type": "cppdbg",
"request": "launch",
"program": "/home/cwj/redis-7.2.5/src/redis-server",
"args": ["/home/cwj/redis-7.2.5/redis7002.conf"],
"stopAtEntry": false,
"cwd": "/home/cwj/redis-7.2.5/src",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
},
{
"description": "Set Disassembly Flavor to Intel",
"text": "-gdb-set disassembly-flavor intel",
"ignoreFailures": true
}
]
}
]
}
rdb
- rdb默认目录是当前目录,名称是dump.rdb,可以通过dir和dbfilename配置修改
- fork系统调用:用于创建子进程,一次调用会产生父进程和子进程中的2次返回,父进程返回子进程pid,子进程返回0,可以通过判断返回值来区分父进程和子进程的执行逻辑。创建子进程时,子进程的虚拟地址空间指向父进程的物理空间,复用父进程的代码块和数据块。写时复制:只有当写入某个内存页时,才会复制对应内存页的物理空间
- save配置:可以有多个配置对象,每个对象由2个字段组成,单位是秒数和脏数据的个数,当距离上一次成功生成rdb的时间大于save配置的秒数,且脏数据的个数大于等于save配置的个数时,触发生成rdb快照。如果配置了多个save,每次会遍历这些配置,如果有一个满足则触发
- 如果上一次生成失败了,则尝试次数大于5次才会触发
- 脏数据的个数:不是key的个数,而是修改、新增、或删除的、值或键值对的个数,比如lpush多个值,这些值的总数加进脏数据个数
- RDB快照中保存了魔数、内存数据集、结束符,如果此时有slave,还会保存复制ID和offset
REDIS0011ú redis-ver^E7.2.5ú
redis-bitsÀ@ú^Ectime½Ýtfú^Hused-memÂÀô^X^@ú^Nrepl-stream-dbÀ^Aú^Grepl-id(865c8c634d46ce00df84c36c02727863b322723dú^Krepl-offset¤Ë^F^@ú^Haof-baseÀ^@þ^@û^A^@^@^Bk1^Bv1þ^Aû^A^@^@^Ckk1^Cvv1ÿ¤%ÚIºµ8|
void bgsaveCommand(client *c)
rdbPopulateSaveInfo(&rsi)
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi)
rdbWriteRaw(rdb,magic,9)//魔数
rdbSaveInfoAuxFields(rdb,rdbflags,rsi)//版本、创建时间、replicationId,offset
rdbSaveDb(rdb, j, rdbflags, &key_counter)//内存数据集
rdbSaveType(rdb,RDB_OPCODE_EOF)//结束符,255
if (server.dirty >= sp->changes && // 脏数据个数
server.unixtime-server.lastsave > sp->seconds && //上一次成功生成rdb的时间
(server.unixtime-server.lastbgsave_try > //上一次尝试生成rdb的时间
CONFIG_BGSAVE_RETRY_DELAY || //5次
server.lastbgsave_status == C_OK))
{
rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE);
break;
}
- 会先生成一个临时文件,fsync后,再重命名为rdb文件
- stop-writes-on-bgsave-error:默认为yes,rdb生成失败时,拒绝客户端写入命令
- rdb-del-sync-files:只有关闭rdb和aof时才会使用该配置,默认为false,为true时表示是否自动删除主从复制时master和slave生成的rdb文件
什么时候会触发rdb快照
- save配置条件满足时
- 执行bgsave命令
- save命令: 在主线程中生成rdb,一般不用
- 主从复制
- flushall: 如果配置了save,则立即触发一次rdb快照
- shutdown:如果没有shutdown save,并且没有配置save,则不rdb,否则会rdb
注意:
- flushdb只会将删除的key的个数加到脏数据个数中(server.dirty),不一定会触发rdb,而是要满足save配置条件触发
- 将save设置为空字符串"",表示关闭rdb快照自动生成,bgsave和save不受影响仍能生成快照。
void flushdbCommand(client *c);
server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);//将删除的key的个数加到脏数据个数
removed += dictSize(dbarray[j].dict);
- redis命令的实现在redis.h中声明了,xxCommand函数
优缺点
- rdb好处:格式紧凑的单个文件,是整个数据集的全量快照,利于归档备份和恢复,比如可以每小时归档一次到高可用存储上,是二进制格式的,读取速度快,相比于AOF,启动时的加载速度更快
- 缺点:因为save要满足一定条件才会触发,异常宕机时,会丢失上一次快照到当前的的数据。除了save命令、shutdown、flushall,其他方式都是通过fork子进程的方式bgsave,fork调用期间父进程不能处理客户端命令,另外如果数据集很大,并且生成rdb期间有大量命令写入,会触发大量写时复制,增加系统负载,可能出现卡顿
AOF
- aof默认关闭,通过appendonly yes开启
- appenddirname:redis7新特性,之前的版本和rdb在同一目录,AOF文件所在目录,相对于dir目录,默认是appendonlydir
- appendfsync:aof刷盘策略,always,everysec,no
- appendfilename:AOF文件名称,默认是appendonly.aof,7之前的版本只有一个aof文件。redis7新特性:mutil aof,会生成3个aof文件,appendonly.aof.[num].base.rdb、appendonly.aof.[num].incr.aof、appendonly.aof.manifest
AOF写入
- 执行一条写命令后,先将该命令写入aof缓冲区中。写命令:和rdb判断脏数据的个数相同,执行该命令前后脏数据个数(server.dirty)增加了,将key修改为和原来相同的值,也会增加脏数据个数。
- 写入aof:serverCron大循环中将缓冲区中的命令写入aof文件,写入后清空缓冲区
- 刷盘:每秒刷盘:刷盘则是在单独的线程中进行,刷盘时会判断是否已有aof刷盘线程,确保只有一个该类型线程。always:同步刷盘,从缓冲区写入aof文件之后立即fsync
//server.c, aof缓冲(server.aof_buf)生成路径
void call(client *c, int flags)//redis命令处理函数
void alsoPropagate(int dbid, robj **argv, int argc, int target)//判断target是否包含PROPAGATE_AOF,如果包含,则将命令追加到server.also_propagate
void afterCommand(client *c)
void postExecutionUnitOperations(void)
static void propagatePendingCommands(void)//将server.also_propagate追加到server.aof_buf
static void propagateNow(int dbid, robj **argv, int argc, int target)
void feedAppendOnlyFile(int dictid, robj **argv, int argc)
serverCron
flushAppendOnlyFile//serverCron大循环中将缓冲区中的命令写入aof文件
aof_background_fsync(server.aof_fd)
bioCreateFsyncJob(fd, server.master_repl_offset, 1)
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
//同步刷盘
if (redis_fsync(server.aof_fd) == -1) {
//调用fsync进行刷盘
serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
"AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
exit(1);
}
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync) {
//每秒刷盘,并且系统时间大于上次刷盘时间
if (!sync_in_progress) {
//没有正在执行的aof刷盘线程
aof_background_fsync(server.aof_fd);
}
server.aof_last_fsync = server.unixtime;
}
AOF重写
- 需要fork子进程重写,因为重写是根据内存数据集的每个key生成对应redis命令,而不是原来的aof文件。
- 重写前,父进程先创建一个临时文件,将aof fd指向它,重写期间向该文件写入增量命令;重写子进程也创建一个临时文件,根据内存数据集的每个key生成对应redis命令,比如string对应set,list对应rpush,map对应hmset,set对应sadd,zset对应zadd,stream对应xadd、xgroup create
- aof文件包括基础aof、增量aof和清单aof,基础aof是每次重写生成的,其中也是redis命令,是重写时的内存数据集快照;增量aof是客户端发送的写入命令转换为redis协议格式,包含重复数据;清单AOF记录了3种类型的文件:增量aof和清单aof,以及历史aof文件
- 历史aof:重写以前的增量aof和清单aof,因为文件删除是异步线程中进行的,所以需要先保存历史文件信息,以便删除失败时恢复
- 重写成功后,父进程重命名文件,将新文件和历史文件名称加入清单文件,触发异步删除
int rewriteAppendOnlyFileBackground(void)
void openNewIncrAofForAppend()//创建用于重写期间追加的增量aof
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi)//使用rdb重写,rdbSaveRio和rewriteAppendOnlyFile只会执行一个
int rewriteAppendOnlyFile(char *filename)//fork出来的子进程执行它
serverCron
checkChildrenDone//检查重写子进程是否结束,如果结束,调用backgroundRewriteDoneHandler
void backgroundRewriteDoneHandler(int exitcode, int bysignal)
sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am)//修改aof清单文件,将当前base aof加到历史列表,创建一个新的后缀数值递增的base aof
sds getNewIncrAofName(aofManifest *am)//创建一个新的后缀数值递增的incr aof,加到incr aof list
void checkChildrenDone(int exitcode, int bysignal)
//通过waitpid系统调用,检查重写子进程是否结束,如果waitpid返回0,则表示结束,传参-1表示等待任意子进程结束,WNOHANG表示不阻塞等待
if ((pid = waitpid(-1, &statloc, WNOHANG)) != 0){
backgroundRewriteDoneHandler()
}
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
snprintf(tmpfile, 256, "temp-rewriteaof-bg-%d.aof",
(int)server.child_pid);
sds new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am);
rename(tmpfile, new_base_filepath);//
sds new_incr_filename = getNewIncrAofName(temp_am);
rename(temp_incr_filepath, new_incr_filepath);
aofManifestFreeAndUpdate(temp_am);//更新清单文件
aofDelHistoryFiles();//删除历史文件
}
- auto-aof-rewrite-percentage:默认100%,当前增量aof相比于基础aof增长了百分比大于等于多少时,自动触发重写。比如100%表示增量aof相比基础aof增长了100%,也就是增量aof是基础aof的2倍时
- auto-aof-rewrite-min-size:默认64m,当前增量aof大于该值时,自动触发重写
- 上面2个配置同时生效时,才触发自动重写
- rdb快照生成和aof重写不会同时进行,同一时间只会有一个rdb或aof子进程
- bgrewriteaof:手动触发重写的redis命令,不论是否开启aof,都会触发重写
- aof_use_rdb_preamble:混合持久化,使用rdb格式重写,默认true,使用rdb格式重新aof,生成的基础aof以rdb为后缀,如果关闭则以aof后缀
- no-appendfsync-on-rewrite:重写期间是否同步
int aof_rewrite_perc; //auto-aof-rewrite-percentage
off_t aof_rewrite_min_size; //auto-aof-rewrite-min-size
if (server.aof_state == AOF_ON &&// 开启aof
!hasActiveChildProcess() && //没有其他子进程,比如rdb获取其他aof子进程
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)//aof增量文件当前大于aof_rewrite_min_size
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;// 上一次重写后aof大小,或第一次启动时基础aof大小
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();//重写
}
}
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
- AOF优点:数据更完整,每秒fsync时,最多丢失1秒的数据。是只追加的文件,掉电只会影响最后一条命令,易于修复。增量文件过大时,支持自动重写,重写通过fork子进程进行,重写期间父进程仍能相应客户端请求。增量文件中保存的是ascii格式的内容,易于手工修复
- AOF缺点:文件体积更大,即使使用了rdb重写,重写前仍是增量命令的格式。每秒fsync的对性能影响比rdb更大
- rdb和AOF同时开启时,2者都会写入,rdb单文件适合备份。启动时,优先加载AOF
事务
multi exec discard
- 事务是一组操作,要么全部成功,要么全部失败,并且根据隔离级别,执行过程中具备一定程度的排他性
- redis事务是一个队列中,一次性、顺序性、排他性执行的一组命令,客户端先发送命令进入队列,提交时再一起顺序执行,因为redis是单线程的,事务命令执行期间不会有其他命令执行,其他命令在事务之后执行
- redis事务不能嵌套执行,事务通过exec或discard结束前不能再次执行multi
- 执行multi命令实际上就是开启客户端的事务标志位,执行命令时会先判断客户端是否处于事务状态,如果是,则先不执行该命令,而是将它写入客户端对应的队列中
- exec执行之前,入队的命令报错,会导致整个事务不执行,比如语法错误,内存不足等,返回exec abort;exec执行的报错只有对应报错的命令不执行(部分成功),因此redis事务并没有做到完全的原子性, 未执行的命令返回nil
- 事务不会执行阻塞命令,比如blpop,如果执行了,阻塞命令不执行,返回nil,但不会影响事务中其他非阻塞命令的执行
- exec返回值:依次对应每条事务命令的返回值
void multiCommand(client *c) {
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");//这里说明事务不能嵌套执行
return;
}
c->flags |= CLIENT_MULTI;//执行multi命令实际上就是开启客户端的CLIENT_MULTI标志位
addReply(c,shared.ok);
}
typedef struct client {
multiState mstate;//每个客户端对应一个事务状态
}
typedef struct multiState {
multiCmd *commands; /* 开启事务后,发送的命令保存的队列 */
} multiState;
typedef struct multiCmd {
//入队命令时已经正确解析了命令和参数
robj **argv;//命令的参数
int argv_len;
int argc;
struct redisCommand *cmd;//redis命令
} multiCmd;
watch
- watch:用来对key加乐观锁,只有当key在watch和mutil期间未被修改,事务才能执行成功,否则整个事务不执行。使用场景:需要将一个key查出来做一些修改再保存,不希望修改期间其他客户端修改
- watch不能在事务中执行,可以watch多个key,可以监听当前Redis库不存在的key。
- 什么时候整个事务不执行:1、执行discard。2、exec执行前,命令入队时报错。3、存在任一个watch的key被修改或者过期了
- redis不支持事务回滚,discard只是不执行进入队列的命令
typedef struct redisDb {
dict *watched_keys;//key是对应redis库的所有正在被监听的key,value是一个list,其中的元素简单理解就是client
}
typedef struct watchedKey {
//每个客户端每监听一个key,就会创建一个watchedKey
listNode node;//
robj *key;//被监听的key
redisDb *db;//被监听的key的客户端选择的数据库
client *client;//被监听的key的客户端
unsigned expired:1; /* Flag that we're watching an already expired key. */
} watchedKey;
typedef struct client {
list *watched_keys;//watchedKey的list,表示对应客户端正在监听的所有的key
}
void signalModifiedKey(client *c, redisDb *db, robj *key)//所有key修改时,会调用该方法
void touchWatchedKey(redisDb *db, robj *key)//从db.watched_keys获取key对应的监听客户端,将flag的CLIENT_DIRTY_CAS打开
void execCommand(client *c) {
//...
if (isWatchedKeyExpired(c)) {
//watch的key过期了,将CLIENT_DIRTY_CAS打开,使整个事务不执行
c->flags |= (CLIENT_DIRTY_CAS);
}
if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
if (c->flags & CLIENT_DIRTY_EXEC) {
addReplyErrorObject(c, shared.execaborterr);//CLIENT_DIRTY_EXEC打开了,对应exec执行前,命令入队时报错了,整个事务不执行
} else {
addReply(c, shared.nullarray[c->resp]);//flag的CLIENT_DIRTY_CAS打开了,说明存在监听的key被修改了,调用discardTransaction,整个事务不执行
}
discardTransaction(c);
return;
}
c->flags |= CLIENT_DENY_BLOCKING;//不执行阻塞命令
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->argv_len = c->mstate.commands[j].argv_len;
c->cmd = c->realcmd = c->mstate.commands[j].cmd;
call(c,CMD_CALL_FULL);//依次执行队列中的命令,某个命令执行报错,不影响其他命令的执行
}
//...
}
管道
作用:减少命令的往返时间,通常redis命令的处理包括发送命令、执行命令、返回响应,当需要一次性发送大量命令时,每条命令都需要一次发送和响应,网络通信时间占用了较大比例的总时间。管道将这些命令一次性打包发送,redis执行之后,再打包返回执行结果,
- 批量发送提高了网络IO的效率,单条命令的方式,每条命令都需要read和write系统调用,而管道总共只需要一次
- 单条命令的方式通常需要等待上一个命令的执行结果,再执行下一条命令,而管道一次性批量接受所有命令的响应
- redis服务端会缓存管道的命令的执行结果,因此一次管道执行的命令数不宜过多,否则占用过多服务端内存,可以每次发送10k命令(如果每个命令10B,就是100条命令)
- redis-cli --pipe从标准输入读取redis命令
- 和事务的区别:只会依次执行,不保证原子性
- 和原生批量命令的区别:原生批量命令只支持同种类型的key,管道支持不同类型的key
- 和lua脚本的区别:1、lua脚本能保证原子性,管道不保证原子性。2、如果需要上一条命令的执行结果,管道实现不了,只能脚本。3、脚本可以不返回中间结果,管道会返回每条命令的结果。
- 管道其实是由客户端实现的,服务端感知不到,服务端仍然发送一条命令处理一条,返回执行结果,客户端做了2件事情:1、发送时安装一定大小的批次发送,批次大小可以自定义。2、不阻塞等待每条命令的执行结果,而是将一个批次的命令发送后,再一并等待。
cat pipeline_command.txt | redis-cli --pipe
客户端实现
- redisTemplate有2个api使用管道:1、execute方法,pipeline参数设为true。2、executePipelined方法。这2种方法的区别:前者获取不到管道中每条命令的执行结果,只适用于只写命令的管道;后者的返回值是List,对应每条命令的执行结果
- executePipelined的RedisCallback必须返回null,否则报错。
- executePipelined其实内部还是调用了execute,通过closePipeline得到命令的执行结果
- redisTemplate.execute的第2个参数为true时,RedisCallback中获取到的连接是未经过代理的对象,比如LettuceConnection,为false时是经过代理的对象,CloseSuppressingInvocationHandler拦截了close方法,防止业务代码误关闭连接
- openPipeline:执行管道命令前调用,纯客户端操作,初始化结果列表和刷新策略,将连接标记为管道,以便后续使用专有连接。closePipeline:等待所有命令的执行结果,将结果保存到List返回
- 刷新策略:默认每条命令发送后都会冲刷网络缓冲区,可以配置成关闭管道时冲刷,或者发送固定命令数后冲刷
- Lettuce中,非管道和非事务命令,底层公用一个连接;管道和事务命令使用专有连接。因为管道和事务较耗时,为了避免影响其他线程执行命令。
- 管道和非管道命令的区别:1、前者使用专有连接,后者共享连接(对lettuce而言);2、前者发送命令后不阻塞等待执行结果,而是将命令执行结果的Future对象保存起来,closePipeline时再等待执行结果,后者执行每条命令阻塞等待执行结果
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
//...
if (pipeline && !pipelineStatus) {
connToUse.openPipeline();//执行命令前,openPipeline
}
T result = action.doInRedis(connToExpose);//调用RedisCallback,执行命令
// close pipeline
if (pipeline && !pipelineStatus) {
connToUse.closePipeline();//执行命令后,closePipeline,closePipeline返回值是命令的执行结果,这里忽略了
}
/...
return postProcessResult(result, connToUse, existingConnection);//execute的返回值是RedisCallback的返回值
}
public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) {
return execute((RedisCallback<List<Object>>) connection -> {
//实际上调用了execute,通过在RedisCallback中openPipeline和closePipeline来实现
connection.openPipeline();
boolean pipelinedClosed = false;
try {
Object result = action.doInRedis(connection);
if (result != null) {
//executePipelined的RedisCallback必须返回null,否则报错
throw new InvalidDataAccessApiUsageException(
"Callback cannot return a non-null value as it gets overwritten by the pipeline");
}
List<Object> closePipeline = connection.closePipeline();//将closePipeline的返回值返回
pipelinedClosed = true;
return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
if (!pipelinedClosed) {
connection.closePipeline();
}
}
});
}
public class LettuceConnectionFactory {
private @Nullable SharedConnection<byte[]> connection;//非管道、非事务的共享连接,保存在连接工厂中,连接工厂是单例的
private SharedConnection<byte[]> getOrCreateSharedConnection() {
synchronized (this.connectionMonitor) {
//通过单例对象来共享
if (this.connection == null) {
this.connection = new SharedConnection<>(connectionProvider);
}
return this.connection;
}
}
class SharedConnection<E> {
private @Nullable StatefulConnection<E, E> connection;//底层真正的共享连接,仍是单例
StatefulConnection<E, E> getConnection() {
synchronized (this.connectionMonitor) {
if (this.connection == null) {
this.connection = getNativeConnection();
}
if (getValidateConnection()) {
validateConnection();
}
return this.connection;
}
}
}
}
public class LettuceConnection extends AbstractRedisConnection {
private @Nullable StatefulConnection<byte[], byte[]> asyncDedicatedConn;//专有连接
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
if (isQueueing() || isPipelined()) {
//当开启事务或管道时,底层使用专有连接,否则使用共享连接
return getAsyncDedicatedConnection();
}
if (asyncSharedConn != null) {
if (asyncSharedConn instanceof StatefulRedisConnection) {
return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
}
if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).async();
}
}
return getAsyncDedicatedConnection();
}
}
private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connection, boolean statusCommand) {
if (isPipelined()) {
return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
//...
pipeline(newLettuceResult(future.get(), converter, nullDefault));//管道,将命令执行结果的Future对象保存起来
//...
});
}
return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
//...
Object result = await(future.get());//非管道:阻塞等待执行结果
//...
});
}
public void openPipeline() {
if (!isPipelined) {
isPipelined = true;//后续getAsyncConnection、doInvoke需要根据它判断是否为管道
ppline = new ArrayList<>();//保存执行结果的Future
}
}
public List<Object> closePipeline() {
isPipelined = false;//重置
List<io.lettuce.core.protocol.RedisCommand<?, ?, ?>> futures = new ArrayList<>(ppline.size());
for (LettuceResult<?, ?> result : ppline) {
futures.add(result.getResultHolder());
}
boolean done = LettuceFutures.awaitAll(timeout, TimeUnit.MILLISECONDS,
futures.toArray(new RedisFuture[futures.size()]));//等待结果
if (done) {
return results;
}
}
pubsub
- subscribe可以订阅多个通道,返回值由3部分组成:1、消息类型,subscribe命令对应的类型是subscribe,unsubscribe是unsubscribe,接收到消息是message。2、通道名称,表示订阅或接收到消息时的通道名称。3、subscribe或unsubscribe是当前订阅的通道数,接收到消息时是消息体
- 订阅了通道的连接只能发送固定的命令,RESP3支持所有命令
- 发布订阅和客户端连接选择的redis库无关,pub和sub的客户端选择了不同库,仍能接收到消息
- publish的返回值是接收到该消息的客户端数量
- psubscribe支持根据"*"通配符订阅通道,对应punsubscribe,取消订阅也必须使用同名带通配符的通道。psubscribe订阅的消息由4部分组成:1、消息类型,pubscribe。2、带通配符的通道名称。3、实际匹配到的通道名称。4、消息体
主从
-
replicaof:slave配置文件和命令中设置主节点。replicaof no one: 将从节点变为独立的主节点,旧版本slaveof
-
从节点是只读的
-
从节点过多会导致master带宽占用过高,从而影响客户端命令的发送和接受,解决方法:多级同步,slave向其他slave同步
-
主节点宕机,从节点不会自动切换,而是等待主节点上线,期间集群只能处理只读命令
-
从节点首次连接:会向master发送psync命令,全量复制,会清空原来的数据
-
全量同步:主节点收到psync命令后会触发生成RDB,同时将所有接收到的用于修改数据集的命令缓存起来,主节点完成RDB后,将RDB快照文件和所有缓存的命令发送到slave。slave接收到数据后,将其存盘并加载到内存
-
心跳:master向slave发送PING,间隔repl-ping-replica-period,默认10s
-
增量复制:master将新的收集到的修改命令依次发送给slave
-
断点续传:master会检查backlog中的offset,master和slave都会保存一个复制的offset和masterId,master只会把已经复制的offset后面的数据复制给slave
-
过期和内存淘汰的key也是通过master同步给slave进行删除
-
断点续传失败时,会全量同步
-
master向slave同步,以及slave确认命令已同步都是异步的
-
master可以不开启RDB快照只开启AOF,slave开启RDB,这样减少了master的磁盘IO,但是重启时要注意,如果master没有开启任何持久化配置,重启时会丢失所有的数据,然后slave也会同步清空。
-
master的数据集会对应一个随机数的复制ID和偏移量,偏移量是数据集的字节数,slave端也会保存复制ID和偏移量,PSYNC时,slave将复制ID和偏移量发送给master,master通过复制ID找到偏移量,判断slave的同步进度,如果master的backlog缓冲区包含slave所需的数据,则进行增量同步,否则全量同步。
-
复制ID的作用:主从切换时避免全量复制。当节点第一次成为master,或者从slave提升为master时,会重新生成复制ID,slave连接master时,会复制master的复制ID作为自己的,因此复制ID代表了一届master的数据版本,一个master和它的所有slave的复制ID相同。当slave提升为master时,其他slave因为复制ID和它相同,只需要进行增量复制。
-
当多台slave同时连master时,master只进行一次RDB。
-
SYNC命令不再使用,而是PSYNC
-
repl-diskless-sync:设置master无rdb同步
-
masterauth:从节点的配置,主节点的密码
-
min-replicas-to-write ,min-replicas-max-lag :主节点可以配置有多少个从节点存活时,才接受写命令,一定程度上可以确保数据安全,但由于同步是异步的,主节点返回写入成功时,从节点不一定写入成功。前者是从节点个数,后者是心跳时间距离当前时间的秒数小于配置值,表示存活
-
replica-serve-stale-data:从节点配置,默认为true,master下线时,从节点仍能响应读命令,为no时,从节点拒绝客户端读数据的命令,返回MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to ‘no’.
-
repl-diskless-sync:默认yes,为false时,全量同步时会fork子进程,子进程在磁盘上生成RDB文件,父进程将它发送给slave;为true时,全量同步时fork子进程,子进程不在磁盘上生成RDB,而是直接从内存中发送给slave。前者在生成RDB文件期间,连接进来的slave会进入队列等待,生成完成后将同一个RDB文件发送给队列中的slave;后者因为传输一旦开始,新的slave进入队列等待,传输完成后再处理队列中的slave,所以会等待几秒,以便并发处理。diskless适合慢磁盘高带宽的场景
-
repl-diskless-sync-delay:diskless同步,等待时间最长的slave大于等于该时间时
-
repl-diskless-sync-max-replicas:diskless同步,额外等待的过程中,如果等待的slave个数达到配置值时,不再等待,提前开始同步,默认0,不启用
-
repl-ping-replica-period:master向slave发送ping的间隔,单位秒
-
repl-backlog-size:master的复制缓冲区大小,越大则容忍slave断联的时间越长,第一个slave连接时,才会创建复制缓冲区
-
repl-backlog-ttl:当没有slave时,复制缓冲区的超时时间,超时释放,为0表示永不超时,默认1h
-
repl-diskless-load: slave diskless同步,slave接收到同步数据后,是否先保存到临时rdb文件再加载到内存数据集,默认disabled,先加载磁盘RDB,可能会导致全量加载慢;swapdb:直接解析sokcet中的RDB数据,解析过程中原有的数据集和正在加载的共存,可能会导致OOM;on-empty-db:原有的内存数据集为空时,才直接加载socket,否则先加载到磁盘
-
rdb-del-sync-files:是否删除从master同步过来的RDB文件,默认no不删除,为yes时,同步完成后并且rdb和aof都关闭时会自动删除
slave同步
516151:S 17 Jun 2024 17:18:35.619 * Connecting to MASTER 192.168.1.165:7000
516151:S 17 Jun 2024 17:18:35.619 * MASTER <-> REPLICA sync started
516151:S 17 Jun 2024 17:18:35.619 * Non blocking connect for SYNC fired the event.
516151:S 17 Jun 2024 17:18:35.619 * Master replied to PING, replication can continue...
516151:S 17 Jun 2024 17:18:35.619 * Trying a partial resynchronization (request 57238eb6eacfb06d6ef87777cd1a1548d6f513bb:1131).
516151:S 17 Jun 2024 17:18:40.962 * Full resync from master: 865c8c634d46ce00df84c36c02727863b322723d:0
516151:S 17 Jun 2024 17:18:40.964 * MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF to disk
516151:S 17 Jun 2024 17:18:40.964 * Discarding previously cached master state.
516151:S 17 Jun 2024 17:18:40.964 * MASTER <-> REPLICA sync: Flushing old data
516151:S 17 Jun 2024 17:18:40.964 * MASTER <-> REPLICA sync: Loading DB in memory
- PSYNC ,支持全量和增量同步的命令,PSYNC ? -1表示全量同步,返回值分为2部分: 1、FULLRESYNC:返回后续RDB对应的复制ID和RDB结束位置的偏移量。2、$num,RDB数据:num是RDB数据的字节数
- 如果master是diskless同步,则没有RDB字节数,而是EOF开头和结束的RDB数据流,因为此时传输开始时不能确定RDB的大小
psync ? -1
+FULLRESYNC 865c8c634d46ce00df84c36c02727863b322723d 87930
$186
REDIS0011 redis-ver7.2.5
redis-bitsewused-mem(-stream-dbzrepl-id(865c8c634d46ce00df84c36c02727863b322723d
-offsetzaof-base~v1V¨Ʉ5XshellXshell*1
$4
ping
struct redisServer {
根据配置replicaof,得到master ip和端口
char *masterhost;
int masterport;
int repl_state;//slave同步状态,对应枚举repl_state
connection *repl_transfer_s;//和master的连接
char master_replid[CONFIG_RUN_ID_SIZE+1];//psync命令,master返回的复制ID
long long master_initial_offset;//psync命令,master返回的offset,增量命令开始前的初始偏移量
int repl_transfer_fd;//全量同步临时RDB文件fd
char *repl_transfer_tmpfile;//全量同步临时RDB文件名称:temp-[时间戳]-[pid].rdb
off_t repl_transfer_size;//master发送的RDB的总大小,等于$num
off_t repl_transfer_read;//已经读取的RDB大小
client *master;//从master同步RDB到内存数据集后,根据repl_transfer_s创建它
}
//slave全量同步状态,流转顺序从前往后
typedef enum {
REPL_STATE_NONE = 0, /*初始状态 */
REPL_STATE_CONNECT, /* 配置了replicaof时,变为此状态 */
REPL_STATE_CONNECTING, /* 开始和master建立连接 */
/* --- Handshake states, must be ordered --- */
REPL_STATE_RECEIVE_PING_REPLY, /* */
REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to master */
REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */
REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
REPL_STATE_CONNECTED, /* 除了建立TCP连接,还要PING成功、AUTH鉴权成功、PSYN发送成功、RDB数据接受成功,才算已连接master */
} repl_state;
typedef struct client {
//代表master的client, 通过replicationCreateMasterClient创建
uint64_t flags;//CLIENT_MASTER
long long reploff;//初始值为server.master_initial_offset,而创建master client是在同步完rdb之后,执行其他增量命令之前,所以初始值是FULLSYNC的值
}
- slave同步是异步的,建立连接和读数据都是通过注册事件回调函数实现的,不阻塞主线程
- 如果repl-diskless-load是disabled,会创建一个临时RDB文件,接收完毕后将它重命名为真正的RDB文件。
- RDB同步完成后,slave会创建一个代表和master连接的client结构,master相当于一个客户端发送增量同步命令。
//replication.c
void replicationSetMaster(char *ip, int port)//如果配置了replicaof,将repl_state设置为REPL_STATE_CONNECT
int connectWithMaster(void)//跟master建立连接,保存到repl_transfer_s,状态变为REPL_STATE_CONNECTING,设置建立连接处理器为syncWithMaster
void syncWithMaster(connection *conn)//建立连接和连接读数据的处理器
sendCommand(conn,"PING",NULL)//发送PING命令
int slaveTryPartialResynchronization(connection *conn, int read_reply)//发送PSYNC命令,并将接受到的返回值保存到master_replid和master_initial_offset,注意该方法调用2次,第一次read_reply=0,第2次为1,第一次负责发送,发送成功后返回PSYNC_WAIT_REPLY,第2次负责接收,接收成功后返回PSYNC_FULLRESYNC。
sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL)//发送PSYNC,全量同步为psync ? -1
connSetReadHandler(conn, readSyncBulkPayload)//设置读处理器为readSyncBulkPayload
void readSyncBulkPayload(connection *conn)//发送PSYNC命令后,接收RDB数据的处理器
long long emptyData(int dbnum, int flags, void(callback)(dict*))//清空内存数据集
rename(server.repl_transfer_tmpfile,server.rdb_filename)//将临时文件重命名为真正的rdb文件
rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION)//从真正的rdb文件加载到内存数据集
void replicationCreateMasterClient(connection *conn, int dbid)//根据repl_transfer_s创建master client,将
connSetReadHandler(server.master->conn, readQueryFromClient);//将连接的读处理器修改为readQueryFromClient
void replicationSendAck(void)//slave端RDB加载结束,给master发送REPLCONF ACK,将master client的reploff发送给master,从该偏移量开始增量同步
void syncWithMaster(connection *conn) {
//...
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());//临时RDB文件
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
server.repl_transfer_tmpfile = zstrdup(tmpfile);//文件名
server.repl_transfer_fd = dfd;//fd
//...
connSetReadHandler(conn, readSyncBulkPayload);//设置读处理器为readSyncBulkPayload
//初始化全量数据传输的状态信息
server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
}
void replicationCreateMasterClient(connection *conn, int dbid) {
server.master = createClient(conn);
if (conn)
connSetReadHandler(server.master->conn, readQueryFromClient);//设置读处理器
server.master->flags |= CLIENT_MASTER;//标识这是master client
server.master->authenticated = 1;//默认AUTH验证通过
//将reploff和read_reploff设为master发送的初始偏移量
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
server.master->user = NULL; /* This client can do everything. */
memcpy(server.master->replid, server.master_replid,//复制ID
sizeof(server.master_replid));
if (dbid != -1) selectDb(server.master,dbid);//尚不清楚
}
//同步命令
void readQueryFromClient(connection *conn) {
if (c->flags & CLIENT_MASTER) {
c->read_reploff += nread;//将读取的字节数加到read_reploff
atomicIncr(server.stat_net_repl_input_bytes, nread);
}
}
master同步
- slave client状态replstate流程:SLAVE_STATE_WAIT_BGSAVE_START->SLAVE_STATE_WAIT_BGSAVE_END,当发送psyn命令时,先处于等待bgsave状态,当没有正在运行的子进程时,等待结束,可以fork RDB子进程
- 对于socket RDB子进程,不会直接通过子进程将RDB数据发送给slave,而是通过管道发送给父进程,然后父进程发送给slave
- 延时批量处理:配置了repl_diskless_sync和repl-diskless-sync-delay时,实际上不会立即处理slave的psync请求,而是放到slaves中,然后在replicationCron中统一处理,它是定时1s执行的,从而实现一定程度的匹配执行psync
- slaveRDB同步完成后,会发送replconf ack命令,master将client加到待写列表,RDB同步期间的命令会写到复制缓冲区,随后主事件轮询中遍历待写列表,复制缓冲区的命令写到socket
- 复制缓冲区由复制缓冲块列表和索引组成,缓冲块中保存了命令数据,索引用来根据偏移量快速查找缓冲块
- 全量同步分为RDB同步和命令同步,增量同步的逻辑和命令同步相同
513691:M 17 Jun 2024 16:11:19.678 * Replica 192.168.1.165:6379 asks for synchronization
513691:M 17 Jun 2024 16:11:19.678 * Full resync requested by replica 192.168.1.165:6379
513691:M 17 Jun 2024 16:11:19.678 * Replication backlog created, my new replication IDs are '57238eb6eacfb06d6ef87777cd1a1548d6f513bb' and '0000000000000000000000000000000000000000'
513691:M 17 Jun 2024 16:11:19.678 * Delay next BGSAVE for diskless SYNC
513691:M 17 Jun 2024 16:11:24.622 * Starting BGSAVE for SYNC with target: replicas sockets
513691:M 17 Jun 2024 16:11:24.622 * Background RDB transfer started by pid 515414
515414:C 17 Jun 2024 16:11:24.624 * Fork CoW for RDB: current 0 MB, peak 0 MB, average 0 MB
513691:M 17 Jun 2024 16:11:24.624 * Diskless rdb transfer, done reading from pipe, 1 replicas still up.
513691:M 17 Jun 2024 16:11:24.683 * Background RDB transfer terminated with success
513691:M 17 Jun 2024 16:11:24.683 * Streamed RDB transfer with replica 192.168.1.165:6379 succeeded (socket). Waiting for REPLCONF ACK from replica to enable streaming
513691:M 17 Jun 2024 16:11:24.683 * Synchronization with replica 192.168.1.165:6379 succeeded
//client.replstate状态值:
#define SLAVE_STATE_WAIT_BGSAVE_START 6 //全量同步时,slave client初始状态
struct redisServer {
list *slaves;//全量同步的slave client列表
char replid[CONFIG_RUN_ID_SIZE+1];//40位的replicationId,第一个slave psync时生成,随机数
char replid2[CONFIG_RUN_ID_SIZE+1];//从master复制而来的replicationId,对于master为空
long long master_repl_offset; //当前复制偏移量
int repl_diskless_sync;//对应repl-diskless-sync
int repl_diskless_sync_delay;//对应配置repl-diskless-sync-delay
int rdb_child_type;//RDB子进程类型,RDB_CHILD_TYPE_NONE表示没有正在运行的RDB子进程,RDB_CHILD_TYPE_DISK:RDB子进程将RDB写到磁盘,RDB_CHILD_TYPE_SOCKET:直接写到socket
list *clients_pending_write;//待写的client列表
list *repl_buffer_blocks;//replBufBlock列表
replBacklog *repl_backlog;//backlog,用于存放全量同步,生成传输RDB时的客户端发送的增量命令
}
typedef struct replBacklog {
//复制缓冲区的索引,每64个缓冲块建立一个索引,加到前缀树,前缀树的key是缓冲块起始位置的偏移量,用来加速查找缓冲块链表
listNode *ref_repl_buf_node; /* 复制缓冲区第一个缓冲块节点 */
size_t unindexed_count; /* 复制缓冲区节点个数 */
rax *blocks_index; /* 基数树(前缀树),相当于一个key是string的map,并且key存在相同前缀 */
long long histlen; /* 复制缓冲区的大小,和master_repl_offset的区别:后者会一直递增,而复制缓冲区有可能释放清0 */
long long offset; /* 复制缓冲区起始位置的偏移量,offset+histlen就是结束位置的偏移量 */
} replBacklog;
typedef struct replBufBlock {
//复制缓冲区的缓冲块
int refcount; /* 引用计数,用于释放backlog */
long long id; /*递增的唯一ID */
long long repl_offset; /* buf起始位置相对于整个复制缓冲区的偏移量 */
size_t size, used;//buf总大小,已写入的大小
char buf[];//实际要写到socket的命令数据
} replBufBlock;
typedef struct client {
//slave client
int replstate;//复制状态,初始值为SLAVE_STATE_WAIT_BGSAVE_START
}
void syncCommand(client *c)//psync 和sync命令的实现,处理同一时间发送psync的slave,直接将RDB发送给它们,如果发送psync时已经开始了,或者配置了repl-diskless-sync-delay,暂不处理,在replicationCron中处理
int startBgsaveForReplication(int mincapa, int req)//当前没有正在运行的子进程时,执行该方法
int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi)//开启了diskless时
void replicationCron(void)
void replicationStartPendingFork(void)// bgsave发送RDB给slave
int shouldStartChildReplication(int *mincapa_out, int *req_out)//获取状态为SLAVE_STATE_WAIT_BGSAVE_START的slave,当slave个数和最大空闲时间满足条件时,返回1,
int startBgsaveForReplication(int mincapa, int req)
int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi)//最终和syncCommand都执行这个方法
replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset())//fork socketRDB子进程之前,先给每个slave发FULLRESYNC,修改repl为SLAVE_STATE_WAIT_BGSAVE_END,发送FULLRESYNC命令
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)//父进程逻辑,读取管道中的RDB数据,socket RDB子进程将RDB数据写入管道
void replconfCommand(client *c)//处理slave同步rdb完成后发的ack命令
void replicaStartCommandStream(client *slave)//
void putClientInPendingWriteQueue(client *c)//将client加到待写列表clients_pending_write
int handleClientsWithPendingWrites(void)//主线程事件轮询中调用,遍历并移除待写client列表,将client的缓冲区真正写到socket
int writeToClient(client *c, int handler_installed)
int _writeToClient(client *c, ssize_t *nwritten)//对于其中slave client,从ref_repl_buf_node为头节点的链表开始,写到socket
void call(client *c, int flags)//redis命令处理函数
propagateNow//命令处理函数最终会调用该方法,该方法负责将命令写入AOF缓冲区和复制缓冲区backlog
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc)
prepareReplicasToWrite//将所有同步完RDB的slave加到待写列表clients_pending_write
void feedReplicationBuffer(char *s, size_t len)//将命令写入复制缓冲区,并建立索引
void syncCommand(client *c) {
//...
if (masterTryPartialResynchronization(c, psync_offset) == C_OK) {
server.stat_sync_partial_ok++;
return;//增量同步,直接返回,后面的逻辑都是全量同步
}
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;//初始状态
c->repldbfd = -1;//对应slave的RDB文件fd
c->flags |= CLIENT_SLAVE;//设置client flags为CLIENT_SLAVE
listAddNodeTail(server.slaves,c);
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
//第一个全量同步的slave发送psync时,创建replId和backlog
changeReplicationId();//生成40位随机的replid
clearReplicationId2();//清空replid2
createReplicationBacklog();//创建空的backlog
serverLog(LL_NOTICE,"Replication backlog created, my new "
"replication IDs are '%s' and '%s'",
server.replid, server.replid2);
}
if (server.child_type == CHILD_TYPE_RDB &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
//发送pysnc时socket RDB子进程正在运行,暂不处理,在在replicationCron中处理
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
/* CASE 3: There is no BGSAVE is in progress. */
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) &&
server.repl_diskless_sync_delay)
{
//配置了repl_diskless_sync和repl-diskless-sync-delay时,实际上不会立即处理slave的psync请求,而是放到slaves中,然后在replicationCron中统一处理,它是定时1s执行的,从而实现一定程度的匹配执行psync
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* We don't have a BGSAVE in progress, let's start one. Diskless * or disk-based mode is determined by replica's capacity. */
if (!hasActiveChildProcess()) {
startBgsaveForReplication(c->slave_capa, c->slave_req);//没有开启repl_diskless_sync_delay,并且当前没有子进程,才会立即执行bgsave
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but another BG operation is active. "
"BGSAVE for replication delayed");
}
}
}
}
int shouldStartChildReplication(int *mincapa_out, int *req_out) {
if (slaves_waiting &&
(!server.repl_diskless_sync ||
(server.repl_diskless_sync_max_replicas > 0 &&
slaves_waiting >= server.repl_diskless_sync_max_replicas) || //状态为SLAVE_STATE_WAIT_BGSAVE_START的slave个数
max_idle >= server.repl_diskless_sync_delay)) //等待时间最长的slave的等待时间
{
if (mincapa_out)
*mincapa_out = mincapa;
if (req_out)
*req_out = req;
return 1;
}
return 0;
}
int _writeToClient(client *c, ssize_t *nwritten) {
//主线程事件轮询,获取待写client列表,调用
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
//client slave处理逻辑
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);//缓冲块
//...
*nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
o->used-c->ref_block_pos);//写到socket
//...
listNode *next = listNextNode(c->ref_repl_buf_node);
if (next && c->ref_block_pos == o->used) {
o->refcount--;
((replBufBlock *)(listNodeValue(next)))->refcount++;
c->ref_repl_buf_node = next;//缓冲块实际上指向全局backlog链表中的一个节点,获取下一个节点,继续写入
c->ref_block_pos = 0;
}
return C_OK;
}
//...
}
增量同步
- slave的复制ID和偏移量都会持久化到RDB,重启时如果能从RDB中加载到它们,则进行尝试进行增量复制,发送psync,master收到后,判断如果复制ID和当前复制ID相同,并且偏移量在复制缓冲区范围内,则将偏移量对应的缓冲块加到slave output buffer,随后写到socket
- 如何触发增量同步:1主1从,停止slave,向master写入命令,启动slave
1989082:S 21 Jun 2024 14:03:45.324 * Before turning into a replica, using my own master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer.
1989082:S 21 Jun 2024 14:03:45.324 * Ready to accept connections tcp
1989082:S 21 Jun 2024 14:03:45.324 * Connecting to MASTER 192.168.1.165:7000
1989082:S 21 Jun 2024 14:03:45.324 * MASTER <-> REPLICA sync started
1989082:S 21 Jun 2024 14:03:45.324 * Non blocking connect for SYNC fired the event.
1989082:S 21 Jun 2024 14:03:45.325 * Master replied to PING, replication can continue...
1989082:S 21 Jun 2024 14:03:45.325 * Trying a partial resynchronization (request 9cc9af3d6159c69ecb8e08477dbb4f50d1a46106:100).
1989082:S 21 Jun 2024 14:03:45.325 * Successful partial resynchronization with master.
1989082:S 21 Jun 2024 14:03:45.325 * MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.
1988502:M 21 Jun 2024 14:03:45.325 * Replica 192.168.1.165:7001 asks for synchronization
1988502:M 21 Jun 2024 14:03:45.325 * Partial resynchronization request from 192.168.1.165:7001 accepted. Sending 52 bytes of backlog starting from offset 100.
psync 9cc9af3d6159c69ecb8e08477dbb4f50d1a46106 100
+CONTINUE
*2
$6
SELECT
$1
0
*3
$3
set
$2
k2
$2
v2
struct redisServer {
client *cached_master;//RDB中能加载到复制ID和偏移量时,启动时就会创建cached_master,从而触发增量同步
}
typedef struct client {
//cache master client
long long reploff;//为RDB中加载的复制偏移量,即上次停止时的偏移量
char replid[CONFIG_RUN_ID_SIZE+1];//复制ID
}
void loadDataFromDisk(void)//启动时执行
replicationCacheMasterUsingMyself()//rdb快照中会保存复制ID和偏移量,加载rdb之后,根据它们创建cached_master
slaveTryPartialResynchronization
void replicationCacheMasterUsingMyself(void) {
server.master_initial_offset = server.master_repl_offset;//master_repl_offset从RDB中加载
server.master->reploff = server.master_initial_offset;//设置复制偏移量
memcpy(server.master->replid, server.replid, sizeof(server.replid));//设置复制ID
server.cached_master = server.master;//实际上创建的是cached_master
server.master = NULL;
}
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
//...
if (server.cached_master) {
//如果创建了cached_master,尝试增量同步
psync_replid = server.cached_master->replid;//psync
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
}
//...
}
void syncCommand(client *c)//psync命令处理器
masterTryPartialResynchronization(c, psync_offset)//如果复制ID和当前复制ID相同,并且偏移量在复制缓冲区范围内,则将偏移量对应的缓冲块加到slave output buffer
哨兵
- 哨兵可以同时监控多个master,通过配置多个sentinel monitor [master name] [ip] [port] [quorum],master name的作用:配置分为2类,全局配置和master配置,master配置需要通过master name来指定在哪个master生效,master配置格式:sentinel <option_name> <master_name> <option_value>
- 哨兵启动使用专门的执行程序和配置:redis-sentinel sentinel.conf, 相当于redis-server sentinel.conf --sentinel,默认端口26379
- 启动后会修改redis server和哨兵的配置: 哨兵只需要配master名称和ip端口号,会自动发现slave, 并动态添加到哨兵的配置中,
- 主观下线:单台sentinel ping master超时;客观下线:quorum个哨兵认为master主观下线。客观下线时,哨兵会内部选举出一个leader(raft),由leader哨兵选择新master(failover),判断客观下线不一定需要过半sentinel节点,quorum个即可,而主从切换需要过半节点在线
- 选master:配置的优先级>repli offset>run id,sentinel leader对新master执行replicaof no one,其他slave执行replicaof 新master
- 主从切换时,会有一段时间出现master不可用
- sentinel需要客户端支持
- down-after-milliseconds: 主观下线超时时间
- parallel-syncs :主从切换后,多少台slave可以同时连master。数值越小,重新同步的时间越长,但如果配的过大,slave在同步期间会有一段时间不可用,导致集群可用性变差
- min-replicas-to-write,min-replicas-max-lag:master的配置,至少有几个slave在线时,master才接受写请求。目的是防止client、master和slave网络分区时,slave中选出新master,而client仍向旧master写数据,分区恢复后,旧master变为slave,清空自身数据同步新master,导致client写成功的数据丢失。副作用:当所有slave宕机时,master不能写入数据
- sentinel通过pub sub来向客户端实时推送master故障切换信息
- sentinel ckquorum : 检查哨兵是否满足quorum个在线
- sentinel failover : 手动切换master,不需要其他哨兵同意
- SENTINEL GET-MASTER-ADDR-BY-NAME :获取当前master ip 端口
- 配置动态修改:监控配置:SENTINEL MONITOR 。停止监控:SENTINEL REMOVE 。master配置:SENTINEL SET [ …]。全局配置:sentinel config set …。需要对所有哨兵都执行修改命令,不会自动同步动态修改的配置。
- 读取master所有配置:SENTINEL MASTER
- 全局配置:哨兵用户名密码登
- 移除哨兵:哨兵之间会相互发现,并保存到内部状态中,停止哨兵进程后,还需要在其他哨兵节点执行sentinel reset ,让它们清空内部状态重新发现,然后在每台哨兵执行sentinel master查看是否成功。移除slave也一样,停止slave进程后需要执行sentinel reset
解析master配置
- 读取哨兵配置:先按照正常配置读取配置文件,再将其中sentinel开头的配置单独生成一个数据结构
- redis配置和哨兵配置都会重写配置文件,将配置文件中没有配置的默认配置加上。重写使用追加的方式,先增加一行注释:# Generated by CONFIG REWRITE,再增加重写的配置
- 监听成功时,会在+monitor通道上发送消息
- redis实例(redis instance):master或slave实例
2700281:X 04 Jul 2024 18:06:01.334 * Sentinel new configuration saved on disk
2700281:X 04 Jul 2024 18:06:01.334 * Sentinel ID is f95862ad49d32156fae279d2a0e819a6fa9f40f9
2700281:X 04 Jul 2024 18:06:01.334 # +monitor master mymaster 127.0.0.1 7001 quorum 2
2700281:X 04 Jul 2024 18:06:01.335 * +slave slave 192.168.1.165:7002 192.168.1.165 7002 @ mymaster 127.0.0.1 7001
2700281:X 04 Jul 2024 18:06:01.418 * Sentinel new configuration saved on disk
2700281:X 04 Jul 2024 18:06:01.418 * +slave slave 192.168.1.165:7000 192.168.1.165 7000 @ mymaster 127.0.0.1 7001
2700281:X 04 Jul 2024 18:06:01.526 * Sentinel new configuration saved on disk
2700281:X 04 Jul 2024 18:09:02.152 * +fix-slave-config slave 192.168.1.165:7000 192.168.1.165 7000 @ mymaster 127.0.0.1 7001
2700281:X 04 Jul 2024 18:09:02.152 * +fix-slave-config slave 192.168.1.165:7002 192.168.1.165 7002 @ mymaster 127.0.0.1 7001
2700281:X 04 Jul 2024 18:09:12.167 * +slave slave 127.0.0.1:7000 127.0.0.1 7000 @ mymaster 127.0.0.1 7001
2700281:X 04 Jul 2024 18:09:12.246 * Sentinel new configuration saved on disk
2700281:X 04 Jul 2024 18:09:12.246 * +slave slave 127.0.0.1:7002 127.0.0.1 7002 @ mymaster 127.0.0.1 7001
2700281:X 04 Jul 2024 18:09:12.295 * Sentinel new configuration saved on disk
protected-mode no
port 5000
daemonize yes
pidfile "/var/run/redis-sentinel5000.pid"
loglevel notice
logfile "/home/cwj/redis-data/redis-sentinel5000.log"
dir "/home/cwj/redis-data"
sentinel monitor mymaster 127.0.0.1 7001 2
sentinel auth-pass mymaster 6
acllog-max-len 128
sentinel deny-scripts-reconfig yes
sentinel resolve-hostnames no
sentinel announce-hostnames no
# Generated by CONFIG REWRITE
latency-tracking-info-percentiles 50 99 99.9
user default on nopass sanitize-payload ~* &* +@all
sentinel myid f95862ad49d32156fae279d2a0e819a6fa9f40f9
sentinel config-epoch mymaster 0
sentinel leader-epoch mymaster 0
sentinel current-epoch 0
sentinel known-replica mymaster 127.0.0.1 7000
sentinel known-replica mymaster 192.168.1.165 7000
sentinel known-replica mymaster 127.0.0.1 7002
sentinel known-replica mymaster 192.168.1.165 7002
struct redisServer {
struct sentinelConfig *sentinel_config;//配置文件中,sentinel开头的配置
}
struct sentinelConfig {
//3个list的元素类型都是sentinelLoadQueueEntry
list *pre_monitor_cfg;//需要在加载master之前解析的配置,比如myid
list *monitor_cfg;//sentinel monitor开头的配置,也就是master ip端口号配置
list *post_monitor_cfg;//加载master之后解析的配置
};
struct sentinelState {
dict *masters;//根据sentinel monitor生成的master的信息,key是master name,value是sentinelRedisInstance,flags是SRI_MASTER
char myid[CONFIG_RUN_ID_SIZE+1];//哨兵ID
}
typedef struct sentinelRedisInstance {
//redis实例
int flags;//SRI_MASTER,SRI_SLAVE,SRI_SENTINEL
char *name;//master name
sentinelAddr *addr;//master ip端口
mstime_t down_after_period;//对应配置:down-after-milliseconds
int parallel_syncs;//对应parallel-syncs
char *auth_pass;//对应auth-pass
instanceLink *link;
}
struct rewriteConfigState {
//用于根据最终使用的配置重写配置文件
dict *option_to_line; /* 配置key到配置值的map,用于查找配置文件中是否有对应配置,以及值是否和当前启用的值相同,如果不同则触发重写 */
int numlines; /* 配置文件行数 */
sds *lines; /* 对应了配置文件的每一行,包括注释行 */
int needs_signature; /* 配置文件中是否已有# Generated by CONFIG REWRITE的信息,如果没有,重写配置时会新增一行这个信息 */
};
typedef struct instanceLink {
int disconnected;//初始为1,不为0表示需要重连
}
loadServerConfigFromString(char *config)//加载配置文件逻辑,redis配置和哨兵配置都是该方法,参数是配置内容
queueSentinelConfig(argv+1,argc-1,linenum,lines[i])//当第一个配置参数是sentinel时,执行该方法,将配置文件中sentinel开头的配置保存到server.sentinel_config
main()
loadSentinelConfigFromQueue()//根据server.sentinel_config中的顺序解析配置,先根据sentinel monitor配置创建sentinelState.master,再根据其他配置向其中填充属性
sentinelHandleConfiguration(entry->argv,entry->argc)//解析sentinel开头的配置
createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],atoi(argv[3]),quorum,NULL)//解析sentinel monitor,创建sentinelState.master
main()//哨兵程序启动
sentinelIsRunning()//sentinel mode开启时执行,生成sentinel.myid
sentinelFlushConfig(void)//sentinel配置发生改变,重写进配置文件
rewriteConfig(server.configfile, 0)//第一个参数是哨兵配置文件,前面已经解析了该文件生成congfigs,这一步是比较configs和配置文件,根据configs添加配置文件中未配置的配置项
sentinelGenerateInitialMonitorEvents()//遍历所有监听的master,发送监听事件消息
sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum)//发送单条消息并写入日志,第二个参数是消息的通道
1) "message"
2) "+monitor"
3) "master mymaster 127.0.0.1 7000 quorum 2"
- 和master建立2个连接,一个用于发送命令,另一个用于pubsub
3) "__sentinel__:hello"
4) "127.0.0.1,5002,538a48a26e2de7a5bc8e404f9cf9756b61571cb3,0,mymaster,127.0.0.1,7001,0"
typedef struct sentinelRedisInstance {
//redis实例
instanceLink *link;//和redis实例的连接
}
typedef struct instanceLink {
int disconnected;//初始为1,不为0表示需要重连
redisAsyncContext *cc;//hiredis上下文,hiredis是c实现的redis客户端,hiredis上下文包含了和服务端的连接、命令缓冲区,cc是命令连接,pc是pubsub连接
redisAsyncContext *pc;
}
serverCron()//定时任务
sentinelTimer()//sentinalMode时,执行
sentinelHandleDictOfRedisInstances(sentinel.masters)
sentinelHandleRedisInstance(sentinelRedisInstance *ri)
sentinelReconnectInstance(sentinelRedisInstance *ri) //和master建立2个连接,一个用于发送命令,另一个用于pubsub,订阅__sentinel__:hello通道
sentinelSendPeriodicCommands(sentinelRedisInstance *ri)//1、定时发送INFO和PING命令。2、
文章评论