1. Подробно объясните конфликты параллелизма
В сценарии электронной коммерции рабочий процесс выглядит следующим образом:
- Чтение информации о продукте, включая количество на складе
- Пользователь размещает заказ на покупку
- Обновите информацию о продукте и сократите запасы на единицу
Если это многопоточная операция, может быть несколько потоков, одновременно выполняющих вышеупомянутый трехэтапный процесс.Если два человека читают данные о продукте в это время, два потока одновременно обслуживают двух людей и в то же время несут вне инвентаризации продукта Модификация данных. Правильная ситуация: поток A будет хранить -1, установленный на 99 штук, поток B затем будет читать 99 штук, а затем -1, чтобы стать 98 штуками. Если потоки A и B оба считывают по 100 частей, после обработки A они пересматриваются до 99 частей, а после обработки B пересматриваются до 99 частей, то результат неверен.
2. Решения
2.1 Пессимистическая блокировка
При чтении товарных данных эта строка данных одновременно блокируется.Когда поток завершает обработку данных, он разблокируется и начинает обработку другой поток.
Пессимистическая схема управления параллелизмом блокировки заключается в блокировке во всех случаях. После блокировки с этим фрагментом данных может работать только один поток.В разных сценариях блокировки на нем разные, блокировки на уровне строки, блокировки на уровне таблицы, блокировки чтения и блокировки записи.
2.2 Оптимистическая блокировка
Оптимистическая блокировка не блокирует, и каждый поток может работать произвольно. В каждом документе es есть поле версии, которое после создания нового документа равно 1, оно модифицируется один раз и накапливается.Потоки A и B читают данные одновременно, версия=1, инвентарь равен 99. после обработки A, которая будет записана в es.По сравнению с номером версии в es оба равны 1, значит запись прошла успешно, версия=2, а B после обработки тоже 99. По сравнению с номером версии version=2 данных в es, когда они хранятся в es, это, очевидно, отличается.В это время 99 не будет использоваться для обновления, но самые последние данные будут считаны снова, а затем уменьшены на единицу, чтобы стать 98, и вышеуказанная операция будет выполнена для записи.
2.3 Оптимистическая блокировка Elasticsearch
Бэкенд Elasticsearch многопоточен и асинхронен, а множественные запросы идут не по порядку, может быть так, что более поздняя модификация приходит первой, а первая модификация приходит позже.
Многопоточная асинхронная одновременная модификация Elasticsearch основана на собственном номере версии _version для управления параллельным выполнением оптимистичных блокировок.
Когда постмодификация приходит первой, после завершения модификации, когда первая модификация приходит позже, будет сравниваться номер версии _version, и если он не равен, то он сразу отбрасывается, не нужно. Результат будет сохранен в правильном состоянии.
Пример кода:
PUT /test_index/test_type/3
{
"test_field": "test test"
}
结果:
{
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
修改
PUT /test_index/test_type/3
{
"test_field": "test1 test1"
}
结果
{
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 2,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
删除
DELETE /test_index/test_type/3
结果:
{
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 3,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
重新创建
PUT /test_index/test_type/3
{
"test_field": "test1 test1"
}
结果
{
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
Операция удаления также добавит 1 к номеру версии этих данных.
После удаления документа со стороны может быть доказано, что он физически не удаляется сразу, потому что некоторая часть его номера версии и другая информация все еще сохраняются. Сначала удалите документ, а потом пересоздайте документ, фактически он будет основан на версии удаления, а затем добавьте 1 к номеру версии
2.4 Пример управления параллелизмом с оптимистичной блокировкой es
-
Создать новые данные
PUT /test_index/test_type/4 { "test_field": "test" }
-
Имитация двух клиентов, оба получают одинаковые данные
GET /test_index/test_type/4 返回 { "_index": "test_index", "_type": "test_type", "_id": "4", "_version": 1, "found": true, "_source": { "test_field": "test" } }
-
Сначала эти данные обновил один из клиентов, В то же время укажите номер версии данных и убедитесь, что номер версии данных в es совпадает с номером версии данных в клиенте, чтобы его можно было изменить.
PUT test_index/test_type/4?version=1 { "test_field": "client1 changed" } 返回结果 { "_index": "test_index", "_type": "test_type", "_id": "4", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": false }
-
Другой клиент пытается изменить данные на основе версии = 1, а также предоставляет номер версии для выполнения оптимистичного контроля параллелизма блокировки.
PUT test_index/test_type/4?version=1 { "test_field": "client2 changed" } 会出错,返回 { "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]", "index_uuid": "rsiZYqiwSCC2XdR8N2bJow", "shard": "2", "index": "test_index" } ], "type": "version_conflict_engine_exception", "reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]", "index_uuid": "rsiZYqiwSCC2XdR8N2bJow", "shard": "2", "index": "test_index" }, "status": 409 }
Оптимистическая блокировка успешно предотвращает проблемы параллелизма
-
После того, как оптимистическая блокировка успешно предотвратит проблемы параллелизма, попробуйте правильно завершить обновление.
Сделайте запрос GET еще раз, чтобы получить версию
GET /test_index/test_type/4 { "_index": "test_index", "_type": "test_type", "_id": "4", "_version": 2, "found": true, "_source": { "test_field": "client1 changed" } }
Измените его на основе последних данных и номера версии. После изменения укажите последний номер версии. Для успешного выполнения этого шага может потребоваться повторить несколько раз, особенно если несколько потоков одновременно часто обновляют одни и те же данные.
PUT /test_index/test_type/4?version=2 { "test_field": "client2 changed" } 返回 { "_index": "test_index", "_type": "test_type", "_id": "4", "_version": 3, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": false }
2.5 Оптимистическое управление параллелизмом блокировки на основе внешней версии
es предоставляет функцию, то есть вы можете выполнять управление параллелизмом без внутреннего номера версии _version, который он предоставляет, вы можете выполнять управление параллелизмом на основе номера версии, который вы поддерживаете сами.
?version=1&version_type=external
version_type=external, единственное отличие состоит в том, что _version может быть изменен только тогда, когда указанная вами версия точно такая же, как _version в es, и сообщение об ошибке будет сообщено, пока оно отличается; когда version_type=external, только когда вы обеспечить Модификация может быть завершена только тогда, когда версия больше, чем _version в es.
es, _version=1, ?version=1, можно успешно обновить
es, _version=1, ?version>1&version_type=external, чтобы добиться успеха, например ?version=2&version_type=external
Пример кода:
-
Сначала создайте данные
PUT test_index/test_type/5 { "test_field": "external test" } 返回 { "_index": "test_index", "_type": "test_type", "_id": "5", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true }
-
Имитация двух клиентов, запрашивающих эти данные одновременно
GET /test_index/test_type/5 返回 { "_index": "test_index", "_type": "test_type", "_id": "5", "_version": 1, "found": true, "_source": { "test_field": "external test" } }
-
Первый клиент модифицируется первым, в это время программа-клиент получила последний номер версии этих данных в своей базе данных, например, 2
PUT /test_index/test_type/5?version=2&version_type=external { "test_field": "external client1 changed" } 返回 { "_index": "test_index", "_type": "test_type", "_id": "5", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": false }
-
Смоделируйте второго клиента и в то же время получите номер версии, поддерживаемый в вашей собственной базе данных, который также равен 2, и инициируйте модификацию на основе версии = 2.
PUT /test_index/test_type/5?version=2&version_type=external { "test_field": "external client2 changed" } 会出错,返回 { "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]", "index_uuid": "rsiZYqiwSCC2XdR8N2bJow", "shard": "1", "index": "test_index" } ], "type": "version_conflict_engine_exception", "reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]", "index_uuid": "rsiZYqiwSCC2XdR8N2bJow", "shard": "1", "index": "test_index" }, "status": 409 }
-
После успешного контроля параллелизма повторно инициируйте обновление на основе номера последней версии.
GET /test_index/test_type/5 返回 { "_index": "test_index", "_type": "test_type", "_id": "5", "_version": 2, "found": true, "_source": { "test_field": "external client1 changed" } } PUT /test_index/test_type/5?version=3&version_type=external { "test_field": "external client2 changed" } 返回 { "_index": "test_index", "_type": "test_type", "_id": "5", "_version": 3, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": false }