背景

最近开发的一个项目,需要使用任务队列来控制任务的发布和消费。一开始做技术选型的时候,选择的是 RabbitMQ,一来是在其他项目使用过,轻车熟路了,无学习成本,二是有现成的 RabbitMQ 服务器可用,无需运维成本和额外的服务器成本。但后来出现了需要获取任务详情以及任务当前排号的需求,单独使用 RabbitMQ 的话无法实现这些需求。

在经过考量后,RabbitMQ 搭配 redis 的 ZSET 结构能实现以上的需求。考虑到 redis 也能实现队列的功能,因此决定放弃 RabbitMQ,完全使用 redis 实现项目所需的任务队列。

实现流程

需求分析

  1. 任务队列功能(任务发布、任务队列订阅、任务拉取、任务响应)
  2. 获取任务的详细信息
  3. 获取任务的排队情况
  4. 获取任务是否存在
  5. 获取任务总数
  6. 删除任务

实现步骤

  1. 任务队列的结构体设计
  2. 任务队列的接口抽象
  3. 实现队列接口
  4. 处理并发问题

任务队列的结构体设计

首先,该队列是使用 redis 进行实现的,所以必须要有的是 redis 的客户端连接,使用的是 github.com/go-redis/redis/v8 包:

1
redisClient *redis.Client // redis 客户端连接

正常任务队列的常规属性:

1
queueName string // 队列名

用来控制并发的并发锁:

1
2
distributeLock       *distributelock.RedisLock // 分布式锁
distributeLockSecond int64 // 分布式锁的持有时间

用来获取查看处理中的任务:

1
consumeFilterKey     string                    // 正在消费的任务队列的键值

用来控制任务的消费逻辑:

1
2
3
4
5
messageDataChan      chan TaskInfo             // 接收队列数据的通道
consumeRateLimitType string // 队列的消费限制模式,并发数控制/QPS控制
limiter ratelimit.Limiter // QPS 限制的限流器
consumeBatchSize int64 // 每次从队列中拉取的消息数量
subOnce sync.Once // 订阅控制

完整的队列结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
type ZQueue struct {
redisClient *redis.Client // redis 客户端
messageDataChan chan TaskInfo // 接收队列数据的通道
queueName string // 队列名
consumeFilterKey string // 正在消费的任务队列的键值
distributeLock *distributelock.RedisLock // 分布式锁
distributeLockSecond int64 // 分布式锁的持有时间
consumeRateLimitType string // 队列的消费限制模式,并发数控制/QPS控制
limiter ratelimit.Limiter // QPS 限制的限流器
consumeBatchSize int64 // 每次从队列中拉取的消息数量
subOnce sync.Once // 订阅控制
}

接口抽象

首先要设计一下队列中任务消息的结构:

1
2
3
4
type TaskInfo struct {
Tag string // 任务标识,可以为唯一ID或者任务名
Body []byte // 任务详情
}

根据需求,抽象出队列的接口对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Queue interface {
// Publish 发布任务
Publish(ctx context.Context, task TaskInfo, priority ...int64) error
// Subscribe 订阅队列
Subscribe(ctx context.Context) <-chan TaskInfo
// Get 拉取任务
Get(ctx context.Context) (*TaskInfo, error)
// Ack 任务响应
Ack(ctx context.Context, taskTag string, delLock bool) error
// Rank 任务序号
Rank(ctx context.Context, taskTag string) (int64, error)
// TotalCount 任务总数
TotalCount(ctx context.Context) (int64, error)
// Score 任务分数
Score(ctx context.Context, taskTag string) (float64, error)
// Del 删除任务
Del(ctx context.Context, taskTag string) error
// Exist 是否存在
Exist(ctx context.Context, taskTag string) bool
}

队列的创建方法

使用选项模式,自定义构建 ZQ 结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
type FunOption func(zq *ZQueue)

// WithDistributeLock 配置消费的分布式锁
func WithDistributeLock(lockKey string) FunOption {
return func(zq *ZQueue) {
zq.distributeLock = distributelock.NewRedisLock(zq.redisClient, lockKey, false)
}
}
// WithLockSecond 指定分布式锁的持有时间
func WithLockSecond(second int64) FunOption {
return func(zq *ZQueue) {
zq.distributeLockSecond = second
}
}
// WithConsumeRateLimitType 指定任务的消费模式
/*
const (
RateLimitTypeOfConcurrency = "concurrency"
RateLimitTypeOfQps = "qps"
)
*/
func WithConsumeRateLimitType(limitType string, qpsLimit int64, consumeChanSize int64) FunOption {
return func(zq *ZQueue) {
zq.ConsumeRateLimitType = limitType
if limitType == RateLimitTypeOfQps {
zq.limiter = ratelimit.New(int(qpsLimit))
zq.consumeBatchSize = qpsLimit + 1
} else {
zq.consumeBatchSize = consumeChanSize - 1
}
}
}

// NewZQueue 新建 ZQ
func NewZQueue(redisClient *redis.Client, queueName string, options ...FunOption) Queue {
zq := &ZQueue{
redisClient: redisClient,
queueName: fmt.Sprintf("{%s}", queueName),
distributeLockSecond: 480,
messageDataChan: make(chan TaskInfo, 10),
consumeFilterKey: generateConsumerFilterKey(queueName),
}
for _, option := range options {
option(zq)
}
return zq
}

任务发布

为保证原子性,使用 lua 脚本进行消息发布:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- KEYS[1]: 队列的 key 值
-- KEYS[2]: 保存消息的 hash 表的 key 值
-- ARGV[1]: 保存到 zset 的 member
-- ARGV[2]: 保存到 zset 的 score
-- ARGV[3]: 保存到 hash 的 field key
-- ARGV[4]: 保存到 hash 的 消息体

local member = ARGV[1]
local score = tonumber(ARGV[2])
local body = ARGV[4]
local hashField = ARGV[3]

local count =redis.call("zadd",KEYS[1],score,member)
-- 消息已经存在
if count ==0 then
return 0
end
redis.call("hsetnx",KEYS[2],hashField,body)
return 1

发布消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Publish 发布任务
// task.Body 序列化后的消息体
// task.Tag 任务的唯一标识,可以是任务 ID 也可以是任务名
// priority 优先级,可用于插队,可以定义一个任务的优先级列表,根据任务优先级指定 priority
// 也可以直接指定为很小的数字,用来将任务插到前排
// 不指定的话默认为当前时间的时间戳,可以实现先进先出的排队逻辑
func (z *ZQueue) Publish(ctx context.Context, task TaskInfo, priority ...int64) error {
// 优先级
score := time.Now().UTC().Unix()
if len(priority) != 0 {
score = priority[0]
}
pushScript := redis.NewScript(PushMsgScript)
success, err := pushScript.Run(ctx,
z.redisClient,
[]string{generateTaskQueueKey(z.queueName), generateTaskHashMapKey(z.queueName)},
task.Tag, score, // zset 的 member 和 score
task.Tag, task.Body, // hash map 的 field 和 value
).Bool()
if err != nil {
return errors.Wrap(err, "publish msg to zset queue is occurred error")
}
if !success {
return errors.Newf("the message had publish duplicate ,task tag is %s", task.Tag)
}
return nil
}

任务消费

订阅模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Subscribe 订阅任务队列
func (z *ZQueue) Subscribe(ctx context.Context) <-chan TaskInfo {
z.subOnce.Do(func() {
// 订阅的时候再去初始化通道,允许只写进程存在
z.messageDataChan = make(chan TaskInfo, z.consumeBatchSize)
go func() {
for {
// 加锁,任务提交完成即可释放锁
z.distributeLock.SetExpire(uint32(z.distributeLockSecond * 1000))
lockRes, _ := z.distributeLock.Lock(ctx)
if !lockRes {
time.Sleep(5 * time.Second)
continue
}
// 一次性从队列中获取并发处理数量个数的任务
message, err := z.redisClient.ZRange(ctx, generateTaskQueueKey(z.queueName), 0, z.consumeBatchSize).Result()
if err != nil || len(message) == 0 {
goto next
}
switch z.consumeRateLimitType {
case RateLimitTypeOfConcurrency:
z.getMessageByConcurrency(ctx, message)
case RateLimitTypeOfQps:
z.getMessageByQps(ctx, message)
}
next:
// 释放分布式锁,短暂睡眠
if releaseOK, err := z.distributeLock.Release(ctx, false); !releaseOK {
log.Println("zset queue release lock is fail", err)
}
time.Sleep(2 * time.Second)
}
}()
})
return z.messageDataChan
}

// getMessageByConcurrency 根据并发数拉取任务
func (z *ZQueue) getMessageByConcurrency(ctx context.Context, message []string) {
// 获取正在处理的任务数量
if num, err := z.redisClient.SCard(ctx, z.consumeFilterKey).Result(); err != nil || num >= int64(len(message)) {
return
}
for i := 0; i < len(message); i++ {
member := message[i]
// 从缓存中获取member 对应的消息
inputMsg, err := z.redisClient.HGet(ctx, generateTaskHashMapKey(z.queueName), member).Bytes()
if err != nil || len(inputMsg) == 0 {
// TODO 空消息体应该删除,否则一直占用队列空间
// 当前消息不存在或者获取错误,继续进行下个消息的消费
continue
}
isConsuming, err := z.redisClient.SIsMember(ctx, z.consumeFilterKey, member).Result()
if err != nil || isConsuming {
// 正在处理,跳过
continue
}
z.redisClient.SAdd(ctx, z.consumeFilterKey, member)
// 将消息推送到数据channel
z.messageDataChan <- TaskInfo{
Tag: member,
Body: inputMsg,
}
}
}

// getMessageByQps 根据 QPS 拉取任务
func (z *ZQueue) getMessageByQps(ctx context.Context, message []string) {
for i := 0; i < len(message); i++ {
z.limiter.Take()
member := message[i]
// 从缓存中获取member 对应的消息
inputMsg, err := z.redisClient.HGet(ctx, generateTaskHashMapKey(z.queueName), member).Bytes()
if err != nil || len(inputMsg) == 0 {
continue
}
isConsuming, err := z.redisClient.SIsMember(ctx, z.consumeFilterKey, member).Result()
if err != nil || isConsuming {
continue
}
z.redisClient.SAdd(ctx, z.consumeFilterKey, member)
// 将消息推送到数据channel
z.messageDataChan <- TaskInfo{
Tag: member,
Body: inputMsg,
}
}
}

拉取模式:TODO: TO BE COMPLETED

1
2
3
4
5
// Get 拉取任务
func (z *ZQueue) Get(ctx context.Context) (*TaskInfo, error) {
// TODO TO BE COMPLETED
return nil, nil
}

任务响应

为保证原子性,使用 lua 脚本进行任务响应:

1
2
3
4
5
6
7
8
-- KEYS[1]: 队列的 key 
-- KEYS[2]: 保存消息的哈希表的key
-- ARGV[1]: 消息体body

local member = ARGV[1]
local res = redis.call('ZREM',KEYS[1],member)
redis.call('HDEL',KEYS[2],member)
return res

任务响应:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Ack ack task
func (z *ZQueue) Ack(ctx context.Context, taskTag string, delLock bool) error {
for {
lockRes, _ := z.distributeLock.Lock(ctx)
if !lockRes {
time.Sleep(200 * time.Millisecond)
continue
} else {
break
}
}
defer func() {
res, err := z.distributeLock.Release(ctx, delLock)
if !res {
log.Println("zset queue release lock is fail", err)
}
}()
err := z.redisClient.Eval(ctx, AckMsgScript,
[]string{generateTaskQueueKey(z.queueName), generateTaskHashMapKey(z.queueName)},
taskTag,
).Err()
if err != nil {
return errors.Wrap(err, "remove message from queue is occured error")
}
z.redisClient.SRem(ctx, z.consumeFilterKey, taskTag)
return nil
}

获取任务序号

1
2
3
4
5
6
7
8
// Rank 获取任务序号
func (z *ZQueue) Rank(ctx context.Context, taskTag string) (int64, error) {
rank, err := z.redisClient.ZRank(ctx, generateTaskQueueKey(z.queueName), taskTag).Result()
if err != nil {
return 0, errors.Wrap(err, "get message rank from queue is occurred error")
}
return rank, nil
}

获取任务总数

1
2
3
4
5
6
7
8
9
// TotalCount 获取队列中的任务总数
func (z *ZQueue) TotalCount(ctx context.Context) (int64, error) {
count, err := z.redisClient.ZCard(ctx, generateTaskQueueKey(z.queueName)).Result()
if err != nil {
return 0, errors.Wrap(err, "get queue total member count is occurred error")
}
return count, nil

}

获取任务分数

1
2
3
4
5
6
7
8
// Score 获取任务分数
func (z *ZQueue) Score(ctx context.Context, taskTag string) (float64, error) {
score, err := z.redisClient.ZScore(ctx, generateTaskQueueKey(z.queueName), taskTag).Result()
if err != nil {
return 0, errors.Wrap(err, "get queue member score is occurred error")
}
return score, nil
}

删除任务

1
2
3
4
5
6
7
8
9
10
11
// Del 删除任务
func (z *ZQueue) Del(ctx context.Context, taskTag string) error {
err := z.redisClient.Eval(ctx, AckMsgScript,
[]string{generateTaskQueueKey(z.queueName), generateTaskHashMapKey(z.queueName)},
taskTag,
).Err()
if err != nil {
return errors.Wrap(err, "remove message from queue is occurred error")
}
return nil
}

任务是否存在

1
2
3
4
5
6
7
8
// Exist 任务是否存在
func (z *ZQueue) Exist(ctx context.Context, taskTag string) bool {
_, err := z.redisClient.ZScore(ctx, generateTaskQueueKey(z.queueName), taskTag).Result()
if err == redis.Nil || err != nil {
return false
}
return true
}

使用指南

使用方法

创建队列

1
2
3
4
queue := zqueue.NewZQueue(rCli, TestQueueName,
zqueue.WithDistributeLock(TestLockKey), zqueue.WithLockSecond(TestLockExpireSecond),
zqueue.WithConsumeRateLimitType(zqueue.RateLimitTypeOfConcurrency, 0, 5),
)

发布任务

不指定优先级:

1
2
3
4
queue.Publish(ctx,TaskInfo{
Tag:"0001",
Body:[]byte("message1 body")
})

指定优先级:

1
2
3
4
queue.Publish(ctx,TaskInfo{
Tag:"0002",
Body:[]byte("message2 body")
},0)

订阅任务消息

1
2
3
4
msgChan := queue.Subscribe(ctx)
for v := range msgChan{
// consume message
}

更多队列信息

1
2
3
4
5
6
7
8
// 任务序号
rank, _ := queue.Rank(ctx, taskID)
// 任务是否存在
exist := queue.Exist(ctx,taskID)
// 任务分数
score, _ := queue.Score(ctx, taskID)
// 队列总任务数
total, _ := queue.TotalCount(ctx)

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"os"
"strconv"
"time"
zqueue "z-queue"
)

const (
TestQueueName = "Testing"
TestLockKey = "lock-key:testing"
TestLockExpireSecond = 5
)

func main() {
addr := os.Getenv("REDIS_EXAMPLE_ADDR")
pwd := os.Getenv("REDIS_EXAMPLE_PASSWORD")
db, _ := strconv.Atoi(os.Getenv("REDIS_EXAMPLE_DB"))
rCli := redis.NewClient(&redis.Options{
Addr: addr,
Password: pwd,
DB: db,
})

queue := zqueue.NewZQueue(rCli, TestQueueName,
zqueue.WithDistributeLock(TestLockKey), zqueue.WithLockSecond(TestLockExpireSecond),
zqueue.WithConsumeRateLimitType(zqueue.RateLimitTypeOfConcurrency, 0, 5),
)
go func() {
// subscript
ctx := context.Background()
msgChan := queue.Subscribe(ctx)
for message := range msgChan {
rank, _ := queue.Rank(ctx, message.Tag)
exist := queue.Exist(ctx, message.Tag)
score, _ := queue.Score(ctx, message.Tag)
total, _ := queue.TotalCount(ctx)
fmt.Printf("taskinfo: ID: %s, rank: %d, exist: %v, score: %d, total: %d\n",
message.Tag, rank, exist, int64(score), total)
go consumeMsg(queue, message.Tag)
}
}()

for i := 0; i < 10; i++ {
tID := fmt.Sprintf("taskID-0000-000%d", i)
err := queue.Publish(context.Background(), zqueue.TaskInfo{
Tag: tID,
Body: []byte("publish a message"),
})
if err != nil {
panic("push message is error")
}
time.Sleep(time.Second)
}
time.Sleep(60 * time.Second)
}

var consumerChan = make(chan struct{}, 2) // 可以调整缓冲区大小,验证并发限制

func consumeMsg(queue zqueue.Queue, taskID string) {
now := time.Now()
consumerChan <- struct{}{}
if time.Since(now) > time.Second {
fmt.Println("msg count out of limit", taskID)
}
time.Sleep(3 * time.Second)
fmt.Println("consume success", taskID)
_ = queue.Ack(context.Background(), taskID, false)
<-consumerChan
}

代码已上传至 github,感兴趣的同学可以点此跳转查阅源码,欢迎提交 issue 和 PR。

结语

本文主要记录了使用 go 和 redis 开发一个任务队列的过程,文章是在代码开发完成之后才撰写的,所以没有描述开发途中遇到的一些问题。该项目目前还处在初期,还有许多需要完善的地方,欢迎各位大佬提交 issue 和 PR。