содержание
- Сцены
- Однопоточная реализация
- Многопоточная реализация -
ExecutorService
- Многопоточная реализация -
ForkJoinPool
- контрольная работа
- Суммировать
- Ссылка на ссылку
В обычной работе, когда объем данных относительно велик, программа работает медленно, и производительность программы нуждается в повышении, обычно задействуется многопоточность. Некоторые друзья не очень понимают использование многопоточности, эта статья в основном объясняетThreadPoolExecutor
иForkJoinPool
использование.
Сцены
Сначала мы предполагаем такой сценарий, есть интерфейс, используемый для вычисления суммы массива. Интерфейс определяется следующим образом:
package mutilthread;
/**
* 求和的接口
* @Author: Rebecca
* @Description:
* @Date: Created in 2019/6/18 15:28
* @Modified By:
*/
public interface Calculator {
long sumUp(int[] numbers) throws Exception;
}
Однопоточная реализация
В начале наш код должен быть реализован с использованием общего одного потока.Преимущество этого заключается в том, что код относительно прост.Недостатком является то, что при относительно большом объеме данных программа работает медленно и не может использовать многоядерные процессоры.
package mutilthread;
import java.util.ArrayList;
import java.util.List;
/**
* 单线程的类
* @Author: Rebecca
* @Description:
* @Date: Created in 2019/6/18 10:24
* @Modified By:
*/
public class SingleThread implements Calculator {
/**
* 用单线程计算数组的和
* @param calcData 需要求和的数组
* @return
* @author Rebecca 10:51 2019/6/18
* @version 1.0
*/
@Override
public long sumUp(int[] calcData) {
// 此句代码只是为了延长程序运行时间,和程序逻辑无关
List<SingleThread> tasks = new ArrayList<SingleThread>();
int calcDataLength = calcData.length;
long sum = 0l;
for (int i = 0; i < calcDataLength; i++) {
sum += calcData[i];
// 此句代码只是为了延长程序运行时间,和程序逻辑无关
tasks.add(new SingleThread());
}
return sum;
}
}
Многопоточная реализация -ExecutorService
Поскольку недостаток одного потока серьезно влияет на скорость обработки программы, мы оптимизируем код для многопоточности.ExecutorService
реализовать.
package mutilthread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
/**
* 用 ThreadPoolExecutor 线程池计算数组的和
* @Author: Rebecca
* @Description:
* @Date: Created in 2019/6/18 10:50
* @Modified By:
*/
public class MutilThreadOfThreadPoolExecutor implements Calculator {
/**
* 用 ThreadPoolExecutor 线程池计算数组的和
* @param calcData 需要求和的数组
* @return
* @author Rebecca 10:51 2019/6/18
* @version 1.0
*/
@Override
public long sumUp(int[] calcData) throws Exception {
// 创建线程池
ExecutorService executorService = new ThreadPoolExecutor(5, 10, // 线程数
60l, TimeUnit.SECONDS, // 超时时间
new ArrayBlockingQueue<Runnable>(100, true), // 线程处理数据的方式
Executors.defaultThreadFactory(), // 创建线程的工厂
new CallerRunsPolicy()); // 超出处理范围的处理方式
int calcDataLength = calcData.length;
long sum = 0l;
int threadSize = 5;
for (int i = 0; i < threadSize; i++) {
int arrStart = calcDataLength / threadSize * i;
int arrEnd = calcDataLength / threadSize * (i+1);
SumTask task = new SumTask(calcData, arrStart, arrEnd);
// 线程池处理数据
Future<Long> future = executorService.submit(task);
sum += future.get().longValue();
}
// 关闭线程池
executorService.shutdown();
return sum;
}
public static class SumTask implements Callable<Long> {
private int[] arr;
private int start, end;
public SumTask() {}
public SumTask(int[] arr, int start, int end)
{
this.arr = arr;
this.start = start;
this.end = end;
}
@Override
public Long call()
{
// 此句代码只是为了延长程序运行时间,和程序逻辑无关
List<SumTask> tasks = new ArrayList<SumTask>();
long sum = 0l;
for (int i = start; i < end; i++)
{
sum += arr[i];
// 此句代码只是为了延长程序运行时间,和程序逻辑无关
tasks.add(new SumTask());
}
return sum;
}
}
}
Executors
Он также предоставляет некоторые методы, которые могут быть созданы непосредственноExecutorService
пул потоков, напримерnewSingleThreadExecutor()
,newCachedThreadPool()
,newFixedThreadPool()
,newScheduledThreadPool()
,по сравнению сThreadPoolExecutor
предоставленный конструктор,Executors
Предоставленный метод принимает только 2 параметра или даже меньше, ноnew ThreadPoolExecutor()
Затем вам нужно передать кучу параметров. Так почему же мы все еще используемnew ThreadPoolExecutor()
Как насчет этого пути?
Ответ очень простой, чтобы программа не появлялась OOM. если ты виделExecutors
Исходный код метода, связанного с построением пула потоков, будет найден, а также используется внутри.new ThreadPoolExecutor()
способ создания пула потоков. Но есть один параметр, который он передает:Integer.MAX_VALUE
. Что означает этот параметр? То есть максимальное количество потоков, разрешенных в пуле потоков. Если пул потоков действительно созданInteger.MAX_VALUE
Количество потоков у программы точно будет OOM.
// Executors的newCachedThreadPool方法源码
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
Чтобы избежать этого, мы обычно используемnew ThreadPoolExecutor()
Таким образом создаются пулы потоков. Так что же означает такое количество параметров?
Не волнуйтесь, на самом деле мы можем группировать воспоминания:
Группа 1 (количество связанных потоков):
- corePoolSize: количество основных потоков. Даже если в пуле потоков нет задач, эти потоки не будут уничтожены, потому что для создания и уничтожения потоков требуются ресурсы ЦП.
- maxPoolSize: максимальное количество потоков, которые можно создать в пуле потоков.
Группа 2 (зависит от времени разрушения неосновной нити):
- keepAliveTime: время уничтожения неосновных потоков. Неосновные потоки не всегда могут занимать ресурсы в пуле потоков, поэтому их нужно уничтожать
- unit: единица времени для уничтожения. значение
TimeUnit
перечисление типов в
Группа 3 (обработка данных пула потоков):
- workQueue: способ обработки данных потоком. Обычно предоставляется JDK
ArrayBlockingQueue
(массив) иLinkedBlockingDeque
(связанный список) - обработчик: метод обработки вне области обработки.
AbortPolicy
: если вне досягаемости, броситьRejectedExecutionException
аномальный;
CallerRunsPolicy
: если он превышает диапазон обработки, он будет обработан потоком, вызывающим пул потоков;
DiscardOldestPolicy
: Если он превышает диапазон обработки, удалите самый старый элемент и сохраните новый элемент.DiscardPolicy
: Если он превышает диапазон обработки, он не будет обрабатываться и отбрасываться.
Группа 4 (фабрики, создающие потоки):
- threadFactory: фабрика для создания потоков, обычно мы используем
Executors.defaultThreadFactory()
Просто
// ThreadPoolExecutor的构造方法源码
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
Предположим, у нас есть ряд задач, которые разделены на 3 группы, количество задач в каждой группе равно 3, и в пуле потоков для обработки есть только 3 потока, тогда порядок обработки следующий:
шаг 1:
Группа задач 1 обрабатывается потоком 1, а поток 1 обрабатывает первую задачу в группе задач 1; Группа задач 2 обрабатывается потоком 2, а поток 2 обрабатывает первую задачу в группе задач 2; Группа задач 3 обрабатывается потоком 3, который обрабатывает первую задачу в группе задач 3;
Шаг 2:
Поток 2 обрабатывается быстрее, и обрабатываются все задачи в группе задач 2. Поскольку ни одна группа задач не ожидает обработки, поток 2 в это время находится в состоянии простоя. В настоящее время группа задач 1, обрабатываемая потоком 1, обработала только первую задачу, поэтому есть ли способ для потока 2 украсть вторую задачу в группе задач 1 и обработать ее, чтобы сократить время ожидания?
После JDK7 предоставляетсяForkJoinPool
Пул потоков может быть реализован ~ Тогда посмотрите вниз
Многопоточная реализация -ForkJoinPool
Мы по-прежнему используем пример с суммированием для моделирования задачи кражи.
package mutilthread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* 用 ForkJoinPool 线程池计算数组的和
* @Author: Rebecca
* @Description:
* @Date: Created in 2019/6/18 10:50
* @Modified By:
*/
public class MutilThreadOfForkJoinPool implements Calculator {
private ForkJoinPool pool;
public MutilThreadOfForkJoinPool() {
// jdk8之后可以用公用的 ForkJoinPool: pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
/**
* 用 ForkJoinPool 线程池计算数组的和
* @param calcData 需要求和的数组
* @return
* @author Rebecca 10:51 2019/6/18
* @version 1.0
*/
@Override
public long sumUp(int[] calcData) {
SumTask task = new SumTask(calcData, 0, calcData.length - 1);
return pool.invoke(task);
}
public static class SumTask extends RecursiveTask<Long> {
private int[] numbers;
private int start;
private int end;
private SumTask(){}
public SumTask(int[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 当需要计算的数字小于 10万 时,直接计算结果
if (end - start < 1000000) {
long total = 0;
// 此句代码只是为了延长程序运行时间,和程序逻辑无关
List<SumTask> tasks = new ArrayList<SumTask>();
for (int i = start; i <= end; i++) {
total += numbers[i];
// 此句代码只是为了延长程序运行时间,和程序逻辑无关
tasks.add(new SumTask());
}
return total;
} else { // 否则,把任务一分为二,递归计算
int middle = (start + end) / 2;
SumTask taskLeft = new SumTask(numbers, start, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, end);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
}
RecursiveTask
изfork
Методы иThread
изstart
Метод аналогичен. Профессиональный термин для этой «задачи воровства» называетсяалгоритм кражи работы, используя JDK7ForkJoinPool
Этого можно достичь. До JDK7,LinkedBlockingDeque
также используетсяалгоритм кражи работы.
контрольная работа
Ниже приведен код тестового класса
package mutilThread;
import mutilthread.CalcData;
import mutilthread.MutilThreadOfForkJoinPool;
import mutilthread.MutilThreadOfThreadPoolExecutor;
import mutilthread.SingleThread;
import org.junit.Test;
/**
* 线程测试类
* @Author: Rebecca
* @Description:
* @Date: Created in 2019/6/18 10:40
* @Modified By:
*/
public class ThreadTest {
@Test
public void testThread() throws Exception {
int[] data = CalcData.getCalcData();
// 单线程测试
SingleThread singleThread = new SingleThread();
long startTime = System.currentTimeMillis();
System.out.println("数组的和: " + singleThread.sumUp(data));
System.out.println("单线程耗时: " + (System.currentTimeMillis() - startTime) + " ms");
// 多线程(ThreadPoolExecutor)测试
MutilThreadOfThreadPoolExecutor threadPool = new MutilThreadOfThreadPoolExecutor();
startTime = System.currentTimeMillis();
System.out.println("数组的和: " + threadPool.sumUp(data));
System.out.println("多线程(ThreadPoolExecutor)耗时: " + (System.currentTimeMillis() - startTime) + " ms");
// 多线程(ForkJoinPool)测试
MutilThreadOfForkJoinPool forkJoinPool = new MutilThreadOfForkJoinPool();
startTime = System.currentTimeMillis();
System.out.println("数组的和: " + forkJoinPool.sumUp(data));
System.out.println("多线程(ForkJoinPool)耗时: " + (System.currentTimeMillis() - startTime) + " ms");
}
}
Результат работы программы:
数组的和: 499913683383
单线程耗时: 3307 ms
数组的和: 499913683383
多线程(ThreadPoolExecutor)耗时: 197 ms
数组的和: 499913683383
多线程(ForkJoinPool)耗时: 169 ms
Располагаются в таблице следующим образом:
тип резьбы | Время (мс) |
---|---|
один поток | 3307 |
Многопоточность (ThreadPoolExecutor) | 197 |
Многопоточность (ForkJoinPool) | 169 |
Суммировать
- Как правило, когда мы используем многопоточность, мы будем использовать
ExecuterService
, для строительстваnew ThreadPoolExecutor()
, вообще не используетсяExecutors
Предоставляет метод создания пулов потоков, чтобы избежать OOM; - Пулы потоков лучше управляются, чем группы потоков (не упоминаются в этой статье);
- Доступно после JDK7
ForkJoinPool
, относительноExecuterService
Более быстрое исполнение. - Связь между потоками имеет свою цену.
Если вы будете внимательны, то обнаружите, что в приведенном выше примере кода есть две избыточные строки кода:
// 此句代码只是为了延长程序运行时间,和程序逻辑无关
List<SumTask> tasks = new ArrayList<SumTask>();
// 此句代码只是为了延长程序运行时间,和程序逻辑无关
tasks.add(new SumTask());
Если вы не добавляете дополнительный код для создания объектов, а просто накапливаете сумму массива, вы обнаружите, что однопоточное выполнение более эффективно. Поэтому при реальном использовании необходимо сравнивать реальную бизнес-логику и выбирать подходящий метод. Если бизнес-логика проста, а обработка программы быстрая, многопоточность вообще не нужна.
существуетForkJoinPool
, установленный размер массива равен 100 000, причина, по которой установлено это число, заключается в том, чтобы следоватьExecutorService
способ сравнить, если вForkJoinPool
Если длина установленного массива слишком мала, производительность будет ниже.ExecutorService
Случай.
Класс, используемый в программе для генерации расчетных данных.
package mutilthread;
import java.util.Random;
/**
* 生成计算数据的类
* @Author: Rebecca
* @Description:
* @Date: Created in 2019/6/18 10:25
* @Modified By:
*/
public class CalcData {
// 长度为1000万
private static int calcDataLength = 10000000;
public static int[] getCalcData() {
Random random = new Random();
int[] calcData = new int[calcDataLength];
for (int i = 0; i < calcDataLength; i++) {
// 0~10的随机数 生成[m,n]范围内指定的随机数: rand.nextInt(n -m + 1) +m;
calcData[i] = random.nextInt(100001);
}
return calcData;
}
}
Ссылка на ссылку
Заметки по параллельному программированию на Java: как использовать ForkJoinPool и принципы
Серия Java Concurrency Thread Pool (2) Использование ThreadPoolExecutor для создания пула потоков