Научит вас создавать распределенную систему поисковых роботов на основе Java.

Java рептилия
"

Без использования краулера я попытался после многих исследований реализовать распределенную систему краулеров, которая может сохранять данные в разных местах, таких как MySQL, HBase и т. д.

Поскольку эта система разработана на основе идеи кодирования, ориентированной на интерфейс, у нее есть определенная расширяемость.Заинтересованные друзья могут понять ее дизайнерскую идею непосредственно, просмотрев код.

Хотя код по-прежнему тесно связан во многих местах, пока это требует некоторого времени и усилий, многие из них можно извлечь и настроить.

Из-за времени я написал только сканеры двух веб-сайтов Jingdong и Suning.com, но вполне возможно реализовать случайное планирование сканеров на разных сайтах.Основываясь на структуре их кода, я напишу сканеры Gome. , Tmall и т. д., что несложно.Большой, но предполагается, что это займет некоторое время и усилия.

Потому что при парсинге данных веб-страницы, например, когда я сканирую цены на продукты Suning.com, цены получаются асинхронно, а API представляет собой длинный ряд чисел, мне потребовалось несколько часов, чтобы обнаружить закономерность. , и конечно признаю, что я неопытен.

Дизайн этой системы, помимо базового сканирования данных, уделяет больше внимания следующим аспектам:

  • Как добиться распределенного?Когда одна и та же программа упаковывается и распространяется для запуска на разные узлы, это не влияет на общий обход данных.

  • Как реализовать случайное циклическое планирование URL?Суть в том, чтобы делать случайные выборки для разных доменных имен верхнего уровня.

  • Как регулярно добавлять URL-адреса торрентов в репозиторий URL-адресов?Для достижения цели не дать гусеничной системе остановиться.

  • Как отслеживать программу узла сканера и отправлять оповещения по электронной почте?

  • Как реализовать библиотеку случайных IP-прокси?Цель чем-то похожа на пункт 2, как для анти-антирептилий.

Ниже приводится общее базовое введение в эту систему. У меня есть очень подробные комментарии в коде. Заинтересованные друзья могут обратиться к коду. Наконец, я дам анализ данных моего сканера.

Также следует отметить, что данная краулерная система реализована на базе Java, но сам язык еще не самое главное, заинтересованные друзья могут попробовать реализовать ее на Python.

Архитектура распределенной краулерной системы

Общая архитектура системы выглядит следующим образом:

Как видно из приведенной выше архитектуры, вся система в основном делится на три части:

  • гусеничная система

  • Система отправки URL

  • Система охранной сигнализации

Система искателей используется для обхода данных, поскольку система предназначена для распределенного использования, поэтому сам искатель может работать на разных узлах сервера.

Ядро системы планирования URL-адресов лежит в хранилище URL-адресов.Так называемое хранилище URL-адресов фактически использует Redis для сохранения списка URL-адресов, которые необходимо сканировать, и использует URL-адреса в нашем планировщике URL-адресов в соответствии с определенной стратегией. С этой точки зрения репозиторий URL-адресов на самом деле является очередью URL-адресов.

Система мониторинга и оповещения в основном следит за узлами краулера.Хотя один из узлов краулера, запущенных параллельно, выходит из строя, это не влияет на сам общий поиск данных (только снижает скорость краулера), но мы все же надеемся, что сможем для активного получения узлов. Отключение уведомлений вместо пассивного обнаружения.

Далее будет дано некоторое базовое введение в идеи дизайна всей системы, основанные на трех вышеупомянутых аспектах и ​​объединенные с некоторыми фрагментами кода.

гусеничная система

Система краулеров — это независимый процесс.Мы упаковываем нашу систему краулеров в пакет jar и распределяем его по разным узлам для выполнения, чтобы параллельное сканирование данных могло повысить эффективность краулера. (Примечание: мониторинг ZooKeeper относится к системе мониторинга и сигнализации, а планировщик URL-адресов относится к системе планирования URL-адресов)

Случайный IP-прокси

Добавление случайного IP-прокси в основном предназначено для защиты от сканирования, поэтому, если есть библиотека IP-прокси, и вы можете случайным образом использовать разные прокси при создании http-клиентов, это очень поможет нам для защиты от сканирования.

Чтобы использовать библиотеку IP-прокси в системе, вам необходимо сначала добавить доступную информацию об адресе прокси-сервера в текстовый файл:

# IPProxyRepository.txt58.60.255.104:8118219.135.164.245:312827.44.171.27:9999219.135.164.245:312858.60.255.104:811858.252.6.165:9000......скопировать код

должны знать о том,Вышеупомянутые IP-адреса прокси-серверов - это некоторые IP-адреса прокси-серверов, которые я получил на прокси-сервере West Thorn, которые могут быть недоступны.Рекомендуется потратить деньги на покупку партии IP-адресов прокси-серверов, что может сэкономить много времени и энергии для поиска IP-адресов прокси. .

Затем в классе инструмента, который создает http-клиент, когда класс инструмента используется в первый раз, эти IP-адреса прокси-сервера загружаются в память и загружаются в HashMap в Java:

// IP地址代理库Mapprivate static Map<String, Integer> IPProxyRepository = new HashMap<>();private static String[] keysArray = null;   // keysArray是为了方便生成随机的代理对象/**     * 初次使用时使用静态代码块将IP代理库加载进set中     */static {    InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt");  // 加载包含代理IP的文本    // 构建缓冲流对象    InputStreamReader isr = new InputStreamReader(in);    BufferedReader bfr = new BufferedReader(isr);    String line = null;    try {        // 循环读每一行,添加进map中        while ((line = bfr.readLine()) != null) {            String[] split = line.split(":");   // 以:作为分隔符,即文本中的数据格式应为192.168.1.1:4893            String host = split[0];            int port = Integer.valueOf(split[1]);            IPProxyRepository.put(host, port);        }        Set<String> keys = IPProxyRepository.keySet();        keysArray = keys.toArray(new String[keys.size()]);  // keysArray是为了方便生成随机的代理对象    } catch (IOException e) {        e.printStackTrace();    }}скопировать код

После этого каждый раз при сборке http клиента вы будете сначала заходить на карту чтобы посмотреть есть ли IP прокси, если есть то использовать его, если нет то не использовать прокси:

CloseableHttpClient httpClient = null;HttpHost proxy = null;if (IPProxyRepository.size() > 0) {  // 如果ip代理地址库不为空,则设置代理    proxy = getRandomProxy();    httpClient = HttpClients.custom().setProxy(proxy).build();  // 创建httpclient对象} else {    httpClient = HttpClients.custom().build();  // 创建httpclient对象}HttpGet request = new HttpGet(url); // 构建htttp get请求......скопировать код

Случайные прокси-объекты генерируются следующими методами:

/**     * 随机返回一个代理对象     *     * @return     */public static HttpHost getRandomProxy() {    // 随机获取host:port,并构建代理对象    Random random = new Random();    String host = keysArray[random.nextInt(keysArray.length)];    int port = IPProxyRepository.get(host);    HttpHost proxy = new HttpHost(host, port);  // 设置http代理    return proxy;}скопировать код

Таким образом, с помощью вышеуказанного дизайна в основном реализуется функция случайного IP-прокси.Конечно, есть еще много мест, которые можно улучшить.

Например, при использовании этого IP-прокси и сбое запроса можно ли зафиксировать эту ситуацию, при превышении определенного количества раз она будет удалена из библиотеки прокси, а также будет создан журнал для справки разработчикам или эксплуатации и обслуживающий персонал Это вполне достижимо, но я не буду делать этот шаг.

Загрузчик веб-страниц

Загрузчик веб-страницы используется для загрузки данных на веб-странице, которая в основном разработана на основе следующего интерфейса:

/** * 网页数据下载 */public interface IDownload {    /**     * 下载给定url的网页数据     * @param url     * @return     */    public Page download(String url);}скопировать код

Исходя из этого, в системе реализован только один http get загрузчик, но и он может выполнять нужные нам функции:

/** * 数据下载实现类 */public class HttpGetDownloadImpl implements IDownload {    @Override    public Page download(String url) {        Page page = new Page();        String content = HttpUtil.getHttpContent(url);  // 获取网页数据        page.setUrl(url);        page.setContent(content);        return page;    }}скопировать код

веб-парсер

Парсер веб-страницы должен анализировать интересующие нас данные на загруженной веб-странице и сохранять их в объект для дальнейшей обработки хранилищем данных для сохранения в различные постоянные хранилища.Он разработан на основе следующего интерфейса:

/** * 网页数据解析 */public interface IParser {    public void parser(Page page);}скопировать код

Парсер веб-страниц также является относительно важным компонентом в разработке всей системы.Функция не сложная, в основном потому, что кодов много.Для разных товаров в разных торговых центрах соответствующие парсеры могут быть разными.

Следовательно, необходимо разрабатывать продукты для специальных торговых центров, потому что, очевидно, шаблоны веб-сайтов, используемые JD.com, определенно отличаются от тех, которые использует Suning.com, а те, которые использует Tmall, определенно отличаются от тех, которые использует JD.com.

Таким образом, это полностью основано на ваших собственных потребностях в разработке. Это просто означает, что в процессе разработки парсера будут обнаружены некоторые повторяющиеся коды. В настоящее время вы можете абстрагировать эти коды для разработки класса инструмента.

В настоящее время данные о продуктах мобильных телефонов JD.com и Suning.com сканируются в системе, поэтому записываются эти два класса реализации:

/** * 解析京东商品的实现类 */public class JDHtmlParserImpl implements IParser {    ......}/** * 苏宁易购网页解析 */public class SNHtmlParserImpl implements IParser {    ......}скопировать код

хранилище данных

Хранилище данных в основном сохраняет объекты данных, проанализированные синтаксическим анализатором веб-страницы, в разные таблицы, а для мобильных телефонов, просканированных на этот раз, объектами данных являются следующие объекты страницы:

/** * 网页对象,主要包含网页内容和商品数据 */public class Page {    private String content;              // 网页内容    private String id;                    // 商品Id    private String source;               // 商品来源    private String brand;                // 商品品牌    private String title;                // 商品标题    private float price;                // 商品价格    private int commentCount;        // 商品评论数    private String url;                  // 商品地址    private String imgUrl;             // 商品图片地址    private String params;              // 商品规格参数    private List<String> urls = new ArrayList<>();  // 解析列表页面时用来保存解析的商品url的容器}скопировать код

Соответственно, в MySQL структура данных таблицы выглядит следующим образом:

-- ------------------------------ Table structure for phone-- ----------------------------DROP TABLE IF EXISTS `phone`;CREATE TABLE `phone` (  `id` varchar(30) CHARACTER SET armscii8 NOT NULL COMMENT '商品id',  `source` varchar(30) NOT NULL COMMENT '商品来源,如jd suning gome等',  `brand` varchar(30) DEFAULT NULL COMMENT '手机品牌',  `title` varchar(255) DEFAULT NULL COMMENT '商品页面的手机标题',  `price` float(10,2) DEFAULT NULL COMMENT '手机价格',  `comment_count` varchar(30) DEFAULT NULL COMMENT '手机评论',  `url` varchar(500) DEFAULT NULL COMMENT '手机详细信息地址',  `img_url` varchar(500) DEFAULT NULL COMMENT '图片地址',  `params` text COMMENT '手机参数,json格式存储',  PRIMARY KEY (`id`,`source`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;скопировать код

Структура таблицы в HBase выглядит следующим образом:

## cf1 存储 id source price comment brand url## cf2 存储 title params imgUrlcreate 'phone', 'cf1', 'cf2'## 在HBase shell中查看创建的表hbase(main):135:0> desc 'phone'Table phone is ENABLED                                                                                                phone                                                                                                                 COLUMN FAMILIES DESCRIPTION                                                                                           {NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                                                    {NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                                                    2 row(s) in 0.0350 secondsскопировать код

То есть в HBase устанавливаются два семейства столбцов, а именно cf1 и cf2, где cf1 используется для сохранения информации поля идентификатора исходной цены, комментария к бренду, url; cf2 используется для сохранения информации поля параметров заголовка imgUrl.

В разных хранилищах данных используются разные классы реализации, но все они разработаны на основе одного и того же интерфейса, представленного ниже:

/** * 商品数据的存储 */public interface IStore {    public void store(Page page);}скопировать код

Затем на основе этого были разработаны класс реализации хранилища MySQL, класс реализации хранилища HBase и класс реализации вывода консоли.Например, класс реализации хранилища MySQL на самом деле представляет собой простой оператор вставки данных:

/** * 使用dbc数据库连接池将数据写入mysql表中 */public class MySQLStoreImpl implements IStore {    private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource());    @Override    public void store(Page page) {        String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)";        try {            queryRunner.update(sql, page.getId(),                    page.getSource(),                    page.getBrand(),                    page.getTitle(),                    page.getPrice(),                    page.getCommentCount(),                    page.getUrl(),                    page.getImgUrl(),                    page.getParams());        } catch (SQLException e) {            e.printStackTrace();        }    }}скопировать код

Класс реализации хранилища HBase — это обычно используемый код оператора вставки HBase Java API:

......// cf1:pricePut pricePut = new Put(rowKey);// 必须要做是否为null判断,否则会有空指针异常pricePut.addColumn(cf1, "price".getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "".getBytes());puts.add(pricePut);// cf1:commentPut commentPut = new Put(rowKey);commentPut.addColumn(cf1, "comment".getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "".getBytes());puts.add(commentPut);// cf1:brandPut brandPut = new Put(rowKey);brandPut.addColumn(cf1, "brand".getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "".getBytes());puts.add(brandPut);......скопировать код

Конечно, что касается места хранения данных, его можно выбрать вручную при инициализации краулера:

// 3.注入存储器iSpider.setStore(new HBaseStoreImpl());скопировать код

В настоящее время код не был написан таким образом, чтобы его можно было хранить в нескольких местах одновременно.Согласно текущей структуре кода, добиться этого относительно просто, достаточно изменить соответствующий код.

Фактически, вы можете сначала сохранить данные в MySQL, а затем импортировать их в HBase через Sqoop.Подробности см. в статье Sqoop, которую я написал.

Тем не менее важно отметить, что если вы решите сохранить данные в HBase, убедитесь, что у вас есть доступная среда кластера, и вам необходимо добавить следующие документы конфигурации в путь к классам:

core-site.xmlhbase-site.xmlhdfs-site.xmlскопировать код

Студенты, интересующиеся большими данными, могут пропустить этот пункт. Если вы не касались этого раньше, вы можете использовать хранилище MySQL напрямую. Вам нужно только внедрить хранилище MySQL при инициализации программы-краулера:

// 3.注入存储器iSpider.setStore(new MySQLStoreImpl());скопировать код

Система отправки URL

Система планирования URL-адресов является мостом и ключом к реализации распределенного распределения всей системы сканирования. Именно благодаря использованию системы планирования URL-адресов вся система сканирования может более эффективно получать URL-адреса случайным образом (Redis в качестве хранилища) и реализовывать распространение всей системы.

репозиторий URL-адресов

Как видно из схемы архитектуры, так называемый репозиторий URL — это не что иное, как репозиторий Redis, то есть Redis используется в нашей системе для хранения списка URL-адресов.

Именно таким образом мы можем гарантировать, что наши программы распространяются, если URL-адрес уникален, так что независимо от того, сколько у нас программ-сканеров, данные, сохраненные в конце, представляют собой только одну копию и не будут повторяться. .

В то же время стратегия получения URL-адресов в хранилище URL-адресов реализована посредством очередей, о которых можно узнать позже через реализацию планировщика URL-адресов.

Кроме того, в нашем хранилище URL в основном сохраняются следующие данные:

Исходный список URL-адресов, тип данных Redis — список

Начальный URL-адрес хранится постоянно. Через определенный период времени таймер URL-адреса получит URL-адрес через исходный URL-адрес и вставит его в очередь URL-адресов с высоким приоритетом, которую должен использовать наш сканер.

Это гарантирует, что наша программа-краулер сможет продолжать сканировать данные, не останавливая выполнение программы.

Очередь URL-адресов с высоким приоритетом, установлен тип данных Redis

Что такое очередь URL-адресов с высоким приоритетом? Фактически, он используется для сохранения URL-адреса списка. Итак, что такое URL-адрес листинга?

Грубо говоря, список содержит несколько продуктов, на примере JD.com мы открываем список мобильных телефонов:

Этот адрес содержит не URL-адрес конкретного продукта, а список нескольких данных (продуктов для мобильных телефонов), которые нам нужно просканировать.

Анализируя каждый URL-адрес высокого уровня, мы можем получить множество URL-адресов определенных продуктов, а URL-адреса конкретных продуктов являются URL-адресами с низким приоритетом, которые будут храниться в очереди URL-адресов с низким приоритетом.

Итак, если взять эту систему в качестве примера, сохраненные данные аналогичны следующему:

jd.com.higher    --https://list.jd.com/list.html?cat=9987,653,655&page=1    ... suning.com.higher    --https://list.suning.com/0-20006-0.html    ...скопировать код

Очередь URL-адресов с низким приоритетом, установлен тип данных Redis

URL-адрес с низким приоритетом на самом деле является URL-адресом определенного продукта, например следующего продукта для мобильных телефонов:

Загрузив данные URL-адреса и проанализировав их, мы можем получить нужные данные.

Итак, если взять эту систему в качестве примера, сохраненные данные аналогичны следующему:

jd.com.lower    --https://item.jd.com/23545806622.html    ...suning.com.lower    --https://product.suning.com/0000000000/690128156.html    ...скопировать код

Планировщик URL

Так называемый планировщик URL-адресов является стратегией планирования Java-кода хранилища URL-адресов, но поскольку его ядром является планирование, оно помещено в планировщик URL-адресов для иллюстрации.В настоящее время его планирование разработано на основе следующих интерфейсов:

/** * url 仓库 * 主要功能: *      向仓库中添加url(高优先级的列表,低优先级的商品url) *      从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url) * */public interface IRepository {    /**     * 获取url的方法     * 从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url)     * @return     */    public String poll();    /**     * 向高优先级列表中添加商品列表url     * @param highUrl     */    public void offerHigher(String highUrl);    /**     * 向低优先级列表中添加商品url     * @param lowUrl     */    public void offerLower(String lowUrl);}скопировать код

Его реализация на основе Redis в качестве репозитория URL-адресов выглядит следующим образом:

/** * 基于Redis的全网爬虫,随机获取爬虫url: * * Redis中用来保存url的数据结构如下: * 1.需要爬取的域名集合(存储数据类型为set,这个需要先在Redis中添加) *      key *          spider.website.domains *      value(set) *          jd.com  suning.com  gome.com *      key由常量对象SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 获得 * 2.各个域名所对应的高低优先url队列(存储数据类型为list,这个由爬虫程序解析种子url后动态添加) *      key *          jd.com.higher *          jd.com.lower *          suning.com.higher *          suning.com.lower *          gome.com.higher *          gome.come.lower *      value(list) *          相对应需要解析的url列表 *      key由随机的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX获得 * 3.种子url列表 *      key *          spider.seed.urls *      value(list) *          需要爬取的数据的种子url *       key由常量SpiderConstants.SPIDER_SEED_URLS_KEY获得 * *       种子url列表中的url会由url调度器定时向高低优先url队列中 */public class RandomRedisRepositoryImpl implements IRepository {    /**     * 构造方法     */    public RandomRedisRepositoryImpl() {        init();    }    /**     * 初始化方法,初始化时,先将redis中存在的高低优先级url队列全部删除     * 否则上一次url队列中的url没有消耗完时,再停止启动跑下一次,就会导致url仓库中有重复的url     */    public void init() {        Jedis jedis = JedisUtil.getJedis();        Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);        String higherUrlKey;        String lowerUrlKey;        for(String domain : domains) {            higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;            lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;            jedis.del(higherUrlKey, lowerUrlKey);        }        JedisUtil.returnJedis(jedis);    }    /**     * 从队列中获取url,目前的策略是:     *      1.先从高优先级url队列中获取     *      2.再从低优先级url队列中获取     *  对应我们的实际场景,应该是先解析完列表url再解析商品url     *  但是需要注意的是,在分布式多线程的环境下,肯定是不能完全保证的,因为在某个时刻高优先级url队列中     *  的url消耗完了,但实际上程序还在解析下一个高优先级url,此时,其它线程去获取高优先级队列url肯定获取不到     *  这时就会去获取低优先级队列中的url,在实际考虑分析时,这点尤其需要注意     * @return     */    @Override    public String poll() {        // 从set中随机获取一个顶级域名        Jedis jedis = JedisUtil.getJedis();        String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);    // jd.com        String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;                // jd.com.higher        String url = jedis.lpop(key);        if(url == null) {   // 如果为null,则从低优先级中获取            key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;    // jd.com.lower            url = jedis.lpop(key);        }        JedisUtil.returnJedis(jedis);        return url;    }    /**     * 向高优先级url队列中添加url     * @param highUrl     */    @Override    public void offerHigher(String highUrl) {        offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX);    }    /**     * 向低优先url队列中添加url     * @param lowUrl     */    @Override    public void offerLower(String lowUrl) {        offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX);    }    /**     * 添加url的通用方法,通过offerHigher和offerLower抽象而来     * @param url   需要添加的url     * @param urlTypeSuffix  url类型后缀.higher或.lower     */    public void offerUrl(String url, String urlTypeSuffix) {        Jedis jedis = JedisUtil.getJedis();        String domain = SpiderUtil.getTopDomain(url);   // 获取url对应的顶级域名,如jd.com        String key = domain + urlTypeSuffix;            // 拼接url队列的key,如jd.com.higher        jedis.lpush(key, url);                          // 向url队列中添加url        JedisUtil.returnJedis(jedis);    }}скопировать код

Вы также можете узнать с помощью анализа кода, что ядром является то, как планировать URL-адреса в репозитории URL-адресов (Redis).

URL-таймер

Через некоторое время URL-адреса как в очереди URL-адресов с высоким приоритетом, так и в очереди URL-адресов с низким приоритетом будут использоваться.

Чтобы позволить программе продолжить сканирование данных и уменьшить вмешательство человека, исходный URL-адрес можно вставить в Redis заранее, а затем можно использовать таймер URL-адреса для извлечения URL-адреса из исходного URL-адреса и сохранения его в высокоприоритетном Очередь URL для синхронизации программы. Цель непрерывного сканирования данных.

После использования URL-адреса необходимость непрерывного обхода данных в цикле зависит от индивидуальных потребностей бизнеса, поэтому этот шаг не является обязательным, но такая операция также предусмотрена.

Потому что на самом деле данные, которые нам нужно сканировать, будут обновляться через регулярные промежутки времени.Если мы хотим, чтобы данные, которые мы сканируем, регулярно обновлялись, то таймер будет играть очень важную роль.

Однако следует отметить, что после того, как было решено, что данные необходимо просматривать повторно, проблема повторяющихся данных должна быть рассмотрена при разработке реализации памяти, то есть повторяющиеся данные должны быть операцией обновления.

В настоящее время память, которую я разработал, не включает эту функцию, и заинтересованные друзья могут реализовать ее самостоятельно, просто нужно оценить, существуют ли данные в базе данных, прежде чем вставлять данные.

Следует также отметить, что таймер URL-адреса — это отдельный процесс, и его необходимо запускать отдельно.

Таймер реализован на базе Quartz, код его работы следующий:

/** * 每天定时从url仓库中获取种子url,添加进高优先级列表 */public class UrlJob implements Job {    // log4j日志记录    private Logger logger = LoggerFactory.getLogger(UrlJob.class);    @Override    public void execute(JobExecutionContext context) throws JobExecutionException {        /**         * 1.从指定url种子仓库获取种子url         * 2.将种子url添加进高优先级列表         */        Jedis jedis = JedisUtil.getJedis();        Set<String> seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY);  // spider.seed.urls Redis数据类型为set,防止重复添加种子url        for(String seedUrl : seedUrls) {            String domain = SpiderUtil.getTopDomain(seedUrl);   // 种子url的顶级域名            jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl);            logger.info("获取种子:{}", seedUrl);        }        JedisUtil.returnJedis(jedis);//        System.out.println("Scheduler Job Test...");    }}скопировать код

Реализация планировщика выглядит следующим образом:

/** * url定时调度器,定时向url对应仓库中存放种子url * * 业务规定:每天凌晨1点10分向仓库中存放种子url */public class UrlJobScheduler {    public UrlJobScheduler() {        init();    }    /**     * 初始化调度器     */    public void init() {        try {            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();            // 如果没有以下start方法的执行,则是不会开启任务的调度            scheduler.start();            String name = "URL_SCHEDULER_JOB";            String group = "URL_SCHEDULER_JOB_GROUP";            JobDetail jobDetail = new JobDetail(name, group, UrlJob.class);            String cronExpression = "0 10 1 * * ?";            Trigger trigger = new CronTrigger(name, group, cronExpression);            // 调度任务            scheduler.scheduleJob(jobDetail, trigger);        } catch (SchedulerException e) {            e.printStackTrace();        } catch (ParseException e) {            e.printStackTrace();        }    }    public static void main(String[] args) {        UrlJobScheduler urlJobScheduler = new UrlJobScheduler();        urlJobScheduler.start();    }    /**     * 定时调度任务     * 因为我们每天要定时从指定的仓库中获取种子url,并存放到高优先级的url列表中     * 所以是一个不间断的程序,所以不能停止     */    private void start() {        while (true) {        }    }}скопировать код

Система охранной сигнализации

Система мониторинга и сигнализации добавляется в основном для того, чтобы пользователи могли активно обнаруживать простои узлов, а не пассивно, потому что на практике сканер может работать непрерывно.

И мы развернем нашу программу-краулер на нескольких узлах, поэтому необходимо следить за узлами, и когда возникает проблема с узлом, ее можно вовремя обнаружить и исправить.Следует отметить, что система мониторинга и сигнализации независимый процесс, для которого требуется только Пуск.

Фундаментальный

Сначала вам нужно создать узел /ispider в ZooKeeper:

[zk: localhost:2181(CONNECTED) 1] create /ispider ispiderCreated /ispiderскопировать код

Разработка системы мониторинга и оповещения в основном зависит от реализации ZooKeeper, и программа мониторинга отслеживает каталог узлов ниже ZooKeeper:

[zk: localhost:2181(CONNECTED) 0] ls /ispider[]скопировать код

Когда сканер запустится, он зарегистрирует временный каталог узла в каталоге узла:

[zk: localhost:2181(CONNECTED) 0] ls /ispider[192.168.43.166]скопировать код

Когда узел выходит из строя, временный каталог узла удаляется ZooKeeper.

[zk: localhost:2181(CONNECTED) 0] ls /ispider[]скопировать код

В то же время, поскольку мы отслеживаем каталог узла /ispider, когда ZooKeeper удаляет каталог узла под ним (или добавляет каталог узла), ZooKeeper отправит уведомление нашей программе мониторинга.

То есть наша программа мониторинга получит обратный вызов, чтобы системное действие тревоги могло быть выполнено в программе обратного вызова, чтобы завершить функцию мониторинга тревоги.

Инструкции по Java API ZooKeeper

Вы можете использовать собственный Java API ZooKeeper.Я использую собственный API в другой инфраструктуре RPC, которую я написал (нижний уровень основан на Netty для удаленной связи).

Но, очевидно, код будет намного сложнее и потребует большего изучения и понимания самого ZooKeeper, чтобы его было проще использовать.

Поэтому для снижения сложности разработки здесь используется сторонний пакетный API, а именно куратор, для разработки клиентской программы ZooKeeper.

Поисковая система ZooKeeper регистрация

При запуске системы сканирования наша программа запускает клиент ZooKeeper для регистрации информации о своем узле в ZooKeeper, в основном IP-адреса.

И создайте узел, названный в честь IP-адреса узла, на котором находится программа-краулер, в каталоге узла /ispider, например, /ispider/192.168.43.116, код реализации выглядит следующим образом:

/** * 注册zk */private void registerZK() {    String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";    int baseSleepTimeMs = 1000;    int maxRetries = 3;    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);    CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);    curator.start();    String ip = null;    try {        // 向zk的具体目录注册 写节点 创建节点        ip = InetAddress.getLocalHost().getHostAddress();        curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes());    } catch (UnknownHostException e) {        e.printStackTrace();    } catch (Exception e) {        e.printStackTrace();    }}скопировать код

Следует отметить, что созданный нами узел является временным узлом, для реализации функции мониторинга и сигнализации он должен быть временным узлом.

программа мониторинга

Во-первых, нам нужно отслеживать каталог узла в ZooKeeper, В нашей системе дизайн заключается в мониторинге каталога узла /ispider:

public SpiderMonitorTask() {    String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";    int baseSleepTimeMs = 1000;    int maxRetries = 3;    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);    curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);    curator.start();    try {        previousNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");    } catch (Exception e) {        e.printStackTrace();    }}скопировать код

Выше зарегистрирован наблюдатель в ZooKeeper, который является программой обратного вызова, которая получает уведомление, в этой программе выполняется логика нашего будильника:

/** * 这个方法,当监控的zk对应的目录一旦有变动,就会被调用 * 得到当前最新的节点状态,将最新的节点状态和初始或者上一次的节点状态作比较,那我们就知道了是由谁引起的节点变化 * @param event */@Overridepublic void process(WatchedEvent event) {    try {        List<String> currentNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");        //            HashSet<String> previousNodesSet = new HashSet<>(previousNodes);        if(currentNodes.size() > previousNodes.size()) { // 最新的节点服务,超过之前的节点服务个数,有新的节点增加进来            for(String node : currentNodes) {                if(!previousNodes.contains(node)) {                    // 当前节点就是新增节点                    logger.info("----有新的爬虫节点{}新增进来", node);                }            }        } else if(currentNodes.size() < previousNodes.size()) {  // 有节点挂了    发送告警邮件或者短信            for(String node : previousNodes) {                if(!currentNodes.contains(node)) {                    // 当前节点挂掉了 得需要发邮件                    logger.info("----有爬虫节点{}挂掉了", node);                    MailUtil.sendMail("有爬虫节点挂掉了,请人工查看爬虫节点的情况,节点信息为:", node);                }            }        } // 挂掉和新增的数目一模一样,上面是不包括这种情况的,有兴趣的朋友可以直接实现包括这种特殊情况的监控        previousNodes = currentNodes;   // 更新上一次的节点列表,成为最新的节点列表    } catch (Exception e) {        e.printStackTrace();    }    // 在原生的API需要再做一次监控,因为每一次监控只会生效一次,所以当上面发现变化后,需要再监听一次,这样下一次才能监听到    // 但是在使用curator的API时则不需要这样做}скопировать код

Конечно, в приведенной выше логике все еще есть некоторые проблемы с оценкой того, не работает ли узел, вы можете изменить приведенный выше программный код.

Модуль отправки электронной почты

Вы можете использовать код шаблона, но следует отметить, что при его использовании используйте свой собственный адрес электронной почты для информации об отправителе.

Ниже приведено электронное письмо, полученное при зависании узла искателя:

На самом деле, если услуга SMS куплена, также можно отправить SMS на наш мобильный телефон через SMS API.

Настоящий бой: сканирование всей сети JD.com и Suning.com с данными о мобильных телефонах

Поскольку я также упомянул, когда представлял эту систему ранее, я написал только парсеры веб-страниц JD.com и Suning.com, поэтому следующим шагом будет сканирование данных о продуктах мобильных телефонов по всей их сети.

Описание окружающей среды

Необходимо убедиться, что сервисы Redis и ZooKeeper доступны.Кроме того, если вам необходимо использовать HBase для хранения данных, вам необходимо убедиться, что HBase в кластере Hadoop доступен, а соответствующие файлы конфигурации добавлены в путь к классам краулера.

Еще одна вещь, которую следует отметить, это то, что таймер URL-адреса и система оповещения мониторинга работают как отдельные процессы и также являются необязательными.

результаты сканирования

Дважды просканировал, пытаясь сохранить данные в MySQL и HBase соответственно, и выдал следующую ситуацию с данными.

Сохранить в MySQL

mysql> select count(*) from phone;+----------+| count(*) |+----------+|    12052 |+----------+1 row in setmysql> select count(*) from phone where source='jd.com';+----------+| count(*) |+----------+|     9578 |+----------+1 row in setmysql> select count(*) from phone where source='suning.com';+----------+| count(*) |+----------+|     2474 |+----------+1 row in setскопировать код

Проверьте данные в визуализаторе:

Сохранить в HBase

hbase(main):225:0* count 'phone'Current count: 1000, row: 11155386088_jd.comCurrent count: 2000, row: 136191393_suning.comCurrent count: 3000, row: 16893837301_jd.comCurrent count: 4000, row: 19036619855_jd.comCurrent count: 5000, row: 1983786945_jd.comCurrent count: 6000, row: 1997392141_jd.comCurrent count: 7000, row: 21798495372_jd.comCurrent count: 8000, row: 24154264902_jd.comCurrent count: 9000, row: 25687565618_jd.comCurrent count: 10000, row: 26458674797_jd.comCurrent count: 11000, row: 617169906_suning.comCurrent count: 12000, row: 769705049_suning.com                 12348 row(s) in 1.5720 seconds=> 12348скопировать код

Посмотреть ситуацию с данными в HDFS:

Объем данных и анализ реальной ситуации

Цзиндон:Список JD.com содержит около 160 страниц, и в каждом списке содержится 60 данных о продуктах, так что общее количество составляет около 9600, и наши данные в основном непротиворечивы.

Позже, путем анализа журнала, мы можем узнать, что потеря данных, как правило, вызвана тайм-аутом соединения, поэтому при выборе среды для сканера рекомендуется выполнять ее на хосте с хорошим сетевым окружением.

При этом было бы лучше, если бы была библиотека IP-прокси-адресов, кроме того, время ожидания соединения можно дополнительно контролировать в нашей программе.

Если есть URL-адрес, по которому не удается выполнить обход данных, вы можете добавить его в очередь повторных попыток URL-адресов. В настоящее время я не использовал эту функцию, и заинтересованные студенты могут попробовать ее.

Интернет-рынок Suning:Давайте взглянем на данные Suning: в них около 100 страниц списков мобильных телефонов, и на каждой странице также 60 данных о товарах, так что всего около 6000.

Но видно, что наших данных всего порядка 3000 (чего не хватает, так это проблемы сбоя соединения, вызванной частым обходом), почему так?

Это связано с тем, что после открытия определенной страницы со списком Suning сначала загружает 30 продуктов, а когда указатель мыши скользит вниз, остальные 30 данных о продуктах будут загружены через другой API. Это касается каждой страницы со списком. на самом деле мы пропускаем половину данных о продукте без сканирования.

Зная причину, добиться не сложно, но из-за нехватки времени я этого не делал, а друзья, кому интересно, могут подкинуть.

Анализ производительности системы сканирования с помощью журналов

В нашей системе краулинга каждое ключевое место, такое как загрузка веб-страницы, анализ данных и т. д., имеет логгер, поэтому с помощью лога мы можем приблизительно проанализировать соответствующие временные параметры.

2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://list.jd.com/list.html?cat=9987,653,655&page=1,消耗时长:590 ms,代理信息:null:null2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析列表页面:https://list.jd.com/list.html?cat=9987,653,655&page=1, 消耗时长:46ms2018-04-01 21:26:03 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表页面:https://list.suning.com/0-20006-0.html, 消耗时长:49ms2018-04-01 21:26:04 [pool-1-thread-5] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://item.jd.com/6737464.html,消耗时长:219 ms,代理信息:null:null2018-04-01 21:26:04 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://list.jd.com/list.html?cat=9987,653,655&page=2&sort=sort_rank_asc&trans=1&JL=6_0_0,消耗时长:276 ms,代理信息:null:null2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://list.suning.com/0-20006-99.html,消耗时长:300 ms,代理信息:null:null2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表页面:https://list.suning.com/0-20006-99.html, 消耗时长:4ms......2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://club.jd.com/comment/productCommentSummaries.action?referenceIds=23934388891,消耗时长:176 ms,代理信息:null:null2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析商品页面:https://item.jd.com/23934388891.html, 消耗时长:413ms2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://review.suning.com/ajax/review_satisfy/general-00000000010017793337-0070079092-----satisfy.htm,消耗时长:308 ms,代理信息:null:null2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析商品页面:https://product.suning.com/0070079092/10017793337.html, 消耗时长:588ms......скопировать код

В среднем время загрузки данных веб-страницы продукта варьируется от 200 до 500 миллисекунд, конечно, это также зависит от условий сети в это время.

Кроме того, если вы хотите фактически рассчитать данные о времени сканирования продукта, вы можете рассчитать его с помощью данных ниже журнала:

  • Время загрузки данных страницы продукта

  • Время получить данные о ценах

  • Время получить данные комментариев

На моем хосте (CPU: E5 10 ядер, RAM: 32GB, включена 1 виртуальная машина и 3 виртуальные машины) ситуация следующая:

Видно, что при использовании 3 узлов время не будет соответственно сокращено до 1/3 от исходного, потому что проблема, влияющая на производительность краулера в это время, в основном связана с сетевыми проблемами, количество узлов большой, количество потоков велико, и сетевых запросов тоже много.

Однако пропускная способность определена, и если прокси не используется, запросы будут частыми и количество сбоев соединения будет увеличиваться, что также имеет определенное влияние на время.Если используется случайная библиотека прокси, ситуация будет намного лучше.

Но что можно сказать наверняка, так это то, что после горизонтального расширения и добавления узлов поискового робота время нашего поискового робота может быть значительно сокращено, что также является преимуществом распределенной системы поискового робота.

Стратегии борьбы с рептилиями, используемые в краулерной системе

При проектировании всей системы обхода в основном используются следующие стратегии для достижения цели предотвращения обхода:

  • Использовать прокси для доступа --> библиотека IP-прокси, случайный IP-прокси.

  • Случайный доступ к URL домена верхнего уровня --> система планирования URL.

  • Каждый поток бездействует в течение короткого периода времени после обхода части товарных данных перед обходом.

Суммировать

Следует отметить, что эта система реализована на основе Java, но я лично считаю, что язык сам по себе не является проблемой, а ядро ​​заключается в проектировании и понимании всей системы.

Я написал эту статью, чтобы поделиться с вами архитектурой такого распределенного поискового робота. Если вас интересует исходный код, вы можете проверить его на моем GitHub.

Автор: Е Юнхао

Монтажер: Тао Цзялун, Сунь Шуцзюань

Адрес кода: https://github.com/xpleaf/ispider

Источник: http://blog.51cto.com/xpleaf/2093952

Вклад: Для тех, кто заинтересован в представлении статей, пожалуйста, свяжитесь с editor@51cto.com

Е Юнхао, инженер по работе с большими данными, сертифицированный инженер Huawei HCIE-RS. Он работал в Huawei, NetEase и других компаниях и в настоящее время занимается изучением и исследованиями в области больших данных.

Рекомендуемые статьи:

Я использовал Python для сканирования данных о 4400 продуктах Taobao и нашел эти «скрытые правила».

После прочтения этой статьи, если вы не понимаете блокчейн, вы проиграете и будете использовать Python для создания блокчейна с нуля.

Сотрудники Huawei, Ali и NetEase работают в нерабочее время. Вы не можете выиграть их, работая сверхурочно...