Существует множество сценариев параллелизма приложений, и загрузка файлов — очень распространенный сценарий параллелизма.
Зачем вам писать многопоточный загрузчик? Я не знаю, использовали ли вы IDM (Internet Download Manager), но когда я впервые использовал IDM, меня привлек его метод загрузки.
При использовании IDM для загрузки файла вы можете интуитивно увидеть процесс его загрузки: для загрузки файла используется N потоков.В начале файл делится на N сегментов, и каждый сегмент загружается потоком.Когда сегмент скачал, соответствующая ветка простаивает, что делать в это время? Возьмите самый большой сегмент из оставшихся N - 1 сегментов и разделите его на два, чтобы было N сегментов данных, и дайте бездействующим потокам загрузить только что разделенный сегмент.
Всякий раз, когда поток завершает задачу загрузки, он продолжает отделять раздел от оставшихся частей для загрузки до тех пор, пока не будут загружены все части всего файла.
Конечно, файл не может быть сегментирован бесконечно. IDM установит порог сегмента. Когда оставшийся максимальный сегмент меньше этого порога, он больше не будет сегментирован на простаивающие потоки, а будет только ждать всех активных потоков. Загрузка завершена. .
После разговора о стратегии загрузки IDM у нас есть общее представление. Тем не менее, отличное программное обеспечение не создается в одночасье.Хотя мы хотим написать загрузчик, подобный IDM, нам все же нужно начать с простой реализации и продолжать итерации и оптимизацию.
Итак, стратегия, которую мы приняли в начале, была такой: исправить N потоков, разделить файл на N сегментов, и каждый поток отвечает за загрузку сегмента данных.
Этапы реализации
Определите, поддерживает ли сервер возобновляемую загрузку
Первое, что нужно уяснить, это то, что многопоточная загрузка файла использует часть каждого потока для загрузки файла, затем HTTP-сервер должен поддерживать запрос части данных или возобновлять передачу с точки останова, поэтому первый шаг необходимо определить, поддерживает ли целевой файл точки останова. Продолжение.
Есть заголовок HTTP-запросаRange
поле, которое можно использовать для указания диапазона запрашиваемых данных, например, если мы хотим запросить данные с 10-го по 20-й байт, это поле можно записать какRange:bytes=10-20
.
Соответственно, если HTTP-сервер поддерживает докачку, то для указанногоRange
Запросы полей будут возвращать код состояния 206.
Давайте проверим это с помощью Curl:
curl -I --header "Range: bytes=0-" http://mirrors.163.com/debian/ls-lR.gz
Получен ответ:
HTTP/1.1 206 Partial Content
Server: nginx
Date: Wed, 25 Apr 2018 02:57:56 GMT
Content-Type: application/octet-stream
Content-Length: 15316619
Connection: keep-alive
Last-Modified: Mon, 23 Apr 2018 14:38:44 GMT
ETag: "5addeff4-e9b68b"
Content-Range: bytes 0-15316618/15316619
мы установилиRange: bytes=0-
, указывая на то, что запрашиваются данные с 0-го байта до последнего байта, так зачем указывать это поле? Это делается по двум причинам: во-первых, чтобы определить, равен ли код состояния ответа 206, а во-вторых, чтобы получить размер файла.
Если сервер поддерживает возобновляемую загрузку с точки останова, то используем многопоточность для закачки.
сегментация файлов
Получаем размер файла fileSize, и делим его на N сегментов, размер каждого сегмента равенfileSize / N
, так как файл обычно не делится ровно на N сегментов, последний сегмент равен размеру остальных сегментов.
Мы используем массив endPoint для хранения начальной и конечной позиции каждого сегмента.Например, файл 10 B, начальный и конечный диапазон 0 ~ 9. Если он разделен на 3 сегмента для загрузки, тоendPoint = {0, 3, 6, 10}
, для каждого сегмента есть лево-замкнутый и право-открытый интервал.
Объяснение: Для сегмента i (i начинается с 0), отendPoint[i]
начать загрузку вendPoint[i + 1] - 1
останавливаться на достигнутом. Аналогично, для отрезка i + 1 изendPoint[i + 1]
начать сendPoint[i + 2] - 1
останавливаться на достигнутом.
Создайте поток загрузки
Мы создаем поток загрузки для каждого загружаемого сегмента, и каждый сегмент сохраняется в отдельном временном файле.
То, что должен сделать поток загрузки, можно резюмировать следующим образом:
- Установите заголовок запроса
Range
поле для указания области запроса - установить тайм-аут
- Подключиться к HTTP-серверу
- Создать временный файл (загрузить сегмент в первый раз)
- Прочитайте данные, возвращенные сервером, и запишите во временный файл, пока количество прочитанных байтов не сравняется с размером сегмента.
- закрыть временные файлы
Выше приведен плавный процесс загрузки, нам также необходимо повторить попытку при возникновении следующих проблем:
- Если время подключения или время чтения истекло
- Ошибка чтения и записи временного файла
Это еще вопрос, к треду, при повторной попытке он заново качает весь сегмент, или продолжает качать остальные? Мы знаем, что лучше всего продолжить с той части, которая еще не загружена, так как же этого добиться?
Мы можем сделать так: каждый поток сохраняет начальную и конечную позицию своей ответственной части.Когда поток только запущен, начальная и конечная позиция является начальной и конечной позицией сегмента, и создается временный файл для записи загруженные данные. При загрузке данных стартовой позицией потока обновления в реальном времени является следующий байт текущего загружаемого байта, при возникновении ошибки и необходимости повторной попытки он начинается прямо с этой позиции и записывает во временный файл, созданный ранее.
Создать ветку просмотра
Мы создаем поток демона, который отвечает за мониторинг процесса загрузки, скорости загрузки и текущего количества активных потоков файла.После загрузки каждого потока он уведомляет основной поток о необходимости выполнения следующего шага.
Работа с временными файлами
Когда основной поток получает уведомление о том, что все части загружены, временный файл необходимо очистить.
Если это файл, загруженный несколькими потоками, необходимо объединить несколько временных файлов.
Если это однопоточная загрузка, временный файл переименовывается.
Выполнение
Вот схема программы и общее введение.Полный исходный код можно посмотреть на Github:GitHub.com/Я позволяю своей тете быть/просто…
public class HttpDownloader {
private boolean resumable;
private URL url;
private File localFile;
private int[] endPoint;
private Object waiting = new Object();
private AtomicInteger downloadedBytes = new AtomicInteger(0);
private AtomicInteger aliveThreads = new AtomicInteger(0);
private boolean multithreaded = true;
private int fileSize = 0;
private int THREAD_NUM = 5;
private int TIME_OUT = 5000;
private final int MIN_SIZE = 2 << 20;
public HttpDownloader(String Url, String localPath) throws MalformedURLException {...}
public HttpDownloader(String Url, String localPath,
int threadNum, int timeout) throws MalformedURLException {...}
//开始下载文件
public void get() throws IOException {...}
//检测目标文件是否支持断点续传,以决定是否开启多线程下载文件的不同部分
public boolean supportResumeDownload() throws IOException {...}
//监测下载速度及下载状态,下载完成时通知主线程
public void startDownloadMonitor() {...}
//对临时文件进行合并或重命名
public void cleanTempFile() throws IOException {...}
//合并多线程下载产生的多个临时文件
public void merge() {...}
//一个下载线程负责下载文件的某一部分,如果失败则自动重试,直到下载完成
class DownloadThread extends Thread {
private int id;
private int start;
private int end;
private OutputStream out;
public DownloadThread(int id, int start, int end) {...}
//保证文件的该部分数据下载完成
@Override
public void run() {...}
//下载文件指定范围的部分
public boolean download() {...}
}
}
Здесь DownloadThread определяется как внутренний класс HttpDownloader. Это связано с тем, что экземпляр HttpDownloader соответствует задаче загрузки файла. Экземпляр хранит различные данные задачи, а поток загрузки связан с задачей и должен использовать эти данные. . , определенные как внутренние классы, могут совместно использовать эти данные напрямую, что позволяет избежать чрезмерной передачи и хранения параметров.
Чтобы загрузить файл, сначала создайте экземпляр HttpDownloader. Обязательными параметрами являются URL-адрес целевого файла и место локального хранения. Необязательными параметрами являются количество потоков и время ожидания.
Метод входа HttpDownloader — get(), который работает следующим образом:
- Вызовите метод supportResumeDownload(), чтобы определить, поддерживает ли целевой файл возобновление с точки останова и превышает ли он установленное минимальное значение файла, чтобы решить, использовать ли многопоточный режим загрузки;
- Вычислите начальную и конечную позицию каждого сегмента и сохраните ее в endPoint;
- Создайте ветку DownloadThread для скачивания;
- Вызовите метод startDownloadMonitor(), чтобы запустить поток мониторинга;
- Дождитесь загрузки файла;
- Вызовите cleanTempFile() для обработки временных файлов;
- Вывести конечную информацию.
Давайте снова представим DownloadThread, метод входа в него — run(), который работает следующим образом:
- Вызвать метод download() для загрузки указанной части данных;
- В случае успеха поток завершается, в случае неудачи он возвращается к предыдущему шагу.
Скачать тест
Для сервера с ограничением скорости одиночного соединения многопоточная загрузка может показать свои преимущества, если сервер сам не ограничивает скорость соединения, то одиночное соединение тоже может приближаться к лимиту пропускной способности.
Давайте посмотрим на сравнение однопоточного и многопоточного, когда скорость одного соединения намного меньше, чем пропускная способность.
Первый — это отдельный поток для скачивания:
Это заняло 54,133 секунды, а средняя скорость загрузки составила 42 КБ/с.
Откройте 10 тем для скачивания:
Это заняло 10,144 секунды, а средняя скорость загрузки составила 228 КБ/с.
Видно, что по сравнению с однопоточной загрузкой скорость загрузки значительно увеличилась после включения многопоточности.
При фактическом загруженном, в зависимости от состояния сети, установите разное время времени, существует небольшое влияние на скорость загрузки. Если время ожидания слишком мал, он приведет к тому, что нить будет установить соединение, а установление соединения является относительно трудоемким операцией, что приведет к низкой эффективности загрузки; если время ожидания установлено слишком долго, он может быть недействительным, но Клиент ждет, ненужное время расходов.
Эпилог
Это реализация самого простого многопоточного загрузчика.Файл делится на фиксированные сегменты N и назначается потокам загрузки N. Когда поток загружается, поток завершается и больше не используется.
Позже я дополнительно оптимизирую программу. С одной стороны, я приму стратегию загрузки, аналогичную IDM, для дальнейшего повышения эффективности загрузки. С другой стороны, я также улучшу функциональность и надежность, чтобы улучшить обработку исключений.
Заинтересованные друзья могут изучить более эффективные методы загрузки, а если у вас есть какие-либо идеи, пожалуйста, оставьте комментарий.
Статьи по Теме
- Условия гонки, мьютексы и синхронизация в многопоточности Java
- Графический интерфейс Java: Awt/Swing реализует масштабирование и прокрутку изображений.
- Java Swing пишет программу с графическим интерфейсом для добавления, удаления, изменения и проверки базы данных.
- Общие сценарии применения лямбда-выражений Java
- Как Java использует интерфейсы, чтобы избежать обратных вызовов функций
- Глубокое понимание бинарного поиска и его крайних случаев