Как спроектировать и реализовать поточно-ориентированную карту? (Часть 2)

задняя часть Go
Как спроектировать и реализовать поточно-ориентированную карту? (Часть 2)

В прошлой статье мы обсудили, как реализовать Карту, а также обсудили ряд моментов оптимизации. В следующей части мы продолжим обсуждение того, как реализовать потокобезопасную карту. Когда дело доходит до безопасности потоков, нам нужно начать с концепции.

Потокобезопасность — это когда ваш блок кода находится в процессе с несколькими потоками, работающими одновременно, и эти потоки могут запускать этот код одновременно. Если результат каждого прогона такой же, как и результат однопоточного прогона, а значения других переменных такие же, как и ожидалось, он является потокобезопасным.

Блок кода может не быть потокобезопасным, если он содержит обновления общих данных. Но если аналогичные операции в блоке кода находятся в критической секции, то блок кода является потокобезопасным.

Как правило, существует два способа избежать условий гонки для достижения безопасности потоков:

Категория 1 — Избегайте общего состояния

  1. возвращающийсяRe-entrancy)

Часто в вопросах безопасности потоков наиболее распространенным блоком кода является функция. Самый эффективный способ сделать функцию потокобезопасной — сделать ее реентерабельной. Можно сказать, что функция является реентерабельной, если все потоки в процессе могут вызывать функцию одновременно, и функция выдает ожидаемый результат независимо от фактического выполнения их вызова функции.

Если функция принимает общие данные в качестве возвращаемого результата или включает их в свой возвращаемый результат, то эта функция определенно не является реентерабельной. Любая функция, содержащая код, управляющий общими данными, не является реентерабельной.

Чтобы реализовать потокобезопасные функции, можно поместить весь код в критическую секцию. Однако использование мьютексов всегда будет потреблять определенное количество системных ресурсов и времени, и в процессе использования мьютексов всегда будут различные игры и компромиссы. Поэтому, пожалуйста, разумно используйте мьютексы для защиты кода, включающего операции с общими данными.

Уведомление: реентерабельность — это просто достаточное и ненужное условие для потокобезопасности,не достаточное условие. Этот контрпример будет рассмотрен ниже.

  1. локальное хранилище потока

Если переменная уже локализована, то у каждого потока есть своя собственная копия. Эти переменные сохраняют свои значения за пределами подпрограммы и других границ кода и являются потокобезопасными, поскольку эти переменные хранятся локально для каждого потока, даже если код, обращающийся к ним, может одновременно выполняться другим потоком.

  1. неизменный

После инициализации объекта его нельзя изменить. Это означает, что совместно используются только данные только для чтения, что также обеспечивает присущую потокобезопасность. Изменяемые (не постоянные) операции можно реализовать, создав для них новые объекты, а не модифицируя существующие. Java, С# и
Реализация строк в Python использует этот подход.

Вторая категория — синхронизация потоков

Методы первого типа относительно просты и могут быть реализованы посредством преобразования кода. Однако если вы столкнулись с ситуацией, когда вам необходимо обмениваться данными в потоках, первый тип метода не может быть решен. В это время появился второй тип решения, использующий метод синхронизации потоков для решения проблемы безопасности потоков.

Давайте начнем сегодня с синхронизации потоков.


1. Теория синхронизации потоков

В многопоточных программах общие данные часто используются как средство передачи данных между потоками. Поскольку значительная часть адресов виртуальной памяти, принадлежащих процессу, может совместно использоваться всеми потоками процесса, большая часть этих общих данных хранится в пространстве памяти. Если два потока читают один и тот же фрагмент общей памяти одновременно, но получают разные данные, программа подвержена некоторым ошибкам.

Чтобы обеспечить непротиворечивость общих данных, самый простой и тщательный способ — сделать данные инвариантными. Конечно, такой абсолютный подход в большинстве случаев неосуществим. Например, в функции будет использоваться счетчик для записи количества вызовов функции. Этот счетчик не должен быть установлен как константа. В случае, когда это должна быть переменная, должна быть гарантирована согласованность общих данных, что приводит к понятию критической секции.

Появление критической секции должно сделать область доступной или выполняться только последовательно. Критическая секция может быть ресурсом или фрагментом кода. Самый эффективный способ гарантировать наличие критических секций — использовать механизмы синхронизации потоков.

Во-первых, вводятся два метода совместного использования синхронизации данных.

1. Мьютекс

В то же время ограничение, что только один поток может находиться в критической секции, называется взаимным исключением.Каждая нить должна заблокировать объект перед входом в критическую секцию.В критическую секцию может войти только тот поток, который успешно заблокировал объект, иначе заблокируют. Этот объект называется мьютексом или мьютексом.

Вообще говоря, блокировка взаимного исключения, о которой мы обычно говорим, может достичь этой цели.

Может быть несколько мьютексов, и может быть несколько критических секций, которые они защищают. Начнем с простого, мьютекса и критической секции.

(1) Мьютекс и критическая секция

На рисунке выше показан пример мьютекса и критической секции. Когда поток 1 впервые входит в критическую секцию, текущая критическая секция находится в разблокированном состоянии, поэтому она сначала блокируется. Поток 1 получает значение в критической секции.

В это время поток 2 готов войти в критическую область.Поскольку поток 1 блокирует критическую область, поток 2 не может войти в критическую область, и поток 2 переходит из состояния готовности в состояние сна. Поток 1 продолжает запись в общие данные в критической секции.

Когда поток 1 завершает все операции, поток 1 вызывает операцию разблокировки. Когда критический раздел разблокирован, он попытается разбудить спящий поток 2. После пробуждения поток 2 снова переходит из состояния сна в состояние готовности. Поток 2 готов войти в критическую секцию.Когда критическая секция находится в разблокированном состоянии, поток 2 блокирует критическую секцию.

После серии операций чтения и записи он будет окончательно разблокирован, когда покинет критическую секцию.

Когда поток покидает критическую секцию, он должен не забыть разблокировать соответствующий мьютекс. Таким образом, другие потоки, спящие из-за блокировки критической секции, все еще имеют шанс проснуться. Следовательно, блокировка и разблокировка одной и той же переменной мьютекса должны происходить парами. Невозможно ни многократно заблокировать мьютекс, ни разблокировать мьютекс несколько раз.

Многократная блокировка мьютекса может привести к тому, что критический раздел в конечном итоге заблокируется навсегда. Некоторые люди могут спросить, что происходит, когда разблокированный мьютекс становится разблокированным несколько раз?

До Go 1.8 многократная разблокировка мьютекса не приводила к блокировке какой-либо горутины, но могла вызвать панику во время выполнения. До Go 1.8 можно было попытаться восстановиться от этой паники, но после восстановления это может вызвать ряд проблем, например, горутина повторных операций разблокировки будет навсегда заблокирована. Так что после Go 1.8 такие паники во время выполнения стали неустранимыми. Следовательно, повторная разблокировка переменной мьютекса приведет к выполнению операции во время выполнения, и в конечном итоге программа аварийно завершится.

(2) Несколько мьютексов и критическая секция

В этом случае очень легко вызвать взаимоблокировку потока. Так что старайтесь не перекрывать критические секции, защищенные разными мьютексами.

В приведенном выше примере в критической секции есть 2 мьютекса: мьютекс A и мьютекс
Б.

Поток 1 сначала блокирует мьютекс A, а затем поток 2 блокирует мьютекс B. Когда поток 1 никогда не освобождает мьютекс A, пока он успешно не заблокирует мьютекс B. Точно так же поток 2 никогда не освободит мьютекс B, пока он успешно не заблокирует мьютекс A. Затем в это время и поток 1, и поток 2 преобразуются из состояния готовности в состояние сна, поскольку они не могут заблокировать мьютекс, который им нужно заблокировать. Это тупик потока.

Причины взаимоблокировки потока следующие:

    1. конкуренция за системные ресурсы
    1. Порядок рекомендации процесса является незаконным
    1. Необходимые условия взаимоблокировки (если какое-либо из необходимых условий не выполняется, взаимоблокировка не возникает)
      (1) Условия взаимного исключения
      (2) Отсутствие условия депривации
      (3) Условия запроса и удержания
      (4) Циклическое состояние ожидания

Есть несколько способов избежать взаимоблокировок потоков:

    1. предотвращение взаимоблокировок
      (1) Упорядоченное распределение ресурсов (разрушение условий ожидания цикла)
      (2) Метод атомарного распределения ресурсов (удаление условий запроса и удержания)
    1. избежать тупика
      Алгоритм банкира
    1. Обнаружение взаимоблокировки
      Теория тупика (метод упрощения графа распределения ресурсов), хотя этот метод можно обнаружить, его нельзя предотвратить.Если обнаружен тупик, он должен сотрудничать с методом разблокировки тупика.

Есть несколько способов полностью решить тупиковую ситуацию:

    1. лишение ресурсов
    1. отменить процесс
    1. попробуй заблокировать - назад
      Если вам нужно заблокировать две переменные одну за другой (в неопределенном порядке) при выполнении блока кода, то после успешной блокировки одного из мьютексов следует использовать метод пробной блокировки для блокировки другой переменной. Если попытка заблокировать второй мьютекс терпит неудачу, заблокированный первый мьютекс разблокируется, а два мьютекса снова и снова блокируются.

Как показано на рисунке выше, когда поток 2 блокирует мьютекс B, он снова пытается заблокировать мьютекс A. В этот раз блокировка не удалась, поэтому он также разблокирует мьютекс B. Затем поток 1 придет, чтобы заблокировать мьютекс A. В это время тупика не будет.

    1. Фиксированная блокировка ордера

Этот метод позволяет потоку 1 и потоку 2 блокировать мьютекс в одном и том же порядке, а мьютекс 2 блокировать только после успешной блокировки мьютекса 1. Это гарантирует, что никакие другие потоки, которым также необходимо заблокировать эти мьютексы, не войдут туда, пока один поток полностью не покинет перекрывающиеся критические секции.

(3) Несколько мьютексов и несколько критических секций

В случае нескольких критических секций и множественных мьютексов зависит от того, будут ли конфликтующие области.Если есть конфликтующая область, которая пересекается друг с другом, поток, входящий в критическую секцию, будет переходить в состояние сна до тех пор, пока поток в критической раздел завершает задачу. , проснуться снова.

В общем, мьютексы следует использовать как можно реже. Критическая секция, защищаемая каждым мьютексом, должна быть достаточно большой и максимально возможной. Однако, если обнаруживается, что несколько потоков часто входят и выходят из более крупной критической секции, и между ними часто возникают конфликты доступа, то более крупная критическая секция должна быть разделена на более мелкие и защищена разными мьютексами. Цель этого состоит в том, чтобы уменьшить количество потоков, ожидающих входа в один и тот же критический раздел, тем самым уменьшая вероятность блокировки потоков и уменьшая время, в течение которого они вынуждены спать, что в определенной степени улучшает общую производительность программы.

Прежде чем говорить о другом методе синхронизации потоков, ответьте на вопрос, оставленный в начале статьи: повторный вход — это лишь достаточное и необходимое условие для потокобезопасности, а не достаточное и необходимое условие. Этот контрпример будет рассмотрен ниже.

Самый важный момент в этом вопросе:мьютекс не реентерабельный.

Например:

В приведенном ниже коде функция increment_counter является потокобезопасной, но не реентерабельной.


#include <pthread.h>

int increment_counter ()
{
    static int counter = 0;
    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

    pthread_mutex_lock(&mutex);

    // only allow one thread to increment at a time
    ++counter;
    // store value before any other threads increment it further
    int result = counter;    

    pthread_mutex_unlock(&mutex);

    return result;
}

В приведенном выше коде функцию increment_counter можно вызывать из нескольких потоков, поскольку существует блокировка мьютекса для синхронизации доступа к общей переменной counter. Но если эта функция используется в реентерабельном обработчике прерывания, если в
pthread_mutex_lock(&мьютекс) и pthread_mutex_unlock(&мьютекс)
Если есть другое прерывание, вызывающее функцию increment_counter, функция будет выполнена во второй раз, в этот раз, поскольку мьютекс заблокирован, функция будет заблокирована в pthread_mutex_lock(&mutex), а поскольку у мьютекса нет шансов быть заблокированным
разблокировать, блокировка будет длиться вечно. Короче проблема в томpthreadМьютекс не является реентерабельным.

Решение состоит в том, чтобы установить свойство PTHREAD_MUTEX_RECURSIVE. Однако для данной задачи явно слишком дорого использовать мьютекс для защиты простой операции приращения, поэтомуc++11серединаатомарная переменная&action=edit&redlink=1) предоставляет альтернативу, которая делает эту функцию и потокобезопасной, и реентерабельной (и более лаконичной):


#include <atomic>

int increment_counter ()
{
    static std::atomic<int> counter(0);

    // increment is guaranteed to be done atomically
    int result = ++counter;

    return result;
}

В Go мьютексы представлены структурой мьютексов в пакете кода стандартной библиотеки sync. Тип sync.Mutex имеет только два общедоступных метода указателя: Lock и Unlock. Первый используется для блокировки текущего мьютекса, а второй используется для разблокировки текущего мьютекса.

2. Переменные условия

Среди методов синхронизации потоков также есть метод синхронизации, который можно сравнить с мьютексом, условной переменной.

Переменные условия отличаются от мьютексов.Функция переменных условия состоит не в том, чтобы гарантировать, что только один поток одновременно обращается к определенным общим данным, а в том, чтобы уведомить другие потоки, которые заблокированы из-за изменений в состоянии соответствующих общих данных. Переменные условия всегда используются в сочетании с переменными мьютекса.

На самом деле такие проблемы довольно распространены. Давайте сначала возьмем пример производитель-потребитель.

Если вы не используете переменную условия, просто используйте мьютекс и посмотрите, что произойдет.

Пока поток-производитель не завершит операцию добавления, другие потоки-производители и потоки-потребители не могут выполнять операции. Один и тот же товар может потребляться только одним потребителем.

Если используются только мьютексы, могут возникнуть 2 проблемы.

    1. После того, как поток-производитель получает мьютекс, он обнаруживает, что товар заполнен, и не может добавлять новые товары. Так что поток будет ждать. Новые производители не могут войти в критическую секцию, как и потребители. На данный момент он заблокирован.
    1. После того, как поток-потребитель получает мьютекс, он обнаруживает, что товар пуст и не может быть использован. В это время поток также будет ожидать. Новые производители и потребители также недоступны. На этот раз тоже зашло в тупик.

Это проблема, которую нельзя решить только с помощью мьютексов. Между несколькими потоками срочно необходим механизм синхронизации, позволяющий этим потокам взаимодействовать.

Условная переменная — это знакомая операция P-V. Все должны быть знакомы с этим, поэтому давайте кратко рассмотрим его.

Операция P является операцией ожидания, что означает блокировку текущего потока до тех пор, пока он не получит уведомление от условной переменной.

Операция V является сигнальной операцией, что означает, что условная переменная отправляет уведомление по крайней мере одному потоку, ожидающему его уведомления, чтобы указать, что состояние некоторых общих данных изменилось.

Broadcast рассылает уведомление, что означает, что условная переменная отправляет уведомление всем потокам, ожидающим ее уведомления, чтобы указать, что состояние общих данных изменилось.

сигнал может срабатывать несколько раз, если он срабатывает 3 раза, это означает, что отправлено 3 уведомления о сигнале. Как показано выше.

Прелесть схемы операций P-V заключается в том, что количество операций P равно количеству операций V. Сколько раз ждать и сколько раз соответствует сигнал. Глядя на картинку выше, этот цикл такой замечательный.

проблема производителя-потребителя

Эту проблему можно описать наглядно, как на картинке выше, охранник охраняет безопасность критической секции. Билетная касса фиксирует текущее значение семафона, который также контролирует, открывает ли швейцар критическую секцию.

Только один поток может войти в критическую секцию, когда один поток уже есть, другой поток будет заблокирован. Билетная касса также записывает количество потоков, заблокированных в данный момент.

Когда предыдущий поток уйдет, билетная касса скажет швейцару разрешить потоку войти в критическую секцию.

Используйте псевдокод P-V для описания производителя-потребителя:

Исходная переменная:


semaphore  mutex = 1; // 临界区互斥信号量
semaphore  empty = n; // 空闲缓冲区个数
semaphore  full = 0; // 缓冲区初始化为空

Тема производителя:


producer()
{
  while(1) {
    produce an item in nextp;
    P(empty);
    P(mutex);
    add nextp to buffer;
    V(mutex);
    V(full);
  }
}

Потребительская нить:



consumer()
{
  while(1) {
    P(full);
    P(mutex);
    remove an item from buffer;
    V(mutex);
    V(empty);
    consume the item;
  }
}

Хотя P и V не являются парными в одной программе производителя и потребителя, P и V все же являются парными во всей программе.

Проблема чтения и записи: сначала читатели, потом писатели

Читатели имеют приоритет, а писатели задерживаются. Пока есть читатели, последующие читатели могут приходить и читать по желанию.

Читатель должен сначала ввести rmutex, проверить readcount, затем изменить значение readcout и, наконец, прочитать данные. Для каждого процесса чтения, который является записывающим, необходимо изменить значение readcount, поэтому взаимный исключительный доступ rmutex должен быть установлен отдельно.

Исходная переменная:


int readcount = 0;     // 读者数量
semaphore  rmutex = 1; // 保证更新 readcount 互斥
semaphore  wmutex = 1; // 保证读者和写着互斥的访问文件

Читательская ветка:


reader()
{
  while(1) {
    P(rmutex);              // 准备进入,修改 readcount,“开门”
    if(readcount == 0) {    // 说明是第一个读者
      P(wmutex);            // 拿到”钥匙”,阻止写线程来写
    }
    readcount ++;
    V(rmutex);
    reading;
    P(rmutex);              // 准备离开
    readcount --;
    if(readcount == 0) {    // 说明是最后一个读者
      V(wmutex);            // 交出”钥匙”,让写线程来写
    }
    V(rmutex);              // 离开,“关门”
  }
}

Ветка писателя:


writer()
{
  while(1) {
    P(wmutex);
    writing;
    V(wmutex);
  }
}

Проблема «читатель-писатель»: сначала писатели, потом читатели

Есть писатели, которые пишут, а последующим читателям читать запрещено. Читатели раньше писателя, уходят после прочтения. Пока есть ожидающие писатели, последующим читателям вход воспрещен.

Исходная переменная:


int readcount = 0;     // 读者数量
semaphore  rmutex = 1; // 保证更新 readcount 互斥
semaphore  wmutex = 1; // 保证读者和写着互斥的访问文件
semaphore  w = 1;      // 用于实现“写者优先”

Читательская ветка:


reader()
{
  while(1) {
    P(w);                   // 在没有写者的时候才能请求进入
    P(rmutex);              // 准备进入,修改 readcount,“开门”
    if(readcount == 0) {    // 说明是第一个读者
      P(wmutex);            // 拿到”钥匙”,阻止写线程来写
    }
    readcount ++;
    V(rmutex);
    V(w);
    reading;
    P(rmutex);              // 准备离开
    readcount --;
    if(readcount == 0) {    // 说明是最后一个读者
      V(wmutex);            // 交出”钥匙”,让写线程来写
    }
    V(rmutex);              // 离开,“关门”
  }
}

Ветка писателя:


writer()
{
  while(1) {
    P(w);
    P(wmutex);
    writing;
    V(wmutex);
    V(w);
  }
}

Обеденная проблема философа

Предположим, пять философов сидят за круглым обеденным столом и делают одно из двух: едят или думают. Когда они едят, они перестают думать, а когда они думают, они перестают есть. В середине стола стоит большая тарелка макарон, а между каждыми двумя философами — вилка. Поскольку макароны трудно есть одной вилкой, предполагается, что философы должны есть двумя вилками. Они могут использовать только две вилки слева и справа от них. Проблема обедающих философов также иногда описывается в терминах риса и палочек для еды, а не макарон и вилки, потому что ясно, что для того, чтобы есть рис, необходимы две палочки для еды.

Исходная переменная:


semaphore  chopstick[5] = {1,1,1,1,1}; // 初始化信号量
semaphore  mutex = 1;                  // 设置取筷子的信号量

Философская ветка:


Pi()
{
  do {
    P(mutex);                     // 获得取筷子的互斥量
    P(chopstick[i]);              // 取左边的筷子
    P(chopstick[ (i + 1) % 5 ]);  // 取右边的筷子
    V(mutex);                     // 释放取筷子的信号量
    eat;
    V(chopstick[i]);              // 放回左边的筷子
    V(chopstick[ (i + 1) % 5 ]);  // 放回右边的筷子
    think;
  }while(1);
}

Таким образом, мьютекс может защитить критическую секцию и предотвратить возникновение условий гонки. Переменные условия используются в качестве дополнения, чтобы сделать многостороннюю совместную работу более эффективной.

В стандартной библиотеке Go тип sync.Cond в пакете синхронизации представляет переменные условия. Но в отличие от мьютексов и блокировок чтения-записи, простое объявление не может создать пригодную для использования условную переменную, и требуется функция sync.NewCond.


func NewCond( l locker) *Cond

В коллекции методов типа *sync.Cond есть 3 метода, а именно Wait, Signal и Broadcast.

2. Простая схема блокировки резьбы

Самый простой способ реализовать потокобезопасное решение — это заблокировать.

Давайте сначала посмотрим, как реализовать потокобезопасный словарь в OC.

В исходном коде Weex реализован набор потокобезопасных словарей. Имя класса — WXThreadSafeMutableDictionary.


/**
 *  @abstract Thread safe NSMutableDictionary
 */
@interface WXThreadSafeMutableDictionary<KeyType, ObjectType> : NSMutableDictionary
@property (nonatomic, strong) dispatch_queue_t queue;
@property (nonatomic, strong) NSMutableDictionary* dict;
@end

Конкретная реализация выглядит следующим образом:


- (instancetype)initCommon
{
    self = [super init];
    if (self) {
        NSString* uuid = [NSString stringWithFormat:@"com.taobao.weex.dictionary_%p", self];
        _queue = dispatch_queue_create([uuid UTF8String], DISPATCH_QUEUE_CONCURRENT);
    }
    return self;
}

При инициализации потокобезопасного словаря создается параллельная очередь.


- (NSUInteger)count
{
    __block NSUInteger count;
    dispatch_sync(_queue, ^{
        count = _dict.count;
    });
    return count;
}

- (id)objectForKey:(id)aKey
{
    __block id obj;
    dispatch_sync(_queue, ^{
        obj = _dict[aKey];
    });
    return obj;
}

- (NSEnumerator *)keyEnumerator
{
    __block NSEnumerator *enu;
    dispatch_sync(_queue, ^{
        enu = [_dict keyEnumerator];
    });
    return enu;
}

- (id)copy{
    __block id copyInstance;
    dispatch_sync(_queue, ^{
        copyInstance = [_dict copy];
    });
    return copyInstance;
}

Все эти методы считывают использование dispatch_sync .


- (void)setObject:(id)anObject forKey:(id<NSCopying>)aKey
{
    aKey = [aKey copyWithZone:NULL];
    dispatch_barrier_async(_queue, ^{
        _dict[aKey] = anObject;
    });
}

- (void)removeObjectForKey:(id)aKey
{
    dispatch_barrier_async(_queue, ^{
        [_dict removeObjectForKey:aKey];
    });
}

- (void)removeAllObjects{
    dispatch_barrier_async(_queue, ^{
        [_dict removeAllObjects];
    });
}

Все методы, связанные с записью, используют dispatch_barrier_async.

Давайте посмотрим, как Go реализует простую поточно-ориентированную карту с мьютексами.

Поскольку должен использоваться мьютекс, мы инкапсулируем карту, содержащую мьютекс.


type MyMap struct {
    sync.Mutex
    m map[int]int
}

var myMap *MyMap

func init() {
    myMap = &MyMap{
        m: make(map[int]int, 100),
    }
}

Затем просто реализуйте базовый метод Map.


func builtinMapStore(k, v int) {
    myMap.Lock()
    defer myMap.Unlock()
    myMap.m[k] = v
}

func builtinMapLookup(k int) int {
    myMap.Lock()
    defer myMap.Unlock()
    if v, ok := myMap.m[k]; !ok {
        return -1
    } else {
        return v
    }
}

func builtinMapDelete(k int) {
    myMap.Lock()
    defer myMap.Unlock()
    if _, ok := myMap.m[k]; !ok {
        return
    } else {
        delete(myMap.m, k)
    }
}

Идея реализации относительно проста: блокировка добавляется перед каждой операцией, а разблокировка добавляется, когда каждая функция завершает отложенное выполнение.

Потокобезопасный словарь, реализованный с помощью этого метода блокировки, имеет то преимущество, что он относительно прост, но недостатком является невысокая производительность. В конце статьи будет проведено сравнение производительности нескольких методов реализации, а если говорить о цифрах, то вы узнаете, насколько плоха производительность на основе метода блокировки мьютекса.

В языках с нативными поточно-ориентированными картами их нативные базовые реализации не являются потокобезопасными из-за простой блокировки, например ConcurrentHashMap в Java и новый sync.map, добавленный в Go 1.9.

3. Современная поточно-ориентированная схема Lock-Free CAS

В базовой реализации Java ConcurrentHashMap используется большое количество технологий Lock-Free, таких как volatile, final и CAS, чтобы уменьшить влияние конкуренции блокировок на производительность.

Атомарные операции также широко используются в Go, и CAS — одна из них. Сравнить и поменять местами — это «Сравнить и поменять местами», или сокращенно CAS.


func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

CAS сначала определит, равно ли измененное значение, на которое указывает параметр addr, значению параметра old. Если оно равно, соответствующая функция заменит старое значение новым значением, представленным параметром new. В противном случае операция замены игнорируется.

Это явно отличается от блокировок мьютексов.CAS всегда предполагает, что обрабатываемое значение не изменилось, и как только подтверждается, что это предположение верно, значение немедленно заменяется. Практика мьютекса более осторожна, всегда предполагая, что будут параллельные операции по изменению обрабатываемого значения и необходимо использовать блокировки для размещения связанных операций в критической секции для защиты. Можно сказать, что практика мьютекса имеет тенденцию быть пессимистичной, а практика CAS — оптимистичной, аналогичной оптимистичной блокировке.

Самым большим преимуществом подхода CAS является то, что он может выполнять операции замены значений, безопасные для параллелизма, без создания мьютексов и критических секций. Это значительно снижает влияние операций синхронизации потоков на производительность программы. Конечно, у CAS есть и некоторые недостатки, о которых будет сказано в следующей главе.

Далее посмотрим, как реализован исходный код. В следующем примере в качестве примера используется 64-разрядная версия, а 32-разрядная аналогична.


TEXT ·CompareAndSwapUintptr(SB),NOSPLIT,$0-25
    JMP    ·CompareAndSwapUint64(SB)

TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0-25
    JMP    ·CompareAndSwapUint64(SB)

TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0-25
    MOVQ    addr+0(FP), BP
    MOVQ    old+8(FP), AX
    MOVQ    new+16(FP), CX
    LOCK
    CMPXCHGQ    CX, 0(BP)
    SETEQ    swapped+24(FP)
    RET

Наиболее важным шагом в приведенной выше реализации является CMPXCHG.

Запросить у IntelДокументация

В документации говорится:

Сравните значение eax и операнда-адресата (первого операнда), если они совпадают, флаг ZF установлен, а значение операнда-источника (второго операнда) записывается в операнд-адресат, в противном случае очищается
ZF и записать значение операнда назначения обратно в eax.

Таким образом, принцип работы CMPXCHG:

Сравните значение _OLD и (*__ptr), если они совпадают, установлен флаг zf,
Значение _new записывается в (*__ptr), в противном случае флаг ZF сбрасывается и значение (*__ptr) записывается обратно в _old.

На платформе Intel это будет реализовано с помощью LOCK CMPXCHG, где LOCK — это блокировка процессора.

Руководство Intel описывает префикс LOCK следующим образом:

    1. Гарантирует, что операции чтения-изменения-записи в память выполняются атомарно. В процессорах Pentium и предшествующих Pentium инструкции с префиксом LOCK блокируют шину во время выполнения, делая временно невозможным доступ других процессоров к памяти по шине. Очевидно, что это приведет к дорогостоящим накладным расходам. Начиная с процессоров Pentium 4, Intel Xeon и P6, Intel провела значимую оптимизацию, основанную на оригинальной блокировке шины: если область памяти, к которой нужно получить доступ, уже находится в процессоре во время выполнения префиксной инструкции LOCK. Если внутренняя кэш заблокирован (то есть строка кэша, содержащая область памяти, в настоящее время находится в монопольном или модифицированном состоянии), и область памяти полностью содержится в одной строке кэша, процессор выполнит инструкцию напрямую. Поскольку строка кэша всегда блокируется во время выполнения инструкции, другие процессоры не могут читать/записывать область памяти, к которой будет обращаться инструкция, поэтому атомарность выполнения инструкции может быть гарантирована. Этот процесс операции называется блокировкой кеша.Блокировка кеша значительно уменьшит накладные расходы на выполнение инструкций с префиксом LOCK, но когда уровень конкуренции между несколькими процессорами высок или адреса памяти, к которым обращаются инструкции, не выровнены, шина все равно будет заблокирована. .
    1. Изменение порядка этой инструкции с предыдущими и последующими инструкциями чтения и записи отключено.
    1. Сбросить все данные из буфера записи в память.

После прочтения описания видно, что существует два основных типа блокировок ЦП, блокировки шины и блокировки кэша. Блокировки шины используются в старых ЦП, а блокировки кэша — в новых ЦП.

Так называемая блокировка шины заключается в использовании сигнала LOCK#, предоставляемого ЦП.Когда процессор выводит этот сигнал на шину, запросы других процессоров будут заблокированы, поэтому ЦП может использовать только общую память. Метод блокировки шины блокирует шину во время выполнения, делая временно невозможным доступ других процессоров к памяти через шину. Следовательно, накладные расходы на блокировку шины относительно велики, и последние процессоры в некоторых случаях используют блокировку кэша вместо блокировки шины для оптимизации.

Так называемая «блокировка кэша» означает, что если область памяти, кэшированная в строке кэша процессора, заблокирована во время операции LOCK, когда он выполняет операцию блокировки для обратной записи памяти, процессор не генерирует на шине.
LOCK#, вместо этого он изменяет адрес внутренней памяти и позволяет механизму когерентности кэша гарантировать атомарность операции, поскольку механизм когерентности кэша предотвращает одновременную модификацию данных области памяти, кэшированных более чем двумя процессорами, когда другой процессор делает недействительной строку кэша, когда записывает обратно данные строки кэша, которая была заблокирована.

Есть две ситуации, в которых процессор не может использовать блокировки кэша.

  • Во-первых, когда обрабатываемые данные не могут быть кэшированы внутри процессора или обрабатываемые данные охватывают несколько строк кэша, процессор вызывает блокировку шины.

  • Второй случай: некоторые процессоры не поддерживают блокировку кеша. Некоторые старые ЦП вызывают блокировку шины, даже если заблокированная область памяти находится в строке кэша процессора.

Хотя блокировки кэша могут значительно снизить нагрузку на выполнение блокировок ЦП, если существует высокий уровень конкуренции между несколькими процессорами или адреса памяти, к которым обращаются инструкции, не выровнены, шина все равно будет заблокирована. Таким образом, блокировка кеша и блокировка шины взаимодействуют друг с другом, и эффект становится лучше.

Таким образом, использование CAS для обеспечения безопасности потоков намного эффективнее, чем использование мьютекса.

4. Дефекты КАС

Хотя эффективность CAS высока, все еще существуют три основные проблемы.

1. Проблема АВА

Поток 1 собирается использовать CAS для замены значения переменной с A на B. Перед этим поток 2 заменяет значение переменной с A на C, а затем с C на A. Затем, когда поток 1 выполняет CAS, он обнаруживает, что значение переменной по-прежнему равно A, поэтому CAS завершается успешно. Но на самом деле сцена в этот раз отличается от оригинала. Картинка также отличается, чтобы разделить две буквы А, поэтому она отмечена разными цветами. В конце концов поток 2 заменяет A на B . Это классическая проблема ABA. Но какие проблемы это вызывает у проекта?

Представьте, что есть такой связанный стек, в стеке хранится связанный список, вершина стека — A, а следующий указатель A указывает на B. В потоке 1 замените верхний элемент A на B на CAS. Затем приходит поток 2, и поток 2 выдает связанный список, содержащий элементы A и B. Затем push поступает в связанный список A-C-D, а верхний элемент стека по-прежнему A. В это время поток 1 обнаруживает, что A не изменился, поэтому его заменяет B. В это время следующее из B фактически равно нулю. После завершения замены связанный список C-D, управляемый потоком 2, отключается от заголовка. То есть, когда операция CAS потока 1 завершается, C-D теряется и больше никогда не может быть найден. В стеке остался только один элемент B. Это явно ошибка.

Так как решить эту ситуацию? Самый распространенный подход — добавить номер версии для идентификации.

Номер версии добавляется к каждой операции, так что проблема ABA может быть решена идеально.

2. Время цикла может быть слишком большим

Если прокрутка CAS будет безуспешной в течение длительного времени, это вызовет очень большие накладные расходы на выполнение ЦП. Если инструкция Pause, предоставляемая ЦП, может поддерживаться, эффективность CAS может быть в определенной степени повышена. Инструкция Pause выполняет две функции. Во-первых, она может задерживать конвейерное выполнение инструкции (de-pipeline), чтобы ЦП не потреблял слишком много ресурсов выполнения. Время задержки зависит от конкретной версии реализации. На некоторых процессорах задержка время равно нулю. Во-вторых, можно избежать очистки конвейера ЦП, вызванной нарушением порядка памяти при выходе из цикла, тем самым повышая эффективность выполнения ЦП.

3. Можно гарантировать только атомарные операции над общей переменной

Операции CAS могут гарантировать атомарность операций только с одной общей переменной, но гарантировать атомарность операций с несколькими общими переменными. Общей практикой может считаться использование замков.

Однако вы также можете использовать структуру для объединения двух переменных в одну переменную. Таким образом, CAS можно продолжать использовать для обеспечения атомарных операций.

5. Примеры Lock-Free решений

Прежде чем взять в качестве примера схему Lock-Free, давайте рассмотрим схему мьютекса. Выше мы реализовали поточно-ориентированную карту Go с использованием мьютексов. Что касается производительности этой карты, вы можете посмотреть на данные при сравнении.

1. NO Lock — Бесплатный план

Как реализовать потокобезопасный словарь без использования схемы Lock-Free и простой схемы мьютекса? Ответ заключается в использовании конструкции сегментных замков.Отношения гонки существуют только в пределах одного и того же сегмента, и между блокировками разных сегментов нет конкуренции замков. по сравнению со всем
Конструкция блокировки карты, блокировка сегмента значительно повышает производительность обработки в среде с высокой степенью параллелизма.



type ConcurrentMap []*ConcurrentMapShared


type ConcurrentMapShared struct {
    items        map[string]interface{}
    sync.RWMutex // 读写锁,保证进入内部 map 的线程安全
}

Блокировка сегмента Сегмент имеет степень параллелизма. Параллелизм можно понимать как максимальное количество потоков, которые могут одновременно обновлять ConccurentMap без конкуренции блокировок во время работы программы, что на самом деле является количеством блокировок сегментов в ConcurrentMap. То есть длина массива.


var SHARD_COUNT = 32

Если параллелизм установлен слишком маленьким, это вызовет серьезные проблемы с конкуренцией блокировок; если параллелизм установлен слишком большой, доступ, изначально расположенный в одном и том же сегменте, будет распространяться на другие сегменты, и частота попаданий в кэш ЦП уменьшится, что приведет к Падает производительность программы.

Инициализация ConcurrentMap заключается в инициализации массива и инициализации каждого словаря в массиве.


func New() ConcurrentMap {
    m := make(ConcurrentMap, SHARD_COUNT)
    for i := 0; i < SHARD_COUNT; i++ {
        m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
    }
    return m
}

ConcurrentMap в основном использует Segment для уменьшения гранулярности блокировок, и делит Map на несколько Segments, при ставе нужно добавлять блокировки чтения-записи, а при получении добавляются только блокировки чтения.

Теперь, когда он сегментирован, логика того, какому сегменту соответствует каждый ключ, определяется хеш-функцией.


func fnv32(key string) uint32 {
    hash := uint32(2166136261)
    const prime32 = uint32(16777619)
    for i := 0; i < len(key); i++ {
        hash *= prime32
        hash ^= uint32(key[i])
    }
    return hash
}

Вышеприведенная хэш-функция будет каждый раз вычислять другое значение хеш-функции на основе входящей строки.


func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
    return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}

Возьмите остаток длины массива в соответствии с хеш-значением и выньте ConcurrentMapShared в ConcurrentMap. Сохраните ключ-значение, соответствующий этому сегменту, в ConcurrentMapShared.



func (m ConcurrentMap) Set(key string, value interface{}) {
    // Get map shard.
    shard := m.GetShard(key)
    shard.Lock()
    shard.items[key] = value
    shard.Unlock()
}

Приведенный выше абзац — это заданная операция ConcurrentMap. Идея очень понятна: сначала вынуть ConcurrentMapShared в соответствующем сегменте, затем добавить блокировку чтения-записи, записать ключ-значение и снять блокировку чтения-записи после успешной записи.


func (m ConcurrentMap) Get(key string) (interface{}, bool) {
    // Get shard
    shard := m.GetShard(key)
    shard.RLock()
    // Get item from shard.
    val, ok := shard.items[key]
    shard.RUnlock()
    return val, ok
}

Приведенный выше абзац — это операция получения ConcurrentMap. Идея также очень ясна: сначала выньте ConcurrentMapShared в соответствующем сегменте, затем добавьте блокировку чтения, прочитайте ключ-значение и снимите блокировку чтения после успешного чтения.

Разница между этой операцией и операцией set заключается в том, что требуются только блокировки чтения, блокировки чтения-записи не требуются.


func (m ConcurrentMap) Count() int {
    count := 0
    for i := 0; i < SHARD_COUNT; i++ {
        shard := m[i]
        shard.RLock()
        count += len(shard.items)
        shard.RUnlock()
    }
    return count
}

Операция Count ConcurrentMap заключается в обходе каждого элемента в каждом элементе сегмента массива ConcurrentMap для вычисления общего числа.


func (m ConcurrentMap) Keys() []string {
    count := m.Count()
    ch := make(chan string, count)
    go func() {
        // 遍历所有的 shard.
        wg := sync.WaitGroup{}
        wg.Add(SHARD_COUNT)
        for _, shard := range m {
            go func(shard *ConcurrentMapShared) {
                // 遍历所有的 key, value 键值对.
                shard.RLock()
                for key := range shard.items {
                    ch <- key
                }
                shard.RUnlock()
                wg.Done()
            }(shard)
        }
        wg.Wait()
        close(ch)
    }()

    // 生成 keys 数组,存储所有的 key
    keys := make([]string, 0, count)
    for k := range ch {
        keys = append(keys, k)
    }
    return keys
}

Вышеприведенное возвращает все ключи в ConcurrentMap, а результат загружается в массив строк.



type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}

func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
    shard := m.GetShard(key)
    shard.Lock()
    v, ok := shard.items[key]
    res = cb(ok, v, value)
    shard.items[key] = res
    shard.Unlock()
    return res
}

Приведенный выше код представляет собой операцию Upsert. Если он уже существует, обновите его. Если это новый элемент, используйте функцию UpsertCb, чтобы вставить новый. Идея состоит в том, чтобы сначала найти соответствующий сегмент по строке, а затем добавить блокировку чтения-записи. Здесь можно добавить только блокировки чтения-записи, потому что операции обновления и вставки требуют записи. Считайте значение, соответствующее ключу, а затем вызовите функцию UpsertCb, чтобы обновить результат до значения, соответствующего ключу. Наконец, снимите блокировку чтения-записи.

Здесь стоит отметить функцию UpsertCb, поскольку эта функция является обратным вызовом, который возвращает новый элемент для вставки в карту. Эта функция вызывается тогда и только тогда, когда блокировка чтения-записи заблокирована, поэтому ей нельзя разрешать пытаться читать другие значения ключей в той же карте. Потому что это приведет к взаимоблокировке потока. Причина взаимоблокировки в том, что sync.RWLock в Go не поддерживает повторный вход.

Посмотреть полный кодconcurrent_map.go

Хотя этот метод сегментации намного лучше, чем простое добавление мьютекса, поскольку сегментация еще больше уменьшает заблокированный диапазон, но этот диапазон все еще относительно велик, можем ли мы еще больше уменьшить блокировку?

Еще один момент — настройка параллелизма, которая должна быть разумной, ни слишком большой, ни слишком маленькой.

2. Блокировка — Бесплатный план

В версии Go 1.9 по умолчанию реализована поточно-безопасная карта, отказавшись от концепции Segment (блокировка сегмента), но позволив новый способ реализации, используя алгоритм CAS, схему Lock-Free.

После того, как будет принята схема Lock-Free, объем блокировки может быть сужен еще больше, чем при предыдущей блокировке разделения и сегмента. Производительность значительно улучшена.

Далее давайте посмотрим, как реализовать потокобезопасную высокопроизводительную карту с помощью CAS.

Официально sync.map имеет следующее описание:

Эта карта является потокобезопасной, а операции чтения, вставки и удаления поддерживают постоянную временную сложность. Кроме того, несколько горутин могут безопасно вызывать методы Map одновременно. Нулевое значение этой карты допустимо, а нулевое значение — это пустая карта. Поточно-безопасные Карты нельзя копировать после первого использования.

Вот объяснение того, почему его нельзя скопировать. Потому что копия структуры создает не только копию значения, но и копию полей в ней. В результате средства защиты параллельных потоков, которые должны применяться здесь, также неэффективны.

Присвоение другим переменным в качестве исходного значения, передача его в функцию в качестве значения параметра, возврат его из функции в качестве значения результата, передача его по каналу в качестве значения элемента и т. д. приведет к копированию значения. Правильнее всего использовать переменную типа указателя на этот тип.

Структура данных sync.map в Go 1.9 выглядит следующим образом:



type Map struct {

    mu Mutex

    // 并发读取 map 中一部分的内容是线程安全的,这是不需要
    // read 这部分自身读取就是线程安全的,因为是原子性的。但是存储的时候还是需要 Mutex
    // 存储在 read 中的 entry 在并发读取过程中是允许更新的,即使没有 Mutex 信号量,也是线程安全的。但是更新一个以前删除的 entry 就需要把值拷贝到 dirty Map 中,并且必须要带上 Mutex
    read atomic.Value // readOnly

    // dirty 中包含 map 中必须要互斥量 mu 保护才能线程安全的部分。为了使 dirty 能快速的转化成 read map,dirty 中包含了 read map 中所有没有被删除的 entries
    // 已经删除过的 entries 不存储在 dirty map 中。在 clean map 中一个已经删除的 entry 一定是没有被删除过的,并且当新值将要被存储的时候,它们会被添加到 dirty map 中。
    // 当 dirty map 为 nil 的时候,下一次写入的时候会通过 clean map 忽略掉旧的 entries 以后的浅拷贝副本来初始化 dirty map。
    dirty map[interface{}]*entry

    // misses 记录了 read map 因为需要判断 key 是否存在而锁住了互斥量 mu 进行了 update 操作以后的加载次数。
    // 一旦 misses 值大到足够去复制 dirty map 所需的花费的时候,那么 dirty map 就被提升到未被修改状态下的 read map,下次存储就会创建一个新的 dirty map。
    misses int
}

В этой карте есть мьютекс mu, прочитанное атомарное значение и карта словаря, не являющаяся потокобезопасной. Ключ этого словаря имеет тип интерфейса {}, а значение — тип *entry. Наконец, есть счетчик типа int.

Давайте сначала поговорим об атомарных значениях. Тип atomic.Value имеет два общедоступных метода указателя: Load и Store. Метод Load используется для атомарного чтения значения, хранящегося в экземпляре атомарного значения, он возвращает результат типа interface{} и не принимает аргументов. Метод Store используется для атомарного сохранения значения в экземпляре атомарного значения, он принимает параметр типа interface{} без каких-либо результатов. Метод Load экземпляра атомарного значения всегда будет возвращать nil до тех пор, пока значение не будет сохранено в экземпляре атомарного значения с помощью метода Store.

В этом потокобезопасном словаре и Load, и Store являются структурой данных, доступной только для чтения.


// readOnly 是一个不可变的结构体,原子性的存储在 Map.read 中
type readOnly struct {
    m map[interface{}]*entry
    // 标志 dirty map 中是否包含一些不在 m 中的 key 。
    amended bool // true if the dirty map contains some key not in m.
}

Не потокобезопасный словарь хранится в readOnly, и это точно такой же тип, как и словарь, хранящийся в грязной карте выше. Ключ имеет тип interface{}, а значение имеет тип *entry.


// entry 是一个插槽,与 map 中特定的 key 相对应
type entry struct {
    p unsafe.Pointer // *interface{}
}

Указатель p указывает на тип *interface{}, в котором хранится адрес записи. Если p \=\= nil, запись удаляется, а m.dirty \=\= nil. Если p \=\= expunged, это означает, что запись была удалена и m.dirty ! = nil , то запись отсутствует в m.dirty.

За исключением двух вышеуказанных случаев, запись действительна и записана в m.read.m[key], если m.dirty!= nil, запись сохраняется в m.dirty[key].

Запись может быть удалена атомарной заменой на nil. Когда m.dirty создается в следующий раз, запись будет автоматически заменена на nil удаленным указателем, а m.dirty[key] не соответствует никакому значению. Пока p != expunged, запись может обновить связанное значение с помощью операции атомарной замены. Если p \=\= expunged, то запись, которая хочет обновить связанное значение с помощью операции атомарной замены, может обновить значение только после установки m.dirty[key] = e в первый раз. Это сделано для того, чтобы его можно было найти на грязной карте.

Подводя итог, можно сказать, что структура данных sync.map такая же, как указано выше.

Давайте взглянем на некоторые операции поточно-ориентированного sync.map.


func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 如果 key 对应的 value 不存在,并且 dirty map 包含 read map 中没有的 key,那么开始读取  dirty map 
    if !ok && read.amended {
        // dirty map 不是线程安全的,所以需要加上互斥锁
        m.mu.Lock()
        // 当 m.dirty 被提升的时候,为了防止得到一个虚假的 miss ,所以此时我们加锁。
        // 如果再次读取相同的 key 不 miss,那么这个 key 值就就不值得拷贝到 dirty map 中。
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // 无论 entry 是否存在,记录这次 miss 。
            // 这个 key 将会缓慢的被取出,直到 dirty map 提升到 read map
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

Приведенный выше код является операцией загрузки. Возвращается значение, соответствующее ключу входного параметра. Возвращает nil, если значение не существует. Грязная карта сохранит некоторые ключи, которых нет в карте чтения, а затем прочитает значение, соответствующее ключу в грязной карте. Обратите внимание, что вам нужно добавить мьютекс при чтении, потому что грязная карта не является потокобезопасной.


func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

Приведенный выше код предназначен для записи количества промахов. Только когда количество промахов превышает длину грязной карты, грязная карта будет сохранена в карте чтения. И пустой грязный, и количество промахов тоже очищается.

Прежде чем рассматривать операцию Store, давайте поговорим об удаленной переменной.


// expunged 是一个指向任意类型的指针,用来标记从 dirty map 中删除的 entry
var expunged = unsafe.Pointer(new(interface{}))

Переменная expunged — это указатель для отметки записей, которые были удалены из грязной карты.


func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    // 从 read map 中读取 key 失败或者取出的 entry 尝试存储 value 失败,直接返回
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        // e 指向的是非 nil 的
        if e.unexpungeLocked() {
            // entry 先前被删除了,这就意味着存在一个非空的 dirty map 里面并没有存储这个 entry
            m.dirty[key] = e
        }
        // 使用 storeLocked 函数之前,必须保证 e 没有被清除
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        // 已经存储在 dirty map 中了,代表 e 没有被清除
        e.storeLocked(&value)
    } else {
        if !read.amended {
            // 到这个 else 中就意味着,当前的 key 是第一次被加到 dirty map 中。
            // store 之前先判断一下 dirty map 是否为空,如果为空,就把 read map 浅拷贝一次。
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        // 在 dirty 中存储 value
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

Store сначала считывает ключ из прочитанной карты, а затем сохраняет его значение. Если запись помечена как удаленная из грязной карты, ее необходимо сохранить обратно в грязную карту.

Если в считанной карте нет соответствующего ключа, перейти к грязной карте для чтения. Грязная карта напрямую хранит соответствующее значение.

В итоге ни прочитанная карта, ни грязная карта не имеют этого значения ключа, а это значит, что ключ добавляется в грязную карту впервые. Сохраните этот ключ и соответствующее значение в грязной карте.


// 当 entry 没有被删除的情况下去存储一个 value。
// 如果 entry 被删除了,tryStore 方法返回 false,并且保留 entry 不变
func (e *entry) tryStore(i *interface{}) bool {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}

Реализация функции tryStore аналогична принципу CAS. Она будет повторять цикл, чтобы определить, помечена ли запись как удаленная. Если запись успешно заменена на i после операции CAS, она вернет true, в противном случае, если она помечен как удаленный, он вернет false.



// unexpungeLocked 函数确保了 entry 没有被标记成已被清除。
// 如果 entry 先前被清除过了,那么在 mutex 解锁之前,它一定要被加入到 dirty map 中
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

Если unexpungeLocked записи возвращает значение true, то запись была помечена как удаленная, после чего для нее будет установлено значение nil с помощью операции CAS.

Давайте посмотрим на реализацию операции удаления.


func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        // 由于 dirty map 是非线程安全的,所以操作前要加锁
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            // 删除 dirty map 中的 key
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}

Реализация операции удаления относительно проста.Если ключ существует в карте чтения, его можно удалить напрямую.Если ключ не существует, а ключ существует в грязной карте, то ключ в грязной карте необходимо удалить . При работе с грязной картой не забудьте добавить блокировку для ее защиты.


func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

Конкретная реализация удаления записи описана выше. Эта операция полностью атомарна. Цикл оценивает, является ли запись нулевой или отмечена как удаленная, и в этом случае возвращает false, указывая на то, что удаление не удалось. В противном случае выполняется операция CAS, указатель p записи устанавливается в nil и возвращается значение true, указывающее, что удаление прошло успешно.

До сих пор была проанализирована реализация потокобезопасного sync.map, поставляемого с Go 1.9. Официальная реализация в принципе не использует блокировки, и блокировка мьютекса тоже основана на CAS. карта чтения также атомарна. Поэтому производительность улучшена по сравнению с предыдущей версией блокировки.

Насколько сильна производительность Lock-Free? Далее проведите тест производительности.

5. Сравнение производительности

Тест производительности в основном фокусируется на 3 аспектах: вставка, получение, удаление. Тестовые объекты в основном предназначены для проверки производительности собственной карты с простой блокировкой взаимного исключения, карты с сегментированной блокировкой и карты без блокировки.

Весь код для теста производительности выложен на github, адрес находится по адресуздесь, инструкции по тестированию производительности:


go test -v -run=^$ -bench . -benchmem

1. Вставить Вставить тест производительности


// 插入不存在的 key (粗糙的锁)
func BenchmarkSingleInsertAbsentBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        myMap.BuiltinMapStore(strconv.Itoa(i), "value")
    }
}

// 插入不存在的 key (分段锁)
func BenchmarkSingleInsertAbsent(b *testing.B) {
    m := New()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Set(strconv.Itoa(i), "value")
    }
}

// 插入不存在的 key (syncMap)
func BenchmarkSingleInsertAbsentSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        syncMap.Store(strconv.Itoa(i), "value")
    }
}

Результаты теста:


BenchmarkSingleInsertAbsentBuiltInMap-4          2000000           857 ns/op         170 B/op           1 allocs/op
BenchmarkSingleInsertAbsent-4                    2000000           651 ns/op         170 B/op           1 allocs/op
BenchmarkSingleInsertAbsentSyncMap-4             1000000          1094 ns/op         187 B/op           5 allocs/op

Экспериментальный результат состоит в том, что производительность блокировки сегмента является самой высокой. Результаты теста объясняются здесь: -4 означает, что в тесте используется 4-ядерный ЦП, 2000000 означает количество циклов, 857 нс/оп означает среднее время, затрачиваемое на каждое выполнение, а 170 Б/оп означает память, выделенную на куча для каждого выполнения.Общее, allocs/op представляет собой количество раз, когда память выделяется в куче за выполнение.

Таким образом, чем больше количество циклов, тем меньше времени требуется, чем меньше общее количество выделенной памяти, чем меньше количество выделенной памяти, тем выше производительность. Первый столбец количества циклов удален из приведенной ниже диаграммы производительности, и потрачены только оставшиеся 3 элемента, поэтому чем короче столбец, тем лучше производительность. Правила и результаты тестов каждой из следующих гистограмм имеют те же значения, что и здесь, и не будут повторяться ниже.


// 插入存在 key (粗糙锁)
func BenchmarkSingleInsertPresentBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    myMap.BuiltinMapStore("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        myMap.BuiltinMapStore("key", "value")
    }
}

// 插入存在 key (分段锁)
func BenchmarkSingleInsertPresent(b *testing.B) {
    m := New()
    m.Set("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Set("key", "value")
    }
}

// 插入存在 key (syncMap)
func BenchmarkSingleInsertPresentSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    syncMap.Store("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        syncMap.Store("key", "value")
    }
}

Результаты теста:


BenchmarkSingleInsertPresentBuiltInMap-4        20000000            74.6 ns/op           0 B/op           0 allocs/op
BenchmarkSingleInsertPresent-4                  20000000            61.1 ns/op           0 B/op           0 allocs/op
BenchmarkSingleInsertPresentSyncMap-4           20000000           108 ns/op          16 B/op           1 allocs/op

Как видно из рисунка, sync.map хуже, чем два других, когда дело доходит до Store. Независимо от вставки несуществующего ключа или существующего ключа, производительность сегментированных замков в настоящее время является наилучшей.

2. Прочтите тест производительности Get


// 读取存在 key (粗糙锁)
func BenchmarkSingleGetPresentBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    myMap.BuiltinMapStore("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        myMap.BuiltinMapLookup("key")
    }
}

// 读取存在 key (分段锁)
func BenchmarkSingleGetPresent(b *testing.B) {
    m := New()
    m.Set("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        m.Get("key")
    }
}

// 读取存在 key (syncMap)
func BenchmarkSingleGetPresentSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    syncMap.Store("key", "value")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        syncMap.Load("key")
    }
}

Результаты теста:


BenchmarkSingleGetPresentBuiltInMap-4           20000000            71.5 ns/op           0 B/op           0 allocs/op
BenchmarkSingleGetPresent-4                     30000000            42.3 ns/op           0 B/op           0 allocs/op
BenchmarkSingleGetPresentSyncMap-4              30000000            40.3 ns/op           0 B/op           0 allocs/op

Как видно из рисунка, производительность sync.map в Load очень хорошая, намного выше, чем у двух других.

3. Смешанный тест производительности с одновременной вставкой и чтением

Следующая реализация включает параллельные операции вставки и чтения. Из-за специфики реализации блокировки сегментов количество сегментов будет в некоторой степени влиять на производительность, поэтому в следующем эксперименте будет протестирована блокировка сегментов в 4 сегментах по 1, 16, 32 и 256, чтобы увидеть производительность соответственно. он изменяется, две другие поточно-ориентированные Карты не изменяются.

Так как параллельных кодов слишком много, я не буду их здесь выкладывать, желающие могут их прочитать.здесь

Результаты теста публикуются непосредственно ниже:

Параллельная вставка несуществующего значения ключа


BenchmarkMultiInsertDifferentBuiltInMap-4        1000000          2359 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_1_Shard-4          1000000          2039 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_16_Shard-4         1000000          1937 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_32_Shard-4         1000000          1944 ns/op         330 B/op          11 allocs/op
BenchmarkMultiInsertDifferent_256_Shard-4        1000000          1991 ns/op         331 B/op          11 allocs/op
BenchmarkMultiInsertDifferentSyncMap-4           1000000          3760 ns/op         635 B/op          33 allocs/op

Как видно из рисунка, sync.map хуже, чем два других, когда дело доходит до Store. При одновременной вставке несуществующих ключей количество сегментов, деленное на блокировки сегментов, не имеет никакого отношения к производительности.

Параллельная вставка существующих ключевых значений


BenchmarkMultiInsertSameBuiltInMap-4             1000000          1182 ns/op         160 B/op          10 allocs/op
BenchmarkMultiInsertSame-4                       1000000          1091 ns/op         160 B/op          10 allocs/op
BenchmarkMultiInsertSameSyncMap-4                1000000          1809 ns/op         480 B/op          30 allocs/op

Как видно из рисунка, sync.map хуже, чем два других, когда дело доходит до Store.

Чтение существующего значения ключа одновременно


BenchmarkMultiGetSameBuiltInMap-4                2000000           767 ns/op           0 B/op           0 allocs/op
BenchmarkMultiGetSame-4                          3000000           481 ns/op           0 B/op           0 allocs/op
BenchmarkMultiGetSameSyncMap-4                   3000000           464 ns/op           0 B/op           0 allocs/op

Как видно из рисунка, производительность sync.map в Load намного лучше, чем у двух других.

Параллельная вставка считывает несуществующее значение ключа


BenchmarkMultiGetSetDifferentBuiltInMap-4        1000000          3281 ns/op         337 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_1_Shard-4          1000000          3007 ns/op         338 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_16_Shard-4          500000          2662 ns/op         337 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_32_Shard-4         1000000          2732 ns/op         337 B/op          12 allocs/op
BenchmarkMultiGetSetDifferent_256_Shard-4        1000000          2788 ns/op         339 B/op          12 allocs/op
BenchmarkMultiGetSetDifferentSyncMap-4            300000          8990 ns/op        1104 B/op          34 allocs/op

Как видно из рисунка, sync.map хуже, чем два других, когда дело доходит до Store. При одновременной вставке и чтении несуществующего ключа количество сегментов, деленное на блокировки сегментов, не имеет никакого отношения к производительности.

Параллельная вставка считывает существующее значение ключа


BenchmarkMultiGetSetBlockBuiltInMap-4            1000000          2095 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_1_Shard-4              1000000          1712 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_16_Shard-4             1000000          1730 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_32_Shard-4             1000000          1645 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlock_256_Shard-4            1000000          1619 ns/op         160 B/op          10 allocs/op
BenchmarkMultiGetSetBlockSyncMap-4                500000          2660 ns/op         480 B/op          30 allocs/op

Как видно из рисунка, sync.map хуже, чем два других, когда дело доходит до Store. При одновременной вставке и чтении существующих ключей чем меньше сегмент делится на блокировку сегмента, тем выше производительность!

4. Удалить тест производительности Удалить


// 删除存在 key (粗糙锁)
func BenchmarkDeleteBuiltInMap(b *testing.B) {
    myMap = &MyMap{
        m: make(map[string]interface{}, 32),
    }
    b.RunParallel(func(pb *testing.PB) {
        r := rand.New(rand.NewSource(time.Now().Unix()))
        for pb.Next() {
            // The loop body is executed b.N times total across all goroutines.
            k := r.Intn(100000000)
            myMap.BuiltinMapDelete(strconv.Itoa(k))
        }
    })
}

// 删除存在 key (分段锁)
func BenchmarkDelete(b *testing.B) {
    m := New()
    b.RunParallel(func(pb *testing.PB) {
        r := rand.New(rand.NewSource(time.Now().Unix()))
        for pb.Next() {
            // The loop body is executed b.N times total across all goroutines.
            k := r.Intn(100000000)
            m.Remove(strconv.Itoa(k))
        }
    })
}

// 删除存在 key (syncMap)
func BenchmarkDeleteSyncMap(b *testing.B) {
    syncMap := &sync.Map{}
    b.RunParallel(func(pb *testing.PB) {
        r := rand.New(rand.NewSource(time.Now().Unix()))
        for pb.Next() {
            // The loop body is executed b.N times total across all goroutines.
            k := r.Intn(100000000)
            syncMap.Delete(strconv.Itoa(k))
        }
    })
}

Результаты теста:


BenchmarkDeleteBuiltInMap-4                     10000000           130 ns/op           8 B/op           1 allocs/op
BenchmarkDelete-4                               20000000            76.7 ns/op           8 B/op           1 allocs/op
BenchmarkDeleteSyncMap-4                        30000000            45.4 ns/op           8 B/op           0 allocs/op

Как видно из рисунка, sync.map идеально подходит для элемента Delete по сравнению с двумя другими.

6. Резюме

Эта статья начинается с теоретических основ безопасности потоков и рассказывает о некоторых методах обработки в безопасности потоков. Что включает в себя знание мьютексов и условных переменных. Из решения Lock я рассказал о решениях Lock-Free, связанных с CAS. Наконец, анализ исходного кода и тест производительности были проведены для sync.map, недавно добавленного в Go 1.9.

Результаты теста sync.map по схеме Lock-Free не так хороши, как ожидалось. За исключением Load и Delete, которые далеки от двух других, производительность всех операций, связанных с Store, ниже, чем реализация двух других Maps. Но тому есть причина.

Глядя на изменения Java ConcurrentHashmap полностью:

ConcurrentHashmap в JDK 6,7 в основном использует сегмент для уменьшения детализации блокировки, делит HashMap на несколько сегментов, требует блокировки сегмента при размещении, не блокирует при получении, использует volatile для обеспечения видимости, когда требуется статистика Когда он глобальный (например, size), он сначала попытается несколько раз вычислить modcount, чтобы определить, изменили ли другие потоки операцию в этих попытках, и если нет, то напрямую вернет размер. Если есть, вам нужно заблокировать все Сегменты по очереди для расчета.

В ConcurrentHashmap в JDK 7 при слишком большой длине коллизии будут очень частыми, а операции добавления, модификации, удаления и проверки связанного списка будут занимать много времени и сказываться на производительности, поэтому concurrentHashmap полностью переписан в JDK8, и количество кода меняется от исходного более чем на 1000 строк, стало более чем на 6000 строк, и реализация также сильно отличается от оригинального сегментированного хранилища.

Основные изменения дизайна ConcurrentHashmap в JDK 8 заключаются в следующем:

  • Вместо использования сегментов используются узлы, а узлы блокируются, чтобы уменьшить степень детализации блокировки.
  • Состояние MOVED разработано. Когда поток 2 все еще помещает данные в процесс изменения размера, поток 2 помогает изменить размер.
  • Используйте 3 операции CAS, чтобы обеспечить атомарность некоторых операций на узле, этот способ заменяет блокировку.
  • Разные значения sizeCtl представляют разные значения и играют контролирующую роль.

Видно, что как только появилась первая версия Go 1.9, она напрямую отказалась от подхода Segment и приняла схему CAS без блокировки для повышения производительности. Но он не соответствует дизайну Java, подобному Node, для всего словаря. Но весь sync.map в три раза быстрее обычного нативного не потокобезопасного Map по трем показателям производительности ns/op, B/op и allocs/op!

Тем не менее, я считаю, что Google должен продолжить оптимизацию этой части, ведь в исходном коде есть еще несколько TODO, давайте займемся разработкой других будущих версий Go, автор будет продолжать уделять этому внимание.

(Во время крайнего срока для этой статьи автор внезапно обнаружил реализацию Map сегментированной блокировки, которая имеет более высокую производительность и такие функции, как балансировка нагрузки. Это должен быть поток, реализованный на языке Go с лучшей производительностью, которую я видел. пока Safe Map, анализ исходного кода его реализации могу написать только отдельно в следующем посте в блоге или проанализировать позже, когда у меня будет время)


Ссылка:
«Перейти к практическому программированию параллелизма»
Split-Ordered Lists: Lock-Free Extensible Hash Tables
Semaphores are Surprisingly Versatile
потокобезопасность
Углубленный анализ принципа JAVA CAS
Резюме Java ConcurrentHashMap

Репозиторий GitHub:Halfrost-Field

Follow: halfrost · GitHub

Source: HAL frost.com/go_flatter_poor обзор…