Разработка механизма сопоставления: кодовая реализация процесса

Go

Добро пожаловать на официальный аккаунт "Keegan Xiaogang" для получения дополнительных статей.


Разработка движка соответствия: начало

Разработка соответствующего движка: версия MVP

Разработка механизма сопоставления: проектирование структуры данных

Разработка Matching Engine: стыковка черного ящика

Разработка механизма сопоставления: процесс расшифровки «черного ящика»

Разработка механизма сопоставления: кодовая реализация процесса


Запись программы

Мы собираемся начать говорить о логике реализации кода, если вы не помните структуру каталогов, упомянутую ранее, пожалуйста, вернитесь и посмотрите на нее.Преамбула. Первый шаг разговора о реализации кода, естественно, начинается со входа в программу, а ядро ​​состоит из двух функций:init()а такжеmain(), код выглядит следующим образом:

package main

... //other codes

func init() {
	initViper()
	initLog()

	engine.Init()
	middleware.Init()
	process.Init()
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/openMatching", handler.OpenMatching)
	mux.HandleFunc("/closeMatching", handler.CloseMatching)
	mux.HandleFunc("/handleOrder", handler.HandleOrder)

	log.Printf("HTTP ListenAndServe at port %s", viper.GetString("server.port"))
	if err := http.ListenAndServe(viper.GetString("server.port"), mux); err != nil {
		panic(err)
	}
}

Функция init() выполняет некоторые операции инициализации.Позвольте мне кратко представить эти функции инициализации:

  • initViper(): Инициализация файла конфигурации с использованием сторонней библиотеки конфигурации viper, которая является широко используемой библиотекой конфигурации, ее адрес github:github.com/spf13/viper.
  • initLog(): инициализация лога, программа в основном использует для вывода лог-файлов определённый ею же пакет лога, а реализация лог-пакета будет рассмотрена отдельно в последующих статьях.
  • engine.Init(): Инициализация пакета движка просто инициализирует карту, которая используется для сохранения каналов заказов различных целей транзакций, которые используются в качестве очереди упорядочивания каждой цели транзакции.
  • middleware.Init(): Инициализация промежуточного программного обеспечения, единственное промежуточное программное обеспечение, которое мы используем, — это Redis, так что на самом деле это инициализация соединения Redis. Что касается клиентской библиотеки Redis, мой выборgo-redis/redis.
  • process.Init(): Этот шаг в основном предназначен для загрузки и восстановления запуска каждого целевого механизма транзакции и всех данных заказа из кэша.

Инициализация viper и redis написана со ссылкой на официальную демку, поэтому я не буду ее здесь объяснять. журнал будет обсуждаться отдельно позже. Необходимо тщательно обсудить инициализацию пакета двигателя и пакета процесса.

Среди них, хотя инициализация пакета движка очень проста, она очень критична, а ее код написан наengine/init.goВ файле полный код выглядит следующим образом:

package engine

var ChanMap map[string]chan Order

func Init() {
	ChanMap = make(map[string]chan Order)
}

Ключ этой карты, который сохраняет канал, является символом каждой цели транзакции, что означает, что каждая цель транзакции имеет канал заказа, и эти каналы заказа будут использоваться в качестве очереди последовательности каждой цели транзакции.

Инициализация пакета процесса выглядит следующим образом:

func Init() {
	symbols := cache.GetSymbols()
	for _, symbol := range symbols {
		price := cache.GetPrice(symbol)
		NewEngine(symbol, price)

		orderIds := cache.GetOrderIdsWithAction(symbol)
		for _, orderId := range orderIds {
			mapOrder := cache.GetOrder(symbol, orderId)
			order := engine.Order{}
			order.FromMap(mapOrder)
			engine.ChanMap[order.Symbol] <- order
		}
	}
}

Кратко объясните логику реализации:

  1. Прочитать все символы из кеша, то есть до перезапуска программы были открыты символы всех совпавших целей транзакций;
  2. Считайте цену, соответствующую каждому символу из кеша, которая является последней ценой сделки перед перезапуском программы;
  3. Запустите механизм сопоставления для каждого символа;
  4. Прочитать все ордера по каждому символу из кеша, эти ордера расположены в хронологическом порядке;
  5. Эти ордера добавляются в ордерный канал соответствующего символа по порядку.

Не имеет значения, если вы не понимаете здесь какую-то логику проектирования, и я объясню ее подробно позже, когда буду говорить о соответствующих модулях.

