В первый день работы, что за опыт босс кидает файлы 30G...

Java база данных

Если вам дано 100 миллионов строк данныхОчень большой файл, позвольте вам импортировать данные в производственную базу данных, как бы вы это сделали?

Вышеупомянутая проблема является полученным реальным бизнес-требованием по переносу исторических данных старой системы в новую производственную систему через автономные файлы.

Из-за сжатых сроков и огромного количества данных в процессе проектирования было продумано решение:

  • разделить файл
  • Многопоточный импорт

разделить файл

Сначала мы можем написать небольшую программу или использовать команду splitsplitРазделите этот огромный файл на более мелкие файлы.

-- 将一个大文件拆分成若干个小文件,每个文件 100000 行
split -l 100000 largeFile.txt -d -a 4 smallFile_

Причина, по которой мы решили сначала разделить большие файлы, в основном состоит из двух причин:

Во-первых, если программа напрямую читает этот большой файл, предположим, что программа внезапно остановится, когда она будет прочитана наполовину, что приведет к прямой потере прогресса чтения файла и необходимости начать чтение снова.

После разделения файла, как только небольшой файл будет прочитан, мы можем переместить его в указанную папку.

Таким образом, даже если приложение выйдет из строя и перезапустится, при повторном чтении нам нужно будет прочитать только оставшиеся файлы.

Во-вторых, файл может быть прочитан только одним приложением, что ограничивает скорость импорта.

После разделения файла мы можем использовать многоузловое развертывание для горизонтального масштабирования. Каждый узел считывает часть файла, что ускоряет импорт в геометрической прогрессии.

Многопоточный импорт

Когда мы разделяем файл, нам нужно прочитать содержимое файла и импортировать его.

При предварительном разделении установите каждый небольшой файл так, чтобы он содержал 10 w строк данных. Из-за беспокойства о том, что данные 10 Вт будут считаны в приложение сразу, использование памяти кучи будет слишком высоким, что приведет к частым"Полный ГК", поэтому в следующем используется способ потокового чтения, чтения данных построчно.

Конечно, если после разбиения файл стал маленьким или настройки кучи памяти приложения велики, мы можем напрямую загрузить файл в память приложения для обработки. Это относительно просто.

Код для чтения построчно выглядит следующим образом:

File file = ...
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line=iterator.nextLine();
        convertToDB(line);
    }

}

Используйте приведенный выше кодcommons-ioсерединаLineIteratorкласс, который используется внизу классаBufferedReaderПрочитать содержимое файла. Он инкапсулирует его в режим итератора, чтобы мы могли легко выполнять итерацию и чтение.

Если вы в настоящее время используете JDK1.8, то описанная выше операция проще, мы можем напрямую использовать собственные классы JDK.Filesпреобразовать файл вStreamспособ чтения, код выглядит следующим образом:

Files.lines(Paths.get("文件路径"), Charset.defaultCharset()).forEach(line -> {
    convertToDB(line);
});

На самом деле, смотрите внимательноFiles#linesБазовый исходный код, по сути, принцип тот же, что и вышеLineIteratorТочно так же он также инкапсулирован в шаблон итератора.

Проблемы с внедрением многопоточности

Приведенный выше код чтения несложно написать, но есть проблема с эффективностью, главным образом потому, что импортируется только один поток, а следующая строка может быть обработана после того, как будет импортирована предыдущая строка данных.

Чтобы ускорить импорт, у нас будет еще несколько потоков для одновременного импорта.

Для многопоточности мы, естественно, будем использовать метод пула потоков.Соответствующие изменения в коде следующие:

File file = ...;
ExecutorService executorService = new ThreadPoolExecutor(
        5,
        10,
        60,
        TimeUnit.MINUTES,
     // 文件数量,假设文件包含 10W 行
        new ArrayBlockingQueue<>(10*10000),
      // guava 提供
        new ThreadFactoryBuilder().setNameFormat("test-%d").build());
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        executorService.submit(() -> {
            convertToDB(line);
        });
    }

}

В приведенном выше коде каждый раз, когда считывается строка, она будет напрямую передана в пул потоков для выполнения.

Мы знаем, что принцип пула потоков выглядит следующим образом:

  1. Если количество основных потоков не заполнено, потоки будут созданы непосредственно для выполнения задач.
  2. Если количество основных потоков заполнено, задача будет помещена в очередь.
  3. Если очередь заполнена, для выполнения задачи будет создан другой поток.
  4. Если максимальное количество потоков заполнено и очередь заполнена, будет выполнена политика отклонения.

Блок-схема выполнения пула потоков

Поскольку количество основных потоков, заданное в указанном выше пуле потоков, равно 5, максимальное количество основных потоков достигается быстро, и последующие задачи можно только добавлять в очередь.

Чтобы предотвратить отклонение последующих задач пулом потоков, мы можем принять следующие решения:

  • Установите большой размер очереди, включая все строки во всем файле.
  • Установите максимальное количество потоков большим, число больше, чем количество строк во всем файле

Приведенные выше два решения имеют одинаковую проблему, первое эквивалентно загрузке всего содержимого файла в память, что займет слишком много памяти.

Второй тип создает слишком много потоков, что также занимает слишком много памяти.

Когда память занята слишком много и сборщик мусора не может ее очистить, это может привести к частым"Полный ГК", даже приводит к"ООМ", что приводит к слишком медленному импорту программы.

Конечно, мы также можем использовать третье решение, объединяющее первые два, чтобы установить соответствующую длину очереди и соответствующее максимальное количество потоков. Тем не менее, степень **"подходящего"** действительно трудно уловить, и есть еще некоторые"ООМ"проблема.

