Преимущество ByteBuf в Netty
Каковы недостатки ByteBuffer, используемого NIO
1: Динамическое расширение невозможно. Длина байтового буфера фиксирована и является начальным заданным значением. Его больше нельзя расширить. Когда записанное содержимое превышает емкость байтового буфера, возникает исключение за пределами границ сообщать.
2.: API сложен в использовании.Когда вы хотите прочитать данные, вам нужно вызвать метод buffer.flip() и перевести его в режим чтения.Если вы не будете осторожны, может произойти ошибка, и данные не быть прочитаны или считанные данные неверны
Преимущества ByteBuf и какие усовершенствования были сделаны
1: API более удобен в работе, его можно записывать или читать напрямую.
2: поддержка динамического расширения, когда записанные данные превышают емкость ByteBuf, они будут расширяться динамически и не будут сообщать об ошибке.
3: Предоставляет множество реализаций ByteBuf, которые можно использовать более гибко.
4: Обеспечивает эффективный механизм нулевого копирования
5: BYTEBUF может быть повторно используется памятью
Пример операции ByteBuf
Операции ByteBuf
В ==ByteBuf есть три важных свойства: == 1: емкость емкости, размер изначально указанного ByteBuf
2: позиция чтения readIndex, при последовательном чтении запишите значение индекса прочитанных данных
3: позиция записи writeIndex, при последовательной записи записывать значение индекса записанных данных
== Общие методы ByteBuf: == 1: getByte и setByte, получить данные по указанному индексу, который получается случайным образом и не изменит значения readIndex и writeIndex
2: read*, чтение последовательно, изменит значение readIndex
3: запись*, последовательная запись, изменит значение writeIndex
4: discardReadBytes, очистить прочитанное содержимое
5: очистить, очистить буфер
6: Операция поиска
7: Отметить и сбросить
8: Подсчет ссылок и освобождение
Простой демонстрационный пример
/**
* ByteBuf的使用示例
*/
public class ByteBufDemo {
public static void main(String[] args) {
//分配非池化,10个字节的ByteBuf
ByteBuf buf = Unpooled.buffer(10);
//看下ByteBuf
System.out.println("------------------------原始的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
//写入内容到ByteBuf
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
System.out.println("------------------------写入内容后的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
//从ByteBuf中读取内容
buf.readByte();
buf.readByte();
System.out.println("------------------------读取一些内容后的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
//清除读过的内容
//把读过的数据清除后,readIndex变为0,writeIndex变为3
//后面尚未读取的内容,会复制到前面去,把原来的值覆盖掉
//再次写入时,3,4,5后面的4,5会被覆盖掉
buf.discardReadBytes();
System.out.println("------------------------清除读过的数据后的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
//再次写入内容到ByteBuf
byte[] bytesO = {6};
buf.writeBytes(bytesO);
System.out.println("------------------------再次写入内容后的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
//清空读和写的索引值
//readIndex和writeIndex会重置为0,ByteBuf中的内容并不会重置
buf.clear();
System.out.println("------------------------清空读和写的索引值后的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
//再次写入内容到ByteBuf
byte[] bytes2 = {1, 2, 3};
buf.writeBytes(bytes2);
System.out.println("------------------------再次写入内容后的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
//清空ByteBuf的内容
//不会重置readIndex和writeIndex
buf.setZero(0, buf.capacity());
System.out.println("------------------------清空ByteBuf的内容后的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
//再次写入超出指定容量的数据到ByteBuf
//会进行扩容
byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
buf.writeBytes(bytes3);
System.out.println("------------------------再次写入超出指定容量的数据后的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
}
}
Выходной результат:В приведенном выше примере используется ByteBuf в куче Давайте посмотрим на ByteBuf вне кучи:
//分配非池化,10个字节的directBuffer
ByteBuf buf = Unpooled.directBuffer(10);
//看下ByteBuf
System.out.println("------------------------原始的ByteBuf-------------------------------");
System.out.println("ByteBuf参数:" + buf.toString());
directBuffer не может использовать метод массива, иначе будет сообщено об ошибке: java.lang.UnsupportedOperationException: direct buffer, а использование ByteBuf выделяется своим базовым распределителем, а не новым, который будет подробно описан ниже.На картинке выше видно, что readIndex и writeIndex делят буфер на три части, readIndex будет меньше или равен writeIndex, это должно быть легко понять, я еще не писал там, вы можете прочитать, что Вы можете прочитать Шерстяная ткань.
Память в куче и вне кучи
Socket — это API-интерфейс сетевого взаимодействия, предоставляемый нижним уровнем операционной системы приложению верхнего уровня.Когда данные для чтения или записи находятся в куче JVM, необходимо скопировать копию данных для чтения в куча JVM в операционную систему. , а затем сокет читает ее, а преимущество прямой памяти в том, что сокет можно читать напрямую, избавляя от необходимости копирования.
Динамическое расширение ByteBuf
Давайте возьмем в качестве примера ByteBuf в куче, чтобы просмотреть исходный код и проанализировать динамическое расширение ByteBuf: Динамическое расширение нужно делать, когда емкости ByteBuf не хватает при записи данных, поэтому необходимо отслеживать следующий код:
buf.writeBytes(bytes);
Отслеживая вышеприведенный writeBytes, сначала войдите в абстрактный класс ByteBuf и введите следующий абстрактный метод:
public abstract ByteBuf writeBytes(byte[] src);
Его класс реализации выглядит следующим образом:Введите метод первого AbstractByteBuf:
@Override
public ByteBuf writeBytes(byte[] src) {
writeBytes(src, 0, src.length);
return this;
}
Следующий метод снова вызывается:
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
//检查是否可以写入
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
//把当前的写入位置加上写入数据的长度
writerIndex += length;
return this;
}
src данные для записи, длина — это длина записываемых данных Затем метод входит в параметр sureWritable, переданные параметры: длина записываемых данных
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
//参数校验
checkPositiveOrZero(minWritableBytes, "minWritableBytes");
//检查容量是否可以写入这么多数据
ensureWritable0(minWritableBytes);
return this;
}
//检查参数是否小于0
public static int checkPositiveOrZero(int i, String name) {
if (i < 0) {
throw new IllegalArgumentException(name + ": " + i + " (expected: >= 0)");
}
return i;
}
После того, как проверка параметра будет завершена, он войдет в метод sureWritable0:
final void ensureWritable0(int minWritableBytes) {
//确保缓冲区可以访问
ensureAccessible();
//如果写入的数据长度小于等于剩余可写数据的容量,就直接返回
//就是说,容量足够写入,不需要扩容
if (minWritableBytes <= writableBytes()) {
return;
}
if (checkBounds) {
//maxCapacity是int的最大值
//检查写入的数据长度是否比可以写入的最大容量还要大
//是的话就抛异常
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
}
//正式的扩容方法
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
//把扩容后的新容量设置进去
capacity(newCapacity);
}
Введите метод расширения класса AbstractByteBufAllocator:
//常量 4M
static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
//校验参数
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
//minNewCapacity = writerIndex + minWritableBytes
//已经写入的数据索引加上当前写入的数据长度,就是需要的最小的容量
//判断是否比最大容量还大,是的话就抛异常
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
//如果需要的最小容量等于4M,就直接返回4M,作为扩容后的容量
if (minNewCapacity == threshold) {
return threshold;
}
//如果需要的最小容量大于4M,就按照下面的扩容方式扩容
if (minNewCapacity > threshold) {
//newCapacity = 15 / 4194304 * 4194304
int newCapacity = minNewCapacity / threshold * threshold;
//如果计算出的容量大于最大容量减去4M,就把最大容量赋值给新的容量
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
//如果需要的最小容量小于4M,就按照下面的方式扩容
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
Посмотрите еще раз на метод емкости: Далее следует использовать метод arraycopy, чтобы поместить расширенную емкость в ByteBuf.
@Override
public ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
int oldCapacity = array.length;
byte[] oldArray = array;
if (newCapacity > oldCapacity) {
byte[] newArray = allocateArray(newCapacity);
System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
setArray(newArray);
freeArray(oldArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = allocateArray(newCapacity);
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
freeArray(oldArray);
}
return this;
}
Вот отслеженные шаги кода:==Обзор механизма динамического расширения:== 1: Когда вызывается метод write*, он будет проверен методом sureWritable0. 2: метод calculateNewCapacity — это метод, используемый для расчета емкости
==Метод расчета расширения:== 1: Требуемая емкость не превышает 4M, и она начнет расширяться с 64 байт, удваивая каждый раз, пока рассчитанная емкость не будет соответствовать требуемой минимальной емкости.Если текущий размер 256, было записано 200 байт, и снова для записи 60 байт, минимально необходимая емкость составляет 260 байт, тогда расширенная емкость составляет 64 * 2 * 2 * 2 = 512. 2: Если требуемая емкость превышает 4M, метод расчета расширения следующий: новая емкость = минимальное требование новой емкости / 4M * 4M + 4M, если текущий размер равен 3M, записывается 2M, а затем записывается 3M, минимум требуемая емкость составляет 5 м, тогда расширенная емкость составляет 5/4 * 4 + 4 = 8 м
Рис. 1. Требуемая емкость менее 4 МБ:Рис. 2. Требуемая емкость превышает 4 МБ:
Какие есть реализации ByteBuf
ByteBuf имеет 8 реализаций из 3-х измерений:
Диаграмма классов ByteBuf
//堆内
ByteBuf buf = Unpooled.buffer(10);
//堆外
ByteBuf buf = Unpooled.directBuffer(10);
ByteBuf предоставляет класс Unpooled без пула, который можно использовать напрямую. Он не предоставляет класс Pooled. Давайте отследим исходный код, чтобы увидеть, как распределяется ByteBuf:
Метод выделения Unpooled.buffer
Сначала войдите в класс Unpooled:
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
//使用默认的分配器分配堆内buffer
public static ByteBuf buffer(int initialCapacity) {
return ALLOC.heapBuffer(initialCapacity);
}
В класс интерфейса ByteBufAllocator войдет следующее:
//分配一个指定容量的堆内buf
ByteBuf heapBuffer(int initialCapacity);
Затем введите абстрактный класс AbstractByteBufAllocator:
//如果没有指定初始容量,默认的初始容量大小是256
static final int DEFAULT_INITIAL_CAPACITY = 256;
//最大容量,为int的最大值
static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
@Override
public ByteBuf heapBuffer(int initialCapacity) {
return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
//如果初始化的容量是0,最大的容量也是0,就返回一个空的Buf
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newHeapBuffer(initialCapacity, maxCapacity);
}
//校验参数
private static void validate(int initialCapacity, int maxCapacity) {
//检查参数
checkPositiveOrZero(initialCapacity, "initialCapacity");
//如果初始化的容量大于最大容量,就抛异常
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity: %d (expected: not greater than maxCapacity(%d)",
initialCapacity, maxCapacity));
}
}
Затем есть абстрактный метод newHeapBuffer:
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
Поскольку инициализация здесь не в пуле, она войдет в класс UnpooledByteBufAllocator:
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
//PlatformDependent.hasUnsafe()是检查当前操作系统是否支持unsafe操作
//根据支持与否,进入不同的类
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
Записи, поддерживающие небезопасную работу, следующие:
InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
Запись Unsafe следующая:
InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
Теперь перейдите к поддержке небезопасных операций и введите класс UnpooledUnsafeHeapByteBuf:
UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
Родительский класс UnpooledHeapByteBuf вызывается снова:
//分配器
private final ByteBufAllocator alloc;
//byte数组,ByteBuf数据底层就是使用这个存储
byte[] array;
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
//检查分配器是否为空
checkNotNull(alloc, "alloc");
//如果初始化的容量大于最大容量,就抛异常
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
//设置当前的数组是分配之后的数组
setArray(allocateArray(initialCapacity));
//初始化readIndex和writeIndex
setIndex(0, 0);
}
//分配数组
protected byte[] allocateArray(int initialCapacity) {
//返回一个具有initialCapacity容量大小的byte数组
return new byte[initialCapacity];
}
//set数组
private void setArray(byte[] initialArray) {
array = initialArray;
tmpNioBuf = null;
}
Метод setIndex в классе AbstractByteBuf:
//初始化readerIndex和writerIndex
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
if (checkBounds) {
checkIndexBounds(readerIndex, writerIndex, capacity());
}
setIndex0(readerIndex, writerIndex);
return this;
}
final void setIndex0(int readerIndex, int writerIndex) {
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
}
После перехода к AbstractByteBuf выше был выделен не объединенный в пул ByteBuf в куче.Ниже приведен отслеживаемый код:== Резюме: == Как видите, выделен не объединенный в пул ByteBuf в куче, а его нижний слой представляет собой массив байтов.
Метод выделения Unpooled.directBuffer
Первая запись также является классом Unpooled:
public static ByteBuf directBuffer(int initialCapacity) {
return ALLOC.directBuffer(initialCapacity);
}
Затем введите абстрактный класс ByteBufAllocator:
ByteBuf directBuffer(int initialCapacity);
Затем в класс AbstractByteBufAllocator:
@Override
public ByteBuf directBuffer(int initialCapacity) {
return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
//如果初始化的容量和最大容量都是0,就返回一个空的Buf
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
//校验参数
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
Так как выделение тоже не пуловое, newDirectBuffer войдет в класс реализации в классе UnpooledByteBufAllocator:
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
//同样的,会判断是否支持unsafe操作
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
Взяв за пример InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf, последние два на самом деле мало чем отличаются.Войдите в конструктор класса UnpooledUnsafeNoCleanerDirectByteBuf:
UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
Родительский класс UnpooledUnsafeDirectByteBuf снова вызывается:
ByteBuffer buffer;
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
//校验参数
checkPositiveOrZero(initialCapacity, "initialCapacity");
checkPositiveOrZero(maxCapacity, "maxCapacity");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
setByteBuffer(allocateDirect(initialCapacity), false);
}
//分配的是一个NIO中的ByteBuffer
protected ByteBuffer allocateDirect(int initialCapacity) {
return ByteBuffer.allocateDirect(initialCapacity);
}
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
if (tryFree) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
}
this.buffer = buffer;
memoryAddress = PlatformDependent.directBufferAddress(buffer);
tmpNioBuf = null;
capacity = buffer.remaining();
}
allocateDirect под классом ByteBuffer:
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
Диаграмма трассировки кода:== Резюме: == Выделите ByteBuf без пула, вне кучи, вы можете видеть, что нижний уровень реализован DirectByteBuffer NIO.
Класс FIG ByteBufAllocator
Мультиплексирование памяти ByteBuf
Выделить объединенную память
Я знаю, как выделить память без пула в соответствии с исходным кодом выше, так как же выделить память из пула? См. иллюстрацию ниже:Вышеупомянутый шаг — выделение пула памяти, а затем она будет проанализирована в соответствии с исходным кодом.
буферный пул памяти
механизм выделения памяти jemalloc 1: В пуле памяти есть три основные области, а именно: крошечная, маленькая, нормальная. 2: Каждая область разделена на сетки разного размера, и каждая сетка может кэшировать только блоки памяти соответствующего размера. 3: Максимально поддерживаемая память сетки составляет 32 КБ. Все, что больше этого размера, не может быть кэшировано и может быть только освобождено. 4: Каждый тип сетки имеет соответствующий номер: крошечная: 512, маленькая: 256, нормальная: 64. Например, в крошечной области есть 512 сеток каждого размера. Если они заполнены, они не будут переработаны. память будет освобождена
Восстановить пул памяти
Процесс выделения пула памяти
Приведенный выше анализ выделения памяти без пула, давайте посмотрим, как выделить память из пула:
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
//分配的内存最大长度为496
ByteBuf buf1 = allocator.ioBuffer(495);
System.out.printf("buf1: 0x%X%n", buf1.memoryAddress());
//此时会被回收到tiny的512b格子中
buf1.release();
//从tiny的512b格子去取
ByteBuf buf2 = allocator.ioBuffer(495);
System.out.printf("buf2: 0x%X%n", buf2.memoryAddress());
buf2.release();
Сначала взгляните на класс ByteBufAllocator:
//默认ByteBuf分配器,在ByteBufUtil中初始化
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
Отслеживая первый код allocator.ioBuffer(495), сначала введите класс AbstractByteBufAllocator:
@Override
public ByteBuf ioBuffer(int initialCapacity) {
//如果支持Unsafe,就分配堆外内存
if (PlatformDependent.hasUnsafe()) {
return directBuffer(initialCapacity);
}
//不支持Unsafe,就分配堆内内存
return heapBuffer(initialCapacity);
}
Затем вызовите метод directBuffer ниже класса:
@Override
public ByteBuf directBuffer(int initialCapacity) {
return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
//如果初始化的容量和最大容量等于0,就返回一个空的ByteBuf
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
//校验参数
private static void validate(int initialCapacity, int maxCapacity) {
checkPositiveOrZero(initialCapacity, "initialCapacity");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity: %d (expected: not greater than maxCapacity(%d)",
initialCapacity, maxCapacity));
}
}
Затем он войдет в класс PooledByteBufAllocator аллокатора пула ByteBuf, который может реализовать повторное использование памяти:
// cache sizes 缓存默认值
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
//从当前线程中获取cache对象
PoolThreadCache cache = threadCache.get();
//从cache中获取Arena
//Arena可以理解为一个netty提供的实际进行buf分配和管理的工具
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
//如果有directArena就分配池化内存
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else { //如果没有directArena,就使用非池化Unpooled
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
После повторного входа в трек Poolarena категории: Вы можете увидеть ниже, есть три типа крошечных, маленьких, нормальных
enum SizeClass {
Tiny,
Small,
Normal
}
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
//获取一个ByteBuf对象
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
//分配内存
allocate(cache, buf, reqCapacity);
return buf;
}
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
//如果支持Unsafe,就初始化一个PooledUnsafeDirectByteBuf
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else { //不支持Unsafe,就初始化一个PooledDirectByteBuf
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
Введите класс PooledUnsafeDirectByteBuf ниже: Получить баф из стека рециркуляции потока, если стека нет, то создаст новый, если есть, то вернет баф в стек
//调用RECYCLER.get()时,线程栈中没有可以复用的时,会调用newObject方法,此时创建出来的buf是空的
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
return new PooledUnsafeDirectByteBuf(handle, 0);
}
};
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
//RECYCLER,回收机制
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
//取出来的可能是之前的buf,使用之前清理一下
buf.reuse(maxCapacity);
return buf;
}
Затем снова вернитесь к методу allocate в классе PoolArena, чтобы выделить память:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
//将需要的内存大小计算为2^n
final int normCapacity = normalizeCapacity(reqCapacity);
//需要分配的内存是否是tiny或者small类型
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512 //分配一个tiny内存
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
//分配一块新的内存
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
Метод allocateTiny в классе PoolThreadCache:
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
//从cache中获取buf
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
Получите соответствующую сетку в соответствии с требуемой емкостью и перейдите к методу tinyIdx в классе PoolArena:
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}
Метод allocate класса PoolThreadCache выделяет память сетки кэша для buf.
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
Вот шаги на фигуру конкретный код отслеживания:Вышеуказанный исходный код является примером типов крошечного типа. Другие два типа похожи. Когда новая память создана для первого распределения, а затем успешно восстановлена в пуле буфера памяти, память соответствующего размера снова выделяется, и буфер памяти будет напрямую выделен из памяти. Возьми его из бассейна и не выделит новую память снова
Процесс восстановления памяти
Затем проследите метод release(), чтобы увидеть процесс утилизации памяти.
buf1.release();
Вход в класс AbstractReferenceCountedByteBuf в первый раз: Счетчик ссылок Buf, используемый для повторного использования памяти, имеет счетчик refCnt, счетчик keep() увеличивается на единицу, а счетчик release() уменьшается на единицу. До тех пор, пока счетчик не станет равным 0, для освобождения вызывается функция освобождения (deallocate()), а метод освобождения (deallocate()) реализуется самим конкретным buf.
@Override
public boolean release() {
return release0(1);
}
private boolean release0(int decrement) {
int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement);
//判断当前buf有没有被引用了,没有的话就调用deallocate
if (decrement == realCnt) {
if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
deallocate();
return true;
}
return retryRelease0(decrement);
}
return releaseNonFinal0(decrement, rawCnt, realCnt);
}
Введите класс PooledByteBuf:
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
//表示当前的buf不在使用任何一块内存区域
this.handle = -1;
//设置memory为null
memory = null;
//释放buf的内存
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk = null;
//把buf对象放入对象回收栈
recycle();
}
}
Снова войдите в класс PoolArena:
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
//判断是否是unpooled
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
//判断是哪种类型,tiny、small、normal
SizeClass sizeClass = sizeClass(normCapacity);
//放入缓存
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass, nioBuffer);
}
}
//计算内存区域是哪种类型
private SizeClass sizeClass(int normCapacity) {
if (!isTinyOrSmall(normCapacity)) {
return SizeClass.Normal;
}
return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
}
Затем к классу PoolThreadCache:
boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
long handle, int normCapacity, SizeClass sizeClass) {
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}
//加入到缓存队列
return cache.add(chunk, nioBuffer, handle);
}
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
//判断是哪种类型,然后把内存回收到哪一块
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
default:
throw new Error();
}
}
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
Приведенная выше схема шагов кода отслеживания:
Механизм нулевого копирования ByteBuf
Механизм нулевого копирования Netty является реализацией прикладного уровня, который мало связан с лежащими в основе JVM и механизмами памяти операционной системы.
несколько примеров
==1: CompositeByteBuf, который объединяет несколько ByteBuf в один логический ByteBuf, избегая копирования между каждым ByteBuf==
public static void test1() {
ByteBuf buf1 = Unpooled.buffer(4);
ByteBuf buf2 = Unpooled.buffer(3);
byte[] bytes1 = {1,2};
byte[] bytes2 = {3,4,5};
buf1.writeBytes(bytes1);
buf2.writeBytes(bytes2);
CompositeByteBuf byteBuf = Unpooled.compositeBuffer();
byteBuf = byteBuf.addComponents(true, buf1, buf2);
System.out.println("byteBuf: " + byteBuf.toString());
}
Приведенные выше результаты вывода: ridx — это позиция чтения последовательного чтения, widx — позиция записи последовательной записи, cap — емкость нового ByteBuf, компоненты означают, что новый ByteBuf состоит из нескольких ByteBuf.== 2: WROPTEMBUFTER () Метод, обертывает массив BYTE [] в объект BYTEBUF ==
public static void test2() {
byte[] bytes = {1,2,3,4,5};
ByteBuf buf = Unpooled.wrappedBuffer(bytes);
System.out.println("buf:" + buf.toString());
}
В выходном результате: ridx — это позиция чтения последовательного чтения, widx — позиция записи последовательной записи, cap — емкость ByteBuf, новый ByteBuf хранит ссылочный адрес массива, а исходный массив фактически эксплуатировался.==3: метод slice(), который делит объект ByteBuf на несколько объектов ByteBuf==
public static void test3() {
ByteBuf buf = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf byteBuf = buf.slice(1,2);
System.out.println("byteBuf:" + byteBuf.toString());
}
В результате вы можете видеть, что есть два ByteBuf, один из которых является исходным, новый ByteBuf хранит ссылочный адрес исходного ByteBuf, а другой является ссылочным адресом разделенного ByteBuf.