AOF 持久化记录服务器执行的所有写操作命令,并在服务器启动时,通过重新执行这些命令来还原数据集。 AOF 文件中的命令全部以 Redis 协议的格式来保存,新命令会被追加到文件的末尾。 Redis 还可以在后台对 AOF 文件进行重写(rewrite),使得 AOF 文件的体积不会超出保存数据集状态所需的实际大小。Redis 还可以同时使用 AOF 持久化和 RDB 持久化。 在这种情况下, 当 Redis 重启时, 它会优先使用 AOF 文件来还原数据集, 因为 AOF 文件保存的数据集通常比 RDB 文件所保存的数据集更完整。你甚至可以关闭持久化功能,让数据只在服务器运行时存在。
AOF数据在redis中的整个流程:
redis接收客户端命令存储到aof_buf–>调用系统函数write()到系统缓冲区–>下刷到磁盘
1、aof_buf 内存buffer
aof_buf是redis开辟的内存缓冲区,从来存储从客户端过来的各种命令以及数据。存储的格式完全复制原命令,也就是c/s之间通信协议格式。如下:
*2
$6
SELECT
$1
0
*3
什么时候存储到aof_buf中呢 当客户端发送数据到redis时,会触发server端的读事件,在读时间处理函数中,会调用propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,int flags)将cmd命令以及各种参数写入到aof_buf中。
propagate又调用feedAppendOnlyFile函数执行具体的吸入aof_buf操作:
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
cmd, 解析完成的客户端命令机构体
dictid,操作的数据库id
argv 命令数据
argc 命令数据个数
sds buf = sdsempty();
robj *tmpargv[3];
/* The DB this command was targeting is not the same as the last command
* we appendend. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
如果当前命令的数据库id不是上一次append到buf时的命令,那么先向buf中写入select命令
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
* for the replication itself. */
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
catAppendOnlyGenericCommand,按照通信协议转换已经解析的数据,这个地方为什么在这个地方再重构数据,不是很理解,可能是个优化点,在server端解析之前,直接保存,难道不是更好么~~
将重构好的数据append到buf中
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == REDIS_AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
将buf中的数据append到aof_buf中
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,(unsigned long)sdslen(buf));
这个地方检察rewrite进程是否在运行,如果在运行,因为父子进程的写时复制的问题,并且父进程在不断的写入数据,此时会导致父子进程中db数据不一致的问题,那么aserver.aof_rewrite_buf_blocks,缓冲区就是为了记录不一致的数据,在resize进程完成退出后,会将
aof_rewrite_buf_blocks数据,写入到aof文件中。保证数据的一致性!
到此写入到aof_buf已经完成~~
2、何时调用系统调用write函数
为了保证先于恢复客户端请求之前,将aof_buf中数据刷新到磁盘,在每次eventLoop事件循环之前调用beforeSleep函数,这个函数调用flushAppendOnlyFile(0); 将aof_buf数据write到系统缓冲区中。
nwritten = write(server.aof_fd,server.aof_buf,(unsigned int)sdslen(server.aof_buf));
3、什么策略将系统缓冲区的数据flush到磁盘中
resdis有三种flush策略:
AOF_FSYNC_ALWAYS:每次eventLoop循环之前flush一次
AOF_FSYNC_EVERYSEC:每秒flush一次
AOF_FSYNC_NO:完全依靠系统机制将缓冲区flush到磁盘中
针对
AOF_FSYNC_ALWAYS和
AOF_FSYNC_NO好理解
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
其中server.unixtime是每秒系统时间的抽样,aof_background_fsync添加任务到任务队列中。
针对AOF_FSYNC_EVERYSEC情况,redis专门启动了一个线程来处理flush任务,redis提供了一个任务队列(链表),主线程和flush子线程共享此任务队列(链表),主线程
每隔一秒向队列中添加一条任务,子线程阻塞等待任务不为空,添加任务后,通过条件变量通知到子线程,子线程唤醒,开始flush数据。
4、write和flush操作的配合
首先获取sync_in_process 代表没有完成的job的数量,也就是flush任务队列的长度。
如果flush任务队列不为空,则redis会将此次write操作延时执行,,2秒内任务队列中还有任务,那么强制执行write操作。
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponinig, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall trough, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
如果设置了no-appendfsync-on-rewrite,这个标志位的意思是如果有子进程进行rewrite操作,那么停止flush操作,完全交个操作系统flush数据,也就是暂时AOF_FSYNC_NO。这可能会给刚刚write的数据带来不安全性。
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponinig, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall trough, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
5、任务队列(链表)
arg1只保存aof文件描述符即可,因为子线程会读取bio_jobs 获取文件描述符,然后进行flush操作
struct bio_job {
time_t time; /* Time at which the job was created. */
/* Job specific arguments pointers. If we need to pass more than three
* arguments we can just pass a pointer to a structure or alike. */
void *arg1, *arg2, *arg3;
};
6、在bioInit初始化函数中创建background线程
为了线程安全,包含了一些锁和条件标量的处理,线程函数是bioProcessBackgroundJobs。
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* Initialization of state vars and objects */
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_condvar[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, ((ssize_t)stacksize));
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of. */
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
}
}
7、线程函数
从队列中循环取头结点进行flush操作,如果队列为空则会阻塞在pthread_cond_wait调用中,以等待主线程添加任务。
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
#ifdef _WIN32
size_t type = (size_t) arg;
#else
unsigned long type = (unsigned long) arg;
#endif
sigset_t sigset;
pthread_detach(pthread_self());
pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
* receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
redisLog(REDIS_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
continue;
}
/* Pop the job from the queue. */
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);
/* Process the job accordingly to its type. */
if (type == REDIS_BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == REDIS_BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
8、AOF文件持久化resize过程
触发resize过程的条件是:
(1)当前没有rbd持久化进程,也没有aof的resize进程,同时客户端请求BGSAVE请求时,触发resize过程
(2)当前没有rbd持久化进程,也没有aof的resize进程,没有客户端主动请求BGSAVE时,aof文件大小满足aof_rewrite_perc增长率时,触发resize过程。
其中 aof_rewrite_base_size 为上一次resize完成后的oaf文件大小;
aof_current_size aof文件当前大小;
growth为计算出的增长率;
aof_rewrite_perc为配置的重写百分比
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
resize具体的执行过程是调用rewriteAppendOnlyFileBackground()函数,此函数创建子进程进行resize操作,为了尽量减少aof文件的大小,也就是命令的树龄,resize过程会从hash表中读取每一个key,然后用RPUSH, SADD and ZADD等命令来替换。
启动resize进程之后,主进程对非阻塞的调用wait3函数,来捕捉resize进程的结束,当捕捉到resize进程结束之后,会调用
backgroundRewriteDoneHandler(int exitcode, int bysignal)函数,函数内调用
aofRewriteBufferWrite(fd),将
server.aof_rewrite_buf_blocks中差异数据补写到aof临时文件中,文件名字是
temp-rewriteaof-bg-resize子进程进程ID.aof。然后会对临时aof文件进行重命名,文件描述符的改写,和rlink()等等。
至此整个aof 持久化过程结束