В функции main() определяются три упомянутых ранее интерфейса, которые передаются соответствующим обработчикам для обработки конкретных запросов, а затем запускается служба http.

handler

Поскольку интерфейсов всего несколько и они очень просты, сторонние веб-фреймворки не вводятся, а все обработчики реализованы изначально. Первый взглядOpenMatchingПолная реализация:

package handler

import (
	"encoding/json"
	"io/ioutil"
	"net/http"
	"strings"

	"matching/errcode"
	"matching/process"

	"github.com/shopspring/decimal"
)

type openMatchingParams struct {
	Symbol string          `json:"symbol"`
	Price  decimal.Decimal `json:"price"`
}

func OpenMatching(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	var params openMatchingParams
	if err := json.Unmarshal(body, &params); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	if strings.TrimSpace(params.Symbol) == "" {
		w.Write(errcode.BlankSymbol.ToJson())
		return
	}

	if params.Price.IsNegative() {
		w.Write(errcode.InvalidPrice.ToJson())
		return
	}

	if e := process.NewEngine(params.Symbol, params.Price); !e.IsOK() {
		w.Write(e.ToJson())
		return
	}

	w.Write(errcode.OK.ToJson())
}

Логика очень проста, сначала определяется, был ли POST-запрос, затем считываются данные в теле и превращаются в объекты структуры, затем выполняется простая проверка параметров и, наконец, вызываетсяprocess.NewEngine(symbol, price)Введите следующий шаг бизнес-логики, если результат ОК, также верните ОК в качестве ответа на запрос.

Кроме того, сторонниеdecimal.DecimalТип используется для представления цены, а вся программа использует десятичное число для представления чисел с плавающей запятой и выполнения точных вычислений.

CloseMatchingа такжеHandleOrderЛогика реализации такая же, CloseMatching, наконец, вызоветprocess.CloseEngine(symbol)Функция переходит к следующему шагу обработки, и, наконец, вызывается HandleOrder.process.Dispatch(order)Перейти к следующему шагу. Однако структура заказа определяется в пакете двигателя, а его структура выглядит следующим образом:

type Order struct {
	Action    enum.OrderAction `json:"action"`
	Symbol    string           `json:"symbol"`
	OrderId   string           `json:"orderId"`
	Side      enum.OrderSide   `json:"side"`
	Type      enum.OrderType   `json:"type"`
	Amount    decimal.Decimal  `json:"amount"`
	Price     decimal.Decimal  `json:"price"`
	Timestamp int64            `json:"timestamp"`
}

Видно, что помимо типа Decimal существует еще несколько типов пакета enum, которые на самом деле являются типами перечисления, определенными нашей программой. Сам язык Golang не предоставляет того же ключевого слова enum, что и другие языки, для определения типов перечисления, поэтому обычно используетсяОпределение типа + константаЧтобы смоделировать тип перечисления, возьмем enum.OrderAction в качестве примера:

type OrderAction string

const (
	ActionCreate OrderAction = "create"
	ActionCancel OrderAction = "cancel"
)

Несколько других типов перечисления также определяются таким образом.

Кроме того, для облегчения преобразования в строку и проверки допустимости параметров в программе также предусмотрены две функции для каждого типа перечисления, на примере OrderAction:

func (o OrderAction) String() string {
	switch o {
	case ActionCreate:
		return "create"
	case ActionCancel:
		return "cancel"
	default:
		return "unknown"
	}
}

func (o OrderAction) Valid() bool {
	if o.String() == "unknown" {
		return false
	}
	return true
}

Несколько других типов перечисления также определяют две похожие функции, код которых больше не публикуется.

пакет процессов

Давайте посмотрим, какие файлы находятся в пакете процесса:

└── process                  #
    ├── close_engine.go      # 关闭引擎
    ├── dispatch.go          # 分发订单
    ├── init.go              # 初始化
    └── new_engine.go        # 启动新引擎

init.go — это функция инициализации, о которой упоминалось выше. Остальные три файла определяют следующую логическую реализацию, соответствующую трем обработчикам выше.

завести новый двигатель

Первый взглядnew_engine.go:

package process

import (
	"matching/engine"
	"matching/errcode"
	"matching/middleware/cache"

	"github.com/shopspring/decimal"
)

func NewEngine(symbol string, price decimal.Decimal) *errcode.Errcode {
	if engine.ChanMap[symbol] != nil {
		return errcode.EngineExist
	}

	engine.ChanMap[symbol] = make(chan engine.Order, 100)
	go engine.Run(symbol, price)

	cache.SaveSymbol(symbol)
	cache.SavePrice(symbol, price)

	return errcode.OK
}

