ByteBuf structure
A ByteBuf is divided into three regions: the discardable region, the readable region, and the writable region. They are delimited by three pointers: readerIndex, writerIndex, and capacity. Bytes before readerIndex are discardable, bytes between readerIndex and writerIndex are readable, and bytes between writerIndex and capacity are writable; reads and writes operate relative to these pointers.
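A minimal sketch of the three regions using the Unpooled helper (the sizes and values below are only illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ByteBufRegionsDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);          // capacity = 16, readerIndex = writerIndex = 0
        buf.writeBytes(new byte[]{1, 2, 3, 4, 5});  // writerIndex = 5
        buf.readBytes(new byte[2]);                 // readerIndex = 2

        // Regions now: [0, 2) discardable, [2, 5) readable, [5, 16) writable
        System.out.println(buf.readerIndex() + " " + buf.writerIndex() + " " + buf.capacity()); // 2 5 16

        buf.discardReadBytes();                     // drops the discardable region
        System.out.println(buf.readerIndex() + " " + buf.writerIndex()); // 0 3
        buf.release();
    }
}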
Read, write, mark, and reset methods
When reading data, readByte reads one byte at readerIndex and then advances readerIndex by one:
public byte readByte() {
    checkReadableBytes0(1);
    int i = readerIndex;
    byte b = _getByte(i);
    readerIndex = i + 1;
    return b;
}
When writing data, writeByte writes at writerIndex and then advances it:
public ByteBuf writeByte(int value) {
    ensureWritable0(1);
    _setByte(writerIndex++, value);
    return this;
}
The mark methods record the current index so it can be restored later:
@Override
public ByteBuf markReaderIndex() {
    markedReaderIndex = readerIndex;
    return this;
}
The reset methods restore the index to the previously marked position:
public ByteBuf resetReaderIndex() {
    readerIndex(markedReaderIndex);
    return this;
}
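A small, illustrative example of mark/reset on the reader index (values are arbitrary):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class MarkResetDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(8);
        buf.writeByte(10).writeByte(20).writeByte(30);   // writerIndex = 3

        buf.markReaderIndex();                           // markedReaderIndex = 0
        System.out.println(buf.readByte());              // 10, readerIndex = 1
        System.out.println(buf.readByte());              // 20, readerIndex = 2

        buf.resetReaderIndex();                          // readerIndex back to 0
        System.out.println(buf.readByte());              // 10 again
        buf.release();
    }
}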
ByteBuf categories
AbstractByteBuf holds the read/write-related indexes: readerIndex, writerIndex, markedReaderIndex, markedWriterIndex, and maxCapacity.
From this structure, the number of readable and writable bytes can be computed directly:
@Override
public int readableBytes() {
    return writerIndex - readerIndex;
}

@Override
public int writableBytes() {
    return capacity() - writerIndex;
}

@Override
public int maxWritableBytes() {
    return maxCapacity() - writerIndex;
}
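Worked numerically (a fragment reusing the imports from the earlier sketch; the concrete numbers assume a buffer created with initial capacity 16 and the default maxCapacity of Integer.MAX_VALUE):

ByteBuf buf = Unpooled.buffer(16);            // capacity = 16
buf.writeBytes(new byte[5]);                  // writerIndex = 5
buf.readBytes(new byte[2]);                   // readerIndex = 2

System.out.println(buf.readableBytes());      // 5 - 2  = 3  (writerIndex - readerIndex)
System.out.println(buf.writableBytes());      // 16 - 5 = 11 (capacity - writerIndex)
System.out.println(buf.maxWritableBytes());   // maxCapacity() - 5
buf.release();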
Pooled vs. Unpooled
The difference between pooled and unpooled buffers is whether the memory is obtained from a pool and reused, or allocated fresh on every request.
The memory allocator: ByteBufAllocator
Netty ships two memory allocators: PooledByteBufAllocator and UnpooledByteBufAllocator.
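A sketch of how the two allocators are obtained and used (PooledByteBufAllocator.DEFAULT and the UnpooledByteBufAllocator constructor taking a preferDirect flag are part of Netty's public API; the sizes are arbitrary):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class AllocatorDemo {
    public static void main(String[] args) {
        ByteBufAllocator pooled = PooledByteBufAllocator.DEFAULT;
        ByteBufAllocator unpooled = new UnpooledByteBufAllocator(/* preferDirect = */ false);

        ByteBuf pooledDirect = pooled.directBuffer(256);   // served by a PoolArena / thread cache
        ByteBuf unpooledHeap = unpooled.heapBuffer(256);    // plain byte array under the hood

        // Pooled buffers must be released so their memory can go back to the pool.
        pooledDirect.release();
        unpooledHeap.release();
    }
}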
UnpooledByteBufAllocator
In newHeapBuffer, the call eventually reaches the UnpooledHeapByteBuf constructor, which creates a byte array of the initial capacity:
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe() ?
            new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
            new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);

    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = checkNotNull(alloc, "alloc");
    setArray(allocateArray(initialCapacity));
    setIndex(0, 0);
}
For off-heap (direct) memory, allocation eventually reaches the UnpooledDirectByteBuf constructor, which requests a block of the initial capacity via ByteBuffer.allocateDirect(initialCapacity):
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    ObjectUtil.checkNotNull(alloc, "alloc");
    checkPositiveOrZero(initialCapacity, "initialCapacity");
    checkPositiveOrZero(maxCapacity, "maxCapacity");
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity), false);
}
PooledByteBufAllocator
The newDirectBuffer flow is:
1. Get a PoolThreadCache from threadCache, which is a thread-local variable (PoolThreadLocalCache).
2. Obtain the PoolArena object from the PoolThreadCache.
3. Allocate memory through the PoolArena object.
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}
Inside directArena.allocate, the ByteBuf instance itself is obtained from a RECYCLER object pool, so buffer objects are reused rather than re-created; this recycling is part of what makes the allocator "pooled":
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
    PooledUnsafeDirectByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}
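RECYCLER here is an instance of io.netty.util.Recycler. A minimal, simplified sketch of the same pattern, using a hypothetical PooledThing class purely for illustration:

import io.netty.util.Recycler;

final class PooledThing {
    private static final Recycler<PooledThing> RECYCLER = new Recycler<PooledThing>() {
        @Override
        protected PooledThing newObject(Handle<PooledThing> handle) {
            return new PooledThing(handle);
        }
    };

    private final Recycler.Handle<PooledThing> handle;

    private PooledThing(Recycler.Handle<PooledThing> handle) {
        this.handle = handle;
    }

    // Mirrors PooledUnsafeDirectByteBuf.newInstance(): take an instance from the pool (or create one).
    static PooledThing newInstance() {
        return RECYCLER.get();
    }

    // Mirrors handing the object shell back when the buffer is released: return it to the pool.
    void recycle() {
        handle.recycle(this);
    }
}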
The PooledByteBufAllocator constructor creates the heapArenas and directArenas arrays:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                              int smallCacheSize, int normalCacheSize,
                              boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
    super(preferDirect);
    threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
    this.smallCacheSize = smallCacheSize;
    this.normalCacheSize = normalCacheSize;

    if (directMemoryCacheAlignment != 0) {
        if (!PlatformDependent.hasAlignDirectByteBuffer()) {
            throw new UnsupportedOperationException("Buffer alignment is not supported. " +
                    "Either Unsafe or ByteBuffer.alignSlice() must be available.");
        }

        // Ensure page size is a whole multiple of the alignment, or bump it to the next whole multiple.
        pageSize = (int) PlatformDependent.align(pageSize, directMemoryCacheAlignment);
    }

    chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

    checkPositiveOrZero(nHeapArena, "nHeapArena");
    checkPositiveOrZero(nDirectArena, "nDirectArena");

    checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
    if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
        throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
    }

    if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
        throw new IllegalArgumentException("directMemoryCacheAlignment: "
                + directMemoryCacheAlignment + " (expected: power of two)");
    }

    int pageShifts = validateAndCalculatePageShifts(pageSize, directMemoryCacheAlignment);

    if (nHeapArena > 0) {
        heapArenas = newArenaArray(nHeapArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
        for (int i = 0; i < heapArenas.length; i ++) {
            PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                    pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
            heapArenas[i] = arena;
            metrics.add(arena);
        }
        heapArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        heapArenas = null;
        heapArenaMetrics = Collections.emptyList();
    }

    if (nDirectArena > 0) {
        directArenas = newArenaArray(nDirectArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
        for (int i = 0; i < directArenas.length; i ++) {
            PoolArena.DirectArena arena = new PoolArena.DirectArena(
                    this, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
            directArenas[i] = arena;
            metrics.add(arena);
        }
        directArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        directArenas = null;
        directArenaMetrics = Collections.emptyList();
    }

    metric = new PooledByteBufAllocatorMetric(this);
}
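For illustration, a pooled allocator can also be constructed with explicit arena and page settings instead of using PooledByteBufAllocator.DEFAULT; the numbers below are only an example, and newer Netty versions expose additional overloads and may deprecate some of these parameters:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class CustomPooledAllocatorDemo {
    public static void main(String[] args) {
        // Defaults are derived from CPU count and available memory (see the static accessors).
        System.out.println("heap arenas:   " + PooledByteBufAllocator.defaultNumHeapArena());
        System.out.println("direct arenas: " + PooledByteBufAllocator.defaultNumDirectArena());
        System.out.println("page size:     " + PooledByteBufAllocator.defaultPageSize());
        System.out.println("max order:     " + PooledByteBufAllocator.defaultMaxOrder());

        // Example only: 2 heap arenas, 2 direct arenas, 8 KiB pages, maxOrder 11 (16 MiB chunks).
        PooledByteBufAllocator alloc = new PooledByteBufAllocator(true, 2, 2, 8192, 11);
        ByteBuf buf = alloc.directBuffer(1024);
        buf.release();
    }
}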
In the PoolThreadCache constructor, the per-thread caches are created, split into sub-page (small) caches and page-level (normal) caches:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
                int freeSweepAllocationThreshold) {
    checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    if (directArena != null) {
        smallSubPageDirectCaches = createSubPageCaches(
                smallCacheSize, directArena.numSmallSubpagePools);

        normalDirectCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, directArena);

        directArena.numThreadCaches.getAndIncrement();
    } else {
        // No directArea is configured so just null out all caches
        smallSubPageDirectCaches = null;
        normalDirectCaches = null;
    }
    if (heapArena != null) {
        // Create the caches for the heap allocations
        smallSubPageHeapCaches = createSubPageCaches(
                smallCacheSize, heapArena.numSmallSubpagePools);

        normalHeapCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, heapArena);

        heapArena.numThreadCaches.getAndIncrement();
    } else {
        // No heapArea is configured so just null out all caches
        smallSubPageHeapCaches = null;
        normalHeapCaches = null;
    }

    // Only check if there are caches in use.
    if ((smallSubPageDirectCaches != null || normalDirectCaches != null
            || smallSubPageHeapCaches != null || normalHeapCaches != null)
            && freeSweepAllocationThreshold < 1) {
        throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                + freeSweepAllocationThreshold + " (expected: > 0)");
    }
}
Cache data structure
A MemoryRegionCache consists of three parts: size, queue, and sizeClass.
Each element of the queue is an Entry; an Entry holds a chunk and a handle, which points to the location of the cached memory region within that chunk.
sizeClass is the size class of the cached regions. In older Netty versions these were tiny (0-512 B), small (512 B-8 KB, up to the page size), and normal (8 KB up to the chunk size, 16 MB by default); newer versions have merged tiny into small, which is why the allocate method shown below only distinguishes small, normal, and huge.
private abstract static class MemoryRegionCache<T> {
    private final int size;
    private final Queue<Entry<T>> queue;
    private final SizeClass sizeClass;
    private int allocations;
    // ...
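A much-simplified sketch of this structure (not Netty's actual implementation; Entry, chunk, and handle are reduced to plain fields for illustration):

import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of MemoryRegionCache: a bounded queue of (chunk, handle) entries
// for one fixed size and one size class.
final class SimpleRegionCache<T> {
    enum SizeClass { Small, Normal }

    static final class Entry<T> {
        final T chunk;      // the chunk the cached region belongs to
        final long handle;  // encodes where inside the chunk the region lives
        Entry(T chunk, long handle) { this.chunk = chunk; this.handle = handle; }
    }

    private final int size;                   // maximum number of cached entries
    private final Queue<Entry<T>> queue = new ArrayDeque<>();
    private final SizeClass sizeClass;
    private int allocations;                  // how many times this cache served an allocation

    SimpleRegionCache(int size, SizeClass sizeClass) {
        this.size = size;
        this.sizeClass = sizeClass;
    }

    // Called on free: remember the region if there is still room in the cache.
    boolean add(T chunk, long handle) {
        if (queue.size() >= size) {
            return false;
        }
        return queue.offer(new Entry<>(chunk, handle));
    }

    // Called on allocate: reuse a cached region if one is available.
    Entry<T> allocate() {
        Entry<T> entry = queue.poll();
        if (entry != null) {
            allocations++;
        }
        return entry;
    }
}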
Cache-hit allocation flow
1. Use the requested size to locate the matching MemoryRegionCache.
2. Try to serve the request from a cached Entry in that cache's queue and initialize the buf with it.
3. If the cache cannot serve the request, fall back to page-level or sub-page-level allocation depending on the size.
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int sizeIdx = size2SizeIdx(reqCapacity);

    if (sizeIdx <= smallMaxSizeIdx) {
        tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
    } else if (sizeIdx < nSizes) {
        tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
    } else {
        int normCapacity = directMemoryCacheAlignment > 0
                ? normalizeSize(reqCapacity) : reqCapacity;
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, normCapacity);
    }
}
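A hedged illustration of how a cache hit can arise: releasing a pooled buffer can place its memory region into the releasing thread's PoolThreadCache, so a later allocation of the same normalized size from the same thread may be served from that cache. Whether it actually is depends on the size, the cache configuration, and whether caching is enabled for that thread.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class CacheHitDemo {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

        ByteBuf first = alloc.directBuffer(1024);   // small-sized request, served via a PoolArena
        first.release();                            // region may be cached in this thread's PoolThreadCache

        ByteBuf second = alloc.directBuffer(1024);  // same size class: candidate for a cache hit
        second.release();
    }
}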