последовательность
В этой статье в основном изучается ConsumeFromWhere из RocketMQ.
ConsumeFromWhere
rocketmq-common-4.5.2-sources.jar!/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java
public enum ConsumeFromWhere {
CONSUME_FROM_LAST_OFFSET,
@Deprecated
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
@Deprecated
CONSUME_FROM_MIN_OFFSET,
@Deprecated
CONSUME_FROM_MAX_OFFSET,
CONSUME_FROM_FIRST_OFFSET,
CONSUME_FROM_TIMESTAMP,
}
- ConsumeFromWhere определяет значения перечисления CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP.
computePullFromWhere
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
public class RebalancePushImpl extends RebalanceImpl {
//......
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
case CONSUME_FROM_LAST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
result = 0L;
} else {
result = -1;
}
break;
}
case CONSUME_FROM_TIMESTAMP: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
} else {
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
default:
break;
}
return result;
}
//......
}
- ComputePullFromWhere из RebalancePushImpl будет оценивать defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere()
- Для CONSUME_FROM_LAST_OFFSET, если offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) больше или равен 0, обновить результат до этого значения, если lastOffset равен -1, обновить результат до 0, когда mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX ) , в противном случае обновите результат до mQClientFactory.getMQAdminImpl().maxOffset(mq)
- Для CONSUME_FROM_FIRST_OFFSET, если offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) больше или равен 0, обновить результат до этого значения, если lastOffset равно -1, обновить результат до 0; для CONSUME_FROM_TIMESTAMP, если offsetStore.readOffset(mq, ReadOffsetType) .READ_FROM_STORE ) больше или равно 0, затем обновите результат до этого значения; если lastOffset равно -1, возьмите mQClientFactory.getMQAdminImpl().maxOffset(mq) для mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX), в противном случае возьмите defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp() для поиска QClientFactory.getMQAdminImpl().searchOffset и обновите возвращаемое значение до результата
резюме
- ConsumeFromWhere определяет значения перечисления CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP; ComputePullFromWhere из RebalancePushImpl будет судить по умолчаниюMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere()
- Если offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) больше или равен 0, обновить результат до этого значения; для lastOffset равно -1 и mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX), CONSUME_FROM_LAST_OFFSET равно 0, CONSUME_FROM_TIMESTAMP равно mQClientFactory.getMQAdminImpl().maxOffset(mq)
- В случае, когда lastOffset равен -1, но не q.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX), CONSUME_FROM_LAST_OFFSET принимает mQClientFactory.getMQAdminImpl().maxOffset(mq), CONSUME_FROM_TIMESTAMP принимает mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp)