Использование многопоточности в общей структуре сканера Java

Java рептилия

spider.jpg

Введение

NetDiscoveryЭто общий фреймворк сканера, разработанный мной на основе Vert.x, RxJava 2 и других фреймворков. Он содержит богатыехарактеристика.

2. Использование многопоточности

NetDiscoveryХотя RxJava 2 используется для реализации переключения потоков, все еще существует множество сценариев, в которых используется многопоточность. В этой статье перечислены некоторые распространенные многопоточные сценарии использования фреймворков искателей.

2.1 Пауза и возобновление работы сканера

Пауза и возобновление — наиболее распространенный сценарий использования сканера, который реализован здесь с помощью класса CountDownLatch.

CountDownLatch — это класс синхронных инструментов, который позволяет одному или нескольким потокам ожидать выполнения операции других потоков после выполнения.

Метод pause инициализирует класс CountDownLatch pauseCountDown и устанавливает его счетчик равным 1.

Метод возобновления выполняет функцию countDown() функции pauseCountDown, как только ее счетчик достигает нуля.

    /**
     * 爬虫暂停,当前正在抓取的请求会继续抓取完成,之后的请求会等到resume的调用才继续抓取
     */
    public void pause() {
        this.pauseCountDown = new CountDownLatch(1);
        this.pause = true;
        stat.compareAndSet(SPIDER_STATUS_RUNNING, SPIDER_STATUS_PAUSE);
    }

    /**
     * 爬虫重新开始
     */
    public void resume() {

        if (stat.get() == SPIDER_STATUS_PAUSE
                && this.pauseCountDown!=null) {

            this.pauseCountDown.countDown();
            this.pause = false;
            stat.compareAndSet(SPIDER_STATUS_PAUSE, SPIDER_STATUS_RUNNING);
        }
    }

Когда запрос сканера берется из очереди сообщений, он сначала решает, нужно ли приостановить работу сканера, и если его необходимо приостановить, будет выполнен вызов await() для pauseCountDown. await() будет блокировать поток, то есть приостанавливать работу сканера до тех пор, пока счетчик CountDownLatch не станет равным 0, после чего поиск можно возобновить.

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            //暂停抓取
            if (pause && pauseCountDown!=null) {
                try {
                    this.pauseCountDown.await();
                } catch (InterruptedException e) {
                    log.error("can't pause : ", e);
                }

                initialDelay();
            }
            // 从消息队列中取出request
           final Request request = queue.poll(name);
           ......
      }

2.2 Скорость обхода мультиширотного управления

На следующей диаграмме показан поток одного искателя.

basic_principle.png

Если скорость сканирования сканера слишком высока, он будет распознан системой другой стороны.NetDiscoveryБазовая защита от ползания может быть достигнута путем ограничения скорости.

существуетNetDiscoveryВнутренне поддерживает несколько широт для реализации ограничения скорости сканера. Эти широты также в основном соответствуют процессу одиночного обходчика.

2.2.1 Request

