前言

在现代分布式系统和微服务架构中,消息队列扮演着至关重要的角色。它不仅能够实现系统之间的解耦,还能提供异步处理、负载均衡和削峰填谷等多种功能。RabbitMQ 作为业界广泛使用的消息代理之一,凭借其稳定性、灵活性和丰富的功能,成为了众多开发者和企业的首选。本文将深入探讨消息队列的基础知识,并逐步介绍 RabbitMQ 的基本概念、核心特性及其在 Go 语言中的应用,帮助你在项目中充分利用这一强大的工具。

消息队列

消息队列的作用

消息队列最主要的三个优势:解耦异步削峰

解耦

解耦指的是系统各个部分可以独立开发、部署和维护,不需要直接互相依赖。通过消息队列,生产者和消费者可以独立运行,即使其中一个出现问题,也不会直接影响到另一个。

🌰举个例子

假设有一个Web应用程序需要记录大量的日志信息,这些日志信息需要存储到不同的日志处理系统中,例如文件系统、数据库或云存储。

不使用消息队列:

  • Web 应用程序直接调用多个日志处理系统的 API;
  • 每次记录日志时,Web 应用程序必须等待所有日志处理系统处理完毕;

解耦-不使用消息队列

使用消息队列;

  • Web 应用程序将日志消息发送到消息队列;
  • 各个日志处理系统(文件系统、数据库、云存储)分别从消息队列中读取日志消息进行处理;

解耦-使用消息队列

两者相比,可以得出使用消息队列的优势:

  • Web 应用程序与日志处理系统解耦,各日志处理系统的故障不会直接影响 Web应用程序的正常运行;
  • 各日志处理系统可以独立扩展和维护;

异步

异步处理意味着生产者不需要等待消费者处理完毕即可继续处理其他任务。这样可以提高系统的响应速度和处理能力。

🌰举个例子

在一个用户注册的场景中,需要发送注册确认邮件。

不使用消息队列:

  • 用户注册时,系统直接发送确认邮件;
  • 用户需要等待邮件发送成功才能完成注册;

异步-不使用消息队列

使用消息队列:

  • 用户注册后,系统将发送邮件的任务放入消息队列;
  • 系统立即响应用户,注册完成;
  • 发送邮件的任务由后台系统异步处理,从消息队列中读取任务并发送邮件;

异步-使用消息队列

两者相比,可以得出使用消息队列的优势:

  • 用户注册和发送邮件的任务被异步处理,用户注册系统无需等待邮件发送完成,提高了系统的响应速度

削峰

削峰指的是通过消息队列将瞬时大量的请求平滑处理,避免系统被突发的高峰流量压垮。消息队列可以缓冲高峰流量,将其平滑到可处理的速率。

🌰举个例子

假设某电商网站在促销活动开始时,订单量激增,服务器压力剧增。

不使用消息队列:

  • 订单系统直接处理所有订单请求,可能导致服务器过载崩溃;

削峰-不使用消息队列

使用消息队列:

  • 订单系统将订单请求放入消息队列;
  • 后台处理系统按照自身处理能力从消息队列中读取订单进行处理;
  • 通过消息队列的缓冲作用,避免服务器过载;

削峰-使用消息队列

两者相比,可以得出使用消息队列的优势:

  • 订单系统将订单请求放入消息队列,消息队列作为缓冲区,平滑了高峰流量,避免了系统过载

主流的消息队列以及如何选择

kafka

Apache Kafka 是一个分布式流处理平台,旨在处理实时数据流。

优点:

  • 高吞吐量:适用于处理大规模数据流。
  • 持久化存储:消息可以持久化到磁盘,确保消息不丢失。
  • 可扩展性:容易扩展,支持集群模式。
  • 强大的一致性保障:支持多副本和分区,保证高可用性和数据一致性。

缺点:

  • 复杂性:配置和管理较为复杂,需要一定的维护成本。
  • 延迟:由于设计为高吞吐量,可能在低延迟场景下不如其他消息队列。

RocketMQ

RocketMQ 是阿里基于 kafka 和自己的业务需求开发的一款消息队列,天生为金融互联网领域而生,用于对可靠性要求很高的场景,尤其是电商平台的订单扣款和流量削峰。 RocketMQ 经过了多次阿里双11的考验,在可靠性上是没话说的。

