Потоковое вещание с помощью Redis

Redis

задний план

Недавно я получил запрос, одним предложением: отобразить динамику, размещенную подписчиками, которая включает в себя дизайн системы потоковой передачи. В этой статье основное внимание уделяется решению для потоковой передачи каналов, доступному для обычных предприятий.

Связанные концепции

Давайте начнем с простой концепции потоков подачи.

Что такое поток подачи

  • Feed: каждый статус или сообщение в ленте — это лента, например, Weibo в Weibo — это лента.
  • Feed流: поток контента, который постоянно обновляется и предоставляется пользователю. Круг друзей каждого, страницы подписчиков Weibo и т. д. — все это поток новостей.

классификация потока сырья

Существует две общие категории потоков подачи:

  • Timeline: Сортировать по хронологическому порядку выпуска.Если продукт выбирает тип Timeline, это означает, что в потоке новостей не так много каналов, но каждый канал важен и должен быть просмотрен пользователями. Аналогично WeChat Moments, Weibo и т. д.
  • Rank: сортировка по фактору, не связанному со временем, обычно в соответствии с предпочтениями пользователя, обычно используется для рекомендаций по новостям, рекомендациям по продуктам и т. д.

дизайн

Чтобы спроектировать систему подачи сырья, есть два ключевых шага, один из которых — поток подачи.初始化,один推送. Хранение потока фида на самом деле является ключевым моментом, но автор по-прежнему использует MySQL для сохраняемости, и оптимизация может быть рассмотрена в будущем.

Инициализация потока подачи

Инициализация потока фида [поток фида следующей страницы] относится к созданию собственного потока ленты фида пользователя для пользователя, когда поток фида пользователя еще не существует. Как это сделать? На самом деле это очень просто: пройдитесь по списку подписчиков, удалите все каналы следующих пользователей и сохраните идентификатор канала в Redis.sortSetв. Здесь есть несколько ключевых моментов:

  • Данные инициализации: Инициализированные данные необходимо загрузить из базы данных.
  • Значение ключа: значение ключа sortSet должно быть идентифицировано идентификатором текущего пользователя.
  • Значение оценки: если это тип временной шкалы, просто возьмите отметку времени, созданную фидом. Если это ранговый тип, установите в нем значение веса, соответствующее вашему бизнесу.

толкать

После описанной выше инициализации поток фида был помещен в кеш Redis. Следующим шагом является обновление потока фида, который необходимо обновить в следующих четырех случаях:

  1. Последователи публикуют новые фиды:
  2. Отслеживаемые пользователи удаляют фиды.
  3. Пользователь добавил подписку.
  4. Пользователь отписывается.

Процесс публикации/удаления фида

Как работать с вышеупомянутыми четырьмя шагами, будет подробно описано в следующих шагах реализации.Здесь мы сосредоточимся на первом и втором случаях. Потому что, имея дело с большим V [десятками миллионов поклонников], нам нужно обрабатывать поток каналов всех поклонников большого V. В настоящее время вовлеченная сумма будет очень большой, и необходимо уделить больше внимания. Что касается push, обычно существует два типа push/pull.

  • : когда пользователь А публикует новое обновление, оно должно быть отправлено во все фан-каналы пользователя А.
  • : когда пользователь А публикует новое обновление, он не будет продвигать его первым, а возьмет на себя инициативу получить последнюю ленту с личной страницы пользователя А, когда появятся поклонники, а затем выполнит слияние. Если вас беспокоит несколько Vs, вы можете одновременно получать данные из TimeLines нескольких личных страниц Vs.
Двухтактный комбинированный режим

Когда пользователь публикует новую ленту, процесс выглядит следующим образом:

  1. Во-первых, прочитайте свой список поклонников из списка наблюдения и решите, являетесь ли вы большим V.
  2. Напишите собственное сообщение в ленте на личной странице Timeline. Если это большая буква V, процесс записи на этом заканчивается.
  3. Если вы обычный пользователь, вам также необходимо написать свое сообщение в ленте своим поклонникам.Если у вас есть 100 поклонников, вам нужно написать 100 пользователям.

При обновлении собственного потока фидов процесс обработки выглядит следующим образом:

  1. Сначала прочитайте список больших V, за которыми вы следите.
  2. чтобы читать свой собственный поток новостей.
  3. Если есть большие V, за которыми следуют, временная шкала личной страницы каждого большого V читается одновременно снова.Если соблюдаются 10 больших V, то требуется 10 посещений.
  4. Результаты шагов 2 и 3 объединяются, сортируются по времени и возвращаются пользователю.

На этом процесс считывания фид-потока методом двухтактного комбинирования завершен.

толкающий режим

Если вы просто используете push-режим, это станет проще:

  • Опубликовать ленту:
    • Нет необходимости различать, является ли это большой буквой V или нет, процесс для всех пользователей одинаков и состоит из трех шагов.
  • Читать ленту новостей:
    • Нет необходимости в первом шаге или третьем шаге, требуется только второй шаг, уменьшающий предыдущие 2 + N (N — количество рассматриваемых больших V) служебных данных сети до 1 служебной нагрузки сети. Задержка чтения значительно ухудшилась.
Резюме двух режимов:

Недостаток двухтактной комбинации заключается в том, что при обновлении собственного потока новостей давление чтения временной шкалы личной страницы большого V будет очень высоким.

Как решить:

  1. Не используйте метод оптимизации больших V/обычных пользователей, используйте режим push для активных поклонников и используйте режим pull для неактивных поклонников.
  2. Полное использование push-режима может полностью решить эту проблему, но увеличит емкость хранилища и увеличит общее время отправки больших фидов V. От первого до последнего вентилятора может пройти несколько минут.

выполнить

Автор в основном использует режим чистого проталкивания для реализации системы подачи потока, которая в основном доступна для обычных предприятий.Нижеследующее описывает конкретный код реализации, который в основном состоит из 3 основных частей:

  1. Инициализируйте поток подачи.
  2. Отслеживаемые пользователи публикуют/удаляют каналы, подписчики этого пользователя обновляют свои собственные потоки каналов.
  3. Пользователи добавляют/отписываются, обновляют собственный поток новостей.

Инициализировать поток подачи

Когда пользователь впервые заходит, чтобы обновить поток новостей, а поток новостей еще не существует, нам нужно его инициализировать Конкретный код для инициализации выглядит следующим образом: его в zSet, а затем вернуть его в пейджинге.

    /**
     * 获取关注的人的信息流
     */
    public List<FeedDto> listFocusFeed(Long userId, Integer page, Integer size) {
        String focusFeedKey = "focusFeedKey" + userId;

        // 如果 zset 为空,先初始化
        if (!zSetRedisTemplate.exists(focusFeedKey)) {
            initFocusIdeaSet(userId);
        }

        // 如果 zset 存在,但是存在 0 值
        Double zscore = zSetRedisTemplate.zscore(focusFeedKey, "0");
        if (zscore != null && zscore > 0) {
            return null;
        }

        //分页
        int offset = (page - 1) * size;

        long score = System.currentTimeMillis();
        // 按 score 值从大到小从 zSet 中取出 FeedId 集合
        List<String> list = zSetRedisTemplate.zrevrangeByScore(focusFeedKey, score, 0, offset, size);

        List<FeedDto> result = new ArrayList<>();
        if (QlchatUtil.isNotEmpty(list)) {
            for (String s : list) {
                // 根据 feedId 从缓存中 load 出 feed
                FeedDto feedDto = this.loadCache(Long.valueOf(s));
                if (feedDto != null) {
                    result.add(feedDto);
                }
            }
        }
        return result;
    }

    /**
     * 初始化关注的人的信息流 zSet
     */
    private void initFocusFeedSet( Long userId) {
        String focusFeedKey = "focusFeedKey" + userId;
        zSetRedisTemplate.del(focusIdeaKey);

        // 从数据库中加载当前用户关注的人发布过的 Feed
        List<Feed> list = this.feedMapper.listFocusFeed(userId);

        if (QlchatUtil.isEmpty(list)) {
            //保存0,避免空数据频繁查库
            zSetRedisTemplate.zadd(focusFeedKey, 1, "0");
            zSetRedisTemplate.expire(focusFeedKey, RedisKeyConstants.ONE_MINUTE * 5);
            return;
        }

        // 遍历 FeedList,把 FeedId 存到 zSet 中
        for (Feed feed : list) {
            zSetRedisTemplate.zadd(focusFeedKey, feed.getCreateTime().getTime(), feed.getId().toString());
        }

        zSetRedisTemplate.expire(focusFeedKey, 60 * 60 * 60);
    }

