Netty之Buffer(一)ByteBuf内存泄漏检测1

  • Post author:
  • Post category:其他


Netty channel看完看Buffer。

自从 Netty 4 开始,对象的生命周期由它们的引用计数( reference counts )管理,而不是由垃圾收集器( garbage collector )管理了。ByteBuf 是最值得注意的,它使用了引用计数来改进分配内存和释放内存的性能。–Netty官方文档翻译

引用计数是计算机编程语言中的一种内存管理技术,是指将资源(可以是对象、内存或磁盘空间等等)的被引用次数保存起来,当被引用次数变为零时就将其释放的过程。使用引用计数技术可以实现自动资源管理的目的。同时引用计数还可以指使用引用计数技术回收未使用资源的垃圾回收算法。–百度

Netty 中,通过io.netty.util.ReferenceCounted 接口,定义了引用计数相关的一系列操作:

public interface ReferenceCounted {

    /**
     * 获得引用计数
     * Returns the reference count of this object.  If {@code 0}, it means this object has been deallocated.
     */
    int refCnt();

    /**
     * 增加引用计数 1
     * Increases the reference count by {@code 1}.
     */
    ReferenceCounted retain();
    /**
     * 增加引用计数 n
     *
     * Increases the reference count by the specified {@code increment}.
     */
    ReferenceCounted retain(int increment);

    /**
     * 等价于调用 `#touch(null)` 方法,即 hint 方法参数传递为 null 。
     *
     * Records the current access location of this object for debugging purposes.
     * If this object is determined to be leaked, the information recorded by this operation will be provided to you
     * via {@link ResourceLeakDetector}.  This method is a shortcut to {@link #touch(Object) touch(null)}.
     */
    ReferenceCounted touch();
    /**
     * 出于调试目的,用一个额外的任意的(arbitrary)信息记录这个对象的当前访问地址. 
     * 如果这个对象被检测到泄露了, 这个操作记录的信息将通过ResourceLeakDetector 提供.
     *
     * Records the current access location of this object with an additional arbitrary information for debugging
     * purposes.  If this object is determined to be leaked, the information recorded by this operation will be
     * provided to you via {@link ResourceLeakDetector}.
     */
    ReferenceCounted touch(Object hint);

    /**
     * 减少引用计数 1 。
     * 当引用计数为 0 时,释放
     *
     * Decreases the reference count by {@code 1} and deallocates this object if the reference count reaches at
     * {@code 0}.
     *
     * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
     */
    boolean release();
    /**
     * 减少引用计数 n 。
     *  当引用计数为 0 时,释放
     *
     * Decreases the reference count by the specified {@code decrement} and deallocates this object if the reference
     * count reaches at {@code 0}.
     *
     * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
     */
    boolean release(int decrement);
}

ByteBuf 虽然继承了 ReferenceCounted 接口,但是并未实现相应的方法真正与实现相关的类如下:

(菜鸟教程:装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其结构。这种类型的设计模式属于结构型模式,它是作为现有的类的一个包装。

这种模式创建了一个装饰类,用来包装原有的类,并在保持类方法签名完整性的前提下,提供了额外的功能。

我们通过下面的实例来演示装饰器模式的用法。其中,我们将把一个形状装饰上不同的颜色,同时又不改变形状类。

1、孙悟空有 72 变,当他变成”庙宇”后,他的根本还是一只猴子,但是他又有了庙宇的功能。 2、不论一幅画有没有画框都可以挂在墙上,但是通常都是有画框的,并且实际上是画框被挂在墙上。在挂在墙上之前,画可以被蒙上玻璃,装到框子里;这时画、玻璃和画框形成了一个物体



AbstractReferenceCountedByteBuf ,实现引用计数的获取与增减的操作。

WrappedByteBuf ,实现对 ByteBuf 的装饰器实现类。

WrappedCompositeByteBuf ,实现对 CompositeByteBuf 的装饰器实现类。

SimpleLeakAwareByteBuf、SimpleLeakAwareCompositeByteBuf ,实现了 SIMPLE 级别的内存泄露检测。

AdvancedLeakAwareByteBuf、AdvancedLeakAwareCompositeByteBuf ,实现了 ADVANCED 和 PARANOID 级别的内存泄露检测。

UnreleasableByteBuf ,用于阻止他人对装饰的 ByteBuf 的销毁,避免被错误销毁掉。

ByteBufAllocator 可用于创建 ByteBuf 对象。创建的过程中,它会调用 toLeakAwareBuffer(…) 方法,将 ByteBuf 装饰成 LeakAware ( 可检测内存泄露 )的 ByteBuf 对象:

// AbstractByteBufAllocator.java
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
        default:
            break;
    }
    return buf;
}