优点:

  • 高性能和高吞吐量:适用于高并发场景;
  • 可靠性和一致性:提供消息持久化、事务消息和严格的顺序保证;
  • 可扩展性:支持分布式架构,易于扩展集群;
  • 丰富的功能:支持定时消息、延时消息、批量消息和顺序消息等;
  • 支持多种协议:如 JMS、OpenMessaging 等,兼容性强;

缺点:

  • 复杂性:部署和管理相对复杂,需要一定的学习和维护成本;
  • 社区支持:相比于 Kafka 和 RabbitMQ,社区和生态系统相对较小;

RabbitMQ

RabbitMQ 是使用 erlang 语言开发的一款消息队列,结合了 erlang 语言本身的并发优势,性能好,时效性可达微妙级,还有自带的管理界面。

优点:

  • 灵活的路由:支持复杂的路由策略,包括直接交换、主题交换、扇出交换等;
  • 可靠性:提供消息确认机制,确保消息可靠传递;
  • 丰富的客户端支持:支持多种编程语言和协议(如 AMQP);

缺点:

  • 性能和吞吐量:相比 Kafka,吞吐量较低。
  • 复杂性:配置和使用较为复杂,尤其是在高可用性场景下。

ActiveMQ

ActiveMQ 是一个功能齐全的消息代理,支持多种消息传递协议。它是 Apache 软件基金会的顶级项目之一,适用于企业级消息传递。

优点:

  • 支持多种协议:包括 AMQP、MQTT、STOMP 等;
  • 高可用性和持久性:提供消息持久化和高可用性选项
  • 企业级功能:支持事务、消息过滤等高级功能;

缺点:

  • 性能:在高吞吐量场景下,性能可能不如 Kafka;
  • 复杂性:配置和调优需要一定的经验;

ZeroMQ

ZeroMQ 是一个高性能异步消息库,旨在构建分布式和并行计算应用。它提供了多种消息传递模式,包括请求-响应、发布-订阅、推送-拉取等。

优点:

  • 极高的性能:ZeroMQ 设计为轻量级消息传递库,具有非常低的延迟和高吞吐量;
  • 灵活性:支持多种消息传递模式,可以构建多种网络拓扑;
  • 多语言支持:支持多种编程语言,适合跨语言开发;
  • 无中间件:无需独立的消息服务器,直接在应用之间传递消息,减少了中间层的复杂性和延迟;

缺点:

  • 缺乏持久化:ZeroMQ 本身不提供消息持久化功能,需要额外的解决方案来实现数据持久性;
  • 无内置高级特性:如消息确认、重试、顺序保证等,需要在应用层实现这些功能;
  • 学习曲线:由于其灵活性和功能丰富,可能需要一定的学习曲线来熟悉其 API 和模式;

总结对比

消息队列 简述 优点 缺点 应用场景
Kafka 分布式流处理平台 高吞吐量、持久化存储、可扩展性、一致性保障 复杂性高、延迟相对较高 实时数据流处理、日志收集和分析、事件溯源、数据管道
RocketMQ 分布式消息中间件,高吞吐量、低延迟、高可靠性和高可扩展性 高性能、高吞吐量、可靠性和一致性、丰富的功能、多协议支持 部署和管理复杂、社区支持较小 金融服务、电商平台、实时数据处理
RabbitMQ 开源消息代理软件,支持 AMQP 灵活的路由、可靠性高、丰富的客户端支持 性能和吞吐量较低、配置复杂 任务队列、分布式系统中的消息传递、工作流系统
ActiveMQ 企业级消息代理,支持多种协议 支持多种协议、高可用性和持久性、企业级功能 性能较低、配置和调优复杂 企业级消息传递、复杂路由和过滤、需要事务支持的场景
ZeroMQ 高性能异步消息库,支持多种消息传递模式 极高性能、灵活性强、多语言支持、无需独立消息服务器 缺乏持久化、无内置高级特性、学习曲线较陡 高性能计算、分布式系统、实时分析

RabbitMQ

结构和名词解释