Отслеживаемые пользователи размещают/удаляют новые каналы

Всякий раз, когда пользователь публикует/удаляет новый фид, нам нужно обновить поток фидов всех фанатов пользователя.Этот шаг обычно занимает много времени, поэтому рекомендуется обрабатывать его асинхронно.Чтобы не загружать слишком много фанатов данные за один раз, здесь используется циклическое разбиение по страницам. Чтобы предотвратить слишком большой поток подачи вентиляторов, мы ограничим длину потока подачи до 1000. Когда длина потока подачи превысит 1000, самый старый поток будет удален.

    /**
     * 新增/删除 feed时,处理粉丝 feed 流
     *
     * @param userId 新增/删除 feed的用户id
     * @param feedId 新增/删除 的feedId
     * @param type   feed_add = 新增feed feed_sub = 删除feed
     */
    public void handleFeed(Long userId, Long feedId, String type) {

        Integer currentPage = 1;
        Integer size = 1000;
        List<FansDto> fansDtos;

        while (true) {
            Page page = new Page();
            page.setSize(size);
            page.setPage(currentPage);
            fansDtos = this.fansService.listFans(userId, page);

            for (FansDto fansDto : fansDtos) {
                String focusFeedKey = "focusFeedKey" + userId;

                // 如果粉丝 zSet 不存在,退出
                if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
                    continue;
                }

                // 新增Feed
                if ("feed_add".equals(type)) {
                    this.removeOldestZset(focusFeedKey);
                    zSetRedisTemplate.zadd(focusFeedKey, System.currentTimeMillis(), feedId);
                }
                // 删除Feed
                else if ("feed_sub".equals(type)) {
                    zSetRedisTemplate.zrem(focusFeedKey, feedId);
                }

            }

            if (fansDtos.size() < size) {
                break;
            }
            currentPage++;
        }

    }

    /**
     * 删除 zSet 中最旧的数据
     */
    private void removeOldestZset(String focusFeedKey){
        // 如果当前 zSet 大于1000,删除最旧的数据
        if (this.zSetRedisTemplate.zcard(focusFeedKey) >= 1000) {
            // 获取当前 zSet 中 score 值最小的
            List<String> zrevrange = this.zSetRedisTemplate.zrevrange(focusFeedKey, -1, -1, String.class);
            if (QlchatUtil.isNotEmpty(zrevrange)) {
                this.zSetRedisTemplate.zrem(focusFeedKey, zrevrange.get(0));
            }
        }
    }

Пользователь добавляет подписку/отписку

Здесь относительно просто добавить/отписаться, а также добавить/удалить недавно отслеживаемый канал в свой собственный поток каналов, но это также необходимо обрабатывать асинхронно.

    /**
     * 关注/取关 时,处理followerId的zSet
     *
     * @param followId   被关注的人
     * @param followerId 当前用户
     * @param type       focus = 关注 unfocus = 取关
     */
    public void handleFocus( Long followId, Long followerId, String type) {

        String focusFeedKey = "focusFeedKey" + userId;

        // 如果粉丝 zSet 不存在,退出
        if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
            return;
        }
        List<FeedDto> FeedDtos = this.listFeedByFollowId(source, followId);
        for (FeedDto feedDto : FeedDtos) {

            // 关注
            if ("focus".equals(type)) {
                this.removeOldestZset(focusFeedKey);
                this.zSetRedisTemplate.zadd(focusFeedKey, feedDto.getCreateTime().getTime(), feedDto.getId());
            }
            // 取关
            else if ("unfocus".equals(type)) {
                this.zSetRedisTemplate.zrem(focusFeedKey, feedDto.getId());
            }


        }
    }

Основной код, показанный выше, предназначен только для того, чтобы предоставить вам идею реализации, а не непосредственно исполняемый код, поскольку реальная реализация будет включать в себя множество других не относящихся к делу классов.

В конце концов

Здесь была введена простая и удобная система Feed Streaming.Все великие боги могут указать на ошибки и высказать больше мнений!

Справочная статья: