предисловие
Являясь превосходным промежуточным программным обеспечением распределенного обмена сообщениями, RocketMQ может предоставить деловым сторонам стабильные и надежные службы обмена сообщениями с высокой производительностью и низкой задержкой. Его основными преимуществами являются надежное хранилище потребления, высокая производительность и низкая задержка для отправки сообщений, мощное накопление сообщений и возможности обработки сообщений.
С точки зрения методов хранения в основном есть несколько аспектов:
- Файловая система
- Распределенное хранилище KV
- Реляционная база данных
С точки зрения эффективности файловые системы выше хранилища KV, а хранилище KV выше реляционных баз данных. Поскольку непосредственная работа с файловой системой, безусловно, является самой быстрой, промежуточное программное обеспечение для очереди сообщений в отрасли, такое какRocketMQ 、RabbitMQ 、kafka
Оба используют файловую систему для хранения сообщений.
Сегодня мы начнем с его файла хранилища, чтобы изучить механизм хранения сообщений RocketMQ.
1. Журнал фиксации
CommitLog
, файл хранилища сообщений, сообщения для всех тем хранятся вCommitLog
в файле.
Наши бизнес-системыRocketMQ
Отправьте сообщение, независимо от того, насколько сложен процесс в середине, в конечном итоге это сообщение будет сохранено дляCommitLog
документ.
Мы знаем, чтоBroker服务器
только одинCommitLog
файл(группа),RocketMQ
Сообщения всех тем будут храниться в одном файле, и каждое сообщение будет храниться в этом файле, и каждое сообщение будет записываться последовательно.
Может быть, когда-нибудь ты захочешь увидеть этоCommitLog
Как в файле выглядит сохраненный контент?
1. Отправка сообщения
Конечно, нам нужно идти вCommitLog
Некоторый контент записывается в файл, поэтому сначала рассмотрим пример отправки сообщения.
public static void main(String[] args) throws Exception {
MQProducer producer = getProducer();
for (int i = 0;i<10;i++){
Message message = new Message();
message.setTopic("topic"+i);
message.setBody(("清幽之地的博客").getBytes());
SendResult sendResult = producer.send(message);
}
producer.shutdown();
}
Отправляем сообщения в 10 разных тем, если только в однуBroker
машины, они будут сохранены в том жеCommitLog
в файле. На данный момент расположение этого файлаC:/Users/shiqizhen/store/commitlog/00000000000000000000
.
2. Прочитайте содержимое файла
Мы не можем открыть этот файл напрямую, потому что это бинарный файл, поэтому нам нужно прочитать его байтовый массив через программу.
public static ByteBuffer read(String path)throws Exception{
File file = new File(path);
FileInputStream fin = new FileInputStream(file);
byte[] bytes = new byte[(int)file.length()];
fin.read(bytes);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
return buffer;
}
Как показано в приведенном выше коде, вы можете прочитать все содержимое файла, указав путь к файлу. Чтобы облегчить следующую операцию, мы преобразуем прочитанный массив байтов вjava.nio.ByteBuffer
объект.
3. Анализ
Перед разбором нам нужно выяснить две вещи:
- Формат сообщения, то есть, какие поля содержит сообщение;
- Размер в байтах, занимаемый каждым полем.
На изображении выше мы видели формат сообщения, которое содержит 19 полей. Что касается размера байта, некоторые 4 байта, некоторые 8 байт, мы не будем вдаваться в подробности по порядку, просто посмотрим непосредственно на код.
/**
* commitlog 文件解析
* @param byteBuffer
* @return
* @throws Exception
*/
public static MessageExt decodeCommitLog(ByteBuffer byteBuffer)throws Exception {
MessageExt msgExt = new MessageExt();
// 1 TOTALSIZE
int storeSize = byteBuffer.getInt();
msgExt.setStoreSize(storeSize);
if (storeSize<=0){
return null;
}
// 2 MAGICCODE
byteBuffer.getInt();
// 3 BODYCRC
int bodyCRC = byteBuffer.getInt();
msgExt.setBodyCRC(bodyCRC);
// 4 QUEUEID
int queueId = byteBuffer.getInt();
msgExt.setQueueId(queueId);
// 5 FLAG
int flag = byteBuffer.getInt();
msgExt.setFlag(flag);
// 6 QUEUEOFFSET
long queueOffset = byteBuffer.getLong();
msgExt.setQueueOffset(queueOffset);
// 7 PHYSICALOFFSET
long physicOffset = byteBuffer.getLong();
msgExt.setCommitLogOffset(physicOffset);
// 8 SYSFLAG
int sysFlag = byteBuffer.getInt();
msgExt.setSysFlag(sysFlag);
// 9 BORNTIMESTAMP
long bornTimeStamp = byteBuffer.getLong();
msgExt.setBornTimestamp(bornTimeStamp);
// 10 BORNHOST
int bornhostIPLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 : 16;
byte[] bornHost = new byte[bornhostIPLength];
byteBuffer.get(bornHost, 0, bornhostIPLength);
int port = byteBuffer.getInt();
msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));
// 11 STORETIMESTAMP
long storeTimestamp = byteBuffer.getLong();
msgExt.setStoreTimestamp(storeTimestamp);
// 12 STOREHOST
int storehostIPLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
byte[] storeHost = new byte[storehostIPLength];
byteBuffer.get(storeHost, 0, storehostIPLength);
port = byteBuffer.getInt();
msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));
// 13 RECONSUMETIMES
int reconsumeTimes = byteBuffer.getInt();
msgExt.setReconsumeTimes(reconsumeTimes);
// 14 Prepared Transaction Offset
long preparedTransactionOffset = byteBuffer.getLong();
msgExt.setPreparedTransactionOffset(preparedTransactionOffset);
// 15 BODY
int bodyLen = byteBuffer.getInt();
if (bodyLen > 0) {
byte[] body = new byte[bodyLen];
byteBuffer.get(body);
msgExt.setBody(body);
}
// 16 TOPIC
byte topicLen = byteBuffer.get();
byte[] topic = new byte[(int) topicLen];
byteBuffer.get(topic);
msgExt.setTopic(new String(topic, CHARSET_UTF8));
// 17 properties
short propertiesLength = byteBuffer.getShort();
if (propertiesLength > 0) {
byte[] properties = new byte[propertiesLength];
byteBuffer.get(properties);
String propertiesString = new String(properties, CHARSET_UTF8);
Map<String, String> map = string2messageProperties(propertiesString);
}
int msgIDLength = storehostIPLength + 4 + 8;
ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
msgExt.setMsgId(msgId);
return msgExt;
}
4. Вывод содержимого сообщения
public static void main(String[] args) throws Exception {
String filePath = "C:\\Users\\shiqizhen\\store\\commitlog\\00000000000000000000";
ByteBuffer buffer = read(filePath);
List<MessageExt> messageList = new ArrayList<>();
while (true){
MessageExt message = decodeCommitLog(buffer);
if (message==null){
break;
}
messageList.add(message);
}
for (MessageExt ms:messageList) {
System.out.println("主题:"+ms.getTopic()+" 消息:"+
new String(ms.getBody())+"队列ID:"+ms.getQueueId()+" 存储地址:"+ms.getStoreHost());
}
}
Запустив этот код, мы можем увидеть непосредственноCommitLog
Содержимое файла:
主题:topic0 消息:清幽之地的博客 队列ID:1 存储地址:/192.168.44.1:10911
主题:topic1 消息:清幽之地的博客 队列ID:0 存储地址:/192.168.44.1:10911
主题:topic2 消息:清幽之地的博客 队列ID:1 存储地址:/192.168.44.1:10911
主题:topic3 消息:清幽之地的博客 队列ID:0 存储地址:/192.168.44.1:10911
主题:topic4 消息:清幽之地的博客 队列ID:3 存储地址:/192.168.44.1:10911
主题:topic5 消息:清幽之地的博客 队列ID:1 存储地址:/192.168.44.1:10911
主题:topic6 消息:清幽之地的博客 队列ID:2 存储地址:/192.168.44.1:10911
主题:topic7 消息:清幽之地的博客 队列ID:3 存储地址:/192.168.44.1:10911
主题:topic8 消息:清幽之地的博客 队列ID:2 存储地址:/192.168.44.1:10911
主题:topic9 消息:清幽之地的博客 队列ID:0 存储地址:/192.168.44.1:10911
Без лишнего текстового описания, через приведенные выше коды, я считаю, что вы правыCommitLog
файл для дальнейшего понимания.
На данный момент мы рассматриваем другой вопрос:
CommitLog
В файле сохраняются сообщения всех тем, но при потреблении мы подписываемся на тему для потребления.RocketMQ
Как эффективно получать сообщения?
2. Потребление очереди
Для того, чтобы решить вышеуказанную проблему,RocketMQ
представилConsumeQueue
Файл очереди потребления.
продолжать говоритьConsumeQueue
Прежде чем мы должны сначала понять другое понятие, а именноMessageQueue
.
1. Очередь сообщений
Мы знаем, что при отправке сообщения нам нужно указать тему. Потом при создании темы есть очень важный параметрMessageQueue
.简单来说,就是你这个Topic对应了多少个队列,也就是几个MessageQueue
, по умолчанию 4. Так какова его функция?
Это механизм разделения данных. Например, в нашем топике 100 штук данных, а в топике по умолчанию 4 очереди, поэтому в каждой очереди около 25 штук данных.
Затем этиMessageQueue
да иBroker
связаны друг с другом, то есть каждыйMessageQueue
может быть в разномBroker
На машине это зависит от количества имеющихся у вас очередей и вашего брокерского кластера.
Давайте взглянем на картинку выше, название темы по порядку, всего их 4MessageQueue
, каждый с 25 частями данных. Потому что в авторской локальной среде есть только одинBroker
, поэтому ихbrokerName
Все указывают на одну и ту же машину.
теперь, когдаMessageQueue
Если их несколько, то при отправке сообщения необходимо каким-то образом выбрать очередь. По умолчанию очередь сообщений получается путем опроса.
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
Конечно,RocketMQ
Также есть механизм задержки отказа, немного усложняющийся при выборе очереди сообщений, который мы сегодня обсуждать не будем.
2. Потребление очереди
законченныйMessageQueue
, ПосмотримConsumerQueue
. Мы сказали выше, что это для эффективного поиска тематических сообщений.
ConsumerQueue
также является набором групповых файлов, его расположениеC:/Users/shiqizhen/store/consumequeue
. Ниже этого каталога находится папка с именем «Тема», а затем следующий уровеньMessageQueue
Папка с именем по идентификатору очереди и, наконец, один или несколько файлов.
После этого наслаиванияRocketMQ
Можно получить как минимум следующую информацию:
- Сначала по названию темы вы можете найти конкретную папку;
- Затем найдите конкретный файл по идентификатору очереди сообщений;
- Наконец, по содержимому файла найдите конкретное сообщение.
Итак, что хранится в этом файле?
3. Разобрать файл
ускоритьConsumerQueue
Быстрая скорость поиска и экономия места на диске, полное количество сообщений не будет храниться в файле. Он хранится в следующем формате:
Точно так же давайте сначала напишем кусок кода и выведем его в этом формате.ConsumerQueue
содержимое файла.
public static void main(String[] args)throws Exception {
String path = "C:\\Users\\shiqizhen\\store\\consumequeue\\order\\0\\00000000000000000000";
ByteBuffer buffer = read(path);
while (true){
long offset = buffer.getLong();
long size = buffer.getInt();
long code = buffer.getLong();
if (size==0){
break;
}
System.out.println("消息长度:"+size+" 消息偏移量:" +offset);
}
System.out.println("--------------------------");
}
Раньше у насorder
В этой ветке написано 100 кусков данных, так что вотorder#messagequeue#0
В нем 25 записей.
消息长度:173 消息偏移量:2003
消息长度:173 消息偏移量:2695
消息长度:173 消息偏移量:3387
消息长度:173 消息偏移量:4079
消息长度:173 消息偏移量:4771
消息长度:173 消息偏移量:5463
消息长度:173 消息偏移量:6155
消息长度:173 消息偏移量:6847
消息长度:173 消息偏移量:7539
消息长度:173 消息偏移量:8231
消息长度:173 消息偏移量:8923
消息长度:173 消息偏移量:9615
消息长度:173 消息偏移量:10307
消息长度:173 消息偏移量:10999
消息长度:173 消息偏移量:11691
消息长度:173 消息偏移量:12383
消息长度:173 消息偏移量:13075
消息长度:173 消息偏移量:13767
消息长度:173 消息偏移量:14459
消息长度:173 消息偏移量:15151
消息长度:173 消息偏移量:15843
消息长度:173 消息偏移量:16535
消息长度:173 消息偏移量:17227
消息长度:173 消息偏移量:17919
消息长度:173 消息偏移量:18611
--------------------------
Осторожные друзья, вы, должно быть, нашли его. В приведенном выше выводе разница между смещениями сообщений равна = длина сообщения * длина очереди.
4. Сообщение запроса
Теперь мы проходимConsumerQueue
Теперь, когда известна длина и смещение сообщения, его легче найти.
public static MessageExt getMessageByOffset(ByteBuffer commitLog,long offset,int size) throws Exception {
ByteBuffer slice = commitLog.slice();
slice.position((int)offset);
slice.limit((int) (offset+size));
MessageExt message = CommitLogTest.decodeCommitLog(slice);
return message;
}
Затем мы можем положиться на этот метод для достиженияConsumerQueue
Получите конкретное содержание сообщения.
public static void main(String[] args) throws Exception {
//consumerqueue根目录
String consumerPath = "C:\\Users\\shiqizhen\\store\\consumequeue";
//commitlog目录
String commitLogPath = "C:\\Users\\shiqizhen\\store\\commitlog\\00000000000000000000";
//读取commitlog文件内容
ByteBuffer commitLogBuffer = CommitLogTest.read(commitLogPath);
//遍历consumerqueue目录下的所有文件
File file = new File(consumerPath);
File[] files = file.listFiles();
for (File f:files) {
if (f.isDirectory()){
File[] listFiles = f.listFiles();
for (File queuePath:listFiles) {
String path = queuePath+"/00000000000000000000";
//读取consumerqueue文件内容
ByteBuffer buffer = CommitLogTest.read(path);
while (true){
//读取消息偏移量和消息长度
long offset = (int) buffer.getLong();
int size = buffer.getInt();
long code = buffer.getLong();
if (size==0){
break;
}
//根据偏移量和消息长度,在commitloh文件中读取消息内容
MessageExt message = getMessageByOffset(commitLogBuffer,offset,size);
if (message!=null){
System.out.println("消息主题:"+message.getTopic()+" MessageQueue:"+
message.getQueueId()+" 消息体:"+new String(message.getBody()));
}
}
}
}
}
}
Запустив этот код, вы можете получить все сообщения 10 тем из предыдущего тестового примера.
消息主题:topic0 MessageQueue:1 消息体:清幽之地的博客
消息主题:topic1 MessageQueue:0 消息体:清幽之地的博客
消息主题:topic2 MessageQueue:1 消息体:清幽之地的博客
消息主题:topic3 MessageQueue:0 消息体:清幽之地的博客
消息主题:topic4 MessageQueue:3 消息体:清幽之地的博客
消息主题:topic5 MessageQueue:1 消息体:清幽之地的博客
消息主题:topic6 MessageQueue:2 消息体:清幽之地的博客
消息主题:topic7 MessageQueue:3 消息体:清幽之地的博客
消息主题:topic8 MessageQueue:2 消息体:清幽之地的博客
消息主题:topic9 MessageQueue:0 消息体:清幽之地的博客
5. Потребляйте новости
Когда сообщение потребляется, процесс поиска сообщения аналогичен. Однако стоит отметить, чтоConsumerQueue
документы иCommitLog
Файлов может быть несколько, поэтому будет процесс поиска файлов Давайте посмотрим на исходный код.
Во-первых, в соответствии с ходом потребления, чтобы найти соответствующийConsumerQueue
, чтобы получить содержимое файла.
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
//ConsumerQueue文件大小
int mappedFileSize = this.mappedFileSize;
//根据消费进度,找到在consumerqueue文件里的偏移量
long offset = startIndex * CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
//返回ConsumerQueue映射文件
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
//返回文件里的某一块内容
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
return null;
}
затем получить сообщениеCommitLog
Смещение и длина сообщения в файле, получить сообщение.
public SelectMappedBufferResult getMessage(final long offset, final int size) {
//commitlog文件大小
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
//根据消息偏移量,定位到具体的commitlog文件
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
//根据消息偏移量和长度,获取消息内容
int pos = (int) (offset % mappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
}
6. Запрос по идентификатору сообщения
Выше мы видели способ найти сообщение по смещению, ноRocketMQ
Также предусмотрено несколько других способов запроса сообщений.
- Запрос по ключу сообщения;
- Запрос по уникальному ключу;
- Запрос по идентификатору сообщения.
это здесь,Message Key和Unique Key
Все они генерируются клиентом перед отправкой сообщения. Мы можем установить его сами, или он может быть автоматически сгенерирован клиентом,Message Id
вBroker
Генерируется, когда клиент сохраняет сообщение.
Message Id
Всего 16 байтов, содержащих адрес хоста хранилища сообщений иCommitLog
Смещение смещения в файле. В качестве доказательства есть исходный код:
/**
* 创建消息ID
* @param input
* @param addr Broker服务器地址
* @param offset 正在存储的消息,在Commitlog中的偏移量
* @return
*/
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
input.flip();
int msgIDLength = addr.limit() == 8 ? 16 : 28;
input.limit(msgIDLength);
input.put(addr);
input.putLong(offset);
return UtilAll.bytes2string(input.array());
}
когда мыMessage Id
При запросе сообщения у брокера он сначала передаетdecodeMessageId
метод для анализа адреса брокера и смещения сообщения.
public static MessageId decodeMessageId(final String msgId) throws Exception {
SocketAddress address;
long offset;
int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
ByteBuffer bb = ByteBuffer.wrap(port);
int portInt = bb.getInt(0);
//解析出来Broker地址
address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
//偏移量
byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
bb = ByteBuffer.wrap(data);
offset = bb.getLong(0);
return new MessageId(address, offset);
}
так черезMessage Id
При запросе сообщений это фактически напрямую от определенного брокера.CommitLog
Запрос в указанном месте является точным запросом.
Это хорошо, но если вы пройдетеMessage Key 和 Unique Key
Спрашивая,RocketMQ
Как это делается?
3. Указатель
1. индексный индексный файл
ConsumerQueue
Очередь потребления сообщений — это индексный файл, специально созданный для подписки на сообщения, который повышает скорость получения сообщений на основе тем и очередей сообщений.
Кроме того,RocketMQ
Представьте механизм хеш-индекса для создания индекса для сообщения, и его ключMessage Key 和 Unique Key
.
Итак, давайте сначала посмотрим на структуру индексного индексного файла:
Для простоты понимания мы по-прежнему используем код для разбора этого файла.
public static void main(String[] args) throws Exception {
//index索引文件的路径
String path = "C:\\Users\\shiqizhen\\store\\index\\20200506224547616";
ByteBuffer buffer = CommitLogTest.read(path);
//该索引文件中包含消息的最小存储时间
long beginTimestamp = buffer.getLong();
//该索引文件中包含消息的最大存储时间
long endTimestamp = buffer.getLong();
//该索引文件中包含消息的最大物理偏移量(commitlog文件偏移量)
long beginPhyOffset = buffer.getLong();
//该索引文件中包含消息的最大物理偏移量(commitlog文件偏移量)
long endPhyOffset = buffer.getLong();
//hashslot个数
int hashSlotCount = buffer.getInt();
//Index条目列表当前已使用的个数
int indexCount = buffer.getInt();
//500万个hash槽,每个槽占4个字节,存储的是index索引
for (int i=0;i<5000000;i++){
buffer.getInt();
}
//2000万个index条目
for (int j=0;j<20000000;j++){
//消息key的hashcode
int hashcode = buffer.getInt();
//消息对应的偏移量
long offset = buffer.getLong();
//消息存储时间和第一条消息的差值
int timedif = buffer.getInt();
//该条目的上一条记录的index索引
int pre_no = buffer.getInt();
}
System.out.println(buffer.position()==buffer.capacity());
}
Мы видим, что окончательный результат вывода верен, что доказывает правильность процесса синтаксического анализа.
2. Создайте индекс
В теле сообщения, которое мы отправляем, содержитсяMessage Key 或 Unique Key
, то для каждого из них будет построен индекс.
Здесь есть два важных момента:
- Вычислить положение слота Hash согласно сообщению Key;
- Рассчитайте начальную позицию записи индекса на основе количества слотов хэша и индекса индекса.
поставить токЗапись указателяЗначение индекса, записанное в слот HashabsSlotPos
на позиции;Информация, относящаяся к записи указателя(hashcode/消息偏移量/时间差值/hash槽的值)
, смещение от началаabsIndexPos
Начните, последовательность записывается байт за байтом.
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
//计算key的hash
int keyHash = indexKeyHashMethod(key);
//计算hash槽的坐标
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
//计算时间差值
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
//计算INDEX条目的起始偏移量
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//依次写入hashcode、消息偏移量、时间戳、hash槽的值
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//将当前INDEX中包含的条目数量写入HASH槽
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
return true;
}
return false;
}
После того, как индекс индекса построен таким образом, в соответствии сMessage Key 或 Unique Key
Запрашивать сообщения легко.
Например, мы проходимRocketMQ
клиентский инструмент, согласноUnique Key
чтобы запросить сообщение.
adminImpl.queryMessageByUniqKey("order", "FD88E3AB24F6980059FDC9C3620464741BCC18B4AAC220FDFE890007");
существуетBroker
конец, черезUnique Key
Чтобы вычислить положение слота хэша, чтобы найти данные индекса индекса. Получить физическое смещение сообщения из индекса индекса и, наконец, в соответствии с физическим смещением сообщения, непосредственно вCommitLog
Вы можете найти его в файле .
Суммировать
В этой статье обсуждаетсяRocketMQ
Основная идея хранения сообщений и поиска сообщений в . Промежуточный процесс исходного кода очень сложен, но с помощью этого восходящего метода мы начинаем непосредственно с файлов и анализируем их файловые структуры, чтобы разобраться в их отношениях и функциях и надеемся оказать положительное влияние на друзей. .
Нелегко быть оригинальным, это понравится приглашенным официальным лицам перед отъездом.Это будет движущей силой для автора продолжать писать~