RabbitMQ(三) 交换器

交换器负责接收客户端传递过来的消息,并转发到对应的队列中。在 RabbitMQ 中支持四种交换器(交换机):

  1. Direct:直连交换器(默认
  2. Fanout:扇形交换器
  3. Topic:主题交换器
  4. Header:首部交换器

Direct 交换器

direct 交换器是 RabbitMQ 的默认交换器。默认进行公平调度:所有接收者依次从消息队列中获取值。Publisher 给哪个队列发消息,消息只给那个单独的队列,不会对交换器绑定的其他队列造成影响。

  • 一个队列绑定多个消费者
  • 使用到的 API
    • org.springframework.amqp.core.Queue:消息队列
    • AmqpTemplate:操作 RabbitMQ 的接口,负责发送或接收消息
    • @RabbitListener(queues = ""):注解某个方法为接收消息方法

Publisher 代码实现

下面演示使用 Direct 交换器的 Publisher 的代码实现。

新建 Spring Boot 项目

  1. 在 IDEA 中创建新 Maven 工程

  2. 导入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    

编写配置文件

编写配置文件 application.yml

spring:
  rabbitmq:
    host: 192.168.80.130
    username: penghao
    password: penghao

创建配置类

注入 Queue,注意所在包是 org.springframework.amqp.core.Queue

package site.penghao.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitmqConfig {

    @Bean
    protected Queue getQueue() {

        return new Queue("myQueue");
    }
}

测试

只需要使用 AmqpTemplate 进行操作即可。

@SpringBootTest
public class MyTest {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void test() {
        amqpTemplate.convertAndSend("myQueue", "hello world!");
        System.out.println("OK");
    }

}

Consumer 代码实现

新建 Spring Boot 项目

类似地,创建 Spring Boot 项目并配置相关信息。

  1. 在 IDEA 中创建新 Maven 工程

  2. 导入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    

编写配置文件

编写配置文件 application.yml

spring:
  rabbitmq:
    host: 192.168.80.130
    username: penghao
    password: penghao

编写接收函数

使用 @RabbitListener 配置接收函数,能在应用启动时自动接收对应队列的值,并在接收信息后自动执行接收函数。

package site.penghao.receive;


import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DemoReceive {

    @RabbitListener(queues = "myQueue")
    public void demo(String msg) {
        System.out.println("Receive1: " + msg);
    }

    @RabbitListener(queues = "myQueue")
    public void demo2(String msg) {
        System.out.println("Receive2: " + msg);
    }
}

启动

直接在 Spring Boot Application 中启动即可,应用会自动接收消息队列中的消息。并且,对于定义的两个接收函数,它们将轮流收到消息,Direct 交换器是公平的

执行效果:

Receive2: message[1]
Receive1: message[0]
Receive1: message[2]
Receive2: message[3]
Receive1: message[4]
Receive2: message[5]
Receive1: message[6]
Receive2: message[7]
Receive1: message[8]
Receive2: message[9]

Fanout 交换器

扇形交换器,实际上做的事情就是广播,fanout 会把消息发送给所有绑定在当前交换器上的队列。且每个队列消息中的第一个 Consumer 能收到消息。

  • 一个交换器需要绑定多个队列。
  • 使用到的 API:
    • FanoutExchange:fanout 交换器
    • Binding:绑定交换器和队列
    • BindingBuilder:Binding 的构建器
    • amq.fanout:内置 fanout 交换器的名称

Publisher 代码实现

新建 Spring Boot 项目

  1. 在 IDEA 中创建新 Maven 工程

  2. 导入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    

编写配置文件

编写配置文件 application.yml

spring:
  rabbitmq:
    host: 192.168.80.130
    username: penghao
    password: penghao

创建配置类

注入 Queue,注入 Fanout 交换器,并且绑定消息队列和交换机。

注意 Queue 所在包是 org.springframework.amqp.core.Queue

@Configuration
public class RabbitmqConfig {
    
    @Bean
    protected Queue queue1() {

        return new Queue("myFanout1");
    }

    @Bean
    protected Queue queue2() {

        return new Queue("myFanout2");
    }

    @Bean
    protected FanoutExchange getFanout() {

        return new FanoutExchange("amq.fanout");
    }

    @Bean
    public Binding binding(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    @Bean
    public Binding binding2(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

测试

使用 AmqpTemplate 进行操作即可。

@SpringBootTest
public class MyTest {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testFanout() {
        // 第一个参数是 fanout,第二个参数是 routing-key,第三个参数是 msg
        amqpTemplate.convertAndSend("amq.fanout", "core", "fanout msg");

        System.out.println("OK");
    }

}

Consumer 代码实现

新建 Spring Boot 项目

类似地,创建 Spring Boot 项目并配置相关信息。

  1. 在 IDEA 中创建新 Maven 工程

  2. 导入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    

编写配置文件

编写配置文件 application.yml

spring:
  rabbitmq:
    host: 192.168.80.130
    username: penghao
    password: penghao

编写接收函数

使用 @RabbitListener 配置接收函数,能在应用启动时自动接收对应队列的值,并在接收信息后自动执行接收函数。

这里设置两个 fanout 队列的接收函数,都可以接收到 Publisher 发送的每一条消息。

注意:如果 myFanout1 队列同时有两个消费者,那么将只有一个能接收到消息。

package site.penghao.receive;


import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DemoReceive {

    @RabbitListener(queues = "myFanout1")
    public void fanout1(String msg) {
        System.out.println("Fanout1: " + msg);
    }
    
    @RabbitListener(queues = "myFanout2")
    public void fanout2(String msg) {
        System.out.println("Fanout2: " + msg);
    }
}

启动

直接在 Spring Boot Application 中启动即可,应用会自动接收消息队列中的消息。并且,对于定义的两个接收函数,它们将都能收到消息,Fanout 交换器是多播的

执行效果:

Fanout2: fanout msg
Fanout1: fanout msg

Topic 交换器

Topic 交换器允许在路由键(routing-key)中出现匹配规则。路由键的写法和包的写法相同,是site.penghao.XXX.YYY 格式,绑定时可以带有特殊符号,例如 * 表示任意一个单词(不包含.# 表示 0 个或多个字符(包含.。对于接收方而言,topic 依旧是公平调度的

  • 使用到的 API:
    • TopicExchange:Topic 交换器
    • amq.topic:内置 topic 交换器名称

Publisher 代码实现

新建 Spring Boot 项目

  1. 在 IDEA 中创建新 Maven 工程

  2. 导入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    

编写配置文件

编写配置文件 application.yml

spring:
  rabbitmq:
    host: 192.168.80.130
    username: penghao
    password: penghao

创建配置类

注入 Queue,注入 Topic 交换器,绑定消息队列和交换机并设置路由键,路由键的格式为 com.example.XXX.YYY

注意 Queue 所在包是 org.springframework.amqp.core.Queue

@Configuration
public class RabbitmqConfig {
    
    @Bean
    public Queue topicQueue1() {

        return new Queue("topic1");
    }

    @Bean
    public Queue topicQueue2() {

        return new Queue("topic2");
    }

    @Bean
    public TopicExchange topicExchange() {

        return new TopicExchange("amq.topic");
    }

    @Bean
    public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) {

        return BindingBuilder.bind(topicQueue1).to(topicExchange)
                .with("site.penghao.*");
    }
    @Bean
    public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {

        return BindingBuilder.bind(topicQueue2).to(topicExchange)
                .with("site.penghao.a");
    }
}

测试

使用 AmqpTemplate 进行操作即可,发送时也要配置路由键,相应的,对于 Topic Exchange 绑定的所有队列,如果其对应的路由键能匹配所给的路由键,就会将消息发送给对应的队列。

@SpringBootTest
public class MyTest {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testTopic() {
        // 第一个参数是 fanout,第二个参数是 routing-key,第三个参数是 msg
        amqpTemplate.convertAndSend("amq.topic", "site.penghao.a", "topic msg");

        System.out.println("OK");
    }

}

Consumer 代码实现

新建 Spring Boot 项目

类似地,创建 Spring Boot 项目并配置相关信息。

  1. 在 IDEA 中创建新 Maven 工程

  2. 导入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    

编写配置文件

编写配置文件 application.yml

spring:
  rabbitmq:
    host: 192.168.80.130
    username: penghao
    password: penghao

编写接收函数

使用 @RabbitListener 配置接收函数,能在应用启动时自动接收对应队列的值,并在接收信息后自动执行接收函数。

这里设置两个 topic 队列的接收函数,它们可以接收到与自己路由键相匹配的消息

注意:如果 topic1 队列同时有多个消费者,那么将遵循公平分配原则

package site.penghao.receive;


import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DemoReceive {

    @RabbitListener(queues = "topic1")
    public void topic1(String msg) {
        System.out.println("Topic1[ site.penghao.* ]: " + msg);
    }

    @RabbitListener(queues = "topic2")
    public void topic2(String msg) {
        System.out.println("Topic1[ site.penghao.a ]: " + msg);
    }
}

启动

直接在 Spring Boot Application 中启动即可,应用会自动接收消息队列中的消息。并且,对于定义的多个接收函数,它们能接收相应队列的路由键所匹配的消息。

执行效果:

Topic1[ site.penghao.* ]: topic msg
Topic1[ site.penghao.a ]: topic msg

如果发送信息改为:

@Test
public void testTopic() {
    // 第一个参数是 fanout,第二个参数是 routing-key,第三个参数是 msg
    amqpTemplate.convertAndSend("amq.topic", "site.penghao.b", "topic msg");

    System.out.println("OK");
}

执行效果为:

Topic1[ site.penghao.* ]: topic msg

注意:如果 routing-key 改为 site.penghao.a.b,将能匹配 site.penghao.#不能匹配site.penghao.*

小结

  • Direct 交换器是 Rabbit 的默认交换器,直接发送给某个消息队列,对于多个消费者进行公平调度。
  • Fanout 交换器是基于广播的交换器,会把消息发送给交换器绑定的所有消息队列。
  • Topic 交换器是基于匹配规则的交换器,会把消息发送给绑定的能跟消息的路由键相匹配的所有消息队列。
  • 路由键的格式是 com.example.XXX.YYY,支持使用 * 匹配一个单词(两个句点之间的内容),或者使用 # 匹配一个或多个字符(可以匹配句点)。