一个简单的 Rabbit 消息队列结构如图所示:

RabbitMQ简单结构示意

从左到右依次介绍以上各个名词的含义:

  • producer:生产者,消息的发送者;
  • connection:TCP 连接,用与客户端和 RabbitMQ 服务器间的数据传递;
  • channel:通道,一条 TCP 连接上可以创建多个通道,客户端通过通道与 RabbitMQ 建立通信,达到复用 TCP 连接,减小 TCP 连接的创建开销的目的;
  • broker:主机,接收和分发消息的应用,可以认为 broker 就是 RabbitMQ server;
  • virtual host(vhost):虚拟服务器,将整个 RabbitMQ 划分为多个虚拟服务器,实现多租户、安全、分组等功能。每个 vhost 之间是互相隔离的;
  • exchange:交换机,消息到达 RabbitMQ 后首先会到达交换机,交换机根据规则将消息传递到一个或多个消息队列;
  • binding:交换机通过 bind-key 与消息队列进行绑定;
  • queue:消息队列,根据队列规则存放消息,等待消费者取走;
  • consumer:消费者

AMQP

从上文我们知道 RabbitMQ 实现了 AMQP 协议,此处简单介绍一下 AMQP 协议是什么。

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个开放的标准应用层协议,主要用于消息导向的中间件。它允许消息在不同系统或应用程序之间可靠、安全地传递。

AMQP 协议具有以下核心特性:

  • 消息可靠传递:消息确认和事务,确保消息从生产者到消费者的可靠传递;
  • 灵活的消息路由:通过交换机和绑定实现复杂的消息路由机制;
  • 消息安全:支持多钟认证和授权机制,确保消息的安全传输;
  • 支持事务:支持事务,确保一组消息的原子性操作;

作为一个标准化的协议,所有客户端都可以依据规范去实现 AMQP 协议。这代表着 RabbitMQ 可以和不同语言的客户端库进行通信,只要它们遵循了 AMQP 协议。

交换机

在 RabbitMQ 中,交换机(exchange)是消息路由的核心组件。交换机根据绑定规则将从生产者接收到的消息路由到一个或多个队列。交换机有多种属性,下面详细介绍这些属性以及交换机的类型。

属性列表

  1. 名称(name):
    • 交换机的唯一标识符
    • 可以是任意字符串,但空字符串表示的是默认交换机
  2. 类型(type):下文单独介绍
  3. 持久性(durable):
    • 设置为持久化的交换机,在 RabbitMQ 服务器重启后仍然存在
    • 持久化的交换机存储在磁盘上,但消息是否持久化还需要取决于消息队列和消息
  4. 自动删除(auto-delete):
    • 当所有绑定交交换机上的队列都被删除后,自动删除该交换机
    • 适用于临时使用的交换机
  5. 内部(internal):
    • 内部交换机只能被其他交换机绑定,客户端不能直接发送消息到该交换机
    • 用于实现复杂的路由逻辑
  6. 参数(arguments):用于设置一些其他的参数

交换机类型

  1. 扇形交换机(fanout):该类型的交换机会将消息投递到与之绑定的所有消息队列,即广播。一般会在发布订阅模式中使用这种类型;
  2. 直连交换机(direct):该类型的交换机会根据消息的 routing-key,将其投递到 bind-key 完全匹配的消息队列。在简单、工作、路由模式中会使用这种类型;
  3. 主题交换机(topic):该类型的交换机允许与消息队列绑定的 bind-key 使用通配符*#,其中*代表一个单词,#代表多个单词,每个单词之间使用.进行分割。消息投递时,会根据 routing-key 将其投递到匹配的消息队列。一般会在主题模式中使用这种类型;

小tips:

从上文的介绍中我们知道,直连类型的交换机会将消息投递到 routing-key 和 bind-key 匹配的队列,但若是没有为交换机绑定队列时,消息将会发送到 routing-key 指定名字的消息队列上。简单点理解的话就是,没有为直连交换机绑定队列时,消息的 routing-key 即是目标队列的名字。

RabbitMQ 每一个 vhost 都有一个默认的交换机,名字为 “”,可以称其为 default exchange,这是一个 direct 类型的交换机。

