Кафка PHP наступает на яму (1)

PHP

Недавно появилась функция, которая требует использования очередей сообщений в проекте.Я нашел в Интернете несколько руководств и блогов по kafka, связанных с php, и большинство из них заключаются в установке расширения librdkafka для php (метод установки этого расширения здесь обсуждаться не будет, поищите там еще много руководств), а потом используйте это расширение для разработки напрямую, но когда я использую это расширение для разработки напрямую, я не знаю, почему оно не работает, и оно продолжает сообщать об ошибках (должно быть, я слишком наивен, хахахаха)... ...я нашел несколько связанных пакетов с github и хотел использовать их напрямую, но обнаружил, что многие пакеты созданы несколько лет назад, и они в основном версии kafka 0.x.

Я посмотрел пакет с наибольшим количеством звезд, ссылка:GitHub.com/Weibo AD/Кофе…

Позже я использовал пакет enqueue/rdkafka, ссылка:GitHub.com/PHP-очерёдность…

Ниже показан код, который я тестировал, он просто работает, но не соответствует ожиданиям, которые я хочу

режиссер

$connFactory = new RdKafkaConnectionFactory([
    'global' => [
        'metadata.broker.list' => '127.0.0.1:9092',
        'socket.timeout.ms' => '50'
    ]
]);
$context = $connFactory->createContext();
$message = $context->createMessage('hello world!');
$topic = $context->createTopic('app');
$context->createProducer()->send($topic, $message);

потребитель

$config = [
    'global' => [
        'group.id' => uniqid('', true),
        'metadata.broker.list' => '127.0.0.1:9092',
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        // 设置从最后一个offset开始读取消息,不会读取到之前的消息
        'auto.offset.reset' => 'latest',
    ],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$topic = $context->createTopic('app');
$consumer = $context->createConsumer($topic);
while (true) {
    $message = $consumer->receive(30 * 1000);
    if (!$message instanceof RdKafkaMessage && !$message instanceof Message) {
        var_dump($message);
        continue;
    }
    $consumer->acknowledge($message);
    var_dump($message->getBody());
}

Это может быть запущено и может отправлять и получать данные в обычном режиме.На самом деле, я хочу создать очередь, есть несколько производителей, и данные создаются для темы, а затем есть несколько потребителей, но таким образом, есть проблема, когда я запускаю несколько потребителей, каждый потребитель получит сообщение, отправленное производителем, что больше похоже на форму групповой публикации и групповой подписки.Это не тот результат, которого я хочу.Я пошел в Интернет, чтобы найти другое В учебнике некоторые люди говорят, что пока group_id отличается, все в порядке, но мой group_id все случайный, вряд ли он будет одинаковым, это может быть достигнуто разумом, но это невозможно, я также пытался использовать Queue для работы, но все потребители получат сообщение.

Пока я знаю только об этом месте, и мне все еще нужно потратить некоторое время, чтобы посмотреть на него снова.Если есть великий бог, который его видит, пожалуйста, дайте мне несколько указателей! !