Вспомните анализ больших данных Python на маленькой машине.

Python MySQL Linux Mac анализ данных pandas сбор данных

0x00 Предисловие

Так совпало, что компании неожиданно приходится проводить волну анализа большого количества данных. Это относится к анализу пассажиропотока.

Уровень данных неплох.После сжатия gzip файл SQL (MySQL innoDB) с почти 400 точками имеет размер около 100 ГБ, а исходные записанные данные оцениваются примерно в 18 миллиардов.

После декомпрессии... о Т.

Если вы играете в юанях, вы, естественно, купите десятки высококлассных машин, станете шардом mysql или перейдете напрямую к ведрам больших данных, таким как hadoop и hive, и пусть программисты идут к черту.

Ну, но для игрока, не использующего юаней, такого как я, мне приходится использовать одну машину, чтобы нести его тяжело.

Тогда дерзай.

Конфигурация моей машины следующая:

  • LAN-сервер (Ubuntu 16.04 LTS)

    • Xeon(R) CPU E3-1225 v5 @ 3.30GHz
    • 16 ГБ памяти
    • 1T жесткий диск
  • Компьютер Apple 2016 15 дюймов сверху

    • 1T жесткий диск
    • i7 четырехъядерный

0x01 Этап подготовки данных

Анализируйте большие данные с помощью недорогих машинпервый принцип,этоНе анализируйте большие данные.

какие?

этоИзвлеките как можно больше наименьшего надмножества аналитических данных, необходимых для выводов.

Маленькие машины не могут выполнять массивные вычисления, но с помощью определенной фильтрации и фильтрации данные могут быть отфильтрованы до объема вычислений, с которым может справиться машина. Чтобы достичь цели, позволяющей анализировать массивные данные.

1.1 Импорт данных в MySQL

Давайте сначала проигнорируем 3721, так как файл SQL дан, он должен храниться в библиотеке, тогда проблема:

Есть несколько шагов, чтобы положить слона в холодильник

Импорт данных в базу данных требует нескольких шагов

Или, как быстрее импортировать данные из 400 разных таблиц.

Общие шаги таковы:

  • Добавьте жесткий диск и инициализируйте его
  • Настройте каталог данных MySQL на недавно добавленный жесткий диск.
  • Импорт данных (PV и MySQL)

Добавьте жесткий диск и инициализируйте его

Во-первых,купить и вставить жесткий диск

Используйте lshw для просмотра информации о жестком диске

root@ubuntu:~# lshw -C disk
  *-disk
       description: SCSI Disk
       product: My Passport 25E2
       vendor: WD
       physical id: 0.0.0
       bus info: scsi@7:0.0.0
       logical name: /dev/sdb
       version: 4004
       serial: WX888888HALK
       size: 3725GiB (4TB)
       capabilities: gpt-1.00 partitioned partitioned:gpt
       configuration: ansiversion=6 guid=88e88888-422d-49f0-9ba9-221db75fe4b4 logicalsectorsize=512 sectorsize=4096
  *-disk
       description: ATA Disk
       product: WDC WD10EZEX-08W
       vendor: Western Digital
       physical id: 0.0.0
       bus info: scsi@0:0.0.0
       logical name: /dev/sda
       version: 1A01
       serial: WD-WC888888888U
       size: 931GiB (1TB)
       capabilities: partitioned partitioned:dos
       configuration: ansiversion=5 logicalsectorsize=512 sectorsize=4096 signature=f1b42036
  *-cdrom
       description: DVD reader
       product: DVDROM DH1XXX8SH
       vendor: PLDS
       physical id: 0.0.0
       bus info: scsi@5:0.0.0
       logical name: /dev/cdrom
       logical name: /dev/dvd
       logical name: /dev/sr0
       version: ML31
       capabilities: removable audio dvd
       configuration: ansiversion=5 status=nodisc

Отформатируйте жесткий диск с помощью fdisk и раздела

fdisk /dev/sdb
#输入 n
#输入 p
#输入 1
#输入 w
sudo mkfs -t ext4 /dev/sdb1
mkdir -p /media/mynewdrive
vim /etc/fstab
# /dev/sdb1    /media/mynewdrive   ext4    defaults     0        2
# 直接挂载所有,或者 reboot
mount -a

Пока жесткий диск отформатирован.

Инструкции по установке жестких дисков см. на странице https://help.ubuntu.com/community/InstallingANewHardDrive.

Настроить MySQL

Объем ограничен, только введение в настройку MySQL DataDIR в Ubuntu 16.04, опуская настройку установки и базовую аутентификацию при входе.

Путь по умолчанию для mysql под ubuntu выглядит следующим образом:

/var/lib/mysql/

Приступим к настройке DataDIR

systemctl stop mysql
rsync -av /var/lib/mysql /mnt/volume-nyc1-01
mv /var/lib/mysql /var/lib/mysql.bak
vim /etc/mysql/mysql.conf.d/mysqld.cnf
# 修改至 datadir=/mnt/volume-nyc1-01/mysql
vim /etc/apparmor.d/tunables/alias
# alias /var/lib/mysql/ -> /mnt/volume-nyc1-01/mysql/
sudo systemctl restart apparmor
vim /usr/share/mysql/mysql-systemd-start
# 修改成
if [ ! -d /var/lib/mysql ] && [ ! -L /var/lib/mysql ]; then
 echo "MySQL data dir not found at /var/lib/mysql. Please create one."
 exit 1
fi

if [ ! -d /var/lib/mysql/mysql ] && [ ! -L /var/lib/mysql/mysql ]; then
 echo "MySQL system database not found. Please run mysql_install_db tool."
 exit 1
fi

# 接下来
sudo mkdir /var/lib/mysql/mysql -p
sudo systemctl restart mysql

# 最后 my.conf 修改相关文件路径

Подробнее см. в этой статье https://www.digitalocean.com/community/tutorials/how-to-move-a-mysql-data-directory-to-a-new-location-on-ubuntu-16- 04

После настройки DataDIR данные можно импортировать. Что ж, после всех этих неприятностей я решил перейти на Docker вместо Ubuntu Server в следующий раз, когда столкнусь с этой ситуацией.

Глядя на это сейчас, если я буду делать это снова, я обязательно буду использовать Docker и смонтировать диск с данными на новый жесткий диск.

Например, прямое выполнение команды Docker.

docker run --name some-mysql -v /my/own/datadir:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=my-secret-pw -d mysql:tag

MySQL + PV для импорта данных

Когда мы используем mysql для импорта скриптов, есть несколько способов импортировать

  • исходная команда, однако эту команду легко застрять, когда объем данных велик. (Впечатление состоит в том, чтобы напрямую загрузить файл sql в память и выполнить его. Однако, пока в печати и выполнении участвует большой объем текста, скорость будет намного медленнее.)
  • команда mysql
# mysql 命令的典型导入场景就是这样
mysql -uadmin -p123456 some_db < tb.sql

С командой PV это более волшебно. Есть индикатор прогресса! !

# 附加进度条的导入场景
pv -i 1 -p -t -e ./xxxx_probe.sql | mysql -uadmin -p123456 some_db

Затем вы можете просмотреть использование дисковой памяти ЦП. Если нагрузка (фокус на IO, память) недостаточно полная, используйте tmux, чтобы открыть еще несколько процессов для импорта данных.

Поскольку таблица, соответствующая каждому файлу SQL, отличается, открытие еще нескольких процессов для пакетной вставки не приведет к блокировке таблицы, что может значительно повысить скорость импорта.

1.2 Экспорт данных

Теперь, когда данные импортированы, зачем их экспортировать?

Из-за большого объема данных требуется предварительная очистка. И мы обязательно будем использовать Pandas для анализа в конце.При чтении большого количества данных из базы данных локальной сети скорость pandas будет очень низкой (конкретно из-за скорости передачи по сети?). Поэтому, чтобы избавить себя от хлопот для последующего анализа, я экспортировал данные партиями, а затем классифицировал их в соответствии со своими привычками.

В ходе этого процесса я также выполнил небольшую фильтрацию данных, например:

  • Выбирайте только те строки и столбцы, которые вам полезны.
  • CSV-файл, который делит данные на наименьшие единицы

Выбирайте только те строки и столбцы, которые вам полезны

select col_a , col_b from some_table where Acondition and bcondition and col_c in ('xx','yy','zz');

Вот некоторые примечательные вещи

  • Попробуйте написать простые суждения слева.
  • Если он не запрашивается повторно, нет необходимости создавать индекс. Просто перейдите ко всей таблице, отфильтруйте нужные данные и сохраните их в CSV.

Разделите данные на CSV-файлы с наименьшими возможными единицами измерения.

Если он разделен по определенной категории или определенному периоду времени, он может быть разделен в любой момент анализа.

Например, большой CSV-файл, содержащий местоположения различных персонажей и места сюжета в Qiong Yao, можно разделить на:

201712_大明湖畔_夏雨荷_还珠格格_你还记得吗.csv
201711_老街_可云_情深深雨蒙蒙_谁来救我.csv
201710_屋子里_云帆_又见一帘幽梦_你的腿不及紫菱的爱情.csv

Когда нам нужно получить этот фрагмент данных, мы можем напрямую его подставить, затем отсортировать, а затем выполнить двоичный поиск. Вы можете быстро прочитать эту часть данных.

1.3 Проверка целостности данных

Данные, предоставленные третьей стороной, будут иметь те или иные проблемы в большей или меньшей степени.В общем, вы можете максимально снизить недостоверность данных, проверив полноту данных.

В такую ​​таблицу я обычно подробно записываю различные параметры и ход полноты данных.

Например:

  • Наличие и фактическое состояние данных
  • Периодическое рекордное количество и статистическое значение баллов
  • макс, мин, среднее, медиана используются, чтобы избежать выбросов
  • Если он разделен на годы, вы должны считать положение каждого дня, иначе вы не будете знать степень отсутствия данных.

0x02 Фаза анализа

После предыдущего шага общий размер файла данных составляет около 1000 ГБ (без сжатия) -> 30 ГБ (разделен на несколько сжатых файлов). Каждый файл весит около нескольких сотен мегабайт.

2.1 Пункт производительности 1: файловая система

Если статистическая логика проста, но количество большое, предпочтительно использовать считываемые файлы. Чтение файлов для статистики происходит очень быстро. (игроки с юанями уходят)

Как и wc, grep, sort, uniq в linux, иногда можно использовать в этом сценарии.

Однако обратите внимание, что если файл особенно велик, обязательно прочитайте итератор один за другим.

2.2 Точки производительности 2: разделить целое на ноль, фильтр уменьшить карту

Это уже обсуждалось в разделе 1.2 выше.

map/reduce/filter может значительно сократить код.

В коллекции есть счетчик, который может сильно сократить код при ведении простой статистики кода.

2.3 Пункт производительности 3: две роли пула процессов

Все мы знаем, что при выполнении ресурсоемких задач с помощью Python мы можем рассмотреть возможность использования нескольких процессов для ускорения:

которыйДля ускорения вычислений, это функция номер один. следующее:

def per_item_calc(item):
    df = pd.read.....
    # complex calc
    return result

with ProcessPoolExecutor(3) as pool:
    result_items = pool.map(per_item_calc,all_tobe_calc_items)

reduce_results = ....

На самом деле разрушение самого процесса может принести мне второй эффектуправлять памятью.

Это будет объяснено в DataFrame в 2.6.

2.4 Пункт производительности 4: List and Set , itertools

Существует 400 наборов наборов UUID, и число каждого списка составляет около 1 000 000, и повторение между списками и списками не очень велико. Я хочу получить все UUID после дедупликации, что мне делать?

