Глядя на отличных друзей вокруг меня, которые давно начали вести блог, я чувствую, что отстаю.К счастью, лучше сделать это поздно, чем никогда, и поощрять себя видеть лучшее. Воспользовавшись последними выходными года, когда солнце было как раз впору, я написал свой первый блог, чтобы начать 2019 год, чтобы завершить установленные для себя флаги в этом году.
Одной из основных причин перехода с PHPer на Gopher является постоянный спрос на производительность и параллелизм в бизнесе.Другой важной причиной является встроенная функция параллелизма языка Go, которая может использовать меньше машинных ресурсов и сэкономить значительные суммы денег, предоставляя те же возможности высокой доступности. Поэтому в этой статье я опишу некоторые подводные камни, с которыми столкнулся в реальной демонстрации изучения параллелизма в Go, и поделюсь прогрессом.
1. Существует три основных способа реализации управления параллелизмом в языке Go:
a) Channel
- Разделены на небуферизованные и буферизованные каналы;
b) WaitGroup
- Механизм синхронизации между горутинами, предоставляемый пакетом синхронизации;
c) Context
- Передавать и обмениваться данными между разными горутинами в цепочке вызовов;
Первые два в основном используются в этой демонстрации.Чтобы узнать об основном использовании, ознакомьтесь с официальной документацией.
2. Демонстрационные требования и анализ:
Требование: Для реализации эффективной отправки почты EDM: необходимо поддерживать несколько стран (можно рассматривать как несколько задач), необходимо записывать статус каждой отправленной задачи (текущий успех, количество неудач), необходимо поддерживать паузу (остановку), Повторно отправить (выполнить) операцию.
Анализ: из требований видно, что в процессе отправки почты несколько стран (несколько задач) могут выполняться одновременно, и одна задача может выполняться одновременно пакетами для достижения быстрых и эффективных требований EDM.
3. Демонстрация фактического исходного кода:
3.1 main.go
package main
import (
"bufio"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
"time"
)
var (
batchLength = 20
wg sync.WaitGroup
finish = make(chan bool)
)
func main() {
startTime := time.Now().UnixNano()
for i := 1; i <= 3; i++ {
filename := "./task/edm" + strconv.Itoa(i) + ".txt"
start := 60
go RunTask(filename, start, batchLength)
}
// main 阻塞等待goroutine执行完成
fmt.Println(<-finish)
fmt.Println("finished all tasks.")
endTime := time.Now().UnixNano()
fmt.Println("Total cost(ms):", (endTime-startTime)/1e6)
}
// 单任务
func RunTask(filename string, start, length int) (retErr error) {
for {
readLine, err := ReadLines(filename, start, length)
if err == io.EOF {
fmt.Println("Read EOF:", filename)
retErr = err
break
}
if err != nil {
fmt.Println(err)
retErr = err
break
}
fmt.Println("current line:", readLine)
start += length
// 等待一批完成才进入下一批
//wg.Wait()
}
wg.Wait()
finish <- true
return retErr
}
Обратите внимание на приведенное вышеwg.Wait()
местонахождение (обсуждается ниже), вfinish channel
Раньше целью было дождаться ребенкаgoroutine
После запуска пройти через небуферизованный каналfinish
уведомлятьmain goroutine
,Потомmain
Пробег заканчивается.
func ReadLines() считывает указанные данные строки:
// 读取指定行数据
func ReadLines(filename string, start, length int) (line int, retErr error) {
fmt.Println("current file:", filename)
fileObj, err := os.Open(filename)
if err != nil {
panic(err)
}
defer fileObj.Close()
// 跳过开始行之前的行-ReadString方式
startLine := 1
endLine := start + length
reader := bufio.NewReader(fileObj)
for {
line, err := reader.ReadString(byte('\n'))
if err == io.EOF {
fmt.Println("Read EOF:", filename)
retErr = err
break
}
if err != nil {
log.Fatal(err)
retErr = err
break
}
if startLine > start && startLine <= endLine {
wg.Add(1)
// go并发执行
go SendEmail(line)
if startLine == endLine {
break
}
}
startLine++
}
return startLine, retErr
}
// 模拟邮件发送
func SendEmail(email string) error {
defer wg.Done()
time.Sleep(time.Second * 1)
fmt.Println(email)
return nil
}
бежать вышеmain.go
, 3 задачи завершают все электронные письма одновременно в течение 1 с (./task/edm1.txt
Средняя линия представляет почтовый ящик) для отправки.
true
finished all tasks.
Total cost(ms): 1001
Затем возникает проблема: пакеты не реализуются для каждого параллелизма.batchLength = 20
, потому что, если оно не отправляется пакетами, пока одно из заданий или одно из писем не будет выполнено, при следующем повторном запуске оно не будет знать, какие пользователи уже отправили его, и будет повторная отправка. . Даже если есть ошибка при отправке пакетами, следующий повторный запуск может начинаться с конечной строки, где произошла последняя ошибка, не более[start - end]
ОдинbatchLength
Отправка не удалась, приемлемо.
Итак, 5-я строчка снизу будетwg.Wait()
Закомментируйте, откройте комментарий на 8-й строке снизу, следующим образом:
// 单任务
func RunTask(filename string, start, length int) (retErr error) {
for {
readLine, err := ReadLines(filename, start, length)
if err == io.EOF {
fmt.Println("Read EOF:", filename)
retErr = err
break
}
if err != nil {
fmt.Println(err)
retErr = err
break
}
fmt.Println("current line:", readLine)
start += length
// 等待一批完成才进入下一批
wg.Wait()
}
//wg.Wait()
finish <- true
return retErr
}
Ошибка при запуске:
panic: sync: WaitGroup is reused before previous Wait has returned
намекатьWaitGroup
существуетgoroutine
Повторное использование между, хотя это и глобальная переменная, кажется, используется неправильно. Как настроить?
3.2 main.go
package main
import (
"bufio"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
"time"
)
var (
batchLength = 10
outerWg sync.WaitGroup
)
func main() {
startTime := time.Now().UnixNano()
for i := 1; i <= 3; i++ {
filename := "./task/edm" + strconv.Itoa(i) + ".txt"
start := 60
outerWg.Add(1)
go RunTask(filename, start, batchLength)
}
// main 阻塞等待goroutine执行完成
outerWg.Wait()
fmt.Println("finished all tasks.")
endTime := time.Now().UnixNano()
fmt.Println("Total cost(ms):", (endTime-startTime)/1e6)
}
// 单任务
func RunTask(filename string, start, length int) (retErr error) {
for {
isFinish := make(chan bool)
readLine, err := ReadLines(filename, start, length, isFinish)
if err == io.EOF {
fmt.Println("Read EOF:", filename)
retErr = err
break
}
if err != nil {
fmt.Println(err)
retErr = err
break
}
// 等待一批完成才进入下一批
fmt.Println("current line:", readLine)
start += length
<-isFinish
// 关闭channel,释放资源
close(isFinish)
}
outerWg.Done()
return retErr
}
Из вышеизложенного видно: идея регулировки заключается в использовании внешнего слояWaitGroup
контроль, внутренний слойchannel
Контролируйте, выполняйте и сообщайте об ошибке :(
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x55fe7c)
/usr/local/go/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0x55fe70)
/usr/local/go/src/sync/waitgroup.go:131 +0x72
main.main()
/home/work/data/www/docker_env/www/go/src/WWW/edm/main.go:31 +0x1ab
goroutine 5 [chan send]:
main.ReadLines(0xc42001c0c0, 0xf, 0x3c, 0xa, 0xc42008e000, 0x0, 0x0, 0x0)
При ближайшем рассмотрении я обнаружил, что код, определенный вышеisFinish
небуферизованныйchannel
, отправив электронное письмоSendMail()
Чтение небуферизованного канала без данных заблокирует текущийgoroutine
,разноеgoroutine
Тоже самое заблокировано, поэтому появляетсяall goroutines are asleep - deadlock!
Поэтому измените приведенный выше код, чтобы он имел буфер, и продолжайте пробовать:
isFinish := make(chan bool, 1)
// 读取指定行数据
func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {
fmt.Println("current file:", filename)
// 控制每一批发完再下一批
var wg sync.WaitGroup
fileObj, err := os.Open(filename)
if err != nil {
panic(err)
}
defer fileObj.Close()
// 跳过开始行之前的行-ReadString方式
startLine := 1
endLine := start + length
reader := bufio.NewReader(fileObj)
for {
line, err := reader.ReadString(byte('\n'))
if err == io.EOF {
fmt.Println("Read EOF:", filename)
retErr = err
break
}
if err != nil {
log.Fatal(err)
retErr = err
break
}
if startLine > start && startLine <= endLine {
wg.Add(1)
// go并发执行
go SendEmail(line, wg)
if startLine == endLine {
isFinish <- true
break
}
}
startLine++
}
wg.Wait()
return startLine, retErr
}
// 模拟邮件发送
func SendEmail(email string, wg sync.WaitGroup) error {
defer wg.Done()
time.Sleep(time.Second * 1)
fmt.Println(email)
return nil
}
Запустите и сообщите об ошибке :(
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x55fe7c)
/usr/local/go/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0x55fe70)
На этот раз подсказка немного другая, похоже, это внутренний слойWaitGroup
Возникла тупиковая ситуация, продолжайте проверку, чтобы найти внутренний слойwg
Это передача по значению, вы должны использовать передачу указателя по ссылке.
// go并发执行
go SendEmail(line, wg)
Последний измененный код выглядит следующим образом:
// 读取指定行数据
func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {
fmt.Println("current file:", filename)
// 控制每一批发完再下一批
var wg sync.WaitGroup
fileObj, err := os.Open(filename)
if err != nil {
panic(err)
}
defer fileObj.Close()
// 跳过开始行之前的行-ReadString方式
startLine := 1
endLine := start + length
reader := bufio.NewReader(fileObj)
for {
line, err := reader.ReadString(byte('\n'))
if err == io.EOF {
fmt.Println("Read EOF:", filename)
retErr = err
break
}
if err != nil {
log.Fatal(err)
retErr = err
break
}
if startLine > start && startLine <= endLine {
wg.Add(1)
// go并发执行
go SendEmail(line, &wg)
if startLine == endLine {
isFinish <- true
break
}
}
startLine++
}
wg.Wait()
return startLine, retErr
}
// 模拟邮件发送
func SendEmail(email string, wg *sync.WaitGroup) error {
defer wg.Done()
time.Sleep(time.Second * 1)
fmt.Println(email)
return nil
}
Спешите запустить его, на этот раз это наконец удалось :)
current line: 100
current file: ./task/edm2.txt
Read EOF: ./task/edm2.txt
Read EOF: ./task/edm2.txt
finished all tasks.
Total cost(ms): 4003
Каждая задача имитирует 100 строк, начиная со строки 60, четыре задачи выполняются одновременно, каждая задача снова выполняется одновременно в пакетах, а следующий пакет контролируется после завершения каждого пакета, поэтому общее время выполнения составляет около 4 с, в соответствии с ожиданиями. Для получения полного исходного кода, пожалуйста, прочитайте исходный текст или перейдите на GitHub:GitHub.com/Соломинка99/quote…
4. Резюме:
В этой статье моделируется реализация высокопроизводительного параллельного EDM с помощью двух уровней вложенного параллелизма Go. В следующий раз будут обсуждаться некоторые специфические ошибки, прерывание и повторное выполнение строки управления. Основная логика была пройдена, и несколько подводных камней резюмируется следующим образом:
а) WaitGroup обычно используется для того, чтобы основная основная сопрограмма дождалась выхода всех под-сопрограмм, а затем изящно вышла из основной сопрограммы; обратите внимание на расположение wg.Wait() при вложении;
б) Используйте каналы разумно, небуферизованный чан блокирует текущую горутину, а буферизованный чан не блокирует текущую горутину, если ограничение не заполнено, не забудьте освободить ресурсы чан после использования;
c) Обратите внимание на разумное использование передачи значения или ссылки между функциями (по сути, передача значения, значение памяти указателя переданного указателя);
Постскриптум: Первый блог почти закончен. Он будет закончен во второй половине дня, если вы не будете внимательны. Логика и читабельность письма могут быть не очень хорошими. Пожалуйста, простите меня. Комментарии и исправления приветствуются. Спасибо за прочтение.