Создайте простую функцию Like

Java

Новое: Нравится

Сейчас почти весь медиаконтент, будь то оценка товара, обсуждение темы или круг друзей, поддерживает лайки, а функция лайков стала стандартной фичей интернет-проектов, поэтому мы также стараемся добавить функцию лайков в систему оценивания, чтобы реализовать Комментарий понравился.

Лайки в коротких комментариях Douban:

image-20210407234242890

Подробная информация о подобных требованиях, которые необходимо реализовать:

image-20210414215247131

от отказа

Полностью реализовать функцию подобной системы очень сложно. Для поддержки сотен миллионов пользователей, для архивирования и хранения данных, для поддержки миллионов одновременных операций записи в секунду в периоды пиковой нагрузки, для достижения многоклиентской синхронизации в реальном времени, для записи и поддержания пользовательских отношений, а также для отображения лайков пользователей. список, так что всесторонние требования будут создавать противоречия в дизайне, точно так же, как противоречия в CAP.

Типичными примерами являются противоречия между параллелизмом и синхронизацией. Суть высокого параллелизма в скорости.Скорость передачи по сети и скорость работы программы определяют пропускную способность, которую может нести система.Только за счет быстрой обработки каждого запроса можно обработать большее количество запросов в единицу времени.Это просто слепое увеличение количества подключений и игнорирование запросов.Проблемы времени отклика и параллелизма не могут быть решены принципиально. На мой взгляд, узкое место внутренней скорости работы приложения лежит в трех местах.Приоритет от высокого к низкому - это сетевой запрос, создание объекта и избыточный расчет.Сетевой запрос оказывает решающее влияние на скорость отклика. Однако синхронизация требует от нас выполнения сетевых запросов, таких как синхронизация данных с mysql или redis. У вас не может быть и того, и другого, и существуют непримиримые противоречия между параллелизмом и синхронизацией.

Существует также противоречие между емкостью хранилища и скоростью доступа. Запись списка лайков пользователя означает поддержание отношения лайков пользователя в течение длительного времени. Со временем отношение лайков пользователя не может быть установлено в одной системе хранения и должно быть записано в распределенную систему хранения, что создает дополнительные сложности и планирование. задержка должна быть хорошо спроектирована, чтобы различать измерения, а данные между разными разделами не связаны. Как только запрос охватывает несколько узлов хранения, будут генерироваться каскадные вызовы, что приводит к большой задержке в сети.

Чтобы это произошло, отпустите это. Когда я вижу новое требование, я привык думать в обратном направлении, наблюдая, какие функции не задействованы в этом требовании, а от каких функций можно отказаться. требования. .

Пересоздайте список требований и пропишите, какие функции не нужно реализовывать, при выполнении этого проектного решения вдруг станет ясно.

image-20210414225218732

Менеджеры по продукту дадут вам только форму 1 и редко покажут, что не нужно делать. При принятии решения об отказе это все равно нужно обсудить, потому что эти требования часто мягкие, и это может быть не нужно, если не включено в документ с требованиями, или это может быть не учтено.

Как записать лайки пользователя

Подобное отношение является типичным типом K-V или типом коллекции.Для его реализации более целесообразно использовать Redis.Так какой тип данных в Redis следует использовать?

В следующей таблице перечислены возможные типы данных и их соответствующие преимущества и недостатки.

image-20210414231656907

Ключевыми функциями являются пакетный запрос и использование памяти.Функция пакетного запроса позволяет запрашивать все подобные отношения в одном запросе, а объем памяти позволяет использовать как можно меньше узлов Redis или даже один Redis для решения проблем с хранилищем. .

Я выбираю строковый тип, потому что хеш-тип действительно сложно устранить подобные данные, если только не записывается подобное время и регулярно не выполняется глобальное сканирование, или не записывается двойной ключ хэша, а старый и новый заменяются, что слишком дорого и нецелесообразно. Сам механизм устранения должен решить проблему занятости памяти, поэтому строковый тип не будет занимать ненормально большой объем памяти.

image-20210415101020806

Атомарность подобной операции

Подобная операция должна переписать два значения: одно — отношение пользователя к контенту, а другое — общее количество лайков контента. Могут ли эти два значения быть представлены в одном ключе? Очевидно нет. Следовательно, необходимо сначала установить отношение лайков пользователя, а затем увеличить общее количество лайков.Если отношение лайков уже существует, общее количество лайков увеличить нельзя.

Установка подобного отношения может быть реализована с помощью команды setnx, которая устанавливается только тогда, когда ключ не существует, и возвращает флаг, указывающий, устанавливать его или нет, и решать, увеличивать ли общее количество лайков в соответствии с этим флагом. Например:

if setnx(key1) == 1
then 
	incr(key2)

Кажется, что каждая операция является атомарной, но если такая логика выполняется на стороне клиента, это все равно не удовлетворяет атомарности в целом, и между двумя операциями все еще может быть перерыв, в результате чего успешные лайки не увеличиваются. ситуация случается. Хотя это не является большой проблемой для подобной системы, вероятность редкого появления приемлема, но мы можем сделать лучше.

Функция транзакций или сценариев Redis может решить вышеуказанные проблемы. Реализация скрипта более гибкая и бесплатная, и позволяет уменьшить сетевые запросы.Выбираем метод скрипта:

--点赞操作,写入并自增,如果写入失败则不自增,【原子性、幂等性】
if redis.call('SETNX',KEYS[1],1) == 1
then
    redis.call('EXPIRE',KEYS[1],864000)
    redis.call('INCR',KEYS[2])
end
return redis.call('GET',KEYS[2])
--取消点赞操作,删除并递减,如果删除失败则不递减,【原子性、幂等性】
if redis.call('DEL',KEYS[1]) == 1
then
    redis.call('DECR',KEYS[2])
end
return redis.call('GET',KEYS[2])

Одним из основных требований стабильности является то, что данные нельзя расширять бесконечно, иначе рано или поздно возникнут проблемы.Любая схема хранения должна проектировать соответствующую схему уничтожения для обеспечения стабильной и долговременной работы системы. Поэтому очень важно установить срок действия KEY1, а KEY2 может потребоваться все время хранить, и он удаляется другими механизмами, например, при уничтожении старых отзывов или сворачивании отзывов, соответствующий KEY2 должен быть удален.

Скрипт возвращает общее количество лайков, что полезно для последующего архивирования данных.

Инкапсулировать действия скрипта

Теперь, когда выбран метод хранения Redis, давайте сначала реализуем его. Один шаг за раз, подобная функция будет выполняться надежно.

Сначала используйте Spring для настройки сценария Lua, который автоматически предварительно загружает сценарий, не беспокоясь о предварительной компиляции с загрузкой сценария на сервере Redis.

/**
 * Lua脚本
 */
@Configuration
public class LuaConfiguration {
    /**
     * [点赞]脚本  lua_set_and_incr
     */
    @Bean
    public DefaultRedisScript<Integer> voteScript() {
        DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_set_and_incr.lua")));
        redisScript.setResultType(Integer.class);
        return redisScript;
    }

    /**
     * [取消点赞]脚本  lua_del_and_decr
     */
    @Bean
    public DefaultRedisScript<Integer> noVoteScript() {
        DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_del_and_decr.lua")));
        redisScript.setResultType(Integer.class);
        return redisScript;
    }
}
/**
 * 点赞箱
 */
@Repository
public class VoteBox {
    private final RedisTemplate<String, Object> redisTemplate;
    private final DefaultRedisScript<Integer> voteScript;
    private final DefaultRedisScript<Integer> noVoteScript;

    public VoteBox(RedisTemplate<String, Object> redisTemplate, DefaultRedisScript<Integer> voteScript, DefaultRedisScript<Integer> noVoteScript) {
        this.redisTemplate = redisTemplate;
        this.voteScript = voteScript;
        this.noVoteScript = noVoteScript;
    }

    /**
     * 给评价投票(点赞),用户增加评价点赞记录,评价点赞次数+1.该操作是原子性、幂等性的。
     * @param voterId 投票人
     * @param contentId 投票目标内容id
     * @return 返回当前最新点赞数
     */
    public Integer vote(long voterId, long contentId){
        //使用lua脚本
        List<String> list = new ArrayList<>();
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
        return redisTemplate.execute(voteScript, list);
    }

    /**
     * 取消给评价投票(点赞),用户删除评价点赞记录,评价点赞次数-1.该操作是原子性、幂等性的。
     * @param voterId 投票人
     * @param contentId 投票目标内容id
     * @return 返回当前最新点赞数
     */
    public Integer noVote(long voterId, long contentId){
        //使用lua脚本
        List<String> list = new ArrayList<>();
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
        return redisTemplate.execute(noVoteScript, list);
    }
}

Процесс лайка

Процесс симпатии можно представить следующей диаграммой последовательности:

image-20210415151828448

  1. Сервер получает лайк-запрос пользователя
  2. Выполните скрипт Redis и верните информацию об общем количестве лайков, Redis сохранит временные данные функции Like.
  3. Отправить обычное сообщение в очередь сообщений
  4. После успешного выполнения двух вышеуказанных шагов ответ лайкается, в противном случае он будет добавлен в очередь повторных попыток.
  5. Очередь повторных попыток Асинхронно повторять запросы к Redis или очереди сообщений до тех пор, пока они не будут успешными или пока не будет исчерпано количество повторных попыток.
  6. Потребители очереди сообщений получают сообщения и записывают сообщения в mysql

Зачем брать на себя роль очереди сообщений? Потому что очереди сообщений позволяют изящно разделить синхронизацию и асинхронность. Команду redis необходимо выполнить в текущем запросе. Пользователь хочет увидеть результат выполнения запроса и надеется немедленно увидеть свой статус «Нравится» на других клиентах. Этот пример может быть неуместным. «Нравится» также может быть единичным. Кстати, не беспокойтесь о синхронности, это просто для демонстрационных целей. Сохранение данных или другие операции не должны выполняться в течение текущего жизненного цикла запроса.

Если синхронизацию можно назвать «онлайн-службой», то асинхронную можно назвать «полу-онлайн-полу-офлайн-службой», хотя она и не входит в жизненный цикл запроса, но выполняется на онлайн-сервере, занимает ЦП и память, и занимает пропускную способность сети. Это обязательно окажет влияние на онлайн-бизнес. Когда асинхронный режим настроен, его нужно выпустить вместе с онлайн-бизнесом, что приведет к логической связке. Очередь сообщений делает возможным «автономное обслуживание», потребители могут быть независимыми от онлайн-сервера, независимо развиваться и развертываться независимо, и они полностью отделены как физически, так и логически. Конечно, предполагается, что формат сериализации объекта сообщения непротиворечив, поэтому мне нравится использовать строку в качестве содержимого объекта сообщения вместо сериализации объекта.

Внедрить лайки mysql в библиотеку

После разработки схемы хранения Redis следующим шагом будет разработка схемы хранения mysql.

Во-первых, это структура таблицы:

#点赞/投票归档表
CREATE TABLE IF NOT EXISTS vote_document
(
   id INT primary key auto_increment COMMENT 'ID',
   gmt_create datetime not null default CURRENT_TIMESTAMP COMMENT '创建时间',
   voter_id INT not null COMMENT '投票人id',
   contentr_id INT not null COMMENT '投票内容id',
   voting TINYINT not null COMMENT '投票状态(0:取消投票 1:投票)',
   votes INT not null COMMENT '投下/放弃这一票后,内容在此刻的投票总数',
   create_date INT not null COMMENT '创建日期 如:20210414 用于分区分表'
);

insert into vote_document(voter_id,content_id,voting,votes,create_date)
values(1,1,1,1,'20210414');

Очевидно, что это таблица журнала, которая заменяет Update на Insert.Будь то похоже, непохоже или перепохоже, новые записи добавляются, а не изменяются. На это есть две причины: во-первых, вставка не требует блокировки таблицы, а эффективность выполнения гораздо выше, чем у обновления, во-вторых, она содержит больше информации, и вы можете видеть полное поведение пользователей, что полезно для анализа больших данных.

После того, как «Вставка» заменяет «Обновить», большой проблемой является агрегация данных.Решение состоит в том, чтобы избыточно записывать статус агрегации каждый раз, когда вы вставляете, точно так же, как поле голосов.При анализе вам нужно только взять последнюю запись соответствующей оценки, чтобы узнать общее количество лайков. , без полного сканирования таблицы.

Инвентарный код:

@Repository
public class VoteRepository {
    @Autowired
    private JdbcTemplate db;

    /**
     * 添加点赞
     * @param vote 点赞对象
     * @return 如果插入成功,返回true,否则返回false
     */
    public boolean addVote(/*valid*/ Vote vote) {
        String sql = "insert into vote_document(voter_id,content_id,voting,votes,create_date) values(?,?,?,?,?)";
        return db.update(sql, vote.getVoterId(), vote.getContentId(), vote.getVoting(), vote.getVotes(), Sunday.getDate()) > 0;
    }
}

RocketMQ

Apache RocketMQ — это ПО промежуточного слоя для распределенных сообщений с малой задержкой, высоким уровнем параллелизма, высокой доступностью и высокой надежностью. Очередь сообщений RocketMQ не только обеспечивает возможность асинхронной развязки, сглаживания пиков и заполнения впадин для систем распределенных приложений, но также обладает характеристиками массового накопления сообщений, высокой пропускной способности и надежных повторных попыток, необходимых интернет-приложениям.

Основные концепции очередей сообщений:

  • Тема: Тема сообщения, тип сообщения первого уровня, которому производитель отправляет сообщения.
  • Брокер: посредник/брокер, узел кластера очереди сообщений, отвечающий за сохранение, отправку и получение сообщений.
  • Производитель: также известен как издатель сообщений, отвечающий за создание и отправку сообщений в темы.
  • Потребитель: также известен как подписчик сообщений, отвечающий за получение и использование сообщений из тем.
  • Тег: тег сообщения, вторичный тип сообщения, указывающий конкретную классификацию сообщения в теме темы.
  • Сообщение: комбинация данных и (необязательных) атрибутов, отправленных производителем в тему и в конечном итоге доставленных потребителю.
  • Свойства сообщения: свойства, которые производители могут определить для сообщений, включая ключ сообщения и тег.
  • Группа: класс производителей или потребителей, производители или потребители такого производства или потребления, как правило, одного и того же типа сообщения, и сообщение, выпущенное логически последовательно или по подписке.

