1、交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。三种常用的交换器类型,a、direct(发布与订阅 完全匹配)。b、fanout(广播)。c、topic(主题,规则匹配)。
2、direct(发布与订阅 完全匹配)的使用。
由于使用的是SpringBoot项目结合Maven项目构建的。项目工程如下所示:
3、生产者模块和消费者模块分开的,但是pom. 是一致的,如下所示:
1 <? version="1.0" encoding="UTF-8"?> 2 <project ns="http://maven.apache.org/POM/4.0.0" 3 ns:xsi="http://www.w3.org/2001/ Schema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 5 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 6 <modelVersion>4.0.0</modelVersion> 7 <parent> 8 <groupId>org.spring work.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.1.1.RELEASE</version> 11 <relativePath /> <!-- lookup parent from repository --> 12 </parent> 13 <groupId>com.bie</groupId> 14 <artifactId>rabbitmq-direct-provider</artifactId> 15 <version>0.0.1-SNAPSHOT</version> 16 <name>rabbitmq-direct-provider</name> 17 <de ion>Demo project for Spring Boot</de ion> 18 19 <properties> 20 <java.version>1.8</java.version> 21 </properties> 22 23 <dependencies> 24 <dependency> 25 <groupId>org.spring work.boot</groupId> 26 <artifactId>spring-boot-starter</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.spring work.boot</groupId> 30 <artifactId>spring-boot-starter-web</artifactId> 31 </dependency> 32 <dependency> 33 <groupId>org.spring work.boot</groupId> 34 <artifactId>spring-boot-starter-test</artifactId> 35 <scope>test</scope> 36 </dependency> 37 <dependency> 38 <groupId>org.spring work.boot</groupId> 39 <artifactId>spring-boot-starter-amqp</artifactId> 40 </dependency> 41 </dependencies> 42 43 <build> 44 <plugins> 45 <plugin> 46 <groupId>org.spring work.boot</groupId> 47 <artifactId>spring-boot-maven-plugin</artifactId> 48 </plugin> 49 </plugins> 50 </build> 51 52 </project>
配置生产者的配置文件application.properties。配置如下所示:
1 # 给当前项目起名称. 2 spring.application.name=rabbitmq-direct-provider 3 4 # 配置端口号 5 server.port=8081 6 7 8 # 配置rabbitmq的参数. 9 # rabbitmq服务器的ip地址. 10 spring.rabbitmq.host=192.168.110.133 11 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号. 12 spring.rabbitmq.port=5672 13 # rabbitmq的账号. 14 spring.rabbitmq.username=guest 15 # rabbitmq的密码. 16 spring.rabbitmq.password=guest 17 18 # 设置交换器的名称,方便修改. 19 # 生产者和消费者的交换器的名称是一致的,这样生产者生产的消息发送到交换器,消费者可以从这个交换器中消费. 20 rabbitmq.config.exchange=log.exchange.direct 21 22 # 生产者生产消息的时候也要带上路由键,队列通过路由键绑定到交换器,交换器根据路由键将绑定到队列上. 23 # 交换器根据不同的路由键将消息发送到不同队列上. 24 # info的路由键. 25 rabbitmq.config.queue.info.routing.key=log.info.routing.key 26 27 # error的路由键. 28 rabbitmq.config.queue.error.routing.key=log.error.routing.key
配置完毕,配置文件开始写生产者生产消息代码。
本模块练习的是发布订阅模式即Direct,分为两个生产者LogInfo、LogError,生产者生产消息的时候也要带上路由键,队列通过路由键绑定到交换器(即交换器根据路由键将绑定到队列上),交换器根据不同的路由键将消息发送到不同队列上。
本项目指定了info的路由键、error的路由键,然后生产者生产的消息发送到指定的交换器。交换器通过路由到绑定的队列中去,最后消费者进行监听队列发生变化,触发指定的方法进行消息的消费。
1 package com.example.bie.provider;
2
3 import org.spring work.amqp.core.AmqpTemplate;
4 import org.spring work.beans.factory.annotation.Autowired;
5 import org.spring work.beans.factory.annotation.Value;
6 import org.spring work.stereotype.Component;
7
8 /**
9 *
10 * @author biehl
11 *
12 * 生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13 *
14 * 这里使用的交换器类型使用的是direct发布订阅模式,
15 * 根据配置的路由routing-key来决定,将不同的消息路由到不同的队列queue中。
16 * 不同的消息具有相同的路由键,就会进入相同的队列当中去。
17 *
18 *
19 */
20 @Component
21 public class RabbitMqLogInfoProduce {
22
23 @Autowired
24 private AmqpTemplate rabbitmqAmqpTemplate;
25
26 // 交换器的名称Exchange
27 @Value(value = "${rabbitmq.config.exchange}")
28 private String exchange;
29
30 // 路由键routingkey
31 @Value(value = "${rabbitmq.config.queue.info.routing.key}")
32 private String routingKey;
33
34 /**
35 * 发送消息的方法
36 *
37 * @param msg
38 */
39 public void producer(String msg) {
40 // 向消息队列发送消息
41 // 参数1,交换器的名称
42 // 参数2,路由键
43 // 参数3,消息
44 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKey, msg);
45 }
46
47 } 1 package com.example.bie.provider;
2
3 import org.spring work.amqp.core.AmqpTemplate;
4 import org.spring work.beans.factory.annotation.Autowired;
5 import org.spring work.beans.factory.annotation.Value;
6 import org.spring work.stereotype.Component;
7
8 /**
9 *
10 * @author biehl
11 *
12 * 生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13 *
14 * 这里使用的交换器类型使用的是direct发布订阅模式,
15 * 根据配置的路由routing-key来决定,将不同的消息路由到不同的队列queue中。
16 * 不同的消息具有相同的路由键,就会进入相同的队列当中去。
17 *
18 *
19 */
20 @Component
21 public class RabbitMqLogErrorProduce {
22
23 @Autowired
24 private AmqpTemplate rabbitmqAmqpTemplate;
25
26 // 交换器的名称Exchange
27 @Value(value = "${rabbitmq.config.exchange}")
28 private String exchange;
29
30 // 路由键routingkey
31 @Value(value = "${rabbitmq.config.queue.error.routing.key}")
32 private String routingKey;
33
34 /**
35 * 发送消息的方法
36 *
37 * @param msg
38 */
39 public void producer(String msg) {
40 // 向消息队列发送消息
41 // 参数1,交换器的名称
42 // 参数2,路由键
43 // 参数3,消息
44 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKey, msg);
45 }
46 }这里使用web工程,浏览器访问调用,方便测试。
1 package com.example.bie.controller;
2
3 import org.spring work.beans.factory.annotation.Autowired;
4 import org.spring work.stereotype.Controller;
5 import org.spring work.web.bind.annotation.RequestMapping;
6 import org.spring work.web.bind.annotation.ResponseBody;
7
8 import com.example.bie.provider.RabbitMqLogErrorProduce;
9 import com.example.bie.provider.RabbitMqLogInfoProduce;
10
11 /**
12 *
13 * @author biehl
14 *
15 */
16 @Controller
17 public class RabbitmqController {
18
19 @Autowired
20 private RabbitMqLogInfoProduce rabbitMqLogInfoProduce;
21
22 @Autowired
23 private RabbitMqLogErrorProduce rabbitMqLogErrorProduce;
24
25 @RequestMapping(value = "/logInfo")
26 @ResponseBody
27 public String rabbitmqSendLogInfoMessage() {
28 String msg = "生产者===>生者的LogInfo消息message: ";
29 for (int i = 0; i < 100000; i++) {
30 rabbitMqLogInfoProduce.producer(msg + i);
31 }
32 return "生产===> LogInfo消息message ===> success!!!";
33 }
34
35 @RequestMapping(value = "/logError")
36 @ResponseBody
37 public String rabbitmqSendLogErrorMessage() {
38 String msg = "生产者===>生者的LogError消息message: ";
39 for (int i = 0; i < 100000; i++) {
40 rabbitMqLogErrorProduce.producer(msg + i);
41 }
42 return "生产===> LogError消息message ===> success!!!";
43 }
44
45 }生产者的启动类,如下所示:
1 package com.example;
2
3 import org.spring work.boot.SpringApplication;
4 import org.spring work.boot.autoconfigure.SpringBootApplication;
5
6 @SpringBootApplication
7 public class RabbitmqProducerApplication {
8
9 public static void main(String[] args) {
10 SpringApplication.run(RabbitmqProducerApplication.class, args);
11 }
12
13 }4、生产者搞完以后,开始搞消费者。由于pom. 配置文件一致,这里省略消费者的pom. 配置文件。
1 # 给当前项目起名称. 2 spring.application.name=rabbitmq-direct-consumer 3 4 # 配置端口号 5 server.port=8080 6 7 # 配置rabbitmq的参数. 8 # rabbitmq服务器的ip地址. 9 spring.rabbitmq.host=192.168.110.133 10 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号. 11 spring.rabbitmq.port=5672 12 # rabbitmq的账号. 13 spring.rabbitmq.username=guest 14 # rabbitmq的密码. 15 spring.rabbitmq.password=guest 16 17 # 设置交换器的名称,方便修改. 18 # 路由键是将交换器和队列进行绑定的,队列通过路由键绑定到交换器. 19 rabbitmq.config.exchange=log.exchange.direct 20 21 # info级别的队列名称. 22 rabbitmq.config.queue.info=log.info.queue 23 # info的路由键. 24 rabbitmq.config.queue.info.routing.key=log.info.routing.key 25 26 # error级别的队列名称. 27 rabbitmq.config.queue.error=log.error.queue 28 # error的路由键. 29 rabbitmq.config.queue.error.routing.key=log.error.routing.key
消费者消费消息的编写,如下所示:
1 package com.example.bie.consumer;
2
3 import org.spring work.amqp.core.ExchangeTypes;
4 import org.spring work.amqp.rabbit.annotation.Exchange;
5 import org.spring work.amqp.rabbit.annotation.Queue;
6 import org.spring work.amqp.rabbit.annotation.QueueBinding;
7 import org.spring work.amqp.rabbit.annotation.RabbitHandler;
8 import org.spring work.amqp.rabbit.annotation.RabbitListener;
9 import org.spring work.stereotype.Component;
10
11 /**
12 *
13 * @author biehl
14 *
15 * 消息接收者
16 *
17 * 1、@RabbitListener bindings:绑定队列
18 *
19 * 2、@QueueBinding
20 * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21 *
22 * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23 *
24 * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25 *
26 *
27 */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30
31 value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),
32
33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
34
35 key = "${rabbitmq.config.queue.error.routing.key}"))
36 public class LogErrorConsumer {
37
38 /**
39 * 接收消息的方法,采用消息队列监听机制.
40 *
41 * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42 *
43 * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44 *
45 * @param msg
46 */
47 @RabbitHandler
48 public void consumer(String msg) {
49 // 打印消息
50 System.out.println("ERROR消费者===>消费<===消息message: " + msg);
51 }
52
53 } 1 package com.example.bie.consumer;
2
3 import org.spring work.amqp.core.ExchangeTypes;
4 import org.spring work.amqp.rabbit.annotation.Exchange;
5 import org.spring work.amqp.rabbit.annotation.Queue;
6 import org.spring work.amqp.rabbit.annotation.QueueBinding;
7 import org.spring work.amqp.rabbit.annotation.RabbitHandler;
8 import org.spring work.amqp.rabbit.annotation.RabbitListener;
9 import org.spring work.stereotype.Component;
10
11 /**
12 *
13 * @author biehl
14 *
15 * 消息接收者
16 *
17 * 1、@RabbitListener bindings:绑定队列
18 *
19 * 2、@QueueBinding
20 * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21 *
22 * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23 *
24 * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25 *
26 *
27 */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30
31 value = @Queue(value = "${rabbitmq.config.queue.info}", autoDelete = "true"),
32
33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
34
35 key = "${rabbitmq.config.queue.info.routing.key}"))
36 public class LogInfoConsumer {
37
38 /**
39 * 接收消息的方法,采用消息队列监听机制.
40 *
41 * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42 *
43 * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44 *
45 * @param msg
46 */
47 @RabbitHandler
48 public void consumer(String msg) {
49 // 打印消息
50 System.out.println("INFO消费者===>消费: " + msg);
51 }
52
53 }消费者启动主类如下所示:
1 package com.example;
2
3 import org.spring work.boot.SpringApplication;
4 import org.spring work.boot.autoconfigure.SpringBootApplication;
5
6 @SpringBootApplication
7 public class RabbitmqConsumerApplication {
8
9 public static void main(String[] args) {
10 SpringApplication.run(RabbitmqConsumerApplication.class, args);
11 }
12
13 }5、发布订阅模式,生产者生产消息,消费者消费消息,运行效果如下所示:
首先启动你消费者消费消息的启动类,再启动你的生产者生产消息的启动类。
继续阅读与本文标签相同的文章
上一篇 :
1.1 云原生历史
-
十年磨一剑:从2009启动“去IOE”工程到2019年OceanBase拿下TPC-C世界第一
2026-05-16栏目: 教程
-
爬了各大搜索引擎,2019年Java面试题(集合+并发+调优+微服务)
2026-05-16栏目: 教程
-
Java修饰符
2026-05-16栏目: 教程
-
一个合格的程序员,需要哪些必备技能?
2026-05-16栏目: 教程
-
为什么Java字符串是不可变对象?
2026-05-16栏目: 教程