protected static CompositeByteBuf toLeakAwareBuffer(CompositeByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareCompositeByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareCompositeByteBuf(buf, leak);
            }
            break;
        default:
            break;
    }
    return buf;
}

有两个 toLeakAwareBuffer() 方法,一个带 “Composite” 的 组合 ByteBuf 类,另外一个不带 Composite 普通 ByteBuf 类。因为这个不同,所以前者创建的是 SimpleLeakAwareCompositeByteBuf / AdvancedLeakAwareCompositeByteBuf 对象,后者创建的是 SimpleLeakAwareByteBuf / AdvancedLeakAwareByteBuf 对象。两者逻辑一致

{ 禁用(DISABLED) – 完全禁止泄露检测,省点消耗。

简单(SIMPLE) – 默认等级,告诉我们取样的1%的ByteBuf是否发生了泄露,但总共一次只打印一次,看不到就没有了。

高级(ADVANCED) – 告诉我们取样的1%的ByteBuf发生泄露的地方。每种类型的泄漏(创建的地方与访问路径一致)只打印一次。对性能有影响。

偏执(PARANOID) – 跟高级选项类似,但此选项检测所有ByteBuf,而不仅仅是取样的那1%。对性能有绝大的影响 }

io.netty.buffer.AbstractReferenceCountedByteBuf ,实现引用计数的获取与增减的操作。构造方法:

/**
 * {@link #refCnt} 的更新器
 */
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

/**
 * 引用计数
 */
private volatile int refCnt;

protected AbstractReferenceCountedByteBuf(int maxCapacity) {
    // 设置最大容量
    super(maxCapacity);
    // 初始 refCnt 为 1
    refCntUpdater.set(this, 1);
}

refCnt

@Override
public int refCnt() {
    return refCnt;
}

setRefCnt

/**
 * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
 */
protected final void setRefCnt(int refCnt) {
    refCntUpdater.set(this, refCnt);
}

retain

@Override
public ByteBuf retain(int increment) {
    return retain0(checkPositive(increment, "increment"));
}

private ByteBuf retain0(final int increment) {
    // 增加
    int oldRef = refCntUpdater.getAndAdd(this, increment);
    // 原有 refCnt 就是 <= 0 ;或者,increment 为负数
    if (oldRef <= 0 || oldRef + increment < oldRef) {
        // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
        // 加回去,负负得正。
        refCntUpdater.getAndAdd(this, -increment);
        // 抛出 IllegalReferenceCountException 异常
        throw new IllegalReferenceCountException(oldRef, increment);
    }
    return this;
}

release

@Override
public boolean release() {
    return release0(1);
}

@Override
public boolean release(int decrement) {
    return release0(checkPositive(decrement, "decrement"));
}

@SuppressWarnings("Duplicates")
private boolean release0(int decrement) {
    // 减少
    int oldRef = refCntUpdater.getAndAdd(this, -decrement);
    // 原有 oldRef 等于减少的值
    if (oldRef == decrement) {
        // 释放
        deallocate();
        return true;
        // 减少的值得大于 原有 oldRef ,说明“越界”;或者,increment 为负数
    } else if (oldRef < decrement || oldRef - decrement > oldRef) {
        // Ensure we don't over-release, and avoid underflow.
        // 加回去,负负得正。
        refCntUpdater.getAndAdd(this, decrement);
        // 抛出 IllegalReferenceCountException 异常
        throw new IllegalReferenceCountException(oldRef, -decrement);
    }
    return false;
}

当释放完成,refCnt 等于 0 时,调用 deallocate() 方法,进行真正的释放。这是个抽象方法,需要子类去实现:

/**
 * Called once {@link #refCnt()} is equals 0.
 */
protected abstract void deallocate();

touch, AbstractReferenceCountedByteBuf 并未实现 touch(…) 方法。而是在 AdvancedLeakAwareByteBuf 中才实现

@Override
public ByteBuf touch() {
    return this;
}

@Override
public ByteBuf touch(Object hint) {
    return this;
}

io.netty.buffer.SimpleLeakAwareByteBuf ,继承 WrappedByteBuf 类,Simple 级别的 LeakAware ByteBuf 实现类, 构造方法:

/**
 * 关联的 ByteBuf 对象
 *
 * This object's is associated with the {@link ResourceLeakTracker}. When {@link ResourceLeakTracker#close(Object)}
 * is called this object will be used as the argument. It is also assumed that this object is used when
 * {@link ResourceLeakDetector#track(Object)} is called to create {@link #leak}.
 */
private final ByteBuf trackedByteBuf;
/**
 * ResourceLeakTracker 对象
 */
final ResourceLeakTracker<ByteBuf> leak;

SimpleLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) { // wrapped 和 trackedByteBuf 一般不同。
    super(wrapped);
    this.trackedByteBuf = ObjectUtil.checkNotNull(trackedByteBuf, "trackedByteBuf");
    this.leak = ObjectUtil.checkNotNull(leak, "leak");
}

SimpleLeakAwareByteBuf(ByteBuf wrapped, ResourceLeakTracker<ByteBuf> leak) { // wrapped 和 trackedByteBuf 相同。
    this(wrapped, wrapped, leak);
}

leak 属性,ResourceLeakTracker 对象。

trackedByteBuf 属性,真正关联 leak 的 ByteBuf 对象。

slice:

@Override
//调用父 slice(...) 方法,获得 slice ByteBuf 对象。
public ByteBuf slice() {
    return newSharedLeakAwareByteBuf(super.slice());
}

@Override
public ByteBuf slice(int index, int length) {
    return newSharedLeakAwareByteBuf(super.slice(index, length));
}

因为 slice ByteBuf 对象,并不是一个 LeakAware 的 ByteBuf 对象。所以调用 newSharedLeakAwareByteBuf(ByteBuf wrapped) 方法,装饰成 LeakAware 的 ByteBuf 对象:

private SimpleLeakAwareByteBuf newSharedLeakAwareByteBuf(ByteBuf wrapped) {
    return newLeakAwareByteBuf(wrapped, trackedByteBuf /** trackedByteBuf 代表的是原始的 ByteBuf 对象,它是跟 leak 真正进行关联的。而 wrapped 则不是。 **/, leak);
}

protected SimpleLeakAwareByteBuf newLeakAwareByteBuf(ByteBuf buf, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leakTracker) {
    return new SimpleLeakAwareByteBuf(buf, trackedByteBuf /** <1> **/, leakTracker);
}

在 SimpleLeakAwareByteBuf 中,还有如下方法:

和 slice(…) 方法是类似的,在调用完父对应的方法后,

再调用 newSharedLeakAwareByteBuf(ByteBuf wrapped) 方法,装饰成 LeakAware 的 ByteBuf 对象:

@Override
public ByteBuf duplicate() {
    return newSharedLeakAwareByteBuf(super.duplicate());
}

@Override
public ByteBuf readSlice(int length) {
    return newSharedLeakAwareByteBuf(super.readSlice(length));
}

@Override
public ByteBuf asReadOnly() {
    return newSharedLeakAwareByteBuf(super.asReadOnly());
}

@Override
public ByteBuf order(ByteOrder endianness) {
    if (order() == endianness) {
        return this;
    } else {
        return newSharedLeakAwareByteBuf(super.order(endianness));
    }
}

retainedSlice

@Override
public ByteBuf retainedSlice() {
    return unwrappedDerived(super.retainedSlice());
}

@Override
public ByteBuf retainedSlice(int index, int length) {
    return unwrappedDerived(super.retainedSlice(index, length));
}

调用父 retainedSlice(…) 方法,获得 slice ByteBuf 对象,引用计数加 1。

因为 slice ByteBuf 对象,并不是一个 LeakAware 的 ByteBuf 对象。所以调用 unwrappedDerived(ByteBuf wrapped) 方法,装饰成 LeakAware 的 ByteBuf 对象:

private ByteBuf unwrappedDerived(ByteBuf derived) {
    // We only need to unwrap SwappedByteBuf implementations as these will be the only ones that may end up in
    // the AbstractLeakAwareByteBuf implementations beside slices / duplicates and "real" buffers.
    ByteBuf unwrappedDerived = unwrapSwapped(derived);

    if (unwrappedDerived instanceof AbstractPooledDerivedByteBuf) {
        // Update the parent to point to this buffer so we correctly close the ResourceLeakTracker.
        ((AbstractPooledDerivedByteBuf) unwrappedDerived).parent(this);

        ResourceLeakTracker<ByteBuf> newLeak = AbstractByteBuf.leakDetector.track(derived);
        if (newLeak == null) {
            // No leak detection, just return the derived buffer.
            return derived;
        }
        return newLeakAwareByteBuf(derived, newLeak);
    }
    return newSharedLeakAwareByteBuf(derived);
}

@SuppressWarnings("deprecation")
private static ByteBuf unwrapSwapped(ByteBuf buf) {
    if (buf instanceof SwappedByteBuf) {
        do {
            buf = buf.unwrap();
        } while (buf instanceof SwappedByteBuf);

        return buf;
    }
    return buf;
}

private SimpleLeakAwareByteBuf newLeakAwareByteBuf(ByteBuf wrapped, ResourceLeakTracker<ByteBuf> leakTracker) {
    return newLeakAwareByteBuf(wrapped, wrapped, leakTracker);
}

在 SimpleLeakAwareByteBuf 中,还有如下方法,和 retainedSlice(…) 方法是类似的,在调用完父对应的方法后,再调用 unwrappedDerived(ByteBuf derived) 方法,装饰成 LeakAware 的 ByteBuf 对象。整理如下:

@Override
public ByteBuf retainedDuplicate() {
    return unwrappedDerived(super.retainedDuplicate());
}

@Override
public ByteBuf readRetainedSlice(int length) {
    return unwrappedDerived(super.readRetainedSlice(length));
}

release

@Override
public boolean release() {
    if (super.release()) { // 释放完成
        closeLeak();
        return true;
    }
    return false;
}

@Override
public boolean release(int decrement) {
    if (super.release(decrement)) { // 释放完成
        closeLeak();
        return true;
    }
    return false;
}

在调用父 release(…) 方法,释放完成后,会调用 closeLeak() 方法,关闭 ResourceLeakTracker:

private void closeLeak() {
    // Close the ResourceLeakTracker with the tracked ByteBuf as argument. This must be the same that was used when
    // calling DefaultResourceLeak.track(...).
    boolean closed = leak.close(trackedByteBuf);
    assert closed;
}

touch, SimpleLeakAwareByteBuf 也并未实现 touch(…) 方法。而是在 AdvancedLeakAwareByteBuf 中才实现.

@Override
public ByteBuf touch() {
    return this;
}

@Override
public ByteBuf touch(Object hint) {
    return this;
}

io.netty.buffer.AdvancedLeakAwareByteBuf ,继承 SimpleLeakAwareByteBuf 类,ADVANCED 和 PARANOID 级别的 LeakAware ByteBuf 实现类。构造方法:

AdvancedLeakAwareByteBuf(ByteBuf buf, ResourceLeakTracker<ByteBuf> leak) {
    super(buf, leak);
}

AdvancedLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
    super(wrapped, trackedByteBuf, leak);
}

retain,会调用 ResourceLeakTracer中record() 方法,记录信息。

@Override
public ByteBuf retain() {
    leak.record();
    return super.retain();
}

@Override
public ByteBuf retain(int increment) {
    leak.record();
    return super.retain(increment);
}

