карта разума
предисловие
проходить через«Введение в ZooKeeper»После этого мы изучили основы использования ZooKeeper.
На самом деле возможности применения ZooKeeper очень обширны, и реализация распределенных блокировок — лишь одна из них. Далее мы реализуем решение распределенной блокировки для ZooKeeper.Проблема в секретерасширять.
1. В чем проблема пиковой перепроданности?
Здесь не должно быть незнакомых секкилл-активностей, поэтому нет необходимости объяснять слишком много.
Нетрудно представить, что в этом сценарии «seckill» действительно будут ситуации, когда несколько пользователей конкурируют за «ресурсы».То есть одновременно идет несколько потоков, в этом случае легко иметь неточные данные, то есть проблема перепроданности..
1.1 Демонстрация проекта
Используя демонстрацию программы ниже, я использовалSpringBoot2.0, Mybatis, Mybatis-Plus, SpringMVCСоберите простой проект, адрес github:
Создайте таблицу информации о продукте:
CREATE TABLE `tb_commodity_info` (
`id` varchar(32) NOT NULL,
`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
`number` int(10) DEFAULT '0' COMMENT '商品数量',
`description` varchar(2048) DEFAULT '' COMMENT '商品描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
Добавить элемент[Чар Сью Бао] в:
Логика основного кода выглядит следующим образом:
@Override
public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
//1.先查询数据库中商品的数量
TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);
//2.判断商品数量是否大于0,或者购买的数量大于库存
Integer count = commodityInfo.getNumber();
if (count <= 0 || number > count) {
//商品数量小于或者等于0,或者购买的数量大于库存,则返回false
return false;
}
//3.如果库存数量大于0,并且购买的数量小于或者等于库存。则更新商品数量
count -= number;
commodityInfo.setNumber(count);
boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1;
if (bool) {
//如果更新成功,则打印购买商品成功
System.out.println("购买商品[ " + commodityInfo.getCommodityName() + " ]成功,数量为:" + number);
}
return bool;
}
Логическая схема выглядит следующим образом:
Приведенная выше логика, если это однопоточный запрос, проблем нет.
Но есть проблема с многопоточностью. Теперь я создам несколько потоков, сделаю запросы через HttpClient и посмотрю, что произойдет:
public static void main(String[] args) throws Exception {
//请求地址
String url = "http://localhost:8080/mall/commodity/purchase";
//请求参数,商品ID,数量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
//创建10个线程通过HttpClient进行发送请求,测试
for (int i = 0; i < 10; i++) {
//这个线程的逻辑仅仅是发送请求
CommodityThread commodityThread = new CommodityThread(url, map);
commodityThread.start();
}
}
Чтобы объяснить, количество булочек со свининой, приготовленных на гриле, равно 100. Одновременно можно купить 10 ниток.Если все покупки успешны, количество инвентаря должно быть 90.
На самом деле 10 потоков действительно успешно куплены:
Однако товарный запас в базе неточен:
2. Попробуйте использовать локальную блокировку
В приведенном выше сценарии приблизительный процесс выглядит следующим образом:
проблему можно увидетьСуть в том, что два потока «одновременно» запрашивают оставшуюся инвентаризацию, а затем обновляют инвентаризацию, что приводит к. Фактически для решения этой проблемыПросто убедитесь, что несколько потоков выполняются последовательно в этой логике, то есть блокировка.
Существует два типа локальных блокировок, предоставляемых JDK: синхронизированные блокировки и блокировки.
В любом случае, я использую здесь синхронизированный для простоты:
//使用synchronized修饰方法
@Override
public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
//省略...
}
Затем протестируйте многопоточную одновременную привязку прямо сейчас и посмотрите на результаты:
Задача решена! ! !
Как вы думаете, это закончилось, как это, глядя на индикатор выполнения, это не просто.
Мы знаем, что в реальных проектах часто развертывается только один сервер, поэтому мы могли бы также запустить два сервера с номерами портов 8080 и 8081, чтобы смоделировать реальный сценарий проекта:
Напишите тестовый скрипт, который поочередно запрашивает аналоговые несколько серверов для обработки запросов, а пользователь добавляет сцену:
public static void main(String[] args) throws Exception {
//请求地址
String url = "http://localhost:%s/mall/commodity/purchase";
//请求参数,商品ID,数量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
//创建10个线程通过HttpClient进行发送请求,测试
for (int i = 0; i < 10; i++) {
//8080、8081交替请求,每个服务器处理5个请求
String port = "808" + (i % 2);
CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
commodityThread.start();
}
}
Прежде всего, посмотрите на ситуацию покупки, это должна быть удачная покупка:
Ключевым моментом является правильность количества запасов:
Есть 10 запросов на успешную покупку, инвентарь должен быть 90, а здесь инвентарь 95. оказываетсяЛокальная блокировка не может решить проблему перепроданности при флэш-покупке нескольких серверов..
Почему это происходит, смотрите схему:
По сути, это похоже на проблему многопоточности.Несколько серверов запрашивают базу данных, получают одну и ту же инвентаризацию, а затем обновляют инвентаризацию, что приводит к неверным данным.. Чтобы обеспечить правильное количество на складе,Суть в том, что несколько серверов должны гарантировать, что только один сервер выполняет эту логику., то есть добавить распределенную блокировку.
Это также отражает роль распределенных блокировок, которые гарантируют, что несколько серверов могут выполняться только одним сервером.
Существует три реализации распределенных блокировок, а именно Redis, ZooKeeper и базы данных (например, mysql).
3. Используйте ZooKeeper для реализации распределенных блокировок
3.1 Принцип
Фактически распределенные блокировки реализуются с использованием характеристик временных последовательных узлов ZooKeeper. Как этого добиться?
Предполагая, что теперь есть клиент A, который необходимо заблокировать, создается временный узел последовательности по пути «/Lock». Затем получите список узлов в разделе «/Lock» и оцените, является ли его серийный номер наименьшим.Если это наименьший серийный номер, блокировка выполнена успешно!
Теперь есть еще один клиент.Клиент Б должен быть заблокирован, поэтому временный узел последовательности также создается по пути «/Lock». По-прежнему получите список узлов в разделе «/ Lock» и оцените, является ли его собственный номер узла наименьшим. Выясняется, что он не самый маленький, блокировка не срабатывает, и тогда он следит за своим предыдущим узлом.
Как снять блокировку, по сути, это удалить временный узел. Предположим, что клиент А снимает блокировку и удаляет узел 01. Это вызовет событие прослушивания узла 02, и клиент снова получит список узлов, а затем определит, является ли это наименьшим порядковым номером, и если это наименьший порядковый номер, он будет заблокирован.
Если несколько клиентов на самом деле одинаковы, временный узел будет создан, как только они появятся, а затем они начнут определять, являются ли они наименьшим серийным номером.Если нет, они будут отслеживать предыдущий узел, чтобы сформировать механизм очередей. . Он также формирует эффект блокировки, гарантируя, что выполняется только одно выполнение нескольких серверов.
Если предположить, что один из клиентов не работает, согласно характеристикам временного узла, ZooKeeper автоматически удалит соответствующий временный узел., что эквивалентно автоматическому снятию блокировки.
3.2 Рукописный код для реализации распределенной блокировки
Сначала добавьте зависимости Maven
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.4</version>
</dependency>
Затем следуйте идеям приведенного выше анализа, чтобы ввести код для создания класса ZkLock:
public class ZkLock implements Lock {
//计数器,用于加锁失败时,阻塞
private static CountDownLatch cdl = new CountDownLatch(1);
//ZooKeeper服务器的IP端口
private static final String IP_PORT = "127.0.0.1:2181";
//锁的根路径
private static final String ROOT_NODE = "/Lock";
//上一个节点的路径
private volatile String beforePath;
//当前上锁的节点路径
private volatile String currPath;
//创建ZooKeeper客户端
private ZkClient zkClient = new ZkClient(IP_PORT);
public ZkLock() {
//判断是否存在根节点
if (!zkClient.exists(ROOT_NODE)) {
//不存在则创建
zkClient.createPersistent(ROOT_NODE);
}
}
//加锁
public void lock() {
if (tryLock()) {
System.out.println("加锁成功!!");
} else {
// 尝试加锁失败,进入等待 监听
waitForLock();
// 再次尝试加锁
lock();
}
}
//尝试加锁
public synchronized boolean tryLock() {
// 第一次就进来创建自己的临时节点
if (StringUtils.isBlank(currPath)) {
currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");
}
// 对节点排序
List<String> children = zkClient.getChildren(ROOT_NODE);
Collections.sort(children);
// 当前的是最小节点就返回加锁成功
if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {
return true;
} else {
// 不是最小节点 就找到自己的前一个 依次类推 释放也是一样
int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;
beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);
//返回加锁失败
return false;
}
}
//解锁
public void unlock() {
//删除节点并关闭客户端
zkClient.delete(currPath);
zkClient.close();
}
//等待上锁,加锁失败进入阻塞,监听上一个节点
private void waitForLock() {
IZkDataListener listener = new IZkDataListener() {
//监听节点更新事件
public void handleDataChange(String s, Object o) throws Exception {
}
//监听节点被删除事件
public void handleDataDeleted(String s) throws Exception {
//解除阻塞
cdl.countDown();
}
};
// 监听上一个节点
this.zkClient.subscribeDataChanges(beforePath, listener);
//判断上一个节点是否存在
if (zkClient.exists(beforePath)) {
//上一个节点存在
try {
System.out.println("加锁失败 等待");
//加锁失败,阻塞等待
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 释放监听
zkClient.unsubscribeDataChanges(beforePath, listener);
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
public void lockInterruptibly() throws InterruptedException {
}
public Condition newCondition() {
return null;
}
}
Добавьте блокировку на уровне контроллера:
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
boolean bool;
//获取ZooKeeper分布式锁
ZkLock zkLock = new ZkLock();
try {
//上锁
zkLock.lock();
//调用秒杀抢购的service方法
bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
} catch (Exception e) {
e.printStackTrace();
bool = false;
} finally {
//解锁
zkLock.unlock();
}
return bool;
}
Для теста осталось еще два сервера, 8080 и 8081. Затем запустите тестовый скрипт:
public static void main(String[] args) throws Exception {
//请求地址
String url = "http://localhost:%s/mall/commodity/purchase";
//请求参数,商品ID,数量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
//创建10个线程通过HttpClient进行发送请求,测试
for (int i = 0; i < 10; i++) {
//8080、8081交替请求
String port = "808" + (i % 2);
CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
commodityThread.start();
}
}
Результат правильный:
3.3 Готовое колесо
Curator — это фреймворк для работы ZooKeeper с открытым исходным кодом от Apache. Среди них функция реализации распределенных блокировок ZooKeeper.
Конечно, реализация распределенных блокировок — это лишь малая часть этого фреймворка, и есть много других применений.Официальный сайтучиться.
Сначала добавьте зависимости Maven:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
Это все равно, чтобы заблокировать, где он должен быть заблокирован:
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId,
@RequestParam(name = "number") Integer number) throws Exception {
boolean bool = false;
//设置重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
// 启动客户端
client.start();
InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
try {
//加锁
if (mutex.acquire(3, TimeUnit.SECONDS)) {
//调用抢购秒杀service方法
bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//解锁
mutex.release();
client.close();
}
return bool;
}
В-четвертых, яма столкнулась
Я попытался написать распределенные блокировки с помощью родного ZooKeeper, и это было что-то вроде взрыва. Столкнулся с множеством ям и, наконец, сдался и использовал API zkclient. Может быть, я слишком овощ, чтобы использовать его.
Ниже я расскажу о некоторых проблемах, с которыми я столкнулся, и я надеюсь, что вы сможете быстро определить проблему, когда столкнетесь с таким же типом исключения.
4.1 Session expired
Эта ошибка связана с использованием собственного API ZooKeeper. В основном, когда я вхожу в режим отладки для отладки.
Поскольку нативному ZooKeeper необходимо установить время ожидания сеанса, в общем режиме отладки мы застрянем на одном месте для отладки, что определенно превысит установленное время сеанса~
4.2 KeeperErrorCode = ConnectionLoss
Это тоже ошибка нативного API ZooKeeper, как она появилась?
Основная причина в том, что созданный клиент ZooKeeper асинхронен при подключении к серверу, так как подключение требует времени, код уже начал выполняться create() или exists(), после чего сообщается об ошибке.
Решение. Используйте счетчик CountDownLatch для блокировки, остановите блокировку после успешного подключения, а затем выполните такие операции, как create() или exists().
4.3 Несогласованность данных в одновременных обновлениях запросов
Эта ошибка действительно разрывается~
Сначала я добавил распределенные блокировки на сервисный слой, а потом подумал, что дело сделано. Затем запустите 8080, 8081 для параллельного тестирования. 10 потоков все успешно куплено, но результат не правильный!
Первой реакцией было то, что возникла проблема с кодом, который я реализовал, поэтому я заменил его на распределенную блокировку, реализованную фреймворком куратора, фреймворк с открытым исходным кодом должен подойти. Я не думал, что это сработает~
Поскольку это не проблема самой блокировки, это не проблема транзакции.Операция предыдущей транзакции по обновлению инвентаря не была отправлена, а затем придет следующий запрос на запрос. Поэтому я немного увеличил область блокировки и поместил ее на слой контроллера.. Это сработало!
Вы, возможно, заметили, что в моем примере выше я добавляю распределенные блокировки на уровень контроллера.На самом деле, я не люблю писать слишком много кода на уровне контроллера.
Может быть, есть более элегантный способ, но, к сожалению, у меня недостаточно возможностей, если у вас есть лучший способ добиться этого, вы можете поделиться им ~
Дополнение: Большой парень в комментарии ниже сказал, что можно обернуть слой вне оригинального метода и протестировать его самостоятельно. Это должно быть делом бизнеса.
Можно ли успешно разместить его на уровне контроллера, потому что на уровне контроллера нет транзакции?Первоначально я написал аннотацию @Transactional в классе для службы, поэтому во всем классе есть транзакции, поэтому транзакция не был отправлен на обновление базы после разблокировки., а потом приходит следующий запрос и находит данные, которые не были обновлены.
Чтобы быть более элегантным, поместите аннотацию @Transactional в метод службы snapped.
Затем оберните нетранзакционный метод блокировки.
V. Резюме
Наконец, давайте рассмотрим и подведем итоги:
- Прежде всего, мы моделируем сценарий многопоточного шипа на одной машине.Если используется одна машина, для решения проблемы можно использовать локальные блокировки.
- Затем смоделируйте сценарий многопоточности на нескольких серверах, идея состоит в том, чтобы использовать ZooKeeper для реализации решения распределенной блокировки.
- Проиллюстрируйте, как ZooKeeper реализует распределенные блокировки.
- Затем вручную напишите код для реализации распределенных блокировок.
- Наконец, суммируйте найденные ямы.
Надеюсь, эта статья будет вам полезна
Если вы хотите впервые увидеть мои обновленные статьи, вы можете выполнить поиск в общедоступной учетной записи на WeChat "java技术爱好者
",Не хочу быть соленой рыбой, я программист, стремящийся запомниться всем. Увидимся в следующий раз! ! !
Возможности ограничены, если есть какие-то ошибки или неуместности, просьба критиковать и исправлять их, учиться и обмениваться вместе!