Схематическая диаграмма производителя, отправляющего сообщение в очередь сообщений и, наконец, отправляющего его потребителю, выглядит следующим образом:

image-20210112223820896

Типы сообщений можно разделить на:

  • Обычное сообщение. Также известный как параллельные сообщения, нет никакой последовательности, производство и потребление параллельны, с чрезвычайно высокой производительностью пропускной способности.
  • сообщение о транзакции. Предоставляет механизм, обеспечивающий доставку сообщений посреднику.
  • Разделяйте последовательные сообщения. Тема разделена на несколько разделов, и внутри раздела соблюдается принцип «первым пришел — первым вышел».
  • Сообщения глобального заказа. Установите количество разделов темы на 1, и все сообщения следуют в первом месте, в первую очередь принцип.
  • Сообщения по времени. Отправить сообщение на сервер MQ и доставить его в указанный момент времени после времени отправки сообщения (текущее время)
  • сообщение с задержкой. Отправить сообщение на сервер MQ и доставить его в указанное время задержки после времени отправки сообщения (текущее время)

Способы потребления можно разделить на:

  • Потребление кластера. Любое сообщение должно быть обработано любым потребителем в кластере.
  • широковещательное потребление. Рассылайте каждое сообщение всем зарегистрированным потребителям в кластере, чтобы гарантировать, что сообщение будет обработано каждым потребителем хотя бы один раз.

Режим получения сообщений потребителя можно разделить на:

  • Толкать. Запустите отдельный поток, чтобы опросить посредника, чтобы получить сообщение, и вызовите метод получения потребителя, как если бы посредник отправляет сообщение потребителю.
  • Вытащить. Потребители активно извлекают сообщения из очереди сообщений.

Использование RocketMQ

Мы используем очередь сообщений RocketMq облачного продукта.Согласно официальной документации, мы сначала создаем Group и Topic в центре управления облаком, а затем вводим maven-зависимости для создания объекта конфигурации подключения MqConfig. Наконец:

Настройте производителя (в проекте A):

@Configuration
public class ProducerConfig {
    @Autowired
    private MqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer buildProducer() {
        return ONSFactory.createProducer(mqConfig.getMqPropertie());
    }
}

Настройте потребителя (в проекте B):

@Configuration
public class ConsumerClient {
    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private VoteMessageReceiver receiver;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.GROUP_CONSUMER_ID);
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "10");
        consumerBean.setProperties(properties);

        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.TOPIC_ISSUE);
        subscription.setExpression(mqConfig.TAG_ISSUE);
        subscriptionTable.put(subscription, receiver);

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

Создайте приемник сообщений, слушатель:

/**
 * 投票消息接收器
 */
@Component
public class VoteMessageReceiver implements MessageListener {
    private final VoteRepository voteRepository;

    public VoteMessageReceiver(VoteRepository voteRepository) {
        this.voteRepository = voteRepository;
    }

    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            JSONObject object = JSONObject.parseObject(new String(message.getBody()));

            Vote vote = new Vote();
            vote.setVoterId(object.getLongValue("voterId"));
            vote.setContentId(object.getLongValue("contentId"));
            vote.setVoting(object.getIntValue("voting"));
            vote.setVotes(object.getLongValue("votes"));

            try {
                vote.validate();
                voteRepository.addVote(vote);
            } catch (IllegalArgumentException ignored) {
            }

            return Action.CommitMessage;
        }catch (Exception e) {
            e.printStackTrace();
            return Action.ReconsumeLater;
        }
    }
}

Производитель, который отправляет сообщение, немного инкапсулирует его:

/**
 * 消息生产者,消息投递仓库
 */
@Repository
public class MessagePoster {
    private final Producer producer;

    public MessagePoster(Producer producer) {
        this.producer = producer;
    }

    public void sendMessage(String topic, String tag, String content){
        Message message = new Message();
        message.setTopic(topic);
        message.setTag(tag);
        message.setBody(content.getBytes(StandardCharsets.UTF_8));
        producer.send(message);
    }

    public void sendMessage(String topic, String content){
        sendMessage(topic, "", content);
    }
}

Публикуйте потребителей и тестируйте в облачном центре управления (чтобы убедиться, что процесс проходит, шаг за шагом):

image-20210415171026192

Можно ли достичь соглашения?

Могут ли два шага выполнения команды redis и отправки сообщения быть последовательными, то есть завершаться и завершаться ошибкой одновременно? Если это однородная система, вы можете использовать характеристики самой системы для реализации транзакций. Например, транзакции или сценарии Redis могут использоваться для одной и той же операции Redis. Это делалось раньше. Если используется одна и та же операция с базой данных, могут использоваться транзакции базы данных Другие системы хранения также должны иметь аналогичную поддержку.

Но это разнородные системы, координировать которые можно только путем реализации транзакционной логики на стороне клиента или третьей стороной. Распространенной реализацией на стороне клиента является откат:

try{
	redis.call(); 
    mq.call();
}catch(MqException e){	//只有mq出错时才需要回滚
    //使用反向操作回滚
    redis.rollback();
}   

Но что, если откат не удался? Что делать, если сообщение отправляется в MQ, но не получает его? Что делать, если зависимая служба не поддерживает откат? Добиться жесткой консистенции в суровых условиях невозможно.

Или мы должны мыслить в обратном направлении и выборочно отбрасывать некоторые неважные части, чтобы удовлетворить наши потребности. В текущем требовании нет необходимости вводить стороннюю координацию транзакций для синхронизации redis и MQ, но и закрывать глаза на очевидные проблемы транзакций не нужно.

Я подвел распределенную карту представления о распределенной транзакции:

image-20210415202027996

Мы решили использовать очередь повторных попыток для решения этой проблемы.

Спроектировать очередь повторных попыток

Не ограничиваясь текущей проблемой распределенных транзакций, мы разрабатываем более общую очередь повторных попыток.

Сначала разработайте базовую концепцию очереди повторных попыток: задача. Задача состоит из нескольких модулей, вычисляемый модуль представляет объект метода с возвращаемым значением, а исполнительный модуль представляет объект метода без возвращаемого значения, но получает возвращаемое значение предыдущего вычисляемого модуля в качестве входного параметра. Односвязный список модулей поддерживается в задаче. Только когда модуль выполняется успешно, он указывает на следующий модуль для продолжения выполнения, но если выполнение завершается неудачно, он будет продолжать повторять попытки в текущем модуле до тех пор, пока не добьется успеха. и прошедший юнит не будет выполнен. Это обеспечивает стабильную и упорядоченную работу каждого блока, а выполнение каждого звена отказоустойчиво.

image-20210415210047077

Базовый интерфейс позволяет пользователям реализовывать записи журнала для сбоев выполнения задач, таких как постоянные диски или отправка на удаленные серверы, чтобы избежать потери задачи.Это одно из основных решений для поддержания согласованности транзакций.Установка метода по умолчанию позволяет пользователям выборочно реализовано, не обязательно иметь схему обработки отказов.

/**
 * 失败记录器
 */
