[Перевод] Перейти к миллиону подключений WebSocket

Go

Всем привет! Меня зовут Сергей Камардин, я инженер Mail.Ru.

В этой статье в основном рассказывается, как использовать Go для разработки высоконагруженных сервисов WebSocket.

Если вы знакомы с WebSockets, но не очень хорошо знакомы с Go, надеюсь, вас по-прежнему интересуют идеи и аспекты оптимизации производительности, изложенные в этой статье.

1. Введение

Для того, чтобы определить рамки этой статьи, необходимо объяснить, зачем нам нужна эта услуга.

У Mail.Ru много stateful-систем. Хранилище электронной почты пользователя является одним из них. У нас есть несколько способов отслеживать изменения состояния и системные события в этой системе, в основном посредством периодического опроса системы или системных уведомлений при изменении состояния.

Есть два пути друг к другу. Но для почты, чем быстрее пользователь получает новое письмо.

Опрос почты составляет около 50 000 HTTP-запросов в секунду, 60% из которых возвращают статус 304, что означает, что в почтовом ящике ничего не изменилось.

Поэтому, чтобы снизить нагрузку на сервер и ускорить доставку писем пользователям, мы решили построить колесо, используя шаблон службы публикации-подписки (также известной как шина сообщений, брокер сообщений или конвейер событий). Одна сторона получает уведомления об изменении состояния, а другая сторона подписывается на такие уведомления.

Предыдущая архитектура:

Текущая архитектура:

Первый сценарий — это предыдущая архитектура. Браузер периодически опрашивает API и запрашивает хранилище (службу почтовых ящиков) на наличие изменений.

Второй вариант — текущая архитектура. Браузер устанавливает соединение WebSocket с API-интерфейсом уведомлений, который является потребителем службы шины. Как только новое сообщение получено, Storage отправляет уведомление об этом на шину (1), которая рассылает его подписчикам (2). API отправляет полученное уведомление через соединение, отправляя его в браузер пользователя (3).

Итак, теперь мы поговорим об этом API или об этом сервисе WebSocket. Заглядывая вперед, наш сервис может иметь 3 миллиона онлайн-подключений в будущем.

2. Обычно используемые методы

Давайте посмотрим, как некоторые части сервера можно реализовать на Go без каких-либо оптимизаций.

пока мы продолжаем использоватьnet/httpПеред этим поговорим о том, как отправлять и получать данные. Эти данные находятся в протоколе WebSocket (например, объект JSON), которые мы будем называть пакетами ниже.

Давайте сделаем это первымChannelСтруктура, структура будет включать в себя логику соединения WebSocket для отправки и получения пакетов данных.

2.1 Структура канала

// WebSocket Channel 的实现
// Packet 结构体表示应用程序级数据
type Packet struct {
    ...
}

// Channel 装饰用户连接
type Channel struct {
    conn net.Conn    // WebSocket 连接
    send chan Packet // 传出的 packets 队列
}

func NewChannel(conn net.Conn) *Channel {
    c := &Channel{
        conn: conn,
        send: make(chan Packet, N),
    }

    go c.reader()
    go c.writer()

    return c
}

Я хочу, чтобы вы заметили,readerиwriterгорутины. Для каждой горутины требуется стек памяти, который может иметь начальный размер от 2 до 8 КБ, в зависимости отв зависимости от операционной системыи версия Go.

Около 3 000 000 упомянутых выше соединений линии, для которых нам нужно потреблять до 24 ГБ памяти (при условии, что один стек goroutine потребляет 4 КБ памяти) для всех соединений. И это не входитChannelСтруктура выделенной памяти,ch.sendПакеты исходящих памяти занимают память и другие внутренние поля.

2.2 I/O goroutines

ПосмотримreaderРеализация:

// Channel’s reading goroutine.
func (c *Channel) reader() {
    // 创建一个缓冲 read 来减少 read 的系统调用
    buf := bufio.NewReader(c.conn)

    for {
        pkt, _ := readPacket(buf)
        c.handle(pkt)
    }
}

Здесь мы используемbufio.Readerуменьшитьread()Количество системных вызовов и читает как можно большеbufЧисло, разрешенное размером буфера. В этом бесконечном цикле мы ждем поступления новых данных. Сначала запомните это предложение:Ожидание поступления новых данных. Мы рассмотрим это позже.

Мы пока опускаем парсинг и обработку входящих пакетов, так как это не важно для обсуждаемых нами оптимизаций. но,bufСтоит обратить внимание: По умолчанию это 4 КБ, что означает, что для подключения требуется дополнительно 12 ГБ памяти.writerЕсть аналогичная ситуация:

// Channel’s writing goroutine.
func (c *Channel) writer() {
    // 创建一个缓冲 write 来减少 write 的系统调用
    buf := bufio.NewWriter(c.conn)

    for pkt := range c.send {
        _ := writePacket(buf, pkt)
        buf.Flush()
    }
}

Мы проезжаем Каналc.sendОбход исходящих пакетов и запись их в буфер. Как мог догадаться внимательный читатель, это еще 12 ГБ потребления памяти для наших 3 миллионов подключений.

2.3 HTTP

реализовал простойChannel, теперь нам нужно подключиться с помощью WebSocket. Поскольку он все еще находится под заголовком обычного пути, мы продолжаем обычным способом.

Примечание. Если вы не знаете, как работает WebSocket, имейте в виду, что клиенты преобразуются в протокол WebSocket с помощью специального HTTP-механизма, который называется Upgrade. После успешной обработки запроса на обновление сервер и клиент будут использовать TCP-соединение для передачи двоичных кадров WebSocket.здесьпредставляет собой описание внутренней структуры соединения.

// 常用的转换为 WebSocket 的方法
import (
    "net/http"
    "some/websocket"
)

http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, _ := websocket.Upgrade(r, w)
    ch := NewChannel(conn)
    //...
})

должны знать о том,http.ResponseWriterзаbufio.Readerиbufio.Writer(оба буфера по 4 КБ) выделенная память для*http.Requestинициализация и дальнейший ответ пишет.

Независимо от того, какая библиотека WebSocket используется, после успешного обновленияСервер звонит responseWriter.Hijack()Затем принимаются как буферы ввода-вывода, так и соединения TCP.

Совет: в некоторых случаяхgo:linknameможно использовать, позвонивnet/http.putBufio {Reader, Writer}вернуть буфер вnet/httpвнутриsync.Pool.

Так что нам также нужно 24 ГБ памяти для 3 миллионов подключений.

Итак, теперь в общей сложности 72 ГБ памяти потребляется для приложения, которое вообще ничего не делает!

3. Оптимизация

Давайте рассмотрим то, о чем мы говорили во вступительном разделе, и вспомним, как подключаются пользователи. После переключения на WebSocket клиент отправляет по соединению пакеты, содержащие соответствующие события. тогда (не считаяping/pongи т. д. сообщения), клиент не может отправлять что-либо еще в течение всего времени существования соединения.

Время жизни соединения может длиться от нескольких секунд до нескольких дней.

Поэтому большую часть времениChannel.reader()иChannel.writer()ожидают получения или отправки данных. Вместе с ними ожидаются буферы ввода-вывода по 4 КБ каждый.

Теперь должно быть ясно, где мы можем оптимизировать.

3.1 Netpoll

Channel.reader()Даваяbufio.Reader.Read()внутриconn.Read()закройОжидание поступления новых данных(Примечание переводчика: предзнаменование выше), как только в соединении есть данные, среда выполнения Go «пробуждает» горутину и позволяет ей прочитать следующий пакет данных. После этого горутина снова блокируется в ожидании новых данных. Давайте посмотрим на среду выполнения Go, чтобы понять, почему горутины должны быть «пробуждены».

если мы посмотрим наconn.Read()реализация, где вы увидитеnet.netFD.Read()перечислить:

// Go 内部的非阻塞读.
// net/fd_unix.go

func (fd *netFD) Read(p []byte) (n int, err error) {
    //...
    for {
        n, err = syscall.Read(fd.sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN {
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            }
        }
        //...
        break
    }
    //...
}

Go использует сокеты в неблокирующем режиме. EAGAIN означает, что в сокете нет данных, и чтение пустого сокета не будет заблокировано, ОС вернет нам управление. (Примечание переводчика: EAGAIN означает, что в настоящее время нет доступных данных, повторите попытку позже)

Мы видим из дескриптора файла подключения aread()Функция системного вызова. если чтение возвращаетОшибка EAGAIN, то среда выполнения вызываетpollDesc.waitRead():

// Go 内部关于 netpoll 的使用
// net/fd_poll_runtime.go

func (pd *pollDesc) waitRead() error {
   return pd.wait('r')
}

func (pd *pollDesc) wait(mode int) error {
   res := runtime_pollWait(pd.runtimeCtx, mode)
   //...
}

еслиКопать, мы увидим, что netpoll используется в Linuxepollреализовано, тогда как в BSD это делается с помощьюkqueueосуществленный. Почему бы не использовать тот же метод для соединений? Мы можем выделить буфер чтения и запускать горутину чтения только тогда, когда это действительно необходимо: когда в сокете есть данные для чтения.

На github.com/golang/go есть функция экспорта netpoll.issue.

3.2 Удаление потребления памяти горутинами

Предположим, у нас есть Goреализация сетевого опроса. Теперь мы можем избежать запуска во внутреннем буфереChannel.reader()Goroutine, но подписные события читаемые данные в связи:

// 使用 netpoll
ch := NewChannel(conn)

// 通过 netpoll 实例观察 conn
poller.Start(conn, netpoll.EventRead, func() {
    // 我们在这里产生 goroutine 以防止在轮询从 ch 接收数据包时被锁。
    go Receive(ch)
})

// Receive 从 conn 读取数据包并以某种方式处理它。
func (ch *Channel) Receive() {
    buf := bufio.NewReader(ch.conn)
    pkt := readPacket(buf)
    c.handle(pkt)
}

Channel.writer()Проще, потому что мы можем запускать только Goroutine и выделить буферы при отправке пакетов:

// 当我们需要时启动 writer goroutine
func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        go ch.writer()
    }
    ch.send <- p
}

Следует отметить, что когда операционная системаwrite()вернуться по вызовуEAGAIN, мы не обрабатываем эту ситуацию. Мы полагаемся на среду выполнения Go, чтобы справиться с этой ситуацией, поскольку на сервере она встречается редко. Однако при необходимости его можно комбинировать сreader()обрабатывается таким же образом.

когда изch.sendПосле чтения исходящих пакетов модуль записи завершит свою работу и освободит память горутины и буфера отправки.

Идеально!我们通过去除两个运行的 goroutine 中的内存消耗和 I/O 缓冲区的内存消耗节省了 48 GB。

3.3 Контроль ресурсов

Большое количество подключений связано не только с большим потреблением памяти. При разработке сервиса мы столкнулись с повторяющимися состояниями гонки и взаимоблокировками, вызванными самостоятельным DDoS.

Например, если по какой-то причине мы вдруг не можем справитьсяping/pongсообщение, но обработчик незанятого соединения продолжает закрывать такие соединения (при условии, что соединение уничтожено, данные не предоставлены), клиент теряет соединение каждые N секунд и пытается снова подключиться, вместо того, чтобы ждать события.

Или перегруженный сервер будет заблокирован из обслуживания, если это произойдет до того, как балансировщик нагрузки (например, Nginx) передаст запрос следующему экземпляру сервера, это будет хорошо.

Также, вне зависимости от загрузки сервера, если все клиенты вдруг (вероятно, по неправильным причинам) отправят нам пакеты, потребление прежних 48 ГБ памяти будет неизбежным, так как для каждого соединения нужно выделять горутины и буферы.

Пул горутин

В приведенном выше случае мы можем использовать пул горутин, чтобы ограничить количество пакетов, обрабатываемых одновременно. Вот простая реализация такого пула:

// goroutine 池的简单实现
package gopool

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

func (p *Pool) Schedule(task func()) error {
    select {
    case p.work <- task:
    case p.sem <- struct{}{}:
        go p.worker(task)
    }
}

func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }
    for {
        task()
        task = <-p.work
    }
}

Теперь наш код netpoll выглядит следующим образом:

// 处理 goroutine 池中的轮询事件。
pool := gopool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
    // 我们在所有 worker 被占用时阻塞 poller
    pool.Schedule(func() {
        Receive(ch)
    })
})

Теперь мы не только читаем, когда в сокете есть читаемые данные, но и можем занять незанятую горутину в пуле.

Так же изменяемSend():

// 复用 writing goroutine
pool := gopool.New(128)

func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        pool.Schedule(ch.writer)
    }
    ch.send <- p
}

заменятьgo ch.writer(), мы хотим написать многоразовые горутины. Поэтому за наличиеNПул горутин, мы можем гарантировать одновременную обработкуNзапросы и вN + 1, мы не будем назначатьN + 1буфер. Пул горутин также позволяет нам ограничивать количество новых подключений.Accept()иUpgrade()и избежать большинства DDoS-атак.

3.4 обновить нулевую копию

Как упоминалось ранее, клиент использует HTTP Upgrade для переключения на протокол WebSocket. Вот как выглядит протокол WebSocket:

## HTTP Upgrade 示例

GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket

То есть в нашем случае HTTP-запрос и его Header необходимы для перехода на протокол WebSocket. это знание иhttp.Requestсодержимое, хранящееся вЧто для оптимизации нам нужно отказаться от лишнего выделения памяти и копирования памяти при обработке HTTP-запросов, и отказатьсяnet/httpбиблиотека.

Например,http.RequestEстьПоля заголовка с одинаковым названием, это поле используется для копирования данных из соединения для заполнения заголовка запроса. Представьте, сколько дополнительной памяти требуется этому полю, например, при обнаружении относительно большого заголовка Cookie.

Реализация веб-сокета

К сожалению, на момент нашей оптимизации все существующие библиотеки использовали стандартныйnet/httpБиблиотека обновлена. Кроме того, ни одна из библиотек (обе) не может использовать описанные выше оптимизации чтения и записи. Чтобы использовать эти оптимизации, нам нужно использовать относительно низкоуровневый API для обработки WebSockets. Чтобы повторно использовать буфер, нам нужно превратить функцию протокола в это:

func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error

Если есть библиотека с таким API, мы можем читать пакеты из соединения (и записывать пакеты тоже) следующим образом:

// 预期的 WebSocket 实现API
// getReadBuf, putReadBuf 用来复用 *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)

// 当 conn 中的数据可读取时,readPacket 被调用
func readPacket(conn io.Reader) error {
    buf := getReadBuf()
    defer putReadBuf(buf)

    buf.Reset(conn)
    frame, _ := ReadFrame(buf)
    parsePacket(frame.Payload)
    //...
}

Короче говоря, нам нужна собственная библиотека WebSocket.

github.com/gobwas/ws

идеологически, письменноwsБиблиотеки предназначены для того, чтобы не навязывать пользователю логику работы своего протокола. Все методы чтения и записи реализуют стандартные интерфейсы io.Reader и io.Writer, поэтому их можно использовать с буферизацией или любым другим вводом-выводом или без нее.

кроме стандартной библиотекиnet/httpВ дополнение к запросу на обновление,wsТакже поддерживают нулевое обновление, запрос обновления и процессу переключения для выделения памяти или скопированы без памяти Websocket.ws.Upgrade()приниматьio.ReadWriter(net.Connреализует этот интерфейс). Другими словами, мы можем использовать стандартныйnet.Listen()Преобразовать полученное соединение изln.Accept()трансфер вws.Upgrade(). Эта библиотека позволяет скопировать какие-либо данные запроса для использования приложениями (например,Cookieиспользуется для аутентификации сеансов).

Ниже приведен запрос на обновлениеОриентирырезультат: стандартная библиотекаnet/httpуслуги с обновлениями без копированияnet.Listen():

BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
BenchmarkUpgradeTCP     973 ns/op     0 B/op       0 allocs/op

переключить наwsиОбновление без копированияСэкономили нам еще 24 ГБ памяти — вnet/httpПространство, выделяемое для буферов ввода-вывода при обработке запросов.

3.5 Резюме

Подведем итоги этих оптимизаций.

  • Горутина чтения с внутренним буфером стоит дорого. Решение: netpoll(epoll, kqueue); повторно используйте буфер.
  • Писать горутины с внутренними буферами дорого. Решение: запускайте горутины только при необходимости, повторно используйте буферы.
  • Если подключений много, netpoll не будет работать должным образом. Решение: используйте пул горутин и ограничьте количество рабочих процессов в пуле.
  • net/httpНе самый быстрый способ выполнить обновление до WebSocket. Решение. Используйте обновления памяти с нулевым копированием для незащищенных TCP-соединений.

Код сервиса выглядит так:

// WebSocket 服务器示例,包含 netpoll,goroutine 池和内存零拷贝的升级。
import (
    "net"
    "github.com/gobwas/ws"
)

ln, _ := net.Listen("tcp", ":8080")

for {
    // 尝试在空闲池的 worker 内的接收传入的连接。如果超过 1ms 没有空闲 worker,则稍后再试。这有助于防止 self-ddos 或耗尽服务器资源的情况。
    err := pool.ScheduleTimeout(time.Millisecond, func() {
        conn := ln.Accept()
        _ = ws.Upgrade(conn)

        // 使用 Channel 结构体包装 WebSocket 连接
        // 将帮助我们处理应用包
        ch := NewChannel(conn)

        // 等待连接传入字节
        poller.Start(conn, netpoll.EventRead, func() {
            // 不要超过资源限制
            pool.Schedule(func() {
                // 读取并处理传入的包
                ch.Recevie()
            })
        })
    })
    if err != nil {
        time.Sleep(time.Millisecond)
    }
}

Суммировать

Преждевременная оптимизация — корень всех зол (или, по крайней мере, большей их части) в программировании. -- Дональд Кнут

Конечно, вышеуказанные оптимизации - зависит от спроса, но не во всех случаях. Например, оптимизация может не иметь смысла, если соотношение между простоями ресурсами (памятью, CPU) и количеством онлайн-соединений высоки. Тем не менее, мы получаем выгоду от знания того, где и что оптимизировать.

Спасибо за Ваше внимание!

Цитировать


via: Woohoo.бесплатный код camp.org/news/в джунглях о…

автор:Sergey KamardinПереводчик:щелк-щелкВычитка:polaris1119

Эта статья написанаGCTTоригинальная компиляция,Перейти на китайский языкЧесть запуска