Подробное объяснение механизма событий Redis

Redis
Подробное объяснение механизма событий Redis

Redis использует управляемый событиями механизм для обработки большого количества сетевых операций ввода-вывода. Он не использует зрелые решения с открытым исходным кодом, такие как libevent или libev, но сам реализует очень краткую управляемую событиями библиотеку ae_event.

Управляемая событиями библиотека в Redis ориентирована только на сетевой ввод-вывод и таймеры. Библиотека событий обрабатывает следующие два типа событий:

  • Событие файла: используется для обработки сетевого ввода-вывода между сервером Redis и клиентом.
  • Событие времени (время eveat): некоторые операции на сервере Redis (например, функция serverCron) должны выполняться в заданный момент времени, и события времени используются для обработки таких операций синхронизации.

Код библиотеки, управляемой событиями, в основном реализован в файле src/ae.c, и его схематическая диаграмма показана ниже.

事件管理器示意图

aeEventLoopЯвляется ядром всего управляемого событиями, управляет таблицей файловых событий и списком событий времени, Непрерывно циклически обрабатывает события готовности файла и события с истекшим временем. Далее мы сначала представляем файловые события и временные события соответственно, а затем говорим о связанныхaeEventLoopРеализация исходного кода.

файл события

Redis разработал свой собственный обработчик сетевого события на основе шаблона реактора, который является обработчиком событий файла. Обработчик событий File использует технологию мультиплексирования IO, одновременно слушает несколько сокетов и ассоциирует разные обработчики событий для розеток. Когда читаемое или жареное событие сокета срабатывает, вызывается соответствующий обработчик событий.

Технологии мультиплексирования ввода-вывода, используемые Redis, в основном включают:select,epoll,evportиkqueueЖдать. Каждая библиотека функций мультиплексирования ввода-вывода соответствует отдельному файлу в исходном коде Redis, например, ae_select.c, ae_epoll.c, ae_kqueue.c и т. д. Redis выберет технологию мультиплексирования в соответствии с различными операционными системами и различными приоритетами. Платформы реагирования на события обычно используют эту архитектуру, например netty и libevent.

示意图

Как показано на рисунке ниже, обработчик файловых событий состоит из четырех компонентов: сокетов, мультиплексоров ввода-вывода, диспетчеров файловых событий и обработчиков событий.

示意图

Событие файла — это абстракция операций сокета, и событие файла генерируется всякий раз, когда сокет готов принять, прочитать, записать и закрыть операции. Поскольку Redis обычно подключается к нескольким сокетам, одновременно могут происходить несколько файловых событий.

Программа мультиплексирования ввода-вывода отвечает за мониторинг нескольких сокетов, сокет и передачу этих файлов, генерируемых диспетчером событий событий.

Хотя несколько файловых событий могут происходить одновременно, мультиплексор ввода-вывода всегда будет помещать все результирующие сокеты в одну и ту же очередь (что описано далее в этой статье).aeEventLoopизfiredтаблица событий готовности), а затем обработчик файловых событий будет обрабатывать сокеты в очереди упорядоченным, синхронным способом с одним сокетом, то есть обрабатывая готовые файловые события.

一次请求的过程示意图

Поэтому процесс подключения клиента Redis к серверу и отправки команды показан на рисунке выше.

  • Клиент инициирует запрос к серверу для установления соединения сокета, затем прослушивающий сокет генерирует событие AE_READABLE, запускаяобработчик ответа соединениявоплощать в жизнь. Процессор отвечает на запрос на подключение клиента, а затем создает клиентскую втулку, а также состояние клиента, а событие AE_Readable собработчик командных запросовассоциация.
  • После того, как клиент установит соединение и отправит команду на сервер, клиентский сокет сгенерирует событие AE_READABLE, запускающееобработчик командных запросовExecute, процессор считывает клиентскую команду, а затем передает ее соответствующей программе для выполнения.
  • Выполните команду, чтобы получить соответствующую команду ответа. Для того, чтобы пропустить ответный ответ клиенту, сервер будет сопоставить событие AE_WRITIBEAM CLIECTобработчик ответа на командуассоциация. Когда клиент пытается прочитать ответ на команду, клиентский сокет генерирует событие AE_WRITEABLE, которое запускает командуобработчик ответовЗаписывает все ответы на команды в сокет.

