Недавно появилась функция, которая требует использования очередей сообщений в проекте.Я нашел в Интернете несколько руководств и блогов по kafka, связанных с php, и большинство из них заключаются в установке расширения librdkafka для php (метод установки этого расширения здесь обсуждаться не будет, поищите там еще много руководств), а потом используйте это расширение для разработки напрямую, но когда я использую это расширение для разработки напрямую, я не знаю, почему оно не работает, и оно продолжает сообщать об ошибках (должно быть, я слишком наивен, хахахаха)... ...я нашел несколько связанных пакетов с github и хотел использовать их напрямую, но обнаружил, что многие пакеты созданы несколько лет назад, и они в основном версии kafka 0.x.
Я посмотрел пакет с наибольшим количеством звезд, ссылка:GitHub.com/Weibo AD/Кофе…
Позже я использовал пакет enqueue/rdkafka, ссылка:GitHub.com/PHP-очерёдность…
Ниже показан код, который я тестировал, он просто работает, но не соответствует ожиданиям, которые я хочу
режиссер
$connFactory = new RdKafkaConnectionFactory([
'global' => [
'metadata.broker.list' => '127.0.0.1:9092',
'socket.timeout.ms' => '50'
]
]);
$context = $connFactory->createContext();
$message = $context->createMessage('hello world!');
$topic = $context->createTopic('app');
$context->createProducer()->send($topic, $message);
потребитель
$config = [
'global' => [
'group.id' => uniqid('', true),
'metadata.broker.list' => '127.0.0.1:9092',
'enable.auto.commit' => 'false',
],
'topic' => [
// 设置从最后一个offset开始读取消息,不会读取到之前的消息
'auto.offset.reset' => 'latest',
],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$topic = $context->createTopic('app');
$consumer = $context->createConsumer($topic);
while (true) {
$message = $consumer->receive(30 * 1000);
if (!$message instanceof RdKafkaMessage && !$message instanceof Message) {
var_dump($message);
continue;
}
$consumer->acknowledge($message);
var_dump($message->getBody());
}
Это может быть запущено и может отправлять и получать данные в обычном режиме.На самом деле, я хочу создать очередь, есть несколько производителей, и данные создаются для темы, а затем есть несколько потребителей, но таким образом, есть проблема, когда я запускаю несколько потребителей, каждый потребитель получит сообщение, отправленное производителем, что больше похоже на форму групповой публикации и групповой подписки.Это не тот результат, которого я хочу.Я пошел в Интернет, чтобы найти другое В учебнике некоторые люди говорят, что пока group_id отличается, все в порядке, но мой group_id все случайный, вряд ли он будет одинаковым, это может быть достигнуто разумом, но это невозможно, я также пытался использовать Queue для работы, но все потребители получат сообщение.
Пока я знаю только об этом месте, и мне все еще нужно потратить некоторое время, чтобы посмотреть на него снова.Если есть великий бог, который его видит, пожалуйста, дайте мне несколько указателей! !