1. Введение
Агрегационный анализ больших данных очень полезен на предприятиях, и любой, кто имеет опыт разработки больших данных, знает, что ES и Mongo предоставляют специализированные решения по агрегации для решения этой проблемы. Тем не менее, агрегация больших объемов данных в режиме реального времени всегда была проблемой при реализации бизнеса.ES и Mongo по своей природе дружественны к распределению и часто хранят большие объемы данных в разных сегментах;язык Go рождается параллельно, и агрегация данных часто может разделить данные на блоки.Расчет, этот раздел сочетает в себе функции параллельных вычислений языка Go для агрегирования десятков миллионов данных документа монго за одну секунду. У автора нет глубоких исследований больших данных, и я надеюсь, что опытные читатели могут дать критику и дополнительные предложения. Адрес источника этой статьи на github:Mongo эффективно объединяет десятки миллионов документов
2. Общие методы агрегации базы данных mongo
Mongo не имеет тех же парадигмальных ограничений, что и mysql, и может хранить сложные типы, такие как массивы, объекты и другие структуры типа документа, которые mysql плохо обрабатывает.В то же время операция агрегирования намного сложнее. чем майскл.
Mongo предоставляет три способа выполнения операций агрегирования данных документа.В этом разделе кратко описаны различия между тремя способами:
- Совокупный конвейер (агрегатный конвейер)
- Агрегатная вычислительная модель (MapReduce)
- Отдельный порядок агрегации (Группа, Отдельный, Количество)
2.1 Отдельные команды агрегации
Отдельная команда агрегации имеет более низкую производительность, чем агрегация, и меньшую гибкость, чем Mapreduce; она проста в использовании.
- группа: его можно использовать для операций агрегации документов с небольшими объемами данных, чтобы обеспечить более богатые статистические требования, чем подсчет и различение.Вы можете использовать функции js для управления системой логика подсчета.
До версии 2.2 групповая операция могла возвращать только до 10 000 сгруппированных записей, но с версии 2.2 до версии 2.4 mongodb был оптимизирован для поддержки возврата 20 000 сгруппированных записей, если количество сгруппированных записей больше 20 000, то вы могут потребоваться другие методы для статистики, такие как конвейер агрегации или MapReduce
-
count: db.collection.count() эквивалентно db.collection.find().count(), неприменимо к распределенной среде, рекомендуется для распределенной среды агрегат
-
отличительный: можно использовать индекс, синтаксис очень прост: db.collection.distinct(поле, запрос), поле является дедуплицированным полем (одиночное или вложенное поле) имя); запрос — это условие запроса
2.2 Совокупный конвейер платформы агрегации
Фреймворк агрегатной агрегации построен на основе модели конвейера обработки данных.Документы обрабатываются через многоэтапные конвейеры, а затем возвращаются к результатам агрегации;схема агрегатной конвейерной агрегации использует встроенную операцию агрегации mongodb, которая относительно более Эффективность Aggregate предпочтительнее при выполнении операций агрегирования данных mongodb;
агрегат может повысить производительность за счет индексации, и есть некоторые специфические приемы для производительности конвейера (операции агрегатного конвейера выполняются в памяти, существуют ограничения на размер памяти и размер набора данных для обработки);
Совокупная операция конвейера похожа на операцию конвейера в системе unix/Linux.После того, как текущий документ обрабатывается в первом узле конвейера, обработанные данные перебрасываются на следующий узел конвейера до тех пор, пока не будет завершена окончательная обработка, и выходное содержимое;
Пределы совокупного
- Когда размер одного документа в результирующем наборе, возвращаемом агрегатом, превышает 16 МБ, команда сообщит об ошибке (используя агрегат без указания параметра курсора или сохраняя результаты в коллекции, агрегатная команда вернет файл bson, содержащийся в полях результирующий набор.Если результат Общий размер набора превышает предельный размер файла bson (16 МБ), команда выдаст ошибку ;)
- Максимальный лимит памяти на этапе обработки конвейера не может превышать 100 МБ, при превышении этого лимита будет выдано сообщение об ошибке, для обработки больших наборов данных можно включить опцию allowDiskUse, а операции конвейера можно записывать во временные файлы; сценарий использования агрегата применим к сценариям, которые требуют определенных требований к производительности совокупного ответа (оптимизация индекса и комбинации)
2.3 Агрегатная вычислительная модель MapReduce
Сила MapReduce заключается в его способности параллельно выполнять сложную логику агрегирования на нескольких серверах. MapReduce — это вычислительная модель, короче говоря, она делит большой пакет работы (данных) на выполнение (MAP), а затем объединяет результаты в окончательный результат (REDUCE). MapReduce использует идиоматические операции javascript для операций сопоставления и уменьшения, поэтому MapReduce является более гибким и сложным, чем агрегатный конвейер, и потребляет больше производительности, чем агрегатный конвейер; MapReduce обычно использует предварительную обработку количества дискового хранилища. Данные, конвейеры всегда обрабатывают данные в памяти.
Сценарии использования MapReduceОн используется для обработки больших наборов результатов данных и может обрабатывать сложные требования к агрегированию с использованием javascript с высокой гибкостью.
3. Принцип реализации агрегатного конвейера и общий синтаксис
Метод агрегатного конвейера в MongoDB использует агрегат(), синтаксис выглядит следующим образом:
db.COLLECTION_NAME.aggregate(AGGREGATE_OPERATION)
Ниже приведено сравнение метода агрегата() и агрегатного класса mysql.
операции агрегации монго | SQL-операции (функции) | инструкция |
---|---|---|
$match | where | Условный поиск по данным |
$group | group by | групповые данные |
$having | having | Фильтровать агрегированные данные |
$project | select | Выберите поля данных |
$sort | order by | Сортировать данные |
$limit | limit | Ограничить количество возвращаемых данных |
$sum | сумма(), количество() | Поля агрегированной статистики |
Такие операции, как $match и $group в совокупности, называются стадиями в конвейере. Они предоставляют расширенные методы для фильтрации агрегированных данных. $match предоставляет $gt(>), $lt(=), $lte(
$groupГруппирует документы по указанному выражению и выводит каждый документ, сгруппированный по-разному, на следующий этап. Выходной документ содержит поле _id, которое содержит разные группы по ключу. Выходной документ также может содержать вычисляемое поле, содержащее значение некоторого выражения-аккумулятора, сгруппированное по полю _id в $group. $group не выводит конкретные документы, а только статистику. грамматика:
{ $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }
- Поле _id является обязательным, однако можно указать значение _id, равное NULL, чтобы вычислить кумулятивное значение для всего входного документа.
- Остальные вычисляемые поля являются необязательными и вычисляются с помощью операторов.
Общие операторы аккумулятора:
название | описывать | аналогия sql |
---|---|---|
$avg | Рассчитать среднее значение | avg |
$first | Возвращает первый документ каждой группы в отсортированном порядке, если он есть, или первый документ в сохраненном порядке по умолчанию, если нет. | limit 0,1 |
$last | Возвращает последний документ каждой группы в отсортированном порядке, если он есть, или последний документ в сохраненном порядке по умолчанию, если нет. | - |
$max | По группировке получить максимальное значение, соответствующее всем документам в коллекции. | max |
$min | По группировке получить минимальное значение, соответствующее всем документам в коллекции. | min |
$sum | Рассчитать сумму | sum |
$push | Добавляет значение указанного выражения в массив. | - |
db.collection.aggregate() — конвейер агрегации, основанный на обработке данных.Каждый документ проходит через конвейер, состоящий из нескольких этапов.Последовательность обработки выводит соответствующие результаты. Благодаря этому изображению вы можете понять процесс обработки Aggregate:
Конвейер агрегации может определить, можно ли выполнить агрегацию, используя только подмножество полей в документе. Если это так, конвейер может использовать только эти необходимые поля, тем самым уменьшая объем данных, поступающих в конвейер.
Вот несколько распространенных методов оптимизации:
- 1. Оптимизация порядка $match + $group В конвейере $group использует $match для фильтрации данных документов, что может значительно сократить количество документов, возвращаемых одним конвейером, тем самым повышая эффективность.
- 2. $групповая + $проектная оптимизация После того, как конвейер $group агрегирует данные документа, он по умолчанию вернет документ bson с _id.Мы можем экспортировать данные, используемые в _id, и поместить их в $project Установите только указанное поле ограничения, которое может уменьшить размер выходного документа.
- 3. Оптимизация $skip + $limit Если в вашем конвейере за $skip следует $limit , оптимизатор переместит $limit впереди $skip , в это время значение $limit будет добавлено с $skip количество.
- 4. Если $sort предшествует $limit, оптимизатор может объединить $limit внутри $sort. На этом этапе, если указано ограничение на возврат n результатов, то Операции сортировки должны поддерживать только первые n результатов, а MongoDB нужно хранить только n элементов в памяти.
Дополнительные советы по оптимизации агрегации см.оптимизация агрегации монго.
4. Реализация кода
4.1 Организация данных
Демонстрация кода в этом разделе требует большого объема данных. Вы можете использовать хранимую процедуру mysql для создания массивных данных, а затем импортировать их в базу данных mongo. Метод генерации см. здесь:MySQL быстро генерирует миллионы данных
{
"_id" : ObjectId("5e06de309d1f74e9badda0db"),
"username" : "dvHPRGD1",
"age" : 87,
"sex" : 1,
"salary" : 3251
}
{
"_id" : ObjectId("5e06de309d1f74e9badda0dc"),
"username" : "rNx6NsK",
"age" : 7,
"sex" : 1,
"salary" : 7891
}
......
Документ очень прост. Четыре содержимого: имя (имя пользователя), возраст (возраст), пол (пол) и зарплата (зарплата) генерируются случайным образом. Возраст - это случайное число от 0 до 99, пол - только 0 и 1, а зарплата тоже находится в определенном диапазоне случайных чисел.
4.2 Достижение целей и решения
С источником данных наша цель тоже очень проста, то есть быстро получить общее количество и среднюю зарплату людей разного возраста и пола. По сути, нас просят агрегировать возраст и пол этих десятков миллионов данных, а потом делать статистические расчеты по зарплате.
После того, как цель ясна, мы индексируем возраст и пол в документе, чтобы ускорить наш подсчет. Учитывая, что мы хотим получить результат быстро, мы используем агрегат конвейера агрегации монго. Ранее я говорил, что конвейер агрегации имеет ограничения по памяти и размеру возвращаемых документов. Десять миллионов данных определенно превысят лимит памяти монго. проблема проблема,Разработчики часто используют параметр allDiskUse, чтобы открыть диск для завершения работы по агрегации данных, но вычислительная эффективность диска и памяти различается в сотни раз, что неизбежно влияет на эффективность агрегации и не может гарантировать производительность в реальном времени.
Давайте решим эту проблему с другим мышлением.Хотя у нас много данных документов, возраст ограничен.Есть только 100 номеров от 0 до 99 лет, и есть только два пола.Мы можем использовать go чтобы открыть 100 goroutines для агрегирования возраста в данных документа 0-99, после завершения агрегации данные могут быть объединены вместе для завершения нашей работы по агрегации. Go очень подходит для такого рода работ, потому что стоимость запуска горутины очень мала, а если данные распределены и хранятся на разных машинах, можно добиться распределенной агрегации данных.Задачу агрегации можно разделить на небольшие задачи одну за другой, что обеспечивает предпосылку для параллельных вычислений на языке go, и вроде бы все в порядке.
4.3 Интерпретация кода
У автора ограниченные условия, поэтому для демонстрации я использовал локальную базу данных своего компьютера.Сначала создайте один экземпляр соединения монго (если это распределенная среда, вы можете использовать пул соединений)
// mongoAggregate/mongoClient/mongoClient.go
package mongoClient
import (
"context"
"fmt"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type MongoClient struct {
Client *mongo.Client
Collection *mongo.Collection
}
var (
GMongo *MongoClient
)
func InitMongodb() {
var(
ctx context.Context
opts *options.ClientOptions
client *mongo.Client
err error
collection *mongo.Collection
)
// 连接数据库
ctx, _ = context.WithTimeout(context.Background(), 10*time.Second) // ctx
opts = options.Client().ApplyURI("mongodb://127.0.0.1:27017") // opts
if client, err = mongo.Connect(ctx,opts); err != nil{
fmt.Println(err)
return
}
//链接数据库和表
collection = client.Database("screen_data_stat").Collection("test")
//赋值单例
GMongo = &MongoClient{
Client:client,
Collection:collection,
}
}
...... //入口文件main.go中初始化(init函数) Mongo连接
func init() {
mongoClient.InitMongodb()
}
Агрегатные функции реализованы в пакете агрегатов:
package aggregate
import (
"context"
"log"
"mongoAggregate/mongoClient"
"sync"
"time"
bson2 "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func genPipeline(age int) (bson2.D, bson2.D, bson2.D) {
matchStage := bson2.D{
{"$match", bson2.D{
{"age",
bson2.D{
{"$eq", age},
}},
}},
}
groupStage := bson2.D{
{"$group", bson2.D{
{"_id", bson2.D{
{"age", "$age"},
{"sex", "$sex"},
}},
{"age", bson2.D{
{"$first", "$age"},
}},
{"sex", bson2.D{
{"$first", "$sex"},
}},
{"total", bson2.D{
{"$sum", 1},
}},
{"avgSalary", bson2.D{
{"$avg", "$salary"},
}},
}},
}
projectStage := bson2.D{
{"$project", bson2.D{
{"_id", 0},
{"age", 1},
{"sex", 1},
{"total", 1},
{"avgSalary", 1},
}},
}
return matchStage, groupStage, projectStage
}
func DataAggregate(age int, resultChan chan bson2.M, wg *sync.WaitGroup) {
matchStage, groupStage, projectStage := genPipeline(age)
opts := options.Aggregate().SetMaxTime(15 * time.Second)
cursor, err := mongoClient.GMongo.Collection.Aggregate(context.TODO(), mongo.Pipeline{matchStage, groupStage, projectStage}, opts)
if err != nil {
log.Fatal(err)
}
//打印文档内容
var results []bson2.M
if err = cursor.All(context.TODO(), &results); err != nil {
log.Fatal(err)
}
for _, result := range results {
resultChan <- result
}
wg.Done()
}
Метод genPipeline используется для создания различных этапов конвейера агрегации mongo.Поскольку язык go может возвращать несколько значений, в DataAggregate используется прием с несколькими значениями, а агрегированные результаты отправляются через канал resultChan для завершения агрегации. .WaitGroup предназначен для управления основной функцией. Функция настроена на выход перед другими горутинами и используется для управления количеством параллелизма.
Поскольку мы используем несколько горутин для выполнения параллельных операций, результат, который мы получаем, на самом деле зависит от времени, которое требуется самой медленной горутине для выполнения задачи.Мы обрабатываем результат следующим образом: сортируем и форматируем его как json, затем нам нужно выполнить выходное содержимое определяется следующим образом:
//output/resultSlice.go
package output
// 按照 Person.Age 从大到小排序
type OutPut struct {
Age int32 `json:"age"`
Sex int32 `json:"sex"`
Total int32 `json:"total"`
AvgSalary float64 `json:"avg_salary"`
}
type ResultSlice [] OutPut
func (a ResultSlice) Len() int { // 重写 Len() 方法
return len(a)
}
func (a ResultSlice) Swap(i, j int) { // 重写 Swap() 方法
a[i], a[j] = a[j], a[i]
}
func (a ResultSlice) Less(i, j int) bool { // 重写 Less() 方法, 从大到小排序
return a[j].Age < a[i].Age
}
Интерфейс функции сортировки реализован выше, и мы можем реализовать сортировку результатов вывода по возрасту.
Далее работа, выполняемая основной функцией, более ясна:
func main() {
dataStatResult := make(chan bson2.M)
var output output2.ResultSlice
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go aggregate.DataAggregate(i, dataStatResult, &wg)
}
for value := range dataStatResult {
output = append(output, output2.OutPut{
Age: value["age"].(int32),
Sex: value["sex"].(int32),
Total: value["total"].(int32),
AvgSalary: value["avgSalary"].(float64),
})
if len(output) == 200 {
break
}
}
wg.Wait()
//倒序排列
sort.Sort(output)
for _, v := range output {
result, err := json.Marshal(&v)
if err != nil {
fmt.Printf("json.marshal failed, err:", err)
return
}
fmt.Println(string(result))
}
}
Сначала определите конвейер для связи между основной горутиной и другими параллельными горутинами для получения результатов, рассчитанных другими горутинами.В примере 100 горутин запускаются для групповой агрегации.Агрегированные результаты принимаются через канал dataStatResult и преобразуются в выходные данные. Сохраните их в срезах. После того, как вся работа будет сделана, отсортируйте результаты по возрасту и отформатируйте их как вывод json. Это логика одновременной агрегации огромных объемов данных. Ниже представлен результат авторской агрегации возрастом 0-20 лет (данных около 2 млн, а работа по агрегации завершается за 200мс):
{"age":19,"sex":0,"total":49773,"avg_salary":5346.04197054628}
{"age":19,"sex":1,"total":49985,"avg_salary":4677.7744523357005}
{"age":18,"sex":0,"total":48912,"avg_salary":5335.430671409879}
{"age":18,"sex":1,"total":50136,"avg_salary":4540.624461464816}
{"age":17,"sex":0,"total":49609,"avg_salary":5372.679755689492}
......
5. Резюме
В этой статье в основном описывается применение языка go в сценариях агрегации больших данных и статистики. Фактически, независимо от того, требуется ли производительность в реальном времени, есть идея блочной агрегации. Агрегация MapReduce в Mongo и группировка ES (агрегация ведра) Это все, что нужно для того, чтобы объединять большие данные в небольшие задачи в пакетах, выполнять их одну за другой и, наконец, достигать цели Их эффективность не высока. Параллельные вычисления языка Go могут хорошо применяться в этом сценарии, предоставляя решение для агрегирования массивных данных (на уровне миллиардов) в реальном времени.