从日志存储查看kafka高效的原理

  • Post author:
  • Post category:其他

kafka的日志存储

我们知道Kafka中的消息是存储在磁盘上的,那么为什么要使用磁盘作为存储介质?具体消息的存储格式又是什么呢?怎么样能够快速检索到指定的消息?消息不可能无限制存储,那么清理规则又是什么呢?

不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)

为了便于消息的检索,每个LogSegment中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment中第一条消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为0,对应的日志文件为00000000000000000000.log。

kafka的日志索引

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。

日志文件的切分

日志分段文件达到一定的条件时需要进行切分,那么其对应的索引文件也需要进行切分。日志分段文件切分包含以下几个条件,满足其一即可。

  1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。log.segment.bytes参数的默认值为1073741824,即1GB。2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms或log.roll.hours参数配置的值。如果同时配置了log.roll.ms和log.roll.hours参数,那么log.roll.ms的优先级高。默认情况下,只配置了log.roll.hours参数,其值为168,即7天。
  2. 偏移量索引文件或时间戳索引文件的大小达到broker端参数log.index.size.max.bytes配置的值。log.index.size.max.bytes的默认值为10485760,即10MB。
  3. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offset-baseOffset>Integer.MAX_VALUE)。

对非当前活跃的日志分段而言,其对应的索引文件内容已经固定而不需要再写入索引项,所以会被设定为只读。而对当前活跃的日志分段(activeSegment)而言,索引文件还会追加更多的索引项,所以被设定为可读写。

偏移量索引
  1. relativeOffset:相对偏移量,表示消息相对于baseOffset 的偏移量,占用4 个字节,当前索引文件的文件名即为baseOffset的值。
  2. position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。
    relativeOffset  |    position
        4B          |    4B

消息的偏移量(offset)占用8个字节,也可以称为绝对偏移量**。索引项中没有直接使用绝对偏移量而改为只占用4个字节的相对偏移量(relativeOffset=offset-baseOffset)**,这样可以减小索引文件占用的空间。举个例子,一个日志分段的 baseOffset 为 32,那么其文件名就是00000000000000000032.log,offset为35的消息在索引文件中的relativeOffset的值为35-32=3。

时间戳索引
    timestamp       |    relativeOffset
        8B          |    4B

每个索引项占用12个字节,分为两个部分。

  1. timestamp:当前日志分段最大的时间戳。
  2. relativeOffset:时间戳所对应的消息的相对偏移量。

从时间戳索引文件查找偏移量日志的步骤

  1. 步骤:将targetTimeStamp和每个日志分段中的最大时间戳largestTimeStamp逐一对比,直到找到不小于 targetTimeStamp 的 largestTimeStamp 所对应的日志分段。日志分段中的largestTimeStamp的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则取该日志分段的最近修改时间。【找到时间戳索引段中的一段】
  2. 找到相应的日志分段之后,在时间戳索引文件中使用二分查找算法查找到不大于targetTimeStamp的最大索引项,即[1526384718283,28],如此便找到了一个相对偏移量28。【从时间戳索引中找到偏移量】
  3. 在偏移量索引文件中使用二分算法查找到不大于28的最大索引项,即[26,838]。【从偏移量找到总体偏移量】
  4. 从步骤1中找到日志分段文件中的838的物理位置开始查找不小于targetTimeStamp的消息。

kafka的日志清理

Kafka 将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 中每一个分区副本都对应一个 Log,而 Log 又可以分为多个日志分段,这样也便于日志的清理操作。Kafka提供了两种日志清理策略。

  1. 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。
  2. 日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。

我们可以通过broker端参数log.cleanup.policy来设置日志清理策略,此参数的默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将log.cleanup.policy设置为“compact”,并且还需要将log.cleaner.enable(默认值为true)设定为true。通过将log.cleanup.policy参数设置为“delete,compact”,还可以同时支持日志删除和日志压缩两种策略。日志清理的粒度可以控制到主题级别,比如与log.cleanup.policy 对应的主题级别的参数为 cleanup.policy

日志删除

在Kafka的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300000,即5分钟。当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略和基于日志起始偏移量的保留策略。

基于时间

日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments)。默认情况下只配置了log.retention.hours参数,其值为168,故默认情况下日志分段文件的保留时间为7天

若待删除的日志分段的总数等于该日志文件中所有的日志分段的数量,那么说明所有的日志分段都已过期,但该日志文件中还要有一个日志分段用于接收消息的写入,即必须要保证有一个活跃的日志分段 activeSegment,在此种情况下,会先切分出一个新的日志分段作为activeSegment,然后执行删除操作。

删除日志分段时,首先会从Log对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件添加上“.deleted”的后缀(当然也包括对应的索引文件)。最后交由一个以“delete-file”命名的延迟任务来删除这些以“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来调配,此参数的默认值为60000,即1分钟。

基于日志大小

日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)

retentionSize可以通过broker端参数log.retention.bytes来配置,默认值为-1,表示无穷大。

基于日志起始偏移量

一般情况下,日志文件的起始偏移量 logStartOffset 等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset 的值可以通过 DeleteRecordsRequest 请求(比如使用KafkaAdminClient的deleteRecords()方法、使用kafka-delete-records.sh脚本)、日志的清理和截断等操作进行修改。

日志压缩

Kafka中的Log Compaction是指在默认的日志删除(Log Retention)规则之外提供的一种清理过时数据的方式。如图5-16所示,Log Compaction对于有相同key的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,则可以开启Kafka的日志清理功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。

kafka的磁盘存储

Kafka 依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。在我们的印象中,对于各个存储介质的速度认知大体同图所示的相同,层级越高代表速度越快。很显然,磁盘处于一个比较尴尬的位置,这不禁让我们怀疑Kafka 采用这种持久化形式能否提供有竞争力的性能。在传统的消息中间件 RabbitMQ 中,就使用内存作为默认的存储介质,而磁盘作为备选介质,以此实现高吞吐和低延迟的特性。然而,事实上磁盘可以比我们预想的要快,也可能比我们预想的要慢,这完全取决于我们如何使用它。
image

操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存)和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。顺序写盘的速度不仅比随机写盘的速度快,而且也比随机写内存的速度快。【面试问到过】

Kafka 在设计时采用了文件追加的方式来写入消息**,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作**,所以就算 Kafka使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑。但这并不是让Kafka在性能上具备足够竞争力的唯一因素。

页缓存 page cache

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。为了弥补性能上的差异,现代操作系统越来越“激进地”将内存作为磁盘缓存,甚至会非常乐意将所有可用的内存用作磁盘缓存,这样当内存回收时也几乎没有性能损失,所有对于磁盘的读写也将经由统一的缓存。

当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据【预读】,从而避免了对物理磁盘的 I/O 操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。

同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间【后写】把脏页中的数据写入磁盘,以保持数据的一致性。

Linux操作系统中的vm.dirty_background_ratio参数用来指定当脏页数量达到系统内存的百分之多少之后就会触发 pdflush/flush/kdmflush 等后台回写进程的运行来处理脏页,一般设置为小于10的值即可,但不建议设置为0。与这个参数对应的还有一个vm.dirty_ratio参数,它用来指定当脏页数量达到系统内存的百分之多少之后就不得不开始对脏页进行处理,在此过程中,新的 I/O 请求会被阻挡直至所有脏页被冲刷到磁盘中。对脏页有兴趣的读者还可以自行查阅vm.dirty_expire_centisecs、vm.dirty_writeback.centisecs等参数的使用说明。

对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式,否则页缓存很难被禁止。此外,用过Java的人一般都知道两点事实:对象的内存开销非常大,通常会是真实数据大小的几倍甚至更多,空间使用率低下;Java的垃圾回收会随着堆内数据的增多而变得越来越慢。基于这些因素,使用文件系统并依赖于页缓存的做法明显要优于维护一个进程内缓存或其他结构,至少我们可以省去了一份进程内部的缓存消耗,同时还可以通过结构紧凑的字节码来替代使用对象的方式以节省更多的空间。如此,我们可以在 32GB 的机器上使用28GB至30GB的内存而不用担心GC所带来的性能问题。此外,即使Kafka服务重启,页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。

Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过 log.flush.interval.messages、log.flush.interval.ms 等参数来控制。同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过笔者并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。

Linux系统会使用磁盘的一部分作为swap分区,这样可以进行进程的调度:把当前非活跃的进程调入swap 分区,以此把内存空出来让给活跃的进程。对大量使用系统页缓存的 Kafka而言,应当尽量避免这种内存的交换,否则会对它各方面的性能产生很大的负面影响。我们可以通过修改vm.swappiness参数(Linux系统参数)来进行调节。vm.swappiness参数的上限为 100,它表示积极地使用 swap 分区,并把内存上的数据及时地搬运到 swap 分区中;vm.swappiness 参数的下限为 0,表示在任何情况下都不要发生交换(vm.swappiness=0的含义在不同版本的 Linux 内核中不太相同,这里采用的是变更后的最新解释),这样一来,当内存耗尽时会根据一定的规则突然中止某些进程。建议将这个参数的值设置为 1,这样保留了swap的机制而又最大限度地限制了它对Kafka性能的影响。

