Задержка очереди сообщений

Go

предисловие

В процессе обучения было обнаружено, что zset Redis также может быть использован для реализации облегченной функции очереди отложенных сообщений.Хотя надежность нуждается в повышении, ее можно полностью реализовать для некоторых функциональных требований, не требующих столь больших данных. надежность. На этот раз мы в основном используем zadd, zrangebyscore и zdel в zset в Redis для реализации небольшой демонстрации.

Подготовьтесь заранее Установите redis, redis-go

Поскольку при использовании macOS напрямую

$ brew install redis
$ go get github.com/garyburd/redigo/redis

И поскольку это лениво, при генерации уникального идентификатора задачи напрямую используется objectId в bson, поэтому:

$ go get gopkg.in/mgo.v2/bson

Уникальный идентификатор не требуется, но если есть практические приложения, которые нужно перенести позже, найти соответствующую задачу несложно.

режиссер

Создавайте задачи 10w через цикл for, каждая задача имеет разное время

func producer() {
	count := 0
	//生成100000个任务
	for count < 100000 {
		count++
		dealTime := int64(rand.Intn(5)) + time.Now().Unix()
		uuid := bson.NewObjectId().Hex()
		redis.Client.AddJob(&job.JobMessage{
			Id: uuid,
			DealTime: dealTime,
		},  + int64(dealTime))
	}
}

Функция AddJob находится в другом пакете и использует время, случайно сгенерированное в предыдущей функции, в качестве метки времени для обработки.

// 添加任务
func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
	conn := client.Get()
	defer conn.Close()

	key := "JOB_MESSAGE_QUEUE"
	conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
}

потребитель

Процесс обработки потребителя делится на два этапа:

  1. Получить задачи, меньшие или равные текущей метке времени
  2. Определите, кто получил текущую задачу, удалив текущую задачу

Потому что при получении задач с меткой времени, меньшей или равной текущей метке времени, может быть несколько подпрограмм go, которые одновременно считывают текущую задачу, и только одна задача может обработать текущую задачу. Следовательно, нам нужно решение, чтобы определить, кто обрабатывает эту задачу (конечно, если ее может прочитать только один потребитель, она может быть обработана напрямую): В это время ее можно получить через операцию удаления redis, потому что только успешные операции выполняются при удалении указанного значения и вернет не 0, поэтому можно считать, что процедура go, которая успешно удалила текущую очередь, получила текущую задачу.

Вот код:

// 消费者
func consumer() {
	// 启动10个go routine一起去拿
	count := 0
	for count < 10 {
		go func() {
			for {
				jobs := redis.Client.GetJob()
				if len(jobs) <= 0 {
					time.Sleep(time.Second * 1)
					continue
				}
				currentJob := jobs[0]
				// 如果当前抢redis队列成功,
				if redis.Client.DelJob(currentJob) > 0 {
					var jobMessage job.JobMessage
					util.JsonDecode(currentJob, &jobMessage) //自定义的json解析函数
					handleMessage(&jobMessage)
				}

			}

		}()
		count++
	}
}

// 处理任务用函数
func handleMessage(msg *job.JobMessage) {
	fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
	go func() {
		countChan <- true
	}()
}

Код части redis, получение задач и удаление задач

// 获取任务
func (client *RedisClient) GetJob() []string {
	conn := client.Get()
	defer conn.Close()

	key := "JOB_MESSAGE_QUEUE"
	timeNow := time.Now().Unix()
	ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
	if err != nil {
		panic(err)
	}
	return ret
}

// 删除当前任务, 用来判断是否抢到了当前任务
func (client *RedisClient) DelJob(value string) int {
	conn := client.Get()
	defer conn.Close()

	key := "JOB_MESSAGE_QUEUE"
	ret, err := redis.Int(conn.Do("zrem", key, value))
	if err != nil {
		panic(err)
	}
	return ret
}

Код примерно такой же. После последнего прогона каждые 3-4 секунды может обрабатываться около 1w задач, и скорость действительно...