Как использовать данные Dask для параллельного анализа данных в Python

Программа перевода самородков анализ данных
Как использовать данные Dask для параллельного анализа данных в Python

Многомерное мышление. источник:Pixabay

иногда ты проходишь мимоПанды ПитонаОткройте большой набор данных и попытайтесь получить какие-то метрики, но тогда весь процесс может внезапно остановиться. Если вы используете Pandas для обработки больших данных, возможно, простое среднее значение ряда потребует от вас подождать минуту, и мы даже не позвонимapply. И это всего миллион строк! Когда ваши данные достигают миллиардов, вам лучше использовать Spark или что-то в этом роде.

Я открыл для себя этот инструмент некоторое время назад: способ ускорить анализ данных в Python без необходимости в улучшенной инфраструктуре или переключении языков. Но если набор данных слишком велик, его окончательные результаты оптимизации будут несколько ограничены, но он все равно масштабируется лучше, чем обычные Pandas, и может больше подходить для вашего сценария проблемы, особенно если не выполнять много переиндексации.

Что такое Даск?

Dask— это проект с открытым исходным кодом, который предоставляет вам абстракции для массивов NumPy, фреймов данных Pandas и обычных списков, позволяя вам выполнять их операции параллельно с использованием многоядерных процессоров.

Вот выдержка из этого урока:

Dask предоставляет коллекции Array, Bag и DataFrame более высокого уровня, которые имитируют NumPy, list и Pandas, но допускают параллельные операции с наборами данных, которые не помещаются в основную память. Для больших наборов данных расширенные коллекции Dask могут заменить NumPy и Pandas.

Это звучит здорово! Для этой статьи я опробовал Dask Dataframes и провел на нем несколько тестов.

Читать документацию

Сначала я прочитал официальную документацию, чтобы увидеть точную рекомендацию в документации Dask, а не в обычном Dataframe. Ниже приведеныофициальная документациячасть:

  • Манипулируйте большими наборами данных, даже если они не помещаются в памяти
  • Используйте как можно больше ядер для ускорения длительных вычислений
  • В больших наборах данных вычисления распределяются с помощью стандартных операций Pandas, таких как кластеризация, объединение и вычисления временных рядов.

Далее перечислены несколько быстрых сценариев, но только если вы работаете с данными Dask:

  • Арифметические операции (умножение или сложение последовательностей)
  • Общие агрегации (среднее, минимальное, максимальное, сумма и т. д.)
  • перечислитьприменить (если это индекс, а не groupby('y'), где y не является индексом)
  • вызовите value_counts(), drop_duplicates() или corr()
  • использоватьLoc,isinи построчный выбор для фильтрации

Если вы сочтете это полезным, просто сделайте небольшой обзор фильтрации данных.

#通过引用仅,返回 x >5 的行(根据起初的 df 写入更改)
df2 = df.loc[df['x'] > 5]
#通过引用,仅返回x 为 0、1、2、3 或 4 的行
df3 = df.x.isin(range(4))
#通过只读引用,仅返回 x > 5 的行(不能被写)
df4 = df[df['x']>5]

Как использовать Dask Dataframes

Dask Dataframes имеют API, аналогичный Pandas Dataframes, только агрегаты иapplyэто ленивое вычисление, вам нужно позвонитьcomputeметод расчета. Чтобы сгенерировать Dask Dataframe, вы можете просто вызвать Pandasread_csvметод или просто вызовите данный Pandas Dataframedf.

dd = ddf.from_pandas(df, npartitions=N)

ddfэто имя, которое вы импортировали с помощью DASK Dataframes, иnparitionsэто параметр, который сообщает Dataframe, как вы ожидаете, что он будет разделен.

StackOverflow, рекомендуется разделить Dataframe на разделы с одинаковым количеством ядер на вашем компьютере или кратным этому числу, потому что каждый раздел будет работать в отдельном потоке, и если потоков будет слишком много, он станет слишком между ними дорого.

Начало работы: сравнение!

Я разработал заметку Jupyter, чтобы опробовать эту структуру, и разместил ее наGithub, так что вы можете просмотреть подробности или даже запустить его самостоятельно.

Тесты, которые я запускал, доступны на GitHub, и вот основные из них:

def get_big_mean():
    return dfn.salary.mean().compute()
def get_big_mean_old():
    return df3.salary.mean()

def get_big_max():
    return dfn.salary.max().compute()
def get_big_max_old():
    return df3.salary.max()

def get_big_sum():
    return dfn.salary.sum().compute()
def get_big_sum_old():
    return df3.salary.sum()

def filter_df():
    df = dfn[dfn['salary']>5000]
def filter_df_old():
    df = df3[df3['salary']>5000]

Это подпрограмма с 25 миллионами строкdf3, содержимое используется изпредыдущий постгенерируется скриптом в (случайно нарисованные имена столбцов из спискаимя Фамилияа такжеsalary). Я использовал набор данных из 50 строк и объединил их 500 000 раз, потому что меня интересует только то, сколько времени потребуется для анализа.Per seно не интересует.

dfnоснован наdf3Фрейм данных Dask.

Результаты первой партии: не слишком оптимистичны

Во-первых, я пробовал тестировать с 3 разделами, так как у меня всего 4 ядра, я не хочу перегружать свой компьютер. Мои результаты с Dask были неоптимальными, и мне также пришлось долго ждать, чтобы получить результаты, которые, я боюсь, могут быть связаны с тем, что я делаю слишком мало разделов:

204.313940048 seconds for get_big_mean
39.7543280125 seconds for get_big_mean_old

131.600986004 seconds for get_big_max
43.7621600628 seconds for get_big_max_old

120.027213097 seconds for get_big_sum
7.49701309204 seconds for get_big_sum_old

0.581165790558 seconds for filter_df
226.700095892 seconds for filter_df_old

Вы можете видеть, что большинство операций выполняются намного медленнее, когда я использую Dask. Это дало мне подсказку, что мне, возможно, придется использовать больше разделов. Количество времени, необходимое для создания ленивых вычислений, также незначительно (в некоторых случаях менее полсекунды), и если я использую их повторно, они не амортизируются с течением времени.

я также использовалapplyметод, чтобы проверить это:

def f(x):
    return (13*x+5)%7

def apply_random_old():
    df3['random']= df3['salary'].apply(f)
    
def apply_random():
    dfn['random']= dfn['salary'].apply(f).compute()

Результат без разницы:

369.541605949 seconds for apply_random
157.643756866 seconds for apply_random_old

Так что в целом большинство операций по-прежнему в два раза быстрее, хотя фильтр намного быстрее. Меня беспокоит то, что, возможно, мне также стоит позвонитьcomputeЭто функция, поэтому используйте этот результат в качестве сравнения.

Больше разделов: невероятная скорость

После таких разочаровывающих результатов я подумал, что, может быть, я не использую достаточно разделов. Смысл этого в том, чтобы работать параллельно, может быть, мне нужно больше параллелизма? Итак, я провел тот же тест с 8 разделами, и вот результаты, которые я получил (я проигнорировал непараллельные кадры данных, поскольку они в основном одинаковы):

3.08352184296 seconds for get_big_mean
1.3314101696 seconds for get_big_max
1.21639800072 seconds for get_big_sum
0.228978157043 seconds for filter_df

112.135010004 seconds for apply_random
50.2007009983 seconds for value_count_test

Правильно, большинство операций выполняются более чем в 10 раз быстрее, чем обычный Dataframe,applyПолучил более высокую скорость! я все еще здесьsalaryзапустить по очередиvalue_countметод. Для контекста имейте в виду, что когда я запускал этот тест на обычном Dataframe, мне пришлось остановить процесс через 10 минут ожидания, а на этот раз это заняло всего 50 секунд! По сути, я просто использую не тот инструмент, и это очень быстро. Гораздо быстрее, чем обычные кадры данных.

в заключении

Учитывая, что я запускаю 250 миллионов строк контента в минуту на очень старом 4-ядерном ПК, я думаю, что он найдет место в реальном мире. Поэтому я предлагаю, чтобы в следующий раз, когда вы будете работать с наборами данных локально или из одного экземпляра AWS, рассмотреть возможность использования этой платформы, она действительно эффективна.

Я надеюсь, что вы нашли этот набор статей полезным или интересным! Написание его заняло больше времени, чем я ожидал, потому что некоторые тесты требовали временислишком долго. Не забудьте сообщить мне, если вы знаете о Dask, прежде чем читать, или если вы используете его на работе или в проекте. Кроме того, дайте мне знать, если есть другие замечательные функции, я не проверял, если я сделал что-то не так! Ваши отзывы и комментарии — одна из главных причин, почему я пишу, потому что мы все растем благодаря этому.

Если вам понравилась эта статья, вы можете продолжать поддерживать меня.может продолжать поддерживать мое письмо. Вы также можете узнать больше руководств, советов и рекомендаций по Python здесь!

Если вы обнаружите ошибки в переводе или в других областях, требующих доработки, добро пожаловать наПрограмма перевода самородковВы также можете получить соответствующие бонусные баллы за доработку перевода и PR. начало статьиПостоянная ссылка на эту статьюЭто ссылка MarkDown этой статьи на GitHub.


Программа перевода самородковэто сообщество, которое переводит высококачественные технические статьи из Интернета сНаггетсДелитесь статьями на английском языке на . Охват контентаAndroid,iOS,внешний интерфейс,задняя часть,блокчейн,продукт,дизайн,искусственный интеллектЕсли вы хотите видеть более качественные переводы, пожалуйста, продолжайте обращать вниманиеПрограмма перевода самородков,официальный Вейбо,Знай колонку.