предисловие
Воспользовавшись свободным временем, я подумал о создании мини-очереди сообщений производства-потребления и сделал это, как только сказал. Я джавар, и в этот раз я сознательно переключился на го. Верно, начать работу с go можно с нуля, и вы можете изучить его, кстати.
Необходимые знания:
- перейти к основному синтаксису
- Понятие очереди сообщений, их три: производитель, потребитель, очередь
Цель
Я не думал о том, насколько сложной будет реализация, потому что время ограничено, просто мини подойдет.Сколько стоит мини?
-
Использовать структуру данных двусвязного списка в качестве очереди
-
Существует несколько тем, по которым производители могут генерировать сообщения, а потребители — потреблять сообщения.
-
Поддерживает одновременную запись производителями
-
Поддержка потребительского чтения, а после одобрения удаление из очереди
-
Сообщения не теряются (постоянные)
-
Высокая производительность (думаю так)
дизайн
Общая структура
протокол
Нижний уровень протокола связи использует tcp, а mq настраивает протокол на основе tcp. Протокол выглядит следующим образом.
type Msg struct {
Id int64
TopicLen int64
Topic string
// 1-consumer 2-producer 3-comsumer-ack 4-error
MsgType int64 // 消息类型
Len int64 // 消息长度
Payload []byte // 消息
}
Payload использует массив байтов, потому что независимо от того, что это за данные, их можно рассматривать только как массив байтов. Msg содержит сообщение, созданное производителем, сообщение, полученное потребителем, ACK и сообщения об ошибках. Первые два будут загружены, а последние два будут пустыми.
Обработка кодирования и декодирования протокола — это обработка байтов Далее идут две функции: from bytes to Msg и from Msg to bytes.
func BytesToMsg(reader io.Reader) Msg {
m := Msg{}
var buf [128]byte
n, err := reader.Read(buf[:])
if err != nil {
fmt.Println("read failed, err:", err)
}
fmt.Println("read bytes:", n)
// id
buff := bytes.NewBuffer(buf[0:8])
binary.Read(buff, binary.LittleEndian, &m.Id)
// topiclen
buff = bytes.NewBuffer(buf[8:16])
binary.Read(buff, binary.LittleEndian, &m.TopicLen)
// topic
msgLastIndex := 16 + m.TopicLen
m.Topic = string(buf[16: msgLastIndex])
// msgtype
buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8])
binary.Read(buff, binary.LittleEndian, &m.MsgType)
buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16])
binary.Read(buff, binary.LittleEndian, &m.Len)
if m.Len <= 0 {
return m
}
m.Payload = buf[msgLastIndex + 16:]
return m
}
func MsgToBytes(msg Msg) []byte {
msg.TopicLen = int64(len([]byte(msg.Topic)))
msg.Len = int64(len([]byte(msg.Payload)))
var data []byte
buf := bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.Id)
data = append(data, buf.Bytes()...)
buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.TopicLen)
data = append(data, buf.Bytes()...)
data = append(data, []byte(msg.Topic)...)
buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.MsgType)
data = append(data, buf.Bytes()...)
buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.Len)
data = append(data, buf.Bytes()...)
data = append(data, []byte(msg.Payload)...)
return data
}
очередь
Используйте контейнер/список для реализации принципа «первым поступил — первым обслужен», производитель пишет в конце очереди, а потребитель читает в начале очереди.
package broker
import (
"container/list"
"sync"
)
type Queue struct {
len int
data list.List
}
var lock sync.Mutex
func (queue *Queue) offer(msg Msg) {
queue.data.PushBack(msg)
queue.len = queue.data.Len()
}
func (queue *Queue) poll() Msg{
if queue.len == 0 {
return Msg{}
}
msg := queue.data.Front()
return msg.Value.(Msg)
}
func (queue *Queue) delete(id int64) {
lock.Lock()
for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
if msg.Value.(Msg).Id == id {
queue.data.Remove(msg)
queue.len = queue.data.Len()
break
}
}
lock.Unlock()
}
Метод offer вставляет данные в очередь, poll считывает элемент данных из заголовка очереди, а delete удаляет данные из очереди в соответствии с идентификатором сообщения. На самом деле необходимо использовать структуру Queue для инкапсуляции списка.В качестве базовой структуры данных мы надеемся скрыть больше базовых операций и предоставить клиентам только основные операции.
Операция удаления удаляет сообщение из очереди после того, как потребитель успешно использует и отправляет ACK.Поскольку потребитель может потреблять несколько одновременно, он блокируется при входе в критическую секцию (эм, то, повлияет ли блокировка на производительность, имеет большее влияние )
broker
Брокер действует как сервер и отвечает за получение соединений, получение и ответ на запросы.
package broker
import (
"bufio"
"net"
"os"
"sync"
"time"
)
var topics = sync.Map{}
func handleErr(conn net.Conn) {
if err := recover(); err != nil {
println(err.(string))
conn.Write(MsgToBytes(Msg{MsgType: 4}))
}
}
func Process(conn net.Conn) {
defer handleErr(conn)
reader := bufio.NewReader(conn)
msg := BytesToMsg(reader)
queue, ok := topics.Load(msg.Topic)
var res Msg
if msg.MsgType == 1 {
// comsumer
if queue == nil || queue.(*Queue).len == 0{
return
}
msg = queue.(*Queue).poll()
msg.MsgType = 1
res = msg
} else if msg.MsgType == 2 {
// producer
if ! ok {
queue = &Queue{}
queue.(*Queue).data.Init()
topics.Store(msg.Topic, queue)
}
queue.(*Queue).offer(msg)
res = Msg{Id: msg.Id, MsgType: 2}
} else if msg.MsgType == 3 {
// consumer ack
if queue == nil {
return
}
queue.(*Queue).delete(msg.Id)
}
conn.Write(MsgToBytes(res))
}
Когда MsgType равен 1, сообщение потребляется напрямую, когда MsgType равен 2, сообщение создается производителем.Если очередь пуста, необходимо создать новую очередь и поместить ее в соответствующую тему, когда MsgType равен до 3, это означает, что потребитель успешно потребляет, сообщение можно удалить
Мы говорим, что сообщение не потеряно, реализация здесь не завершена, поэтому я реализовал персистентность (персистентность реализована не полностью). Идея состоит в том, чтобы сериализовать сообщения в очереди, соответствующей теме, согласно формату протокола, и восстанавливать их из файла при запуске брокера.
Персистентность должна учитывать, является ли она инкрементной или полной, и как долго она должна храниться. Это повлияет на сложность и производительность реализации (подумайте о сохраняемости Kafka и Redis). Здесь это означает простую реализацию: таймеры регулярно сохраняется
func Save() {
ticker := time.NewTicker(60)
for {
select {
case <-ticker.C:
topics.Range(func(key, value interface{}) bool {
if value == nil {
return false
}
file, _ := os.Open(key.(string))
if file == nil {
file, _ = os.Create(key.(string))
}
for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
file.Write(MsgToBytes(msg.Value.(Msg)))
}
file.Close()
return false
})
default:
time.Sleep(1)
}
}
}
Один вопрос: когда выполняется вышеуказанная операция удаления, нужно ли удалять файл файла с соответствующим сообщением? Ответ заключается в том, что вам нужно удалить его.Если вы не удалите его, вы можете только дождаться следующего полного сохранения, чтобы покрыть его, и в середине возникнут проблемы с грязными данными.
Ниже представлена логика запуска
package main
import (
"awesomeProject/broker"
"fmt"
"net"
)
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:12345")
if err != nil {
fmt.Print("listen failed, err:", err)
return
}
go broker.Save()
for {
conn, err := listen.Accept()
if err != nil {
fmt.Print("accept failed, err:", err)
continue
}
go broker.Process(conn)
}
}
режиссер
package main
import (
"awesomeProject/broker"
"fmt"
"net"
)
func produce() {
conn, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
fmt.Print("connect failed, err:", err)
}
defer conn.Close()
msg := broker.Msg{Id: 1102, Topic: "topic-test", MsgType: 2, Payload: []byte("我")}
n, err := conn.Write(broker.MsgToBytes(msg))
if err != nil {
fmt.Print("write failed, err:", err)
}
fmt.Print(n)
}
потребитель
package main
import (
"awesomeProject/broker"
"bytes"
"fmt"
"net"
)
func comsume() {
conn, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
fmt.Print("connect failed, err:", err)
}
defer conn.Close()
msg := broker.Msg{Topic: "topic-test", MsgType: 1}
n, err := conn.Write(broker.MsgToBytes(msg))
if err != nil {
fmt.Println("write failed, err:", err)
}
fmt.Println("n", n)
var res [128]byte
conn.Read(res[:])
buf := bytes.NewBuffer(res[:])
receMsg := broker.BytesToMsg(buf)
fmt.Print(receMsg)
// ack
conn, _ = net.Dial("tcp", "127.0.0.1:12345")
l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
if e != nil {
fmt.Println("write failed, err:", err)
}
fmt.Println("l:", l)
}
Потребитель повторно создает соединение при подтверждении здесь.Если соединение не создано, серверу необходимо прочитать данные из соединения до конца. Подумайте об этом, как и у RabbitMQ ack, есть автоматические и ручные acks.Если это ручное ack, необходимо новое соединение, потому что я не знаю, когда клиент отправляет ack.Если оно автоматическое, конечно то же самое соединение можно использовать, но здесь Просто создайте новое соединение
запускать
Сначала запустите брокера, затем запустите производителя, затем запустите потребителя, хорошо, он может работать, может отправлять сообщения в очередь и потреблять сообщения из очереди.
Суммировать
Хотя все просто, оно реализовано с помощью go. Впервые меня смутил gopath и go mod.Использование следующего синтаксиса, такого как указатели, передача по значению, передача по ссылке и т. д., самое хлопотное — это преобразование типов. javar, использование go для преобразования типов действительно сложно.
В конце концов, я думаю, что это круто использовать go, мне это нравится...