这两点将会在后续简单模式和工作模式的消息队列中应用到。

队列

在 RabbitMQ 中,队列(queue)是消息存储的核心组件。生产者将消息发送到队列中,消费者从队列中获取并处理消息。RabbitMQ 的队列有多种属性,这些属性可以配置队列的行为和特性。下面详细介绍这些属性。

  1. 名称(name):
    • 队列的唯一标识符,可以是任意字符串,如果不指定名称,RabbitMQ 会生成一个唯一的名称
  2. 持久性(durable):
    • 持久队列在 RabbitMQ 服务器重启后仍然存在
    • 持久队列仅保证队列的定义是持久的,不保证队列中的消息也是持久的。消息的持久化需要单独配置
  3. 自动删除(auto-delete):
    • 当所有消费者断开连接后,自动删除该队列
    • 适用于临时使用的队列,如RPC的响应队列
  4. 独占(exclusive):
    • 独占队列仅限于创建它的连接使用,并且在连接断开时自动删除该队列
    • 适用于客户端与服务器之间的临时通信,如唯一会话
  5. 消息生存时间(TTL):
    • 队列中消息的存活时间,可以在队列级别和消息级别分别设置
    • 通过队列参数 x-message-ttl 设置,以毫秒为单位
    • 适用于希望消息在队列中保留一定时间后自动删除的场景
  6. 最大长度(max-length):
    • 队列中允许的最大消息数量,当达到此限制时,旧消息将被丢弃或移除
    • 通过队列参数 x-max-length 设置
    • 适用于希望限制队列长度,防止队列无限增长的场景
  7. 最大字节数(max-length-bytes):
    • 队列中允许的最大字节数,当达到此限制时,旧消息将被丢弃或移除
    • 通过队列参数 x-max-length-bytes 设置
    • 适用于希望限制队列的内存使用,防止队列无限增长的场景
  8. 死信交换机(dead-letter exchange, DLX):
    • 当消息在队列中因超时、被拒绝或达到最大长度时,会被转发到死信交换机
    • 通过队列参数 x-dead-letter-exchange 设置
    • 适用于希望处理死信消息,如消息重试或错误处理的场景
  9. 死信路由键(dead-letter routing key):
    • 转发到死信交换机的消息可以指定路由键
    • 通过队列参数 x-dead-letter-routing-key 设置
    • 与死信交换机配合使用,进一步控制死信消息的路由

RabbitMQ 的几种模式

简单模式(HELLO-WORLD模式)

生产者发送消息到消息队列,消费者从消息队列中拉取消息。

简单模式示意图

简单模式使用 direct 类型的交换机,一般情况会直接使用 “” 作为交换机名,即使用默认交换机。

在使用时,只需要将目标消息队列的名字通过 routing-key 传递到 RabbitMQ,默认交换机由于没有跟目标队列建立绑定关系,会将消息投递到队列名为 routing-key 的队列中。

通过这种方式,可以实现忽略交换机存在,不需要显示创建交换机,只需指定目标队列名即可进行消息的投递。

工作模式

工作模式其实就是多消费者的简单模式,生产者发送消息到消息队列,多个消费者从消息队列中拉取消息。

工作模式示意图

工作模式区分于简单模式,需要注意消息队列的消息分发原则。默认情况下消息队列将遵循循环调度原则(轮询),消息会按顺序依次发送给每个消费者,每个消费者接收到的消息总数是平均的。

可以在消费端通过设置限流来设置分发模式为公平调度:设置预取值为1,告诉 RabbitMQ 每次只向一个 worker 发送一条消息,在该 worker 响应之前,不再发送新的消息。

例如,在 golang 客户端中,可以使用 Qos 方法设置预取值:

1
2
3
4
5
func (ch *Channel) Qos(prefetchCount int, prefetchSize int, global bool) error
// prefetchCount : 消费端的消息处理上限,设置为N后一旦该消费端有N个消息没有ACK就会阻塞,不再从消息队列中消费消息
// prefetchSize : 消息大小限制,一般设置为0,消费端不做限制
// global : 是否将上述设置运用于channel
// 需要注意的是 Qos 只在 auto-Ack 为 false 的情况下才会生效

