1 Stream
Stream — это набор API для работы с массивами и коллекциями.
1.1 Особенности
- Не структура данных, не внутреннее хранилище.
- Индексированный доступ не поддерживается.
- Расчет задержки
- Параллелизм
- Легко генерировать данные или коллекции
- Поддерживает такие операции, как фильтрация, поиск, преобразование, суммирование и агрегирование.
1.2 Механизм работы
Поток делится на источник-источник, промежуточную операцию и операцию завершения.
Источником потока может быть массив, коллекция, метод генератора, канал ввода-вывода и т. д.
Поток может иметь ноль или более промежуточных операций, каждая промежуточная операция возвращает новый поток для использования следующей операцией, а поток имеет только одну завершающую операцию.
Поток не начинает обход, пока не встретит завершающую операцию.
1.3 Создание потока
- Передать массив, Stream.of()
- по коллекции
- Создано методом Stream.generate
- С помощью метода Stram.iterate
- Другие API
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class CreateStream {
//通过数组,Stream.of()
static void gen1(){
String[] str = {"a","b","c"};
Stream<String> str1 = Stream.of(str);
}
//通过集合
static void gen2(){
List<String> strings = Arrays.asList("a", "b", "c");
Stream<String> stream = strings.stream();
}
//通过Stream.generate方法来创建
static void gen3(){
//这是一个无限流,通过这种方法创建在操作的时候最好加上limit进行限制
Stream<Integer> generate = Stream.generate(() -> 1);
generate.limit(10).forEach(x -> System.out.println(x));
}
//通过Stram.iterate方法
static void gen4(){
Stream<Integer> iterate = Stream.iterate(1, x -> x +1);
iterate.forEach(x -> System.out.println(x));
}
//其他API
static void gen5(){
String str = "abc";
IntStream chars = str.chars();
chars.forEach(x -> System.out.println(x));
}
}
2 Stream часто используемый API
2.1 Промежуточные операции
2.1.1 фильтр
Операция принимает предикат (функция, возвращающая логическое значение) в качестве аргумента и возвращает поток, содержащий все элементы, соответствующие предикату. Грубо говоря, это задание условия, и фильтр будет перехватывать данные в потоке по этому условию.
public static void testFilter(){
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//截取所有能被2整除得数据
List<Integer> collect = integers.stream().filter(i -> i % 2 == 0).collect(Collectors.toList());
System.out.println("collect = " + collect);
}
результат:
collect = [2, 4, 6, 8, 10]
2.1.2 отдельная дедупликация
Эта операция возвращает поток с разными элементами (реализованными в соответствии с методами hashCode и equals элементов, сгенерированных потоком).
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);
List<Integer> collect = numbers.stream().distinct().collect(Collectors.toList());
System.out.println("collect = " + collect);
}
результат:
collect = [1, 2, 3, 4]
2.1.3 упорядоченная сортировка
Чтобы отсортировать данные в потоке, вы можете отсортировать поток в естественном порядке или с использованием правил сортировки, определенных интерфейсом Comparator. Компараторы могут быть инициализированы с помощью лямбда-выражений, а также могут реверсировать отсортированный поток.
public static void main(String[] args) {
List<Integer> integers = Arrays.asList(5, 8, 2, 6, 41, 11);
//排序默认为顺序 顺序 = [2, 5, 6, 8, 11, 41]
List<Integer> sorted = integers.stream().sorted().collect(Collectors.toList());
System.out.println("顺序 = " + sorted);
//逆序 逆序 = [41, 11, 8, 6, 5, 2]
List<Integer> reverseOrder = integers.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
System.out.println("逆序 = " + reverseOrder);
//也可以接收一个lambda
List<Integer> ages = integers.stream().sorted(Comparator.comparing(User::getAge)).collect(Collectors.toList());
}
2.1.4 ограничение перехвата
Этот метод возвращает поток до заданной длины.
public static void testLimit(){
List<Integer> integers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);
//截取流中得前三个元素 collect = [1, 2, 1]
List<Integer> collect = integers.stream().limit(3).collect(Collectors.toList());
System.out.println("collect = " + collect);
}
2.1.5 пропустить отказаться
Этот метод возвращает поток с отброшенными первыми n элементами. Если в потоке меньше n элементов, возвращается пустой поток.
public static void testSkip(){
List<Integer> integers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);
//丢掉流中得前三个元素 collect = [3, 3, 2, 4]
List<Integer> collect = integers.stream().skip(3).collect(Collectors.toList());
System.out.println("collect = " + collect);
}
2.1.6 индукция карты
Этот метод принимает функцию в качестве параметра, который применяется к каждому элементу и сопоставляет его с новым элементом. Это получение данных каждого элемента в потоке в соответствии с указанной функцией и их повторная сборка в новый элемент.
public static void main(String[] args) {
//自己建好得一个获取对象list得方法
List<Dish> dishList = Dish.getDishList();
//获取每一道菜得名称 并放到一个list中
List<String> collect = dishList.stream().map(Dish::getName).collect(Collectors.toList());
//collect = [pork, beef, chicken, french fries, rice, season fruit, pizza, prawns, salmon]
System.out.println("collect = " + collect);
}
2.1.7 Сведение плоской карты
Ключ метода позволяет заменить каждое значение в одном потоке другим, а затем связать все потоки в один поток.
Учитывая список слов ["Hello", "World"], вы хотите вернуть список ["H", "e", "l", "o", "W", "r", "d"] , вы можете подумать, что это легко, с помощью карты вы можете сопоставить каждое слово с таблицей символов, а затем вызвать его, чтобы отфильтровать повторяющиеся символы, но проблема с этим подходом заключается в том, что лямбда, переданная методу карты, возвращает String[ ] (Список строк). Таким образом, поток, возвращаемый картой, фактически имеет тип Stream
Правильный способ написать это должен состоять в том, чтобы сгладить его через flatMap и соответствующим образом обработать его.
public static void main(String[] args) {
String[] words = {"Hello", "World"};
List<String> collect = Stream.of(words). //数组转换流
map(w -> w.split("")). //去掉“”并获取到两个String[]
flatMap(Arrays::stream). //方法调用将两个String[]扁平化为一个stream
distinct(). //去重
collect(Collectors.toList());
//collect = [H, e, l, o, W, r, d]
System.out.println("collect = " + collect);
}
}
2.1.8 peek
peek предназначен для вставки действия перед возобновлением выполнения каждого элемента потока.
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(2, 3, 4, 5);
List<Integer> result =
numbers.stream()
.peek(x -> System.out.println("from stream: " + x))
.map(x -> x + 17)
.peek(x -> System.out.println("after map: " + x))
.filter(x -> x % 2 == 0)
.peek(x -> System.out.println("after filter: " + x))
.limit(3)
.peek(x -> System.out.println("after limit: " + x))
.collect(Collectors.toList());
}
результат:
from stream: 2
after map: 19
from stream: 3
after map: 20
after filter: 20
after limit: 20
from stream: 4
after map: 21
from stream: 5
after map: 22
after filter: 22
after limit: 22
2.1.9 собирать
Как видно из приведенного выше кода, collect собирает данные в финальном потоке и, наконец, генерирует список, набор или карту.
public static void main(String[] args) {
List<Dish> dishList = Dish.getDishList();
//list
List<Dish> collect = dishList.stream().limit(2).collect(Collectors.toList());
//set
Set<Dish> collect1 = dishList.stream().limit(2).collect(Collectors.toSet());
//map
Map<String, Dish.Type> collect2 = dishList.stream().limit(2).collect(Collectors.toMap(Dish::getName, Dish::getType));
}
Существует три перегрузки метода toMap, который генерирует карту, и параметры, передаваемые в него, различаются. Здесь передаются два параметра типа Function. Конечно, функций коллекционеров больше, и в следующих сборщиках будут другие подробные объяснения.
2.2 Завершение операции
- цикл forEach
- Рассчитать минимум, максимум, количество, среднее
- соответствует anyMatch, allMatch, noneMatch, findFirst, findAny
- Совокупное сокращение
- коллекционер собирать
2.3 Найдите и сопоставьте
Другая распространенная процедура обработки данных заключается в том, чтобы проверить, соответствуют ли определенные элементы в наборе данных заданному атрибуту. Stream API предоставляет такие возможности с помощью методов allMatch, anyMatch, noneMatch, findFirst и findAny.
Поиск и сопоставление являются терминальными операциями.
2.3.1 anyMatch
Метод anyMatch может ответить на вопрос «есть ли в потоке элемент, соответствующий заданному предикату». вернет логическое значение.
public class AnyMatch {
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
boolean b = dish.stream().anyMatch(Dish::isVegetarian);
System.out.println(b);
}
}
2.3.2 allMatch
Метод allMatch похож на anyMatch и проверяет, может ли заданный предикат соответствовать в потоке.
class AllMatch{
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
//是否所有菜的热量都小于1000
boolean b = dish.stream().allMatch(d -> d.getCalories() < 1000);
System.out.println(b);
}
}
2.3.3 noneMatch
Метод noneMatch гарантирует, что ни один элемент в потоке не соответствует заданному предикату.
class NoneMatch{
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
//没有任何菜的热量大于等于1000
boolean b = dish.stream().allMatch(d -> d.getCalories() >= 1000);
System.out.println(b);
}
}
Все три операции anyMatch, noneMatch и allMatch используют так называемое короткое замыкание.
2.3.4 findAny
Метод findAny вернет любой элемент в текущем потоке.
class FindAny{
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
Optional<Dish> any = dish.stream().filter(Dish::isVegetarian).findAny();
System.out.println("any = " + any);
}
}
2.3.5 findFirst
Метод findFirst находит первый нужный элемент.
class FindFirst{
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
Optional<Dish> any = dish.stream().filter(Dish::isVegetarian).findFirst();
System.out.println("any = " + any);
}
}
2.4 Уменьшение Уменьшение
Такие запросы должны многократно объединять все элементы в потоке, чтобы получить значение, например Integer . Такой запрос можно классифицировать как операцию редукции (приведение потока к значению). В терминах функционального языка программирования это называется сверткой, потому что вы можете Цкуру думает о том, чтобы несколько раз сложить длинный лист бумаги (ваш поток) в маленький квадрат, и это результат операции складывания.
2.4.1 Поэлементное суммирование
public static void main(String[] args) {
List<Integer> integers = Arrays.asList(1, 2, 3, 6, 8);
//求list中的和,以0为基数
Integer reduce = integers.stream().reduce(0, (a, b) -> a + b);
//Integer的静态方法
int sum = integers.stream().reduce(0, Integer::sum);
System.out.println("reduce = " + reduce);
}
2.4.2 Максимальные и минимальные значения
public static void main(String[] args) {
List<Integer> integers = Arrays.asList(1, 2, 3, 6, 8);
Optional<Integer> min = integers.stream().reduce(Integer::min);
System.out.println("min = " + min);
Optional<Integer> max = integers.stream().reduce(Integer::max);
System.out.println("max = " + max);
}
2.5 Коллекторы Коллекторы
2.5.1 Найти максимальное и минимальное значения в потоке minBy maxBy
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
//创建一个Comparator来进行比较 比较菜的卡路里
Comparator<Dish> dishComparator = Comparator.comparingInt(Dish::getCalories);
//maxBy选出最大值
Optional<Dish> collect = dish.stream().collect(Collectors.maxBy(dishComparator));
System.out.println("collect = " + collect);
//选出最小值
Optional<Dish> collect1 = dish.stream().collect(Collectors.minBy(dishComparator));
System.out.println("collect1 = " + collect1);
}
2.5.2 Сводка summingInt
Collectors.summingInt . Он принимает функцию, которая сопоставляет объекты с целыми числами, необходимыми для суммирования, и возвращает сборщик.
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
//计算总和
int collect = dish.stream().collect(Collectors.summingInt(Dish::getCalories));
System.out.println("collect = " + collect);
}
2.5.3 Среднее усреднениеInt
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
//计算平均数
Double collect = dish.stream().collect(Collectors.averagingInt(Dish::getCalories));
System.out.println("collect = " + collect);
}
2.5.4 Соединение строк
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
String collect = dish.stream().map(Dish::getName).collect(Collectors.joining());
System.out.println("collect = " + collect);
}
Метод joining factory имеет перегруженную версию, которая принимает разделитель между элементами, поэтому вы получаете список названий блюд, разделенных запятыми.
String collect = dish.stream().map(Dish::getName).collect(Collectors.joining(", "));
2.5.5 получить общее количество в потоке
long howManyDishes = dish.stream().collect(Collectors.counting());
2.6 Группировка
2.6.1 Группировка groupingBy
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
//groupingBy接受一个function作为参数
Map<Dish.Type, List<Dish>> collect = dish.stream().collect(Collectors.groupingBy(Dish::getType));
System.out.println("collect = " + collect);
}
Условия, которые вы хотите использовать для классификации, могут быть более сложными, чем простые средства доступа к свойствам. Например, вы можете классифицировать блюда с содержанием менее 400 калорий как «диетические», от 400 до 700 калорий — как «нормальные», а блюда с содержанием более 700 калорий — как «высококалорийные» (жирные). Поскольку автор класса Dish не записал эту операцию как метод, вы не можете использовать ссылки на методы, но вы можете написать эту логику как лямбда-выражение.
public static void main(String[] args) {
List<Dish> dishList = Dish.getDish();
Map<String, List<Dish>> collect = dishList.stream().collect(Collectors.groupingBy(dish->{
if (dish.getCalories() <= 400) {
return "DIET";
} else if (dish.getCalories() <= 700) {
return "NORMAL";
} else {
return "FAT";
}
}));
System.out.println("collect = " + collect);
}
2.6.2 Многоуровневая группировка
Чтобы реализовать многоуровневую группировку, мы можем использовать сборщик, созданный двухаргументной версией фабричного метода Collectors.groupingBy, который принимает второй аргумент коллектора типа в дополнение к обычным функциям сортировки. Затем, чтобы выполнить вторичную группировку, мы можем передать внутренний groupingBy внешнему groupingBy и определить вторичный критерий для классификации элементов в потоке.
public static void main(String[] args) {
List<Dish> dish = Dish.getDish();
Map<Dish.Type, Map<String, List<Dish>>> collect = dish.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.groupingBy(d -> {
if (d.getCalories() <= 400) {
return "DIET";
} else if (d.getCalories() <= 700) {
return "NORMAL";
} else {
return "FAT";
}
})));
System.out.println("collect = " + collect);
}
2.6.3 Сбор данных по подгруппам
На предыдущей стороне мы видели, что второй сборщик groupingBy может быть передан внешнему сборщику для достижения многоуровневой группировки. Но далее второй коллектор, передаваемый первому groupingBy, может быть любого типа, не обязательно другого groupingBy.
Например, для подсчета количества блюд каждого типа в меню можно передать счетчик подсчета вторым параметром коллектора groupingBy.
Map<Dish.Type, Long> typesCount = dish.stream().collect(groupingBy(Dish::getType, counting()));
Обычная groupingBy(f) с одним аргументом (где f — категориальная функция) на самом деле является сокращением для groupingBy(f,toList()) .
Преобразуйте результат сборщика в другой тип:
Метод фабрики Collectors.collectingAndThen
3 параллельных потока
Параллельный поток — это поток, который делит содержимое на несколько фрагментов и обрабатывает каждый фрагмент отдельно в другом потоке. Это позволяет автоматически распределять нагрузку по заданной операции на все ядра многоядерного процессора, оставляя их все занятыми.
3.1 Преобразование последовательного потока в параллельный поток
Вы можете преобразовать поток в параллельный поток, чтобы предыдущий процесс сокращения функций (он же суммирование) выполнялся параллельно — вызовите параллельный метод в последовательном потоке:
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
Поток разделен на несколько частей внутри. Следовательно, операции индукции могут выполняться независимо и параллельно на разных блоках.Наконец, одна и та же операция индукции будет объединять результаты частичной индукции каждого подпотока для получения результатов индукции всего исходного потока.
Точно так же вы можете превратить параллельный поток в последовательный, просто вызвав для него последовательный метод.Настройте пул потоков, используемый параллельным потоком
Глядя на параллельный метод потоков, вы можете задаться вопросом, а откуда берутся потоки для параллельных потоков? Сколько их там? Как настроить этот процесс?
По умолчанию ForkJoinPool используется внутри параллельного потока (раздел 7.2 будет говорить о структуре ветвления/слияния дальше), а его количество потоков по умолчанию — это количество ваших процессоров, которое получается с помощью Runtime.getRuntime(). Available-Processors. () . Но вы можете изменить размер пула потоков через системное свойство java.util.concurrent.ForkJoinPool.common.parallelism следующим образом: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12"); Это глобальная настройка, поэтому она повлияет на все параллельные потоки в коде. И наоборот, в настоящее время невозможно указать это значение специально для параллельного потока. Как правило, размер ForkJoinPool, равный количеству процессоров, является хорошим вариантом по умолчанию, и если у вас нет очень веской причины, мы настоятельно рекомендуем вам не изменять его.
3.2 Эффективное использование параллельных потоков
Мы утверждаем, что параллельные методы суммирования должны работать лучше, чем последовательные и итерационные методы. Однако в программной инженерии угадывание определенно не лучший способ! Особенно когда речь идет об оптимизации производительности, вы всегда должны следовать трем золотым правилам: измерять, измерять и еще раз измерять.
- Если сомневаетесь, измерьте. Превратить последовательный поток в параллельный легко, но не всегда хорошо. В этом разделе мы указывали, что параллельные потоки не всегда быстрее последовательных. Кроме того, параллельные потоки иногда могут противоречить вашей интуиции, поэтому первый и самый важный совет при выборе последовательных или параллельных потоков — проверить их производительность с помощью соответствующих тестов.
- Обратите внимание на упаковку. Операции автоупаковки и распаковки могут значительно снизить производительность. Java 8 имеет потоки примитивного типа ( IntStream , LongStream , DoubleStream ), чтобы избежать этой операции, и их следует использовать, когда это возможно.
- Некоторые операции по своей природе хуже работают с параллельными потоками, чем с последовательными. В частности, операции, зависящие от порядка элементов, такие как limit и findFirst, очень затратны для выполнения в параллельных потоках. Например, findAny будет работать лучше, чем findFirst, потому что его необязательно выполнять по порядку. Вы всегда можете вызвать неупорядоченный метод, чтобы превратить упорядоченный поток в неупорядоченный поток. Что ж, вызов limit для неупорядоченного параллельного потока может быть более эффективным, чем одиночный упорядоченный поток (скажем, источником данных является List ), если вам нужно n элементов в потоке, а не конкретно первые n.
- Также учитывайте общую вычислительную стоимость конвейера операций потока. Пусть N — общее количество элементов, подлежащих обработке, а Q — приблизительная стоимость обработки элемента в конвейере, тогда N*Q — грубая качественная оценка этой стоимости. Высокое значение Q означает, что производительность с большей вероятностью будет хорошей при использовании параллельных потоков.
- Для небольших объемов данных выбор параллельных потоков почти никогда не является хорошим решением. Преимущества параллельной обработки нескольких элементов перевешиваются дополнительными накладными расходами на распараллеливание.
- Подумайте, легко ли разложить структуру данных позади потока. Например, ArrayList гораздо эффективнее разбивать, чем LinkedList, потому что первый можно разбить равномерно без обхода, а второй нужно пройти. Кроме того, потоки примитивных типов, созданные с помощью метода фабрики диапазонов, могут быть быстро декомпозированы.
- Характеристики самого потока, а также то, как промежуточные операции в конвейере изменяют поток, могут изменить производительность процесса декомпозиции. Например, поток SIZED можно разделить на две части одинакового размера, чтобы каждую часть можно было эффективно обрабатывать параллельно, но количество элементов, которые могут быть отброшены операцией фильтрации, непредсказуемо, что приводит к неизвестному размеру потока. сам поток.
- Также подумайте, велика или мала стоимость объединения шагов в терминальных операциях (например, метод объединения в Collector). Если этот шаг является дорогостоящим, то стоимость объединения частичных результатов из каждого подпотока может перевесить прирост производительности, достигаемый за счет параллельных потоков.
3.3 Структура ветвления/слияния
Целью структуры ветвления/слияния является рекурсивное разделение параллелизуемых задач на более мелкие задачи, а затем объединение результатов каждой подзадачи для получения общего результата. Это реализация интерфейса ExecutorService, который назначает подзадачи рабочим потокам в пуле потоков (называемом ForkJoinPool).
3.3.1 Использование RecursiveTask
Для отправки задач в этот пул необходимо создать подкласс RecursiveTask, где R — распараллеленная задача (начиная с и все подзадачи), или, если задача не возвращает результат, тип RecursiveAction (когда конечно, он может обновлять другие нелокальные органы).
Чтобы определить RecursiveTask, просто реализуйте его единственный абстрактный метод, calculate :
protected abstract R compute();
Этот метод также определяет логику разделения задачи на подзадачи и логику создания результата одной подзадачи, когда дальнейшее разделение невозможно или неудобно.
3.3.2 Суммирование с помощью RecursiveTask
public class ForkJoinSumCalculator
extends java.util.concurrent.RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
//创建一个子任务来为数组得前一半求和
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length / 2);
//利 用 另 一 个ForkJoinPool线程异步执行新创建的子任务
leftTask.fork();
//创建一个子任务来为数组得后一半求和
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length / 2, end);
//同步执行第二个子任务,有可能进一步递归
Long rightResult = rightTask.compute();
//读取第一个任务得结构,未完成就等待
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
public static void main(String[] args) {
long l = ForkJoinSumCalculator.forkJoinSum(5);
System.out.println("l = " + l);
}
}