Reading the Redis Source (8): AOF Persistence






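AOF persistence logs every write command the server executes and restores the dataset at startup by re-executing those commands. Commands in the AOF file are all stored in the Redis protocol format, and new commands are appended to the end of the file. Redis can also rewrite the AOF file in the background so that it does not grow beyond the size actually needed to represent the dataset's state. Redis can use AOF and RDB persistence at the same time; in that case, on restart it prefers the AOF file for restoring the dataset, because the AOF file usually holds a more complete dataset than the RDB file. You can even turn persistence off entirely so that data exists only while the server is running.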







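How AOF data flows through Redis:



Redis receives a client command and stores it in aof_buf --> calls the system function write() to copy it into the OS buffer --> the data is flushed to disk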
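
The three stages above can be sketched in plain C. This is only a minimal illustration and not Redis code: `aof_sketch`, `aof_sketch_feed` and `aof_sketch_flush` are hypothetical names, the buffer is a fixed array rather than an sds string, and a POSIX system is assumed.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for server.aof_buf: a fixed-size user-space buffer. */
struct aof_sketch {
    int fd;          /* descriptor of the append-only file */
    char buf[4096];  /* stage 1: encoded commands accumulate here */
    size_t len;      /* number of bytes currently buffered */
};

/* Stage 1: append an already-encoded command to the in-memory buffer.
 * Returns 0 on success, -1 if the command does not fit. */
int aof_sketch_feed(struct aof_sketch *a, const char *cmd, size_t n) {
    assert(a != NULL && cmd != NULL);
    if (n > sizeof(a->buf) - a->len) return -1;
    memcpy(a->buf + a->len, cmd, n);
    a->len += n;
    return 0;
}

/* Stages 2 and 3: write() copies the bytes into the kernel's buffer,
 * and fsync() asks the kernel to push them to the disk.
 * Returns the number of bytes written, or -1 on error. */
ssize_t aof_sketch_flush(struct aof_sketch *a, int do_fsync) {
    size_t off = 0;
    while (off < a->len) {
        ssize_t w = write(a->fd, a->buf + off, a->len - off);
        if (w < 0) return -1;
        off += (size_t)w;
    }
    a->len = 0;
    if (do_fsync && fsync(a->fd) != 0) return -1;
    return (ssize_t)off;
}
```

The split between `feed` and `flush` mirrors the article's point that buffering a command and making it durable are separate steps, each with its own timing.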







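1. The aof_buf in-memory buffer



aof_buf is an in-memory buffer allocated by Redis, used to hold the commands and data arriving from clients. They are stored in exactly the format of the original command, that is, the client/server wire protocol. For example: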



*2

$6

SELECT

$1

0

*3
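
The wire format shown above (an argument count prefixed with `*`, then each argument prefixed with `$` and its byte length) can be produced with a few lines of C. The function below is a hedged sketch, not the Redis implementation: `resp_encode` is a hypothetical name, and it assumes NUL-terminated string arguments, whereas real Redis arguments are binary-safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argc arguments as a multi-bulk request into out.
 * Returns the number of bytes written (excluding the trailing NUL),
 * or -1 if out is too small. */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    assert(out != NULL && argc >= 0);
    size_t used = 0;

    /* Header: "*<number of arguments>\r\n" */
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used += (size_t)n;

    /* Each argument: "$<byte length>\r\n<bytes>\r\n" */
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding the two arguments `SELECT` and `0` with this function yields the same bytes as the first five lines of the example above.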



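When is data stored in aof_buf? When a client sends data to Redis, a read event fires on the server. In the read event handler, propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) is called to write the command and its arguments into aof_buf.

propagate in turn calls feedAppendOnlyFile, which performs the actual write into aof_buf: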




void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {


cmd: the parsed client command structure

dictid: the id of the database the command operates on

argv: the command's arguments

argc: the number of arguments






    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appendend. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }



If the database this command targets is not the one selected by the previously appended command, a SELECT command is written to buf first.


    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }




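catAppendOnlyGenericCommand converts the already-parsed arguments back into the wire protocol format. I do not fully understand why the data is rebuilt at this point; it may be a possible optimization to save the raw bytes directly before the server parses them. One reason visible in the code above, though, is that some commands (EXPIRE, SETEX and so on) are translated before being logged, so the raw client bytes cannot always be reused.


The rebuilt data is appended to buf.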
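
The translation of EXPIRE, PEXPIRE and EXPIREAT into PEXPIREAT boils down to turning the time argument into an absolute Unix time in milliseconds, so that replaying the AOF later does not restart the countdown. The following is a minimal sketch of that arithmetic under stated assumptions: `ttl_kind` and `to_pexpireat_ms` are hypothetical names, and the current time is passed in as a parameter instead of being read from the clock.

```c
#include <assert.h>

/* Which command the time argument came from (hypothetical enum). */
enum ttl_kind { TTL_EXPIRE, TTL_PEXPIRE, TTL_EXPIREAT };

/* Convert a command's time argument into the absolute millisecond
 * timestamp that PEXPIREAT expects. now_ms is the current Unix time
 * in milliseconds. */
long long to_pexpireat_ms(enum ttl_kind kind, long long when,
                          long long now_ms) {
    assert(now_ms >= 0);
    /* EXPIRE and EXPIREAT take seconds: convert to milliseconds. */
    if (kind == TTL_EXPIRE || kind == TTL_EXPIREAT) when *= 1000;
    /* EXPIRE and PEXPIRE are relative: make them absolute. */
    if (kind == TTL_EXPIRE || kind == TTL_PEXPIRE) when += now_ms;
    return when;
}
```

For instance, `EXPIRE key 10` issued at millisecond 1000000 would be logged with the timestamp 1010000.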


    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));


The data in buf is appended to aof_buf.




    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,(unsigned long)sdslen(buf));


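Here Redis checks whether a rewrite child process is running. If it is, then because of copy-on-write between parent and child, and because the parent keeps accepting writes, the database in the parent diverges from the snapshot the child sees. The server.aof_rewrite_buf_blocks buffer exists to record that divergence: after the rewrite child finishes and exits, the contents of aof_rewrite_buf_blocks are written to the new AOF file, which keeps the data consistent.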
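
As the name aof_rewrite_buf_blocks suggests, the differences are accumulated in a list of blocks. The sketch below shows one way such a buffer can work. It is an illustration only: `diff_buf`, `diff_block` and the two functions are hypothetical names, and the block size is deliberately tiny so that the chaining is easy to observe.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define DIFF_BLOCK_SIZE 16  /* tiny on purpose; illustrative only */

struct diff_block {
    size_t used;                 /* bytes stored in this block */
    char data[DIFF_BLOCK_SIZE];
    struct diff_block *next;
};

struct diff_buf {
    struct diff_block *head, *tail;
};

/* Append n bytes, allocating a new block whenever the tail is full.
 * Returns 0 on success, -1 on allocation failure. */
int diff_buf_append(struct diff_buf *b, const char *s, size_t n) {
    assert(b != NULL && s != NULL);
    while (n > 0) {
        struct diff_block *t = b->tail;
        if (t == NULL || t->used == DIFF_BLOCK_SIZE) {
            struct diff_block *nb = calloc(1, sizeof(*nb));
            if (nb == NULL) return -1;
            if (t) t->next = nb; else b->head = nb;
            b->tail = t = nb;
        }
        size_t room = DIFF_BLOCK_SIZE - t->used;
        size_t take = n < room ? n : room;
        memcpy(t->data + t->used, s, take);
        t->used += take;
        s += take;
        n -= take;
    }
    return 0;
}

/* Copy the buffered bytes, in order, into out (at most cap bytes),
 * free every block, and return the number of bytes copied. */
size_t diff_buf_drain(struct diff_buf *b, char *out, size_t cap) {
    size_t copied = 0;
    struct diff_block *blk = b->head;
    while (blk != NULL) {
        struct diff_block *next = blk->next;
        size_t take = blk->used;
        if (take > cap - copied) take = cap - copied;
        memcpy(out + copied, blk->data, take);
        copied += take;
        free(blk);
        blk = next;
    }
    b->head = b->tail = NULL;
    return copied;
}
```

In this sketch the parent would call `diff_buf_append` for every command while the child runs, and `diff_buf_drain` once after the child exits.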





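At this point the write into aof_buf is complete.


2. When the write() system call is made


To make sure the data in aof_buf is handed to the file before the reply is sent to the client, beforeSleep is called before each iteration of the event loop, and it calls flushAppendOnlyFile(0), which write()s the contents of aof_buf into the OS buffer.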


    nwritten = write(server.aof_fd,server.aof_buf,(unsigned int)sdslen(server.aof_buf));


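3. The policies for flushing the OS buffer to disk


Redis has three flush policies:


AOF_FSYNC_ALWAYS: flush once before every event loop iteration


AOF_FSYNC_EVERYSEC: flush once per second



AOF_FSYNC_NO: rely entirely on the operating system to flush the buffer to disk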






AOF_FSYNC_ALWAYS and AOF_FSYNC_NO are easy to understand:



 /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }

Here server.unixtime is a cached sample of the system time with one-second resolution, and aof_background_fsync adds a job to the job queue.
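
The branch structure of the snippet above can be condensed into a small pure function, which makes the three policies easy to compare side by side. This is a sketch with hypothetical names (`fsync_policy`, `fsync_action`, `choose_fsync`); it only reports what should happen and leaves the bookkeeping of the last fsync time to the caller.

```c
#include <assert.h>
#include <time.h>

enum fsync_policy { POLICY_NO, POLICY_EVERYSEC, POLICY_ALWAYS };
enum fsync_action { ACT_NONE, ACT_SYNC_NOW, ACT_SYNC_BACKGROUND };

/* Decide what to do after a write() under the given policy.
 * now and last_fsync are in whole seconds; sync_in_progress is 1
 * while a background fsync is still pending, 0 otherwise. */
enum fsync_action choose_fsync(enum fsync_policy p, time_t now,
                               time_t last_fsync, int sync_in_progress) {
    assert(sync_in_progress == 0 || sync_in_progress == 1);
    /* always: sync in the calling thread after every write */
    if (p == POLICY_ALWAYS) return ACT_SYNC_NOW;
    /* everysec: at most one background sync per second, and only
     * if the previous one has finished */
    if (p == POLICY_EVERYSEC && now > last_fsync && !sync_in_progress)
        return ACT_SYNC_BACKGROUND;
    /* no: leave flushing to the operating system */
    return ACT_NONE;
}
```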


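For AOF_FSYNC_EVERYSEC, Redis starts a dedicated thread to handle flush jobs. Redis provides a job queue (a linked list) that is shared between the main thread and the flush thread. The main thread adds one job to the queue every second. The flush thread blocks while the queue is empty; when a job is added it is notified through a condition variable, wakes up, and starts flushing the data.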


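4. How write and flush cooperate


First, sync_in_progress is obtained: it indicates whether any fsync job is still unfinished, that is, whether the flush job queue is non-empty.


If the flush job queue is not empty, Redis postpones this write. If jobs are still pending after 2 seconds, the write is forced anyway.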


    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponinig, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }




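If no-appendfsync-on-rewrite is set, then while a child process is performing a rewrite the server stops issuing fsync and leaves flushing entirely to the operating system, which is temporarily equivalent to AOF_FSYNC_NO. This can leave recently written data unsafe.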



5. The job queue (linked list)


arg1 only needs to hold the AOF file descriptor: the background thread reads it from the job in bio_jobs and then performs the flush.


struct bio_job {
    time_t time; /* Time at which the job was created. */
    /* Job specific arguments pointers. If we need to pass more than three
     * arguments we can just pass a pointer to a structure or alike. */
    void *arg1, *arg2, *arg3;
};
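
The article shows the consumer side of this queue further down, but not the producer. The sketch below illustrates what the main thread has to do to enqueue a job: fill in the structure, take the lock, append, and signal the condition variable. All names here (`job_sketch`, `submit_fsync_job`, `pending_jobs`, the `q_` variables) are hypothetical, and a hand-rolled singly linked list stands in for the Redis list type.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>

struct job_sketch {
    time_t time;              /* creation time, as in bio_job */
    void *arg1;               /* carries the file descriptor */
    struct job_sketch *next;
};

static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_cond = PTHREAD_COND_INITIALIZER;
static struct job_sketch *q_head, *q_tail;
static unsigned long q_pending;

/* Producer side: queue an fsync job for descriptor fd.
 * Returns 0 on success, -1 on allocation failure. */
int submit_fsync_job(int fd) {
    assert(fd >= 0);
    struct job_sketch *job = malloc(sizeof(*job));
    if (job == NULL) return -1;
    job->time = time(NULL);
    job->arg1 = (void *)(long)fd;  /* fd packed into the pointer */
    job->next = NULL;

    pthread_mutex_lock(&q_mutex);
    if (q_tail) q_tail->next = job; else q_head = job;
    q_tail = job;
    q_pending++;
    pthread_cond_signal(&q_cond);  /* wake the worker if it is waiting */
    pthread_mutex_unlock(&q_mutex);
    return 0;
}

/* Number of jobs not yet completed, read under the lock. */
unsigned long pending_jobs(void) {
    pthread_mutex_lock(&q_mutex);
    unsigned long n = q_pending;
    pthread_mutex_unlock(&q_mutex);
    return n;
}
```

Packing the descriptor into `arg1` with a cast avoids a separate allocation for a single integer, which is presumably why the job structure only carries untyped pointers.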



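6. Creating the background threads in the bioInit initialization function


For thread safety this involves some lock and condition variable setup. The thread function is bioProcessBackgroundJobs.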


/* Initialize the background system, spawning the thread. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_condvar[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, ((ssize_t)stacksize));

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
    }
}


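7. The thread function


It loops, taking the head node of the queue and performing the flush. If the queue is empty it blocks in pthread_cond_wait until the main thread adds a job.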


void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
#ifdef _WIN32
    size_t type = (size_t) arg;
#else
    unsigned long type = (unsigned long) arg;
#endif
    sigset_t sigset;

    pthread_detach(pthread_self());
    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        redisLog(REDIS_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == REDIS_BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == REDIS_BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else {
            redisPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}




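8. The AOF rewrite process


A rewrite is triggered under either of these conditions:


(1) No RDB persistence process and no AOF rewrite process is currently running, and a client has requested a rewrite with BGREWRITEAOF.


(2) No RDB persistence process and no AOF rewrite process is currently running, no client has explicitly requested a rewrite, and the AOF file size has grown by at least the aof_rewrite_perc percentage.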


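Here aof_rewrite_base_size is the size of the AOF file after the last rewrite completed;


aof_current_size is the current size of the AOF file;


growth is the computed growth rate;


aof_rewrite_perc is the configured rewrite percentage.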

    long long base = server.aof_rewrite_base_size ?
                     server.aof_rewrite_base_size : 1;
    long long growth = (server.aof_current_size*100/base) - 100;
    if (growth >= server.aof_rewrite_perc) {
        redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
        rewriteAppendOnlyFileBackground();
    }
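
To make the growth formula concrete, here it is as a standalone function with a few worked values. This is a sketch: `aof_growth_percent` and `should_rewrite` are hypothetical names, and sizes are plain byte counts passed as parameters rather than fields of the server structure.

```c
#include <assert.h>

/* Growth of the AOF file since the last rewrite, in percent.
 * A base size of 0 is treated as 1 to avoid dividing by zero. */
long long aof_growth_percent(long long base_size, long long current_size) {
    assert(current_size >= 0);
    long long base = base_size ? base_size : 1;
    return (current_size * 100 / base) - 100;
}

/* Nonzero when the growth has reached the configured percentage. */
int should_rewrite(long long base_size, long long current_size,
                   int rewrite_perc) {
    return aof_growth_percent(base_size, current_size) >= rewrite_perc;
}
```

With a configured percentage of 100, a file that was 100 bytes after the last rewrite triggers a new rewrite once it reaches 200 bytes, while at 150 bytes the growth is only 50% and nothing happens.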


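The rewrite itself is carried out by rewriteAppendOnlyFileBackground(), which creates a child process to do the work. To keep the AOF file, and therefore the number of commands in it, as small as possible, the rewrite reads every key from the hash table and replaces its history with commands such as RPUSH, SADD and ZADD that rebuild its current value.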




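After starting the rewrite child, the main process makes non-blocking calls to wait3() to detect when the child exits. Once it has exited, backgroundRewriteDoneHandler(int exitcode, int bysignal) is called. That function calls aofRewriteBufferWrite(fd) to append the differences accumulated in server.aof_rewrite_buf_blocks to the temporary AOF file, which is named temp-rewriteaof-bg-<PID of the rewrite child>.aof. The temporary file is then renamed to the real AOF file name, the server's AOF file descriptor is switched over, the old file gets unlinked, and so on.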
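
The fork, non-blocking wait, and rename sequence described above can be sketched as follows. This is an illustration under several assumptions: it uses waitpid() with WNOHANG in place of wait3(), the function names and file paths are hypothetical, the child simply writes a fixed string, and the step that appends the accumulated differences before the rename is left out.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that writes content to tmp_path and exits.
 * Returns the child's pid in the parent, or -1 if fork() failed. */
pid_t start_rewrite_child(const char *tmp_path, const char *content) {
    assert(tmp_path != NULL && content != NULL);
    pid_t pid = fork();
    if (pid == 0) {
        /* Child: produce the new file, then exit without running
         * the parent's atexit handlers or flushing its buffers. */
        FILE *fp = fopen(tmp_path, "w");
        if (fp == NULL) _exit(1);
        int ok = fputs(content, fp) >= 0;
        if (fclose(fp) != 0) ok = 0;
        _exit(ok ? 0 : 1);
    }
    return pid;
}

/* Non-blocking check on the child. Returns 0 while it is still
 * running, 1 once it exited cleanly and tmp_path was renamed to
 * final_path, and -1 on any failure. */
int poll_rewrite_child(pid_t pid, const char *tmp_path,
                       const char *final_path) {
    int status;
    pid_t r = waitpid(pid, &status, WNOHANG);
    if (r == 0) return 0;  /* child has not exited yet */
    if (r != pid || !WIFEXITED(status) || WEXITSTATUS(status) != 0)
        return -1;
    /* rename() replaces final_path atomically on POSIX systems. */
    return rename(tmp_path, final_path) == 0 ? 1 : -1;
}
```

Because the check never blocks, the parent can call it from a periodic timer and keep serving clients while the child works, which is the point of the non-blocking wait in the article.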


This concludes the whole AOF persistence process.



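Copyright notice: this is an original article by hehe123456ZXC, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.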