Итак, для решения этой проблемы было разработано два решения:

  • CountDownLatchпакетное выполнение
  • Расширить пул потоков

Пакетное выполнение CountDownLatch

предоставлено JDKCountDownLatch, вы можете позволить основному потоку дождаться завершения выполнения подпотоков, прежде чем продолжить выполнение.

Используя эту функцию, мы можем преобразовать код, импортированный с помощью многопоточности.Основная логика выглядит следующим образом:

try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    // 存储每个任务执行的行数
    List<String> lines = Lists.newArrayList();
    // 存储异步任务
    List<ConvertTask> tasks = Lists.newArrayList();
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        lines.add(line);
        // 设置每个线程执行的行数
        if (lines.size() == 1000) {
            // 新建异步任务,注意这里需要创建一个 List
            tasks.add(new ConvertTask(Lists.newArrayList(lines)));
            lines.clear();
        }
        if (tasks.size() == 10) {
            asyncBatchExecuteTask(tasks);
        }

    }
    // 文件读取结束,但是可能还存在未被内容
    tasks.add(new ConvertTask(Lists.newArrayList(lines)));
    // 最后再执行一次
    asyncBatchExecuteTask(tasks);
}

В этом коде каждая асинхронная задача будет импортировать 1000 строк данных, а когда накопится 10 асинхронных задач, она вызоветasyncBatchExecuteTaskАсинхронное выполнение с использованием пула потоков.

/**
 * 批量执行任务
 *
 * @param tasks
 */
private static void asyncBatchExecuteTask(List<ConvertTask> tasks) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
    for (ConvertTask task : tasks) {
        task.setCountDownLatch(countDownLatch);
        executorService.submit(task);
    }
    // 主线程等待异步线程 countDownLatch 执行结束
    countDownLatch.await();
    // 清空,重新添加任务
    tasks.clear();
}

asyncBatchExecuteTaskметод создастCountDownLatch, а затем вызвать в основном потокеawaitМетод ожидает завершения выполнения всех асинхронных потоков.

ConvertTaskЛогика асинхронной задачи следующая:

/**
 * 异步任务
 * 等数据导入完成之后,一定要调用 countDownLatch.countDown()
 * 不然,这个主线程将会被阻塞,
 */
private static class ConvertTask implements Runnable {

    private CountDownLatch countDownLatch;

    private List<String> lines;

    public ConvertTask(List<String> lines) {
        this.lines = lines;
    }

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            for (String line : lines) {
                convertToDB(line);
            }
        } finally {
            countDownLatch.countDown();
        }
    }
}

ConvertTaskЛогика класса задачи очень проста, перебрать все строки и импортировать их в базу данных. После завершения импорта всех данных вызовитеcountDownLatch#countDown.

Когда все асинхронные потоки закончат выполнение, вызовитеcountDownLatch#countDown, основной поток проснется и продолжит чтение файла.

Хотя этот метод решает вышеуказанные проблемы, в этом методе необходимо каждый раз накапливать определенное количество задач, прежде чем начать выполнять все задачи асинхронно.

Кроме того, вам нужно дождаться завершения всех задач, прежде чем запускать следующую партию задач.Время, затраченное на выполнение пакета, равно времени, затрачиваемому самой медленной асинхронной задачей.

Таким образом, потоки в пуле потоков имеют определенное время простоя.Есть ли способ продолжать сжимать пул потоков и поддерживать его работу?

Расширить пул потоков

Возвращаясь к первому вопросу, чтение и импорт файлов на самом деле является моделью потребления «производитель-потребитель».

Основной поток как производитель постоянно читает файл и ставит его в очередь.

Как потребители, асинхронные потоки непрерывно считывают содержимое из очереди и импортируют его в базу данных.

«После того, как очередь заполнена, производитель должен заблокироваться, пока потребитель не примет задачу».

Фактически используемый нами пул потоков также является моделью потребления «производитель-потребитель», в которой также используются блокирующие очереди.

Тогда почему пул потоков не блокируется, когда очередь заполнена?

Это связано с тем, что пул потоков внутренне используетofferметод, этот метод «не будет блокироваться», когда очередь заполнена, а возвращается напрямую.

Итак, есть ли способ заблокировать основной поток для добавления задач, когда очередь пула потоков заполнена?

На самом деле это возможно, мы настраиваем политику отклонения пула потоков и вызываем ее при заполнении очереди.BlockingQueue.putдобиться блокировки производителя.

RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // should not be interrupted
            }
        }

    }
};

Таким образом, как только пул потоков будет заполнен, основной поток будет заблокирован.

После использования этого метода мы можем напрямую использовать код, импортированный упомянутой выше многопоточностью.

ExecutorService executorService = new ThreadPoolExecutor(
        5,
        10,
        60,
        TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(100),
        new ThreadFactoryBuilder().setNameFormat("test-%d").build(),
        (r, executor) -> {
            if (!executor.isShutdown()) {
                try {
                   // 主线程将会被阻塞
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    // should not be interrupted
                }
            }

        });
File file = new File("文件路径");

try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        executorService.submit(() -> convertToDB(line));
    }
}    

Резюме

Для очень большого файла мы можем разделить файл на несколько файлов, а затем развернуть несколько приложений для повышения скорости чтения.

Кроме того, в процессе чтения мы также можем использовать многопоточность для параллельного импорта, но нам нужно обратить внимание на то, что после заполнения пула потоков последующие задачи будут отклонены.

Мы можем заблокировать основной поток чтения, расширив пул потоков и настроив политику отклонения.

Ну, это все для сегодняшней статьи. Я не знаю, есть ли у вас другие лучшие решения. Добро пожаловать, чтобы оставить сообщение для обсуждения.