Является ли Redis действительно однопоточным? В интернете много дискуссий по этому вопросу, и выводы практически единодушны. Прежде чем обсуждать этот вопрос, в этой статье сначала определяются концептуальные границы «единого потока» в задаче:
- 1. Один поток относится к «модели базовой сети».
- 2. Один поток относится к дизайну всей серверной архитектуры Redis.
Для границы 1 ответ — да. До Redis версии 6.0 сетевая модель Redis всегда была однопоточной. Даже в версии 6.0 выполнение всех клиентских команд по-прежнему выполняется в основном потоке. Да; для граница 2, ответ — нет, с момента выпуска у Redis было дваBIO(background I/O service)
Потоки используются для асинхронной обработки задач сохранения и закрытия файлов.В Redis v4.0 к асинхронно трудоемким командам добавлен еще один BIO-поток.В Redis v6.0 также трансформирована сетевая модель ядра Redis .. стать многопоточным.
При ограничениях концептуальной границы 2 мы можем заключить, что:Redis никогда не был однопоточным! С какими проблемами вы столкнулись за последние десять лет с момента выпуска Redis в процессе эволюции системной архитектуры? Как автор антирез думает об этих проблемах? Какие планы были приняты для улучшения? Изучение этих вопросов очень важно для развития разработчиков, и это также является целью написания этой статьи.Автор ответит на эти вопросы с читателями в сочетании с соответствующим исходным кодом.
1. Дизайн инфраструктуры Redis
Сервис с отличной производительностью неотделим от хорошей архитектуры.I/O multiplexing
Реализован один поток для получения массовых клиентских запросов; через один потокReactor
В модели реализована высокопроизводительная обработка событий, модель производитель-потребитель на основе условных переменных строит свою собственнуюBIO系统
. В этом разделе сначала кратко представлены эти схемы инфраструктуры, которые использовались с момента рождения Redis.
1.1 Redis-инкапсуляция мультиплексирования ввода-вывода
Мультиплексирование ввода-вывода относится к множеству сетевых сокетов ввода-вывода, мультиплексирующих один и тот же поток, что решает проблемуC10KЭта проблема. Redis инкапсулирует различные функции мультиплексирования ввода-вывода в один и тот же API для использования на верхнем уровне и может предоставлять услуги тысячам клиентов, просто обрабатывая сетевой ввод-вывод в одном потоке.
API-интерфейсы, предоставляемые модулем мультиплексирования ввода-вывода Redis:
//下面的方法不同版本的redis在src目录下的ae_epoll.c、ae_evport.c、ae_kqueue.c、ae_select.c代码文件中都有实现
static int aeApiCreate(aeEventLoop *eventLoop)
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
static void aeApiFree(aeEventLoop *eventLoop)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
Redis может работать на нескольких платформах, поэтому он будет выбирать различные функции мультиплексирования ввода-вывода в качестве подмодулей с помощью определений макросов и предоставлять интерфейс верхнего уровня для инкапсуляции в соответствии с различными платформами компиляции.
/*下面代码在Redis不同版本的ae.c源码文件中均包含
*Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
Redis предпочтет функции мультиплексирования ввода-вывода с временной сложностью 𝑂(1) в качестве базовой реализации, включая evport в Solaris 10, epoll в Linux и kqueue в macOS/FreeBSD. Функция select используется как системный вызов в стандарте POSIX и будет реализована в разных версиях операционной системы, поэтому она используется в качестве итоговой строки.
1.2 Однопоточный цикл событий — сетевая модель Reactor
В серверной архитектуре Reactor представляет собой шаблон проектирования, управляемый событиями. В настоящее время большинство основных высокопроизводительных сетевых библиотек/фреймворков на платформе Linux используют этот шаблон проектирования, например netty, libevent, libev, ACE, POE (Perl), Twisted (Python) и т. д.
Узор реактор по существу относится к использованиюI/O 多路复用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)
режим.通常设置一个主线程负责做 event-loop 事件循环和 I/O 读写,通过 select/poll/epoll_wait 等系统调用监听 I/O 事件,业务逻辑提交给其他工作线程去做。而所谓『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其他的 I/O 系统调用上,这样可以最大限度的复用 event-loop 线程,让一个线程能服务于多个 sockets。在 Reactor 模式中,I/O 线程只能阻塞在 I/O multiplexing 函数上(select/poll/epoll_wait)。单线程的Reactor网络模型是这样的:
Модель цикла с одним событием в Redis до версии 6.0 на самом деле является очень классической моделью Reactor, см. рисунок ниже:
Однопоточный Reactor Redis обычно работает следующим образом: «модуль мультиплексирования ввода-вывода» будет прослушивать несколько FD, когда эти FD генерируют, принимают, читают, пишут или закрывают файловые события. События доставляются в «файловый диспетчер событий». После того, как диспетчер файловых событий (диспетчер) получит событие, он отправит событие соответствующему обработчику в соответствии с типом события. Мы подробно рассмотрим этот процесс в следующей главе.
1.3 Модель производитель-потребитель, основанная на переменных состояния
Как мы упоминали ранее, у Redis есть собственный поток BIO с момента его выпуска, который в основном используется для обработки некоторых трудоемких фоновых задач. Redis использует модель производитель-потребитель, основанную на переменных условий, для разработки системы BIO. Redis разработан на языке C. Давайте возьмем Linux C в качестве примера, чтобы представить использование блокировок и общих переменных, а затем резюмируем дизайн этой модели.
1.3.1 Блокировка
В случае многопоточности использование блокировок в основном связано со следующими пятью функциями, которые включены в заголовочный файл pthread.h.
- pthread_mutex_init(pthread_mutex_t mutex,const pthread_mutexattr_t attr)
- pthread_mutex_lock(pthread_mutex_t *mutex)
- pthread_mutex_trylock(pthread_mutex_t *mutex)
- pthread_mutex_unlock(pthread_mutex_t *mutex)
- pthread_mutex_destroy(pthread_mutex_t *mutex)
Тип переменной блокировки — pthread_mutex_t, а использование блокировки состоит из трех шагов: инициализация блокировки, блокировка и снятие блокировки.
1.3.2 Инициализация блокировки
pthread_mutex_init Эта функция используется для инициализации блокировок.Чтобы использовать блокировки, вам сначала нужно объявить переменную pthread_mutex_t, а затем использовать эту функцию для инициализации следующим образом:
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,NULL);
Во время инициализации второй параметр может использоваться для установки характера блокировки. После этого шага мы завершили инициализацию блокировки.Когда второй параметр установлен в NULL, один поток блокируется, а другой поток будет блокироваться до тех пор, пока другой поток не снимет блокировку.
1.3.3 Блокировка, снятие блокировки и восстановление ресурсов
Для блокировки можно использовать две функции pthread_mutex_lock и pthread_mutex_trylock. Эти две функции завершили функцию блокировки.После получения мьютекса после инициализации переменной функция может быть вызвана напрямую для завершения функции блокировки. Первая функция всегда будет блокироваться, если другой поток получил блокировку, в то время как вторая функция будет возвращаться напрямую без блокировки.
Функцию pthread_mutex_unlock можно использовать для снятия блокировок.
Функция pthread_mutex_destroy используется для освобождения ресурсов.В случае использования функции pthread_mutex_init для инициализации блокировки эту функцию необходимо использовать для освобождения ресурсов после окончания использования.
1.3.4 Общие переменные
Общие переменные используются в таком сценарии: поток сначала оценивает определенное условие, и если условие не выполняется, он входит в ожидание, когда условие выполняется, поток уведомляется о том, что условие выполнено, и продолжает выполнять задача. Функции, связанные с общими переменными, следующие:
- int pthread_cond_init(pthread_cond_t cond, pthread_condattr_t cond_attr)
- int pthread_cond_wait(pthread_cond_t cond, pthread_mutex_t mutex)
- int pthread_cond_signal(pthread_cond_t *cond)
- int pthread_cond_broadcast(pthread_cond_t *cond)
- int pthread_cond_timedwait(pthread_cond_t cond, pthread_mutex_t mutex, const struct timespec *abstime)
1.3.5 Инициализация общих переменных и условное ожидание
Общие переменные инициализируются с помощью функции pthread_cond_init.Чтобы использовать условные переменные, сначала объявите переменную pthread_cond_t, а затем используйте эту функцию для инициализации. Второй параметр использует значение NULL, которое можно использовать для определенных настроек параметров. После завершения инициализации нужно дождаться установления условия через pthread_cond_wait или pthread_cond_timedwait, при невыполнении условия функция переходит в ожидание, при выполнении условия функция прекращает ожидание и продолжает выполнение. Вторым параметром этой функции является тип pthread_mutex_t, потому что при оценке условия его необходимо сначала заблокировать, чтобы предотвратить ошибки, и необходимо активно выполнять блокировку этой переменной перед выполнением изменения функции.После ввода этой функции MUTEX будет разблокирован внутри; после выполнения функции (то есть после остановки блокировки), она будет переблокирована, конкретная причина будет объяснена после введения этой группы функций.Вторая функция может указывать время ожидания вместо постоянной блокировки.
1.3.6 Уведомление об удовлетворении условий
Как упоминалось выше, когда условия не выполняются, поток вызывает функцию pthread_cond_wait для блокировки и ожидания; в это время, если другие потоки проверяют выполнение условий, они могут вызвать pthread_cond_signal, pthread_cond_broadcast и позволить потоку в выполнение перезапуска состояния ожидания. При наличии нескольких ожидающих потоков можно вызвать функцию pthread_cond_signal для пробуждения одного из потоков, а функция pthread_cond_broadcast разбудит все ожидающие потоки.
1.3.7 Правильный способ реализации модели производитель-потребитель
Выше мы представили базовые функции блокировок и разделяемых переменных, а затем представим общий сценарий, в котором эти две функции используются вместе: есть два потока, один из которых сначала проверяет условие, и это действие проверки должно быть заперт первым. Если условие выполнено, выполнить операцию, в противном случае заблокировать и ждать, пока условие не будет выполнено, поток будет уведомлен о продолжении выполнения; другой поток сначала заблокируется, затем установит условие в true и уведомит другие ожидающие потоки о том, что условие было выполнено и можно продолжать реализацию.
Выше было сказано, что при проверке общих переменных необходима блокировка, причина объясняется следующим псевдокодом.
Первый случай:
线程1
pthread_mutex_lock(&mutex);
while (condition == FALSE) {
pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);
线程2
condition = TRUE;
pthread_cond_signal(&cond);
Видно, что поток 1 сначала проверяет, установлено ли условие, и вызывает функцию ожидания для ожидания, если оно не установлено, перед этим на этом шаге выполняется операция блокировки. Поток 2 устанавливает условие в значение true (при условии, что он каким-то образом знает, что условие должно быть истинным в данный момент), а затем использует функцию pthread_cond_signal, чтобы уведомить поток 1 о прекращении блокировки и продолжении выполнения. Вышеупомянутая программа имеет следующие проблемы, когда несколько потоков выполняются одновременно: Если поток 1 оценивает первым и обнаруживает, что условие не выполнено, он готов ждать.В это время условие в потоке 2 устанавливается в true, и отправляется уведомление, а затем поток 1 блокируется и ждет. случае поток 1 пропускает уведомление, заставляя его ждать. Если условие выполнено, он все еще блокируется и ожидает.
线程1 线程2
pthread_mutex_lock(&mutex);
while (condition == FALSE)
condition = TRUE;
pthread_cond_signal(&cond);
pthread_cond_wait(&cond, &mutex);
Для решения указанной выше проблемы программа была доработана следующим образом. За счет операции блокировки потока 2 такой проблемы можно избежать. Это также объясняет, почему функцию pthread_cond_wait нужно разблокировать после входа.Если она не разблокирована, то поток 2 не может выполнить операцию, условие которой установлено в true, потому что поток 1 уже заблокировал эту переменную перед входом и ожиданием. поток 1 всегда будет ждать, и поток 2 также будет ждать, что приведет к взаимоблокировке.
线程1 线程2
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
while (condition == FALSE) { condition = TRUE;
pthread_cond_wait(&cond, &mutex); pthread_cond_signal(&cond);
} pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&mutex);
Поскольку ожидание необходимо снова заблокировать при повторном выполнении, после вызова pthread_cond_signal выше блокировку необходимо снять, чтобы завершить ожидание. Кроме того, вы также можете сначала разблокировать, а затем вызвать pthread_cond_signal, оба варианта правильные. Хотя доступ к общим переменным обычно требует блокировки, в этом сценарии конкуренция, вызванная отсутствием блокировки, не приведет к ошибкам, но вызовет проблемы с эффективностью планирования потоков, поэтому его также можно записать таким образом, но стандартная запись метод обычно рекомендуется.
Для условных переменных базовый сценарий использования заключается в том, что некоторые потоки оценивают условия, и если условия не выполняются, они переходят в состояние ожидания. Перед выполнением оценки условия сначала выполняется операция блокировки. Другие потоки отвечают за присвоение условию значения true, а затем уведомляют ожидающий поток о продолжении выполнения. операции.
Кратко опишите модель производитель-потребитель на основе реализации условной переменной, представленной в этом разделе: потоки класса A последовательно выполняют блокировку, проверку (ожидание, если условие не установлено, и снова входят в фазу проверки, если оно установлено), выполнение и разблокировать; в то же время потоки класса B последовательно выполняют блокировку, условие установлено в true, уведомление, разблокировку.
2. Эволюция архитектуры модели базовой сети Redis
Из истории развития Redis мы видим, что его базовая сетевая модель эволюционировала от однопоточной к многопоточной. Так как же реализована однопоточная модель ввода-вывода Redis? Что не так с однопоточностью? Как реализован многопоточный ввод-вывод Redis? Зачем это делать?В этом разделе мы рассмотрим эти вопросы.
2.1 Рабочий процесс однопоточного ввода-вывода
Как мы упоминали ранее, до версии Redis 6.0 его базовой сетевой моделью всегда была однопоточная модель Reactor, использующая технологии мультиплексирования, такие как epoll/select/kqueue, для непрерывной обработки клиентских запросов в однопоточном цикле обработки событий и, наконец, для записи вернуть данные ответа клиенту:
Давайте сначала изучим некоторые основные концепции сетевой модели Redis:
-
client
: клиентский объект, Redis представляет собой типичную архитектуру CS (Клиент Сервер), клиент устанавливает сетевой канал с сервером через сокет и затем отправляет команду запроса, а сервер выполняет запрошенную команду и отвечает. Redis использует структуру client для хранения всей соответствующей информации о клиенте, включая, помимо прочего, соединение с инкапсулированным сокетом *conn, текущий выбранный указатель базы данных *db, чтение в буфере querybuf, запись буфера buf и запись Ответ связанного списка данных Подождите.
/* client对象在redis不同版本的redis.h中均有定义
* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
int fd; /* Client socket. */
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
replication stream that we are receiving from
the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
...... //此处省略很多其它的属性
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
-
aeApiPoll
: Как мы упоминали ранее, API мультиплексирования ввода-вывода основан на инкапсуляции системных вызовов, таких как epoll_wait/select/kevent. Он прослушивает и ожидает запуска событий чтения и записи, а затем обрабатывает их. Цикл событий (Event Loop) Основная функция является основой для работы, управляемой событиями. -
acceptTcpHandler
: Процессор ответа на соединение, нижний уровень использует системный вызов accept для принятия новых соединений от клиента, и регистрирует обработчик чтения команды привязки для нового соединения для последующей обработки новых клиентских TCP-соединений; помимо этого процессора существуют соответствующие acceptUnixHandler отвечает за обработку доменных сокетов Unix, а acceptTLSHandler отвечает за обработку зашифрованных соединений TLS. -
readQueryFromClient
: Команда читает процессор, анализ и выполняет команду запроса клиента. -
beforeSleep
: Функция, которая будет выполняться перед входом aeApiPoll в цикл событий для ожидания прихода события, включающая в себя некоторые ежедневные задачи, такие как запись ответа в client->buf или client->reply (объясним, почему тут нужно два буфера) Клиенту сохранить данные буфера AOF на диск и т.д. Соответственно есть и функция afterSleep, которая выполняется после aeApiPoll. -
sendReplyToClient
: Процессор ответов на команды. Когда после цикла событий в буфере записи остаются данные, процессор регистрируется и привязывается к соответствующему соединению. Когда соединение инициирует событие готовности к записи, оно записывает буфер. Остальные данные в области записываются обратно клиенту.
Затем автор описывает рабочий процесс сервера Redis:
- Сервер Redis запускается, открывает цикл событий основного потока (Event Loop), регистрирует обработчик ответа на соединение acceptTcpHandler в файловом дескрипторе, соответствующем настроенному пользователем порту прослушивания, и ожидает поступления нового соединения;
- Клиент и сервер устанавливают сетевое соединение;
- acceptTcpHandler вызывается, основной поток использует API AE для привязки обработчика чтения команды readQueryFromClient к файловому дескриптору, соответствующему новому соединению, и инициализирует клиента для привязки клиентского соединения;
- Клиент отправляет команду запроса, запуская событие готовности к чтению, и основной поток вызывает readQueryFromClient для чтения команды, отправленной клиентом через сокет, и сохранения ее в буфере чтения client->querybuf;
- Затем вызовите processInputBuffer, используйте processInlineBuffer или processMultibulkBuffer для разбора команды в соответствии с протоколом Redis и, наконец, вызовите processCommand для выполнения команды;
- В соответствии с типом запрошенной команды (SET, GET, DEL, EXEC и т. д.) назначьте соответствующий исполнитель команды для выполнения и, наконец, вызовите ряд функций из семейства функций addReply для записи данных ответа в команду записи. выходной буфер соответствующего клиента: client->buf или client->reply, client->buf является предпочтительным буфером записи с фиксированным размером 16 КБ. клиент должен ответить в течение очень большого временного окна, затем он автоматически переключится на связанный список клиент-> ответ.Использование связанного списка теоретически может сохранить бесконечные данные (ограниченные физической памятью машины) и, наконец, добавить клиента в очередь LIFO client_pending_write;
- В цикле событий (Event Loop) основной поток выполняет beforeSleep --> handleClientsWithPendingWrites, проходит через очередь client_pending_write и вызывает writeToClient для обратной записи данных из буфера записи клиента клиенту. буфер записи. Затем зарегистрируйте команду sendReplyToClient, чтобы ответить процессору на событие готовности к записи соединения, и продолжайте записывать остаточные данные ответа в цикле обработки событий, когда клиент доступен для записи.
Автор составил более подробную рабочую блок-схему однопоточного ввода-вывода Redis. Заинтересованные читатели могут взглянуть на описанный выше процесс.
2.2 В чем проблема с одним потоком?
Все мы знаем, что однопоточная программа не может воспользоваться многоядерным процессором сервера, так почему Redis использует один поток в первые дни? Давайте сначала посмотримRedis
Дано официальноFAQ
Основное значение: ЦП не является ограничениемRedis
Узкое место производительности в основном ограничено объемом памяти и сетевым вводом-выводом, поэтому нет проблем с использованием одного потока для модели базовой сети Redis.Если вы хотите использовать многоядерный процессор службы, вы можете использовать его. на сервере Запустите несколько экземпляров на сервере или используйте сегментированный кластер.
Из анализа однопоточной модели Reactor Redis в предыдущем разделе мы знаем, что поток ввода-вывода Redis неблокируется, за исключением ожидания событий, и процессорное время не тратится впустую, поэтому Redis может предоставить высокопроизводительные услуги причина. Однако помимо того, что выше мы можем использовать только одно ядро процессора, у этой модели есть два недостатка:
- Если значение относительно велико, служба Redis будет заблокирована.
- QPS сложно поднять на более высокий уровень
Время основного потока Redis потребляется в основном в двух местах: потребление (вычисление ЦП), логическое вычисление и, поскольку синхронная запись ввода-вывода (мультиплексирование ввода-вывода), режим ядра и пользовательский режим копируют данные друг с другом. Когда значение большое, узкие места Redis сначала появляются в синхронизации ввода-вывода. Здесь, если есть несколько потоков, чтобы разделить эту часть давления, которое Redis QPS может значительно улучшиться, то есть для достижения многопоточной модели сети Redis идеи.
2.3 Реализация многопоточной сетевой модели Redis
Redis официально представила многопоточность в базовой модели сети после версии 6.0. Ранее мы упоминали, что Redis использовала классическую однопоточную модель Reactor до версии 6.0. Вообще говоря, однопоточная модель архитектуры Reactor представила больше. режим Multi-Reactors, и его рабочий режим выглядит следующим образом:
В отличие от режима одиночного реактора, этот режим больше не является однопоточным циклом событий, а каждый из нескольких потоков (вспомогательных реакторов) поддерживает независимый цикл событий, а главный реактор отвечает за получение новых подключений и распределение их между подреакторами. быть независимой обработкой, и, наконец, подреакторы записывают ответ клиенту.
Режим Multiple Reactors обычно может быть эквивалентен режиму Master-Workers. Например, Nginx и Memcached используют эту модель многопоточности. Хотя детали реализации разных проектов немного отличаются, общий режим одинаков.
2.3.1 Рабочий процесс модели многопоточной сети
Хотя Redis также реализует многопоточность, это не стандартная модель Multi-Reactors/Master-Workers.Давайте взглянем на общий дизайн многопоточной сетевой модели Redis:
- Сервер Redis запускается, открывает цикл событий основного потока (Event Loop), регистрирует обработчик ответа на соединение acceptTcpHandler в файловом дескрипторе, соответствующем настроенному пользователем порту прослушивания, и ожидает поступления нового соединения;
- Клиент и сервер устанавливают сетевое соединение;
- acceptTcpHandler вызывается, основной поток использует API AE для привязки обработчика чтения команды readQueryFromClient к файловому дескриптору, соответствующему новому соединению, и инициализирует клиента для привязки клиентского соединения;
- Клиент отправляет команду запроса для запуска события готовности к чтению.Главный поток сервера не читает команду запроса клиента через сокет, а сначала помещает клиента в LIFO-очередь client_pending_read;
- В цикле событий (Event Loop) основной поток выполняет beforeSleep --> handleClientsWithPendingReadsUsingThreads, использует стратегию балансировки нагрузки циклического опроса и равномерно распределяет соединения в очереди client_pending_read на соответствующие локальные очереди задач FIFO ввода-вывода. threads io_threads_list[ id] и сам основной поток, поток ввода-вывода читает команду запроса клиента через сокет, сохраняет ее в client->querybuf и разбирает первую команду, но не выполняет команду, основной поток занят опрос, ожидание завершения всеми потоками ввода-вывода задачи чтения;
- Основной поток и все потоки ввода-вывода завершили задачу чтения. Основной поток завершает опрос занятости, проходит через очередь client_pending_read и выполняет команды запроса для всех клиентских подключений. Сначала вызовите processCommandAndResetClient, чтобы выполнить первую проанализированную команду, и затем вызов processInputBuffer анализирует и выполняет все команды, подключенные клиентом, использует processInlineBuffer или processMultibulkBuffer для анализа команд в соответствии с протоколом Redis и, наконец, вызывает processCommand для выполнения команд;
- В соответствии с типом запрошенной команды (SET, GET, DEL, EXEC и т. д.) назначьте соответствующий исполнитель команды для выполнения и, наконец, вызовите ряд функций из семейства функций addReply для записи данных ответа в команду записи. выходной буфер соответствующего клиента: client->buf или client->reply, client->buf является предпочтительным буфером записи с фиксированным размером 16 КБ. клиент должен ответить в течение очень большого временного окна, затем он автоматически переключится на связанный список клиент-> ответ.Использование связанного списка теоретически может сохранить бесконечные данные (ограниченные физической памятью машины) и, наконец, добавить клиента в очередь LIFO client_pending_write;
- В цикле событий (Event Loop) основной поток выполняется beforeSleep --> handleClientsWithPendingWritesUsingThreads, использует стратегию балансировки нагрузки циклического опроса и равномерно распределяет соединения в очереди client_pending_write на соответствующие локальные очереди задач FIFO ввода-вывода. потоки io_threads_list[ id] и сам основной поток, поток ввода-вывода записывает данные в буфер записи клиента обратно клиенту, вызывая writeToClient, основной поток занят опросом и ожидает, пока все потоки ввода-вывода завершат работу. выполнить задание на выписку;
- Основной поток и все потоки ввода-вывода завершили задачу записи. Основной поток завершает опрос занятости и проходит через очередь client_pending_write. Если в буфере записи клиента еще остались данные, зарегистрируйте sendReplyToClient в Событие готовности к записи соединения и ожидание.Когда клиент доступен для записи, он продолжает записывать остаточные данные ответа в цикле обработки событий.
Большая часть логики здесь согласуется с предыдущей однопоточной моделью. Единственное изменение заключается в том, что логика чтения команд запроса клиента и записи данных ответа является асинхронной и передается для завершения потоку ввода-вывода. быть оплачены здесь.Поток ввода-вывода только читает и анализирует клиентскую команду, не выполняя ее фактически.Выполнение клиентской команды в конечном итоге вернется к основному потоку для завершения.
Автор также составил более подробную рабочую блок-схему многопоточного ввода-вывода Redis. Заинтересованные читатели могут взглянуть на описанный выше процесс.
2.3.2 Анализ исходного кода многопоточного ввода/вывода
Затем автор использует Redis v6.0.1 для анализа более важного исходного кода в реализации многопоточного ввода-вывода.
Давайте сначала посмотрим на инициализацию многопоточности:
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
//如果只配置了一个线程,则所有的I/O放到主线程上执行
if (server.io_threads_num == 1) return;
//最多配置的线程数量不超过128
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
//启动线程,线程数量为配置中的线程数
for (int i = 0; i < server.io_threads_num; i++) {
//创建I/O线程的本地任务队列
io_threads_list[i] = listCreate();
if (i == 0) continue; //0号线程是主线程
//初始化 I/O 线程并启动。
pthread_t tid;
// 每个 I/O 线程会分配一个本地锁,用来休眠和唤醒线程。
pthread_mutex_init(&io_threads_mutex[i],NULL);
// 每个 I/O 线程分配一个原子计数器,用来记录当前遗留的任务数量。
io_threads_pending[i] = 0;
// 主线程在启动 I/O 线程的时候会默认先锁住它,直到有 I/O 任务才唤醒它。
pthread_mutex_lock(&io_threads_mutex[i]);
// 启动线程,进入 I/O 线程的主逻辑函数 IOThreadMain
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
initThreadedIO
Он будет вызываться в конце запуска сервера Redis для выполнения инициализации многопоточности ввода-вывода. Многопоточный режим Redis отключен, и пользователю необходимо включить и установить его в конфигурационном файле redis.conf:
io-threads 2
io-threads-do-reads yes
Обратите внимание, что число потоков ввода-вывода, заданное при настройке io-theads, также включает основной поток. io-threads-do-reads определяет, участвует ли поток ввода-вывода в обработке чтения ввода-вывода. Это связано с тем, что чтение Параллельная обработка ввода-вывода не очевидно для повышения производительности Redis.
Давайте еще раз посмотрим на исходный код запроса на чтение. Когда клиент отправляет команду запроса, будет запущен цикл событий основного потока Redis, и будет вызван командный процессор readQueryFromClient.В предыдущей однопоточной модели этот метод будет напрямую читать и анализировать клиентскую команду и выполнить его, но в многопоточном режиме клиент будет добавлен в очередь задач client_pending_read, а затем основной поток будет назначен потоку ввода-вывода для чтения команды запроса клиента:
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
//如果开启了多线程,将client加入到异步队列之后返回
if (postponeClientRead(c)) return;
...... //省略代码,逻辑同单线程版本几乎一样
}
/* Return 1 if we want to handle the client read later using threaded I/O.
* This is called by the readable handler of the event loop.
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
//当多线程 I/O 模式开启、主线程没有在处理阻塞任务时,将 client 加入异步队列。
if (io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
// 给 client 打上 CLIENT_PENDING_READ 标识,表示该 client 需要被多线程处理,
// 后续在 I/O 线程中会在读取和解析完客户端命令之后判断该标识并放弃执行命令,让主线程去执行。
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
Основной поток вызовет handleClientsWithPendingReadsUsingThreads в методе beforeSleep() цикла обработки событий:
/* When threaded I/O is also enabled for the reading + parsing side, the
* readable handler will just put normal clients into a queue of clients to
* process (instead of serving them synchronously). This function runs
* the queue using the I/O threads, and process them in order to accumulate
* the reads in the buffers, and also parse the first command available
* rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
// 遍历待读取的 client 队列 clients_pending_read,
// 通过 RR 轮询均匀地分配给 I/O 线程和主线程自己(编号 0)。
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 设置当前 I/O 操作为读取操作,给每个 I/O 线程的计数器设置分配的任务数量,
// 让 I/O 线程可以开始工作:只读取和解析命令,不执行。
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
// 忙轮询,累加所有 I/O 线程的原子任务计数器,直到所有计数器的遗留任务数量都是 0,
// 表示所有任务都已经执行完成,结束轮询。
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
// 遍历待读取的 client 队列,清除 CLIENT_PENDING_READ 和 CLIENT_PENDING_COMMAND 标记,
// 然后解析并执行所有 client 的命令。
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
// client 的第一条命令已经被解析好了,直接尝试执行。
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
}
// 继续解析并执行 client 命令。
processInputBuffer(c);
}
return processed;
}
Основная работа, выполняемая основным потоком, заключается в следующем:
- Пройдите клиентскую очередь client_pending_read для чтения и назначьте все задачи потоку ввода-вывода и основному потоку с помощью стратегии RR для чтения и анализа клиентских команд.
- Опрос занятости ожидает, пока все потоки ввода-вывода завершат свои задачи.
- После того, как основной поток и другие операции ввода-вывода завершат обработку запроса на чтение, они, наконец, перейдут к client_pending_read и выполнят все клиентские команды.
В соответствии с чтением ответ естественно записывается обратно.После завершения чтения, разбора и выполнения команды данные ответа клиентской команды сохраняются в client->buf или client->reply.Далее нужны данные ответа для записи обратно клиенту или в beforeSleep основной поток вызывает handleClientsWithPendingWritesUsingThreads:
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
// 如果用户设置的 I/O 线程数等于 1 或者当前 clients_pending_write 队列中待写出的 client
// 数量不足 I/O 线程数的两倍,则不用多线程的逻辑,让所有 I/O 线程进入休眠,
// 直接在主线程把所有 client 的相应数据回写到客户端。
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
// 唤醒正在休眠的 I/O 线程(如果有的话)。
if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
// 遍历待写出的 client 队列 clients_pending_write,
// 通过 RR 轮询均匀地分配给 I/O 线程和主线程自己(0号线程)。
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 设置当前 I/O 操作为写出操作,给每个 I/O 线程的计数器设置分配的任务数量,
// 让 I/O 线程可以开始工作,把写出缓冲区(client->buf 或 c->reply)中的响应数据回写到客户端。
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
// 忙轮询,累加所有 I/O 线程的原子任务计数器,直到所有计数器的遗留任务数量都是 0。
// 表示所有任务都已经执行完成,结束轮询。
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
// 最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的中写出缓冲区中有残留数据,
// 如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 检查 client 的写出缓冲区是否还有遗留数据。
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
Подобно запросам на чтение, основная задача основного потока по обработке ответов обратной записи заключается в следующем:
- Проверьте текущую загрузку задачи, если текущего количества задач недостаточно для обработки в многопоточном режиме, приостановите поток ввода-вывода и напрямую синхронно запишите данные ответа обратно клиенту.
- Пробудите спящие потоки ввода-вывода (если есть).
- Пройдите клиентскую очередь client_pending_write для записи и назначьте все задачи потоку ввода-вывода и основному потоку с помощью стратегии RR, чтобы записать данные ответа обратно клиенту.
- Опрос занятости ожидает, пока все потоки ввода-вывода завершат свои задачи.
- Наконец, пройдите client_pending_write, зарегистрируйте обработчик ответа на команду sendReplyToClient для тех клиентов, у которых все еще есть данные ответа, и продолжайте записывать оставшиеся данные ответа в цикле обработки событий после ожидания, когда клиент станет доступным для записи.
Выше мы разобрали работу основного потока, а теперь давайте посмотрим, что сделала основная логика потока ввода/вывода:
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
while(1) {
// 忙轮询,100w 次循环,等待主线程分配 I/O 任务。
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
// 如果 100w 次忙轮询之后如果还是没有任务分配给它,则通过尝试加锁进入休眠,
// 等待主线程分配任务之后调用 startThreadedIO 解锁,唤醒 I/O 线程去执行。
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
// 注意:主线程分配任务给 I/O 线程之时,
// 会把任务加入每个线程的本地任务队列 io_threads_list[id],
// 但是当 I/O 线程开始执行任务之后,主线程就不会再去访问这些任务队列,避免数据竞争。
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 如果当前是写出操作,则把 client 的写出缓冲区中的数据回写到客户端。
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
// 如果当前是读取操作,则socket 读取客户端的请求命令并解析第一条命令。
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
// 所有任务执行完之后把自己的计数器置 0,主线程通过累加所有 I/O 线程的计数器
// 判断是否所有 I/O 线程都已经完成工作。
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
После того, как поток ввода-вывода запущен, он сначала войдет в опрос занятости, чтобы определить количество задач в атомарном счетчике, если он не равен нулю, это означает, что основной поток назначил ему задачу и начинает выполнение задача, иначе он был занят опросом миллион раз. Подождите, проверьте счетчик после завершения опроса занятости. Если он все еще равен 0, попробуйте добавить локальную блокировку, потому что основной поток уже заблокировал локальные блокировки всех I/ Потоки O заранее запускают поток ввода-вывода, поэтому поток ввода-вывода O будет спать, ожидая пробуждения основного потока.
Основной поток будет пытаться вызвать startThreadedIO в каждом цикле событий, чтобы разбудить поток ввода-вывода для выполнения задачи. к флагу io_threads_op, установленному основным потоком.Для задачи чтения и анализа или обратной записи данных ответа после получения уведомления от основного потока поток ввода-вывода будет проходить через свою собственную локальную очередь задач io_threads_list[id], и вывести каждого клиента для выполнения задачи:
- Если в настоящее время это операция записи, вызовите writeToClient для обратной записи данных ответа в client->buf или client->reply клиенту через сокет.
- Если текущая операция является операцией чтения, вызовите readQueryFromClient, прочитайте команду клиента через сокет, сохраните ее в client->querybuf, а затем вызовите processInputBuffer для анализа команды.В конце концов, будет проанализирована только первая команда, и то это закончится, нет для выполнения команды.
- Установите его атомарный счетчик в 0 после выполнения всех задач, чтобы сообщить основному потоку, что он завершил свою работу.
До сих пор мы анализировали основной исходный код работы многопоточного ввода-вывода.
2.3.3 Особенности многопоточного ввода/вывода без блокировок
Внимательные читатели могут обнаружить, что многопоточный дизайн ввода-вывода Redis не требует блокировки, а блокировка всегда была особенностью многопоточного дизайна. Redis реализуется с помощью атомарных операций + чередующийся доступ. /O threads: счетчик io_threads_pending, идентификатор ввода-вывода io_threads_op и локальная очередь задач io_threads_list.
io_threads_pending является атомарной переменной и не требует защиты от блокировки Две переменные io_threads_op и io_threads_list позволяют избежать проблемы конкуренции за общие данные, контролируя чередующийся доступ основного потока и потока ввода-вывода: после запуска потока ввода-вывода он пропустит опрос занятости и Блокировка спит и ждет сигнала основного потока.До этого он не будет обращаться к собственной локальной очереди задач io_threads_list[id], а основной поток разбудит ввод-вывод после того, как все задачи будут выделяется в локальную очередь каждого потока ввода-вывода.Поток вывода начинает работать, и основной поток будет обращаться только к своей собственной локальной очереди задач io_threads_list[0] во время выполнения потока ввода-вывода и не будет обращаться к локальная очередь потока ввода-вывода, которая гарантирует, что основной поток всегда будет обращаться к io_threads_list до потока ввода-вывода и не будет обращаться после него, обеспечивая доступ с чередованием. io_threads_op Аналогично, основной поток установит значение io_threads_op до пробуждения потока ввода-вывода и не будет обращаться к этой переменной во время выполнения потока ввода-вывода.
2.3.4 Повышение производительности и недостатки конструкции многопоточного ввода-вывода
Фактически,Redis
Эффект повышения производительности за счет многопоточности может быть лучше, чем вы думаете. Ниже представлен отчет о результатах стресс-тестирования, извлеченных из платформы ITNEXT. Для получения более подробной информации см.:Сравнительный анализ экспериментального многопоточного ввода-вывода Redis
Видно, что при настройке 24 IO-потоков выполняются обычные команды GET и SET,Redis
Он может достигать 20 Вт+ запросов в секунду, что почти удваивает производительность по сравнению с однопоточным.
Как мы упоминали ранее, многопоточная сетевая модель Redis не использует стандартную модель Multi-Reactors/Master-Workers для обеспечения совместимости со старыми версиями. Выполнение клиентских команд по-прежнему выполняется в основном потоке. И связь между основным потоком и потоком ввода-вывода также грубо решается через опрос занятости и блокировки, что приведет к тому, что Redis будет иметь проблему простоя процессора в течение периода обслуживания.Позже, после того, как мы изучим реализацию системы BIO, мы обнаружим, что поток BIO и основной поток. Связь между ними гораздо более элегантна, чем реализация связи в базовой многопоточной сетевой модели Redis. Я верю, что автор Redis в будущем улучшит текущую схему.
3. Эволюция системы Redis BIO
Вывод дан в начале этой статьи: Redis никогда не был однопоточным для проектирования всей серверной архитектуры. Единственный поток, который мы обсуждали, предназначен только для базовой сетевой модели Redis, Мы уже подробно проанализировали эту часть в предыдущем разделе. БИО-система Redis, то есть эти так называемые фоновые потоки, тоже претерпела эволюционный процесс, что произошло в ходе этого процесса? Какие оптимизации сделал антирез, автор Redis, для системы BIO? В этом разделе мы рассмотрим эти два вопроса вместе.
3.1 Ранняя биосигнализация и реализация
Ранний Redis выполнял через биосистему две вещи: одну — выполнять Aof persistence, то есть синхронизировать данные, записанные в кэш страниц системы, на диск, а другую — закрывать файл. Чтобы выполнить эту задачу, он использует метод очереди задач, каждая задача выполняется потоком, задача будет помещена в очередь задач, а затем забрана исполняющим потоком задачи, если очередь пуста, она будет блокироваться и ждать, если при наличии задачи в очереди рабочий поток уведомляется, что достигается через условные переменные. Представлены следующие три аспекта: инициализация задачи, очередь задач и вывод задачи из очереди, а сохранение состояния используется в качестве примера для иллюстрации его использования в системе.Содержимое и анализ кода в этом разделе основаны на Redis версии 3.2.3.
3.1.1 Инициализация задачи
Для задачи, такой как задача сохраняемости, сначала должна быть инициализирована очередь, и для создания этой очереди в redis используется собственная структура связанного списка redis. Эта очередь должна соответствовать следующим характеристикам:
- Производители ставят задачи в очередь.
- Если очередь не пуста, потребитель берет задачу из очереди, в противном случае потребитель переходит в состояние ожидания.
Потребителем здесь является служебный поток, и для того, чтобы завершить функцию ожидания, когда очередь опустеет, redis использует механизм условных переменных. Его код инициализации выглядит следующим образом.
static pthread_t bio_threads[BIO_NUM_OPS];
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
static pthread_cond_t bio_condvar[BIO_NUM_OPS];
static list *bio_jobs[BIO_NUM_OPS];
Приведенная выше константа BIO_NUM_OPS = 2 указывает на то, что поддерживаются две задачи. Для каждой задачи есть список для размещения задач, переменные pthread_cond_t и pthread_mutex_t для управления параллелизмом и pthread_t для потоков фоновой службы. Инициализация использует функцию bioInit, и часть кода выглядит следующим образом:
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_condvar[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}//初始化锁与条件变量
......
......
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
//这里的函数参数是arg = j,也就是每个线程传入一个编号j,0代表关闭文件,1代表aof初始化
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}//初始化线程
После завершения задачи инициализации у нас есть связанный список BIO_NUM_OPS (значение которого равно 2) для представления очереди задач, два потока вызывают функцию bioProcessBackgroundJobs, параметр — число j, и каждая очередь имеет инициализированные блокировки и условные переменные для параллелизма. контроль .
3.1.2 Постановка задач в очередь
Поставить задачу в очередь — значит поставить задачу во главе связанного списка, а значение отложенной соответствующей задачи добавить к +1, указывая на то, что в очереди есть еще одна незавершенная задача. Задания построены следующим образом:
struct bio_job {
time_t time;
void *arg1, *arg2, *arg3;
};
Видно, что задача не очень сложная, достаточно записать только одно время и параметр.Когда мы будем говорить о выполнении задачи позже, мы поговорим о том, как выполнить задачу, записанную такой простой структурой. Код для постановки задачи в очередь выглядит следующим образом:
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->arg1 = arg1;
...
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_condvar[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
Этот код постановки в очередь сначала выделяет место для структуры задачи, а затем использует функцию listAddNodeTail, чтобы поместить задачу в начало связанного списка. Здесь используется связанный список, реализованный самим redis. Видно, что при выполнении операции со связанным списком необходимо сначала выполнить блокировку, потому что связанный список здесь является общим ресурсом. После того, как задача успешно добавлена в очередь, вызывается функция pthread_cond_signal, чтобы уведомить заблокированный ожидающий поток о продолжении выполнения. Описанный выше процесс является основным шаблоном, используемым общими переменными: блокировка, установка условия в значение true (здесь задача поставлена в очередь), уведомление и разблокировка.
3.1.3 Задача вне очереди
Мы завершили инициализацию задачи и можем помещать новые задачи в очередь, тогда, когда в очереди есть задачи, фоновый поток, который мы запустили на первом шаге инициализации, вызовет функцию bioProcessBackgroundJobs для обработки задач.Основной код обработки: следующим образом:
void *bioProcessBackgroundJobs(void *arg) {
unsigned long type = (unsigned long) arg;
struct bio_job *job;
while(1) {
listNode *ln;
pthread_mutex_lock(&bio_mutex[type]);
if (listLength(bio_jobs[type]) == 0) {
//条件不成立,等待
pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
//被通知以后,停止阻塞,重新判断条件
continue;
}
//条件成立,直接执行
ln = listFirst(bio_jobs[type]);
job = ln->value;
//取走值以后,解锁
pthread_mutex_unlock(&bio_mutex[type]);
//完成队列处理以后,根据类型调用close函数或者fsync函数。
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
fsync((long)job->arg1);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
Основной процесс приведенного выше кода заключается в том, чтобы сначала определить, пуста ли текущая очередь, и, если она пуста, подождать. В противном случае возьмите структуру задания из очереди и решите, какую функцию вызывать в зависимости от типа потока. Тип здесь получается параметром, переданным для создания потока, который может быть 0 или 1. После получения типа возьмите arg1 из задания в качестве параметра и вызовите функцию закрытия или функцию fsync. arg1 — это дескриптор файла, поэтому, когда задача добавляется в очередь, ей нужно только поместить дескриптор файла, такой как очередь, поэтому структура bio_job задается так просто.
3.1.4 Пример постоянства
Aof persistence — один из двух методов сохранения в Redis.Он записывает каждую операцию Redis в буфер в памяти в виде строки, а затем записывает в файл и использует fsync в соответствующее время.Чтобы сбросить данные на диск , один из способов вызвать fsync — использовать биосистему, описанную выше, которая следует трем упомянутым выше шагам.
Прежде всего, в основной функции в server.c есть функция initServer, которая внутренне вызывает функцию bioInit для завершения инициализации биосистемы Таким образом устанавливается соответствующая структура очереди, а также фоновый поток. созданный. После запуска основного цикла Redis он вводит время сохранения и вызывает функцию flushAppendOnlyFile для завершения работы с сохранением aof. Эта функция будет иметь дело с различными вопросами, такими как конфигурация и оптимизация, связанные с aof. В этой статье мы сосредоточимся только на использовании биосистемы. Соответствующие коды следующие:
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
......
......
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
void aof_background_fsync(int fd) {
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}
Видно, что он проверяет текущий статус обработки очереди через bioPendingJobsOfType и вызывает bioCreateBackgroundJob для добавления задачи aof в очередь. Поскольку создание потока было завершено ранее, при наличии задачи в очереди поток будет запущен, а операция персистентности будет завершена через упомянутую выше функцию fsync.
Био Redis является очень хорошим примером фактической управляемой системы системы.
3.2 Мучительный вопрос: как удалить большой ключ?
Мы знаем, что команда DEL Redis используется для удаления значения, хранящегося в одном или нескольких ключах.Это блокирующая команда.Иногда вам нужно удалить очень большую пару ключ-значение, например объект с миллионами объектов.Это команда может быть заблокирована на несколько секунд, а поскольку инструкции выполнения Redis выполняются в основном потоке, у всего сервиса неизбежно будет большое количество медленных запросов, и пропускная способность резко упадет! Antirez, автор Redis, также столкнулся с этой проблемой.
Итак, как оптимизировать эту проблему? Читатели, изучавшие исходный код Redis, могли подумать: а как насчет использования схемы прогрессивного перехеширования, использования таймеров и курсоров данных, чтобы каждый раз удалять часть данных? На самом деле, Antirez не рассматривал эту идею, но решение, принятое Antirez в конце концов, состоит в том, чтобы добавить еще одного члена в исходную систему BIO. Так что же не так со схемой прогрессивного удаления, которую мы имеем в виду? Ответ на этот вопрос можно найти в блоге Антриеса в 2015 году:Lazy Redis is better Redis
Автор перехватил пример, приведенный антирезом:
Почему сложно выполнить прогрессивное ленивое удаление? Автор сказал, что когда мы удаляем коллекцию, мы не можем удалять элементы в коллекции так же быстро, как клиент может добавлять элементы в коллекцию, поэтому наша работа по удалению кажется никогда не завершенной.
3.3 Система BIO добавляет нового участника - lazyfree
В предыдущем разделе мы проанализируем приводные серверы обструкции CREDIS при удалении большого ключа с использованием команды del, за исключением случаев использования Flusdb и Plashall, удаляют базу данных, содержащую большое количество клавиш, или Redis, очистки уборки устаревших данных из памяти и передачи данных, Если вы случайно попали в большой объем облигаций, приведет к блоку сервера.
Для решения вышеуказанных проблем в Redis 4.0 представлен механизм lazyfree, который может выполнять операцию удаления ключей или баз данных в фоновом потоке, тем самым максимально избегая блокировки сервера.
Принцип lazyfree представить несложно, то есть при удалении объекта выполняется только логическое удаление, а затем объект отбрасывается на задний план, чтобы фоновый поток мог выполнить настоящий деструкт, чтобы избежать блокировки из-за к чрезмерным размерам объекта.
Так обстоит дело с ленивой реализацией Redis.Давайте в основном рассмотрим реализацию команды unlink (следующий код основан на Redis 4.0):
void unlinkCommand(client *c) {
delGenericCommand(c, 1);
}
Запись очень простая, то есть вызов delGenericCommand, а второй параметр равен 1, чтобы указать, что его нужно удалить асинхронно.
/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;
for (j = 1; j < c->argc; j++) {
expireIfNeeded(c->db,c->argv[j]);
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
signalModifiedKey(c->db,c->argv[j]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
numdel++;
}
}
addReplyLongLong(c,numdel);
}
Функция delGenericCommand определяет, следует ли удалять синхронно или асинхронно в соответствии с параметром lazy. Логика синхронного удаления не меняется. Мы сосредоточимся на реализации недавно добавленного асинхронного удаления.
#define LAZYFREE_THRESHOLD 64
// 首先定义了启用后台删除的阈值,对象中的元素大于该阈值时才真正丢给后台线程去删除,如果对象中包含的元素太少就没有必要丢给后台线程,因为线程同步也要一定的消耗。
int dbAsyncDelete(redisDb *db, robj *key) {
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
//清除待删除key的过期时间
dictEntry *de = dictUnlink(db->dict,key->ptr);
//dictUnlink返回数据库字典中包含key的条目指针,并从数据库字典中摘除该条目(并不会释放资源)
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);
//lazyfreeGetFreeEffort来获取val对象所包含的元素个数
if (free_effort > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects,1);
//原子操作给lazyfree_objects加1,以备info命令查看有多少对象待后台线程删除
bioCreateBackgroundJob(BIO_LAZY_FREE ,val,NULL,NULL);
//此时真正把对象val丢到后台线程的任务队列中
dictSetVal(db->dict,de,NULL);
//把条目里的val指针设置为NULL,防止删除数据库字典条目时重复删除val对象
}
}
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
//删除数据库字典条目,释放资源
return 1;
} else {
return 0;
}
}
Выше приведена логика асинхронного удаления: сначала будет очищено время истечения, затем будет вызван dictUnlink для удаления удаляемого объекта из словаря БД, а затем будет оценен размер объекта (не обязательно чтобы удалить его в фоновом режиме), если он достаточно велик, он будет передан в фоновый поток и, наконец, очистит информацию о записи словаря базы данных.
Из приведенной выше логики видно, что при отвязке большого ключа фактическое удаление выполняется фоновым потоком, поэтому Redis не будет заблокирован.
4. Резюме
Содержание этой статьи действительно слишком длинное.Если читатели внимательно прочитают статью, я полагаю, что вы должны быть очень заинтересованы в этой статье. Когда я использую GDB для отладки кода версии Redis 6.0, я вывожу информацию о потоке после запуска Redis Давайте сделаем вывод из следующего изображения в сочетании с содержанием этой статьи:
Вы можете видеть, что в отладочной информации автора Redis запустил 6 потоков, а именно:
-
redis-server
: основной поток Redis, который получает клиентские запросы через мультиплексирование ввода-вывода и выполняет инструкции ЦП, существует с момента выпуска Redis и всегда был основным потоком базовой сетевой модели Redis. -
bio_close_file、bio_aof_fsync
: старый член системы Redis BIO, bio_close_file используется для асинхронного закрытия файлов сервера, а bio_aof_fsync используется для асинхронной синхронизации журналов.Они также существуют с момента выпуска Redis. -
bio_lazy_free
: новый член, добавленный Redis в систему BIO в версии 4.0, он в основном выполняет операцию удаления ключей или баз данных в фоновом потоке, тем самым максимально избегая блокировки сервера. -
io_thd_1、io_thd_2、io_thd_3
: Redis предоставляет конфигурацию рабочих потоков ввода-вывода в версии 6.0. Они используются для разделения давления ввода-вывода основного потока Redis, чтобы основной поток мог более эффективно выполнять работу по выполнению инструкций ЦП.
Выше приведено авторское резюме функций и истории развития каждого потока Redis Контекст очень ясен! Я надеюсь, что читатели не только ограничены пониманием эволюции многопоточности Redis, но и четко знают, с какими проблемами столкнулся Redis в этом процессе, и думают над этими проблемами сами. он основан на том, какие факторы учитываются. Кроме того, в этой статье анализируются многие модели и исходный код архитектуры Redis, такие как многопоточная модель производитель-потребитель, основанная на блокировках и общих переменных, и многопоточный ввод-вывод Redis без блокировок, которые также являются частью дизайна нашей системы, а также места, которые стоит изучить и изучить в программировании. Наконец, еще раз спасибо читателям, которые упорствовали до конца!
Справочник и дополнительная литература
- Ленивый Redis лучше Redis:antirez.com/news/93
- Проблема C10K:www.kegel.com/c10k.html
- Часто задаваемые вопросы о Редисе:redis.io/topics/faq
- Сравнительный анализ экспериментального многопоточного ввода-вывода Redis:IT next.IO/бенчмарк в…
- Redis и мультиплексирование ввода-вывода:D Raven ES — это What/Redis-IO-Wood…
- Модель многопоточной сети Redis полностью раскрыта:Tickets.WeChat.QQ.com/Yes/Screen 2n SP, затем…
- Переменные условия и блокировки:Меньше статьи GitHub.IO/2016/11/05/…
- Подробное объяснение многопоточной обработки ввода-вывода Redis 6.0:Руби-China.org/topics/3992…
- Евангелие от Redis · lazyfree · удаление большого ключа:MySQL.taobao.org/monthly/201…