При дедупликации естественно думать об использовании коллекции для решения этой проблемы.

Первоначальный подход был

list_of_uuid_set = [ set1 , set2 ... set400 ]
all_uuid_set = reduce(lambda x: x | y, list_of_uuid_set)

1 час прошел. Внезапно везде наступила тишина. Сотни людей собрались внутри и снаружи компании, но никто из них не издал ни звука, некоторые хотели высказаться, их также сдерживала тихая атмосфера, и они сдерживали свои слова. Кажется, что световой индикатор жесткого диска также не горит, издавая тихий и ненормальный звук. Я подумал про себя:

Что в это время делает младшая сестра?Черт возьми, программа снова зависла?

Поднимитесь по SSH и запустите машину. Обнаружено, что реальное хранилище и память заполнены. Интуиция подсказывает мне, что операции над множествами в CPython должны занимать довольно много памяти.

Ну, как это работает, попробуйте использовать список. Объем памяти, занимаемый списком, должен быть относительно небольшим.

def merge(list1,list2):
    list1.append(list2)
    return list1

list_of_uuid_list = [ list1 , list2 ... list400 ]
all_uuid_set = set(reduce(merge, list_of_uuid_list))

1 час прошел. Я похлопал себя по бедру и сказал:

Что в это время делает младшая сестра?Черт возьми, программа снова зависла?

Наконец нашел лучшее решение на StackOverFlow.

list_of_uuid_list = [ list1 , list2 ... list400 ]
all_uuid_set = set(list(itertools.chain(*list_of_uuid_list)))

Запустите его, и вы получите результат менее чем за 5 секунд (обратите внимание, что дедупликация Set включена).

В itertools также есть много интересных функций, которые можно использовать.

https://docs.python.org/3/library/itertools.html

2.5 Пункт производительности 5: влияние IPython на производительность

Когда мы анализируем данные, мы часто используем IPython или Jupyter Notebook.

Однако, хотя это удобно, это принесет немного неприятностей, если вы не обратите на это внимание.

Например, подчеркивание и двойное подчеркивание сохраняют возвращаемое значение предыдущей ЯЧЕЙКИ и возвращаемое значение предыдущей ЯЧЕЙКИ соответственно.

2.6 Пункт производительности 6: проблемы GC, вызванные DataFrame

DataFrame - причина, по которой я использую Pandas, В процессе использования DataFrame на этот раз все еще есть некоторые головные боли. Например, необъяснимые утечки памяти.

def per_item_calc(item):
    df = pd.read.....
    # complex calc
    return result

result_items = []
for item in all_tobe_calc_items:
    result_items.append(per_item_calc(item))

reduce_results = ....

Я читаю DataFrame и назначаю его df в цикле For, а затем подсчитываю результат. Само собой разумеется, что каждый раз, когда есть только простой результат, размер читаемого файла каждый раз один и тот же, и одно и то же будет занимать почти 2 ГБ памяти, и когда я назначаю df,Само собой разумеется, что номер ссылки исходного df должен быть равен 0, он будет удален gc, и будет освобождено 2G памяти., так что маловероятно, что памяти будет не хватать.

Запустив программу, память biubiubiubiu растет, когда цикл около 1000-го, пока не будет заполнена память 16G.

Не лучше ли явно указать del ? код показывает, как показано ниже:

def per_item_calc(item):
    df = pd.read.....
    # complex calc
    del df
    return result

Вроде стало немного лучше, но ненамного лучше.

Однако, как и в прошлый раз, память biubiubiubiu растет, когда цикл достигает 1000-го, пока память 16G не будет заполнена.

Просто при чтении файла предварительно сокращается не сброшенный в последнем цикле df.От предыдущей идеи особой разницы нет. В дополнение к уменьшению памяти более чем на один G на прочитанный файл раньше времени по сравнению с предыдущим методом.

В поисках соответствующей информации, информации, связанной с Pandas GC в Python, не так много, Давайте немного разберемся в ней следующим образом:

