Самый простой способ оптимизировать время отклика службы, никто

Микросервисы Go
Самый простой способ оптимизировать время отклика службы, никто

Предисловие - от Ван Цзюньфэна Кевина

Мы можем сделать среднюю задержку службы в основном около 30 мс.Одним из очень важных предварительных условий является то, что мы много используем технологию MapReduce, так что даже если наша служба вызывает много служб, это зависит только от самой медленной длительности запроса.

Для ваших существующих сервисов вам не нужно оптимизировать операции БД, кэшировать или переписывать бизнес-логику — вам нужно только распараллелить ортогональные (нерелевантные) запросы через MapReduce, и вы сможете значительно сократить время отклика сервиса.

В этой статье Оуян Ан подробно проанализирует детали реализации MapReduce.

Зачем вам MapReduce

В реальных бизнес-сценариях нам часто требуется получить соответствующие атрибуты от разных служб rpc для сборки сложных объектов.

Например, чтобы запросить сведения о продукте:

  1. Сервис продукта – запрос атрибутов продукта
  2. Служба инвентаризации — запрос свойств инвентаризации
  3. Служба цен — Свойства цены запроса
  4. Маркетинговые услуги — Маркетинговые свойства запроса

Если это последовательный вызов, время отклика будет увеличиваться линейно с количеством вызовов rpc, поэтому мы обычно меняем последовательный вызов на параллельный для оптимизации производительности.

Использование waitGroup в простых сценариях также может удовлетворить потребности, но что, если нам нужно проверить, обработать и преобразовать данные, возвращаемые вызовом rpc, и обобщить данные? Продолжать использовать waitGroup немного бессильно, такого инструмента нет в официальной библиотеке go (CompleteFuture предоставляется в java), автор go-zero реализует класс инструмента внутрипроцессного пакетирования данных mapReduce на основе архитектуры mapReduce. идея.

Идеи дизайна

Попробуем представить себя в роли автора, чтобы разобраться в возможных бизнес-сценариях параллельных инструментов:

  1. Запрос сведений о продукте: поддержка одновременных вызовов нескольких служб для объединения атрибутов продукта и поддержка немедленного прекращения вызова ошибок.
  2. Страница сведений о продукте автоматически рекомендует пользовательские карты и купоны: Поддерживает одновременную проверку карт и купонов. Если проверка не пройдена, она будет автоматически отклонена, а все карты и купоны будут возвращены.

Вышеприведенное фактически обрабатывает входные данные и, наконец, выводит очищенные данные.Существует очень классический асинхронный режим для обработки данных: режим производитель-потребитель. Таким образом, мы можем абстрагироваться от жизненного цикла пакетной обработки данных, который можно условно разделить на три этапа:

  1. производство данных
  2. картограф обработки данных
  3. редуктор агрегации данных

Среди них производство данных является обязательным этапом, обработка данных и агрегация данных являются необязательными этапами, производство и обработка данных поддерживают одновременные вызовы, а агрегация данных — это, по сути, операция с чистой памятью и одна сопрограмма.

Давайте подумаем о том, как данные должны проходить между разными этапами.Поскольку обработка данных на разных этапах выполняется разными горутинами, естественно рассмотреть возможность использования каналов для связи между горутинами.

Как реализовать прекращение процесса в любой момент?

Очень просто, просто прослушайте глобальный конечный канал в горутине.

реализация кода перехода на ноль

core/mr/mapreduce.go

Подробный исходный код можно посмотретьGitHub.com/OUSmoke/go…

Необходимые знания - базовое использование канала

Поскольку исходный код MapReduce использует много каналов для связи, давайте кратко упомянем основное использование каналов:

  1. Не забудьте закрыть канал после написания
ch := make(chan interface{})
// 写入完毕需要主动关闭channel
defer func() {
    close(ch)
}()
go func() {
    // v,ok模式 读取channel
    for {
        v, ok := <-ch
        if !ok {
            return
        }
        t.Log(v)
    }

    // for range模式读取channel,channel关闭循环自动退出
    for i := range ch {
        t.Log(i)
    }

    // 清空channel,channel关闭循环自动退出
    for range ch {
    }
}()
for i := 0; i < 10; i++ {
    ch <- i
    time.Sleep(time.Second)
}
  1. Закрытые каналы по-прежнему поддерживают чтение
  2. Ограничьте права чтения и записи канала
