交换器负责接收客户端传递过来的消息,并转发到对应的队列中。在 RabbitMQ 中支持四种交换器(交换机):
- Direct:直连交换器(默认)
- Fanout:扇形交换器
- Topic:主题交换器
- Header:首部交换器
Direct 交换器
direct 交换器是 RabbitMQ 的默认交换器。默认进行公平调度:所有接收者依次从消息队列中获取值。Publisher 给哪个队列发消息,消息只给那个单独的队列,不会对交换器绑定的其他队列造成影响。
- 一个队列绑定多个消费者
- 使用到的 API
org.springframework.amqp.core.Queue
:消息队列AmqpTemplate
:操作 RabbitMQ 的接口,负责发送或接收消息@RabbitListener(queues = "")
:注解某个方法为接收消息方法
Publisher 代码实现
下面演示使用 Direct 交换器的 Publisher 的代码实现。
新建 Spring Boot 项目
-
在 IDEA 中创建新 Maven 工程
-
导入依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
编写配置文件
编写配置文件 application.yml
spring:
rabbitmq:
host: 192.168.80.130
username: penghao
password: penghao
创建配置类
注入 Queue,注意所在包是 org.springframework.amqp.core.Queue
。
package site.penghao.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
@Bean
protected Queue getQueue() {
return new Queue("myQueue");
}
}
测试
只需要使用 AmqpTemplate 进行操作即可。
@SpringBootTest
public class MyTest {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void test() {
amqpTemplate.convertAndSend("myQueue", "hello world!");
System.out.println("OK");
}
}
Consumer 代码实现
新建 Spring Boot 项目
类似地,创建 Spring Boot 项目并配置相关信息。
-
在 IDEA 中创建新 Maven 工程
-
导入依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
编写配置文件
编写配置文件 application.yml
spring:
rabbitmq:
host: 192.168.80.130
username: penghao
password: penghao
编写接收函数
使用 @RabbitListener
配置接收函数,能在应用启动时自动接收对应队列的值,并在接收信息后自动执行接收函数。
package site.penghao.receive;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DemoReceive {
@RabbitListener(queues = "myQueue")
public void demo(String msg) {
System.out.println("Receive1: " + msg);
}
@RabbitListener(queues = "myQueue")
public void demo2(String msg) {
System.out.println("Receive2: " + msg);
}
}
启动
直接在 Spring Boot Application 中启动即可,应用会自动接收消息队列中的消息。并且,对于定义的两个接收函数,它们将轮流收到消息,Direct 交换器是公平的。
执行效果:
Receive2: message[1]
Receive1: message[0]
Receive1: message[2]
Receive2: message[3]
Receive1: message[4]
Receive2: message[5]
Receive1: message[6]
Receive2: message[7]
Receive1: message[8]
Receive2: message[9]
Fanout 交换器
扇形交换器,实际上做的事情就是广播,fanout 会把消息发送给所有绑定在当前交换器上的队列。且每个队列消息中的第一个 Consumer 能收到消息。
- 一个交换器需要绑定多个队列。
- 使用到的 API:
- FanoutExchange:fanout 交换器
- Binding:绑定交换器和队列
- BindingBuilder:Binding 的构建器
- amq.fanout:内置 fanout 交换器的名称
Publisher 代码实现
新建 Spring Boot 项目
-
在 IDEA 中创建新 Maven 工程
-
导入依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
编写配置文件
编写配置文件 application.yml
spring:
rabbitmq:
host: 192.168.80.130
username: penghao
password: penghao
创建配置类
注入 Queue,注入 Fanout 交换器,并且绑定消息队列和交换机。
注意 Queue 所在包是
org.springframework.amqp.core.Queue
。
@Configuration
public class RabbitmqConfig {
@Bean
protected Queue queue1() {
return new Queue("myFanout1");
}
@Bean
protected Queue queue2() {
return new Queue("myFanout2");
}
@Bean
protected FanoutExchange getFanout() {
return new FanoutExchange("amq.fanout");
}
@Bean
public Binding binding(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
public Binding binding2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
测试
使用 AmqpTemplate 进行操作即可。
@SpringBootTest
public class MyTest {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testFanout() {
// 第一个参数是 fanout,第二个参数是 routing-key,第三个参数是 msg
amqpTemplate.convertAndSend("amq.fanout", "core", "fanout msg");
System.out.println("OK");
}
}
Consumer 代码实现
新建 Spring Boot 项目
类似地,创建 Spring Boot 项目并配置相关信息。
-
在 IDEA 中创建新 Maven 工程
-
导入依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
编写配置文件
编写配置文件 application.yml
spring:
rabbitmq:
host: 192.168.80.130
username: penghao
password: penghao
编写接收函数
使用 @RabbitListener
配置接收函数,能在应用启动时自动接收对应队列的值,并在接收信息后自动执行接收函数。
这里设置两个 fanout 队列的接收函数,都可以接收到 Publisher 发送的每一条消息。
注意:如果 myFanout1 队列同时有两个消费者,那么将只有一个能接收到消息。
package site.penghao.receive;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DemoReceive {
@RabbitListener(queues = "myFanout1")
public void fanout1(String msg) {
System.out.println("Fanout1: " + msg);
}
@RabbitListener(queues = "myFanout2")
public void fanout2(String msg) {
System.out.println("Fanout2: " + msg);
}
}
启动
直接在 Spring Boot Application 中启动即可,应用会自动接收消息队列中的消息。并且,对于定义的两个接收函数,它们将都能收到消息,Fanout 交换器是多播的。
执行效果:
Fanout2: fanout msg
Fanout1: fanout msg
Topic 交换器
Topic 交换器允许在路由键(routing-key)中出现匹配规则。路由键的写法和包的写法相同,是site.penghao.XXX.YYY
格式,绑定时可以带有特殊符号,例如 *
表示任意一个单词(不包含.
),#
表示 0 个或多个字符(包含.
)。对于接收方而言,topic 依旧是公平调度的。
- 使用到的 API:
- TopicExchange:Topic 交换器
- amq.topic:内置 topic 交换器名称
Publisher 代码实现
新建 Spring Boot 项目
-
在 IDEA 中创建新 Maven 工程
-
导入依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
编写配置文件
编写配置文件 application.yml
spring:
rabbitmq:
host: 192.168.80.130
username: penghao
password: penghao
创建配置类
注入 Queue,注入 Topic 交换器,绑定消息队列和交换机并设置路由键,路由键的格式为 com.example.XXX.YYY
。
注意 Queue 所在包是
org.springframework.amqp.core.Queue
。
@Configuration
public class RabbitmqConfig {
@Bean
public Queue topicQueue1() {
return new Queue("topic1");
}
@Bean
public Queue topicQueue2() {
return new Queue("topic2");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("amq.topic");
}
@Bean
public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue1).to(topicExchange)
.with("site.penghao.*");
}
@Bean
public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue2).to(topicExchange)
.with("site.penghao.a");
}
}
测试
使用 AmqpTemplate 进行操作即可,发送时也要配置路由键,相应的,对于 Topic Exchange 绑定的所有队列,如果其对应的路由键能匹配所给的路由键,就会将消息发送给对应的队列。
@SpringBootTest
public class MyTest {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testTopic() {
// 第一个参数是 fanout,第二个参数是 routing-key,第三个参数是 msg
amqpTemplate.convertAndSend("amq.topic", "site.penghao.a", "topic msg");
System.out.println("OK");
}
}
Consumer 代码实现
新建 Spring Boot 项目
类似地,创建 Spring Boot 项目并配置相关信息。
-
在 IDEA 中创建新 Maven 工程
-
导入依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
编写配置文件
编写配置文件 application.yml
spring:
rabbitmq:
host: 192.168.80.130
username: penghao
password: penghao
编写接收函数
使用 @RabbitListener
配置接收函数,能在应用启动时自动接收对应队列的值,并在接收信息后自动执行接收函数。
这里设置两个 topic 队列的接收函数,它们可以接收到与自己路由键相匹配的消息。
注意:如果 topic1 队列同时有多个消费者,那么将遵循公平分配原则。
package site.penghao.receive;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DemoReceive {
@RabbitListener(queues = "topic1")
public void topic1(String msg) {
System.out.println("Topic1[ site.penghao.* ]: " + msg);
}
@RabbitListener(queues = "topic2")
public void topic2(String msg) {
System.out.println("Topic1[ site.penghao.a ]: " + msg);
}
}
启动
直接在 Spring Boot Application 中启动即可,应用会自动接收消息队列中的消息。并且,对于定义的多个接收函数,它们能接收相应队列的路由键所匹配的消息。
执行效果:
Topic1[ site.penghao.* ]: topic msg
Topic1[ site.penghao.a ]: topic msg
如果发送信息改为:
@Test
public void testTopic() {
// 第一个参数是 fanout,第二个参数是 routing-key,第三个参数是 msg
amqpTemplate.convertAndSend("amq.topic", "site.penghao.b", "topic msg");
System.out.println("OK");
}
执行效果为:
Topic1[ site.penghao.* ]: topic msg
注意:如果 routing-key 改为
site.penghao.a.b
,将能匹配site.penghao.#
,不能匹配site.penghao.*
小结
- Direct 交换器是 Rabbit 的默认交换器,直接发送给某个消息队列,对于多个消费者进行公平调度。
- Fanout 交换器是基于广播的交换器,会把消息发送给交换器绑定的所有消息队列。
- Topic 交换器是基于匹配规则的交换器,会把消息发送给绑定的能跟消息的路由键相匹配的所有消息队列。
- 路由键的格式是
com.example.XXX.YYY
,支持使用*
匹配一个单词(两个句点之间的内容),或者使用#
匹配一个或多个字符(可以匹配句点)。