Анализ исходного кода Phaser для серии синхронизации Java с мертвым стуком

Java

вопрос

(1) Что такое Фазер?

(2) Каковы характеристики Phaser?

(3) Каковы преимущества Phaser по сравнению с CyclicBarrier и CountDownLatch?

Введение

Phaser, переводится как этап, подходит для такого сценария, большая задача может выполняться в несколько этапов, причем задачи каждого этапа могут выполняться одновременно несколькими потоками, но задачи предыдущего этапа должны быть выполнены. задача следующего этапа.

Хотя этот сценарий также можно реализовать с помощью CyclicBarrier или CountryDownLatch, он намного сложнее. Во-первых, может измениться конкретное количество требуемых этапов, а во-вторых, может измениться и количество задач на каждом этапе. По сравнению с CyclicBarrier и CountDownLatch Phaser более гибкий и удобный.

инструкции

Рассмотрим простейший вариант использования:

public class PhaserTest {

    public static final int PARTIES = 3;
    public static final int PHASES = 4;

    public static void main(String[] args) {

        Phaser phaser = new Phaser(PARTIES) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // 【本篇文章由公众号“彤哥读源码”原创,请支持原创,谢谢!】
                System.out.println("=======phase: " + phase + " finished=============");
                return super.onAdvance(phase, registeredParties);
            }
        };

        for (int i = 0; i < PARTIES; i++) {
            new Thread(()->{
                for (int j = 0; j < PHASES; j++) {
                    System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));
                    phaser.arriveAndAwaitAdvance();
                }
            }, "Thread " + i).start();
        }
    }
}

Здесь мы определяем большую задачу, которую необходимо выполнить в 4 этапа, и для каждого этапа требуется 3 небольших задачи.Для этих небольших задач мы запускаем 3 потока для выполнения этих небольших задач, и выходные результаты таковы:

Thread 0: phase: 0
Thread 2: phase: 0
Thread 1: phase: 0
=======phase: 0 finished=============
Thread 2: phase: 1
Thread 0: phase: 1
Thread 1: phase: 1
=======phase: 1 finished=============
Thread 1: phase: 2
Thread 0: phase: 2
Thread 2: phase: 2
=======phase: 2 finished=============
Thread 0: phase: 3
Thread 2: phase: 3
Thread 1: phase: 3
=======phase: 3 finished=============

Видно, что на каждом этапе все три потока завершаются перед переходом на следующий этап. Как это достигается, давайте узнаем вместе.

Принципиальная догадка

По принципу AQS, который мы узнали ранее, мы, вероятно, можем догадаться о принципе реализации Phaser.

Во-первых, нам нужно сохранить текущую фазу фазы, количество задач (участников) сторон в текущей фазе и количество незавершенных участников, Эти три переменные можно хранить в переменной состояния.

Во-вторых, очередь необходима для хранения участников, выполнивших задание первыми, и когда последний участник завершит задачу, необходимо разбудить участников в очереди.

Вот и все.

Объединение вышеуказанного случая в:

Изначально текущий этап равен 0, количество участников равно 3, а количество незавершенных участников равно 3;

Первый поток выполняется дляphaser.arriveAndAwaitAdvance();при входе в очередь;

Второй поток выполняется дляphaser.arriveAndAwaitAdvance();при входе в очередь;

Третий поток выполняется дляphaser.arriveAndAwaitAdvance();сначала выполните сводку по этой фазеonAdvance(), а затем разбудить первые два потока, чтобы продолжить выполнение задачи следующего этапа.

Что ж, в целом логично, а так ли это, давайте посмотрим исходный код вместе.

Анализ исходного кода

основной внутренний класс

static final class QNode implements ForkJoinPool.ManagedBlocker {
    final Phaser phaser;
    final int phase;
    final boolean interruptible;
    final boolean timed;
    boolean wasInterrupted;
    long nanos;
    final long deadline;
    volatile Thread thread; // nulled to cancel wait
    QNode next;

    QNode(Phaser phaser, int phase, boolean interruptible,
          boolean timed, long nanos) {
        this.phaser = phaser;
        this.phase = phase;
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.timed = timed;
        this.deadline = timed ? System.nanoTime() + nanos : 0L;
        thread = Thread.currentThread();
    }
}

Узел, который завершает первого участника в очередь, здесь нам нужно только обратить вниманиеthreadа такжеnextДостаточно двух свойств.Очевидно, что это односвязный список, в котором хранятся потоки, поставленные в очередь.

главный атрибут

// 状态变量,用于存储当前阶段phase、参与者数parties、未完成的参与者数unarrived_count
private volatile long state;
// 最多可以有多少个参与者,即每个阶段最多有多少个任务
private static final int  MAX_PARTIES     = 0xffff;
// 最多可以有多少阶段
private static final int  MAX_PHASE       = Integer.MAX_VALUE;
// 参与者数量的偏移量
private static final int  PARTIES_SHIFT   = 16;
// 当前阶段的偏移量
private static final int  PHASE_SHIFT     = 32;
// 未完成的参与者数的掩码,低16位
private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
// 参与者数,中间16位
private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
// counts的掩码,counts等于参与者数和未完成的参与者数的'|'操作
private static final long COUNTS_MASK     = 0xffffffffL;
private static final long TERMINATION_BIT = 1L << 63;

// 一次一个参与者完成
private static final int  ONE_ARRIVAL     = 1;
// 增加减少参与者时使用
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
// 减少参与者时使用
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
// 没有参与者时使用
private static final int  EMPTY           = 1;

// 用于求未完成参与者数量
private static int unarrivedOf(long s) {
    int counts = (int)s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
// 用于求参与者数量(中间16位),注意int的位置
private static int partiesOf(long s) {
    return (int)s >>> PARTIES_SHIFT;
}
// 用于求阶段数(高32位),注意int的位置
private static int phaseOf(long s) {
    return (int)(s >>> PHASE_SHIFT);
}
// 已完成参与者的数量
private static int arrivedOf(long s) {
    int counts = (int)s; // 低32位
    return (counts == EMPTY) ? 0 :
        (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
// 用于存储已完成参与者所在的线程,根据当前阶段的奇偶性选择不同的队列
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

Основные свойстваstateа такжеevenQа такжеoddQ:

(1) состояние, переменная состояния, верхние 32 бита хранят текущую фазу фазы, средние 16 бит хранят количество участников, а нижние 16 бит хранят количество незавершенных участников Пожалуйста, поддержите оригинал, спасибо! ];

Phaser

(2) четные Q и нечетные Q, очереди, сохраненные завершенными участниками, пробуждают участников в очереди после того, как последний участник завершит задачу, чтобы продолжить задачу следующего этапа или завершить задачу.

Метод строительства

public Phaser() {
    this(null, 0);
}

public Phaser(int parties) {
    this(null, parties);
}

public Phaser(Phaser parent) {
    this(parent, 0);
}

public Phaser(Phaser parent, int parties) {
    if (parties >>> PARTIES_SHIFT != 0)
        throw new IllegalArgumentException("Illegal number of parties");
    int phase = 0;
    this.parent = parent;
    if (parent != null) {
        final Phaser root = parent.root;
        this.root = root;
        this.evenQ = root.evenQ;
        this.oddQ = root.oddQ;
        if (parties != 0)
            phase = parent.doRegister(1);
    }
    else {
        this.root = this;
        this.evenQ = new AtomicReference<QNode>();
        this.oddQ = new AtomicReference<QNode>();
    }
    // 状态变量state的存储分为三段
    this.state = (parties == 0) ? (long)EMPTY :
        ((long)phase << PHASE_SHIFT) |
        ((long)parties << PARTIES_SHIFT) |
        ((long)parties);
}

Также в конструкторе есть родитель и корень, которые используются для построения многоуровневых стадий, которые выходят за рамки этой статьи и игнорируются.

Ключевым моментом является рассмотрение метода присвоения состояния: старшие 32 бита хранят текущую фазу фазы, средние 16 бит хранят количество участников, а младшие 16 бит хранят количество незавершенных участников.

Давайте посмотрим на исходный код нескольких основных методов:

метод регистрации()

Регистрирует участника, и если метод onAdvance() выполняется при вызове этого метода, метод ожидает завершения своего выполнения.

public int register() {
    return doRegister(1);
}
private int doRegister(int registrations) {
    // state应该加的值,注意这里是相当于同时增加parties和unarrived
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    for (;;) {
        // state的值
        long s = (parent == null) ? state : reconcileState();
        // state的低32位,也就是parties和unarrived的值
        int counts = (int)s;
        // parties的值
        int parties = counts >>> PARTIES_SHIFT;
        // unarrived的值
        int unarrived = counts & UNARRIVED_MASK;
        // 检查是否溢出
        if (registrations > MAX_PARTIES - parties)
            throw new IllegalStateException(badRegister(s));
        // 当前阶段phase
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            break;
        // 不是第一个参与者
        if (counts != EMPTY) {                  // not 1st registration
            if (parent == null || reconcileState() == s) {
                // unarrived等于0说明当前阶段正在执行onAdvance()方法,等待其执行完毕
                if (unarrived == 0)             // wait out advance
                    root.internalAwaitAdvance(phase, null);
                // 否则就修改state的值,增加adjust,如果成功就跳出循环
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    break;
            }
        }
        // 是第一个参与者
        else if (parent == null) {              // 1st root registration
            // 计算state的值
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            // 修改state的值,如果成功就跳出循环
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                break;
        }
        else {
            // 多层级阶段的处理方式
            synchronized (this) {               // 1st sub registration
                if (state == s) {               // recheck under lock
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        break;
                    // finish registration whenever parent registration
                    // succeeded, even when racing with termination,
                    // since these are part of the same "transaction".
                    while (!UNSAFE.compareAndSwapLong
                           (this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    break;
                }
            }
        }
    }
    return phase;
}
// 等待onAdvance()方法执行完毕
// 原理是先自旋一定次数,如果进入下一个阶段,这个方法直接就返回了,
// 如果自旋一定次数后还没有进入下一个阶段,则当前线程入队列,等待onAdvance()执行完毕唤醒
private int internalAwaitAdvance(int phase, QNode node) {
    // 保证队列为空
    releaseWaiters(phase-1);          // ensure old queue clean
    boolean queued = false;           // true when node is enqueued
    int lastUnarrived = 0;            // to increase spins upon change
    // 自旋的次数
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    // 检查当前阶段是否变化,如果变化了说明进入下一个阶段了,这时候就没有必要自旋了
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        // 如果node为空,注册的时候传入的为空
        if (node == null) {           // spinning in noninterruptible mode
            // 未完成的参与者数量
            int unarrived = (int)s & UNARRIVED_MASK;
            // unarrived有变化,增加自旋次数
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            boolean interrupted = Thread.interrupted();
            // 自旋次数完了,则新建一个节点
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // push onto queue
            // 节点入队列
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        }
        else {
            try {
                // 当前线程进入阻塞状态,跟调用LockSupport.park()一样,等待被唤醒
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                node.wasInterrupted = true;
            }
        }
    }
    
    // 到这里说明节点所在线程已经被唤醒了
    if (node != null) {
        // 置空节点中的线程
        if (node.thread != null)
            node.thread = null;       // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // possibly clean up on abort
    }
    // 唤醒当前阶段阻塞着的线程
    releaseWaiters(phase);
    return p;
}

Логика добавления популяции участников следующая:

(1) Чтобы добавить участника, нужно одновременно увеличить два значения партий и неприбывших, то есть средние 16 бит и младшие 16 бит состояния;

(2) Если это первый участник, попытаться атомарно обновить значение состояния и выйти в случае успеха;

(3) Если это не первый участник, проверьте, выполняется ли onAdvance(), ожидает ли он завершения выполнения onAdvance(), если нет, попробуйте атомарно обновить значение состояния, пока оно не завершится успешно;

(4) Ожидание завершения onAdvance() означает ожидание путем сначала вращения, а затем перехода из очереди в очередь, что сокращает переключение контекста потока;

Метод прибытияAndAwaitAdvance()

Текущая фаза текущего потока завершена, ожидая завершения текущей фазы другими потоками.

Если текущий поток является последним, прибывшим в эту фазу, текущий поток выполнит метод onAdvance() и разбудит другие потоки для перехода к следующей фазе.

public int arriveAndAwaitAdvance() {
    // Specialization of doArrive+awaitAdvance eliminating some reads/paths
    final Phaser root = this.root;
    for (;;) {
        // state的值
        long s = (root == this) ? state : reconcileState();
        // 当前阶段
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        // parties和unarrived的值
        int counts = (int)s;
        // unarrived的值(state的低16位)
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)
            throw new IllegalStateException(badArrive(s));
        // 修改state的值
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                      s -= ONE_ARRIVAL)) {
            // 如果不是最后一个到达的,则调用internalAwaitAdvance()方法自旋或进入队列等待
            if (unarrived > 1)
                // 这里是直接返回了,internalAwaitAdvance()方法的源码见register()方法解析
                return root.internalAwaitAdvance(phase, null);
            
            // 到这里说明是最后一个到达的参与者
            if (root != this)
                return parent.arriveAndAwaitAdvance();
            // n只保留了state中parties的部分,也就是中16位
            long n = s & PARTIES_MASK;  // base of next state
            // parties的值,即下一次需要到达的参与者数量
            int nextUnarrived = (int)n >>> PARTIES_SHIFT;
            // 执行onAdvance()方法,返回true表示下一阶段参与者数量为0了,也就是结束了
            if (onAdvance(phase, nextUnarrived))
                n |= TERMINATION_BIT;
            else if (nextUnarrived == 0)
                n |= EMPTY;
            else
                // n 加上unarrived的值
                n |= nextUnarrived;
            // 下一个阶段等待当前阶段加1
            int nextPhase = (phase + 1) & MAX_PHASE;
            // n 加上下一阶段的值
            n |= (long)nextPhase << PHASE_SHIFT;
            // 修改state的值为n
            if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                return (int)(state >>> PHASE_SHIFT); // terminated
            // 唤醒其它参与者并进入下一个阶段
            releaseWaiters(phase);
            // 返回下一阶段的值
            return nextPhase;
        }
    }
}

