Elasticsearch — конфликты параллелизма и решения

Elasticsearch

1. Подробно объясните конфликты параллелизма

В сценарии электронной коммерции рабочий процесс выглядит следующим образом:

  1. Чтение информации о продукте, включая количество на складе
  2. Пользователь размещает заказ на покупку
  3. Обновите информацию о продукте и сократите запасы на единицу

Если это многопоточная операция, может быть несколько потоков, одновременно выполняющих вышеупомянутый трехэтапный процесс.Если два человека читают данные о продукте в это время, два потока одновременно обслуживают двух людей и в то же время несут вне инвентаризации продукта Модификация данных. Правильная ситуация: поток A будет хранить -1, установленный на 99 штук, поток B затем будет читать 99 штук, а затем -1, чтобы стать 98 штуками. Если потоки A и B оба считывают по 100 частей, после обработки A они пересматриваются до 99 частей, а после обработки B пересматриваются до 99 частей, то результат неверен.

2. Решения

2.1 Пессимистическая блокировка

При чтении товарных данных эта строка данных одновременно блокируется.Когда поток завершает обработку данных, он разблокируется и начинает обработку другой поток.

Пессимистическая схема управления параллелизмом блокировки заключается в блокировке во всех случаях. После блокировки с этим фрагментом данных может работать только один поток.В разных сценариях блокировки на нем разные, блокировки на уровне строки, блокировки на уровне таблицы, блокировки чтения и блокировки записи.

2.2 Оптимистическая блокировка

Оптимистическая блокировка не блокирует, и каждый поток может работать произвольно. В каждом документе es есть поле версии, которое после создания нового документа равно 1, оно модифицируется один раз и накапливается.Потоки A и B читают данные одновременно, версия=1, инвентарь равен 99. после обработки A, которая будет записана в es.По сравнению с номером версии в es оба равны 1, значит запись прошла успешно, версия=2, а B после обработки тоже 99. По сравнению с номером версии version=2 данных в es, когда они хранятся в es, это, очевидно, отличается.В это время 99 не будет использоваться для обновления, но самые последние данные будут считаны снова, а затем уменьшены на единицу, чтобы стать 98, и вышеуказанная операция будет выполнена для записи.

2.3 Оптимистическая блокировка Elasticsearch

Бэкенд Elasticsearch многопоточен и асинхронен, а множественные запросы идут не по порядку, может быть так, что более поздняя модификация приходит первой, а первая модификация приходит позже.

Многопоточная асинхронная одновременная модификация Elasticsearch основана на собственном номере версии _version для управления параллельным выполнением оптимистичных блокировок.

Когда постмодификация приходит первой, после завершения модификации, когда первая модификация приходит позже, будет сравниваться номер версии _version, и если он не равен, то он сразу отбрасывается, не нужно. Результат будет сохранен в правильном состоянии.

Пример кода:

PUT /test_index/test_type/3
{
  "test_field": "test test"
}
结果:
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "3",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
修改
PUT /test_index/test_type/3
{
  "test_field": "test1 test1"
}
结果
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "3",
  "_version": 2,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
删除
DELETE /test_index/test_type/3
结果:
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "3",
  "_version": 3,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
重新创建
PUT /test_index/test_type/3
{
  "test_field": "test1 test1"
}
结果
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "4",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

Операция удаления также добавит 1 к номеру версии этих данных.

После удаления документа со стороны может быть доказано, что он физически не удаляется сразу, потому что некоторая часть его номера версии и другая информация все еще сохраняются. Сначала удалите документ, а потом пересоздайте документ, фактически он будет основан на версии удаления, а затем добавьте 1 к номеру версии

2.4 Пример управления параллелизмом с оптимистичной блокировкой es

  • Создать новые данные

    PUT /test_index/test_type/4
    {
      "test_field": "test"
    }
    
  • Имитация двух клиентов, оба получают одинаковые данные

    GET /test_index/test_type/4
    返回
    {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "4",
        "_version": 1,
        "found": true,
        "_source": {
          "test_field": "test"
        }
     }
    
  • Сначала эти данные обновил один из клиентов, В то же время укажите номер версии данных и убедитесь, что номер версии данных в es совпадает с номером версии данных в клиенте, чтобы его можно было изменить.

    PUT test_index/test_type/4?version=1
    {
      "test_field": "client1 changed"
    }
    返回结果
    {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "4",
        "_version": 2,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": false
    }
    
  • Другой клиент пытается изменить данные на основе версии = 1, а также предоставляет номер версии для выполнения оптимистичного контроля параллелизма блокировки.

    PUT test_index/test_type/4?version=1
    {
      "test_field": "client2 changed"
    }
    会出错,返回
    {
        "error": {
          "root_cause": [
            {
              "type": "version_conflict_engine_exception",
              "reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]",
              "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
              "shard": "2",
              "index": "test_index"
            }
          ],
          "type": "version_conflict_engine_exception",
          "reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]",
          "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
          "shard": "2",
          "index": "test_index"
        },
        "status": 409
    }
    

    Оптимистическая блокировка успешно предотвращает проблемы параллелизма

  • После того, как оптимистическая блокировка успешно предотвратит проблемы параллелизма, попробуйте правильно завершить обновление.

    Сделайте запрос GET еще раз, чтобы получить версию

    GET /test_index/test_type/4
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "4",
      "_version": 2,
      "found": true,
      "_source": {
        "test_field": "client1 changed"
      }
    }
    

    Измените его на основе последних данных и номера версии. После изменения укажите последний номер версии. Для успешного выполнения этого шага может потребоваться повторить несколько раз, особенно если несколько потоков одновременно часто обновляют одни и те же данные.

    PUT /test_index/test_type/4?version=2
    {
      "test_field": "client2 changed"
    }
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "4",
      "_version": 3,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }
    

2.5 Оптимистическое управление параллелизмом блокировки на основе внешней версии

es предоставляет функцию, то есть вы можете выполнять управление параллелизмом без внутреннего номера версии _version, который он предоставляет, вы можете выполнять управление параллелизмом на основе номера версии, который вы поддерживаете сами.

?version=1&version_type=external

version_type=external, единственное отличие состоит в том, что _version может быть изменен только тогда, когда указанная вами версия точно такая же, как _version в es, и сообщение об ошибке будет сообщено, пока оно отличается; когда version_type=external, только когда вы обеспечить Модификация может быть завершена только тогда, когда версия больше, чем _version в es.

es, _version=1, ?version=1, можно успешно обновить
es, _version=1, ?version>1&version_type=external, чтобы добиться успеха, например ?version=2&version_type=external

Пример кода:

  • Сначала создайте данные

    PUT test_index/test_type/5
    {
      "test_field": "external test"
    }
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 1,
      "result": "created",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": true
    }
    
  • Имитация двух клиентов, запрашивающих эти данные одновременно

    GET /test_index/test_type/5
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 1,
      "found": true,
      "_source": {
        "test_field": "external test"
      }
    }
    
  • Первый клиент модифицируется первым, в это время программа-клиент получила последний номер версии этих данных в своей базе данных, например, 2

    PUT /test_index/test_type/5?version=2&version_type=external
    {
      "test_field": "external client1 changed"
    }
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 2,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }
    
  • Смоделируйте второго клиента и в то же время получите номер версии, поддерживаемый в вашей собственной базе данных, который также равен 2, и инициируйте модификацию на основе версии = 2.

    PUT /test_index/test_type/5?version=2&version_type=external
    {
      "test_field": "external client2 changed"
    }
    会出错,返回
    {
      "error": {
        "root_cause": [
          {
            "type": "version_conflict_engine_exception",
            "reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]",
            "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
            "shard": "1",
            "index": "test_index"
          }
        ],
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]",
        "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
        "shard": "1",
        "index": "test_index"
      },
      "status": 409
    }
    
  • После успешного контроля параллелизма повторно инициируйте обновление на основе номера последней версии.

    GET /test_index/test_type/5
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 2,
      "found": true,
      "_source": {
        "test_field": "external client1 changed"
      }
    }
    PUT /test_index/test_type/5?version=3&version_type=external
    {
      "test_field": "external client2 changed"
    }
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 3,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }