Производитель RocketMQ и хранилище сообщений
1. Введение
В этой статье в основном анализируется отправка и хранение сообщений из исходного кода. В RocketMQ есть три способа отправки сообщений: надежная синхронная отправка, надежная асинхронная отправка и односторонняя отправка. С точки зрения модели хранения текущее ПО промежуточного слоя MQ делится на два типа: сохраняемое и непостоянное. В этой статье будет проанализирован механизм хранения сообщений RocketMQ.
2. Сообщения RocketMQ
Сначала взгляните на класс пакета сообщений RocketMQ org.apache.rocketmq.common.message.Message.
Основные атрибуты: тема темы, флаг сообщения, тело сообщения, расширенные атрибуты.
Скрытые свойства:
- тег: тег сообщения, используемый для фильтрации сообщений
- ключи: ключ индекса сообщения
- waitStoreMsgOK: ждать ли завершения хранения сообщения перед возвратом при отправке сообщения
- delayTimeLevel: уровень задержки сообщения, используемый для синхронизации сообщений или повторной попытки сообщения.
Расширенные свойства существуют в свойствах сообщения.
3. Процесс запуска продюсера
Трассируем от стартового метода DefaultMQProducerImpl.
первый шаг: проверьте, соответствует ли productGroup требованиям, и измените имя экземпляра производителя на идентификатор процесса.
//DefaultMQProducerImpl::start
public void start() throws MQClientException {
this.start(true);//默认为true
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//第一步
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
второй шаг: создать экземпляр MQClientInstance.
третий шаг: Зарегистрируйтесь в MQClientInstance и добавьте текущего производителя в управление MQClientInstance, что удобно для последующих вызовов сетевых запросов и обнаружения пульса.
четвертый шаг: Запустить MQClientInstance, если MQClientInstance был запущен, если он был запущен, в этот раз он не будет запущен.
4. Базовый поток отправки сообщений
Процесс отправки сообщений в основном состоит из: проверки сообщений, поиска маршрутов и отправки сообщений (включая механизмы обработки исключений).
Проверка сообщения в основном предназначена для проверки длины сообщения.Мы в основном объясняем маршрутизацию и отправку сообщения.
4.1 Поиск маршрутов
Прежде чем сообщение будет отправлено, вам сначала необходимо получить информацию о маршрутизации темы.
//DefaultMQProducerImpl::tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
Если производитель кэшировал информацию о маршрутизации темы и содержит очередь сообщений, он будет напрямую возвращать информацию о маршрутизации.Если кэша или очереди сообщений нет, он запросит у NameServer информацию о маршрутизации темы. Если это первая отправка сообщения, оно попытается использовать тему по умолчанию для запроса, если она не будет найдена. Если не найдено, будет сообщено об ошибке.
4.2 Выбор сообщения
Очередь сообщений выбирается в соответствии с информацией о маршрутизации, а возвращенная очередь сообщений сортируется по брокеру и порядковому номеру. Прежде всего, при отправке сообщения используется механизм повторных попыток, который выполняется в цикле, выбирая очередь сообщений, отправляя сообщение и возвращаясь, если отправка прошла успешно, и повторяя попытку, если отправка не удалась. Существует два способа выбора сообщения.
- sendLatencyFaultEnable=false, механизм по умолчанию
- sendLatencyFaultEnable=true, включить механизм задержки при сбое брокера
//MQFaultStrategy::selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
4.3 Отправка сообщения
Основная запись API отправки сообщений: DefaultMQProducerImpl::sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//省略
}
Детали параметра:
- Message msg: сообщение для отправки
- MessageQueue mq: сообщение будет отправлено в очередь сообщений
- CommunicationMode CommunicationMode: режим отправки сообщений, SYNC, ASYNC, ONEWAY
- SendCallback sendCallback: функция обратного вызова асинхронного сообщения
- TopicPublishInfo TopicPublishInfo: сообщения о маршрутизации тем
- длинный тайм-аут: тайм-аут отправки сообщения
Шаги отправки:
- Получить сетевой адрес брокера в соответствии с MessageQueue
- Присвойте сообщению глобальный уникальный идентификатор.
- Если функция ловушки отправки сообщения зарегистрирована, расширенная логика перед отправкой сообщения выполняется
- Создайте пакет запроса на отправку сообщения
- По способу отправки сообщения сетевая передача осуществляется синхронным, асинхронным и односторонним способами.
- Если функция ловушки отправки сообщения зарегистрирована, выполните логику после
4.3.1 Синхронная передача
Точкой входа для отправки сообщений клиентом MQ является MQClientAPIImpl::sendMessage.
Шаги синхронной отправки:
- Проверить, является ли отправка сообщения разумной
//AbstractSendMessageProcessor::msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending message is forbidden");
return response;
}
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
} else {
topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
}
}
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
topicSysFlag);
}
}
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
return response;
}
- Если количество повторных попыток сообщения превышает максимально допустимое количество повторных попыток, сообщение попадет в очередь задержки DLD.
- Вызов DefaultMessageStore::putMessage для хранения сообщений
4.3.2 Асинхронная отправка
Для асинхронной отправки нет необходимости блокировать и ждать, пока сервер сообщений вернет результат отправки сообщения, ему нужно только предоставить функцию обратного вызова для клиента, отправляющего сообщение, для обратного вызова, когда он получит результат ответа. По сравнению с синхронной отправкой производительность отправителя значительно повышается в асинхронном режиме.
4.3.3 Односторонняя отправка
Односторонняя отправка не требует ожидания результата или предоставления функции обратного вызова.Отправителю сообщения все равно, успешно ли отправлено сообщение.Принцип тот же, что и при асинхронной отправке, но отправитель сообщения ничего не делает после получения результат.
4.3.4 Пакетная отправка
Пакетная отправка сообщений предназначена для упаковки и отправки нескольких фрагментов информации по одной и той же теме на сервер сообщений, что уменьшает количество сетевых вызовов и увеличивает скорость передачи по сети.
Когда отправляется одно сообщение, содержимое тела сообщения будет храниться в теле. Для пакетной отправки сообщения содержимое нескольких сообщений должно храниться в теле. RocketMQ хранит содержимое нескольких сообщений в фиксированном формате.
Массовая рассылка:
//DefaultMQProducer::send
public SendResult send(
Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs));
}
**Процесс отправки: **Сначала в отправителе сообщения вызовите пакетный метод, чтобы инкапсулировать пакет сообщений в объект MessageBatch. MessageBatch содержит сообщения Listmessages внутри, так что пакетная отправка точно такая же, как одиночный процесс отправки.
Следуйте за ним:
//DefaultMQProducer::send
public SendResult send(
Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs));
}
//DefaultMQProducer::batch
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch;
try {
msgBatch = MessageBatch.generateFromList(msgs);
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
message.setTopic(withNamespace(message.getTopic()));
}
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
return msgBatch;
}
//DefaultMQProducerImpl::send
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
5. Хранение сообщений
Большинству бизнес-систем требуется, чтобы MQ имел возможности постоянного хранения, что может значительно повысить высокую доступность системы.
Давайте сначала посмотрим на поток данных RocketMQ:
- CommitLog: файл хранения сообщений, все сообщения тем сообщений хранятся в файле CommitLog.
- ConsumeQueue: очередь потребления сообщений. После того, как сообщение достигнет файла CommitLog, оно будет асинхронно перенаправлено в очередь потребления сообщений для обработки потребителями сообщений.
- IndexFile: индексный файл сообщения, в котором в основном хранится соответствие между ключом сообщения и смещением.
- Служба статуса транзакции: сохраняет статус транзакции каждого сообщения.
- Служба сообщений синхронизации: каждый уровень задержки соответствует очереди потребления сообщений, в которой хранится ход извлечения сообщений из очереди задержки.
Архитектура хранения RocketMQ:
Класс реализации хранилища сообщений: org.apache.rocketmq.store.DefaultMessageStore
Представьте основные атрибуты:
- MessageStoreConfig messageStoreConfig: свойства конфигурации хранилища сообщений
- CommitLog commitLog: класс реализации для хранения файлов CommitLog.
- ConcurrentMap<String/* topic /, ConcurrentMap<Integer/queueId */,ConsumeQueue>> serveQueueTable : кэш-таблица хранилища очереди сообщений, сгруппированная по темам сообщений
- FlushConsumeQueueService flushConsumeQueueService: файл очереди сообщений ConsumeQueue поток очистки
- CleanCommitLogService cleanCommitLogService: очистить запрашиваемую цену службы CommitLog
- CleanConsumeQueueService cleanConsumeQueueService: очищает файловую службу ConsumeQueue.
- IndexService indexService: класс реализации файла индекса
- AllocateMappedFileService allocateMappedFileService: служба размещения MappedFile
- ReputMessageService reputMessageService: распространение сообщений CommitLog, сборка файлов ConsumeQueue и IndexFile на основе файлов CommitLog.
- HAService haService: механизм высокой доступности хранилища
- TransientStorePool transientStorePool: кеш памяти кучи сообщений
- MessageArrivingListener messageArrivingListener: прослушиватель прибытия сообщения в режиме длинного опроса
- BrokerConfig brokerConfig: свойства конфигурации брокера
- StoreCheckpoint storeCheckpoint: точка проверки файловой щетки
- LinkedList dispatcherList: запрос на пересылку файла CommitLog
5.1 Процесс хранения отправки сообщения
Запись в хранилище сообщений: org.apache.rocketmq.store.DefaultMessageStore::putMessage
- Если текущий Брокер перестает работать или Брокер находится в роли SLAVE или текущий Rocket не поддерживает запись, запись сообщения будет отклонена.Если длина сообщения превышает 256 символов, а длина атрибута сообщения превышает 65536 символов, запись сообщения будет быть отвергнутым
- Проверить уровень задержки сообщений
- Получить текущий доступный для записи файл CommitLog
- Перед записью в CommitLog сначала подайте заявку на putMessageLock, то есть сохранение сообщения в файле CommitLog является последовательным
- Время хранения сообщения дизайна
- Добавить сообщение в MappedFile
- Создайте глобально уникальный идентификатор сообщения
- Получить смещение сообщения в очереди сообщений
- Рассчитать общую длину сообщения в соответствии с длиной тела сообщения, длиной темы, длиной атрибута и форматом хранения сообщения.
- Если длина сообщения +END_FILE_MIN_BLANK_LENGTH больше, чем файл CommitLog
- Сохраните память сообщений в ByteBuffer, затем создайте AppendMessageResult.
- Обновить логическое смещение очереди сообщений
- Блокировка putMessageLock будет снята после обработки логики добавления сообщения.
- DefaultAppendMessageCallback::doAppend просто добавляет сообщение в память, ему необходимо сохранить данные из памяти на диск в соответствии с синхронной или асинхронной очисткой.
Упрощено до следующей диаграммы последовательности:
5.2 Процесс сопоставления памяти
RocketMQ повышает производительность доступа к вводу-выводу за счет использования отображаемых в память файлов. Будь то CommitLog, ConsumeQueue или IndexFile, один файл имеет фиксированную длину. Если файл заполнен, а затем создается новый файл, имя файла соответствует первое сообщение Глобальное физическое смещение .
шаг:
- Отображенный в память файл MappedFile создается AllocateMappedFileService.
- Создание MappedFile — типичная модель производитель-потребитель.
- Когда MappedFileQueue вызывает getLastMappedFile для получения MappedFile, поместите запрос в очередь
- Поток AllocateMappedFileService постоянно прослушивает очередь и создает объект MappedFile, когда в очереди есть запрос.
- Наконец, объект MappedFile разогревается, и на нижнем уровне вызываются методы force и mlock.
5.3 Процесс прошивки
После вызова appendMessage MapedFile сообщение загружается только в ByteBuffer, то есть в память, и не помещается на диск. Чтобы разместить диск, память должна быть сброшена на диск.Для commitLog RocketMQ предоставляет два способа размещения диска.
- Сообщения, отправляемые производителем брокеру, сохраняются в MappedFile, а затем синхронизируются на диск через механизм очистки диска.
- Чистка делится на синхронную чистку и асинхронную чистку.
- Асинхронные фоновые потоки очистки выполняются через определенные промежутки времени.
- Синхронное смахивание также является моделью производителя-потребителя. После того как посредник сохраняет сообщение в MappedFile, он создает запрос GroupCommitRequest и помещает его в список, блокирует и ожидает. Фоновый поток получает запрос из списка и очищает диск, а также уведомляет ожидающий поток после успешной очистки диска.
Синхронизировать кисть (CommitLog.java):
//封装的一次刷盘请求
public static class GroupCommitRequest {
//这次请求要刷到的offSet位置,比如已经刷到2,
private final long nextOffset;
//控制flush的拴
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile boolean flushOK = false;
public GroupCommitRequest(long nextOffset) {
this.nextOffset = nextOffset;
}
public long getNextOffset() {
return nextOffset;
}
//刷完了唤醒
public void wakeupCustomer(final boolean flushOK) {
this.flushOK = flushOK;
this.countDownLatch.countDown();
}
public boolean waitForFlush(long timeout) {
try {
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return this.flushOK;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
}
}
/**
* GroupCommit Service
* 批量刷盘服务
*/
class GroupCommitService extends FlushCommitLogService {
//用来接收消息的队列,提供写消息
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
//用来读消息的队列,将消息从内存读到硬盘
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
//添加一个刷盘的request
public void putRequest(final GroupCommitRequest request) {
synchronized (this) {
//添加到写消息的list中
this.requestsWrite.add(request);
//唤醒其他线程
if (!this.hasNotified) {
this.hasNotified = true;
this.notify();
}
}
}
//交换读写队列,避免上锁
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doCommit() {
//读队列不为空
if (!this.requestsRead.isEmpty()) {
//遍历
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; (i < 2) && !flushOK; i++) {
//
flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());
//如果没刷完 即flushOK为false则继续刷
if (!flushOK) {
CommitLog.this.mapedFileQueue.commit(0);
}
}
//刷完了唤醒
req.wakeupCustomer(flushOK);
}
long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
//清空读list
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mapedFileQueue.commit(0);
}
}
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
try {
this.waitForRunning(0);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
//正常关闭时要把没刷完的刷完
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
}
Асинхронная чистка (CommitLog.java):
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
//不停轮询
while (!this.isStoped()) {
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
//拿到要刷盘的页数
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
//控制刷盘间隔,如果当前的时间还没到刷盘的间隔时间则不刷
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
printFlushProgress = ((printTimes++ % 10) == 0);
}
try {
//是否需要刷盘休眠
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
if (printFlushProgress) {
this.printFlushProgress();
}
//commit开始刷盘
CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
for (int i = 0; i < RetryTimesOver && !result; i++) {
result = CommitLog.this.mapedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end");
}
6. Резюме и ссылки
резюме
Схема отправки сообщений:
Блок-схема хранения сообщений: