Чтобы изучить RocketMQ, вам нужно понять две вещи: связь и хранение. Тут немного времени ушло на то, чтобы написать lua-плагин wireshark для RocketMQ.Процесс очень интересный.Запишите его и запишите.
Прочитав эту статью, вы освоите следующие знания.
- Как написать скелетный код плагина wireshark lua
- Как реализовать версию плагина Hello World
- Основной формат протокола связи RocketMQ
- Каков формат тела RocketMQ, когда есть сообщение в PULL?
Первый взгляд на плагин Hello World
На странице о wireshark вы можете увидеть версию Lua, которую он поддерживает сейчас.Ниже приведена страница, соответствующая моей версии wireshark v3.0.6.
Как видите, в настоящее время поддерживается версия Lua 5.2.4. Давайте взглянем на кусок скелетного кода.
-- 声明协议
local NAME = "RocketMQ"
local PORTS = { 9876, 10911 }
local proto = Proto.new(NAME, "RocketMQ Protocol")
-- 声明 dissector 函数,处理包
function proto.dissector(tvb, pinfo, tree)
print("load plugin...demo")
pinfo.cols.protocol = proto.name;
pinfo.cols.info = "Hello, World"
end
-- 注册 dissector 到 wireshark
for _, port in ipairs(PORTS) do
DissectorTable.get("tcp.port"):add(port, proto)
end
Найдите каталог плагинов wireshark, на моем компьютере этот путь/Applications/Wireshark.app/Contents/Resources/share/wireshark/
, изменить в нем файл init.lua
vim /Applications/Wireshark.app/Contents/Resources/share/wireshark/init.lua
Добавьте строку в вызов dofile, которая загружает lua-файл выше.
...
dofile("/path/to/demo.lua")
Эффект до и после выполнения выглядит следующим образом.
Разобрать протокол RocketMQ
Протокол связи RocketMQ относительно прост, и общий формат протокола выглядит следующим образом.
Протокол связи RocketMQ состоит из четырех частей:
- Первая часть: первые 4 байта представляют собой общую длину оставшихся трех частей (исключая собственные 4 байта)
- Вторая часть: следующие 4 байта представляют длину части заголовка.
- Третья часть: содержимое следующей длины заголовка — это заголовок протокола, который сохраняется после сериализации с помощью json и в основном используется для представления различных типов ответов на запросы.
- Часть 4: содержание тела
Возьмите реальный пакет в качестве примера:
первые четыре байта00 00 01 9b
Указывает длину всего пакета 411 (0x019b), следующие четыре байта00 00 00 d4
Представляет длину заголовка, здесь 212 (0xD4), следующие 212 байт представляют содержимое заголовка, вы можете видеть, что это строка json, а последние 195 (411-4-212) байтов представляют тело. содержание, конкретный формат сообщения будет рассмотрен ниже.
Далее напишем программу разбора.
Логика парсинга происходит в методе proto.dissector, который имеет следующую сигнатуру.
function proto.dissector(tvb, pinfo, tree)
end
Интерпретация этих параметров следующая:
- tvb — это аббревиатура от «Testy Virtual Buffer», то есть содержимого буфера, содержащего пакеты.
- pinfo — это аббревиатура Packet Information, которая указывает информацию, относящуюся к пакету Packet, и может получить исходный порт, порт назначения и другую информацию о пакете.
- tree представляет дерево отображения пользовательского интерфейса wireshark, и информация, полученная в результате синтаксического анализа пакета, будет добавлена в это иерархическое дерево.
Далее мы покажем четыре части связи RocketMQ с wirehark. Код для изменения функции proto.dissector показан ниже.
function proto.dissector(tvb, pinfo, tree)
print("load plugin...demo")
local subtree = tree:add(proto, tvb())
pinfo.cols.protocol = proto.name;
pinfo.cols.info = ""
local length = tvb(0, 4):uint()
subtree:add("Total Length", length)
local headerLength = tvb(4, 4):uint()
subtree:add("Header Length", headerLength)
local headerData = tvb(8, headerLength):string()
subtree:add("Header", headerData)
local bodyDataLen = length - 4 - headerLength
local bodyData = tvb(8 + headerLength, bodyDataLen):string()
subtree:add("Body", bodyData)
end
Перезагружая lua-скрипт, можно увидеть, что несколько частей протокола RocketMQ в Wireshark отобразились.
Чтобы различить, является ли это запросом связи или ответом, мы можем отличить его по номеру целевого порта и добавить новый метод.
function isRequest(pinfo)
local dstPort = pinfo.dst_port;
for _, port in ipairs(PORTS) do
if (dstPort == port) then
return true
end
end
return false
end
Добавлено различие между запросами и ответами в proto.dissector для более читабельного описания.
if (isRequest(pinfo)) then
pinfo.cols.info:append("[REQUEST]" .. "↑↑↑")
else
pinfo.cols.info:append("[RESPONSE]" .. "↓↓↓")
end
Эффект показан ниже.
Следующее, что нам нужно сделать, это разобрать json и отобразить его лучше Давайте сначала посмотрим на запрос и ответ, когда заголовок и тело имеют формат json. Добавьте рекурсивный метод для единообразной обработки данных в формате json.
-- k,v 分别表示 json 的 key 和 value,tree 表示 UI 树
function parseAndAddTree(k, v, tree)
if (type(v) == 'table') then
local sizeStr = ""
if (#v > 0) then
sizeStr = "size: " .. #v
end;
local childTree = tree:add(k, sizeStr, tree)
for key, value in pairs(v) do
parseAndAddTree(key, value, childTree)
end
else
tree:add(k .. ":", json.stringify(v))
end
end
Добавьте синтаксический анализ заголовка в метод proto.dissector, как показано ниже.
local subtree = tree:add(protoMQ, tvb())
local headerTree = subtree:add("Header", "")
-- 解析 json
local header = json.parse(headerData, 1, "}")
for k, v in pairs(header) do
parseAndAddTree(k, v, headerTree)
end
Перезагрузите и запустите приведенный выше код, эффект будет следующим.
В то же время мы также можем найти более читаемое строковое представление, соответствующее коду запроса и ответа в исходном коде RocketMQ,
local requestCodeMap = {
[10] = "SEND_MESSAGE",
[11] = "PULL_MESSAGE",
[12] = "QUERY_MESSAGE",
...
}
local responseCode = {
[0] = "SUCCESS",
[1] = "SYSTEM_ERROR",
[2] = "SYSTEM_BUSY",
}
Это также можно обработать таким образом, если Body является строкой json, как показано ниже.
Однако в некоторых случаях тело не представлено строкой json.Например, когда отправляется сообщение PULL, если сервер возвращает расходуемое сообщение, то тело сохраняется не как строка, а как пользователь, определенный RocketMQ. , Формат сообщения следующий.
Написание этого анализа является личным усилием. Я реализовал версию lua со ссылкой на исходный код Java RocketMQ. Полный код выглядит следующим образом:
function decodeMessageExt(bodyTree, pinfo, bodyData)
local bodyTree = bodyTree:add("Body", "")
pinfo.cols.info:append(">>>>#FOUND#")
local offset = 0;
bodyTree:add("totalSize", bodyData(offset, 4):int())
offset = offset + 4;
local magicCode = string.format("0X%8.8X", bodyData(offset, 4):uint())
bodyTree:add("magicCode", magicCode)
offset = offset + 4;
bodyTree:add("bodyCRC", bodyData(offset, 4):int())
offset = offset + 4;
bodyTree:add("queueId", bodyData(offset, 4):int())
offset = offset + 4;
bodyTree:add("flag", bodyData(offset, 4):int())
offset = offset + 4;
bodyTree:add("queueOffset", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;
bodyTree:add("physicOffset", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;
bodyTree:add("sysFlag", bodyData(offset, 4):int())
offset = offset + 4;
bodyTree:add("bornTimeStamp", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;
local bornHost = bodyData(offset, 1):uint()
.. "." .. bodyData(offset + 1, 1):uint()
.. "." .. bodyData(offset + 2, 1):uint()
.. "." .. bodyData(offset + 3, 1):uint()
bodyTree:add("bornHost", bornHost)
offset = offset + 4;
bodyTree:add("port", bodyData(offset, 4):int())
offset = offset + 4;
bodyTree:add("storeTimestamp", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;
local storeHost = bodyData(offset, 1):uint()
.. "." .. bodyData(offset + 1, 1):uint()
.. "." .. bodyData(offset + 2, 1):uint()
.. "." .. bodyData(offset + 3, 1):uint()
bodyTree:add("storeHost", storeHost)
offset = offset + 4;
bodyTree:add("storePort", bodyData(offset, 4):int())
offset = offset + 4;
--13 RECONSUMETIMES
bodyTree:add("reconsumeTimes", bodyData(offset, 4):int())
offset = offset + 4;
--14 Prepared Transaction Offset
bodyTree:add("preparedTransactionOffset", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;
--15 BODY
local bodyLen = bodyData(offset, 4):int()
-- bodyTree:add("bodyLen", bodyLen)
offset = offset + 4;
bodyTree:add("body:", bodyData(offset, bodyLen):string())
offset = offset + bodyLen;
--16 TOPIC
local topicLen = bodyData(offset, 1):int()
offset = offset + 1;
-- bodyTree:add("topicLen", topicLen)
local topic = bodyData(offset, topicLen):string()
bodyTree:add("topic:", topic)
pinfo.cols.info:append(" topic:" .. topic)
offset = offset + topicLen;
--17 properties
local propertiesLength = bodyData(offset, 2):int()
offset = offset + 2;
bodyTree:add("propertiesLength", propertiesLength)
if (propertiesLength > 0) then
local propertiesStr = bodyData(offset, propertiesLength):string()
offset = offset + propertiesLength
local propertiesTree = bodyTree:add("propertiesStr", "size: " .. propertiesLength)
for k, v in string.gmatch(propertiesStr, "(%w+)\1(%w+)") do
propertiesTree:add(k, v)
end
end
end
Эффект от бега следующий.
Выкладываю полный код на гитхаб:GitHub.com/Артур-Станция…, Заинтересованные студенты могут взглянуть. Помимо функций из предыдущей статьи, также есть возможность извлекать полезную информацию, например тему, в колонку Info, что удобно для просмотра процесса общения.
постскриптум
Интересно, что не имеет к этому никакого отношения.В фоновой разработке Lua, связующий язык, имеет множество применений в OpenResty и Redis, и есть много интересных применений, которые ждут нас, чтобы открыть для себя.
Написав этот подключаемый модуль, я лучше узнаю подробности взаимодействия RocketMQ, и позже я смогу написать статью о деталях взаимодействия RocketMQ.
Если у вас есть какие-либо вопросы, вы можете отсканировать QR-код ниже и подписаться на мою официальную учетную запись, чтобы связаться со мной.