java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Netty分布式ByteBuf使用directArena分配缓冲区

Netty分布式ByteBuf使用directArena分配缓冲区过程解析

作者:向南是个万人迷

这篇文章主要介绍了Netty分布式ByteBuf使用directArena分配缓冲区过程解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

上一小节简单分析了PooledByteBufAllocator中, 线程局部缓存和arean的相关逻辑, 这一小节简单分析下directArena分配缓冲区的相关过程

directArena分配缓冲区

回到newDirectBuffer中

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    ByteBuf buf;
    if (directArena != null) { 
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }
    return toLeakAwareBuffer(buf);
}

获取了directArena对象之后, 通过allocate方法分配一个ByteBuf, 这里allocate方法是PoolArena类中的方法

跟到allocate方法中:

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { 
    PooledByteBuf<T> buf = newByteBuf(maxCapacity); 
    allocate(cache, buf, reqCapacity);
    return buf;
}

首先通过newByteBuf获得一个ByteBuf对象

再通过allocate方法进行分配, 这里要注意, 这里进行分配的时候是线程私有的directArena进行分配

我们跟到newByteBuf方法中

因为是directArena调用的newByteBuf, 所以这里会进入DirectArena类的newByteBuf中:

protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { 
    if (HAS_UNSAFE) { 
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else {
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}

因为默认通常是有unsafe对象的, 所以这里会走到这一步中PooledUnsafeDirectByteBuf.newInstance(maxCapacity)

通过静态方法newInstance创建一个PooledUnsafeDirectByteBuf对象

跟到newInstance方法中:

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
    PooledUnsafeDirectByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}

这里通过RECYCLER.get()这种方式拿到一个ByteBuf对象, RECYCLER其实是一个对象回收站, 这部分内容会在后面的内容中详细剖析, 这里我们只需要知道, 这种方式能从回收站中拿到一个对象, 如果回收站里没有相关对象, 则创建一个新

因为这里有可能是从回收站中拿出的一个对象, 所以通过reuse进行复用

跟到reuse方法中

final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}

这里设置了的最大可扩容内存, 对象的引用数量, 读写指针位置都重置为0, 以及读写指针的位置标记也都重置为0

我们回到PoolArena的allocate方法中:

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { 
    PooledByteBuf<T> buf = newByteBuf(maxCapacity); 
    allocate(cache, buf, reqCapacity);
    return buf;
}

拿到了ByteBuf对象, 就可以通过allocate(cache, buf, reqCapacity)方法进行内存分配了

跟到allocate方法中

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    //规格化
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { 
        int tableIdx;
        PoolSubpage<T>[] table;
        //判断是不是tinty
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            //缓存分配
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            //通过tinyIdx拿到tableIdx
            tableIdx = tinyIdx(normCapacity);
            //subpage的数组
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }
        //拿到对应的节点
        final PoolSubpage<T> head = table[tableIdx];

        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            //默认情况下, head的next也是自身
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    if (normCapacity <= chunkSize) {
        //首先在缓存上进行内存分配
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            //分配成功, 返回
            return;
        }
        //分配不成功, 做实际的内存分配
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        //大于这个值, 就不在缓存上分配
        allocateHuge(buf, reqCapacity);
    }
}

这里看起来逻辑比较长, 其实主要步骤分为两步

1.首先在缓存上进行分配

对应步骤是:

  cache.allocateTiny(this, buf, reqCapacity, normCapacity)

  cache.allocateSmall(this, buf, reqCapacity, normCapacity)

  cache.allocateNormal(this, buf, reqCapacity, normCapacity)

2.如果在缓存上分配不成功, 则实际分配一块内存

对应步骤是

  allocateNormal(buf, reqCapacity, normCapacity)

在这里对几种类型的内存进行介绍:

之前的小节我们介绍过, 缓冲区内存类型分为tiny, small, 和normal, 其实还有种不常见的类型叫做huge, 那么这几种类型的内存有什么区别呢, 实际上这几种类型是按照缓冲区初始化空间的范围进行区分的, 具体区分如下:

tiny类型对应的缓冲区范围为0-512B

small类型对应的缓冲区范围为512B-8K

normal类型对应的缓冲区范围为8K-16MB

huge类型对应缓冲区范围为大于16MB

简单介绍下有关范围的含义:

16MB对应一个chunk, netty是以chunk为单位向操作系统申请内存的

8k对应一个page, page是将chunk切分后的结果, 一个chunk对应2048个page

8k以下对应一个subpage, subpage是page的切分, 一个page可以切分多个subpage, 具体切分几个需要根据subpage的大小而定, 比如只要分配1k的缓冲区, 则会将page切分成8个subpage

以上就是directArena内存分配的大概流程和相关概念,更多关于Netty分布式ByteBuf directArena分配缓冲区的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文