interface IFailRecorder {
    /**
     * 记录每次重试的失败情况
     * @param attemptTimes 重试次数,第一次重试=0
     * @param e 导致失败的异常
     */
    default void recordFail(int attemptTimes, Exception e){}

    /**
     * 记录每次重试的失败情况
     * @param attemptTimes 重试次数,第一次重试=0
     */
    default void recordFail(int attemptTimes){}

    /**
     * 记录重试之后的最终失败
     * @param e 导致失败的异常,如果没有异常,返回null
     */
    default void recordEnd(Exception e){}
}

Определите базовую единицу выполнения, что означает, что вам нужно выполнить операцию redis или отправить операцию MQ.Метод интерфейса может многократно выполняться планировщиком, поэтому для обеспечения идемпотентности требуется реализация интерфейса.

/**
 * 可重复执行的任务
 */
public interface Repeatable<V> extends IFailRecorder{
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @param repeatTimes repeat times, first repeatTimes is 0
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V compute(int repeatTimes) throws Exception;

    /**
     * Execute with no result, and throws an exception if unable to do so.
     *
     * @param repeatTimes repeat times, first repeatTimes is 0
     * @param receiveValue last step computed result
     * @throws Exception if unable to compute a result
     */
    default void execute(int repeatTimes, V receiveValue) throws Exception{}

    /**
     * Execute with no result, and throws an exception if unable to do so.
     *
     * @param repeatTimes repeat times, first repeatTimes is 0
     * @throws Exception if unable to compute a result
     */
    default void execute(int repeatTimes) throws Exception{}
}

Соответствующий производный абстрактный класс в основном предназначен для того, чтобы помочь пользователю реализовать интерфейс.

/**
 * 可计算任务
 * @param <V> 计算结果类型
 */
public abstract class Computable<V> implements Repeatable<V>{
    @Override
    public void execute(int repeatTimes) throws Exception {
        throw new IllegalAccessException("不支持的方法");
    }

    @Override
    public void execute(int repeatTimes, V receiveValue) throws Exception {
        throw new IllegalAccessException("不支持的方法");
    }
}

/**
 * 可执行任务
 */
public abstract class Executable<V> implements Repeatable<V>{
    @Override
    public V compute(int repeatTimes) throws Exception {
        throw new IllegalAccessException("不支持的方法");
    }
}

Значение повторной попытки

Хороший механизм повторных попыток может сглаживать пики и заполнять впадины, а плохой механизм повторных попыток может подлить масла в огонь.

Это не паникёр, хорошенько подумайте, при каких обстоятельствах программа даст сбой, что можно условно свести к трём ситуациям:

  1. Логическое исключение, вызванное ошибкой параметра
  2. Тайм-аут или предохранитель, вызванный чрезмерной нагрузкой
  3. Нестабильные сети и человеческие аварии

Among them, it is completely meaningless to retry for case 1. The problem of parameter error should be solved by changing the parameter, and the logic abnormality should be repaired. The brainless retry can only cause the error to repeat, which will only waste the ПРОЦЕССОР.对于情况2的重试得小心,因为遇到流量波峰而失败,短时间内重试很可能再次遭遇失败,并且这次重试还会带来更大的流量压力,像滚雪球一样把自己搞垮,也就是火上浇油。

Повторная попытка для случая 3 очень ценна, особенно для сторонних сервисов с соглашениями SLA. Сторонние сервисы могут быть временно недоступны из-за различных аварий (таких как сбои и обновления), но они не нарушают соглашение SLA. Добавьте этот сбой в очередь повторных попыток, чтобы гарантировать успешное выполнение задачи, пока сторонняя служба отвечает в течение длительного периода времени.Если сторонняя служба не отвечает и задача в конечном итоге завершается сбоем, то она часто уничтожается , Соглашение SLA, вы можете подать заявку на компенсацию.

Поэтому при разработке стратегии повторных попыток сначала необходимо определить, при каких обстоятельствах требуется повторная попытка.Можно установить, что при возникновении определенного исключения, такого как ошибка параметра, повторная попытка не требуется, и она может просто потерпеть неудачу. . Он может быть выполнен успешно, только если возвращаемый параметр не пуст. Фиксированный интервал повтора может быть установлен, чтобы обеспечить более длительный период времени между повторами.

Более разумный подход — использовать режим прерывателя цепи и использовать результат запроса текущего подключения к целевому серверу. снова через некоторое время.

Разница между очередью повторных попыток и обычным понижением ограничения тока или предохранителем:

image-20210415234437188

стратегия повторных попыток

Стратегия повтора определяет, когда задача инициирует повтор.Интерфейс стратегии повтора:

/**
 * 重试策略,决定任务何时可以重试
 */
public interface IRetryStrategy {

    /**
     * 现在是否应该执行重试
     * @param attemptTimes 第几次重试
     * @param lastTimestamp 上一次重试的时间戳
     * @param itemId 当前的执行项目id
     * @return 允许重试,返回true,否则,返回false
     */
    boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId);

    /**
     * 通知一次失败
     * @param itemId 当前的执行项目id
     */
    void noticeFail(int itemId);

    /**
     * 通知一次成功
     * @param itemId 当前的执行项目id
     */
    void noticeSuccess(int itemId);
}

Базовый класс реализации:

/**
 * 指定间隔时间的重试策略
 */
public class DefinedRetryStrategy implements IRetryStrategy {
    private final int[] intervals;

    public DefinedRetryStrategy(int... intervals) {
        if (intervals.length == 0) {
            this.intervals = new int[]{0};
        } else {
            this.intervals = intervals;
        }
    }

    private DefinedRetryStrategy() {
        this.intervals = new int[]{0};
    }

    /**
     * 现在是否应该执行重试
     *
     * @param attemptTimes  第几次重试
     * @param lastTimestamp 上一次重试的时间戳
     * @param itemId        当前的执行项目id
     * @return 允许重试,返回true,否则,返回false
     */
    @Override
    public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {
        return System.currentTimeMillis() > lastTimestamp + getWaitSecond(attemptTimes) * 1000L;
    }

    @Override
    public void noticeFail(int itemId) {

    }

    @Override
    public void noticeSuccess(int itemId) {

    }

    /**
     * 根据当前重试次数,获取下一次重试等待间隔(单位:秒)
     */
    private int getWaitSecond(int attemptTimes) {
        if (attemptTimes < 0) {
            attemptTimes = 0;
        }

        if (attemptTimes >= intervals.length) {
            attemptTimes = intervals.length - 1;
        }

        return intervals[attemptTimes];
    }
}

Используйте прерыватель цепи для реализации стратегии повторных попыток, а внутренняя реализация прерывателя цепи опущена:

/**
 * 断路器模式实现的智能的重试策略
 */
public class SmartRetryStrategy extends DefinedRetryStrategy {
    //断路器集合
    private final Map<Integer, CircuitBreaker> circuitBreakers = new ConcurrentHashMap<>();

    private final Object LOCK = new Object();

    private static CircuitBreaker newCircuitBreaker() {
        return new ExceptionCircuitBreaker();
    }

    public SmartRetryStrategy(int[] intervals) {
        super(intervals);
    }

