Background
A recent project needed a task queue to control how tasks are published and consumed. During the initial technology selection I chose RabbitMQ: first, I had used it in other projects, so it was familiar territory with no learning curve; second, a RabbitMQ server was already available, so there were no extra operations or server costs. Later, however, new requirements appeared: fetching a task's details and its current position in the queue. RabbitMQ alone cannot satisfy these.
After weighing the options, RabbitMQ combined with redis's ZSET structure could meet the requirements. But since redis can also implement the queue itself, I decided to drop RabbitMQ and build the project's task queue entirely on redis.
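Before diving in, it helps to see why a ZSET covers the "current queue number" requirement on its own: a member's ZRANK is exactly its 0-based position in score order. A minimal, self-contained sketch (the key and task names are made up for illustration, and a local redis at localhost:6379 is assumed) using the same github.com/go-redis/redis/v8 client the project uses:

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Two tasks queued with ascending scores (lower score = earlier).
	rdb.ZAdd(ctx, "demo:queue", &redis.Z{Score: 1, Member: "task-a"})
	rdb.ZAdd(ctx, "demo:queue", &redis.Z{Score: 2, Member: "task-b"})

	// ZRank returns the 0-based position of a member ordered by score,
	// which is exactly the "current queue number" the project needed.
	pos, _ := rdb.ZRank(ctx, "demo:queue", "task-b").Result()
	fmt.Println("queue position:", pos) // 1, i.e. second in line
}
```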
Implementation process
Requirements analysis
- Core queue functionality (publishing tasks, subscribing to the queue, pulling tasks, acknowledging tasks)
- Fetch a task's details
- Fetch a task's position in the queue
- Check whether a task exists
- Get the total number of tasks
- Delete a task
Implementation steps
- Design the task queue struct
- Abstract the queue interface
- Implement the queue interface
- Handle concurrency issues
Task queue struct design
First, since the queue is implemented on top of redis, it must hold a redis client connection; the github.com/go-redis/redis/v8 package is used:

```go
redisClient *redis.Client
```
The regular attribute of any task queue, its name (the queueName field from the full struct shown below):
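```go
queueName string
```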
The distributed lock used to control concurrency, plus its expiry in seconds:

```go
distributeLock       *distributelock.RedisLock
distributeLockSecond int64
```
The filter key used to look up tasks that are currently being processed (the consumeFilterKey field from the full struct shown below):
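```go
consumeFilterKey string
```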
The fields that control how tasks are consumed:

```go
messageDataChan      chan TaskInfo
consumeRateLimitType string
limiter              ratelimit.Limiter
consumeBatchSize     int64
subOnce              sync.Once
```
The complete queue struct:

```go
type ZQueue struct {
	redisClient          *redis.Client
	messageDataChan      chan TaskInfo
	queueName            string
	consumeFilterKey     string
	distributeLock       *distributelock.RedisLock
	distributeLockSecond int64
	consumeRateLimitType string
	limiter              ratelimit.Limiter // the New/Take usage below matches go.uber.org/ratelimit
	consumeBatchSize     int64
	subOnce              sync.Once
}
```
Interface abstraction
First, design the structure of a task message in the queue:

```go
type TaskInfo struct {
	Tag  string // unique task identifier; used as the ZSET member and HASH field
	Body []byte // task payload
}
```
Based on the requirements, the queue is abstracted into an interface:

```go
type Queue interface {
	Publish(ctx context.Context, task TaskInfo, priority ...int64) error
	Subscribe(ctx context.Context) <-chan TaskInfo
	Get(ctx context.Context) (*TaskInfo, error)
	Ack(ctx context.Context, taskTag string, delLock bool) error
	Rank(ctx context.Context, taskTag string) (int64, error)
	TotalCount(ctx context.Context) (int64, error)
	Score(ctx context.Context, taskTag string) (float64, error)
	Del(ctx context.Context, taskTag string) error
	Exist(ctx context.Context, taskTag string) bool
}
```
Queue constructor
The options pattern is used so callers can customize the ZQueue struct (note: the original snippet wrote zq.ConsumeRateLimitType, which does not match the lowercase field; fixed below):

```go
type FunOption func(zq *ZQueue)

func WithDistributeLock(lockKey string) FunOption {
	return func(zq *ZQueue) {
		zq.distributeLock = distributelock.NewRedisLock(zq.redisClient, lockKey, false)
	}
}

func WithLockSecond(second int64) FunOption {
	return func(zq *ZQueue) {
		zq.distributeLockSecond = second
	}
}

func WithConsumeRateLimitType(limitType string, qpsLimit int64, consumeChanSize int64) FunOption {
	return func(zq *ZQueue) {
		zq.consumeRateLimitType = limitType
		if limitType == RateLimitTypeOfQps {
			zq.limiter = ratelimit.New(int(qpsLimit))
			zq.consumeBatchSize = qpsLimit + 1
		} else {
			zq.consumeBatchSize = consumeChanSize - 1
		}
	}
}

func NewZQueue(redisClient *redis.Client, queueName string, options ...FunOption) Queue {
	zq := &ZQueue{
		redisClient: redisClient,
		// Wrapping the name in "{...}" makes it a Redis Cluster hash tag, so all
		// of the queue's keys land in the same slot and lua scripts can touch them together.
		queueName:            fmt.Sprintf("{%s}", queueName),
		distributeLockSecond: 480,
		messageDataChan:      make(chan TaskInfo, 10),
		consumeFilterKey:     generateConsumerFilterKey(queueName),
	}
	for _, option := range options {
		option(zq)
	}
	return zq
}
```
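The key helpers generateTaskQueueKey, generateTaskHashMapKey, and generateConsumerFilterKey are used throughout but never shown in the post; here is a minimal sketch under an assumed naming scheme (the exact key layout is my guess, not the project's):

```go
// Assumed key layout -- the real project may name these differently.
// queueName arrives here already wrapped in "{...}" (a Redis Cluster hash tag),
// except in generateConsumerFilterKey, which NewZQueue calls with the raw name.
func generateTaskQueueKey(queueName string) string {
	return queueName + ":zset" // ZSET: task tag -> score
}

func generateTaskHashMapKey(queueName string) string {
	return queueName + ":hash" // HASH: task tag -> task body
}

func generateConsumerFilterKey(queueName string) string {
	return fmt.Sprintf("{%s}:consuming", queueName) // SET of in-flight task tags
}
```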
Task publishing
To guarantee atomicity, a lua script is used to publish messages:

```lua
local member = ARGV[1]
local score = tonumber(ARGV[2])
local hashField = ARGV[3]
local body = ARGV[4]

-- zadd returns 0 when the member already exists, i.e. a duplicate publish.
local count = redis.call("zadd", KEYS[1], score, member)
if count == 0 then
    return 0
end
-- Store the task body, keyed by the task tag.
redis.call("hsetnx", KEYS[2], hashField, body)
return 1
```
Publishing a message:

```go
func (z *ZQueue) Publish(ctx context.Context, task TaskInfo, priority ...int64) error {
	// Default score is the enqueue time; an explicit priority overrides it.
	score := time.Now().UTC().Unix()
	if len(priority) != 0 {
		score = priority[0]
	}
	pushScript := redis.NewScript(PushMsgScript)
	// task.Tag is passed twice: once as the ZSET member, once as the HASH field.
	success, err := pushScript.Run(ctx, z.redisClient,
		[]string{generateTaskQueueKey(z.queueName), generateTaskHashMapKey(z.queueName)},
		task.Tag, score, task.Tag, task.Body,
	).Bool()
	if err != nil {
		return errors.Wrap(err, "failed to publish message to zset queue")
	}
	if !success {
		return errors.Newf("duplicate message published, task tag is %s", task.Tag)
	}
	return nil
}
```
Task consumption
Subscribe mode:

```go
func (z *ZQueue) Subscribe(ctx context.Context) <-chan TaskInfo {
	z.subOnce.Do(func() {
		z.messageDataChan = make(chan TaskInfo, z.consumeBatchSize)
		go func() {
			for {
				// Refresh the lock TTL (seconds -> milliseconds) and try to grab it,
				// so only one instance drains the queue at a time.
				z.distributeLock.SetExpire(uint32(z.distributeLockSecond * 1000))
				lockRes, _ := z.distributeLock.Lock(ctx)
				if !lockRes {
					time.Sleep(5 * time.Second)
					continue
				}
				message, err := z.redisClient.ZRange(ctx, generateTaskQueueKey(z.queueName), 0, z.consumeBatchSize).Result()
				if err != nil || len(message) == 0 {
					goto next
				}
				switch z.consumeRateLimitType {
				case RateLimitTypeOfConcurrency:
					z.getMessageByConcurrency(ctx, message)
				case RateLimitTypeOfQps:
					z.getMessageByQps(ctx, message)
				}
			next:
				if releaseOK, err := z.distributeLock.Release(ctx, false); !releaseOK {
					log.Println("zset queue release lock is fail", err)
				}
				time.Sleep(2 * time.Second)
			}
		}()
	})
	return z.messageDataChan
}

func (z *ZQueue) getMessageByConcurrency(ctx context.Context, message []string) {
	// If the in-flight set is already as large as this batch, skip the round.
	if num, err := z.redisClient.SCard(ctx, z.consumeFilterKey).Result(); err != nil || num >= int64(len(message)) {
		return
	}
	for i := 0; i < len(message); i++ {
		member := message[i]
		inputMsg, err := z.redisClient.HGet(ctx, generateTaskHashMapKey(z.queueName), member).Bytes()
		if err != nil || len(inputMsg) == 0 {
			continue
		}
		// Skip tasks that are already being consumed.
		isConsuming, err := z.redisClient.SIsMember(ctx, z.consumeFilterKey, member).Result()
		if err != nil || isConsuming {
			continue
		}
		z.redisClient.SAdd(ctx, z.consumeFilterKey, member)
		z.messageDataChan <- TaskInfo{
			Tag:  member,
			Body: inputMsg,
		}
	}
}

func (z *ZQueue) getMessageByQps(ctx context.Context, message []string) {
	for i := 0; i < len(message); i++ {
		// Block until the rate limiter allows the next take.
		z.limiter.Take()
		member := message[i]
		inputMsg, err := z.redisClient.HGet(ctx, generateTaskHashMapKey(z.queueName), member).Bytes()
		if err != nil || len(inputMsg) == 0 {
			continue
		}
		isConsuming, err := z.redisClient.SIsMember(ctx, z.consumeFilterKey, member).Result()
		if err != nil || isConsuming {
			continue
		}
		z.redisClient.SAdd(ctx, z.consumeFilterKey, member)
		z.messageDataChan <- TaskInfo{
			Tag:  member,
			Body: inputMsg,
		}
	}
}
```
Pull mode (TODO: to be completed):

```go
func (z *ZQueue) Get(ctx context.Context) (*TaskInfo, error) {
	// TODO: to be completed
	return nil, nil
}
```
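Since Get is still a TODO, here is one possible shape for pull mode, reusing the same ZSET/HASH/filter-set layout as the subscribe path. This is a sketch of my own, not the project's implementation:

```go
// Sketch only: pull one task that is not already being consumed.
func (z *ZQueue) getOnce(ctx context.Context) (*TaskInfo, error) {
	members, err := z.redisClient.ZRange(ctx, generateTaskQueueKey(z.queueName), 0, -1).Result()
	if err != nil {
		return nil, err
	}
	for _, member := range members {
		// Skip tasks another consumer already picked up.
		isConsuming, err := z.redisClient.SIsMember(ctx, z.consumeFilterKey, member).Result()
		if err != nil || isConsuming {
			continue
		}
		body, err := z.redisClient.HGet(ctx, generateTaskHashMapKey(z.queueName), member).Bytes()
		if err != nil {
			continue
		}
		// Mark the task in-flight and hand it to the caller.
		z.redisClient.SAdd(ctx, z.consumeFilterKey, member)
		return &TaskInfo{Tag: member, Body: body}, nil
	}
	return nil, nil // queue empty, or everything is already in flight
}
```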
Task acknowledgement
To guarantee atomicity, a lua script is used for acknowledgement as well:

```lua
local member = ARGV[1]
-- Remove the task from the ZSET and drop its body from the HASH.
local res = redis.call('ZREM', KEYS[1], member)
redis.call('HDEL', KEYS[2], member)
return res
```
The acknowledgement itself:

```go
func (z *ZQueue) Ack(ctx context.Context, taskTag string, delLock bool) error {
	// Spin until the distributed lock is acquired.
	for {
		lockRes, _ := z.distributeLock.Lock(ctx)
		if !lockRes {
			time.Sleep(200 * time.Millisecond)
			continue
		}
		break
	}
	defer func() {
		res, err := z.distributeLock.Release(ctx, delLock)
		if !res {
			log.Println("zset queue release lock is fail", err)
		}
	}()
	err := z.redisClient.Eval(ctx, AckMsgScript,
		[]string{generateTaskQueueKey(z.queueName), generateTaskHashMapKey(z.queueName)},
		taskTag,
	).Err()
	if err != nil {
		return errors.Wrap(err, "failed to remove message from queue")
	}
	// The task is done, so drop it from the in-flight filter set.
	z.redisClient.SRem(ctx, z.consumeFilterKey, taskTag)
	return nil
}
```
Getting a task's queue position

```go
func (z *ZQueue) Rank(ctx context.Context, taskTag string) (int64, error) {
	rank, err := z.redisClient.ZRank(ctx, generateTaskQueueKey(z.queueName), taskTag).Result()
	if err != nil {
		return 0, errors.Wrap(err, "failed to get message rank from queue")
	}
	return rank, nil
}
```
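Note that ZRank is 0-based: rank 0 means the task is at the head of the queue, so a user-facing queue number would typically be rank+1. For example ("task-42" is an illustrative tag):

```go
pos, err := queue.Rank(ctx, "task-42")
if err == nil {
	fmt.Printf("you are number %d in line\n", pos+1)
}
```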
Getting the total task count

```go
func (z *ZQueue) TotalCount(ctx context.Context) (int64, error) {
	count, err := z.redisClient.ZCard(ctx, generateTaskQueueKey(z.queueName)).Result()
	if err != nil {
		return 0, errors.Wrap(err, "failed to get queue total member count")
	}
	return count, nil
}
```
Getting a task's score

```go
func (z *ZQueue) Score(ctx context.Context, taskTag string) (float64, error) {
	score, err := z.redisClient.ZScore(ctx, generateTaskQueueKey(z.queueName), taskTag).Result()
	if err != nil {
		return 0, errors.Wrap(err, "failed to get queue member score")
	}
	return score, nil
}
```
Deleting a task

```go
// Del reuses the same lua script as Ack: remove the tag from the ZSET
// and delete the body from the HASH in one atomic step.
func (z *ZQueue) Del(ctx context.Context, taskTag string) error {
	err := z.redisClient.Eval(ctx, AckMsgScript,
		[]string{generateTaskQueueKey(z.queueName), generateTaskHashMapKey(z.queueName)},
		taskTag,
	).Err()
	if err != nil {
		return errors.Wrap(err, "failed to remove message from queue")
	}
	return nil
}
```
Checking whether a task exists

```go
func (z *ZQueue) Exist(ctx context.Context, taskTag string) bool {
	// A missing member surfaces as redis.Nil, which is itself an error,
	// so a single err check covers both "not found" and real failures.
	_, err := z.redisClient.ZScore(ctx, generateTaskQueueKey(z.queueName), taskTag).Result()
	return err == nil
}
```
Usage guide
How to use
Creating a queue

```go
queue := zqueue.NewZQueue(rCli, TestQueueName,
	zqueue.WithDistributeLock(TestLockKey),
	zqueue.WithLockSecond(TestLockExpireSecond),
	zqueue.WithConsumeRateLimitType(zqueue.RateLimitTypeOfConcurrency, 0, 5),
)
```
Publishing tasks
Without an explicit priority (the score defaults to the current unix timestamp):

```go
queue.Publish(ctx, zqueue.TaskInfo{
	Tag:  "0001",
	Body: []byte("message1 body"),
})
```

With an explicit priority (a lower score sorts earlier in the ZSET, so 0 jumps ahead of all timestamp-scored tasks):

```go
queue.Publish(ctx, zqueue.TaskInfo{
	Tag:  "0002",
	Body: []byte("message2 body"),
}, 0)
```
Subscribing to task messages

```go
msgChan := queue.Subscribe(ctx)
for task := range msgChan {
	// Handle the task here, then acknowledge it with queue.Ack.
	_ = task
}
```
More queue information

```go
rank, _ := queue.Rank(ctx, taskID)
exist := queue.Exist(ctx, taskID)
score, _ := queue.Score(ctx, taskID)
total, _ := queue.TotalCount(ctx)
```
Complete example

```go
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
	zqueue "z-queue"
)

const (
	TestQueueName        = "Testing"
	TestLockKey          = "lock-key:testing"
	TestLockExpireSecond = 5
)

func main() {
	addr := os.Getenv("REDIS_EXAMPLE_ADDR")
	pwd := os.Getenv("REDIS_EXAMPLE_PASSWORD")
	db, _ := strconv.Atoi(os.Getenv("REDIS_EXAMPLE_DB"))
	rCli := redis.NewClient(&redis.Options{
		Addr:     addr,
		Password: pwd,
		DB:       db,
	})

	queue := zqueue.NewZQueue(rCli, TestQueueName,
		zqueue.WithDistributeLock(TestLockKey),
		zqueue.WithLockSecond(TestLockExpireSecond),
		zqueue.WithConsumeRateLimitType(zqueue.RateLimitTypeOfConcurrency, 0, 5),
	)

	// Consumer: subscribe and handle messages as they arrive.
	go func() {
		ctx := context.Background()
		msgChan := queue.Subscribe(ctx)
		for message := range msgChan {
			rank, _ := queue.Rank(ctx, message.Tag)
			exist := queue.Exist(ctx, message.Tag)
			score, _ := queue.Score(ctx, message.Tag)
			total, _ := queue.TotalCount(ctx)
			fmt.Printf("taskinfo: ID: %s, rank: %d, exist: %v, score: %d, total: %d\n",
				message.Tag, rank, exist, int64(score), total)
			go consumeMsg(queue, message.Tag)
		}
	}()

	// Producer: publish ten tasks, one per second.
	for i := 0; i < 10; i++ {
		tID := fmt.Sprintf("taskID-0000-000%d", i)
		err := queue.Publish(context.Background(), zqueue.TaskInfo{
			Tag:  tID,
			Body: []byte("publish a message"),
		})
		if err != nil {
			panic("push message is error")
		}
		time.Sleep(time.Second)
	}
	time.Sleep(60 * time.Second)
}

// consumerChan caps local consumption at two concurrent tasks.
var consumerChan = make(chan struct{}, 2)

func consumeMsg(queue zqueue.Queue, taskID string) {
	now := time.Now()
	consumerChan <- struct{}{}
	if time.Since(now) > time.Second {
		fmt.Println("msg count out of limit", taskID)
	}
	time.Sleep(3 * time.Second)
	fmt.Println("consume success", taskID)
	_ = queue.Ack(context.Background(), taskID, false)
	<-consumerChan
}
```
The code has been uploaded to GitHub; if you are interested, head over there to read the source. Issues and PRs are welcome.
Closing thoughts
This post documented the process of building a task queue with go and redis. It was written after the code was already finished, so it does not cover the problems encountered along the way. The project is still at an early stage and there is plenty left to improve; issues and PRs are welcome.