Разговор о ConsumeFromWhere из RocketMQ

RocketMQ

последовательность

В этой статье в основном изучается ConsumeFromWhere из RocketMQ.

ConsumeFromWhere

rocketmq-common-4.5.2-sources.jar!/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,

    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
}
  • ConsumeFromWhere определяет значения перечисления CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP.

computePullFromWhere

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java

public class RebalancePushImpl extends RebalanceImpl {

	//......

    @Override
    public long computePullFromWhere(MessageQueue mq) {
        long result = -1;
        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
        switch (consumeFromWhere) {
            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
            case CONSUME_FROM_MIN_OFFSET:
            case CONSUME_FROM_MAX_OFFSET:
            case CONSUME_FROM_LAST_OFFSET: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                }
                // First start,no offset
                else if (-1 == lastOffset) {
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        result = 0L;
                    } else {
                        try {
                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    }
                } else {
                    result = -1;
                }
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                } else if (-1 == lastOffset) {
                    result = 0L;
                } else {
                    result = -1;
                }
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                } else if (-1 == lastOffset) {
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        try {
                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    } else {
                        try {
                            long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                                UtilAll.YYYYMMDDHHMMSS).getTime();
                            result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    }
                } else {
                    result = -1;
                }
                break;
            }

            default:
                break;
        }

        return result;
    }

    //......
}
  • ComputePullFromWhere из RebalancePushImpl будет оценивать defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere()
  • Для CONSUME_FROM_LAST_OFFSET, если offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) больше или равен 0, обновить результат до этого значения, если lastOffset равен -1, обновить результат до 0, когда mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX ) , в противном случае обновите результат до mQClientFactory.getMQAdminImpl().maxOffset(mq)
  • Для CONSUME_FROM_FIRST_OFFSET, если offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) больше или равен 0, обновить результат до этого значения, если lastOffset равно -1, обновить результат до 0; для CONSUME_FROM_TIMESTAMP, если offsetStore.readOffset(mq, ReadOffsetType) .READ_FROM_STORE ) больше или равно 0, затем обновите результат до этого значения; если lastOffset равно -1, возьмите mQClientFactory.getMQAdminImpl().maxOffset(mq) для mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX), в противном случае возьмите defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp() для поиска QClientFactory.getMQAdminImpl().searchOffset и обновите возвращаемое значение до результата

резюме

  • ConsumeFromWhere определяет значения перечисления CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP; ComputePullFromWhere из RebalancePushImpl будет судить по умолчаниюMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere()
  • Если offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) больше или равен 0, обновить результат до этого значения; для lastOffset равно -1 и mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX), CONSUME_FROM_LAST_OFFSET равно 0, CONSUME_FROM_TIMESTAMP равно mQClientFactory.getMQAdminImpl().maxOffset(mq)
  • В случае, когда lastOffset равен -1, но не q.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX), CONSUME_FROM_LAST_OFFSET принимает mQClientFactory.getMQAdminImpl().maxOffset(mq), CONSUME_FROM_TIMESTAMP принимает mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp)

doc