Переход к вложенной параллельной реализации EDM с анализом ям № 1

Go
Переход к вложенной параллельной реализации EDM с анализом ям № 1

Глядя на отличных друзей вокруг меня, которые давно начали вести блог, я чувствую, что отстаю.К счастью, лучше сделать это поздно, чем никогда, и поощрять себя видеть лучшее. Воспользовавшись последними выходными года, когда солнце было как раз впору, я написал свой первый блог, чтобы начать 2019 год, чтобы завершить установленные для себя флаги в этом году.

Одной из основных причин перехода с PHPer на Gopher является постоянный спрос на производительность и параллелизм в бизнесе.Другой важной причиной является встроенная функция параллелизма языка Go, которая может использовать меньше машинных ресурсов и сэкономить значительные суммы денег, предоставляя те же возможности высокой доступности. Поэтому в этой статье я опишу некоторые подводные камни, с которыми столкнулся в реальной демонстрации изучения параллелизма в Go, и поделюсь прогрессом.

1. Существует три основных способа реализации управления параллелизмом в языке Go:

a) Channel- Разделены на небуферизованные и буферизованные каналы;

b) WaitGroup- Механизм синхронизации между горутинами, предоставляемый пакетом синхронизации;

c) Context- Передавать и обмениваться данными между разными горутинами в цепочке вызовов;

Первые два в основном используются в этой демонстрации.Чтобы узнать об основном использовании, ознакомьтесь с официальной документацией.

2. Демонстрационные требования и анализ:

Требование: Для реализации эффективной отправки почты EDM: необходимо поддерживать несколько стран (можно рассматривать как несколько задач), необходимо записывать статус каждой отправленной задачи (текущий успех, количество неудач), необходимо поддерживать паузу (остановку), Повторно отправить (выполнить) операцию.

Анализ: из требований видно, что в процессе отправки почты несколько стран (несколько задач) могут выполняться одновременно, и одна задача может выполняться одновременно пакетами для достижения быстрых и эффективных требований EDM.

3. Демонстрация фактического исходного кода:

3.1 main.go

package main

import (
  "bufio"
  "fmt"
  "io"
  "log"
  "os"
  "strconv"
  "sync"
  "time"
)

var (
  batchLength = 20
  wg          sync.WaitGroup
  finish      = make(chan bool)
)

func main() {
  startTime := time.Now().UnixNano()

  for i := 1; i <= 3; i++ {
    filename := "./task/edm" + strconv.Itoa(i) + ".txt"
    start := 60

    go RunTask(filename, start, batchLength)
  }

  // main 阻塞等待goroutine执行完成
  fmt.Println(<-finish)

  fmt.Println("finished all tasks.")

  endTime := time.Now().UnixNano()
  fmt.Println("Total cost(ms):", (endTime-startTime)/1e6)
}

// 单任务
func RunTask(filename string, start, length int) (retErr error) {
  for {
    readLine, err := ReadLines(filename, start, length)
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      fmt.Println(err)
      retErr = err
      break
    }

    fmt.Println("current line:", readLine)

    start += length

    // 等待一批完成才进入下一批
    //wg.Wait()
  }

  wg.Wait()
  finish <- true

  return retErr
}

Обратите внимание на приведенное вышеwg.Wait()местонахождение (обсуждается ниже), вfinish channelРаньше целью было дождаться ребенкаgoroutineПосле запуска пройти через небуферизованный каналfinishуведомлятьmain goroutine,ПотомmainПробег заканчивается.

func ReadLines() считывает указанные данные строки:

// 读取指定行数据
func ReadLines(filename string, start, length int) (line int, retErr error) {
  fmt.Println("current file:", filename)

  fileObj, err := os.Open(filename)
  if err != nil {
    panic(err)
  }
  defer fileObj.Close()

  // 跳过开始行之前的行-ReadString方式
  startLine := 1
  endLine := start + length
  reader := bufio.NewReader(fileObj)
  for {
    line, err := reader.ReadString(byte('\n'))
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      log.Fatal(err)
      retErr = err
      break
    }

    if startLine > start && startLine <= endLine {
      wg.Add(1)
      // go并发执行
      go SendEmail(line)
      if startLine == endLine {
        break
      }
    }

    startLine++
  }

  return startLine, retErr
}

// 模拟邮件发送
func SendEmail(email string) error {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  fmt.Println(email)

  return nil
}

бежать вышеmain.go, 3 задачи завершают все электронные письма одновременно в течение 1 с (./task/edm1.txtСредняя линия представляет почтовый ящик) для отправки.

true

finished all tasks.

Total cost(ms): 1001

Затем возникает проблема: пакеты не реализуются для каждого параллелизма.batchLength = 20, потому что, если оно не отправляется пакетами, пока одно из заданий или одно из писем не будет выполнено, при следующем повторном запуске оно не будет знать, какие пользователи уже отправили его, и будет повторная отправка. . Даже если есть ошибка при отправке пакетами, следующий повторный запуск может начинаться с конечной строки, где произошла последняя ошибка, не более[start - end]ОдинbatchLengthОтправка не удалась, приемлемо.

Итак, 5-я строчка снизу будетwg.Wait()Закомментируйте, откройте комментарий на 8-й строке снизу, следующим образом:

// 单任务
func RunTask(filename string, start, length int) (retErr error) {
  for {
    readLine, err := ReadLines(filename, start, length)
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      fmt.Println(err)
      retErr = err
      break
    }

    fmt.Println("current line:", readLine)

    start += length

    // 等待一批完成才进入下一批
    wg.Wait()
  }

  //wg.Wait()
  finish <- true

  return retErr
}

Ошибка при запуске:

panic: sync: WaitGroup is reused before previous Wait has returned

намекатьWaitGroupсуществуетgoroutineПовторное использование между, хотя это и глобальная переменная, кажется, используется неправильно. Как настроить?

3.2 main.go

package main

import (
  "bufio"
  "fmt"
  "io"
  "log"
  "os"
  "strconv"
  "sync"
  "time"
)

var (
  batchLength = 10
  outerWg     sync.WaitGroup
)

func main() {
  startTime := time.Now().UnixNano()

  for i := 1; i <= 3; i++ {
    filename := "./task/edm" + strconv.Itoa(i) + ".txt"
    start := 60

    outerWg.Add(1)
    go RunTask(filename, start, batchLength)
  }

  // main 阻塞等待goroutine执行完成
  outerWg.Wait()

  fmt.Println("finished all tasks.")

  endTime := time.Now().UnixNano()
  fmt.Println("Total cost(ms):", (endTime-startTime)/1e6)
}

// 单任务
func RunTask(filename string, start, length int) (retErr error) {
  for {
    isFinish := make(chan bool)
    readLine, err := ReadLines(filename, start, length, isFinish)
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      fmt.Println(err)
      retErr = err
      break
    }

    // 等待一批完成才进入下一批
    fmt.Println("current line:", readLine)
    start += length
    <-isFinish

    // 关闭channel,释放资源
    close(isFinish)
  }

  outerWg.Done()

  return retErr
}

Из вышеизложенного видно: идея регулировки заключается в использовании внешнего слояWaitGroupконтроль, внутренний слойchannelКонтролируйте, выполняйте и сообщайте об ошибке :(

fatal error: all goroutines are asleep - deadlock!



goroutine 1 [semacquire]:

sync.runtime_Semacquire(0x55fe7c)

	/usr/local/go/src/runtime/sema.go:56 +0x39

sync.(*WaitGroup).Wait(0x55fe70)

	/usr/local/go/src/sync/waitgroup.go:131 +0x72

main.main()

	/home/work/data/www/docker_env/www/go/src/WWW/edm/main.go:31 +0x1ab



goroutine 5 [chan send]:

main.ReadLines(0xc42001c0c0, 0xf, 0x3c, 0xa, 0xc42008e000, 0x0, 0x0, 0x0)

При ближайшем рассмотрении я обнаружил, что код, определенный вышеisFinishнебуферизованныйchannel, отправив электронное письмоSendMail()Чтение небуферизованного канала без данных заблокирует текущийgoroutine,разноеgoroutineТоже самое заблокировано, поэтому появляетсяall goroutines are asleep - deadlock!

Поэтому измените приведенный выше код, чтобы он имел буфер, и продолжайте пробовать:

isFinish := make(chan bool, 1)
// 读取指定行数据
func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {
  fmt.Println("current file:", filename)

  // 控制每一批发完再下一批
  var wg sync.WaitGroup

  fileObj, err := os.Open(filename)
  if err != nil {
    panic(err)
  }
  defer fileObj.Close()

  // 跳过开始行之前的行-ReadString方式
  startLine := 1
  endLine := start + length
  reader := bufio.NewReader(fileObj)
  for {
    line, err := reader.ReadString(byte('\n'))
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      log.Fatal(err)
      retErr = err
      break
    }

    if startLine > start && startLine <= endLine {

      wg.Add(1)
      // go并发执行
      go SendEmail(line, wg)
      if startLine == endLine {
        isFinish <- true
        break
      }
    }

    startLine++
  }

  wg.Wait()

  return startLine, retErr
}

// 模拟邮件发送
func SendEmail(email string, wg sync.WaitGroup) error {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  fmt.Println(email)

  return nil
}

Запустите и сообщите об ошибке :(

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:

sync.runtime_Semacquire(0x55fe7c)

	/usr/local/go/src/runtime/sema.go:56 +0x39

sync.(*WaitGroup).Wait(0x55fe70)

На этот раз подсказка немного другая, похоже, это внутренний слойWaitGroupВозникла тупиковая ситуация, продолжайте проверку, чтобы найти внутренний слойwgЭто передача по значению, вы должны использовать передачу указателя по ссылке.

// go并发执行
go SendEmail(line, wg)

Последний измененный код выглядит следующим образом:

// 读取指定行数据
func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {
  fmt.Println("current file:", filename)

  // 控制每一批发完再下一批
  var wg sync.WaitGroup

  fileObj, err := os.Open(filename)
  if err != nil {
    panic(err)
  }
  defer fileObj.Close()

  // 跳过开始行之前的行-ReadString方式
  startLine := 1
  endLine := start + length
  reader := bufio.NewReader(fileObj)
  for {
    line, err := reader.ReadString(byte('\n'))
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      log.Fatal(err)
      retErr = err
      break
    }

    if startLine > start && startLine <= endLine {

      wg.Add(1)
      // go并发执行
      go SendEmail(line, &wg)
      if startLine == endLine {
        isFinish <- true
        break
      }
    }

    startLine++
  }

  wg.Wait()

  return startLine, retErr
}

// 模拟邮件发送
func SendEmail(email string, wg *sync.WaitGroup) error {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  fmt.Println(email)

  return nil
}

Спешите запустить его, на этот раз это наконец удалось :)

current line: 100

current file: ./task/edm2.txt

Read EOF: ./task/edm2.txt

Read EOF: ./task/edm2.txt

finished all tasks.

Total cost(ms): 4003

Каждая задача имитирует 100 строк, начиная со строки 60, четыре задачи выполняются одновременно, каждая задача снова выполняется одновременно в пакетах, а следующий пакет контролируется после завершения каждого пакета, поэтому общее время выполнения составляет около 4 с, в соответствии с ожиданиями. Для получения полного исходного кода, пожалуйста, прочитайте исходный текст или перейдите на GitHub:GitHub.com/Соломинка99/quote…

4. Резюме:

В этой статье моделируется реализация высокопроизводительного параллельного EDM с помощью двух уровней вложенного параллелизма Go. В следующий раз будут обсуждаться некоторые специфические ошибки, прерывание и повторное выполнение строки управления. Основная логика была пройдена, и несколько подводных камней резюмируется следующим образом:

а) WaitGroup обычно используется для того, чтобы основная основная сопрограмма дождалась выхода всех под-сопрограмм, а затем изящно вышла из основной сопрограммы; обратите внимание на расположение wg.Wait() при вложении;

б) Используйте каналы разумно, небуферизованный чан блокирует текущую горутину, а буферизованный чан не блокирует текущую горутину, если ограничение не заполнено, не забудьте освободить ресурсы чан после использования;

c) Обратите внимание на разумное использование передачи значения или ссылки между функциями (по сути, передача значения, значение памяти указателя переданного указателя);



Постскриптум: Первый блог почти закончен. Он будет закончен во второй половине дня, если вы не будете внимательны. Логика и читабельность письма могут быть не очень хорошими. Пожалуйста, простите меня. Комментарии и исправления приветствуются. Спасибо за прочтение.

稻草人生