发布订阅模式

生产者将数据发送到交换机,交换机会将数据发送到所有订阅的消息队列(广播)。

发布订阅模式示意图

发布订阅模式使用 fanout 类型的交换机,在使用前,需要先创建交换机和队列,并将两者绑定起来,bind-key 可以不做指定。

生产者发布消息时,仅需要指定交换机名,消息传递到交换机后,会广播到所有与之绑定的消息队列中,消息队列再将消息传递给消费者。需要注意的是,若是一个消息队列有多个消费者,遵循的仍然是工作模式的消息分发规则。

路由模式

生产者将数据交给交换机时会指定一个路由键,交换机根据该路由键将数据发送给绑定了该路由键的消息队列。

路由模式示意图

路由模式使用 direct 类型的交换机,在使用前,需要创建交换机和队列,将两者通过指定的 bind-key 进行绑定。

生产者发布消息时,需要指定交换机和 routing-key,消息传递到交换机后,会分发到 bind-key 匹配的消息队列中,有多个队列匹配的话,每个队列都会收到消息,消息队列再将消息传递给消费者。

主题模式

生产者将数据交给交换机时会指定一个路由键,交换机根据该路由键将数据发送给绑定了该路由键的消息队列。

主题模式示意图

路由模式使用 topic 类型的交换机,在使用前,需要创建交换机和队列,将两者通过指定的 bind-key 进行绑定,bind-key 可以使用*#来分别指代一个单词和多个单词,单词和单词之间使用.进行分割。

生产者发布消息时,需要指定交换机和 routing-key,消息传递到交换机后,会分发到 bind-key 规则匹配的消息队列中,有多个队列匹配的话,每个队列都会收到消息,消息队列再将消息传递给消费者。

使用 RabbitMQ 的工作流程

RabbitMQ 服务端

  1. 创建用户并指定密码;
  2. 创建虚拟服务器;

生产者客户端

  1. 连接 RabbitMQ;
  2. 打开一个通信 channel;
  3. 声明一个交换机(简单模式和工作模式可以忽略);
  4. 声明一个队列;
  5. 绑定交换机和队列(简单模式和工作模式可以忽略);
  6. 发布消息到交换机;
  7. 关闭通道和连接;

消费者客户端

  1. 连接 RabbitMQ;
  2. 打开一个通信 channel;
  3. 声明一个交换机(简单模式和工作模式可以忽略);
  4. 声明一个队列;
  5. 绑定交换机和队列(简单模式和工作模式可以忽略);
  6. 从消息队列中接收消息;
  7. 响应消息;
  8. 关闭通道和连接;

可以注意到,生产者和消费者都进行了交换机、队列声明和绑定,这是由于无法保证两端的启动顺序。若是任一端可以保证发送或接收消息前,交换机和队列已经存在了,则可以省略 3-5 步骤。

注意,生产者和消费者重复进行声明的交换机和队列,属性必须完全一致。已创建的交换机/队列,在第二次重复声明时,若是属性不一致的话会返回错误。

在 golang 中使用 RabbitMQ

安装并导入 amqp 包

github.com/streadway/amqp 是 golang 中使用较多的用来操作 RabbitMQ 的包,可以使用以下命令安装它:

1
go get github.com/streadway/amqp
1
import "github.com/streadway/amqp"

需要注意一点,该仓库已经不在维护了。在使用的时候,可以考虑采用下面的 fork 仓库:

1
go get github.com/rabbitmq/amqp091-go

若是从 streadway/amqp 迁移到 rabbitmq/amqp091-go,可以使用包别名来减少代码的修改:

1
import amqp "github.com/rabbitmq/amqp091-go"

amqp 包的方法介绍

连接操作

连接 RabbieMQ 服务器:

1
2
3
func Dial(url string) (*Connection, error)
// url 使用 AMQP URI 格式的字符串
// 如:amqp://user:pwd@localhost:5672/vhost

创建 channel:

1
func (c *Connection) Channel() (*Channel, error)

设置通道的预取值:

1
2
3
4
5
6
7
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
/*
prefetchCount : 消费端的消息处理上限,设置为N后一旦该消费端有 N 个消息没有 ACK 就会阻塞,不再从消息队列中消费消息
prefetchSize : 消息大小限制,一般设置为 0,消费端不做限制
global : 是否将上述设置运用于 channel
需要注意的是 Qos 只在 auto-Ack 为 false 的情况下才会生效
*/

交换机和队列操作

声明交换机:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
/*
name:交换机名
kind:交换机类型,可选:
ExchangeDirect = "direct"
ExchangeFanout = "fanout"
ExchangeTopic = "topic"
ExchangeHeaders = "headers"
durable:持久化
autoDelete:自动删除
internal:内部
noWait:异步声明,不等待服务器响应即可直接声明
args:更多参数
*/

声明队列:

1
2
3
4
5
6
7
8
9
10
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

/*
name:队列名
durable:持久化
autoDelete:自动删除
exclusive:独占
noWait:异步声明,不等待服务器响应即可直接声明
args:更多参数
*/

交换机和队列绑定:

1
2
3
4
5
6
7
8
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
/*
name:队列名
key:bind-key
exchange:交换机名
noWait:异步声明,不等待服务器响应即可直接声明
args:更多参数
*/

消息发布和消费

发布消息:

1
2
3
4
5
6
7
8
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
/*
exchange:交换机名
key:routing-key
mandatory:强制消费标志,置为 true 时,若是消息无法被路由到任何队列,则消息会返回给生产者,并触发一个 basic.return 回调
immediate:立即消费标志,置为 true 时,若是没有队列能够立即消费该消息(所有绑定的队列都没有消费者),则消息会返回给生产者,并触发一个 basic.return 回调
msg:消息体
*/

消费消息(订阅,服务端推送模式):

1
2
3
4
5
6
7
8
9
10
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
/*
queue:队列名
consumer:消费者名
autoAck:自动应答
exclusive:独占,当置为true时,服务器将确保这是该队列中的唯一消费者。当置为 false 时,服务器将在多个消费者之间公平地分发交付
noLocal:RabbitMQ 不支持该标志,可忽略
noWait:异步操作,不等待服务器响应即可直接声明
args:其他参数
*/

消费消息(拉取):

1
2
3
4
5
6
7
8
func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)
/*
queue:队列名
autoAck:自动应答
msg:消息
ok:是否获取到消息,成功获取到消息该值为 true,该值为 false 时 msg 的内容不可用
err:是否发生错误,不为 nil 时 msg 和 ok 的内容不可用
*/

消息应答

通过 channel 应答/不应答指定消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (ch *Channel) Ack(tag uint64, multiple bool) error
/*
tag:消息标志
multiple:批量应答,置为 true 时将应答之前收到的所有消息
*/

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error
/*
tag:消息标志
multiple:批量应答,置为 true 时将应答之前收到的所有消息
requeue:消息是否返回队列中
*/

直接使用消息进行应答/不应答:

1
2
3
func (d Delivery) Ack(multiple bool) error

func (d Delivery) Nack(multiple, requeue bool) error

代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main

import (
"fmt"
"github.com/streadway/amqp"
"log"
"strconv"
"time"
)

const (
RabbitMQUri = "amqp://guest:guest@localhost:5672/example"
ExchangeName = "exampleExchange"
QueueNameA = "exampleQueueA"
QueueNameB = "exampleQueueB"
BindKeyA = "exampleBindKeyA"
BindKeyB = "exampleBindKeyB"
)