// 只读channel
func readChan(rch <-chan interface{}) {
    for i := range rch {
        log.Println(i)
    }
}

// 只写channel
func writeChan(wch chan<- interface{}) {
    wch <- 1
}

Определение интерфейса

Давайте сначала посмотрим на определения трех основных функций:

  1. производство данных
  2. обработка данных
  3. агрегация данных
// 数据生产func
// source - 数据被生产后写入source
GenerateFunc func(source chan<- interface{})

// 数据加工func
// item - 生产出来的数据
// writer - 调用writer.Write()可以将加工后的向后传递至reducer
// cancel - 终止流程func
MapperFunc func(item interface{}, writer Writer, cancel func(error))

// 数据聚合func
// pipe - 加工出来的数据
// writer - 调用writer.Write()可以将聚合后的数据返回给用户
// cancel - 终止流程func
ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))

Определение пользовательского метода

Способ использования можно посмотреть в официальной документации, здесь повторяться не буду.

Существует множество ориентированных на пользователя методов, которые в основном делятся на две категории:

  1. без возврата
    1. Процесс выполнения прерывается немедленно с ошибкой
    2. Исполнение не фокусируется на ошибках
  2. имеет возвращаемое значение
    1. Вручную записывать в источник, вручную читать канал агрегированных данных
    2. Вручную записывайте источник, автоматически считывайте совокупный канал данных
    3. Внешний входящий источник, автоматически считываемый канал совокупных данных
// 并发执行func,发生任何错误将会立即终止流程
func Finish(fns ...func() error) error

// 并发执行func,即使发生错误也不会终止流程
func FinishVoid(fns ...func())

// 需要用户手动将生产数据写入 source,加工数据后返回一个channel供读取
// opts - 可选参数,目前包含:数据加工阶段协程数量
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option)

// 无返回值,不关注错误
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option)

// 无返回值,关注错误
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option)

// 需要用户手动将生产数据写入 source ,并返回聚合后的数据
// generate 生产
// mapper 加工
// reducer 聚合
// opts - 可选参数,目前包含:数据加工阶段协程数量
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error)

// 支持传入数据源channel,并返回聚合后的数据
// source - 数据源channel
// mapper - 读取source内容并处理
// reducer - 数据处理完毕发送至reducer聚合
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
    opts ...Option) (interface{}, error)

Основными методами являются MapReduceWithSource и Map, а другие методы вызывают их внутренне. Разобраться с методом MapReduceWithSource Карта не проблема.

Реализация исходного кода MapReduceWithSource

Это все на этой картинке