событие времени

События времени в Redis делятся на следующие две категории:

  • Временное событие: выполнение программы один раз через указанное время.
  • Периодическое событие: позволяет программе выполняться один раз в указанное время.

Конкретная структура определения временного события Redis выглядит следующим образом.

typedef struct aeTimeEvent {
    /* 全局唯一ID */
    long long id; /* time event identifier. */
    /* 秒精确的UNIX时间戳,记录时间事件到达的时间*/
    long when_sec; /* seconds */
    /* 毫秒精确的UNIX时间戳,记录时间事件到达的时间*/
    long when_ms; /* milliseconds */
    /* 时间处理器 */
    aeTimeProc *timeProc;
    /* 事件结束回调函数,析构一些资源*/
    aeEventFinalizerProc *finalizerProc;
    /* 私有数据 */
    void *clientData;
    /* 前驱节点 */
    struct aeTimeEvent *prev;
    /* 后继节点 */
    struct aeTimeEvent *next;
} aeTimeEvent;

Является ли временное событие синхронизированным или периодическим, зависит от возвращаемого значения обработчика времени:

  • Если возвращаемое значение равно AE_NOMORE, то событие представляет собой синхронизированное событие, которое удаляется при достижении и больше не повторяется.
  • Если возвращаемое значение является значением, отличным от AE_NOMORE, то событие является периодическим событием.При поступлении события времени сервер будет реагировать на событие времени на основе возвращаемого значения процессора времени.whenСвойство обновляется, чтобы это событие снова появлялось через определенный промежуток времени.

Redis ставит все временные события в неупорядоченном связанном списке, и каждый раз Redis пересекает весь связанный список, находит все приехавшие события времени, и вызывает соответствующий обработчик событий.

После представления файловых событий и временных событий давайте посмотримaeEventLoopконкретная реализация.

Создать менеджер событий

Сервер Redis в своей функции инициализацииinitServer, создается менеджер событийaeEventLoopобъект.

функцияaeCreateEventLoopБудет создан менеджер событий, в основном инициализацияaeEventLoopРазличные значения атрибутов , такие какevents,fired,timeEventHeadиapidata:

  • Сначала создайтеaeEventLoopобъект.
  • Инициализируйте таблицу событий неготового файла и таблицу событий готового файла.eventsУказатель на таблицу событий неготового файла,firedУказатель на готовую таблицу событий файла. Содержимое таблицы изначально изменяется при добавлении определенных событий позже.
  • Инициализировать список событий времени, установитьtimeEventHeadиtimeEventNextIdАтрибуты.
  • перечислитьaeApiCreateсоздание функцииepollэкземпляр и инициализироватьapidata.
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    /* 创建事件状态结构 */
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    /* 创建未就绪事件表、就绪事件表 */
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    /* 设置数组大小 */
    eventLoop->setsize = setsize;
    /* 初始化执行最近一次执行时间 */
    eventLoop->lastTime = time(NULL);
    /* 初始化时间事件结构 */
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    /* 将多路复用io与事件管理器关联起来 */
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* 初始化监听事件 */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
err:
   .....
}

aeApiCreateСначала создается функцияaeApiStateобъект, который инициализирует таблицу событий epoll ready, а затем вызываетepoll_createсозданныйepollПример, наконецaeApiStateназначить вapidataАтрибуты.

aeApiStateв объектеepfdместо храненияepollлоготип,eventsЯвляетсяepollмассив готовых событий, когда естьepollКогда происходит событие, все происходитepollСобытия и их дескрипторы будут храниться в этом массиве. Этот массив готовых событий открывается прикладным уровнем, и ядро ​​отвечает за заполнение этого массива всеми происходящими событиями.

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    /* 初始化epoll就绪事件表 */
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    /* 创建 epoll 实例 */
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    /* 事件管理器与epoll关联 */
    eventLoop->apidata = state;
    return 0;
}
typedef struct aeApiState {
    /* epoll_event 实例描述符*/
    int epfd;
    /* 存储epoll就绪事件表 */
    struct epoll_event *events;
} aeApiState;