Логика также относительно проста.Первым шагом является определение того, является ли ChanMap[symbol] пустым.ChanMap — это карта, используемая для сохранения канала ордера при инициализации пакета движка. Если ChanMap[символ] не пуст, это означает, что механизм сопоставления для символа был запущен, и возвращается ошибка. Если он пустой, то инициализируем канал этого символа, как видно из кода, ChanMap[символ] инициализируется как канал ордера с размером буфера 100.

Затем вызовите engine.Run() для запуска горутины.Эта строка кода означает, что соответствующий механизм указанного символа запускается горутиной.

Затем и символ, и цена кэшируются.

Наконец, вернитесь к OK, готово.

2. Раздайте заказ

Далее давайте посмотрим, как выглядит реализация Dispatch:

func Dispatch(order engine.Order) *errcode.Errcode {
	if engine.ChanMap[order.Symbol] == nil {
		return errcode.EngineNotFound
	}

	if order.Action == enum.ActionCreate {
		if cache.OrderExist(order.Symbol, order.OrderId, order.Action.String()) {
			return errcode.OrderExist
		}
	} else {
		if !cache.OrderExist(order.Symbol, order.OrderId, enum.ActionCreate.String()) {
			return errcode.OrderNotFound
		}
	}

	order.Timestamp = time.Now().UnixNano() / 1e3
	cache.SaveOrder(order.ToMap())
	engine.ChanMap[order.Symbol] <- order

	return errcode.OK
}

Первым делом нужно определить, пуста ли карта ChanMap[order.Symbol], если она пуста, значит двигатель не включен и заказ не может быть обработан.

Второй шаг — определить, существует ли заказ. Если это заказ на создание, заказ не должен быть найден в кеше, иначе это означает повторный запрос. Если это заказ на отмену, если заказ не может быть найден в кеше, это означает, что заказ был полностью выполнен или заказ был успешно отменен.

Третий шаг — установить время заказа как текущее время, а единицей измерения времени являются 100 наносекунд, что может гарантировать, что длина метки времени будет ровно 16 бит, и не будет проблем с искажением точности при сохранении в Redis. В этой последующей статье будет рассказано о детальном проектировании Redis.

Четвертый шаг — кэширование заказа.

Пятый шаг — передать заказ в соответствующий канал заказа, и соответствующий движок получит заказ из канала для обработки. Этот шаг реализует распределение заказа.

Шаг шестой, вернуться в ОК.

3. Заглушите двигатель

Реализация закрытия движка очень проста, смотрите код:

func CloseEngine(symbol string) *errcode.Errcode {
	if engine.ChanMap[symbol] == nil {
		return errcode.EngineNotFound
	}

	close(engine.ChanMap[symbol])

	return errcode.OK
}

Основной код представляет собой всего одну строку, которая закрывает канал ордера, соответствующий символу. Последующая обработка фактически выполняется в движке, и позже мы объясним эту схему в сочетании с кодом в движке.

Реализация ввода двигателя

Начальная запись горутины механизма транзакций:engine.Run()функцию, давайте посмотрим на ее кодовую реализацию:

func Run(symbol string, price decimal.Decimal) {
	lastTradePrice := price

	book := &orderBook{}
	book.init()

	log.Info("engine %s is running", symbol)
	for {
		order, ok := <-ChanMap[symbol]
		if !ok {
			log.Info("engine %s is closed", symbol)
			delete(ChanMap, symbol)
			cache.Clear(symbol)
			return
		}
		log.Info("engine %s receive an order: %s", symbol, order.ToJson())
		switch order.Action {
		case enum.ActionCreate:
			dealCreate(&order, book, &lastTradePrice)
		case enum.ActionCancel:
			dealCancel(&order, book)
		}
	}
}

Первым шагом является определение и инициализация переменной book, которая используется для сохранения всегоКнига транзакционных ордеров.

Тогда естьforцикл, первая строка в цикле for берется из соответствующегоsymbolЗаказ читается в канале заказа , и когда заказ читается,orderпеременная будет иметь значение, иokпеременнаяtrue. Если ордера в канале нет, он будет блокироваться на этой строке кода до тех пор, пока ордер не будет получен из канала или канал не будет закрыт.

