Redis использует управляемый событиями механизм для обработки большого количества сетевых операций ввода-вывода. Он не использует зрелые решения с открытым исходным кодом, такие как libevent или libev, но сам реализует очень краткую управляемую событиями библиотеку ae_event.
Управляемая событиями библиотека в Redis ориентирована только на сетевой ввод-вывод и таймеры. Библиотека событий обрабатывает следующие два типа событий:
- Событие файла: используется для обработки сетевого ввода-вывода между сервером Redis и клиентом.
- Событие времени (время eveat): некоторые операции на сервере Redis (например, функция serverCron) должны выполняться в заданный момент времени, и события времени используются для обработки таких операций синхронизации.
Код библиотеки, управляемой событиями, в основном реализован в файле src/ae.c, и его схематическая диаграмма показана ниже.
aeEventLoop
Является ядром всего управляемого событиями, управляет таблицей файловых событий и списком событий времени,
Непрерывно циклически обрабатывает события готовности файла и события с истекшим временем. Далее мы сначала представляем файловые события и временные события соответственно, а затем говорим о связанныхaeEventLoop
Реализация исходного кода.
файл события
Redis разработал свой собственный обработчик сетевого события на основе шаблона реактора, который является обработчиком событий файла. Обработчик событий File использует технологию мультиплексирования IO, одновременно слушает несколько сокетов и ассоциирует разные обработчики событий для розеток. Когда читаемое или жареное событие сокета срабатывает, вызывается соответствующий обработчик событий.
Технологии мультиплексирования ввода-вывода, используемые Redis, в основном включают:select
,epoll
,evport
иkqueue
Ждать. Каждая библиотека функций мультиплексирования ввода-вывода соответствует отдельному файлу в исходном коде Redis, например, ae_select.c, ae_epoll.c, ae_kqueue.c и т. д. Redis выберет технологию мультиплексирования в соответствии с различными операционными системами и различными приоритетами. Платформы реагирования на события обычно используют эту архитектуру, например netty и libevent.
Как показано на рисунке ниже, обработчик файловых событий состоит из четырех компонентов: сокетов, мультиплексоров ввода-вывода, диспетчеров файловых событий и обработчиков событий.
Событие файла — это абстракция операций сокета, и событие файла генерируется всякий раз, когда сокет готов принять, прочитать, записать и закрыть операции. Поскольку Redis обычно подключается к нескольким сокетам, одновременно могут происходить несколько файловых событий.
Программа мультиплексирования ввода-вывода отвечает за мониторинг нескольких сокетов, сокет и передачу этих файлов, генерируемых диспетчером событий событий.
Хотя несколько файловых событий могут происходить одновременно, мультиплексор ввода-вывода всегда будет помещать все результирующие сокеты в одну и ту же очередь (что описано далее в этой статье).aeEventLoop
изfired
таблица событий готовности), а затем обработчик файловых событий будет обрабатывать сокеты в очереди упорядоченным, синхронным способом с одним сокетом, то есть обрабатывая готовые файловые события.
Поэтому процесс подключения клиента Redis к серверу и отправки команды показан на рисунке выше.
- Клиент инициирует запрос к серверу для установления соединения сокета, затем прослушивающий сокет генерирует событие AE_READABLE, запускаяобработчик ответа соединениявоплощать в жизнь. Процессор отвечает на запрос на подключение клиента, а затем создает клиентскую втулку, а также состояние клиента, а событие AE_Readable собработчик командных запросовассоциация.
- После того, как клиент установит соединение и отправит команду на сервер, клиентский сокет сгенерирует событие AE_READABLE, запускающееобработчик командных запросовExecute, процессор считывает клиентскую команду, а затем передает ее соответствующей программе для выполнения.
- Выполните команду, чтобы получить соответствующую команду ответа. Для того, чтобы пропустить ответный ответ клиенту, сервер будет сопоставить событие AE_WRITIBEAM CLIECTобработчик ответа на командуассоциация. Когда клиент пытается прочитать ответ на команду, клиентский сокет генерирует событие AE_WRITEABLE, которое запускает командуобработчик ответовЗаписывает все ответы на команды в сокет.
событие времени
События времени в Redis делятся на следующие две категории:
- Временное событие: выполнение программы один раз через указанное время.
- Периодическое событие: позволяет программе выполняться один раз в указанное время.
Конкретная структура определения временного события Redis выглядит следующим образом.
typedef struct aeTimeEvent {
/* 全局唯一ID */
long long id; /* time event identifier. */
/* 秒精确的UNIX时间戳,记录时间事件到达的时间*/
long when_sec; /* seconds */
/* 毫秒精确的UNIX时间戳,记录时间事件到达的时间*/
long when_ms; /* milliseconds */
/* 时间处理器 */
aeTimeProc *timeProc;
/* 事件结束回调函数,析构一些资源*/
aeEventFinalizerProc *finalizerProc;
/* 私有数据 */
void *clientData;
/* 前驱节点 */
struct aeTimeEvent *prev;
/* 后继节点 */
struct aeTimeEvent *next;
} aeTimeEvent;
Является ли временное событие синхронизированным или периодическим, зависит от возвращаемого значения обработчика времени:
- Если возвращаемое значение равно AE_NOMORE, то событие представляет собой синхронизированное событие, которое удаляется при достижении и больше не повторяется.
- Если возвращаемое значение является значением, отличным от AE_NOMORE, то событие является периодическим событием.При поступлении события времени сервер будет реагировать на событие времени на основе возвращаемого значения процессора времени.
when
Свойство обновляется, чтобы это событие снова появлялось через определенный промежуток времени.
Redis ставит все временные события в неупорядоченном связанном списке, и каждый раз Redis пересекает весь связанный список, находит все приехавшие события времени, и вызывает соответствующий обработчик событий.
После представления файловых событий и временных событий давайте посмотримaeEventLoop
конкретная реализация.
Создать менеджер событий
Сервер Redis в своей функции инициализацииinitServer
, создается менеджер событийaeEventLoop
объект.
функцияaeCreateEventLoop
Будет создан менеджер событий, в основном инициализацияaeEventLoop
Различные значения атрибутов , такие какevents
,fired
,timeEventHead
иapidata
:
- Сначала создайте
aeEventLoop
объект. - Инициализируйте таблицу событий неготового файла и таблицу событий готового файла.
events
Указатель на таблицу событий неготового файла,fired
Указатель на готовую таблицу событий файла. Содержимое таблицы изначально изменяется при добавлении определенных событий позже. - Инициализировать список событий времени, установить
timeEventHead
иtimeEventNextId
Атрибуты. - перечислить
aeApiCreate
создание функцииepoll
экземпляр и инициализироватьapidata
.
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
/* 创建事件状态结构 */
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
/* 创建未就绪事件表、就绪事件表 */
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
/* 设置数组大小 */
eventLoop->setsize = setsize;
/* 初始化执行最近一次执行时间 */
eventLoop->lastTime = time(NULL);
/* 初始化时间事件结构 */
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
/* 将多路复用io与事件管理器关联起来 */
if (aeApiCreate(eventLoop) == -1) goto err;
/* 初始化监听事件 */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
.....
}
aeApiCreate
Сначала создается функцияaeApiState
объект, который инициализирует таблицу событий epoll ready, а затем вызываетepoll_create
созданныйepoll
Пример, наконецaeApiState
назначить вapidata
Атрибуты.
aeApiState
в объектеepfd
место храненияepoll
логотип,events
Являетсяepoll
массив готовых событий, когда естьepoll
Когда происходит событие, все происходитepoll
События и их дескрипторы будут храниться в этом массиве. Этот массив готовых событий открывается прикладным уровнем, и ядро отвечает за заполнение этого массива всеми происходящими событиями.
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
/* 初始化epoll就绪事件表 */
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
/* 创建 epoll 实例 */
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
/* 事件管理器与epoll关联 */
eventLoop->apidata = state;
return 0;
}
typedef struct aeApiState {
/* epoll_event 实例描述符*/
int epfd;
/* 存储epoll就绪事件表 */
struct epoll_event *events;
} aeApiState;
Создать файл события
aeFileEvent
Это файловая структура событий.Для каждого конкретного события есть функции обработки чтения и функции обработки записи. Вызов RedisaeCreateFileEvent
Функция регистрирует соответствующие файловые события для событий чтения и записи разных сокетов.
typedef struct aeFileEvent {
/* 监听事件类型掩码,值可以是 AE_READABLE 或 AE_WRITABLE */
int mask;
/* 读事件处理器 */
aeFileProc *rfileProc;
/* 写事件处理器 */
aeFileProc *wfileProc;
/* 多路复用库的私有数据 */
void *clientData;
} aeFileEvent;
/* 使用typedef定义的处理器函数的函数类型 */
typedef void aeFileProc(struct aeEventLoop *eventLoop,
int fd, void *clientData, int mask);
Например, когда Redis выполняет репликацию master-slave, подчиненному серверу необходимо установить соединение с главным сервером, он инициирует сокетное соединение, а затем вызоветaeCreateFileEvent
Функция регистрирует соответствующий обработчик событий для событий чтения и записи инициированного сокета, то естьsyncWithMaster
функция.
aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);
/* 符合aeFileProc的函数定义 */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {....}
aeCreateFileEvent
Параметрыfd
относится к конкретнымsocket
разъем,proc
Ссылаться наfd
Когда событие генерируется, конкретная функция обработки,clientData
Это данные, которые необходимо передать при обратном вызове обработчика.aeCreateFileEvent
Делайте в основном три вещи:
- от
fd
индекс, вevents
Соответствующее событие было найдено в таблице неготовых событий. - перечислить
aeApiAddEvent
функция, событие регистрируется для конкретного базового мультиплексирования ввода-вывода, в данном случае epoll. - Заполните обратный вызов события, параметры, тип события и другие параметры.
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
/* 取出 fd 对应的文件事件结构, fd 代表具体的 socket 套接字 */
aeFileEvent *fe = &eventLoop->events[fd];
/* 监听指定 fd 的指定事件 */
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
/* 置文件事件类型,以及事件的处理器 */
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
/* 私有数据 */
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
Как упоминалось выше, существует множество наборов базовых библиотек мультиплексирования ввода-вывода на основе Redis, поэтомуaeApiAddEvent
Также несколько наборов реализации, ниже приведен исходный кодepoll
реализация ниже. Его основная операция заключается в вызовеepoll
изepoll_ctl
функционировать, чтобыepoll
Зарегистрируйтесь, чтобы реагировать на события. Связанныйepoll
Связанные знания можно увидеть«Анализ исходного кода Java NIO»
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。如果已经关联了某个/某些事件,那么这是一个 MOD 操作。 */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
/* 注册事件到 epoll */
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
/* 调用epoll_ctl 系统调用,将事件加入epoll中 */
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
обработка событий
Поскольку в Redis существуют как файловые, так и временные события, сервер должен планировать эти два события, решать, когда обрабатывать файловые события, когда обрабатывать временные события и как их планировать.
aeMain
Функция вызывается непрерывно в бесконечном циклеaeProcessEvents
функция для обработки всех событий.
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
/* 如果有需要在事件处理前执行的函数,那么执行它 */
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
/* 开始处理事件*/
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
НижеaeProcessEvents
Псевдокод функции , он сначала вычисляет событие времени, ближайшее к текущему времени, чтобы вычислить тайм-аут, а затем вызываетaeApiPoll
Функция ожидает готовности базового события мультиплексирования ввода-вывода;aeApiPoll
После возврата функции все события файлов, которые были сгенерированы и временные события, которые истекся, будут обработаны.
/* 伪代码 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
/* 获取到达时间距离当前时间最接近的时间事件*/
time_event = aeSearchNearestTimer();
/* 计算最接近的时间事件距离到达还有多少毫秒*/
remaind_ms = time_event.when - unix_ts_now();
/* 如果事件已经到达,那么remaind_ms为负数,将其设置为0 */
if (remaind_ms < 0) remaind_ms = 0;
/* 根据 remaind_ms 的值,创建 timeval 结构*/
timeval = create_timeval_with_ms(remaind_ms);
/* 阻塞并等待文件事件产生,最大阻塞时间由传入的 timeval 结构决定,如果remaind_ms 的值为0,则aeApiPoll 调用后立刻返回,不阻塞*/
/* aeApiPoll调用epoll_wait函数,等待I/O事件*/
aeApiPoll(timeval);
/* 处理所有已经产生的文件事件*/
processFileEvents();
/* 处理所有已经到达的时间事件*/
processTimeEvents();
}
иaeApiAddEvent
похожий,aeApiPoll
Есть также несколько реализаций.На самом деле он делает две вещи, вызываяepoll_wait
блокировка ожиданияepoll
Событие готово, а время ожидания — это время ожидания, рассчитанное на основе самого быстрого события времени достижения;epoll
Событие переходит в событие готовности к запуску.aeApiPoll
Это упомянутая выше программа мультиплексирования ввода/вывода. Конкретный процесс показан на рисунке ниже.
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
{
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 调用epoll_wait函数,等待时间为最近达到时间事件的时间计算而来。
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
// 有至少一个事件就绪?
if (retval > 0)
{
int j;
/*为已就绪事件设置相应的模式,并加入到 eventLoop 的 fired 数组中*/
numevents = retval;
for (j = 0; j < numevents; j++)
{
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN)
mask |= AE_READABLE;
if (e->events & EPOLLOUT)
mask |= AE_WRITABLE;
if (e->events & EPOLLERR)
mask |= AE_WRITABLE;
if (e->events & EPOLLHUP)
mask |= AE_WRITABLE;
/* 设置就绪事件表元素 */
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
// 返回已就绪事件个数
return numevents;
}
processFileEvent
это псевдокод для обработки готовых файловых событий, и описанный выше диспетчер файловых событий, который на самом деле является обходомfired
Подготовьте таблицу событий, затем вызовите различные обработчики, зарегистрированные в событии, в соответствии с соответствующим типом события, прочитайте вызов событияrfileProc
, а событие записи вызываетwfileProc
.
void processFileEvent(int numevents) {
for (j = 0; j < numevents; j++) {
/* 从已就绪数组中获取事件 */
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
int invert = fe->mask & AE_BARRIER;
/* 读事件 */
if (!invert && fe->mask & mask & AE_READABLE) {
/* 调用读处理函数 */
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* 写事件. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
}
иprocessTimeEvents
это функция, которая обрабатывает временные события, она проходитaeEventLoop
список событий события, если приходит событие времени, выполнить егоtimeProc
функции и в зависимости от того, равно ли возвращаемое значение функцииAE_NOMORE
чтобы определить, является ли временное событие периодическим событием, и изменить время прибытия.
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
....
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
/* 遍历时间事件链表 */
while(te) {
long now_sec, now_ms;
long long id;
/* 删除需要删除的时间事件 */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
/* id 大于最大maxId,是该循环周期生成的时间事件,不处理 */
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
/* 事件已经到达,调用其timeProc函数*/
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* 如果返回值不等于 AE_NOMORE,表示是一个周期性事件,修改其when_sec和when_ms属性*/
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
/* 一次性事件,标记为需删除,下次遍历时会删除*/
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
удалить событие
Когда событие больше не нужно, его необходимо удалить. Например: если fd прослушивает события чтения и записи событий одновременно. Когда вам больше не нужно отслеживать событие записи, вы можете удалить событие записи файла fd.
aeDeleteEventLoop
Процесс выполнения функции сводится к следующим шагам.
1. Согласноfd
Мероприятие найдено в не готовом таблице
2. Отменаfd
Соответствующий соответствующий идентификатор события
3. ЗвонокaeApiFree
функция, ядро отменит соответствующий мониторинг событий на красно-черном дереве мониторинга epoll.
постскриптум
Далее мы продолжим изучать принципы репликации Redis master-slave, и вы можете продолжать обращать на них внимание.