    private CircuitBreaker getCircuitBreaker(Integer itemId) {
        if (!circuitBreakers.containsKey(itemId)) {
            synchronized (LOCK) {
                if (!circuitBreakers.containsKey(itemId)) {
                    circuitBreakers.put(itemId, newCircuitBreaker());
                }
            }
        }

        return circuitBreakers.get(itemId);
    }

    /**
     * 现在是否应该执行重试
     *
     * @param attemptTimes  第几次重试
     * @param lastTimestamp 上一次重试的时间戳
     * @param itemId        当前的执行项目id
     * @return 允许重试,返回true,否则,返回false
     */
    @Override
    public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {
        //如果基本条件不满足,则不能重试
        if (!super.shouldTryAtNow(attemptTimes, lastTimestamp, itemId)) {
            return false;
        }

        //断路器是否允许请求通过
        return canPass(itemId);
    }

    /**
     * 通知一次失败
     *
     * @param itemId 当前的执行项目id
     */
    @Override
    public void noticeFail(int itemId) {
        getCircuitBreaker(itemId).onFail();
    }

    /**
     * 通知一次成功
     *
     * @param itemId 当前的执行项目id
     */
    @Override
    public void noticeSuccess(int itemId) {
        getCircuitBreaker(itemId).onSuccess();
    }

    /**
     * 是否允许通过
     */
    public boolean canPass(int itemId){
        return getCircuitBreaker(itemId).canPass();
    }
}

повторяемые задачи

В соответствии с приведенной выше структурной схемой определите интерфейс повторяемой задачи:

/**
 * 重试任务
 */
public interface IRetryTask<V> {
    /**
     * 执行一次重试
     * @return 如果执行成功,返回true,否则返回false
     */
    boolean tryOnce();

    /**
     * 是否应该关闭任务
     * @return 如果达到最大重试次数,返回true,表示可以关闭
     */
    boolean shouldClose();

    /**
     * 现在是否应该执行重试
     * @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false
     */
    boolean shouldTryAtNow();

    /**
     * 获取执行结果
     */
    V getResult();
}

Затем создайте абстрактный класс:

/**
 * 重试任务.
 * 非线程安全
 */
public abstract class AbstractRetryTask<V> implements IRetryTask<V> {
    //重试等待间隔
    protected final IRetryStrategy retryStrategy;

    //当前重试次数
    protected int curAttemptTimes = -1;

    //最大重试次数
    private final int maxAttemptTimes;

    //上一次重试的时间戳
    protected long lastTimestamp = 0;

    public AbstractRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes) {
        this.retryStrategy = retryStrategy;
        this.maxAttemptTimes = maxAttemptTimes;
    }

    /**
     * 执行一次重试
     *
     * @return 如果执行成功,返回true,否则返回false
     */
    @Override
    public boolean tryOnce() {
        if (isFinished()) {
            return true;
        }

        setNextCycle();

        //执行重试
        doTry();

        //重试任务执行异常或者返回null,将视为执行失败
        return isFinished();
    }

    /**
     * 是否结束
     */
    protected abstract boolean isFinished();

    /**
     * 执行回调
     */
    protected abstract void doTry();

    /**
     * 是否应该关闭任务
     *
     * @return 如果达到最大重试次数,返回true,表示可以关闭
     */
    @Override
    public boolean shouldClose() {
        return curAttemptTimes >= maxAttemptTimes;
    }

    //设置下一执行周期
    private void setNextCycle() {
        curAttemptTimes++;
        lastTimestamp = System.currentTimeMillis();
    }
}

и класс реализации:

/**
 * 多段重试任务. 任务链路执行失败时,下一次重试从失败的点继续执行。
 */
@Slf4j
public class SegmentRetryTask<V> extends AbstractRetryTask<V> {
    //分段执行方法
    private final List<Repeatable<V>> segments;

    //当前执行片段,上一次执行中断的片段
    private int currentSegment = 0;

    //上一次的执行结果值
    private V result;

    public SegmentRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes, List<Repeatable<V>> segments) {
        super(retryStrategy == null ? new DefinedRetryStrategy(0) : retryStrategy, maxAttemptTimes);
        this.segments = segments;
    }

    /**
     * 执行回调
     */
    @Override
    protected void doTry() {
        try {
            for (; currentSegment < segments.size(); currentSegment++) {
                //如果当前断路器打开,不尝试执行
                if (retryStrategy instanceof SmartRetryStrategy){
                    if (!((SmartRetryStrategy)retryStrategy).canPass(currentSegment)) {
                        segments.get(currentSegment).recordFail(curAttemptTimes, new CircuitBreakingException());
                        return;
                    }
                }

                //如果抛异常,分段计数器不增加,下次从这个地方执行
                Repeatable<V> repeatable = segments.get(currentSegment);
                if (!execute(repeatable)) return;
            }
        } catch (Exception e) {
            retryStrategy.noticeFail(currentSegment);
            if (currentSegment < segments.size()) {
                if (shouldClose()) {
                    segments.get(currentSegment).recordEnd(e);
                } else {
                    segments.get(currentSegment).recordFail(curAttemptTimes, e);
                }
            }
        }
    }

    private boolean execute(Repeatable<V> repeatable) throws Exception {
        if (repeatable instanceof Computable) {
            result = repeatable.compute(curAttemptTimes);
            if (result == null) {
                repeatable.recordFail(curAttemptTimes);
                retryStrategy.noticeFail(currentSegment);
                return false;
            }
            retryStrategy.noticeSuccess(currentSegment);
        }

        if (repeatable instanceof Executable) {
            if (result == null) {
                repeatable.execute(curAttemptTimes);
            } else {
                repeatable.execute(curAttemptTimes, result);
            }
            retryStrategy.noticeSuccess(currentSegment);
        }
        return true;
    }

    @Override
    protected boolean isFinished() {
        return currentSegment >= segments.size();
    }

    /**
     * 现在是否应该执行重试
     *
     * @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false
     */
    @Override
    public boolean shouldTryAtNow() {
        return retryStrategy.shouldTryAtNow(curAttemptTimes, lastTimestamp, currentSegment);
    }

    /**
     * 获取执行结果
     */
    @Override
    public V getResult() {
        return result;
    }
}

Юнит-тест, конечно, юнит-тестов много, не все можно выложить, здесь показаны только репрезентативные:

class SegmentRetryTaskTest {
    private final List<String> messages = new ArrayList<>();

    @Test
    void doTry() {
        List<Repeatable<String>> list = new ArrayList<>();
        list.add(new Computable<>(){
            @Override
            public String compute(int repeatTimes) throws Exception {
                if (repeatTimes < 2)
                    throw new Exception();
                if (repeatTimes < 4)
                    return null;
                messages.add("result:good");
                return "good";
            }

            @Override
            public void recordFail(int attemptTimes, Exception e) {
                messages.add("fail:" + attemptTimes);
            }

            @Override
            public void recordFail(int attemptTimes) {
                messages.add("fail:" + attemptTimes);
            }

            @Override
            public void recordEnd(Exception e) {
                messages.add("end");
            }
        });

        list.add(new Executable<>() {
            @Override
            public void execute(int repeatTimes, String receiveValue) throws Exception {
                messages.add("receive:" + receiveValue);
                throw new Exception("exc");
            }

            @Override
            public void recordEnd(Exception e) {
                messages.add("end:" + e.getMessage());
            }
        });

        IRetryTask retryTask = new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 5, list);

