Как изящно закрыть Go Channel

задняя часть Go Язык программирования

Принцип закрытия канала

Не закрывайте канал на стороне потребителя и не закрывайте канал при наличии нескольких одновременно работающих производителей.

Другими словами, канал должен быть закрыт только в [единственной или последней оставшейся] сопрограмме производителя, чтобы уведомить потребителя о том, что больше нет смысла продолжать чтение. Пока вы придерживаетесь этого принципа, вы можете гарантировать невозможность отправки данных в закрытый канал.

Правильный способ насильственно закрыть канал

Если вы хотите закрыть канал на стороне потребителя или закрыть канал на нескольких производителях, вы можете использовать механизм восстановления, чтобы застраховаться, чтобы избежать сбоя программы из-за паники.

func SafeClose(ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			justClosed = false
		}
	}()
	
	// assume ch != nil here.
	close(ch) // panic if ch is closed
	return true // <=> justClosed = true; return
}

Использование этого метода явно нарушает описанный выше принцип закрытия канала, и тогда с производительностью все в порядке, ведь SafeClose вызывается только один раз в каждой сопрограмме, и потери производительности невелики.

Метод восстановления также можно использовать при создании сообщений.

func SafeSend(ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			// The return result can be altered 
			// in a defer function call.
			closed = true
		}
	}()
	
	ch <- value // panic if ch is closed
	return false // <=> closed = false; return
}

Метод вежливо закрыть канал

Есть также много людей, которые часто используют sync.Once для закрытия канала, что гарантирует, что он будет закрыт только один раз.

type MyChannel struct {
	C    chan T
	once sync.Once
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.once.Do(func() {
		close(mc.C)
	})
}

Мы также можем использовать sync.Mutex для той же цели.

type MyChannel struct {
	C      chan T
	closed bool
	mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.mutex.Lock()
	if !mc.closed {
		close(mc.C)
		mc.closed = true
	}
	mc.mutex.Unlock()
}

func (mc *MyChannel) IsClosed() bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	return mc.closed
}

Вы должны знать, что разработчики golang не предоставляют методы SafeClose или SafeSend по какой-то причине.Они не рекомендуют закрывать каналы на стороне потребителя или на нескольких параллельных производителях.Например, закрытие канала только для чтения полностью запрещено синтаксисом. .

Как красиво закрыть канал

Большим недостатком описанного выше метода SafeSend является то, что его нельзя использовать в операторах case в блоках выбора. И еще один очень важный недостаток заключается в том, что для таких, как я, одержимых кодом, использование panic/recover и sync/mutex не так элегантно. Ниже мы представляем чисто элегантные решения, которые можно использовать в различных сценариях.

Несколько потребителей, один производитель. Это самая простая ситуация, просто позвольте продюсеру закрыть канал напрямую.

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumReceivers = 100
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)
	
	// ...
	dataCh := make(chan int, 100)
	
	// the sender
	go func() {
		for {
			if value := rand.Intn(MaxRandomNumber); value == 0 {
				// The only sender can close the channel safely.
				close(dataCh)
				return
			} else {			
				dataCh <- value
			}
		}
	}()
	
	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()
			
			// Receive values until dataCh is closed and
			// the value buffer queue of dataCh is empty.
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}
	
	wgReceivers.Wait()
}

Несколько производителей, один потребитель. Эта ситуация немного сложнее, чем описанная выше. Мы не можем закрыть канал на стороне потребителя, потому что это нарушает принцип закрытия канала. Но мы можем заставить потребителя отключить дополнительный сигнал, чтобы уведомить отправителя о прекращении производства данных.

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumSenders = 1000
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)
	
	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the receiver of channel dataCh.
	// Its reveivers are the senders of channel dataCh.
	
	// senders
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// The first select here is to try to exit the goroutine
				// as early as possible. In fact, it is not essential
				// for this example, so it can be omitted.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops if the send to dataCh is also unblocked.
				// But this is acceptable, so the first select
				// can be omitted.
				select {
				case <- stopCh:
					return
				case dataCh <- rand.Intn(MaxRandomNumber):
				}
			}
		}()
	}
	
	// the receiver
	go func() {
		defer wgReceivers.Done()
		
		for value := range dataCh {
			if value == MaxRandomNumber-1 {
				// The receiver of the dataCh channel is
				// also the sender of the stopCh cahnnel.
				// It is safe to close the stop channel here.
				close(stopCh)
				return
			}
			
			log.Println(value)
		}
	}()
	
	// ...
	wgReceivers.Wait()
}

В приведенном выше примере производитель также является получателем канала выходного сигнала, и канал выходного сигнала по-прежнему закрывается его производителем, так что это по-прежнему не нарушает принцип закрытия канала. Стоит отметить, что в этом примере ни производитель, ни получатель не закрывают канал данных сообщения, канал закроется сам, когда нет ссылки на горутину, и его не нужно закрывать явно.

Несколько производителей, несколько потребителей

Это самый сложный случай, мы не можем позволить ни получателю, ни отправителю закрыть канал. Мы даже не можем заставить приемник отключить выходной сигнал, чтобы уведомить производителя о прекращении производства. Потому что мы не можем нарушать принцип закрытия канала. Но мы можем ввести дополнительный координатор, чтобы закрыть дополнительный выходной сигнальный канал.

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumReceivers = 10
	const NumSenders = 1000
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)
	
	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
		// stopCh is an additional signal channel.
		// Its sender is the moderator goroutine shown below.
		// Its reveivers are all senders and receivers of dataCh.
	toStop := make(chan string, 1)
		// The channel toStop is used to notify the moderator
		// to close the additional signal channel (stopCh).
		// Its senders are any senders and receivers of dataCh.
		// Its reveiver is the moderator goroutine shown below.
	
	var stoppedBy string
	
	// moderator
	go func() {
		stoppedBy = <- toStop
		close(stopCh)
	}()
	
	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(MaxRandomNumber)
				if value == 0 {
					// Here, a trick is used to notify the moderator
					// to close the additional signal channel.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}
				
				// The first select here is to try to exit the goroutine
				// as early as possible. This select blocks with one
				// receive operation case and one default branches will
				// be optimized as a try-receive operation by the
				// official Go compiler.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops (and for ever in theory) if the send to
				// dataCh is also unblocked.
				// This is why the first select block is needed.
				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}
	
	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()
			
			for {
				// Same as the sender goroutine, the first select here
				// is to try to exit the goroutine as early as possible.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops (and for ever in theory) if the receive from
				// dataCh is also unblocked.
				// This is why the first select block is needed.
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == MaxRandomNumber-1 {
						// The same trick is used to notify
						// the moderator to close the
						// additional signal channel.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}
					
					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}
	
	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}

Вышеупомянутые три сценария не могут охватывать все, но они являются наиболее распространенными и распространенными тремя сценариями, и в основном все сценарии можно разделить на три вышеуказанные категории.

в заключении

Нет сценария, достойного нарушения принципа закрытия канала, если вы столкнулись с таким особым сценарием, рекомендуется задуматься о собственном дизайне и не пора ли провести рефакторинг.

Читайте статьи по теме и обратите внимание на паблик-аккаунт "Code Cave"