// 支持传入数据源channel,并返回聚合后的数据
// source - 数据源channel
// mapper - 读取source内容并处理
// reducer - 数据处理完毕发送至reducer聚合
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
    opts ...Option) (interface{}, error) {
    // 可选参数设置
    options := buildOptions(opts...)
    // 聚合数据channel,需要手动调用write方法写入到output中
    output := make(chan interface{})
    // output最后只会被读取一次
    defer func() {
        // 如果有多次写入的话则会造成阻塞从而导致协程泄漏
        // 这里用 for range检测是否可以读出数据,读出数据说明多次写入了
        // 为什么这里使用panic呢?显示的提醒用户用法错了会比自动修复掉好一些
        for range output {
            panic("more than one element written in reducer")
        }
    }()
    // 创建有缓冲的chan,容量为workers
    // 意味着最多允许 workers 个协程同时处理数据
    collector := make(chan interface{}, options.workers)
    // 数据聚合任务完成标志
    done := syncx.NewDoneChan()
    // 支持阻塞写入chan的writer
    writer := newGuardedWriter(output, done.Done())
    // 单例关闭
    var closeOnce sync.Once
    var retErr errorx.AtomicError
    // 数据聚合任务已结束,发送完成标志
    finish := func() {
        // 只能关闭一次
        closeOnce.Do(func() {
            // 发送聚合任务完成信号,close函数将会向chan写入一个零值
            done.Close()
            // 关闭数据聚合chan
            close(output)
        })
    }
    // 取消操作
    cancel := once(func(err error) {
        // 设置error
        if err != nil {
            retErr.Set(err)
        } else {
            retErr.Set(ErrCancelWithNil)
        }
        // 清空source channel
        drain(source)
        // 调用完成方法
        finish()
    })

    go func() {
        defer func() {
            // 清空聚合任务channel
            drain(collector)
            // 捕获panic
            if r := recover(); r != nil {
                // 调用cancel方法,立即结束
                cancel(fmt.Errorf("%v", r))
            } else {
                // 正常结束
                finish()
            }
        }()
        // 执行数据加工
        // 注意writer.write将加工后数据写入了output
        reducer(collector, writer, cancel)
    }()
    // 异步执行数据加工
    // source - 数据生产
    // collector - 数据收集
    // done - 结束标志
    // workers - 并发数
    go executeMappers(func(item interface{}, w Writer) {
        mapper(item, w, cancel)
    }, source, collector, done.Done(), options.workers)
    // reducer将加工后的数据写入了output,
    // 需要数据返回时读取output即可
    // 假如output被写入了超过两次
    // 则开始的defer func那里将还可以读到数据
    // 由此可以检测到用户调用了多次write方法
    value, ok := <-output
    if err := retErr.Load(); err != nil {
        return nil, err
    } else if ok {
        return value, nil
    } else {
        return nil, ErrReduceNoOutput
    }
}
// 数据加工
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
    done <-chan lang.PlaceholderType, workers int) {
    // goroutine协调同步信号量
    var wg sync.WaitGroup
    defer func() {
        // 等待数据加工任务完成
        // 防止数据加工的协程还未处理完数据就直接退出了
        wg.Wait()
        // 关闭数据加工channel
        close(collector)
    }()
    // 带缓冲区的channel,缓冲区大小为workers
    // 控制数据加工的协程数量
    pool := make(chan lang.PlaceholderType, workers)
    // 数据加工writer
    writer := newGuardedWriter(collector, done)
    for {
        select {
        // 监听到外部结束信号,直接结束
        case <-done:
            return
        // 控制数据加工协程数量
        // 缓冲区容量-1
        // 无容量时将会被阻塞,等待释放容量
        case pool <- lang.Placeholder:
            // 阻塞等待生产数据channel
            item, ok := <-input
            // 如果ok为false则说明input已被关闭或者清空
            // 数据加工完成,执行退出
            if !ok {
                // 缓冲区容量+1
                <-pool
                // 结束本次循环
                return
            }
            // wg同步信号量+1
            wg.Add(1)
            // better to safely run caller defined method
            // 异步执行数据加工,防止panic错误
            threading.GoSafe(func() {
                defer func() {
                    // wg同步信号量-1
                    wg.Done()
                    // 缓冲区容量+1
                    <-pool
                }()

                mapper(item, writer)
            })
        }
    }
}

Суммировать

Я просматривал исходный код mapReduce около двух ночей, и общий вид был довольно утомительным. С одной стороны, я не очень хорошо разбираюсь в языке го, особенно в использовании каналов, из-за чего мне приходится часто останавливаться, чтобы запрашивать соответствующие документы, чтобы понять метод письма автора Мозг (я восхищаюсь мыслительными способностями автора).

Во-вторых, когда вы смотрите на исходный код в первый раз, он определенно будет выглядеть запутанным.На самом деле, найти запись программы можно (общие базовые компоненты, как правило, ориентированы на методы).Во-первых, прочитайте все путь вдоль основной линии, понять каждый код и добавить комментарии, а затем посмотреть код ответвления.

Если есть что-то, что вы действительно не понимаете, проверьте запись о представлении этого кода. Очень вероятно, что это решит проблему с изменением. Например, я много раз читал следующий код и не понимаю его.

// 聚合数据channel,需要手动调用write方法写入到output中
output := make(chan interface{})
// output最后只会被读取一次
defer func() {
    // 如果有多次写入的话则会造成阻塞从而导致协程泄漏
    // 这里用 for range检测是否可以读出数据,读出数据说明多次写入了
    // 为什么这里使用panic呢?显示的提醒用户用法错了会比自动修复掉好一些
    for range output {
        panic("more than one element written in reducer")
    }
}()

Наконец, рисование блок-схемы может в основном понять исходный код.Для меня этот метод глупый, но эффективный.

материал

Информация о канале Go:коло not.com/2016/04/14/…

документация go-zero MapReduce:go-zero.Dev/talent/лошади горячие и ядовитые…

адрес проекта

GitHub.com/ноль микро/…

Добро пожаловатьgo-zeroиstarПоддерживать нас!

Группа обмена WeChat

обрати внимание на"Практика микросервисов』Общедоступный номер и нажмитегруппа обменаПолучите QR-код группы сообщества.