Оригинальный адрес:Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap
AtomicInteger
java.concurrent.atomicВ пакете есть много классов для атомарных операций. В некоторых случаях атомарные операции можно использовать безsynchronizedУстраняет проблемы безопасности многопоточности при отсутствии ключевых слов и блокировок.
Внутри атомарные классы активно используютсяCAS, которые представляют собой инструкции атомарных операций, поддерживаемые большинством современных процессоров, которые обычно намного быстрее, чем синхронизация блокировок. Если вам нужно одновременно изменить переменную, использование атомарных классов чрезвычайно элегантно.
Теперь выберите атомарный классAtomicIntegerНапример
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> executor.submit(atomicInt::incrementAndGet));
stop(executor);
System.out.println(atomicInt.get()); // => 1000
использоватьAtomicIntegerзаменятьIntegerПеременные можно увеличивать в потокобезопасной среде без синхронизации доступа к переменным.incrementAndGet()Метод — это атомарная операция, которую мы можем безопасно вызывать в нескольких потоках.
AtomicIntegerПоддерживает множество атомарных операций,updateAndGet()метод принимаетlambdaвыражения для выполнения любых арифметических операций над целыми числами.
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.updateAndGet(n -> n + 2);
executor.submit(task);
});
stop(executor);
System.out.println(atomicInt.get()); // => 2000
accumulateAndGet()метод принимаетIntBinaryOperatorдругой типlambdaвыражение, мы используем этот метод для вычисления суммы 1 - 999:
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.accumulateAndGet(i, (n, m) -> n + m);
executor.submit(task);
});
stop(executor);
System.out.println(atomicInt.get()); // => 499500
Есть и другие классы атомарных операций:AtomicBoolean AtomicLong AtomicReference
LongAdder
в видеAtomicLongзаменять,LongAdderКлассы можно использовать для непрерывного добавления значений к числам.
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> executor.submit(adder::increment));
stop(executor);
System.out.println(adder.sumThenReset()); // => 1000
LongAdderкласс, как и другие классы атомарных операций с целыми числами, обеспечиваетadd()а такжеincrement()метод, который также является потокобезопасным. Но его внутренний результат не является одним значением, этот класс внутренне поддерживает набор переменных, чтобы уменьшить многопоточную конкуренцию. Фактический результат можно получить, позвонивsum()а такжеsumThenReset()чтобы получить.
Этот класс имеет тенденцию превосходить другие атомарные классы, когда обновления из нескольких потоков выполняются чаще, чем чтения. Обычно используется в качестве статистических данных, например, для подсчета количества запросов к веб-серверу.LongAdderНедостатком является то, что он потребляет больше памяти, потому что в памяти хранится набор переменных.
LongAccumulator
LongAccumulatorдаLongAdderБолее общая версия . Вместо выполнения простой операции добавления классLongAccumulatorвокругLongBinaryOperatorСоздается лямбда-выражение типа, как показано в примере кода:
LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10)
.forEach(i -> executor.submit(() -> accumulator.accumulate(i)));
stop(executor);
System.out.println(accumulator.getThenReset()); // => 2539
мы используем функцию2 * x + yи начальное значение1СоздаватьLongAccumulator. каждый звонокaccumulate(i), текущий результат и значениеiОба передаются в качестве аргументов лямбда-выражению.
рисунокLongAdderТакой же,LongAccumulatorВнутренне поддерживает набор переменных, чтобы уменьшить конкуренцию за потоки.
ConcurrentMap
ConcurrentMapрасширенный интерфейсMapинтерфейс и определяет один из наиболее полезных параллельных типов коллекций.Java 8Функциональное программирование было введено путем добавления новых методов в этот интерфейс.
В следующем фрагменте кода демонстрируются эти новые методы:
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
forEach()принимает тип какBiConsumerизlambdaвыражение и воляmapизkeyа такжеvalueПередано как параметр.
map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
putIfAbsent()метод только в том случае, если данныйkeyХранить данные, только если они не существуютmap, этот метод иputТо же самое потокобезопасно, когда несколько потоков обращаютсяmapНе выполняйте операции синхронизации.
String value = map.putIfAbsent("c3", "p1");
System.out.println(value); // p0
getOrDefault()метод возвращает заданныйkeyизvalue, когдаkeyВозвращает заданное значение, если оно не существует.
String value = map.getOrDefault("hi", "there");
System.out.println(value); // there
replaceAll()метод принимаетBiFunctionТипlambdaвыражение и воляkeyа такжеvalueПередано как параметр для обновленияvalue.
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2")); // d3
compute()Методы иreplaceAll()Метод в чем-то похож, разница в том, что у него есть еще один параметр для обновления указанногоkeyизvalue
map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo")); // barbar
ConcurrentHashMap
Все вышеперечисленные методы являютсяConcurrentMapчастью интерфейса и, следовательно, доступным для всех реализаций этого интерфейса. Кроме того, наиболее важной реализациейConcurrentHashMapНесколько новых методов были дополнительно усовершенствованы дляMapвыполнять над ним параллельные операции.
Как и параллельные потоки, эти методыJava 8прошедшийForkJoinPool.commonPool()предоставить специальныеForkJoinPool. Пул использует предустановленный параллелизм, который зависит от количества доступных ядер. У меня есть четыре ядра ЦП на моей машине для достижения трех видов параллелизма:
System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3
установив следующиеJVMПараметры могут уменьшить или увеличить это значение:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
Мы используем тот же пример для демонстрации, но нижеConcurrentHashMaptype, чтобы можно было вызвать больше методов.
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
Java 8Вводятся три параллельные операции:forEach, searchа такжеreduce. Каждая операция имеет четыре формы, соответственно используемыеkey, value, entriesа такжеkey-valueкак параметр.
Первым параметром всех этих методов являетсяparallelismThresholdпорог. Порог представляет собой минимальный размер коллекции при параллельном выполнении операций. Например, если пройденный порог500,а такжеmapФактический размер499, операции будут выполняться последовательно в одном потоке. В приведенном ниже примере мы используем порог для форсирования параллельных операций.
ForEach
методforEach()Можно итерировать параллельноmapпара ключ-значение.BiConsumerТипlambdaвыражение принимает текущую итерациюkeyа такжеvalue. Чтобы визуализировать параллельное выполнение, мы печатаем имя текущего потока в консоль. Помните, что в моем случае основнойForkJoinPoolИспользуйте до трех потоков.
map.forEach(1, (key, value) ->
System.out.printf("key: %s; value: %s; thread: %s\n",
key, value, Thread.currentThread().getName()));
// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main
Search
search()метод принимаетBiFunctionТипlambdaвыражение, оно можетmapВыполнить операцию поиска и вернуться, если текущая итерация не соответствует требуемым критериям поиска.null. пожалуйста, помните,ConcurrentHashMapявляется неупорядоченным. Функция поиска не должна зависеть от фактического порядка обработки карты. Если есть несколько совпадений, результат может быть неопределенным.
String result = map.search(1, (key, value) -> {
System.out.println(Thread.currentThread().getName());
if ("foo".equals(key)) {
return value;
}
return null;
});
System.out.println("Result: " + result);
// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar
Ниже находится поиск значения
String result = map.searchValues(1, value -> {
System.out.println(Thread.currentThread().getName());
if (value.length() > 3) {
return value;
}
return null;
});
System.out.println("Result: " + result);
// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo
Reduce
reduce()метод принимает два типа какBiFunctionизlambdaвыражение. Первая функция преобразует каждую пару ключ-значение в одно значение любого типа. Вторая функция объединяет все эти преобразованные значения в один результат, где огонь игнорируетnullценность.
String result = map.reduce(1,
(key, value) -> {
System.out.println("Transform: " + Thread.currentThread().getName());
return key + "=" + value;
},
(s1, s2) -> {
System.out.println("Reduce: " + Thread.currentThread().getName());
return s1 + ", " + s2;
});
System.out.println("Result: " + result);
// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar