Практика веб-сокетов Golang

задняя часть сервер GitHub WebSocket

Вот краткое введение в websocket, которое на самом деле просто краткое введение.

1. Сценарии применения

В некоторых сценариях, таких как торговля по K-линии, нам нужно, чтобы клиентская часть опрашивала внутреннюю часть, чтобы постоянно получать или обновлять статус ресурса. Задача опроса не считается громоздким способом, ведь каждый http-запрос имеет три рукопожатия и четыре волны в дополнение к собственной передаче информации о ресурсах. Альтернативой опросу является повторное использование http-соединения, точнее того же tcp-соединения. Этот метод может быть длительным http-соединением или веб-сокетом.

2. Разница между Websocket и HTTP Long Connection

Прежде всего, Websocket и HTTP являются совершенно разными протоколами, хотя нижний слой является TCP / IP. Долговое соединение HTTP также является протоколом HTTP. Самая большая разница между протоколом HTTP и Websocket является то, что HTTP основан на режиме запроса / ответа, и клиент Websocket и Server может инициировать нажатие данных, например сервер в сторону приложения, более подходящую для WebSocket (эта сцена HTTP Long Connection также возможно, а клиент отправляет сообщение на время окончания сервера, например Нагреть, а затем сообщение, пославленное обратно в виде ответа в виде ответа.

здесьgist.GitHub.com/legend T смотрел/1…Я написал фрагмент кода github gist, чтобы каждый мог его испытать.

3. Лучшие практики Голанга

Давайте сначала определим наш сценарий использования: в транзакции много данных, таких как K-линия, например, рыночные данные регулярно обновляются, здесь мы можем использовать веб-сокет для этого. Проще говоря, передняя часть запрашивает определенные данные из задней части, такие как данные K-линии, передняя часть и задняя часть устанавливают соединение через веб-сокет, а задняя часть постоянно возвращает информацию во внешний интерфейс.

Прежде чем мы напишем интерфейс веб-сокета, нам нужно немного подумать о том, как абстрагироваться и как спроектировать нашу структуру веб-сокета, чтобы обеспечить хорошую масштабируемость кода.

3.1 Hub

Прежде всего, что такое хаб, следующая картинка - результат изображения Google. Чтобы провести простую аналогию, порты USB 3.0 (синие) на картинке эквивалентны каждому соединению tcp, а описанные выше интерфейсы являются восходящими источниками данных нашего концентратора.

В первый раз, когда я хотел определить степень детализации концентратора, нужно было использовать контроллер, который является маршрутизатором запросов. Но потом подумал, эта конструкция слишком сложна, потому что у роутера много всяких параметров, и разные параметры могут соответствовать разным данным.

Так как же его определить? Не с точки зрения функциональности, а с точки зрения источников данных. Нам нужно только посмотреть, сколько категорий постоянно обновляемых данных необходимо предоставить, и каждая категория здесь соответствует хабу.

3.2 Broadcast

Через 3.1 мы определили хаб, дальше нужно подумать, как сделать трансляцию.

Самый простой способ — перебрать все подключения на концентраторе и выполнить conn.Write(). Этот метод очень прост и груб, и проблема очевидна: каждый conn.Write() является сетевым вводом-выводом, и мы имеем дело с несколькими сетевыми операциями ввода-вывода последовательно, что неэффективно.

Последовательно-параллельно. Мы по-прежнему просматриваем все подключения на концентраторе, а затем каждый conn.Write() запускает для этого горутину, которая на самом деле является мультиплексированием ввода-вывода.

Подумайте, нет ли проблем с описанным выше методом. На самом деле есть: проблемы с масштабируемостью. Если параметров интерфейса вебсокета много, и мы хотим вернуть разные результаты на разные конн в соответствии с параметрами, что нам делать? Это также очень просто, инкапсулируйте вышеуказанное соединение один раз и инкапсулируйте его в структуру. Когда я давным-давно обсуждал расширяемость функций в статье, я также сказал, что разработка параметров функций в виде структур — это хороший способ расширения.

3.3 Восприятие данных концентратора

После 3.2, как мне получить данные вещания?Возьмите на себя инициативу, чтобы вытащить их из источника информации, или кто-то другой толкает их? Самый простой способ реализовать это — построить модель производитель-потребитель, а реализовать модель производитель-потребитель в golang особенно просто. Вместе с нами нам нужно только настроить канал в хабе.

В моем понимании, то, как данные, которые будут транслироваться, выживают, должно быть бизнес-логикой и не должно быть связано с базовой структурой.

4. talk is cheap, show me the code

Код использует следующие два пакета в качестве примера:

  1. GitHub.com/Сказал, что написал/Пчела…
  2. GitHub.com/gorilla/веб…

Процесс контроллера.

type WsController struct {
    beego.Controller
}
var upgrader = websocket.Upgrader{
    ReadBufferSize:  maxMessageSize,
    WriteBufferSize: maxMessageSize,
}

func (this *WsController) WSTest() {
    defer this.ServeJSON()
    ws, err := upgrader.Upgrade(this.Ctx.ResponseWriter, this.Ctx.Request, nil)
    // 这里 ws 就是 websocket.Conn,是 websocket 对 net.Conn 的封装
    if err != nil {
        this.Data["json"] = "fail"
        return
    }

    // WsClient 是我们对 websocket.Conn 的再一层封装,后面细说
    wsClient := &WsClient{
        WsConn:  ws,
        WsSend:  make(chan []byte, maxMessageSize),
        HttpRequest:  this.Ctx.Request,     //记录请求参数
    }

    service.ServeWsExample(wsClient)
}

Структура WSCLIENT.

type WsClient struct {
    WsConn  *websocket.Conn
    WsSend  chan []byte
    HttpRequest http.Request 
}

WsClient имеет два основных метода: обработка данных, отправленных клиентом, и обработка данных, отправленных сервером. Это использует функции в качестве параметров для достижения максимальной гибкости, но дизайн параметров функций не обязательно является наиболее подходящим.Если у вас есть более подходящий, вы можете посоветовать.

func (client *WsClient) ReadMsg(fn func(c *WsClient, s string)) {
    for {
        _, msg, err := client.WsConn.ReadMessage()
        if err != nil {
            break
        }
        fn(client, string(msg))
    }
}

func (client *WsClient) WriteMsg(fn func(s string) error) {
    for {
        select {
        case msg, ok := <-client.WsSend:
            if !ok {
                client.WsConn.WriteMessage(websocket.CloseMessage, []byte{})
                return 
            }
            if err := fn(string(msg)); err != nil {
                return
            }
        }
    }
}

Центр.

type WsHub struct {
    Clients    map[*WsClient]bool   // clients to be broadcast
    Broadcast  chan string
    Register   chan *WsClient
    UnRegister chan *WsClient
    LastMsg    string     // 最近一次的广播内容。如果我们是 1 分钟广播一次,新来一个请求还没有到广播的时间,就返回最近一次广播的内容
    Identity   string     //可以用作做标志
}

Концентратор включает в себя экспортированный метод Run и закрытый метод BroadCast().

func (hub *WsHub) Run() {
    for {
        select {
        case c := <-hub.Register:
            hub.Clients[c] = true
            c.WsSend <- []byte(hub.LastMsg)
            break
        case c := <-hub.UnRegister:
            _, ok := hub.Clients[c]
            if ok {
                delete(hub.Clients, c)
                close(c.WsSend)
            }
            break
        case msg := <-hub.Broadcast:
            hub.LastMsg = msg
            hub.broadCast()
            break
        }
    }
}

func (hub *WsHub) broadCast() {
    for c := range hub.Clients {
        select {
        case c.WsSend <- []byte(hub.LastMsg):
            break
        default:
            close(c.WsSend)
            delete(hub.Clients, c)
        }

    }
}

Теперь мы соединяем клиента и концентратор, который является первым примером.service.ServeWsExample(wsClient).

// 初始化
func initWs() {
    WsHubs = make(map[string]*util.WsHub)
    hubList := []string{"hub1", "hub2", "hub2"}

    for _, hub := range hubList {
        WsHubs[hub] = &WsHub {
            Clients: make(map[*util.WsClient]bool),
            Broadcast: make(chan string),
            Register:  make(chan *util.WsClient),
            UnRegister: make(chan *util.WsClient),
            //Identity:   hub.String(),
        }
        go mockBroadCast(WsHubs[hub].Broadcast)
        go WsHubs[hub].Run()
    }
}

func mockBroadCast(broadCast chan string) {
    for {
        broadCast <- "hello world"
        time.Sleep(time.Second * 10)
    }
}

// controller 请求路由到相应的 ServeWsExample 函数
func ServeWsExample(c *util.WsClient, pair string) {
    defer func() {
        WsHubs[pair].UnRegister <- c
        c.WsConn.Close()
    }()

    WsHubs[pair].Register <- c
    go c.WriteMsg(func(string) error {})
    c.ReadMsg(func(*WsClient, string){})
}

Еще один момент, который следует отметить, это то, что генератор (то есть процесс отправки данных в хаб) здесь не прописан, потому что способ написания производителя является более гибким, поэтому давайте просто напишем его здесь.

//init
func init() {
    go Producer()
}

// 生产者
func Producer() {
    for {
        // generate msg
        msg := "hello, I am legendtkl"

        // select the proper hub to send the msg
        WsHubs["hub1"].Broadcast <- msg
    }
}

5. Пишите в конце

Один из вопросов, о котором я думал после работы, — как измерить масштабируемость кода и как написать код с высокой масштабируемостью? Обмен приветствуется.

Если приведенный выше демонстрационный код необходим, я упакую его и загружу на github.