Во-первых, запрос, который инкапсулирует сканер, поддерживает приостановку. После того, как Запрос будет взят из очереди сообщений, будет проверено, нужно ли его приостановить.

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            //暂停抓取
            ......

            // 从消息队列中取出request
            final Request request = queue.poll(name);

            if (request == null) {

                waitNewRequest();
            } else {

                if (request.getSleepTime() > 0) {

                    try {
                        Thread.sleep(request.getSleepTime());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                ......
            }
        }

2.2.2 Download

Когда сканер выполняет загрузку, загрузчик создает объект RxJava Maybe. Ограничение скорости загрузки реализовано с помощью RxJava's compose и Transformer.

Следующий код показывает DownloaderDelayTransformer:

import cn.netdiscovery.core.domain.Request;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;

import java.util.concurrent.TimeUnit;

/**
 * Created by tony on 2019-04-26.
 */
public class DownloaderDelayTransformer implements MaybeTransformer {

    private Request request;

    public DownloaderDelayTransformer(Request request) {
        this.request = request;
    }

    @Override
    public MaybeSource apply(Maybe upstream) {

        return request.getDownloadDelay() > 0 ? upstream.delay(request.getDownloadDelay(), TimeUnit.MILLISECONDS) : upstream;
    }
}

Загрузчик может реализовать ограничение скорости загрузки, если он использует compose и DownloaderDelayTransformer.

Возьмите UrlConnectionDownloader в качестве примера:

        Maybe.create(new MaybeOnSubscribe<InputStream>() {

                @Override
                public void subscribe(MaybeEmitter<InputStream> emitter) throws Exception {

                    emitter.onSuccess(httpUrlConnection.getInputStream());
                }
            })
             .compose(new DownloaderDelayTransformer(request))
             .map(new Function<InputStream, Response>() {

                @Override
                public Response apply(InputStream inputStream) throws Exception {

                    ......
                    return response;
                }
            });

2.2.3 Domain

Ограничение скорости домена относится к реализации платформы Scrapy и сохраняет каждое доменное имя и соответствующее ему недавнее время доступа в ConcurrentHashMap. Каждый раз, когда делается запрос, атрибут domainDelay запроса может быть установлен, чтобы реализовать ограничение скорости одного запроса для определенного домена.

import cn.netdiscovery.core.domain.Request;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by tony on 2019-05-06.
 */
public class Throttle {

    private Map<String,Long> domains = new ConcurrentHashMap<String,Long>();

    private static class Holder {
        private static final Throttle instance = new Throttle();
    }

    private Throttle() {
    }

    public static final Throttle getInsatance() {
        return Throttle.Holder.instance;
    }

    public void wait(Request request) {

        String domain = request.getUrlParser().getHost();
        Long lastAccessed = domains.get(domain);

        if (lastAccessed!=null && lastAccessed>0) {
            long sleepSecs = request.getDomainDelay() - (System.currentTimeMillis() - lastAccessed);
            if (sleepSecs > 0) {
                try {
                    Thread.sleep(sleepSecs);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        domains.put(domain,System.currentTimeMillis());
    }
}

Когда Запрос удаляется из очереди сообщений, он сначала решает, нужно ли приостановить Запрос, а затем решает, нужно ли приостанавливать доступ к Домену.

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            //暂停抓取
            ......

            // 从消息队列中取出request
            final Request request = queue.poll(name);

            if (request == null) {

                waitNewRequest();
            } else {

                if (request.getSleepTime() > 0) {

                    try {
                        Thread.sleep(request.getSleepTime());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                Throttle.getInsatance().wait(request);
 
                ......
            }
        }

2.2.4 Pipeline

Процесс обработки запроса поисковым роботом примерно таков: вызов сетевого запроса (включая механизм повтора) -> сохранение ответа на страницу -> анализ страницы -> последовательное выполнение конвейеров -> выполнение запроса запроса.

                // request正在处理
                downloader.download(request)
                        .retryWhen(new RetryWithDelay(maxRetries, retryDelayMillis, request)) // 对网络请求的重试机制
                        .map(new Function<Response, Page>() {

                            @Override
                            public Page apply(Response response) throws Exception {
                                // 将 response 存放到 page
                                ......                            
                                return page;
                            }
                        })
                        .map(new Function<Page, Page>() {

                            @Override
                            public Page apply(Page page) throws Exception {

                                if (parser != null) {

                                    parser.process(page);
                                }

                                return page;
                            }
                        })
                        .map(new Function<Page, Page>() {

                            @Override
                            public Page apply(Page page) throws Exception {

                                if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {

                                    pipelines.stream()
                                            .forEach(pipeline -> {
                                                pipeline.process(page.getResultItems());
                                            });
                                }

                                return page;
                            }
                        })
                        .observeOn(Schedulers.io())
                        .subscribe(new Consumer<Page>() {

                            @Override
                            public void accept(Page page) throws Exception {

                                log.info(page.getUrl());

                                if (request.getAfterRequest() != null) {

                                    request.getAfterRequest().process(page);
                                }

                                signalNewRequest();
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {

                                log.error(throwable.getMessage(), throwable);
                            }
                        });

Суть ограничения скорости Pipeline реализована с помощью операторов задержки и блокировки RxJava.

map(new Function<Page, Page>() {

        @Override
        public Page apply(Page page) throws Exception {

               if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {

                   pipelines.stream()
                          .forEach(pipeline -> {

                                if (pipeline.getPipelineDelay()>0) {

                                        // Pipeline Delay
                                        Observable.just("pipeline delay").delay(pipeline.getPipelineDelay(),TimeUnit.MILLISECONDS).blockingFirst();
                                 }

                                pipeline.process(page.getResultItems());
                          });
               }

                return page;
       }
})

Кроме того,NetDiscoveryПоддерживает настройку сканера путем настройки файла application.yaml или application.properties. Конечно, он также поддерживает настройку параметров ограничения скорости и поддерживает использование случайных значений для настройки соответствующих параметров ограничения скорости.

2.3 Работа искателя без блокировки

В более ранних версиях нельзя было добавлять новые запросы после запуска сканера. Потому что сканер по умолчанию выходит из программы после обработки запроса в очереди.

С помощью Условия новая версия может добавлять Запросы в очередь сообщений, даже если сканер запущен.

Роль Condition состоит в том, чтобы иметь более точный контроль над блокировками. Он используется для замены традиционных функций ожидания () и notify () объекта для реализации взаимодействия между потоками. сотрудничество потоков.Совместная работа безопаснее и эффективнее.

ReentrantLock и Condition должны быть определены в Spider.

Затем определите методы waitNewRequest() и signalNewRequest(). Их функции заключаются в том, чтобы приостановить текущий поток сканера для ожидания нового запроса и разбудить поток сканера для обработки запроса в очереди сообщений.

    private ReentrantLock newRequestLock = new ReentrantLock();
    private Condition newRequestCondition = newRequestLock.newCondition();
  
    ......

    private void waitNewRequest() {
        newRequestLock.lock();

        try {
            newRequestCondition.await(sleepTime, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("waitNewRequest - interrupted, error {}", e);
        } finally {
            newRequestLock.unlock();
        }
    }

    public void signalNewRequest() {
        newRequestLock.lock();

        try {
            newRequestCondition.signalAll();
        } finally {
            newRequestLock.unlock();
        }
    }

Видно, что если запрос не удаляется из очереди сообщений, выполняется WaitNewRequest().

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            //暂停抓取
            if (pause && pauseCountDown!=null) {
                try {
                    this.pauseCountDown.await();
                } catch (InterruptedException e) {
                    log.error("can't pause : ", e);
                }

                initialDelay();
            }

            // 从消息队列中取出request
            final Request request = queue.poll(name);

            if (request == null) {

                waitNewRequest();
            } else {
                ......
            }
     }

Затем в интерфейс Queue включается метод по умолчанию pushToRunninSpider(), который вызывает Spider.signalNewRequest() в дополнение к отправке запроса в очередь.

    /**
     * 把Request请求添加到正在运行爬虫的Queue中,无需阻塞爬虫的运行
     *
     * @param request request
     */
    default void pushToRunninSpider(Request request, Spider spider) {

        push(request);
        spider.signalNewRequest();
    }

Наконец, даже если сканер уже запущен, Запрос можно добавить в Очередь, соответствующую этому сканеру, в любое время.

        Spider spider = Spider.create(new DisruptorQueue())
                .name("tony")
                .url("http://www.163.com");

        CompletableFuture.runAsync(()->{
            spider.run();
        });

        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        spider.getQueue().pushToRunninSpider(new Request("https://www.baidu.com", "tony"),spider);

        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        spider.getQueue().pushToRunninSpider(new Request("https://www.jianshu.com", "tony"),spider);

        System.out.println("end....");

Суммировать

Адрес github фреймворка Crawler:GitHub.com/Zonda71…

В этой статье кратко изложено, как универсальные фреймы рептилий используют многопоточность в некоторых конкретных сценариях. В будущем NetDiscovery также добавит более общие функции.


Технологический стек Android и Java: еженедельно обновляйте оригинальные статьи о технологиях, публика может сканировать двумерный кодовый номер ниже и беспокоиться, с нетерпением ожидая вашего общего развития и прогресса.