release,会调用 ResourceLeakTracer中record() 方法,记录信息

@Override
public boolean release() {
    leak.record();
    return super.release();
}

@Override
public boolean release(int decrement) {
    leak.record();
    return super.release(decrement);
}

touch,会调用 ResourceLeakTracer中record(…) 方法,记录信息

@Override
public ByteBuf touch() {
    leak.record();
    return this;
}

@Override
public ByteBuf touch(Object hint) {
    leak.record(hint);
    return this;
}

recordLeakNonRefCountingOperation(ResourceLeakTracker leak) 静态方法,除了引用计数操作相关( 即 retain(…)/release(…)/touch(…) 方法 )方法外,是否要调用记录信息。

private static final String PROP_ACQUIRE_AND_RELEASE_ONLY = "io.netty.leakDetection.acquireAndReleaseOnly";
/**
 * 默认为
 */
private static final boolean ACQUIRE_AND_RELEASE_ONLY;

static {
    ACQUIRE_AND_RELEASE_ONLY = SystemPropertyUtil.getBoolean(PROP_ACQUIRE_AND_RELEASE_ONLY, false);
}

static void recordLeakNonRefCountingOperation(ResourceLeakTracker<ByteBuf> leak) {
    if (!ACQUIRE_AND_RELEASE_ONLY) {
        leak.record();
    }
}

负负得正,所以会调用 ResourceLeakTracer中record(…) 方法,记录信息。

也就是说,ByteBuf 的所有方法,都会记录信息

io.netty.buffer.UnreleasableByteBuf ,继承 WrappedByteBuf 类,用于阻止他人对装饰的 ByteBuf 的销毁,避免被错误销毁掉。

它的实现方法主要是两大点:

  1. 引用计数操作相关( 即 retain(…)/release(…)/touch(…) 方法 )方法,不进行调用:
@Override
public ByteBuf retain(int increment) {
    return this;
}
@Override
public ByteBuf retain() {
    return this;
}

@Override
public ByteBuf touch() {
    return this;
}
@Override
public ByteBuf touch(Object hint) {
    return this;
}

@Override
public boolean release() {
    return false;
}
@Override
public boolean release(int decrement) {
    return false;
}
  1. 拷贝操作相关方法,都会在包一层 UnreleasableByteBuf 对象:
@Override
public ByteBuf slice() {
    return new UnreleasableByteBuf(buf.slice());
}

io.netty.util.ResourceLeakDetector ,内存泄露检测器。

ResourceLeakDetector 为了检测内存是否泄漏,使用了 WeakReference( 弱引用 )和 ReferenceQueue( 引用队列 ):

  • 根据检测级别和采样率的设置,在需要时为需要检测的 ByteBuf 创建WeakReference 引用。
  • 当 JVM 回收掉 ByteBuf 对象时,JVM 会将 WeakReference 放入ReferenceQueue 队列中。
  • 通过对 ReferenceQueue 中 WeakReference 的检查,判断在 GC 前是否有释放ByteBuf 的资源,就可以知道是否有资源释放

静态属性:

private static final String PROP_LEVEL_OLD = "io.netty.leakDetectionLevel";
private static final String PROP_LEVEL = "io.netty.leakDetection.level";
/**
 * 默认内存检测级别
 */
private static final Level DEFAULT_LEVEL = Level.SIMPLE;

private static final String PROP_TARGET_RECORDS = "io.netty.leakDetection.targetRecords";
private static final int DEFAULT_TARGET_RECORDS = 4;

/**
 * 每个 DefaultResourceLeak 记录的 Record 数量
 */
private static final int TARGET_RECORDS;

/**
 * 内存检测级别枚举
 * 
 * Represents the level of resource leak detection.
 */
public enum Level {
    /**
     * Disables resource leak detection.
     */
    DISABLED,
    /**
     * Enables simplistic sampling resource leak detection which reports there is a leak or not,
     * at the cost of small overhead (default).
     */
    SIMPLE,
    /**
     * Enables advanced sampling resource leak detection which reports where the leaked object was accessed
     * recently at the cost of high overhead.
     */
    ADVANCED,
    /**
     * Enables paranoid resource leak detection which reports where the leaked object was accessed recently,
     * at the cost of the highest possible overhead (for testing purposes only).
     */
    PARANOID;

    /**
     * Returns level based on string value. Accepts also string that represents ordinal number of enum.
     *
     * @param levelStr - level string : DISABLED, SIMPLE, ADVANCED, PARANOID. Ignores case.
     * @return corresponding level or SIMPLE level in case of no match.
     */
    static Level parseLevel(String levelStr) {
        String trimmedLevelStr = levelStr.trim();
        for (Level l : values()) {
            if (trimmedLevelStr.equalsIgnoreCase(l.name()) || trimmedLevelStr.equals(String.valueOf(l.ordinal()))) {
                return l;
            }
        }
        return DEFAULT_LEVEL;
    }
}

/**
 * 内存泄露检测等级
 */
private static Level level;

/**
 * 默认采集频率
 */
// There is a minor performance benefit in TLR if this is a power of 2.
static final int DEFAULT_SAMPLING_INTERVAL = 128;
 
  static {
      // 获得是否禁用泄露检测
      final boolean disabled;
      if (SystemPropertyUtil.get("io.netty.noResourceLeakDetection") != null) {
          disabled = SystemPropertyUtil.getBoolean("io.netty.noResourceLeakDetection", false);
          logger.debug("-Dio.netty.noResourceLeakDetection: {}", disabled);
          logger.warn("-Dio.netty.noResourceLeakDetection is deprecated. Use '-D{}={}' instead.", PROP_LEVEL, DEFAULT_LEVEL.name().toLowerCase());
      } else {
           disabled = false;
      }
  
      // 获得默认级别
      Level defaultLevel = disabled? Level.DISABLED : DEFAULT_LEVEL;
      // 获得配置的级别字符串,从老版本的配置
      // First read old property name (兼容老版本)
      String levelStr = SystemPropertyUtil.get(PROP_LEVEL_OLD, defaultLevel.name());
      // 获得配置的级别字符串,从新版本的配置
      // If new property name is present, use it
      levelStr = SystemPropertyUtil.get(PROP_LEVEL, levelStr);
      // 获得最终的级别
      Level level = Level.parseLevel(levelStr);
      // 设置最终的级别
      ResourceLeakDetector.level = level;
  
      // 初始化 TARGET_RECORDS
      TARGET_RECORDS = SystemPropertyUtil.getInt(PROP_TARGET_RECORDS, DEFAULT_TARGET_RECORDS);
  
      if (logger.isDebugEnabled()) {
          logger.debug("-D{}: {}", PROP_LEVEL, level.name().toLowerCase());
          logger.debug("-D{}: {}", PROP_TARGET_RECORDS, TARGET_RECORDS);
      }
  }

level 静态属性,内存泄露等级。上面提过四个等级DISABLED,SIMPLE,ADVANCED,PARANOID。

默认级别为 DEFAULT_LEVEL = Level.SIMPLE。

构造方法:

/**
 * DefaultResourceLeak 集合
 *
 * the collection of active resources
 */
private final ConcurrentMap<DefaultResourceLeak<?>, LeakEntry> allLeaks = PlatformDependent.newConcurrentHashMap();

/**
 * 引用队列
 */
private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
/**
 * 已汇报的内存泄露的资源类型的集合
 */
private final ConcurrentMap<String, Boolean> reportedLeaks = PlatformDependent.newConcurrentHashMap();

/**
 * 资源类型
 */
private final String resourceType;
/**
 * 采集评率
 */
private final int samplingInterval;

public ResourceLeakDetector(Class<?> resourceType, int samplingInterval) {
    this(simpleClassName(resourceType) /** <1> **/, samplingInterval, Long.MAX_VALUE);
}

allLeaks 属性,DefaultResourceLeak 集合。因为 Java 没有自带的 ConcurrentSet ,所以只好使用使用 ConcurrentMap 。也就是说,value 属性实际没有任何用途。

