Углубленный анализ артефакта сжатия commons-compress

Java задняя часть

Введение

Apache Commons Compress — это подпроект Apache Commons, библиотеки, предназначенной для сжатия инструментов Java.я портал. Нынешняя последняя стабильная версия 1.2, Maven зависит от:

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.20</version>
        </dependency>

Одновременное уплотнение

образец кода

public static void compressFileList(String zipOutName, List<String> fileNameList) throws IOException, ExecutionException, InterruptedException {
        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("compressFileList-pool-").build();
        ExecutorService executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20), factory);
        ParallelScatterZipCreator parallelScatterZipCreator = new ParallelScatterZipCreator(executor);
        OutputStream outputStream = new FileOutputStream(zipOutName);
        ZipArchiveOutputStream zipArchiveOutputStream = new ZipArchiveOutputStream(outputStream);
        //将每个文件的输入流组成ZipArchiveEntry提交给parallelScatterZipcreator执行
        zipArchiveOutputStream.setEncoding("UTF-8");
        for (String fileName : fileNameList) {
            File inFile = new File(fileName);
            final InputStreamSupplier inputStreamSupplier = () -> {
                try {
                    return new FileInputStream(inFile);
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                    return new NullInputStream(0);
                }
            };
            ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(inFile.getName());
            zipArchiveEntry.setMethod(ZipArchiveEntry.DEFLATED);
            zipArchiveEntry.setSize(inFile.length());
            zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 436);
            parallelScatterZipCreator.addArchiveEntry(zipArchiveEntry, inputStreamSupplier);
        }
        parallelScatterZipCreator.writeTo(zipArchiveOutputStream);
        zipArchiveOutputStream.close();
        outputStream.close();
        log.info("ParallelCompressUtil->ParallelCompressUtil-> info:{}", JSONObject.toJSONString(parallelScatterZipCreator.getStatisticsMessage()));
    }

ParallelScatterZipCreator

Этот класс является основным классом обработки параллельного сжатия, посмотрите на исходный код:

image.pngМожно видеть, что способ, которым этот класс обрабатывает параллельное сжатие нескольких файлов, заключается в использовании пула потоков для отправки потребления очереди для сжатия, а затем объединения сжатых выходных данных и записи их в выходной файл. продолжай смотреть вниз

image.pngМы видим конструктор этого класса, три параметра:
Первый - создать несколько основных потоков, а максимальное количество потоков - это текущие доступные ресурсы системы, здесь относится ко всем, поэтому не рекомендуется использовать этот тип самостоятельно созданного пула потоков, поскольку он будет занимать системы во время процесса сжатия Все ресурсы, вызывающие влияние на другие программы. И пул потоков отключится, когда будет выполнено окончательное уплотнение.
Второе - промежуточное хранилище, которое понимается как временный файл, сгенерированный во время процесса сжатия, и, наконец, удалить сжатие, здесь используется по умолчанию. Третий - уровень сжатия, который используется, является алгоритм дефлятерия под пакетом Java.util.zip, и есть:

image.pngУровень сжатия по умолчанию используется по умолчанию, и вы можете выбрать его в соответствии со своими потребностями.
наконецZipArchiveOutputStreamЭтот класс сжимает выходные данные, при необходимости, в файл или в новый поток вывода или канал.


В приведенном выше коде цикла for сжимаемые файлы по очереди собираются в ZipArchiveEntry и, наконец, добавляются в пул потоков через parallelScatterZipCreator.addArchiveEntry(ZipArchiveEntry, inputStreamSupplier) для обработки и выполнения несколькими потоками одновременно.

for (String fileName : fileNameList) {
    File inFile = new File(fileName);
    final InputStreamSupplier inputStreamSupplier = () -> {
        try {
            return new FileInputStream(inFile);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            return new NullInputStream(0);
        }
    };
    ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(inFile.getName());
    zipArchiveEntry.setMethod(ZipArchiveEntry.DEFLATED);
    zipArchiveEntry.setSize(inFile.length());
    zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 436);
    parallelScatterZipCreator.addArchiveEntry(zipArchiveEntry, inputStreamSupplier);
}

ZipArchiveEntry — это класс ZipEntry в базовом пакете java.zip с расширениями для дополнительных полей. method — это метод сжатия. Здесь ZipArchiveEntry.DEFLATED записывается по умолчанию. Size — это размер сжатого входного потока. После установки режима unix можно использовать командную строку для операции распаковки. Каждый сжимаемый файл, то есть входной поток, упаковывается в ZipArchiveEntry и добавляется в пул потоков с помощью addArchiveEntry.

 /**
     * Adds an archive entry to this archive.
     * <p>
     * This method is expected to be called from a single client thread
     * </p>
     *
     * @param zipArchiveEntry The entry to add.
     * @param source          The source input stream supplier
     */

    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
        submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
    }
    /**
    * 将zipEntry创建为callable任务
    */
    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry,
        final InputStreamSupplier source) {
        final int method = zipArchiveEntry.getMethod();
        if (method == ZipMethod.UNKNOWN_CODE) {
            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
        }
        final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
        return new Callable<ScatterZipOutputStream>() {
            @Override
            public ScatterZipOutputStream call() throws Exception {
                ScatterZipOutputStream scatterStream = tlScatterStreams.get();
                //将zipEntry的压缩写入提交至ScatterZipOutputStream中的队列
                scatterStream.addArchiveEntry(zipArchiveEntryRequest);
                return scatterStream;
            }
        };
    }
    
    /**
     * Submit a callable for compression.
     *
     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
     *
     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
     * @since 1.19
     */
    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
        futures.add(es.submit(callable));
    }

Здесь вы можете видеть из исходного кода, что окончательный addArchiveEntry должен преобразовать каждую запись в вызываемую задачу и, наконец, отправить ее в пул потоков для выполнения, а возвращенное futrue добавляется в очередь в параллельном классе. Отправленная задача относится к Queue в классе ScatterZipOutputStream, и, наконец, при вызове параллельного метода writeTo все CompressedEntry очереди вынимаются и записываются в конечный результат.

/**
* 将压缩任务提交至队列中
*/
public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
        try (final InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
            streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
        }
        items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(),
                                      streamCompressor.getBytesWrittenForLastEntry(), streamCompressor.getBytesRead()));
                               
    }
 /**
 * 最终将压缩内容写入输出流
 */
 public void writeTo(final ZipArchiveOutputStream target) throws IOException {
        backingStore.closeForWriting();
        try (final InputStream data = backingStore.getInputStream()) {
            for (final CompressedEntry compressedEntry : items) {
                try (final BoundedInputStream rawStream = new BoundedInputStream(data,
                        compressedEntry.compressedSize)) {
                    target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
                }
            }
        }
    }

Организовал блок-схему

image.png