[Перевод] Реализация простого шаблона шины событий в Go

Go

Архитектура, управляемая событиями, — это высоко масштабируемая парадигма компьютерных наук. Это позволяет нам асинхронно обрабатывать события в многопартийной системе.

Автобус событийопубликовать/подписать модельРеализация, в которой издатели публикуют данные, а заинтересованные подписчики могут прослушивать эти данные и действовать на их основе. Это делает издателей слабо связанными с подписчиками. Издатели публикуют события данных в шину событий, а шина отвечает за их отправку подписчикам.

Традиционный подход к реализации шины событий предполагает использование обратных вызовов. Подписчики обычно реализуют интерфейс, а шина событий затем распространяет данные через интерфейс.

Используя модель параллелизма go, мы знаем, что в большинстве мест можно использоватьchannelчтобы заменить обратный вызов. В этой статье мы сосредоточимся на том, как использоватьchannelреализовать шину событий.

мы фокусируемся наТематические события. Издатели публикуют в темы, а подписчики могут их слушать.

определить структуру данных

Чтобы реализовать шину событий, нам нужно определить структуру данных, которую нужно передать. мы можем использоватьstructПросто создайте новый тип данных. мы определяемDataEventСтруктура выглядит следующим образом:

type DataEvent struct {
   Data interface{}
   Topic string
}

Здесь мы определили базовые данные как интерфейс, что означает, что это может быть любое значение. Мы также определяем субъекты как члены структуры. подписчики могут слушатьнесколько тем, поэтому хорошо, что мы используем темы, чтобы подписчики могли различать разные события.

Представляем каналы

Теперь, когда мы определили нашу основную структуру данных для шины событий, нам также нужен способ ее передачи. Для этого мы можем определитьDataEventизDataChannelтип.

// DataChannel 是一个能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一个包含 DataChannels 数据的切片
type DataChannelSlice [] DataChannel

DataChannelSliceбыл создан для сохраненияDataChannelфрагменты и ссылаться на них легко.

автобус событий

// EventBus 存储有关订阅者感兴趣的特定主题的信息
type EventBus struct {
   subscribers map[string]DataChannelSlice
   rm sync.RWMutex
}

EventBusимеютsubscribers, который содержитDataChannelSlicesкарта. Мы используем мьютексы для защиты операций чтения и записи от одновременного доступа.

используяmapОпределениеtopicsЭто позволяет нам легко организовывать мероприятия. Тема считаетсяmapключ. Когда кто-то публикует его, мы можем легко найти тему по ключу, а затем распространить событие наchannelдля дальнейшей обработки.

Подписывайтесь на темы

Для тем подписки используйтеchannel. Это как обратный вызов в традиционном методе. Когда издатель публикует данные по теме,channelВы получите данные.

func (eb *EventBus)Subscribe(topic string, ch DataChannel)  {
   eb.rm.Lock()
   if prev, found := eb.subscribers[topic]; found {
      eb.subscribers[topic] = append(prev, ch)
   } else {
      eb.subscribers[topic] = append([]DataChannel{}, ch)
   }
   eb.rm.Unlock()
}

Проще говоря, мы добавляем подписчиков наchannelЗатем структура блокируется в срезе и, наконец, разблокируется после операции.

Опубликовать тему

Чтобы опубликовать событие, издатель должен предоставить подписчикам тему и данные, необходимые для трансляции.

func (eb *EventBus) Publish(topic string, data interface{}) {
   eb.rm.RLock()
   if chans, found := eb.subscribers[topic]; found {
      // 这样做是因为切片引用相同的数组,即使它们是按值传递的
      // 因此我们正在使用我们的元素创建一个新切片,从而能正确地保持锁定
      channels := append(DataChannelSlice{}, chans...)
      go func(data DataEvent, dataChannelSlices DataChannelSlice) {
         for _, ch := range dataChannelSlices {
            ch <- data
         }
      }(DataEvent{Data: data, Topic: topic}, channels)
   }
   eb.rm.RUnlock()
}

В этом методе мы сначала проверяем, есть ли подписчики на тему. Затем мы просто перебираемchannelНарезайте и публикуйте в них события.

Обратите внимание, что мы использовали горутину в методе публикации, чтобы избежать блокировки издателя.

Начинать

Во-первых, нам нужно создать экземпляр шины событий. В реальном сценарии вы можете экспортировать сингл из пакетаEventBus,заставить его работать как синглтон.

var eb = &EventBus{
   subscribers: map[string]DataChannelSlice{},
}

Чтобы протестировать только что созданную шину событий, мы создадим метод, который публикует указанную тему со случайным интервалом.

func publisTo(topic string, data string)  {
   for {
      eb.Publish(topic, data)
      time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
   }
}

