ExecutorService + Callable + Future Inlessages Multi-Threaded Control Consularency и возвращает данные

Java

1. Старт

Многозадачное параллельное выполнение блокируется и ожидает завершения всех выполнений, а количество параллелизма контролируется через пул потоков. Первая половина — это код (перечисленный в соответствии с потоком выполнения), а вторая половина — соответствующие комментарии.

2. Создайте Callable с возвращаемыми данными

public Callable<String> get(String key, Map<String, Object> params) {

	return () -> {
		System.out.println("开始查询=======" + params.toString());
		Thread.sleep(500);
		long currentTimeMillis1 = System.currentTimeMillis();
		System.out.println("get任务耗时:" + (currentTimeMillis1 - currentTimeMillis) + "ms");
		return key + "哈哈哈哈哈哈";
	};
}

3. Создайте циклический вызовCallable<String>

List<Callable<String>> tasks = new ArrayList<>();
for (int i = 0; i < 30; i++) {

	// ... params

	Callable<String> callable = mSmsStatisticsService.get(i + "key", params);
	tasks.add(callable);
}

В-четвертых, создайте пул потоков

/**
 * corePoolSize:核心线程数。
 * maximumPoolSize:线程池允许创建的最大线程数。
 * keepAliveTime:非核心线程闲置的超时时间。超过这个时间则回收。
 * TimeUnit:keepAliveTime参数的时间单位。
 * workQueue:任务队列。
 * ThreadFactory:线程工厂,用于创建线程。
 * RejectedExecutionHandler:饱和策略。
 * 
 * ThreadPoolExecutor(int corePoolSize,
 *                    int maximumPoolSize,
 *                    long keepAliveTime,
 *                    TimeUnit unit,
 *                    BlockingQueue<Runnable> workQueue,
 *                    ThreadFactory threadFactory,
 *                    RejectedExecutionHandler handler)
 */
ExecutorService executorService = new ThreadPoolExecutor(5, 32,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                r -> {
                    Thread thread = new Thread(r);
                    // thread.setName("sdwfqin");
                    return thread;
                }, new ThreadPoolExecutor.AbortPolicy());

Пятое, запускайте нити и получить данные возврата

List<String> resultList = new ArrayList<>();
// 启动线程并获取返回数据
List<Future<String>> futures = executorService.invokeAll(tasks);
for (Future<String> future : futures) {
	resultList.add(future.get());
}

Шестой, закрыть пул резьбы

executorService.shutdown();

7. Вышеупомянутые пояснения, связанные с API

  1. ExecutorService

    1. основной метод

      1. представить: отправленоCallableметод, возвращаетFuture, указывая, что submit имеет возвращаемое значение
      2. Выполнить: executeRunnableметод без возвращаемого значения
      3. invokeAny: получаетCallableКоллекция объектов в качестве параметра. Вызов этого метода не возвращаетFutureобъект, но возвращает одну из коллекцийCallableобъект, и нет гарантии, что результат, возвращаемый после вызова, являетсяCallable, просто знай, что это ониCallableодна из казней заканчиваетсяCallableобъект.
      4. invokeAll: вызывает все существующие в наборе параметровCallableобъект и возвращает коллекцию, содержащую объекты Future, которые можно использовать для управления каждымCallableрезультат исполнения.
      5. выключение: изящное выключениеExecutorService, когда этот метод вызывается,ExecutorServiceПрекратите получать новые задачи и дождитесь завершения уже отправленных задач (как отправленных, так и незавершенных). Когда все отправленные задачи выполнены, пул потоков закрывается.
    2. политика отказа

      1. ThreadPoolExecutor.AbortPolicy: прервать задачу и броситьRejectedExecutionExceptionаномальный.
      2. ThreadPoolExecutor.DiscardPolicy: также отбрасывает задачи, но не генерирует исключения.
      3. ThreadPoolExecutor.DiscardOldestPolicy: отменить задачу в начале очереди и выполнить следующие задачи.
      4. ThreadPoolExecutor.CallerRunsPolicy: задача обрабатывается вызывающим потоком.
    3. очередь блокировки

      1. ArrayBlockingQueue: очередь FIFO на основе массива, ограниченная
      2. LinkedBlockingQueue: очередь FIFO на основе связанного списка, неограниченная
      3. SynchronousQueue: небуферизованная очередь ожидания, неограниченная
      4. что не ограничено: если емкость не указана, по умолчаниюInteger.MAX_VALUE
    4. Общий пул потоков

      1. Executors.newcachedThreadPool (): можно кэшировать пул резьбы
      2. Executors.newSingleThreadExecutor(): пул с одним потоком
      3. Executors.newFixedThreadPool(3): пул потоков с фиксированным номером потока.
      4. Executors.newScheduledThreadPool(5): фиксированное количество потоков, поддерживает временные и периодические задачи.
      5. ThreadPoolExecutor(): создан вручную
  2. Future

    1. основной метод

      1. get: Когда задача завершается, она возвращает результат.Если работа не завершена на момент ее вызова, поток будет заблокирован до тех пор, пока задача не будет выполнена.
      2. Получить (давние тайм-аут, Timeunit): результат будет возвращен после ожидания наибольшего времени ожидания
      3. cancel(boolean mayInterruptIfRunning): может использоваться для остановки задачи, если задача может быть остановлена ​​(черезmayInterruptIfRunningЧтобы судить) можно вернуть true, если задача была завершена или остановлена, или задачу нельзя остановить, она вернет false.
      4. isDone(): определяет, завершен ли текущий метод.
      5. isCancel(): определить, отменен ли текущий метод