KafkaConsumer assign VS subscribe

Kafka

задний план

В kafka при нормальных обстоятельствах разные потребители с одним и тем же group.id не будут использовать один и тот же раздел, то есть раздел может использоваться только одним из потребителей с одним и тем же group.id в любой момент времени. Именно этот механизм обеспечивает важные возможности kafka:

  • 1. Пропускную способность можно повысить за счет увеличения разделов и потребителей;
  • 2. Убедитесь, что одно и то же сообщение не будет использовано несколько раз.

В классе KafkaConsumer (официальный API) потребители могут указать раздел темы для использования с помощью назначения и подписки. Конкретный исходный код может относиться к следующему,

Эти два интерфейса, кажется, выполняют одни и те же функции, но есть небольшие различия, и учащиеся, которые используют их впервые, могут быть сбиты с толку.Ниже мы подробно расскажем о различиях между ними.

сравнить результаты

  • KafkaConsumer.subscribe() : автоматически выделять разделы для потребителей.Существует внутренний алгоритм, гарантирующий, что тематические разделы будут равномерно распределены среди разных потребителей в одной и той же группе оптимальным образом.

  • KafkaConsumer.assign() : Вручную и явно укажите разделы темы, которые должны использоваться для потребителя, не ограниченные group.id, что совершенно недопустимо для указанной группы (этот метод не использует управление группой потребителя).

тестовый код

public class KafkaManualAssignTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaManualAssignTest.class);

    private static Properties props = new Properties();
    private static KafkaConsumer<String, String> c1, c2;

    private static final String brokerList = "localhost:9092";

    static {
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", "assignTest");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        c1 = new KafkaConsumer<String, String>(props);
        c2 = new KafkaConsumer<String, String>(props);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("topic", 0);
        // 采用assign方式显示的为consumer指定需要消费的topic, 具有相同group.id的两个消费者
        // 各自消费了一份数据, 出现了数据的重复消费
        c1.assign(Arrays.asList(tp));
        c2.assign(Arrays.asList(tp));


        // 采用subscribe方式, 利用broker为consumer自动分配topic-partitions,
        // 两个消费者各自消费一个partition, 数据互补, 无交叉.
        // c1.subscribe(Arrays.asList("topic"));
        // c2.subscribe(Arrays.asList("topic"));

        while (true) {
            ConsumerRecords<String, String> msg1 = c1.poll(1000L);
            if (msg1 != null) {
                for (ConsumerRecord m1 : msg1) {
                    logger.info("m1 offset : {} , value : {}", m1.offset(), m1.value());
                }
            }

           logger.info("=====================");
           ConsumerRecords<String, String> msg2 = c2.poll(1000L);
           if (msg2 != null) {
               for (ConsumerRecord m2 : msg2) {
                   logger.info("m2 offset : {} , value : {}", m2.offset(), m2.value());
               }
           }

           System.exit(0);
        }
    }
}

официальный API

Официальное объяснение о подписке:

/**
 * Subscribe to the given list of topics to get dynamically assigned partitions.
 * <b>Topic subscriptions are not incremental. This list will replace the current
 * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
 * with manual partition assignment through {@link #assign(Collection)}.
 *
 * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
 *
 * <p>
 * This is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)}, which
 * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
 * {@link #subscribe(Collection, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
 * to be reset. You should also provide your own listener if you are doing your own offset
 * management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
 *
 * @param topics The list of topics to subscribe to
 * @throws IllegalArgumentException If topics is null or contains null or empty elements
 * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
 *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
 *                               configured at-least one partition assignment strategy
 */
@Override
public void subscribe(Collection<String> topics) {
    subscribe(topics, new NoOpConsumerRebalanceListener());
}

Официальное объяснение assign:

/**
 * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
 * and will replace the previous assignment (if there is one).
 * <p>
 * If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
 * <p>
 * Manual topic assignment through this method does not use the consumer's group management
 * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
 * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
 * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
 * <p>
 * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
 * assignment replaces the old one.
 *
 * @param partitions The list of partitions to assign this consumer
 * @throws IllegalArgumentException If partitions is null or contains null or empty topics
 * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
 *                               (without a subsequent call to {@link #unsubscribe()})
 */
@Override
public void assign(Collection<TopicPartition> partitions) {
    acquireAndEnsureOpen();
    try {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
        } else if (partitions.isEmpty()) {
            this.unsubscribe();
        } else {
            Set<String> topics = new HashSet<>();
            for (TopicPartition tp : partitions) {
                String topic = (tp != null) ? tp.topic() : null;
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                topics.add(topic);
            }

            // make sure the offsets of topic partitions the consumer is unsubscribing from
            // are committed since there will be no following rebalance
            this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());

            log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
            this.subscriptions.assignFromUser(new HashSet<>(partitions));
            metadata.setTopics(topics);
        }
    } finally {
        release();
    }
}

предположение

Рекомендуется использовать функцию подписки () для реализации распределения раздела.

Если вы четко не понимаете разделы тем (не темы), которые вам нужно потреблять, и вы можете быть уверены, что все ваши сообщения находятся в этих разделах тем, вы можете использовать assign.