Параллельный поток java8 очень удобен для разработчиков, и его очень легко реализовать для параллельных вычислений. Нижний уровень параллельного потока использует ForkJoinTask для реализации параллельной обработки потока, в полной мере используя многоядерные возможности ЦП. API потока полностью защищает сложную реализацию базового уровня. Разработчикам нужно вызвать только один метод для реализации параллельных вычислений.Это так просто.
Открытый параллельный поток
Чтобы запустить параллельный поток, просто позвоните перед вызовом оператора терминацииparallel()
метод, обеспечивающий параллельные вычисления.
@Test
public void parallelStream(){
IntStream.range(0,100)
.parallel()
.forEach(e-> System.out.println(Thread.currentThread()+" "+e));
}
Выполните приведенный выше код, видно, что код будет выполняться потоком в ForkJoinPool.
Мелкий измельчитель
Здесь сумма от 0 до 100000000000 вычисляется с использованием параллельного потока и обхода потока соответственно.
@Test
public void testSum(){
//并行计算
long time= System.currentTimeMillis();
long sum1 = LongStream.rangeClosed(1,100000000000l).parallel().sum();
System.out.println(System.currentTimeMillis()-time);
//串行计算
time = System.currentTimeMillis();
long sum2 = LongStream.rangeClosed(1,100000000000l).sum();
System.out.println(System.currentTimeMillis()-time);
System.out.println("sum1 = "+sum1+" sum2 = "+sum2);
Assert.assertTrue(sum1==sum2);
}
Результат выполнения показан на рисунке.Мой процессор 6-ядерный, и время последовательных вычислений почти в 5 раз больше, чем параллельных вычислений, параллельные вычисления в полной мере используют многоядерную производительность процессора.Если объем данных больше, разрыв между ними будет еще больше.Преимущества больших параллельных потоков более очевидны.
sequential()
а такжеparallel()
Все потоки, созданные по умолчанию, являются последовательными, параллельный преобразовывает последовательные потоки в параллельные потоки, два являются взаимоисключающими и конфликтующими, а последовательный преобразовывает параллельные потоки в последовательные потоки, но это не означает, что они не могут появляться одновременно.
isParallel()
Может определить, является ли поток параллельным потоком.
@Test
public void isParallel(){
IntStream stream = IntStream.range(0,100);
stream.parallel();
Assert.assertTrue(stream.isParallel());
}
Как показано в коде, выполняется преобразование 'serial->parallel->serial->parallel', и конечный поток является параллельным.
@Test
public void spspIsP(){
IntStream stream = IntStream.range(0,100);
stream.parallel().map(e->e<<1).sequential().parallel();
Assert.assertTrue(stream.isParallel());
}
Как показано в коде, выполняется преобразование 'serial->parallel->serial', и конечный поток является последовательным.
@Test
public void spsIsS(){
IntStream stream = IntStream.range(0,100);
stream.parallel().map(e->e<<1).sequential();
Assert.assertFalse(stream.isParallel());
}
Приведенный выше пример показывает, что сериализация и параллелизм потока зависят от того, какой метод был вызван последним (sequential() или parallel()).
ForkJoinPool
Нижний уровень параллельного потока зависит от пула потоков ForkJoinPool.commonPool, который является потоком, совместно используемым процессом jvm во всем мире.Когда в этом пуле потоков выполняются трудоемкие операции, последующие задачи будут накапливаться, вызывая проблемы с производительностью; число логических ядер -1. Конечно, вы также можете изменить размер пула потоков с помощью параметра jvm 'java.util.concurrent.ForkJoinPool.common.parallelism'. Этот метод здесь не рекомендуется. Лучший способ - используйте пользовательский пул потоков.
Как показано в коде, создайте пул потоков ForkJoinPool с 4 потоками.
@Test
public void CustomPool(){
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
forkJoinPool.submit(()->{
IntStream.range(0,100).parallel().forEach(e-> System.out.println(Thread.currentThread()+" "+e));
}).join();
}
Результат выполнения кода показан на рисунке, параллельный поток выполняется в пользовательском пуле потоков.
Так почему же forkJoinPool.submit() может реализовать собственный пул потоков? Глядя на исходный код, известно, что ForkJoinWorkerThread будет содержать внутри ссылку на ForkJoinPool.При выполнении кода в конечном итоге будет вызван метод doInvoke.Через Thread.currentThread можно узнать, что это поток типа ForkJoinWorkerThread, а затем получить ForkJoinPool и, наконец, использовать этот пул для выполнения вычислений.
/**
* Implementation for invoke, quietlyInvoke.
*
* @return status upon completion
*/
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
Суммировать
Parallel Stream очень удобен для разработчиков. Одна строка кода может обеспечить многопоточные параллельные вычисления, отлично! Однако параллельные вычисления можно использовать только при определенных условиях, таких как отсутствие зависимостей между задачами, отсутствие условий гонки и т. д.