Примерная логика прибытияAndAwaitAdvance такова:

(1) Изменить значение неприбывшей части состояния минус 1;

(2) Если он прибыл не последним, вызовите метод internalAwaitAdvance() для вращения или постановки в очередь;

(3) Если он прибыл последним, вызовите метод onAdvance(), затем измените значение состояния на значение, соответствующее следующему этапу, и разбудите другие ожидающие потоки;

(4) Вернуть значение следующего этапа;

Суммировать

(1) Phaser подходит для многоэтапных и многозадачных сценариев, и задачи каждого этапа можно очень точно контролировать;

(2) Phaser использует переменные состояния и очереди для реализации всей логики. ];

(3) Старшие 32 бита состояния хранят фазу текущего этапа, средние 16 бит хранят количество партий (задач) на текущем этапе, младшие 16 бит хранят количество не прибывших участников;

(4) Очередь будет выбирать разные очереди в соответствии с четностью текущего этапа;

(5) Когда прибудет не последний участник, он будет вращаться или встанет в очередь, чтобы дождаться, пока все участники выполнят задание;

(6) Когда последний участник завершит задачу, он разбудит поток в очереди и перейдет к следующему этапу;

пасхальные яйца

Преимущества Phaser перед CyclicBarrier и CountDownLatch?

О: Есть два основных преимущества:

(1) Phaser может выполнять несколько этапов, в то время как CyclicBarrier или CountDownLatch обычно могут контролировать только один или два этапа задач;

(2) Количество задач на каждом этапе Phaser можно контролировать, а количество задач в CyclicBarrier или CountDownLatch нельзя изменить после его определения.

Рекомендуемое чтение

1,Открытие серии java-синхронизации мертвых приседаний

2,Небезопасный анализ мертвого магического класса Java

3.JMM (модель памяти Java) из мертвой серии синхронизации Java

4.Неустойчивый анализ мертвой серии синхронизации Java

5.Синхронный анализ мертвой серии синхронизации Java

6.Напишите блокировку самостоятельно в серии «Синхронизация Java»

7.Начало AQS в серии синхронизации Java

8,ReentrantLock анализ исходного кода тупиковой серии синхронизации Java (1) - справедливая блокировка, нечестная блокировка

9,ReentrantLock Анализ исходного кода мертвой серии синхронизации Java (2) — условная блокировка

10.ReentrantLock VS синхронизирован в серии синхронизации java

11.Анализ исходного кода ReentrantReadWriteLock мертвой серии синхронизации Java

12.Анализ исходного кода семафора серии Dead Java Synchronization

13.Анализ исходного кода CountDownLatch серии Dead Java Synchronization

14.Заключительная глава AQS в серии о синхронизации Java.

15.Анализ исходного кода StampedLock мертвой серии синхронизации Java

16.Анализ исходного кода CyclicBarrier мертвой серии синхронизации java


Добро пожаловать, чтобы обратить внимание на мою общедоступную учетную запись «Брат Тонг читает исходный код», проверить больше статей из серии исходного кода и поплавать в океане исходного кода с братом Тонгом.

qrcode