Создать файл события

aeFileEventЭто файловая структура событий.Для каждого конкретного события есть функции обработки чтения и функции обработки записи. Вызов RedisaeCreateFileEventФункция регистрирует соответствующие файловые события для событий чтения и записи разных сокетов.

typedef struct aeFileEvent {
    /* 监听事件类型掩码,值可以是 AE_READABLE 或 AE_WRITABLE */
    int mask;
    /* 读事件处理器 */
    aeFileProc *rfileProc;
    /* 写事件处理器 */
    aeFileProc *wfileProc;
    /* 多路复用库的私有数据 */
    void *clientData;
} aeFileEvent;
/* 使用typedef定义的处理器函数的函数类型 */
typedef void aeFileProc(struct aeEventLoop *eventLoop, 
int fd, void *clientData, int mask);

Например, когда Redis выполняет репликацию master-slave, подчиненному серверу необходимо установить соединение с главным сервером, он инициирует сокетное соединение, а затем вызоветaeCreateFileEventФункция регистрирует соответствующий обработчик событий для событий чтения и записи инициированного сокета, то естьsyncWithMasterфункция.

aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);
/* 符合aeFileProc的函数定义 */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {....}

aeCreateFileEventПараметрыfdотносится к конкретнымsocketразъем,procСсылаться наfdКогда событие генерируется, конкретная функция обработки,clientDataЭто данные, которые необходимо передать при обратном вызове обработчика.aeCreateFileEventДелайте в основном три вещи:

  • отfdиндекс, вeventsСоответствующее событие было найдено в таблице неготовых событий.
  • перечислитьaeApiAddEventфункция, событие регистрируется для конкретного базового мультиплексирования ввода-вывода, в данном случае epoll.
  • Заполните обратный вызов события, параметры, тип события и другие параметры.
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                       aeFileProc *proc, void *clientData)
{
    /* 取出 fd 对应的文件事件结构, fd 代表具体的 socket 套接字 */
    aeFileEvent *fe = &eventLoop->events[fd];
    /* 监听指定 fd 的指定事件 */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    /* 置文件事件类型,以及事件的处理器 */
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    /* 私有数据 */
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

Как упоминалось выше, существует множество наборов базовых библиотек мультиплексирования ввода-вывода на основе Redis, поэтомуaeApiAddEventТакже несколько наборов реализации, ниже приведен исходный кодepollреализация ниже. Его основная операция заключается в вызовеepollизepoll_ctlфункционировать, чтобыepollЗарегистрируйтесь, чтобы реагировать на события. СвязанныйepollСвязанные знания можно увидеть«Анализ исходного кода Java NIO»

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。如果已经关联了某个/某些事件,那么这是一个 MOD 操作。 */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    /* 注册事件到 epoll */
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    /* 调用epoll_ctl 系统调用,将事件加入epoll中 */
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

обработка событий

Поскольку в Redis существуют как файловые, так и временные события, сервер должен планировать эти два события, решать, когда обрабатывать файловые события, когда обрабатывать временные события и как их планировать.

aeMainФункция вызывается непрерывно в бесконечном циклеaeProcessEventsфункция для обработки всех событий.

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        /* 如果有需要在事件处理前执行的函数,那么执行它 */
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        /* 开始处理事件*/
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

НижеaeProcessEventsПсевдокод функции , он сначала вычисляет событие времени, ближайшее к текущему времени, чтобы вычислить тайм-аут, а затем вызываетaeApiPollФункция ожидает готовности базового события мультиплексирования ввода-вывода;aeApiPollПосле возврата функции все события файлов, которые были сгенерированы и временные события, которые истекся, будут обработаны.

/* 伪代码 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    /* 获取到达时间距离当前时间最接近的时间事件*/
    time_event = aeSearchNearestTimer();
    /* 计算最接近的时间事件距离到达还有多少毫秒*/
    remaind_ms = time_event.when - unix_ts_now();
    /* 如果事件已经到达,那么remaind_ms为负数,将其设置为0 */
    if (remaind_ms < 0) remaind_ms = 0;
    /* 根据 remaind_ms 的值,创建 timeval 结构*/
    timeval = create_timeval_with_ms(remaind_ms);
    /* 阻塞并等待文件事件产生,最大阻塞时间由传入的 timeval 结构决定,如果remaind_ms 的值为0,则aeApiPoll 调用后立刻返回,不阻塞*/
    /* aeApiPoll调用epoll_wait函数,等待I/O事件*/
    aeApiPoll(timeval);
    /* 处理所有已经产生的文件事件*/
    processFileEvents();
    /* 处理所有已经到达的时间事件*/
    processTimeEvents();
}

иaeApiAddEventпохожий,aeApiPollЕсть также несколько реализаций.На самом деле он делает две вещи, вызываяepoll_waitблокировка ожиданияepollСобытие готово, а время ожидания — это время ожидания, рассчитанное на основе самого быстрого события времени достижения;epollСобытие переходит в событие готовности к запуску.aeApiPollЭто упомянутая выше программа мультиплексирования ввода/вывода. Конкретный процесс показан на рисунке ниже.

aeApiPoll示意图

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) 
{
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 调用epoll_wait函数,等待时间为最近达到时间事件的时间计算而来。
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 有至少一个事件就绪?
    if (retval > 0) 
    {
        int j;
        /*为已就绪事件设置相应的模式,并加入到 eventLoop 的 fired 数组中*/
        numevents = retval;
        for (j = 0; j < numevents; j++) 
	{
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN)
		mask |= AE_READABLE;
            if (e->events & EPOLLOUT)
		mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) 
		mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP)
		mask |= AE_WRITABLE;
            /* 设置就绪事件表元素 */
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    
    // 返回已就绪事件个数
    return numevents;
}

processFileEventэто псевдокод для обработки готовых файловых событий, и описанный выше диспетчер файловых событий, который на самом деле является обходомfiredПодготовьте таблицу событий, затем вызовите различные обработчики, зарегистрированные в событии, в соответствии с соответствующим типом события, прочитайте вызов событияrfileProc, а событие записи вызываетwfileProc.

void processFileEvent(int numevents) {
    for (j = 0; j < numevents; j++) {
            /* 从已就绪数组中获取事件 */
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0;
            int invert = fe->mask & AE_BARRIER;
	        /* 读事件 */
            if (!invert && fe->mask & mask & AE_READABLE) {
                /* 调用读处理函数 */
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
            /* 写事件. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }
}

иprocessTimeEventsэто функция, которая обрабатывает временные события, она проходитaeEventLoopсписок событий события, если приходит событие времени, выполнить егоtimeProcфункции и в зависимости от того, равно ли возвращаемое значение функцииAE_NOMOREчтобы определить, является ли временное событие периодическим событием, и изменить время прибытия.

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);
    ....
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    /* 遍历时间事件链表 */
    while(te) {
        long now_sec, now_ms;
        long long id;

        /* 删除需要删除的时间事件 */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        /* id 大于最大maxId,是该循环周期生成的时间事件,不处理 */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        /* 事件已经到达,调用其timeProc函数*/
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            /* 如果返回值不等于 AE_NOMORE,表示是一个周期性事件,修改其when_sec和when_ms属性*/
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                /* 一次性事件,标记为需删除,下次遍历时会删除*/
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

удалить событие

Когда событие больше не нужно, его необходимо удалить. Например: если fd прослушивает события чтения и записи событий одновременно. Когда вам больше не нужно отслеживать событие записи, вы можете удалить событие записи файла fd.

aeDeleteEventLoopПроцесс выполнения функции сводится к следующим шагам. 1. СогласноfdМероприятие найдено в не готовом таблице 2. ОтменаfdСоответствующий соответствующий идентификатор события 3. ЗвонокaeApiFreeфункция, ядро ​​отменит соответствующий мониторинг событий на красно-черном дереве мониторинга epoll.

постскриптум

Далее мы продолжим изучать принципы репликации Redis master-slave, и вы можете продолжать обращать на них внимание.

Блог программиста Ли Сяобина

Рекомендуемое чтение

Категории