refQueue 属性,就是引用队列( ReferenceQueue 队列 )。

reportedLeaks 属性,已汇报的内存泄露的资源类型的集合。

resourceType 属性,资源类型,使用资源类的类名简写。

samplingInterval 属性,采集频率。

在 AbstractByteBuf 类中,我们可以看到创建了所有 ByteBuf 对象统一使用的 ResourceLeakDetector 对象。

static final ResourceLeakDetector<ByteBuf> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);
//ResourceLeakDetector 的创建,通过 io.netty.util.ResourceLeakDetectorFactory ,基于工厂模式的方式来创建。

track(…) 方法,给指定资源( 例如 ByteBuf 对象 )创建一个检测它是否泄漏的 ResourceLeakTracker 对象:

 public final ResourceLeakTracker<T> track(T obj) {
     return track0(obj);
 }
 
 @SuppressWarnings("unchecked")
 private DefaultResourceLeak track0(T obj) {
     Level level = ResourceLeakDetector.level;
     // DISABLED 级别,不创建
     if (level == Level.DISABLED) {
         return null;
     }
 
     // SIMPLE 和 ADVANCED
     if (level.ordinal() < Level.PARANOID.ordinal()) {
         // 随机
         if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
             // 汇报内存是否泄漏
             reportLeak();
             // 创建 DefaultResourceLeak 对象
             return new DefaultResourceLeak(obj, refQueue, allLeaks);
         }
         return null;
     }
 
     // PARANOID 级别
     // 汇报内存是否泄漏
     reportLeak();
     // 创建 DefaultResourceLeak 对象
     return new DefaultResourceLeak(obj, refQueue, allLeaks);
 }

在每次一次创建 DefaultResourceLeak 对象时,调用 #reportLeak() 方法,汇报内存是否泄漏,reportLeak() 方法,检测是否有内存泄露。若有,则进行汇报:

 private void reportLeak() {
     // 如果不允许打印错误日志,则无法汇报,清理队列,并直接结束。因此调用 #clearRefQueue() 方法,清理队列,并直接结束。
     if (!logger.isErrorEnabled()) {
         // 清理队列
         clearRefQueue();
         return;
     }
 
     // 循环引用队列,直到为空
     // Detect and report previous leaks.
     for (;;) {
         @SuppressWarnings("unchecked")
         DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
         if (ref == null) {
             break;
         }
 
         // 清理,并返回是否内存泄露
         if (!ref.dispose()) {
             continue;
         }
 
         // 获得 Record 日志
         String records = ref.toString();
         // 相同 Record 日志,只汇报一次
         if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {
             if (records.isEmpty()) {
                 reportUntracedLeak(resourceType);
             } else {
                 reportTracedLeak(resourceType, records);
             }
         }
     }
 }
/**
 * This method is called when a traced leak is detected. It can be overridden for tracking how many times leaks
 * have been detected.
 */
protected void reportTracedLeak(String resourceType, String records) {
    logger.error(
            "LEAK: {}.release() was not called before it's garbage-collected. " +
            "See http://netty.io/wiki/reference-counted-objects.html for more information.{}",
            resourceType, records);
}

/**
 * This method is called when an untraced leak is detected. It can be overridden for tracking how many times leaks
 * have been detected.
 */
protected void reportUntracedLeak(String resourceType) {
    logger.error("LEAK: {}.release() was not called before it's garbage-collected. " +
            "Enable advanced leak reporting to find out where the leak occurred. " +
            "To enable advanced leak reporting, " +
            "specify the JVM option '-D{}={}' or call {}.setLevel() " +
            "See http://netty.io/wiki/reference-counted-objects.html for more information.",
            resourceType, PROP_LEVEL, Level.ADVANCED.name().toLowerCase(), simpleClassName(this));
}

clearRefQueue() 方法,清理队列:

private void clearRefQueue() {
    for (;;) {
        @SuppressWarnings("unchecked")
        DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
        if (ref == null) {
            break;
        }
        // 清理,并返回是否内存泄露
        ref.dispose();
    }
}



版权声明:本文为weixin_43257196原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。