ConcurrentSkipListMap от JUC

Java

До сих пор мы видели две структуры данных, которые реализуют ключ-значение в мире Java: Hash и TreeMap, каждая из которых имеет свои преимущества и недостатки.

  1. Хеш-таблица: вставка и поиск являются самыми быстрыми, что составляет O (1); если используется связанный список, он не может обеспечить блокировку; упорядочение данных требует явной операции сортировки.
  2. Красно-черное дерево: вставка и поиск O(logn), но постоянный член мал; сложность реализации без блокировки очень высока и обычно требует блокировки; данные естественным образом упорядочены. Однако на этот раз мы представляем третью структуру данных, реализующую ключ-значение: SkipList. SkipList имеет не меньшую эффективность, чем красно-черное дерево, но его принцип и сложность реализации намного проще красно-черного дерева.

SkipList

Что такое скиплист? Список пропуска, называемый списком пропуска, представляет собой структуру данных, которая может заменить сбалансированное дерево, и его элементы данных по умолчанию расположены в порядке возрастания значения ключа, что естественно упорядочено. Список пропуска распределяет отсортированные данные в многоуровневом связанном списке и решает, поднимаются ли данные вверх или нет, со случайным числом от 0 до 1. С помощью алгоритма «пространства во времени» каждый узел добавляет вперед указатели могут игнорировать некоторые узлы, которые не могут быть задействованы при вставке, удалении и поиске, тем самым повышая эффективность.

Давайте сначала посмотрим на простой связанный список, как показано ниже:

Если нам нужно запросить 9, 21 и 30, необходимое количество сравнений равно 3 + 6 + 8 = 17. Есть ли решение по оптимизации? имеют! Мы извлекаем некоторые элементы из связанного списка в качестве «индекса» сравнения следующим образом:

Сначала мы сравниваем с этими индексами, чтобы определить, должен ли следующий элемент идти вправо или вниз.Благодаря существованию «индексов» количество сравнений во время поиска будет значительно уменьшено. Конечно, элементов не так много, поэтому сложно отразить преимущества, когда элементов достаточно, такая индексная структура будет очень полезна.

Особенности SkipList

SkipList имеет следующие функции:

  1. Он состоит из множества слоевых структур, а уровень генерируется случайным образом через определенную вероятность
  2. Каждый слой представляет собой упорядоченный связанный список, который по умолчанию находится в порядке возрастания или может быть отсортирован в соответствии с Компаратором, предоставленным при создании карты, в зависимости от используемого конструктора.
  3. Самый нижний (уровень 1) связанный список содержит все элементы
  4. Если элемент появляется в связанном списке уровня i, он также появится в связанном списке ниже уровня i.
  5. Каждый узел содержит два указателя: один на следующий элемент в том же связанном списке и один на элемент на уровень ниже.

Мы можем сделать несколько расширений к изображению выше, чтобы превратить его в типичную структуру SkipList.

Поиск по списку пропуска

Алгоритм поиска SkipListd относительно прост.Для вышеизложенного мы хотим найти элемент 21, процесс выглядит следующим образом:

  1. Сравните 3, если оно больше 3, ищите (9) позже,
  2. Если он больше 9, продолжайте искать (25), но если он меньше 25, начните искать (16) со следующего слоя из 9.
  3. Задний узел 16 все еще 25, затем продолжите поиск со следующего слоя 16
  4. найдено 21

Красная пунктирная линия обозначает путь.

Вставка SkipList

Операция вставки SkipList в основном включает:

  1. Найдите подходящее место. Здесь должно быть ясно, что при подтверждении уровня K, который будет занят новым узлом, принимается метод подбрасывания монеты, который является полностью случайным. Если занятый уровень K больше уровня связанного списка, повторно подать заявку на новый уровень, в противном случае вставить указанный уровень
  2. Подать заявку на новый узел
  3. Настройте указатель

Предполагая, что элемент, который мы хотим вставить, имеет номер 23, после поиска мы можем подтвердить, что он находится до 25 и после 9, 16 и 21. Конечно, необходимо учитывать уровень K приложения.

если уровень К > 3

Необходимо подать заявку на новый уровень (Уровень 4)

Если уровень К = 2

Вы можете вставить его прямо на слой уровня 2

Здесь будет задействован алгоритм: подбрасыванием монеты определяется уровень K. Этот алгоритм анализируется исходным кодом ConcurrentSkipListMap позже. Еще один момент, который следует отметить, заключается в том, что после вставки элементов на уровне K вам необходимо убедиться, что все уровни, меньшие уровня K, должны иметь новые узлы.

Удаление SkipList

Идея удаления узла и вставки узла в основном одинакова: найти узел, удалить узел и настроить указатель.

Например, удалите узел 9 следующим образом:

ConcurrentSkipListMap

Из вышеизложенного мы знаем, что SkipList использует алгоритм пространства-в-времени, а его эффективность вставки и поиска составляет O(logn), что не ниже, чем у красно-черного дерева, но его принцип и сложность реализации намного проще, чем красно-черное дерево. Вообще говоря, если вы будете работать со связанным списком List, на SkipList не будет давления.

ConcurrentSkipListMap использует для внутреннего использования структуру данных SkipLis. Для реализации SkipList ConcurrentSkipListMap предоставляет три внутренних класса для построения такой структуры связанного списка: Node, Index, HeadIndex. Среди них Node представляет самый нижний упорядоченный узел односвязного списка, Index представляет индексный уровень на основе Node, а HeadIndex используется для поддержки индексного уровня. На данный момент мы можем сказать, что ConcurrentSkipListMap поддерживает уровень индекса через HeadIndex, выполняет поиск от верхнего уровня к нижнему уровню через индекс и шаг за шагом сужает область запроса. часть данных необходимо сравнить. Отношения в JDK следующие:

Node

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile ConcurrentSkipListMap.Node<K, V> next;

    /** 省略些许代码 */
}

Структура Node ничем не отличается от обычного односвязного списка, ключ-значение и следующая точка для следующего узла.

Index

static class Index<K,V> {
    final ConcurrentSkipListMap.Node<K,V> node;
    final ConcurrentSkipListMap.Index<K,V> down;
    volatile ConcurrentSkipListMap.Index<K,V> right;

    /** 省略些许代码 */
}

Индекс предоставляет узел индекса, основанный на узлах узла, правый указатель указывает на следующий индекс, а нижний узел указывает на более низкий уровень.

HeadIndex

static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;  //索引层,从1开始,Node单链表层为0
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

Внутри HeadIndex уровень используется для определения уровня.

ConcurrentSkipListMap предоставляет четыре конструктора, каждый из которых будет вызывать метод initialize() для инициализации.

final void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
    head = new ConcurrentSkipListMap.HeadIndex<K,V>(new ConcurrentSkipListMap.Node<K,V>(null, BASE_HEADER, null),
            null, null, 1);
}

Обратите внимание, что метод initialize() вызывается не только в конструкторе, таком как clone, clear и readObject, он будет вызываться для этапов инициализации. Здесь нужно обратить внимание на инициализацию randomSeed.

private transient int randomSeed;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero

randomSeed Простой генератор случайных чисел (описан позже).

поставить операцию

CoucurrentSkipListMap предоставляет метод put() для связывания указанного значения с указанным ключом в этой карте. Исходный код выглядит следующим образом:

public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}

Сначала оцените, является ли значение нулевым, выдайте исключение NullPointerException, в противном случае вызовите метод doPut. Фактически, если вы видели исходный код JDK, вы должны быть знакомы с этой операцией. Многие методы в исходном коде JDK предназначены для выполнения необходимой проверки Сначала выполняется настоящая операция, вызывая метод do**().

В методе doPut() много содержимого, и мы разбираем его шаг за шагом.

private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    // 比较器
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {

        /** 省略代码 */

Метод doPut() имеет три параметра.В дополнение к ключу и значению существует также логический тип onlyIfAbsent, который используется для выполнения действия, если текущий ключ существует. Когда onlyIfAbsent ложно, замените значение, когда оно истинно, верните значение. Интерпретируется в коде как:

 if (!map.containsKey(key))
    return map.put(key, value);
else
     return map.get(key);

Сначала определите, является ли ключ нулевым, если он нулевой, выдайте NullPointerException,Отсюда мы можем подтвердить, что ConcurrentSkipList не поддерживает ключ или значение равно null.. Затем вызовите метод findPredecessor(), передав ключ для подтверждения местоположения. Метод findPredecessor() на самом деле предназначен для подтверждения позиции, в которую должен быть вставлен ключ.

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
     if (key == null)
         throw new NullPointerException(); // don't postpone errors
     for (;;) {
         // 从head节点开始,head是level最高级别的headIndex
         for (Index<K,V> q = head, r = q.right, d;;) {

             // r != null,表示该节点右边还有节点,需要比较
             if (r != null) {
                 Node<K,V> n = r.node;
                 K k = n.key;
                 // value == null,表示该节点已经被删除了
                 // 通过unlink()方法过滤掉该节点
                 if (n.value == null) {
                     //删掉r节点
                     if (!q.unlink(r))
                         break;           // restart
                     r = q.right;         // reread r
                     continue;
                 }

                 // value != null,节点存在
                 // 如果key 大于r节点的key 则往前进一步
                 if (cpr(cmp, key, k) > 0) {
                     q = r;
                     r = r.right;
                     continue;
                 }
             }

             // 到达最右边,如果dowm == null,表示指针已经达到最下层了,直接返回该节点
             if ((d = q.down) == null)
                 return q.node;
             q = d;
             r = d.right;
         }
     }
 }

Метод findPredecessor() имеет очень ясное значение: найти предшественников. Начните с headIndex самого высокого уровня и сравнивайте его шаг за шагом вправо, пока правый не станет нулевым или ключ узла справа больше текущего ключа, затем посмотрите вниз и повторите процесс по очереди, пока значение вниз равно нулю, то есть найден предшественник.Глядя на возвращенный результат, обратите внимание, что это узел, а не элемент, поэтому позиция вставки должна быть самой нижней точкой списка узлов.

В этом процессе ConcurrentSkipListMap дает этому методу другую функцию, то есть оценивая, является ли значение узла нулевым, если оно равно нулю, это означает, что узел был удален, и узел удаляется путем вызова функции unlink(). метод.

final boolean unlink(Index<K,V> succ) {
    return node.value != null && casRight(succ, succ.right);
}

Процесс удаления узла очень прост, достаточно изменить правильный указатель.

Что вы делаете после нахождения узла-предшественника с помощью findPredecessor()? смотреть вниз:

for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
       // 前辈节点的next != null
       if (n != null) {
           Object v; int c;
           Node<K,V> f = n.next;

           // 不一致读,主要原因是并发,有节点捷足先登
           if (n != b.next)               // inconsistent read
               break;

           // n.value == null,该节点已经被删除了
           if ((v = n.value) == null) {   // n is deleted
               n.helpDelete(b, f);
               break;
           }

           // 前辈节点b已经被删除
           if (b.value == null || v == n) // b is deleted
               break;

           // 节点大于,往前移
           if ((c = cpr(cmp, key, n.key)) > 0) {
               b = n;
               n = f;
               continue;
           }

           // c == 0 表示,找到一个key相等的节点,根据onlyIfAbsent参数来做判断
           // onlyIfAbsent ==false,则通过casValue,替换value
           // onlyIfAbsent == true,返回该value
           if (c == 0) {
               if (onlyIfAbsent || n.casValue(v, value)) {
                   @SuppressWarnings("unchecked") V vv = (V)v;
                   return vv;
               }
               break; // restart if lost race to replace value
           }
           // else c < 0; fall through
       }

       // 将key-value包装成一个node,插入
       z = new Node<K,V>(key, value, n);
       if (!b.casNext(n, z))
           break;         // restart if lost race to append to b
       break outer;
   }

Найдя подходящее место, вставьте узел в это место. Процесс вставки узла относительно прост, то есть ключ-значение помещается в узел, а затем добавляется в связанный список с помощью метода casNext(). Конечно, перед установкой необходимо выполнить ряд проверочных работ.

Каков следующий шаг после вставки узла на самом низком уровне? Новый индекс. Предыдущий блоггер упомянул, что при вставке узла уровень, на который вставляется новый узел, будет определяться подбрасыванием монеты.Из-за возможности параллелизма ConcurrentSkipListMap использует ThreadLocalRandom для генерации случайных чисел. следующее:

int rnd = ThreadLocalRandom.nextSecondarySeed();

Идея подбрасывания монеты для определения уровня очень проста, то есть, если монета выпала орлом, уровень +1 достигается подбрасыванием монеты, в противном случае он останавливается следующим образом:

// 抛硬币决定层次
while (((rnd >>>= 1) & 1) != 0)
    ++level;

При описании узлов вставки SkipList поясняется, что определенный уровень будет обрабатываться в двух случаях: во-первых, если уровень больше самого большого уровня, необходимо добавить новый слой, в противном случае он будет на соответствующем уровне. и меньше, чем уровень Уровень выполняет обработку добавления узлов.

level <= headIndex.level

// 如果决定的层次level比最高层次head.level小,直接生成最高层次的index
// 由于需要确认每一层次的down,所以需要从最下层依次往上生成
if (level <= (max = h.level)) {
    for (int i = 1; i <= level; ++i)
        idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);
}

Начиная с нижнего слоя, индекс инициализируется для каждого слоя, меньшего, чем уровень, каждый узел указывает на вновь добавленный узел, нижний указывает на элемент следующего слоя, а следующий с правой стороны — все пустое значение. Весь процесс очень прост: инициализируйте индекс для каждого слоя меньшего, чем уровень, а затем добавьте его в исходную цепочку индексов.