        //重试未开始
        assertFalse(retryTask.shouldClose());

        //重试直到成功
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.shouldClose());
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.tryOnce());
        assertTrue(retryTask.shouldClose());

        assertTrue(messages.contains("result:good"));
        assertTrue(messages.contains("fail:1"));
        assertTrue(messages.contains("fail:2"));
        assertTrue(messages.contains("fail:3"));
        assertFalse(messages.contains("end"));
        assertTrue(messages.contains("receive:good"));
        assertTrue(messages.contains("end:exc"));
    }
}

Работа с очередью повторных попыток

image-20210416101646494

线程安全的重试队列。
* (Spring-retry 和 guava-retrying都不完全适合这个场景,决定自己开发一个简单的重试机制)
* 重试队列会尽最大努力让任务多次执行并成功,使用时需要考虑以下几点。
* 1.重试队列存储在内存之中,暂未同步到磁盘,要求使用者可以承受丢失的风险。
* 2.重试不保证一定会成功,它将在重试一定的次数后结束,如果最终失败,将记录失败结果。
* 3.为了不让频繁的重试让系统的负载过大,建议设置恰当的重试间隔,以起到削峰填谷的作用。
* 4.当超过重试队列允许容纳的数量时,将抛出异常。
* 5.重试任务将在独立的线程中执行,不会阻塞当前线程
* 6.重试任务执行异常或者返回null,将视为执行失败。暂不支持拦截自定义异常。
* 7.由于网络问题,远程过程执行成功未必代表会返回成功,重试任务需要实现幂等性。
* 8."队列"仅指按先进先出的顺序扫描任务,任务移除队列操作取决于其何时完成或结束
*

Реализовать очередь повторных попыток

/**
 * 线程安全的重试队列。
 * @author sunday
 * @version 0.0.1
 */
public final class RetryQueue {
    //重试任务队列(全局唯一)
    private final static Deque<IRetryTask> retryTaskList = new ConcurrentLinkedDeque<>();

    //重试任务工厂
    private final IRetryTaskFactory retryTaskFactory;

    public RetryQueue(IRetryTaskFactory retryTaskFactory) {
        this.retryTaskFactory = retryTaskFactory;
    }

    static {
        Thread daemon = new Thread(RetryQueue::scan);
        daemon.setDaemon(true);
        daemon.setName(RetryConstants.RETRY_THREAD_NAME);
        daemon.start();
    }

    //扫描重试队列,执行重试并移除任务(如果成功),周期性执行
    private static void scan() {
        while (true) {
            //先执行,再删除
            retryTaskList.removeIf(task -> retry(task) || task.shouldClose());

            // wait some times
            try {
                TimeUnit.MILLISECONDS.sleep(RetryConstants.SCAN_INTERVAL);
            } catch (Throwable ignored) {
            }
        }
    }

    //执行重试
    private static boolean retry(/*not null*/IRetryTask task) {
        if (task.shouldTryAtNow()) {
            return task.tryOnce();
        }
        return false;
    }

    /**
     * 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。
     *
     * @param segments 分段执行任务
     * @param <V>      结果返回类型
     * @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。
     * @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常
     */
    public final <V> V submit(List<Repeatable<V>> segments) throws RetryRefuseException {
        if (segments == null || segments.size() == 0) {
            return null;
        }

        IRetryTask<V> task = retryTaskFactory.createRetryTask(segments);

        //在当前线程执行
        if(!task.tryOnce()){
            //失败后加入队列
            ensureCapacity();
            retryTaskList.push(task);
        }

        //只要当前已经有执行结果,就返回,即便是加入了重试队列
        return task.getResult();
    }

    /**
     * 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。
     *
     * @param repeatable 执行任务
     * @param <V>        结果返回类型
     * @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。
     * @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常
     */
    public final <V> V submit(Repeatable<V> repeatable) throws RetryRefuseException {
        return submit(List.of(repeatable));
    }

    //确保容量
    private void ensureCapacity() throws RetryRefuseException {
        //非线程安全,高并发下可能短暂冲破最大容量,不过问题不大
        if (retryTaskList.size() >= RetryConstants.MAX_QUEUE_SIZE) {
            throw RetryRefuseException.getInstance();
        }
    }

    /**
     * 队列是否为空
     *
     * @return 如果当前无正在执行的任务,返回true
     */
    public boolean isEmpty() {
        return retryTaskList.isEmpty();
    }
}

модульный тест:

class RetryQueueTest {
    private final static int NUM = 100000;
    private List<String> messages1 = Collections.synchronizedList(new ArrayList<>());
    private List<String> messages2 = Collections.synchronizedList(new ArrayList<>());

    IRetryTaskFactory taskFactory = new IRetryTaskFactory() {
        @Override
        public <V> IRetryTask createRetryTask(List<Repeatable<V>> segments) {
            return new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 10, segments);
        }
    };

    RetryQueue retryQueue = new RetryQueue(taskFactory);

    @Test
    void submit() {
        List<Repeatable<String>> list = new ArrayList<>();
        list.add(new Executable<>() {
            @Override
            public void execute(int repeatTimes) throws Exception {
                if (repeatTimes < 4)
                    throw new Exception();
                messages1.add("good");
            }
        });

        //模拟高并发提交
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        Semaphore semaphore = new Semaphore(0);
        for (int i = 0; i < NUM; i++) {
            executorService.submit(() -> {
                try {
                    retryQueue.submit(list);
                } catch (RetryRefuseException e) {
                    fail();
                }
                semaphore.release();
            });
        }

        executorService.shutdown();

        //等待执行完成
        try {
            semaphore.acquire(NUM);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //等待执行完成
        while (!retryQueue.isEmpty()) Thread.yield();
        assertEquals(NUM, messages1.size());
        for (String s : messages1) {
            assertEquals(s, "good");
        }
    }
}

Код реализации долгожданной похвалы

Что ж, колесо построено, можно приступать к написанию кода подобного сервиса:

/**
 * 投票服务
 */
@Service
@Slf4j
public class VoteService {
    private final VoteBox voteBox;
    private final MessagePoster mq;
    private final RetryQueue retryQueue = new RetryQueue(new SegmentRetryTaskFactory());

    public VoteService(VoteBox voteBox, MessagePoster mq) {
        this.voteBox = voteBox;
        this.mq = mq;
    }

