Хотя самым важным для компьютерной программы является правильность, если программа не может дать правильных результатов, то ценность программы сильно снижается. Но производительность программы также является очень важным аспектом.Если программа будет работать слишком медленно, это также повлияет на сферу применения программы и стоимость аппаратной настройки.
В предыдущей статье "4. Невидимые подводные камни многопоточности", мы узнали о механизме синхронизации между потоками, который в основном предназначен для обеспечения корректности работы программы в многопоточной среде. В этой статье мы углубимся в узкие места производительности многопоточных программ и различные методы оптимизации, поэтому начнем с измерения и анализа производительности программы.
Анализ производительности многопоточных программ
Давайте сначала посмотрим на использованиеAtomicLong
В программе для многопоточного подсчета в следующей программе будут запущены два потока, и каждый поток будетcount
Выполните 100 миллионов операций накопления (10 в 8-й степени), этот код получает текущее время в начале и в конце, а затем вычисляет время работы программы через эти два значения времени.
public class AtomicIntegerTest {
private static AtomicLong count = new AtomicLong(0);
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
Runnable task = new Runnable() {
public void run() {
for (int i = 0; i < 1e8; ++i) {
count.incrementAndGet();
}
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + count);
long endTime = System.currentTimeMillis();
System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
}
}
Конечный результат выполнения этой программы на моем компьютере составляет 2,44 с, что кажется немного большим, поэтому давайте посмотрим на результат накопления целочисленной переменной 200 миллионов раз непосредственно в одном потоке. Вот программный код, который накапливает 200 миллионов раз в одном потоке:
public class SingleThreadTest {
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
long count = 0;
for (int i = 0; i < 2e8; ++i) {
count += 1;
}
System.out.println("count = " + count);
long endTime = System.currentTimeMillis();
System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
}
}
Этот код выполняется всего за 0,33 с. Код, который мы накапливаем в двух потоках, намного медленнее, чем однопоточный код. Разве это не противоречит нашему первоначальному намерению использовать многопоточность для ускорения программы?
Межпоточная синхронизация многопоточных программ является ключом к влиянию на производительность многопоточных программ.С одной стороны, части, которые должны быть сериализованы в программе, значительно увеличивают общее время работы системы. с другой стороны, накладные расходы на само поведение синхронизации относительно велики, особенно в случае конфликта. В приведенном выше коде причина, по которой многопоточная программа накопления работает намного медленнее, чем однопоточная программа, заключается в том, что вAtomicLong
статическая переменная типаcount
Есть два потока, вызывающих в то же время наincrementAndGet
метод для накопления, что приведет к серьезному конфликту этой статической переменной.
Когда поток успешно изменяет переменнуюcount
После значения другой поток, который модифицирует, не сможет измениться и снова попытается выполнить операцию накопления. и потому чтоAtomicLong
Объект типа используется сvolatile
переменная для хранения фактического целочисленного значения, которое мы обсуждали в предыдущей статье "Невидимые ловушки многопоточности", видно, что даvolatile
Операция модификации переменной должна записывать измененные данные из кеша обратно в память, которая также используетсяAtomicLong
Основная причина, по которой накопление занимает намного больше времени, чем однопоточная версия накопления.
Так есть ли у нас лучший способ решить эту проблему?
Оптимизация с разделением задач
В приведенном выше примере все, что нам нужно, — это конечный накопленный результат, поэтому, чтобы уменьшить накладные расходы на синхронизацию между потоками, мы можем разделить накопленные задачи на разные потоки для выполнения и добавить результаты каждого потока в конце. мы можем получить окончательный результат. В следующем коде мы используем этот метод, t1 накапливает 100 миллионов раз для count1, t2 накапливает 100 миллионов раз для count2 и, наконец, добавляет count1 и count2, чтобы получить окончательный результат, давайте запустим его вместе и посмотрим, как это работает.
public class TwoThreadTest {
private static long count1 = 0;
private static long count2 = 0;
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
Thread t1 = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 1e8; ++i) {
count1 += 1;
}
}
});
Thread t2 = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 1e8; ++i) {
count2 += 1;
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
long count = count1 + count2;
System.out.println("count = " + count);
long endTime = System.currentTimeMillis();
System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
}
}
Время выполнения этой программы на моем компьютере составляет 0,20 с, что является большим улучшением по сравнению с предыдущими однопоточными 0,33 с, и намного опережает исходное накопление двух потоков.AtomicLong
2.44s версии переменной типа. Это показывает, что наш предыдущий анализ верен.AtomicLong
Виновник плохой работы программы.
Тем не менее, структура этой версии все еще немного примитивна, и может быть проще справиться с простым требованием накопления, но при столкновении со сложными одновременными задачами может потребоваться написать много сложного кода, и это легко ошибиться. Например, если мы хотим разбить задачу на 10 потоков для выполнения, то нам нужно сначала разделить 200 миллионов совокупных задач на 10 частей, а затем создать массив, содержащий 10 объектов Thread, чтобы их можно было спаривать по отдельности. разные диапазоны, и, наконец, пройтиjoin
Метод ожидает завершения выполнения этих 10 потоков, что не кажется легкой задачей. Неважно, ниже мы представим широко используемую структуру разделения задач и запуска для решения этой проблемы.С помощью этой структуры мы можем легко писать программы разделения задач, которые легко писать и расширять.
Задачи с ForkJoinPool
В JDK 1.7 была представлена новая многопоточная среда выполнения задач, называемаяForkJoinPool
.ForkJoinPool
это класс Java, который реализует функцию, представляющую пул потоковExecutorService
интерфейс, поэтому он использует методы и общие классы пула потоковThreadPoolExecutor
Аналогично, но в этом разделе нам не нужно подробно разбираться в использовании пулов потоков, но заинтересованные читатели могут обратиться к этой статье.Играйте с пулом потоков от 0 до 1выяснить.
Пул потоков — это набор потоков, и потоки в нем всегда будут ожидать выполнения задач, поэтому мы можем отправлять задачи в пул потоков в виде объектов задач, а затем пул потоков будет использовать потоки в нем для выполнения. задачи. существуетForkJoinPool
При использовании пул потоков относится кForkJoinPool
объект типа, а объект задачи относится к объекту, который наследуется отForkJoinTask
предмет класса. В приведенном ниже примере кода мы использовали пользовательскийRecursiveTask
Подкласс класса задач,RecursiveTask
класс наследуется отForkJoinTask
Добрый.
Recursive
Это означает рекурсию, что означает, что мы можем создать новый объект класса задач для представления подзадач текущей задачи во время выполнения этого класса задач, а затем вернуть результат текущей задачи, объединив результаты нескольких подзадач. Например, задача в начале состоит в том, чтобы накопить 200 миллионов раз, но затем мы можем разделить ее на сумму результатов двух подзадач, которые были накоплены 100 миллионов раз соответственно. которые были накоплены 100 миллионов раз, также могут быть добавлены снова.Разделите на две подзадачи, которые накапливаются 50 миллионов раз. Это разделение будет продолжаться до тех пор, пока мы не посчитаем, что масштаб задачи достаточно мал, затем будут вычислены результаты подзадач, а затем возвращены задаче верхнего уровня для обработки, чтобы получить результат задачи верхнего уровня.
Не беда, если вы не поняли предыдущее текстовое описание, давайте найдем ответ в коде:
public class ForkJoinTest {
private static class AccumulateTask extends RecursiveTask<Long> {
private long start;
private long end;
private long threshold;
/**
* 任务的构造函数
*
* @param start 任务处理范围的起始点(包含)
* @param end 任务处理范围的结束点(不包含)
* @param threshold 任务拆分的阈值
*/
public AccumulateTask(long start, long end, long threshold) {
this.start = start;
this.end = end;
this.threshold = threshold;
}
@Override
protected Long compute() {
long left = start;
long right = end;
// 终止条件:如果当前处理的范围小于等于阈值(threshold),
// 那么就直接通过循环执行累加操作
if (right - left <= (int) threshold) {
long result = 0;
for (long i = left; i < right; ++i) {
result += 1;
}
return result;
}
// 获取当前处理范围的中心点
long mid = (start + end) / 2;
// 拆分出两个子任务,一个从start到mid,一个从mid到end
ForkJoinTask<Long> leftTask = new AccumulateTask(start, mid, threshold);
ForkJoinTask<Long> rightTask = new AccumulateTask(mid, end, threshold);
// 通过当前线程池运行两个子任务
leftTask.fork();
rightTask.fork();
try {
// 获取两个子任务的结果并返回
return leftTask.get() + rightTask.get();
} catch (Exception e) {
return 0L;
}
}
}
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
// 创建总任务,范围是从1到两亿(包含),阈值为10的7次方,所以最终至少会有10个任务进行for循环的累加
AccumulateTask forkJoinTask = new AccumulateTask(1, (int) 2e8+1, (long) 1e7);
// 使用一个新创建的ForkJoinPool任务池运行ForkJoin任务
new ForkJoinPool().submit(forkJoinTask);
// 打印任务结果
System.out.println("count = " + forkJoinTask.get());
// 计算程序耗时并打印
long endTime = System.currentTimeMillis();
System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
}
}
В приведенном выше коде мы будем продолжать создавать подзадачи, которые накапливаются в пределах указанного диапазона, пока диапазон задач не станет меньше порогового значения (в коде 10 в 7-й степени), подзадачи не будут разделены, а будут получено зацикливанием Совокупные результаты. После этого возвращаемые результаты подзадач добавляются в задачу верхнего уровня и возвращаются как результат задачи верхнего уровня. В итоге мы можем получить результат накопления 200 миллионов раз.
В этой программе наиболее важными являются три операции над объектом задачи:
- Создайте объект задачи, который используется в коде
new AccumulateTask(start, mid, threshold)
а такжеnew AccumulateTask(mid, end, threshold)
, эти две части кода будут создавать подзадачи с той же логикой, что и родительская задача, за исключением разных областей действия, каждая подзадача будет отвечать за выполнение половины области действия родительской задачи; - Выполнять подзадачи, вызывая объект задачи
fork()
метод позволяет отправлять подзадачи в текущуюForkJoinPool
в исполнении; - Подождите, пока подзадача вернет результат, вызвав объект задачи подзадачи.
get()
метод, родительская задача будет ждать завершения выполнения подзадачи и вернуть результат, а затем сложить результаты двух подзадач, чтобы получить результат выполнения родительской задачи.
Почему одни и те же пулы потоков, ноThreadPoolExecutor
Трудно ли классу реализовать такую реализацию? Внимательные читатели могли заметить, что мы разделим две подзадачи в задаче и дождемся завершения обеих подзадач, прежде чем вернуть результат родительской задачи. если вThreadPoolExecutor
В ожидании завершения выполнения подзадачи и получения результата родительская задача всегда будет блокироваться и занимать поток.В этом случае, если родительских задач слишком много, подзадача не будет использовать потоки, и запущенный процесс будет можно не продолжать.. а такжеForkJoinPool
Этот специальный пул потоков решает эту проблему.Родительская задача может передать поток другим задачам, ожидая выполнения дочерней задачи, так что потоки не будут заблокированы родительской задачей в заблокированном состоянии.
Этот метод разделения задач на независимые подзадачи, выполнения их в разных потоках и, наконец, пошагового объединения результатов называется Map-Reduce. Этот метод широко используется в офлайн-технологии больших данных, и не будет преувеличением сказать, что технологии, связанные с большими данными, разрабатываются на основе идеи Map-Reduce.
переменная в потоке
Из приведенных выше примеров видно, что для многопоточных программ обмен данными является самой большой проблемой. Совместно используемые данные могут не только вызвать проблемы гонки данных и вызвать проблемы программы, но также с введением операций синхронизации потоков производительность программы будет снижена, а время выполнения многопоточных программ может быть даже намного больше, чем у программ. однопоточные программы. В приведенном выше примере мы используемForkJoinPool
Совокупная задача разделяется и выполняется, а каждая подзадача в основном полностью независима, что обеспечивает максимальное распараллеливание. Однако в некоторых случаях мы можем не достичь такого идеального решения, а в некоторых случаях все же будут определенные операции синхронизации потоков и соответствующие критические участки кода.
Итак, что мы можем сделать в этих случаях, чтобы сделать программу максимально производительной?
Предположим, теперь мы хотим подсчитать количество вызовов метода.Если может быть несколько потоков, вызывающих метод одновременно, то вызовы нескольких потоков должны подсчитываться одновременно. В этом случае мы можем рассмотреть возможность сохранения целочисленной переменной в каждом потоке, чтобы сохранить количество вызовов в каждом потоке, а затем, когда мы получим общее число, нам нужно будет только сложить числа в каждом потоке, чтобы вычислить его. При накоплении счетчика нам нужно только изменить переменную, соответствующую текущему потоку, и, естественно, нет проблемы гонки данных.java.util.concurrent.atomic
аккумулятор в упаковкеLongAdder
Этот тип мышления принят классом, и этот тип мышления также имеет свою специализированную терминологию, которая называется **"замыкание нити"**,закрытие потокаОтносится к этому методу оптимизации, чтобы избежать совместного использования данных между потоками через переменные внутри потока.
Java также имеет специальныеThreadLocal
Класс может обрабатывать переменные в потоке, но он вообще не используется для сценариев агрегации данных типа многопоточного накопления из-за производительности и сохранения данных при уничтожении потока, но очень удобен в сохранении и получении данных. Вы можете узнать.
ConcurrentHashMap
java.util.concurrent
Пакет предоставляет нам большое количество инструментов параллельного программирования, таких как блокировки, атомарные классы, пулы потоков и ForkJoinPool. Наконец, давайте узнаемjava.util.concurrent
Поточно-безопасная структура данных, предоставленная нам в пакете.
В Java наш часто используемый класс Map:HashMap
, но этот класс не потокобезопасен, если мыHashMap
Если объект читается и записывается, это может вызвать некоторые проблемы с программой. Есть также один, который существует с JDK 1.0.Hashtable
Класс может гарантировать потокобезопасность, но когда мы открываем исходный код этого класса, мы видим, что большинство методов в этом классе добавленыsynchronized
теги, которые включают в себя наиболее часто используемыеget
а такжеput
метод, а значитHashtable
Объекты класса могут одновременно использоваться только одним потоком, что относительно неэффективно.
Но на самом деле друзья, знакомые со структурой HashMap, могут знать, что внутренняя структура HashMap разделена на множество сегментов, и каждая пара ключ-значение будет помещена в разные сегменты в соответствии со значением hashCode значения ключа. На самом деле, при выполнении операций модификации нам нужно только заблокировать соответствующий бакет, а при выполнении операций чтения в большинстве случаев нам не нужно блокировать. Представлено в JDK 1.5ConcurrentHashMap
В основном эти два пункта могут быть достигнуты.
В этом классе мы оптимизируем производительность параллелизма двумя способами:
- За счет ограничения объема кода, защищаемого блокировками, снижается вероятность конфликтов блокировок, а также количество требуемых блокировок и накладные расходы на синхронизацию;
- С другой стороны, поскольку нет необходимости блокировать чтение, необходимо блокировать только операции записи.В некоторых случаях, когда операций чтения много, но операций записи мало, мы можем значительно снизить стоимость операций чтения, тем самым повысив производительность. программы.
а такжеConcurrentHashMap
Большинство методов не только оптимизируют эффективность механизма синхронизации, но также предоставляют множество атомарных методов работы, подобных CAS.ConcurrentHashMap
Общие операции класса:
-
V putIfAbsent(K key, V value)
, атомарная операция, если карта не содержит ключа, выполнитьmap.put(key, value)
И верните возвращаемое значение метода put, в противном случае верните напрямуюmap.get(key)
Значение , то есть текущее значение; -
boolean remove(Object key, Object value)
, атомарная операция, если выполняется условие (карта содержит пару ключ-значение, соответствующую ключу && значение параметра равно текущему значению значения в паре ключ-значение), то удалить пару ключ-значение, соответствующую key и вернуть true, иначе вернуть false; -
boolean replace(K key, V oldValue, V newValue)
, атомарная операция, когда она выполняется (карта содержит пару ключ-значение, соответствующую ключу && параметр oldValue равен текущему значению значения в паре ключ-значение), изменить значение, соответствующее ключу, на новое значение и вернуть true, иначе вернуть false.
Суммировать
В этой статье мы начнем с использованияAtomicLong
Начался тест производительности многопоточной программы накопления, и производительность программы была значительно оптимизирована за счет идеи Map-Reduce, в процессе которой также были задействованыForkJoinPool
использование класса. После этого мы официально предложили «закрытие потока» через переменные в потоке.концепция, если мы можем сделатьЕсли поток закрыт**, то благодаря уменьшению накладных расходов на синхронизацию между потоками производительность потока значительно повысится. Наконец, мы представляемjava.util.concurrent
Класс данных с защитой от параллелизма, предоставленный нам в пакетеConcurrentHashMap
. Я считаю, что с помощью этой статьи вы можете узнать о различных методах оптимизации многопоточной производительности, но самое главное — найти узкое место в производительности многопоточной программы, чтобы вы могли корректировать меры в соответствии с различными ситуациями в реальные практические сценарии Используйте соответствующие методы для решения различных проблем с узкими местами. Точка зрения в этой статье состоит в том, что узким местом многопоточных программ в основном является проблема конкуренции данных, вызванная общими данными.Если между разными потоками нет общих данных и кода критической код раздела, насколько это возможно, производительность многопоточных программ может быть улучшена, что оказывает положительное влияние.