Программа Python В Linux или Mac, даже если она является объектом del, Python все равноСтоять в канаве и не гадитьТолько не возвращайте память системе, займите ее сначала сами,У тебя есть возможность убить меняпока процесс не будет уничтожен.

Хорошо? Это отличается от того, что я хочу? Как управлять объектами в пандах и где GC не на месте? Еще не сказал.

Ссылаться на:

  • https://stackoverflow.com/questions/23183958/python-memory-management-dictionary
  • http://effbot.org/pyfaq/why-doesnt-python-release-the-memory-when-i-delete-a-large-object.htm

Но одно меня просветило.

пока процесс не будет уничтожен.

Разве в Python нет модуля ProcessPoolExecutor?

Итак, вопрос в том, ProcessPoolExecutor динамически создает процессы и назначает задачи, назначая процесс каждому элементу для работы? Или после создания трех процессов назначить элемент простаивающему процессу для работы?

  • Если последнее, то это серьезный пул процессов. Кажется, что карта прошла, и процесс не уничтожается, если задача не завершена или она не завершается аварийно. Это не дает нам решения проблемы утечки памяти.
  • Если это первое, то это не пул потоков, но это может помочь мне решить проблему утечки памяти.

Вы сказали, что пул процессов должен быть немного прежним. Но прежде чем проверить, этот пул процессов - это просто ваша идея, принесенная из других языков. Это пул потоков? Что это за пул процессов? Если процесс зависает во время выполнения, его будет меньше в это время. Поток, это добавит еще один процесс? ?

Как посмотреть верификацию?

  1. Запустите программу, введите Htop, чтобы увидеть PID процесса
  2. посмотреть исходный код
# https://github.com/python/cpython/blob/3.6/Lib/concurrent/futures/process.py#L440
def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                        self._result_queue))
        p.start()
        self._processes[p.pid] = p

Из исходного кода поток, управляющий процессом, создается в основном потоке, а поток, управляющий процессом, создает процессы max_workers (в моем примере всего 3 воркера).

является пулом процессов.

Ну, если это пул процессов, кажется, что карта прошла, и процесс не уничтожается, если задача не выполняется или завершается аварийно. Это не дает нам решения проблемы утечки памяти.

Подождите, а что, если вы используете пул с несколькими процессами?

def per_item_calc(item):
    df = pd.read.....
    # complex calc
    return result

result_items = []
step = 300
for idx in range(0,len(all_tobe_calc_items),step):
    pieces_tobe_calc_items = all_tobe_calc_items[idx:idx+step]
    with ProcessPoolExecutor(3) as pool:
        pieces_result_items = pool.map(per_item_calc,pieces_tobe_calc_items)
        result_items.append(pieces_result_items)

reduce_results = list(itertools.chain(*result_items))

Конечно, это способ заставить ОС сделать сборщик мусора за меня.То есть Python не может мне помочь с GC, а операционная система мне помогает с GC

PS: На самом деле модуль многопроцессорности тоже можно использовать, но пул потоков может немного контролировать количество создаваемых процессов.

Подводя итог, для большого количества обработки DataFrame:

  1. Несколько пулов процессов — один из способов справиться с этим.
  2. Минимизируйте количество DataFrames
  3. Минимизируйте КОПИРОВАНИЕ, вызванное назначением, и установите inplace=True при изменении
  4. При чтении CSV указывайте тип соответствующего столбца {'col_a': np.float64, 'col_b': np.int32}, иначе панды будут генерировать большое количество объектов

0xDD Дополнительная история

На этот раз в процессе анализа данных сломалась и моя материнская плата Mac, которая, к счастью, была еще на гарантии и отправлена ​​в магазин Apple на ремонт. Престижность послепродажного обслуживания Apple.

0xEE обновление

  • 2017-12-07Инициализировать эту статью
  • 2017-12-16Добавить текст для этапа анализа
  • 2017-12-26Удалите некоторые TODO и разместите на моем веб-сайте
  • 2017-12-31официальный релиз