Когда канал закрыт, наконец, прочитать из каналаokпеременнаяfalse, конечно, перед этим будут последовательно читаться остальные ордера в канале. Когда ok имеет значение false, движок выполнит два шага: первый — удалить запись, соответствующую символу, из ChanMap, а другой — очистить кэшированные данные, соответствующие символу. последнее использованиеreturnдля выхода из цикла for, так что вся функция Run() завершается и завершается, что означает, что двигатель фактически выключается.

Когда ордер прочитан, он определяет, размещать или отменять ордер, а затем выполняет соответствующую логическую обработку.

Давайте сначала рассмотрим логику отмены ордера, которая относительно проста:

func dealCancel(order *Order, book *orderBook) {
	var ok bool
	switch order.Side {
	case enum.SideBuy:
		ok = book.removeBuyOrder(order)
	case enum.SideSell:
		ok = book.removeSellOrder(order)
	}

	cache.RemoveOrder(order.ToMap())
	mq.SendCancelResult(order.Symbol, order.OrderId, ok)
	log.Info("engine %s, order %s cancel result is %s", order.Symbol, order.OrderId, ok)
}

Суть состоит из трех шагов:

  1. удалить ордер из книги ордеров;
  2. удалить заказ из кеша;
  3. Отправьте результат отмены в MQ.

Логика размещения ордера более сложная.Для разных типов ордеров требуется различная логическая обработка.Пожалуйста, смотрите код:

func dealCreate(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
	switch order.Type {
	case enum.TypeLimit:
		dealLimit(order, book, lastTradePrice)
	case enum.TypeLimitIoc:
		dealLimitIoc(order, book, lastTradePrice)
	case enum.TypeMarket:
		dealMarket(order, book, lastTradePrice)
	case enum.TypeMarketTop5:
		dealMarketTop5(order, book, lastTradePrice)
	case enum.TypeMarketTop10:
		dealMarketTop10(order, book, lastTradePrice)
	case enum.TypeMarketOpponent:
		dealMarketOpponent(order, book, lastTradePrice)
	}
}

Каждый тип делится на направления покупки и продажи, например, tradeLimit():

func dealLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
	switch order.Side {
	case enum.SideBuy:
		dealBuyLimit(order, book, lastTradePrice)
	case enum.SideSell:
		dealSellLimit(order, book, lastTradePrice)
	}
}

Затем давайте взглянем на логику обработки DealBuyLimit():

func dealBuyLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
LOOP:
	headOrder := book.getHeadSellOrder()
	if headOrder == nil || order.Price.LessThan(headOrder.Price) {
		book.addBuyOrder(order)
		log.Info("engine %s, a order has added to the orderbook: %s", order.Symbol, order.ToJson())
	} else {
		matchTrade(headOrder, order, book, lastTradePrice)
		if order.Amount.IsPositive() {
			goto LOOP
		}
	}
}

Позвольте мне проанализировать этот поток обработки:

  1. Поручил продавать книги, прочитанные из единой очереди в порядке заведующей;
  2. Если головной ордер пуст, или цена нового ордера (ордера на покупку) ниже, чем цена головного ордера (ордера на продажу), транзакция не может быть сопоставлена, то новый ордер будет добавлен в очередь ордеров на покупку доверенного книга;
  3. Если головной ордер не пуст, а цена нового ордера (ордера на покупку) больше или равна цене головного ордера (ордера на продажу), два ордера могут быть сопоставлены и исполнены, тогда два ордера будут исполнены;
  4. Если оставшееся количество нового заказа не равно нулю после обработки транзакции предыдущего шага, продолжайте повторять первый шаг.

Среди них запись, соответствующая транзакции, будет отправлена ​​в MQ в качестве выходной записи.

Обработка других типов также аналогична и не будет объясняться по отдельности.

Реализация пакета двигателя будет обсуждаться здесь первым, а реализация других частей будет обсуждаться в последующих статьях.

резюме

В этом разделе в основном рассматривается весь процесс обработки данных с помощью кода, включая некоторые подробные проекты. После понимания кодов, перечисленных в этой статье, я понимаю более половины реализации всего сервиса сопоставления.

Вопрос на этот раз: Можно ли изменить канал заказа, сохраненный ChanMap, на небуферизованный канал? В чем разница между логикой обработки с небуферизованными каналами и с буферизованными каналами? Каковы плюсы и минусы каждого из двух вариантов?


Отсканируйте следующий QR-код, чтобы подписаться на официальную учетную запись (имя общедоступной учетной записи: Киган Сяоган)

Личный блог автора