    /**
     * 给评价投票(点赞)
     *
     * @param voterId   投票人
     * @param contentId 投票目标内容id
     * @param voting    是否进行点赞(true:点赞  false:取消点赞)
     * @return 当前内容点赞后的总数,如果点赞失败,抛出异常
     * @throws VoteException 投票异常
     */
    public int vote(long voterId, long contentId, boolean voting) throws VoteException {
        /*
         * 第零种情况:用户请求没有发送到服务器,用户可以适时重试。
         * 第一种情况:执行1失败,最终点赞失败,记录日志,加入重试队列池,用户也可以适时重试。
         * 第二种情况:执行1成功,但返回时网络异常,最终点赞失败,记录日志,加入重试队列池,用户也可能适时重试,该方法是幂等的。
         * 第三种情况:执行1成功,但并未增加点赞总数,因为这次是重复提交。仍然执行之后的逻辑,该方法是幂等的。
         * 第四种情况:执行1成功,但执行2失败,记录日志,把发送mq加入重试队列池,返回成功。
         * 第五种情况:执行方法成功,但返回过程网络异常,用户未收到响应,用户稍后可以查询出点赞结果,用户也可以适时重试
         */

        List<Repeatable<Integer>> list = new ArrayList<>();

        //1.先在redis中投票
        list.add(new Computable<>() {
            @Override
            public Integer compute(int repeatTimes) {
                return voting ? voteBox.vote(voterId, contentId) : voteBox.noVote(voterId, contentId);
            }

            @Override
            public void recordFail(int attemptTimes, Exception e) {
                //只记录第一次错误
                if (attemptTimes == 0)
                    log.warn("function VoteService.vote.redis make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
            }

            @Override
            public void recordEnd(Exception e) {
                //放弃重试.当然,日志会记录下来,或者通过其他机制将失败记录到中央存储库中,最终还是可以恢复。
                log.warn("function VoteService.vote.redis quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
            }
        });

        //2.再通知mq
        list.add(new Executable<>() {
            @Override
            public void execute(int repeatTimes, Integer receiveValue) {
                JSONObject object = new JSONObject();
                object.put("voterId", voterId);
                object.put("contentId", contentId);
                object.put("voting", voting ? 1 : 0);
                object.put("votes", receiveValue);
                mq.sendMessage(SystemConstants.VOTE_TOPIC, object.toString());
            }

            @Override
            public void recordFail(int attemptTimes, Exception e) {
                if (attemptTimes == 0)
                    log.warn("function VoteService.vote.mq make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
            }

            @Override
            public void recordEnd(Exception e) {
                log.trace("function VoteService.vote.mq quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
            }
        });

        Integer value = null;
        try {
            //系统可能因为mq或者redis自身的过载等问题导致点赞失败,我们想珍惜用户的一次点赞,所以选择为他重试。
            value = retryQueue.submit(list);
        } catch (RetryRefuseException e) {
            log.error("function VoteService.vote.refuse make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
        }

        if (value == null){
            //当前无法获得投票总数,意味着点赞操作失败,虽然我们会稍后重试,但仍将这个信息告知用户,他们可以进行更理智的选择。
            throw new VoteException("投票失败,请稍后再试");
        }

        return value;
    }

    private static class SegmentRetryTaskFactory implements IRetryTaskFactory {
        private final static IRetryStrategy waitStrategy = new SmartRetryStrategy(new int[]{10,100,100,1000,10000});

        @Override
        public <V> IRetryTask<V> createRetryTask(List<Repeatable<V>> segments) {
            return new SegmentRetryTask<>(waitStrategy, 5, segments);
        }
    }
}

Дополнительные инструкции:

  1. Цель инкапсуляции фабричных объектов — упростить параметры конструктора и повторно использовать неизменяемые объекты, такие как стратегии повтора.
  2. До тех пор, пока выполнение очереди повторных попыток возвращает результат, даже если он только частично успешен, его все равно можно рассматривать как успешный ответ интерфейса, а оставшаяся часть добавляется в очередь повторных попыток.
  3. Если выполнение очереди повторных попыток завершается сбоем и результаты не возвращаются, будет выброшено исключение, ведь в этот момент она действительно не удалась, и пользователь имеет право знать об этом.
  4. Только когда предохранитель закрыт, задача будет выполнена, в противном случае она будет ждать вечно.Вы можете установить соответствующую стратегию прерывания, чтобы улучшить этот механизм.
  5. Колесо очереди повторов полезно и во многих других сценариях, насколько я понимаю, это примерно "складской слой".

Но что касается реализации лайков, то нет необходимости использовать retry, на самом деле mq высокодоступен на нескольких нодах, и вообще проблем не будет, тем более, что в mq есть своя функция retry. Механизм повторных попыток mq заключается в том, что в случае сбоя запроса он немедленно инициирует запрос к другому брокеру, что является схемой балансировки нагрузки и высокой доступности. В сценариях, где жесткие транзакции не требуются, mq можно считать надежным.

Поставить лайк отзыву

Данные оценочного списка относительно статичны и не содержат персонализированной информации о пользователе и могут быть легко кэшированы для всеобщего доступа. Мы выбираем динамическое и статическое разделение, статические данные остаются неизменными в соответствии с исходной стратегией кэширования, динамические данные получаются исключительно из сервиса Redis, а затем добавляются к статическим данным.

Уровень службы и уровень управления — это уровень агрегации данных и уровень делегирования задач.

Что касается агрегации данных, то есть три режима:

image-20210416110925640

Мы выбираем третий путь, на этот раз мы разрабатываем аналогичную функцию как часть системы оценки.

Добавьте следующий код в RemarkService:

/**
 * 给评价列表添加点赞信息,在现有列表数据上修改
 * @param remarks 评价列表
 * @param consumerId 用户id
 * @return 修改后的评价列表
 */
public JSONArray appendVoteInfo(JSONArray remarks, Integer consumerId){
    if (remarks == null || remarks.size() == 0) {
        return remarks;
    }

    //获取评价id列表
    List<Object> idList = new ArrayList<>();
    for (int i = 0; i < remarks.size(); i++) {
        idList.add(remarks.getJSONObject(i).getString("id"));
    }

    //获取并添加点赞总数
    List<String> voteKeys = new ArrayList<>();
    for (Object s : idList) {
        voteKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, s));
    }
    List<Object> voteValues = redisRepository.readAll(voteKeys);
    for (int i = 0; i < remarks.size(); i++) {
        remarks.getJSONObject(i).put("votes", voteValues.get(i) == null ? 0 : voteValues.get(i));
    }

    //未传用户id,查询时不附带个人点赞数据
    if (consumerId == null) {
        return remarks;
    }

    //获取并添加个人点赞状态
    List<String> votesKeys = new ArrayList<>();
    for (Object s : idList) {
        votesKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, consumerId, s));
    }
    List<Object> votingValues = redisRepository.readAll(votesKeys);
    for (int i = 0; i < remarks.size(); i++) {
        remarks.getJSONObject(i).put("voting", votingValues.get(i) == null ? 0 : 1);
    }

    return remarks;
}

//更新商品的评价缓存
private void updateRemarkCache(String itemId){
    //吞掉异常,让更新评价方法不影响原操作的执行结果
    try {
        redisRepository.refreshKeys(RedisKeyConstants.REMARK_PREFIX + itemId);
    } catch (Exception e) {
        log.warn("function RemarkService.updateRemarkCache make exception:{} by:{}", e.getMessage(), itemId);
    }
}

Измените интерфейс списка оценки запросов и агрегируйте содержимое:

/**
 * 查询商品关联的评价,一次查询固定的条目
 * @param itemId 商品id
 * @param curIndex 当前查询坐标
 */
@GetMapping("/remark")
public APIBody listRemarks(String itemId, int curIndex, Integer consumerId){
    Assert.isTrue(!StringUtils.isEmpty(itemId), "商品id不能为空");
    Assert.isTrue(curIndex > 0, "查询坐标异常");

    JSONArray list = remarkService.listRemarks(itemId, curIndex, SystemConstants.REMARK_MAX_LIST_LENGTH);

    //原列表是从redis或db中读取的静态数据,而点赞数据每时每刻都在变化,分开获取这两个部分。
    return APIBody.buildSuccess(remarkService.appendVoteInfo(list, consumerId));
}

Точка оптимизации: общее количество лайков в оценке фиксировано и не имеет отношения к пользователю, оно может быть объединено с содержимым оценки и кэшировано в памяти, а информация о лайках пользователя может запрашиваться только в Redis для каждого запроса.

Рекомендовать хорошие отзывы

Полная система оценки должна иметь возможность выводить рекомендуемый список высококачественного оценочного контента в качестве экрана по умолчанию, когда пользователи просматривают оценки продукта.

Что такое «премиум-контент»? Насколько я понимаю, оценочный контент актуален, очень популярен и содержателен. Среди них «общее количество лайков» является одним из важных показателей для измерения популярности. В настоящее время мы используем количество лайков как единственный показатель для расчета качественного контента и предоставления интерфейса запросов. Эта дизайнерская идея также может быть использована, когда в будущем будут введены другие индикаторы.

В оценочной таблице есть поле голосования, которое можно отсортировать, чтобы сгенерировать первые n фрагментов данных:

select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?

Следует отметить, что поле голосования обновляется не так, как нравится пользователю, потому что частые обновления неэффективны. Поле голосов можно обновлять путем регулярного суммирования.В таблице лайков сохраняется последнее общее количество лайков оценки, поэтому вы можете фильтровать последние лайки соответствующего контента каждый 1 день или 1 час, а затем обновлять голоса.

Независимо от того, в какой базе данных или таблице находятся базовые данные, независимо от того, как они расположены, я называю этот шаг «возвратом к источнику», что является поведенческой концепцией в случае промаха кеша.

При загрузке рекомендательных обзоров используется алгоритм возврата к источнику.

public List<Remark> listRecommendRemarks(/*not null*/ String itemId, int expectCount){
    if (expectCount <= 0)
        return new ArrayList<>();

    Assert.isTrue(expectCount <= MAX_LIST_SIZE, "不允许一次性查询过多内容");

    String sql = "select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?";
    return db.query(sql, (resultSet, i) -> {
        Remark remark = new Remark();
        remark.setId(resultSet.getLong(1));
        remark.setConsumerId(resultSet.getLong(2));
        remark.setOrderId(resultSet.getString(3));
        remark.setItemId(itemId);
        remark.setScore(resultSet.getShort(4));
        remark.setHeader(resultSet.getString(5));
        remark.setContent(resultSet.getString(6));
        remark.setImages(resultSet.getString(7));
        remark.setUsername(resultSet.getString(8));
        remark.setUserface(resultSet.getString(9));
        remark.setCreateTime(resultSet.getString(10));
        return remark;
    }, itemId, expectCount);
}

Следующее, что нужно сделать, это сохранить эту часть контента в кэш, а затем вывести его.

Атомарно заменить список

Рекомендуемая оценка — это список. Я решил использовать тип данных LIST Redis, который может легко выполнять запросы диапазона. См. список оценки в предыдущей статье.

Однако в Redis не предусмотрена операция замены списка напрямую, это может быть достигнуто только путем комбинирования таких команд, как DEL, LRPUSH, RENAME и т. д., но операция комбинирования клиента не является атомарной. требуется:

--删除并创建列表
--params    1           2
--KEYS      列表键名      代理键
--ARGV      列表

redis.call('DEL', KEYS[1])
for i= 1, #ARGV do
    redis.call('RPUSH', KEYS[1], ARGV[i])
end

--延长代理锁的过期时间
redis.call('SET', KEYS[2], 1)
redis.call('EXPIRE',KEYS[2], 3600)

Основной код для запроса оценки рекомендации выглядит следующим образом:

@Cacheable(value = "recommend")
public JSONArray listRecommendRemarks(/*not null*/ String itemId, int start, int stop) {
    try {
        if (remarkRedis.shouldUpdateRecommend(itemId)) {
            //加锁成功,需要加载数据库中的评价内容到redis
            remarkQueue.push(itemId, () -> reloadRecommendRemarks(itemId));
        }

        return appendVoteInfo(remarkRedis.readRecommendRange(itemId, start, stop));
    } catch (Exception e) {
        log.error("function RemarkService.listRecommendRemarks make exception:{} by:{},{},{}", e.getMessage(), itemId, start, stop);
        return SystemConstants.EMPTY_ARRAY;
    }
}

Среди них по-прежнему используется режим суррогатного ключа, так что срок действия списка основных бизнес-данных, хранящихся в Redis, никогда не истекает, что позволяет избежать поломки кеша и частых распределенных блокировок и блокировок.

Некоторый важный код операции redis:

//保存推荐内容并重置过期时间
public void saveRecommendData(String itemId, /*not null*/ List<Remark> list) {
    String[] argv = new String[list.size()];
    for (int i = 0; i < list.size(); i++) {
        argv[i] = JSONObject.fromObject(list.get(i)).toString();
    }
    redisTemplate.execute(resetListScript,
            List.of(RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId,
                    RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId), argv);
}

//读取推荐内容
public JSONArray readRecommendRange(String itemId, int start, int stop) {
    String key = RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId;
    return range(start, stop, key);
}

//是否应该更新推荐
public boolean shouldUpdateRecommend(String itemId) {
    Boolean flag = redisTemplate.opsForValue().setIfAbsent(RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId);
    return flag == null || !flag;
}

Холодный старт и пустые данные

Холодный старт относится к первому подключению службы к сети или перезапуску Redis с нулевым кешем.В это время какой-либо кеш не загружен, или он был загружен ранее, а теперь его больше нет из-за аварии. В это время срок блокировки прокси-сервера истечет, команда SETNX будет выполнена успешно, и поток, который успешно заблокируется, синхронизирует данные базы данных с Redis, так что КЛЮЧ бизнес-данных больше не будет пустым. Если процесс синхронизации завершится неудачно, блокировка автоматически истечет через 2 секунды, а новый поток продолжит выполнение незавершенной задачи. Если бизнес-данные загружены, срок действия блокировки прокси-сервера будет задержан на 1 час, поэтому синхронизация будет запущена через 1 час. Весь процесс асинхронный, и запрошенный пользователем поток будет читать только бизнес-данные KEY, если есть, то вернет, а если нет, то будет пустым. Другими словами, интерфейс становится пустым только в течение нескольких секунд после холодного запуска, что приемлемо, поскольку холодный запуск происходит только тогда, когда новые службы подключены к сети или память Redis не может быть восстановлена.

Пустые данные означают, что содержимое базы данных изначально пусто. В соответствии с приведенными выше дизайнерскими идеями можно сделать вывод, что если содержимое базы данных пусто, то КЛЮЧ бизнес-данных пуст, то есть равен нулю, и заполнитель не сохраняется, потому что прокси-КЛЮЧ уже сыграл роль заполнителя. . С этой точки зрения простой прокси-ключ может предотвратить сбой кеша, предотвратить блокировку синхронизации и заполнители.

следовать за

Методы реализации некоторых лотерей и секкиллов могут быть обновлены.