Netty Internals: ByteBuf


ByteBuf structure

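A ByteBuf is divided into three regions: discardable bytes, readable bytes, and writable bytes. The regions are delimited by readerIndex, writerIndex, and capacity: [0, readerIndex) is discardable, [readerIndex, writerIndex) is readable, and [writerIndex, capacity) is writable. Every read and write operates relative to these positions.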

Read, write, mark, and reset methods

A read takes one byte at the readerIndex position and then advances readerIndex by one.

public byte readByte() {
    checkReadableBytes0(1);
    int i = readerIndex;
    byte b = _getByte(i);
    readerIndex = i + 1;
    return b;
}

A write stores data starting at the writerIndex position and then advances writerIndex.

public ByteBuf writeByte(int value) {
    ensureWritable0(1);
    _setByte(writerIndex++, value);
    return this;
}
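
To make the index movement concrete, here is a minimal sketch of an array-backed buffer with the same two indices. TinyBuf and its bounds checks are hypothetical names written for illustration; they are not Netty classes, and Netty's own checks (checkReadableBytes0, ensureWritable0) do more than this.

```java
// Simplified model of an index-based buffer. This is NOT Netty's implementation.
final class TinyBuf {
    private final byte[] data;
    private int readerIndex; // position of the next byte to read
    private int writerIndex; // position of the next byte to write

    TinyBuf(int capacity) {
        this.data = new byte[capacity];
    }

    // Stores one byte at writerIndex, then advances writerIndex by one.
    TinyBuf writeByte(int value) {
        if (writerIndex >= data.length) {
            throw new IndexOutOfBoundsException("no writable bytes left");
        }
        data[writerIndex++] = (byte) value;
        return this;
    }

    // Returns the byte at readerIndex, then advances readerIndex by one.
    byte readByte() {
        if (readerIndex >= writerIndex) {
            throw new IndexOutOfBoundsException("no readable bytes left");
        }
        return data[readerIndex++];
    }

    int readerIndex() { return readerIndex; }

    int writerIndex() { return writerIndex; }

    public static void main(String[] args) {
        TinyBuf buf = new TinyBuf(4);
        buf.writeByte(7).writeByte(9);
        System.out.println(buf.readByte());    // the first byte written
        System.out.println(buf.readerIndex()); // reader moved by one
        System.out.println(buf.writerIndex()); // writer moved by two
    }
}
```

Because reads and writes use separate indices, no flip() call is needed between writing and reading, unlike java.nio.ByteBuffer.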

The mark method records the current index so that it can be restored later.

@Override
public ByteBuf markReaderIndex() {
    markedReaderIndex = readerIndex;
    return this;
}

The reset method moves the index back to the marked position.

public ByteBuf resetReaderIndex() {
    readerIndex(markedReaderIndex);
    return this;
}
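
A typical reason to mark and reset is to back out of a partial read. The sketch below assumes a made-up frame format (one length byte followed by the payload) and a hypothetical MarkableBuf class; neither comes from Netty, they only illustrate the mark/reset pattern.

```java
// Simplified model of mark/reset on a read index. Not Netty code.
final class MarkableBuf {
    private final byte[] data;      // every byte in the array counts as readable
    private int readerIndex;
    private int markedReaderIndex;

    MarkableBuf(byte[] data) {
        this.data = data;
    }

    int readableBytes() { return data.length - readerIndex; }

    int readerIndex() { return readerIndex; }

    byte readByte() { return data[readerIndex++]; }

    void markReaderIndex() { markedReaderIndex = readerIndex; }

    void resetReaderIndex() { readerIndex = markedReaderIndex; }

    // Hypothetical helper: returns the payload of one frame,
    // or null (with readerIndex restored) if the frame is incomplete.
    static byte[] tryReadFrame(MarkableBuf buf) {
        if (buf.readableBytes() < 1) {
            return null;
        }
        buf.markReaderIndex();              // remember where the frame starts
        int length = buf.readByte() & 0xFF; // length prefix
        if (buf.readableBytes() < length) {
            buf.resetReaderIndex();         // roll back the length byte
            return null;
        }
        byte[] payload = new byte[length];
        for (int i = 0; i < length; i++) {
            payload[i] = buf.readByte();
        }
        return payload;
    }

    public static void main(String[] args) {
        MarkableBuf partial = new MarkableBuf(new byte[] {3, 1, 2});
        System.out.println(tryReadFrame(partial) == null); // frame is incomplete
        System.out.println(partial.readerIndex());         // index was restored
    }
}
```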

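ByteBuf classification

AbstractByteBuf holds the index and limit fields used for reading and writing: readerIndex, writerIndex, markedReaderIndex, markedWriterIndex, and maxCapacity.

From the ByteBuf structure, the number of bytes in the readable and writable regions can be computed directly.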

@Override
public int readableBytes() {
    return writerIndex - readerIndex;
}

@Override
public int writableBytes() {
    return capacity() - writerIndex;
}

@Override
public int maxWritableBytes() {
    return maxCapacity() - writerIndex;
}
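
As a worked example of the three formulas, the sketch below plugs in sample values. The IndexMath class and the numbers (readerIndex 4, writerIndex 10, capacity 16, maxCapacity 64) are chosen only for illustration.

```java
// Worked arithmetic for the three byte counts. Illustrative values only.
final class IndexMath {
    static int readableBytes(int readerIndex, int writerIndex) {
        return writerIndex - readerIndex;
    }

    static int writableBytes(int writerIndex, int capacity) {
        return capacity - writerIndex;
    }

    static int maxWritableBytes(int writerIndex, int maxCapacity) {
        return maxCapacity - writerIndex;
    }

    public static void main(String[] args) {
        int readerIndex = 4, writerIndex = 10, capacity = 16, maxCapacity = 64;

        int readable = readableBytes(readerIndex, writerIndex);       // 10 - 4
        int writable = writableBytes(writerIndex, capacity);          // 16 - 10
        int maxWritable = maxWritableBytes(writerIndex, maxCapacity); // 64 - 10

        System.out.println(readable);    // 6
        System.out.println(writable);    // 6
        System.out.println(maxWritable); // 54

        // discardable + readable + writable always adds up to capacity
        System.out.println(readerIndex + readable + writable); // 16
    }
}
```

The gap between writableBytes and maxWritableBytes is the room the buffer can still gain by growing its capacity up to maxCapacity.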

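Pooled and Unpooled

The difference between pooled and unpooled buffers is whether memory is obtained through a pool of cached memory.

Memory allocator: ByteBufAllocator

Netty has two memory allocators: PooledByteBufAllocator and UnpooledByteBufAllocator.

UnpooledByteBufAllocator

newHeapBuffer eventually reaches the UnpooledHeapByteBuf constructor, which creates a byte array of the initial capacity.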

protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe() ?
            new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
            new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);

    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = checkNotNull(alloc, "alloc");
    setArray(allocateArray(initialCapacity));
    setIndex(0, 0);
}

For off-heap (direct) memory, allocation eventually reaches the UnpooledDirectByteBuf constructor, which requests memory of the initial capacity through ByteBuffer.allocateDirect(initialCapacity).

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    ObjectUtil.checkNotNull(alloc, "alloc");
    checkPositiveOrZero(initialCapacity, "initialCapacity");
    checkPositiveOrZero(maxCapacity, "maxCapacity");
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity), false);
}
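
The heap versus direct distinction can be observed with the JDK alone. The sketch below uses only java.nio.ByteBuffer; the HeapVsDirect class and its helper names are hypothetical, and it shows the two underlying JDK allocations rather than Netty's wrappers around them.

```java
import java.nio.ByteBuffer;

// Heap vs. direct allocation using only the JDK. Not Netty code.
final class HeapVsDirect {
    // Heap buffer: backed by a byte[] that lives on the Java heap.
    static ByteBuffer newHeap(int capacity) {
        return ByteBuffer.allocate(capacity);
    }

    // Direct buffer: backed by memory outside the Java heap.
    static ByteBuffer newDirect(int capacity) {
        return ByteBuffer.allocateDirect(capacity);
    }

    public static void main(String[] args) {
        ByteBuffer heap = newHeap(16);
        ByteBuffer direct = newDirect(16);

        System.out.println(heap.hasArray());   // true: the byte[] is accessible
        System.out.println(heap.isDirect());   // false
        System.out.println(direct.isDirect()); // true
        System.out.println(direct.capacity()); // 16
    }
}
```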

PooledByteBufAllocator

newDirectBuffer proceeds as follows:

1. Get the PoolThreadCache from threadCache, which is a thread-local variable

2. Get the PoolArena object from the PoolThreadCache

3. Request memory through the PoolArena object

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}
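
The three steps can be sketched with a plain ThreadLocal. Everything in this example (ToyAllocator, Arena, ThreadCache, and the round-robin binding of threads to arenas) is a hypothetical simplification; Netty's own arena selection and cache types differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of "thread cache -> arena -> allocate". Not Netty code.
final class ToyAllocator {
    // Stand-in for PoolArena: hands out memory and counts requests.
    static final class Arena {
        final AtomicInteger allocations = new AtomicInteger();

        byte[] allocate(int capacity) {
            allocations.incrementAndGet();
            return new byte[capacity];
        }
    }

    // Stand-in for PoolThreadCache: binds one thread to one arena.
    static final class ThreadCache {
        final Arena arena;

        ThreadCache(Arena arena) {
            this.arena = arena;
        }
    }

    private final AtomicInteger next = new AtomicInteger();
    private final ThreadLocal<ThreadCache> threadCache;

    ToyAllocator(int nArenas) {
        Arena[] arenas = new Arena[nArenas];
        for (int i = 0; i < nArenas; i++) {
            arenas[i] = new Arena();
        }
        // Assumption of this sketch: threads are bound to arenas round-robin.
        threadCache = ThreadLocal.withInitial(() ->
                new ThreadCache(arenas[Math.floorMod(next.getAndIncrement(), arenas.length)]));
    }

    byte[] newBuffer(int capacity) {
        ThreadCache cache = threadCache.get(); // step 1: per-thread cache
        Arena arena = cache.arena;             // step 2: arena bound to this thread
        return arena.allocate(capacity);       // step 3: allocate from the arena
    }

    Arena arenaOfCurrentThread() {
        return threadCache.get().arena;
    }

    public static void main(String[] args) {
        ToyAllocator allocator = new ToyAllocator(2);
        allocator.newBuffer(8);
        allocator.newBuffer(8);
        // Both requests from this thread went to the same arena.
        System.out.println(allocator.arenaOfCurrentThread().allocations.get());
    }
}
```

Binding each thread to one arena keeps most allocations of a thread on the same arena, which limits contention between threads.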

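Inside directArena.allocate, the ByteBuf object itself is taken from a RECYCLER object pool, so ByteBuf instances are reused instead of being created for every allocation. This object reuse is part of what makes the allocator pooled.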

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
    PooledUnsafeDirectByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}
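
The get-then-reuse pattern can be sketched with a simple free list. ToyRecycler and ReusableBuf are hypothetical names; the sketch is single-threaded and does not reproduce how Netty's Recycler is implemented.

```java
import java.util.ArrayDeque;

// Toy object pool showing "get an instance, then reset it". Not Netty code.
final class ToyRecycler {
    static final class ReusableBuf {
        int maxCapacity;
        int readerIndex;
        int writerIndex;

        // Resets the state so a recycled instance behaves like a new one.
        void reuse(int maxCapacity) {
            this.maxCapacity = maxCapacity;
            this.readerIndex = 0;
            this.writerIndex = 0;
        }
    }

    private final ArrayDeque<ReusableBuf> free = new ArrayDeque<>();

    // Takes a pooled instance if one exists, otherwise creates one.
    ReusableBuf get(int maxCapacity) {
        ReusableBuf buf = free.pollFirst();
        if (buf == null) {
            buf = new ReusableBuf();
        }
        buf.reuse(maxCapacity);
        return buf;
    }

    // Returns an instance to the pool for later reuse.
    void recycle(ReusableBuf buf) {
        free.addFirst(buf);
    }

    public static void main(String[] args) {
        ToyRecycler recycler = new ToyRecycler();
        ReusableBuf first = recycler.get(64);
        first.writerIndex = 5;
        recycler.recycle(first);

        ReusableBuf second = recycler.get(128);
        System.out.println(second == first);    // same object handed out again
        System.out.println(second.writerIndex); // state was reset by reuse()
    }
}
```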

The PooledByteBufAllocator constructor creates heapArenas and directArenas.

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                              int smallCacheSize, int normalCacheSize,
                              boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
    super(preferDirect);
    threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
    this.smallCacheSize = smallCacheSize;
    this.normalCacheSize = normalCacheSize;

    if (directMemoryCacheAlignment != 0) {
        if (!PlatformDependent.hasAlignDirectByteBuffer()) {
            throw new UnsupportedOperationException("Buffer alignment is not supported. " +
                    "Either Unsafe or ByteBuffer.alignSlice() must be available.");
        }

        // Ensure page size is a whole multiple of the alignment, or bump it to the next whole multiple.
        pageSize = (int) PlatformDependent.align(pageSize, directMemoryCacheAlignment);
    }

    chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

    checkPositiveOrZero(nHeapArena, "nHeapArena");
    checkPositiveOrZero(nDirectArena, "nDirectArena");

    checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
    if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
        throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
    }

    if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
        throw new IllegalArgumentException("directMemoryCacheAlignment: "
                + directMemoryCacheAlignment + " (expected: power of two)");
    }

    int pageShifts = validateAndCalculatePageShifts(pageSize, directMemoryCacheAlignment);

    if (nHeapArena > 0) {
        heapArenas = newArenaArray(nHeapArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
        for (int i = 0; i < heapArenas.length; i ++) {
            PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                    pageSize, pageShifts, chunkSize,
                    directMemoryCacheAlignment);
            heapArenas[i] = arena;
            metrics.add(arena);
        }
        heapArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        heapArenas = null;
        heapArenaMetrics = Collections.emptyList();
    }

    if (nDirectArena > 0) {
        directArenas = newArenaArray(nDirectArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
        for (int i = 0; i < directArenas.length; i ++) {
            PoolArena.DirectArena arena = new PoolArena.DirectArena(
                    this, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
            directArenas[i] = arena;
            metrics.add(arena);
        }
        directArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        directArenas = null;
        directArenaMetrics = Collections.emptyList();
    }
    metric = new PooledByteBufAllocatorMetric(this);
}
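
The constructor relies on a few bit tricks that are easier to follow with numbers. In the sketch below, the power-of-two test is the same expression as in the constructor. The alignUp and log2 helpers, the chunk size formula (pageSize << maxOrder), and the sample values 8192 and 11 are assumptions made for illustration, not copies of the helper methods the constructor calls.

```java
// Bit arithmetic around page size and alignment. Helper names are hypothetical.
final class PageMath {
    // Same test as in the constructor. Note that it also accepts 0,
    // which is why an alignment of 0 (disabled) passes the check.
    static boolean isPowerOfTwoOrZero(int x) {
        return (x & -x) == x;
    }

    // Rounds value up to the next multiple of a power-of-two alignment.
    static int alignUp(int value, int alignment) {
        return (value + alignment - 1) & -alignment;
    }

    // floor(log2(x)) for x > 0: assumed meaning of "page shifts".
    static int log2(int x) {
        return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(x);
    }

    public static void main(String[] args) {
        int pageSize = 8192; // illustrative value
        int maxOrder = 11;   // illustrative value

        System.out.println(isPowerOfTwoOrZero(64)); // true
        System.out.println(isPowerOfTwoOrZero(48)); // false
        System.out.println(alignUp(100, 64));       // 128
        System.out.println(log2(pageSize));         // 13
        System.out.println(pageSize << maxOrder);   // 16777216 (16 MiB)
    }
}
```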

The PoolThreadCache constructor creates the per-thread caches for subpage-level (small) and page-level (normal) allocations.

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
                int freeSweepAllocationThreshold) {
    checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    if (directArena != null) {
        smallSubPageDirectCaches = createSubPageCaches(
                smallCacheSize, directArena.numSmallSubpagePools);

        normalDirectCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, directArena);

        directArena.numThreadCaches.getAndIncrement();
    } else {
        // No directArea is configured so just null out all caches
        smallSubPageDirectCaches = null;
        normalDirectCaches = null;
    }
    if (heapArena != null) {
        // Create the caches for the heap allocations
        smallSubPageHeapCaches = createSubPageCaches(
                smallCacheSize, heapArena.numSmallSubpagePools);

        normalHeapCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, heapArena);

        heapArena.numThreadCaches.getAndIncrement();
    } else {
        // No heapArea is configured so just null out all caches
        smallSubPageHeapCaches = null;
        normalHeapCaches = null;
    }

    // Only check if there are caches in use.
    if ((smallSubPageDirectCaches != null || normalDirectCaches != null
            || smallSubPageHeapCaches != null || normalHeapCaches != null)
            && freeSweepAllocationThreshold < 1) {
        throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                + freeSweepAllocationThreshold + " (expected: > 0)");
    }
}

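Cache data structure

A MemoryRegionCache consists of three parts: size, queue, and sizeClass.

Each element of queue is an Entry, which holds a chunk and a handle (pointing to a memory location within the chunk).

sizeClass is one of tiny (0-512B), small (512B-4K), or normal (8K-16M). The constructors shown above only take smallCacheSize and normalCacheSize, so the tiny class applies to older Netty versions than the code in this article.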

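private abstract static class MemoryRegionCache<T> {
    private final int size;               // capacity of the queue
    private final Queue<Entry<T>> queue;  // cached (chunk, handle) entries
    private final SizeClass sizeClass;    // size class served by this cache
    private int allocations;              // allocations served from this cache
    // ... remaining members omitted
}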
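
The role of the queue can be sketched as a bounded cache of released regions. ToyRegionCache and its method names are hypothetical, a byte[] stands in for the chunk, and the real class has more behavior than shown here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of a bounded cache of (chunk, handle) entries. Not Netty code.
final class ToyRegionCache {
    // One cached region: the chunk it belongs to and a handle into that chunk.
    static final class Entry {
        final byte[] chunk;
        final long handle;

        Entry(byte[] chunk, long handle) {
            this.chunk = chunk;
            this.handle = handle;
        }
    }

    private final int size; // maximum number of cached entries
    private final Queue<Entry> queue = new ArrayDeque<>();
    private int allocations; // number of cache hits

    ToyRegionCache(int size) {
        this.size = size;
    }

    // On release: keep the region for reuse unless the cache is full.
    boolean add(byte[] chunk, long handle) {
        if (queue.size() >= size) {
            return false; // caller would hand the region back to the arena
        }
        return queue.offer(new Entry(chunk, handle));
    }

    // On allocation: returns a cached region, or null on a cache miss.
    Entry allocate() {
        Entry entry = queue.poll();
        if (entry != null) {
            allocations++;
        }
        return entry;
    }

    int allocations() { return allocations; }

    public static void main(String[] args) {
        ToyRegionCache cache = new ToyRegionCache(1);
        byte[] chunk = new byte[64];
        System.out.println(cache.add(chunk, 42L));      // cached
        System.out.println(cache.add(chunk, 43L));      // rejected: cache is full
        System.out.println(cache.allocate().handle);    // served from the cache
        System.out.println(cache.allocate() == null);   // miss: cache is empty
    }
}
```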

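Allocation flow when the cache is hit

1. Select the MemoryRegionCache according to size

2. Check whether an entry in its queue can serve the request and, if so, use it to initialize buf

3. Depending on size, memory is allocated at the page level or the subpage level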

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int sizeIdx = size2SizeIdx(reqCapacity);

    if (sizeIdx <= smallMaxSizeIdx) {
        tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
    } else if (sizeIdx < nSizes) {
        tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
    } else {
        int normCapacity = directMemoryCacheAlignment > 0
                ? normalizeSize(reqCapacity) : reqCapacity;
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, normCapacity);
    }
}
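
The three branches of allocate amount to a classification by requested size. The sketch below uses byte thresholds instead of the size indexes the real method compares, and both thresholds (4 KiB and 16 MiB, taken from the ranges quoted earlier in this article) are assumptions for illustration.

```java
// Toy classification of a request into small / normal / huge. Not Netty code.
final class SizeDispatch {
    enum Path { SMALL, NORMAL, HUGE }

    // Hypothetical thresholds, chosen only for this sketch.
    static final int SMALL_MAX_BYTES = 4 * 1024;
    static final int CHUNK_SIZE_BYTES = 16 * 1024 * 1024;

    static Path classify(int reqCapacity) {
        if (reqCapacity <= SMALL_MAX_BYTES) {
            return Path.SMALL;  // subpage level, thread cache tried first
        } else if (reqCapacity <= CHUNK_SIZE_BYTES) {
            return Path.NORMAL; // page level, thread cache tried first
        } else {
            return Path.HUGE;   // larger than a chunk, never served from the cache
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(100));              // SMALL
        System.out.println(classify(64 * 1024));        // NORMAL
        System.out.println(classify(32 * 1024 * 1024)); // HUGE
    }
}
```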



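Copyright notice: This is an original article by csywxr, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.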