func main() {
conn, err := amqp.Dial(RabbitMQUri)
if err != nil {
log.Fatalln(err)
}
defer conn.Close()
// 创建通道,使用通道来操作 RabbitMQ
ch, err := conn.Channel()
defer ch.Close()
// 创建交换机
err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeDirect, true, false, false, false, nil)
if err != nil {
log.Fatalln(err)
}
// 创建队列
queueA, err := ch.QueueDeclare(QueueNameA, true, false, false, false, nil)
if err != nil {
log.Fatalln(err)
}
queueB, err := ch.QueueDeclare(QueueNameB, true, false, false, false, nil)
if err != nil {
log.Fatalln(err)
}
// 绑定队列
err = ch.QueueBind(queueA.Name, BindKeyA, ExchangeName, false, nil)
if err != nil {
log.Fatalln(err)
}
err = ch.QueueBind(queueB.Name, BindKeyB, ExchangeName, false, nil)
if err != nil {
log.Fatalln(err)
}
// 启动一个协程来写消息
go func() {
ch, _ := conn.Channel()
defer ch.Close()
for i := 0; i < 10; i++ {
msgBody := "HelloWorld! times:" + strconv.Itoa(i)
queueName := BindKeyB
if i%2 == 0 {
queueName = BindKeyA
}
err := ch.Publish(ExchangeName, queueName, false, false, amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent, // 交付模式,Transient(0/1)表示暂时,amqp.Persistent(2)表示持久,暂时交付可以提高吞吐量,持久交付可以保证数据在服务重启后不丢失
Priority: 0, // 优先级
Timestamp: time.Now().UTC(),
Body: []byte(msgBody),
})
if err != nil {
log.Fatalln(err)
}
}
}()

// 开启两个消费者消费 A 队列的消息
go func() {
ch, _ := conn.Channel()
defer ch.Close()
consumerName := "consumerA-1"
msgs, err := ch.Consume(queueA.Name, consumerName, false, false, false, false, nil)
if err != nil {
log.Fatalln(err)
}
for msg := range msgs {
fmt.Printf("%s consume : %v == %s\n", consumerName, msg.Timestamp, string(msg.Body))
time.Sleep(500 * time.Millisecond)
_ = msg.Ack(false)
}
}()

go func() {
ch, _ := conn.Channel()
defer ch.Close()
consumerName := "consumerA-2"
msgs, err := ch.Consume(queueA.Name, consumerName, false, false, false, false, nil)
if err != nil {
log.Fatalln(err)
}
for msg := range msgs {
fmt.Printf("%s consume : %v == %s\n", consumerName, msg.Timestamp, string(msg.Body))
time.Sleep(500 * time.Millisecond)
_ = msg.Ack(false)
}
}()

// 主协程消费 B 队列的消息
for {
time.Sleep(200 * time.Millisecond) // 使用延迟,确保发布者已经向队列 B 发布了消息
ch, _ := conn.Channel()
msg, ok, err := ch.Get(QueueNameB, false)
if err != nil {
log.Fatalln(err)
}
if !ok {
_ = ch.Close()
break
}
fmt.Printf("consumeB : %v == %s\n", msg.Timestamp, string(msg.Body))
time.Sleep(500 * time.Millisecond)
_ = msg.Ack(false)
}

time.Sleep(3 * time.Second)
fmt.Println("main goroutine finish")
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
consumerA-1 consume : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:0
consumerA-2 consume : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:8
consumeB : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:1
consumerA-1 consume : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:2
consumeB : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:3
consumerA-1 consume : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:4
consumerA-1 consume : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:6
consumeB : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:5
consumeB : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:7
consumeB : 2023-05-14 11:30:50 +0800 CST == HelloWorld! times:9
main goroutine finish

上述运行结果中,consumerA-2 只接收到了一条消息,未符合上文所说的多消费者公平分发规则,原因是两个消费者协程并不是同时启动的,按照队列规则可以知道,A 队列接收到的消息编号为 0、2、4、6、8,A-2 接收到的消息编号为 8,说明 A-2 是在 A-1 已经收到了四条消息之后才启动并进行消费的。消息的打印由于多协程的时间片分配,所以没有按照消费顺序进行排序。

总结

通过本文的介绍,我们了解了 RabbitMQ 的基础知识和核心特性,并掌握了如何在 Go 中使用 RabbitMQ。消息队列的引入确实为我们提供了诸多便利,如解耦、异步处理和负载均衡等,解决了许多复杂的系统问题。

然而,我们也需要认识到,引入消息队列也会带来新的挑战,如系统复杂度的增加、潜在的稳定性问题以及维护成本的提升。因此,是否使用消息队列,需要结合项目的具体需求和场景进行仔细权衡。只有在全面评估其带来的优势和潜在的风险后,才能做出最适合项目发展的决策。

希望通过本文的学习,你能够在设计和实现分布式系统时更加游刃有余,充分发挥消息队列的优势,同时有效应对其带来的挑战。