磁盘IO流程

从编程角度而言,一般磁盘I/O的场景有以下四种。

  1. 用户调用标准C库进行I/O操作,数据流为:应用程序buffer→C库标准IObuffer→文件系统页缓存→通过具体文件系统到磁盘。
  2. 用户调用文件 I/O,数据流为:应用程序 buffer→文件系统页缓存→通过具体文件系统到磁盘
  3. 用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘。
  4. 用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接写磁盘。发起I/O请求的步骤可以表述为如下的内容(以最长链路为例)。
  • 写操作:用户调用fwrite把数据写入C库标准IObuffer后就返回,即写操作通常是异步操作;数据写入C库标准IObuffer后,不会立即刷新到磁盘,会将多次小数据量相邻写操作先缓存起来合并,最终调用write函数一次性写入(或者将大块数据分解多次write 调用)页缓存;数据到达页缓存后也不会立即刷新到磁盘,内核有 pdflush 线程在不停地检测脏页,判断是否要写回到磁盘,如果是则发起磁盘I/O请求。

  • 读操作:用户调用fread到C库标准IObuffer中读取数据,如果成功则返回,否则继续;到页缓存中读取数据,如果成功则返回,否则继续;发起 I/O 请求,读取数据后缓存buffer和C库标准IObuffer并返回。可以看出,读操作是同步请求。

  • I/O请求处理:通用块层根据I/O请求构造一个或多个bio结构并提交给调度层;调度器将 bio 结构进行排序和合并组织成队列且确保读写操作尽可能理想:将一个或多个进程的读操作合并到一起读,将一个或多个进程的写操作合并到一起写,尽可能变随机为顺序(因为随机读写比顺序读写要慢),读必须优先满足,而写也不能等太久

零拷贝

除了消息顺序追加、页缓存等技术,Kafka还使用零拷贝(Zero-Copy)技术来进一步提升性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对 Linux操作系统而言,零拷贝技术依赖于底层的 sendfile()方法实现。对应于 Java 语言,FileChannal.transferTo()方法的底层实现就是sendfile()方法。

单纯从概念上理解“零拷贝”比较抽象,这里简单地介绍一下它。考虑这样一种常用的情形:你需要将静态内容(类似图片、文件)展示给用户。这个情形就意味着需要先将静态内容从磁盘中复制出来放到一个内存buf中,然后将这个buf通过套接字(Socket)传输给用户,进而用户获得静态内容。这看起来再正常不过了,但实际上这是很低效的流程,我们把上面的这种情形抽象成下面的过程:

    read(file, tmp_buf, len);
    write(socket, tmp_buf, len);

首先调用read()将静态内容(这里假设为文件A)读取到tmp_buf,然后调用write()将tmp_buf写入Socket,

在这个过程中,文件A经历了4次复制的过程:

  1. 调用read()时,文件A中的内容被复制到了内核模式下的Read Buffer中。
  2. CPU控制将内核模式数据复制到用户模式下。
  3. 调用write()时,将用户模式下的内容复制到内核模式下的Socket Buffer中。
  4. 将内核模式下的Socket Buffer的数据复制到网卡设备中传送。
    image

从上面的过程可以看出,数据平白无故地从内核模式到用户模式“走了一圈”,浪费了 2次复制过程:第一次是从内核模式复制到用户模式;第二次是从用户模式再复制回内核模式,即上面4次过程中的第2步和第3步。而且在上面的过程中,内核和用户模式的上下文的切换也是4次。

如果采用了零拷贝技术,那么应用程序可以直接请求内核把磁盘中的数据传输给 Socket,
image

零拷贝技术通过DMA(Direct Memory Access)技术将文件内容复制到内核模式下的Read Buffer中。不过没有数据被复制到 Socket Buffer,相反只有包含数据的位置和长度的信息的文件描述符被加到Socket Buffer中。DMA引擎直接将数据从内核模式中传递到网卡设备(协议引擎)。这里数据只经历了2次复制就从磁盘中传送出去了,并且上下文切换也变成了2次。零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝。


版权声明:本文为qq_23747281原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。