Далее нам нужна основная функция, которая может слушать тему. Он использует вспомогательный метод для вывода данных события.

func printDataEvent(ch string, data DataEvent)  {
   fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}
func main()  {
   ch1 := make(chan DataEvent)
   ch2 := make(chan DataEvent)
   ch3 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   eb.Subscribe("topic2", ch2)
   eb.Subscribe("topic2", ch3)
   go publisTo("topic1", "Hi topic 1")
   go publisTo("topic2", "Welcome to topic 2")
   for {
      select {
      case d := <-ch1:
         go printDataEvent("ch1", d)
      case d := <-ch2:
         go printDataEvent("ch2", d)
      case d := <-ch3:
         go printDataEvent("ch3", d)
      }
   }
}

Мы создали три, которые могут подписываться на темыchannelsПодписчики (ch1, ch2, ch3). где ch2 и ch3 прослушивают одно и то же событие.

Мы используем оператор select из самого быстрого возвращаемогоchannelдля получения данных. Затем он распечатывает данные с помощью другой горутины. Использование горутин также не обязательно. Но в некоторых случаях вам приходится выполнять тяжелую обработку событий. Чтобы предотвратить блокировку выбора, мы используем горутину.

Пример вывода будет выглядеть так

Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
...

Вы можете видеть, что шина событий проходит черезchannelРаспределяйте события.

основанный на простотеchannelИсходный код шины событий.

полный код

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type DataEvent struct {
	Data  interface{}
	Topic string
}

// DataChannel 是一个能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一个包含 DataChannels 数据的切片
type DataChannelSlice []DataChannel

// EventBus 存储有关订阅者感兴趣的特定主题的信息
type EventBus struct {
	subscribers map[string]DataChannelSlice
	rm          sync.RWMutex
}

func (eb *EventBus) Publish(topic string, data interface{}) {
	eb.rm.RLock()
	if chans, found := eb.subscribers[topic]; found {
		// 这样做是因为切片引用相同的数组,即使它们是按值传递的
		// 因此我们正在使用我们的元素创建一个新切片,从而正确地保持锁定
		channels := append(DataChannelSlice{}, chans...)
		go func(data DataEvent, dataChannelSlices DataChannelSlice) {
			for _, ch := range dataChannelSlices {
				ch <- data
			}
		}(DataEvent{Data: data, Topic: topic}, channels)
	}
	eb.rm.RUnlock()
}

func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
	eb.rm.Lock()
	if prev, found := eb.subscribers[topic]; found {
		eb.subscribers[topic] = append(prev, ch)
	} else {
		eb.subscribers[topic] = append([]DataChannel{}, ch)
	}
	eb.rm.Unlock()
}

var eb = &EventBus{
	subscribers: map[string]DataChannelSlice{},
}

func printDataEvent(ch string, data DataEvent) {
	fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}

func publisTo(topic string, data string) {
	for {
		eb.Publish(topic, data)
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	}
}

func main() {
	ch1 := make(chan DataEvent)
	ch2 := make(chan DataEvent)
	ch3 := make(chan DataEvent)

	eb.Subscribe("topic1", ch1)
	eb.Subscribe("topic2", ch2)
	eb.Subscribe("topic2", ch3)

	go publisTo("topic1", "Hi topic 1")
	go publisTo("topic2", "Welcome to topic 2")

	for {
		select {
		case d := <-ch1:
			go printDataEvent("ch1", d)
		case d := <-ch2:
			go printDataEvent("ch2", d)
		case d := <-ch3:
			go printDataEvent("ch3", d)
		}
	}
}

Причины использовать каналы вместо обратных вызовов

Традиционный вызов обратно, необходимый для достижения своего рода интерфейс.

Например,

type Subscriber interface {
   onData(event Event)
}

С обратными вызовами, если вы хотите подписаться на событие, вам необходимо реализовать этот интерфейс, чтобы шина событий могла его распространять.

type MySubscriber struct {
}
func (m MySubscriber) onData(event Event)  {
   // 处理事件
}

иchannelПозволяет регистрировать подписчиков в простой функции без интерфейса.

func main() {
   ch1 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   fmt.Println((<-ch1).Data)
   ...
}

в заключении

Цель этой статьи — указать на различные реализации написания шины событий.

Возможно, это не идеальное решение.

Например,channelблокируются до тех пор, пока их кто-нибудь не съест. Это имеет определенные ограничения.

Я использовал фрагменты для хранения всех подписчиков темы. Это используется для упрощения статей. Это требуетSETЗаменить, чтобы в списке не было дубликатов подписчиков.

Традиционный метод обратного вызова можно просто реализовать с использованием тех же принципов. Вы можете легко оформлять и публиковать события асинхронно в горутине.

Я хотел бы услышать ваши мысли об этой статье. :)