- Оригинальный адрес:уровень up.git connect.com/lets-write-…
- Оригинальный автор:Kasun Vithanage
- Адрес перевода:GitHub.com/вода Мел О/…
- Переводчик: хаки хаки
- Уровень переводчика ограничен, если есть ошибка в переводе или понимании, помогите указать
Архитектура, управляемая событиями, — это высоко масштабируемая парадигма компьютерных наук. Это позволяет нам асинхронно обрабатывать события в многопартийной системе.
Автобус событийопубликовать/подписать модельРеализация, в которой издатели публикуют данные, а заинтересованные подписчики могут прослушивать эти данные и действовать на их основе. Это делает издателей слабо связанными с подписчиками. Издатели публикуют события данных в шину событий, а шина отвечает за их отправку подписчикам.
Традиционный подход к реализации шины событий предполагает использование обратных вызовов. Подписчики обычно реализуют интерфейс, а шина событий затем распространяет данные через интерфейс.
Используя модель параллелизма go, мы знаем, что в большинстве мест можно использоватьchannel
чтобы заменить обратный вызов. В этой статье мы сосредоточимся на том, как использоватьchannel
реализовать шину событий.
мы фокусируемся наТематические события. Издатели публикуют в темы, а подписчики могут их слушать.
определить структуру данных
Чтобы реализовать шину событий, нам нужно определить структуру данных, которую нужно передать. мы можем использоватьstruct
Просто создайте новый тип данных. мы определяемDataEvent
Структура выглядит следующим образом:
type DataEvent struct {
Data interface{}
Topic string
}
Здесь мы определили базовые данные как интерфейс, что означает, что это может быть любое значение. Мы также определяем субъекты как члены структуры. подписчики могут слушатьнесколько тем, поэтому хорошо, что мы используем темы, чтобы подписчики могли различать разные события.
Представляем каналы
Теперь, когда мы определили нашу основную структуру данных для шины событий, нам также нужен способ ее передачи. Для этого мы можем определитьDataEvent
изDataChannel
тип.
// DataChannel 是一个能接收 DataEvent 的 channel
type DataChannel chan DataEvent
// DataChannelSlice 是一个包含 DataChannels 数据的切片
type DataChannelSlice [] DataChannel
DataChannelSlice
был создан для сохраненияDataChannel
фрагменты и ссылаться на них легко.
автобус событий
// EventBus 存储有关订阅者感兴趣的特定主题的信息
type EventBus struct {
subscribers map[string]DataChannelSlice
rm sync.RWMutex
}
EventBus
имеютsubscribers
, который содержитDataChannelSlices
карта. Мы используем мьютексы для защиты операций чтения и записи от одновременного доступа.
используяmap
Определениеtopics
Это позволяет нам легко организовывать мероприятия. Тема считаетсяmap
ключ. Когда кто-то публикует его, мы можем легко найти тему по ключу, а затем распространить событие наchannel
для дальнейшей обработки.
Подписывайтесь на темы
Для тем подписки используйтеchannel
. Это как обратный вызов в традиционном методе. Когда издатель публикует данные по теме,channel
Вы получите данные.
func (eb *EventBus)Subscribe(topic string, ch DataChannel) {
eb.rm.Lock()
if prev, found := eb.subscribers[topic]; found {
eb.subscribers[topic] = append(prev, ch)
} else {
eb.subscribers[topic] = append([]DataChannel{}, ch)
}
eb.rm.Unlock()
}
Проще говоря, мы добавляем подписчиков наchannel
Затем структура блокируется в срезе и, наконец, разблокируется после операции.
Опубликовать тему
Чтобы опубликовать событие, издатель должен предоставить подписчикам тему и данные, необходимые для трансляции.
func (eb *EventBus) Publish(topic string, data interface{}) {
eb.rm.RLock()
if chans, found := eb.subscribers[topic]; found {
// 这样做是因为切片引用相同的数组,即使它们是按值传递的
// 因此我们正在使用我们的元素创建一个新切片,从而能正确地保持锁定
channels := append(DataChannelSlice{}, chans...)
go func(data DataEvent, dataChannelSlices DataChannelSlice) {
for _, ch := range dataChannelSlices {
ch <- data
}
}(DataEvent{Data: data, Topic: topic}, channels)
}
eb.rm.RUnlock()
}
В этом методе мы сначала проверяем, есть ли подписчики на тему. Затем мы просто перебираемchannel
Нарезайте и публикуйте в них события.
Обратите внимание, что мы использовали горутину в методе публикации, чтобы избежать блокировки издателя.
Начинать
Во-первых, нам нужно создать экземпляр шины событий. В реальном сценарии вы можете экспортировать сингл из пакетаEventBus
,заставить его работать как синглтон.
var eb = &EventBus{
subscribers: map[string]DataChannelSlice{},
}
Чтобы протестировать только что созданную шину событий, мы создадим метод, который публикует указанную тему со случайным интервалом.
func publisTo(topic string, data string) {
for {
eb.Publish(topic, data)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
}
Далее нам нужна основная функция, которая может слушать тему. Он использует вспомогательный метод для вывода данных события.
func printDataEvent(ch string, data DataEvent) {
fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}
func main() {
ch1 := make(chan DataEvent)
ch2 := make(chan DataEvent)
ch3 := make(chan DataEvent)
eb.Subscribe("topic1", ch1)
eb.Subscribe("topic2", ch2)
eb.Subscribe("topic2", ch3)
go publisTo("topic1", "Hi topic 1")
go publisTo("topic2", "Welcome to topic 2")
for {
select {
case d := <-ch1:
go printDataEvent("ch1", d)
case d := <-ch2:
go printDataEvent("ch2", d)
case d := <-ch3:
go printDataEvent("ch3", d)
}
}
}
Мы создали три, которые могут подписываться на темыchannels
Подписчики (ch1, ch2, ch3). где ch2 и ch3 прослушивают одно и то же событие.
Мы используем оператор select из самого быстрого возвращаемогоchannel
для получения данных. Затем он распечатывает данные с помощью другой горутины. Использование горутин также не обязательно. Но в некоторых случаях вам приходится выполнять тяжелую обработку событий. Чтобы предотвратить блокировку выбора, мы используем горутину.
Пример вывода будет выглядеть так
Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
...
Вы можете видеть, что шина событий проходит черезchannel
Распределяйте события.
основанный на простотеchannel
Исходный код шины событий.
полный код
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type DataEvent struct {
Data interface{}
Topic string
}
// DataChannel 是一个能接收 DataEvent 的 channel
type DataChannel chan DataEvent
// DataChannelSlice 是一个包含 DataChannels 数据的切片
type DataChannelSlice []DataChannel
// EventBus 存储有关订阅者感兴趣的特定主题的信息
type EventBus struct {
subscribers map[string]DataChannelSlice
rm sync.RWMutex
}
func (eb *EventBus) Publish(topic string, data interface{}) {
eb.rm.RLock()
if chans, found := eb.subscribers[topic]; found {
// 这样做是因为切片引用相同的数组,即使它们是按值传递的
// 因此我们正在使用我们的元素创建一个新切片,从而正确地保持锁定
channels := append(DataChannelSlice{}, chans...)
go func(data DataEvent, dataChannelSlices DataChannelSlice) {
for _, ch := range dataChannelSlices {
ch <- data
}
}(DataEvent{Data: data, Topic: topic}, channels)
}
eb.rm.RUnlock()
}
func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
eb.rm.Lock()
if prev, found := eb.subscribers[topic]; found {
eb.subscribers[topic] = append(prev, ch)
} else {
eb.subscribers[topic] = append([]DataChannel{}, ch)
}
eb.rm.Unlock()
}
var eb = &EventBus{
subscribers: map[string]DataChannelSlice{},
}
func printDataEvent(ch string, data DataEvent) {
fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}
func publisTo(topic string, data string) {
for {
eb.Publish(topic, data)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
}
func main() {
ch1 := make(chan DataEvent)
ch2 := make(chan DataEvent)
ch3 := make(chan DataEvent)
eb.Subscribe("topic1", ch1)
eb.Subscribe("topic2", ch2)
eb.Subscribe("topic2", ch3)
go publisTo("topic1", "Hi topic 1")
go publisTo("topic2", "Welcome to topic 2")
for {
select {
case d := <-ch1:
go printDataEvent("ch1", d)
case d := <-ch2:
go printDataEvent("ch2", d)
case d := <-ch3:
go printDataEvent("ch3", d)
}
}
}
Причины использовать каналы вместо обратных вызовов
Традиционный вызов обратно, необходимый для достижения своего рода интерфейс.
Например,
type Subscriber interface {
onData(event Event)
}
С обратными вызовами, если вы хотите подписаться на событие, вам необходимо реализовать этот интерфейс, чтобы шина событий могла его распространять.
type MySubscriber struct {
}
func (m MySubscriber) onData(event Event) {
// 处理事件
}
иchannel
Позволяет регистрировать подписчиков в простой функции без интерфейса.
func main() {
ch1 := make(chan DataEvent)
eb.Subscribe("topic1", ch1)
fmt.Println((<-ch1).Data)
...
}
в заключении
Цель этой статьи — указать на различные реализации написания шины событий.
Возможно, это не идеальное решение.
Например,channel
блокируются до тех пор, пока их кто-нибудь не съест. Это имеет определенные ограничения.
Я использовал фрагменты для хранения всех подписчиков темы. Это используется для упрощения статей. Это требуетSETЗаменить, чтобы в списке не было дубликатов подписчиков.
Традиционный метод обратного вызова можно просто реализовать с использованием тех же принципов. Вы можете легко оформлять и публиковать события асинхронно в горутине.
Я хотел бы услышать ваши мысли об этой статье. :)