level > headIndex.level

// leve > head.level 则新增一层
 else { // try to grow by one level
     // 新增一层
     level = max + 1;

     // 初始化 level个item节点
     @SuppressWarnings("unchecked")
     ConcurrentSkipListMap.Index<K,V>[] idxs =
             (ConcurrentSkipListMap.Index<K,V>[])new ConcurrentSkipListMap.Index<?,?>[level+1];
     for (int i = 1; i <= level; ++i)
         idxs[i] = idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);

     //
     for (;;) {
         h = head;
         int oldLevel = h.level;
         // 层次扩大了,需要重新开始(有新线程节点加入)
         if (level <= oldLevel) // lost race to add level
             break;
         // 新的头结点HeadIndex
         ConcurrentSkipListMap.HeadIndex<K,V> newh = h;
         ConcurrentSkipListMap.Node<K,V> oldbase = h.node;
         // 生成新的HeadIndex节点,该HeadIndex指向新增层次
         for (int j = oldLevel+1; j <= level; ++j)
             newh = new ConcurrentSkipListMap.HeadIndex<K,V>(oldbase, newh, idxs[j], j);

         // HeadIndex CAS替换
         if (casHead(h, newh)) {
             h = newh;
             idx = idxs[level = oldLevel];
             break;
         }
     }

Когда уровень, определяемый подбрасыванием монеты, превышает максимальный уровень, необходимо добавить новый слой для обработки. Логика обработки следующая:

  1. Инициализируйте соответствующий массив индексов, размер равен уровню + 1, а затем создайте индекс для каждой единицы, параметры следующие: узел — это новая Z, вниз — индекс следующего уровня, справа — ноль.
  2. Операция расширения выполняется через цикл for. Для обработки с самого высокого уровня добавляется новый HeadIndex с параметрами: Node и down — Node и HeadIndex самого высокого уровня, right — индекс только что созданного соответствующего уровня, level — соответствующий уровень. Наконец, текущая головка заменяется головкой только что добавленного слоя через CAS. С помощью вышеуказанных шагов мы обнаружили, что, хотя узел-предшественник был найден, узел был вставлен, уровень был определен и соответствующий индекс был сгенерирован, но эти индексы не были вставлены на соответствующий уровень, поэтому следующее код для индекса вставляется в соответствующий слой.
// 从插入的层次level开始
  splice: for (int insertionLevel = level;;) {
      int j = h.level;
      //  从headIndex开始
      for (ConcurrentSkipListMap.Index<K,V> q = h, r = q.right, t = idx;;) {
          if (q == null || t == null)
              break splice;

          // r != null;这里是找到相应层次的插入节点位置,注意这里只横向找
          if (r != null) {
              ConcurrentSkipListMap.Node<K,V> n = r.node;

              int c = cpr(cmp, key, n.key);

              // n.value == null ,解除关系,r右移
              if (n.value == null) {
                  if (!q.unlink(r))
                      break;
                  r = q.right;
                  continue;
              }

              // key > n.key 右移
              if (c > 0) {
                  q = r;
                  r = r.right;
                  continue;
              }
          }

          // 上面找到节点要插入的位置,这里就插入
          // 当前层是最顶层
          if (j == insertionLevel) {
              // 建立联系
              if (!q.link(r, t))
                  break; // restart
              if (t.node.value == null) {
                  findNode(key);
                  break splice;
              }
              // 标志的插入层 -- ,如果== 0 ,表示已经到底了,插入完毕,退出循环
              if (--insertionLevel == 0)
                  break splice;
          }

          // 上面节点已经插入完毕了,插入下一个节点
          if (--j >= insertionLevel && j < level)
              t = t.down;
          q = q.down;
          r = q.right;
      }
  }

Этот код разделен на две части, одна часть заключается в том, чтобы найти позицию, в которую вставляется узел соответствующего уровня, вторая часть вставляется в эту позицию, а затем перемещается вниз.

На этом операция размещения ConcurrentSkipListMap завершена. Объем кода немного велик, вот краткое изложение:

  1. Сначала найдите узел-предшественник Node с помощью метода findPredecessor().
  2. В соответствии с возвращенным узлом-предшественником и ключом-значением создайте новый узел Node и установите следующий через CAS.
  3. Установите узел Node, а затем установите индексный узел. Уровень определяется подбрасыванием монеты.Если определенный уровень больше, чем существующий максимальный уровень, будет добавлен новый уровень, а затем будет создан новый связанный список предметов.
  4. Наконец, вставьте новый список элементов в структуру SkipList.

получить операцию

По сравнению с операцией put операция get намного проще, и процесс фактически эквивалентен только первому шагу операции put:

private V doGet(Object key) {
      if (key == null)
          throw new NullPointerException();
      Comparator<? super K> cmp = comparator;
      outer: for (;;) {
          for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
              Object v; int c;
              if (n == null)
                  break outer;
              ConcurrentSkipListMap.Node<K,V> f = n.next;
              if (n != b.next)                // inconsistent read
                  break;
              if ((v = n.value) == null) {    // n is deleted
                  n.helpDelete(b, f);
                  break;
              }
              if (b.value == null || v == n)  // b is deleted
                  break;
              if ((c = cpr(cmp, key, n.key)) == 0) {
                  @SuppressWarnings("unchecked") V vv = (V)v;
                  return vv;
              }
              if (c < 0)
                  break outer;
              b = n;
              n = f;
          }
      }
      return null;
  }

Как и в первом шаге операции размещения, сначала вызовите метод findPredecessor(), чтобы найти узел-предшественник, а затем следуйте вправо, чтобы найти его до конца вправо. ответственность за удаление узла, значение которого равно нулю.

удалить операцию

Операция удаления заключается в удалении указанного ключевого узла следующим образом:

public V remove(Object key) {
    return doRemove(key, null);
}

Вызовите метод doRemove() напрямую, где remove имеет два параметра, один из которых является ключом, а другой — значением, поэтому метод doRemove предоставляет как ключ удаления, так и ключ-значение, которые удовлетворяют обоим.

final V doRemove(Object key, Object value) {
       if (key == null)
           throw new NullPointerException();
       Comparator<? super K> cmp = comparator;
       outer: for (;;) {
           for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
               Object v; int c;
               if (n == null)
                   break outer;
               ConcurrentSkipListMap.Node<K,V> f = n.next;

               // 不一致读,重新开始
               if (n != b.next)                    // inconsistent read
                   break;

               // n节点已删除
               if ((v = n.value) == null) {        // n is deleted
                   n.helpDelete(b, f);
                   break;
               }

               // b节点已删除
               if (b.value == null || v == n)      // b is deleted
                   break;

               if ((c = cpr(cmp, key, n.key)) < 0)
                   break outer;

               // 右移
               if (c > 0) {
                   b = n;
                   n = f;
                   continue;
               }

               /*
                * 找到节点
                */

               // value != null 表示需要同时校验key-value值
               if (value != null && !value.equals(v))
                   break outer;

               // CAS替换value
               if (!n.casValue(v, null))
                   break;
               if (!n.appendMarker(f) || !b.casNext(n, f))
                   findNode(key);                  // retry via findNode
               else {
                   // 清理节点
                   findPredecessor(key, cmp);      // clean index

                   // head.right == null表示该层已经没有节点,删掉该层
                   if (head.right == null)
                       tryReduceLevel();
               }
               @SuppressWarnings("unchecked") V vv = (V)v;
               return vv;
           }
       }
       return null;
   }

Вызовите метод findPredecessor(), чтобы найти узел-предшественник, затем переместитесь вправо, затем сравните, используйте CAS, чтобы заменить значение на ноль после нахождения, а затем оцените, является ли узел единственным индексом этого слоя, если да, вызовите метод tryReduceLevel() для размещения этого слоя. Слой высох и удаление завершено.

На самом деле, из этого видно, что метод удаления только устанавливает значение узла в ноль и на самом деле не удаляет узел узла, На самом деле, из приведенной выше операции ввода и получения мы видим, что они будут судить узел при поиске узла.Если значение равно нулю, если оно равно нулю, вызовите метод unLink(), чтобы отменить ассоциацию, как показано ниже:

if (n.value == null) {
    if (!q.unlink(r))
        break;           // restart
    r = q.right;         // reread r
    continue;
}

размер операции

Операция size() ConcurrentSkipListMap отличается от ConcurrentHashMap тем, что она не поддерживает глобальную переменную для подсчета количества элементов, поэтому ее необходимо проходить каждый раз при вызове этого метода.

public int size() {
    long count = 0;
    for (Node<K,V> n = findFirst(); n != null; n = n.next) {
        if (n.getValidValue() != null)
            ++count;
    }
    return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}

Вызовите метод findFirst(), чтобы найти первый узел, а затем используйте следующий узел узла для подсчета. Наконец, верните статистику вплоть до Integer.MAX_VALUE. Обратите внимание, что это безопасно при параллелизме потоков.