学科分类
目录
Spring Boot开发

Publish/Subscribe(发布订阅模式)

Spring Boot整合RabbitMQ中间件实现消息服务,主要围绕三个部分的工作进行展开:定制中间件、消息发送者发送消息、消息消费者接收消息,其中,定制中间件是比较麻烦的工作,且必须预先定制。下面,以用户注册成功后同时发送邮件通知和短信通知这一场景为例,分别使用基于API、基于配置类和基于注解这三种方式实现Publish/Subscribe工作模式的整合。

1.基于API的方式

基于API的方式主要讲的是使用Spring框架提供的API管理类AmqpAdmin定制消息发送组件,并进行消息发送。这种定制消息发送组件的方式与在RabbitMQ可视化界面上通过对应面板进行组件操作的实现基本一样,都是通过管理员的身份,预先手动声明交换器、队列、路由键等,然后进行消息队列组装,来供应用程序调用,从而实现消息服务。下面,对这种基于API的方式进行讲解和演示。

(1)使用AmqpAdmin定制消息发送组件

打开chapter08项目的测试类Chapter08ApplicationTests,在该测试类中先引入AmqpAdmin管理类定制Publish/Subscribe工作模式所需的消息组件,内容如文件1所示。

文件1 Chapter08ApplicationTests.java

 1  import org.junit.Test;
 2  import org.junit.runner.RunWith;
 3  import org.springframework.amqp.core.*;
 4  import org.springframework.beans.factory.annotation.Autowired;
 5  import org.springframework.boot.test.context.SpringBootTest;
 6  import org.springframework.test.context.junit4.SpringRunner;
 7  @RunWith(SpringRunner.class)
 8  @SpringBootTest
 9  public class Chapter08ApplicationTests {
 10     @Autowired
 11     private AmqpAdmin amqpAdmin;
 12     @Test
 13     public void amqpAdmin() {
 14         // 1、定义fanout类型的交换器
 15         amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
 16         // 2、定义两个默认持久化队列,分别处理email和sms
 17         amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
 18         amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
 19         // 3、将队列分别与交换器进行绑定
 20         amqpAdmin.declareBinding(new Binding("fanout_queue_email",
 21                                 Binding.DestinationType.QUEUE,"fanout_exchange","",null));
 22         amqpAdmin.declareBinding(new Binding("fanout_queue_sms",
 23                                 Binding.DestinationType.QUEUE,"fanout_exchange","",null));
 24     }
 25 }

在文件1中,使用Spring框架提供的消息管理组件AmqpAdmin定制了消息组件。其中,第15行定义了一个fanout类型的交换器fanout_exchange;第17-18行定义了两个消息队列fanout_queue_email和fanout_queue_sms,分别用来处理邮件信息和短信信息;第20-23行,将定义的两个队列分别与交换器绑定。

执行上述单元测试方法amqpAdmin(),验证RabbitMQ消息组件的定制效果。单元测试方法执行成功后,通过RabbitMQ可视化管理页面的Exchanges面板查看效果,结果如图1所示。

图1 AmqpAdmin定制消息组件效果

从图1可以看出,在RabbitMQ可视化管理页面的Exchanges面板中新出现了一个名称为fanout_exchange的交换器(其他7个交换器是RabbitMQ自带的),且其类型是设置的fanout类型。单击fanout_exchange交换器进入查看,效果如图2所示。

图2 AmqpAdmin定制消息组件效果

从图2可以看出,在fanout_exchange交换器详情页面中,展示有该交换器的具体信息,还有与之绑定的两个消息队列fanout_queue_email和fanout_queue_sms,并且程序中设置的绑定规则一致。切换到Queues面板页面,查看定制生成的消息队列信息,效果如图3所示。

图3 AmqpAdmin定制消息组件效果

从图3可以看出,在Queues队列面板页面中,展示有定制的消息队列信息,这与程序中定制的消息队列一致。读者可以单击消息队列名称查看每个队列的详情。

通过上述操作可以发现,在管理页面中提供了消息组件交换器、队列的定制功能。在程序中使用Spring框架提供的管理员API组件AmqpAdmin定制消息组件和在管理页面上手动定制消息组件的本质是一样的。

(2)消息发送者发送消息

完成消息组件的定制工作后,创建消息发送者发送消息到消息队列中。在进行发送消息时,将会定制一个实体类进行消息传递,需要预先创建一个实体类对象。

首先,在chapter08项目中创建名为com.itheima.domain的包,并在该包下创建一个实体类User,内容如文件2所示。

文件2 User.java

 1  public class User {
 2      private Integer id;
 3      private String username;
 4      // 省略属性getXX()和setXX()方法
 5      // 省略toString()方法
 6  }

其次,在项目测试类Chapter08ApplicationTests中使用Spring框架提供的RabbitTemplate模板类实现消息发送,示例代码如下。

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void psubPublisher() {
    User user=new User();
    user.setId(1);
    user.setUsername("石头");
    rabbitTemplate.convertAndSend("fanout_exchange","",user);
}

上述代码中,先使用@Autowired注解引入了进行消息中间件管理的RabbitTemplate组件对象,然后使用该模板工具类的convertAndSend(String exchange, String routingKey, Object object)方法进行消息发布。其中,convertAndSend(String exchange, String routingKey, Object object)方法中的第一个参数表示发送消息的交换器,这个参数值要与之前定制的交换器名称一致;第二个参数表示路由键,因为实现的是Publish/Subscribe工作模式,所以不需要指定;第三个参数是发送的消息内容,接收Object类型。

然后,执行上述消息发送的测试方法psubPublisher(),控制执行效果如图4所示。

图4 消息发送执行效果

从图4可以看出,发送实体类对象消息时程序发生异常,从异常信息“SimpleMessageConverter only supports String, byte[] and Serializable payloads”可以看出,进行消息发送过程中默认使用了SimpleMessageConverter转换器进行消息转换存储,该转换器只支持Spring、byte[]和Serializable序列化后的消息,而测试类中发送的是User实体类对象消息,所以发生异常。

实际开发中,还会发送实体类消息,针对这种需求可以有两种解决方案:第一种方式,直接将实体类实现JDK自带的Serializable序列化接口;第二种方式,定制其他类型的消息转换器,例如JSON格式的消息转换器Jackson2JsonMessageConverter。这两种实现方式都可行,只不过第一种方式实现后可视化效果很差,转换后的消息无法辨识,因此后续将使用第二种方式,使用较为熟悉的JSON格式消息转换器来替换默认转换器。

在chapter08项目中创建名为com.itheima.config的包,并在该包下创建一个RabbitMQ消息配置类RabbitMQConfig,内容如文件3所示。

文件3 RabbitMQConfig.java

 1  import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
 2  import org.springframework.amqp.support.converter.MessageConverter;
 3  import org.springframework.context.annotation.Bean;
 4  import org.springframework.context.annotation.Configuration;
 5  @Configuration
 6  public class RabbitMQConfig {
 7      @Bean
 8      public MessageConverter messageConverter(){
 9          return new Jackson2JsonMessageConverter();
 10     }
 11 }

文件3中,创建了一个RabbitMQ消息配置类RabbitMQConfig,并在该配置类中通过@Bean注解自定义了一个Jackson2JsonMessageConverter类型的消息转换器组件,该组件的返回值必须为MessageConverter类型。

最后,再次执行上述消息发送的测试方法psubPublisher(),此次测试方法将会执行成功,同时查看RabbitMQ可视化管理页面Queues面板信息,效果如图5所示。

图5 消息队列信息

从图5可以看出,消息发送完成后,Publish/Subscribe工作模式下绑定的两个消息队列中各自拥有一条待接收的消息,由于目前尚未提供消息消费者,所以刚才测试类发送的消息会暂存在每一个队列中。此时,可以单击某个队列详情页面,查看具体的消息信息,效果如图6所示。

图6 消息队列详情

从图6可以看出,在消息队列中存储有指定发送的消息详情和其他参数信息,这与程序指定发送的信息完全一致。

(3)消息消费者接收消息

在chapter08项目中创建名为com.itheima.service的包,并在该包下创建一个针对RabbitMQ消息中间件进行消息接收和处理的业务类RabbitMQService,内容如文件4所示。

文件4 RabbitMQService.java

 1  import org.springframework.amqp.core.Message;
 2  import org.springframework.amqp.rabbit.annotation.RabbitListener;
 3  import org.springframework.stereotype.Service;
 4  @Service
 5  public class RabbitMQService {
 6      /**
 7       * Publish/Subscribe工作模式接收,处理邮件业务
 8       */
 9      @RabbitListener(queues = "fanout_queue_email")
 10     public void psubConsumerEmail(Message message) {
 11         byte[] body = message.getBody();
 12         String s = new String(body);
 13         System.out.println("邮件业务接收到消息: "+s);
 14 
 15     }
 16     /**
 17      * Publish/Subscribe工作模式接收,处理短信业务
 18      */
 19     @RabbitListener(queues = "fanout_queue_sms")
 20     public void psubConsumerSms(Message message) {
 21         byte[] body = message.getBody();
 22         String s = new String(body);
 23         System.out.println("短信业务接收到消息: "+s);
 24     }
 25 }

文件4中,创建了一个接收处理RabbitMQ消息的业务处理类RabbitMQService,在该类中使用Spring框架提供的@RabbitListener注解分别监听队列名称为fanout_queue_email和fanout_queue_sms的消息进行处理,监听的这两个名称的队列是前面指定发送并存储消息的消息队列。

需要说明的是,使用@RabbitListener注解监听队列消息后,一旦服务启动且监听到指定的队列中有消息存在(目前两个队列中各有一条相同的消息),对应注解的方法会立即接收并消费队列中的消息。另外,在接收消息的方法中,参数类型可以与发送的消息类型保持一致,或者使用Object类型和Message类型。如果使用与消息类型对应的参数接收消息的话,只能够得到具体的消息体信息;如果使用Object或者Message类型参数接收消息的话,还可以获得除了消息体外的消息参数信息MessageProperties。

启动chapter08项目来监听并接收消息队列中的消息。程序启动成功后,立即查看控制台打印结果,效果如图7所示。

图7 消息消费效果

从图7可以看出,项目启动成功后,消息消费者监听到消息队列中存在的两条消息,并进行了各自的消费。与此同时,通过RabbitMQ可视化管理页面的Queues面板查看队列消息情况,会发现两个队列中存储的消息已经自动删除。至此,一条完整的消息发送、消息中间件存储、消息消费的Publish/Subscribe工作模式的业务案例已经实现。

小提示:

文件4中,使用的是开发中常用的@RabbitListener注解监听指定名称队列的消息情况,这种方式会在监听到指定队列存在消息后立即进行消费处理。除此之外,还可以使用RabbitTemplate模板类的receiveAndConvert(String queueName)方法手动消费指定队列中的消息。

2.基于配置类的方式

基于配置类的方式主要讲的是使用Spring Boot框架提供的@Configuration注解配置类定制消息发送组件,并进行消息发送。下面,对这种基于配置类的方式进行讲解和演示。

打开RabbitMQ消息配置类RabbitMQConfig,在该配置类中使用基于配置类的方式定制消息发送相关组件,修改后的内容如文件5所示。

文件5 RabbitMQConfig.java

 1  import org.springframework.amqp.core.*;
 2  import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
 3  import org.springframework.amqp.support.converter.MessageConverter;
 4  import org.springframework.context.annotation.Bean;
 5  import org.springframework.context.annotation.Configuration;
 6  @Configuration
 7  public class RabbitMQConfig {
 8      // 自定义消息转换器
 9      @Bean
 10     public MessageConverter messageConverter(){
 11         return new Jackson2JsonMessageConverter();
 12     }
 13     // 1、定义fanout类型的交换器
 14     @Bean
 15     public Exchange fanout_exchange(){
 16         return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
 17     }
 18     // 2、定义两个不同名称的消息队列
 19     @Bean
 20     public Queue fanout_queue_email(){
 21         return new Queue("fanout_queue_email");
 22     }
 23     @Bean
 24     public Queue fanout_queue_sms(){
 25         return new Queue("fanout_queue_sms");
 26     }
 27     // 3、将两个不同名称的消息队列与交换器进行绑定
 28     @Bean
 29     public Binding bindingEmail(){
 30         return 
 31     BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();
 32     }
 33     @Bean
 34     public Binding bindingSms(){
 35         return 
 36       BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();
 37     }
 38 }

文件5中,使用@Bean注解定制了三种类型的Bean组件,这三种组件分别表示1个交换器、2个消息队列、2个消息队列分别与交换器的绑定。这种基于配置类方式定制的消息组件内容和基于API方式定制的消息组件内容完全一样,只不过是实现方式的不同而已。

按照消息服务整合实现步骤,完成消息组件的定制后,还需要编写消息发送者和消息消费者,而在基于API的方式中已经实现了消息发送者和消息消费者,并且基于配置类方式定制的消息组件名称和之前测试用的消息发送和消息消费组件名称都是一致的,所以这里可以直接重复使用。

重新运行消息发送者测试方法psubPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息,效果与基于API的方式测试效果一样。

3.基于注解的方式

基于注解的方式主要讲的是使用Spring框架提供@RabbitListener注解及其相关属性定制消息发送组件,并进行消息发送。下面,对这种基于注解的方式进行讲解和演示。

打开进行消息接收和处理的业务类RabbitMQService,将针对邮件业务和短信业务处理的消息消费者方法进行注释,使用@RabbitListener注解及其相关属性定制消息发送组件,修改后的内容如文件6所示。

文件6 RabbitMQService.java

 1  import com.itheima.domain.User;
 2  import org.springframework.amqp.rabbit.annotation.Exchange;
 3  import org.springframework.amqp.rabbit.annotation.Queue;
 4  import org.springframework.amqp.rabbit.annotation.QueueBinding;
 5  import org.springframework.amqp.rabbit.annotation.RabbitListener;
 6  import org.springframework.stereotype.Service;
 7  @Service
 8  public class RabbitMQService {
 9      /**
 10      *  **使用基于注解的方式实现消息服务
 11      * 1.1、Publish/Subscribe工作模式接收,处理邮件业务
 12      */
 13     @RabbitListener(bindings =@QueueBinding(value =
 14                                    @Queue("fanout_queue_email"), exchange = 
 15                                    @Exchange(value = "fanout_exchange",type = "fanout")))
 16     public void psubConsumerEmailAno(User user) {
 17         System.out.println("邮件业务接收到消息: "+user);
 18     }
 19     /**
 20      * 1.2、Publish/Subscribe工作模式接收,处理短信业务
 21      */
 22     @RabbitListener(bindings =@QueueBinding(value =
 23                                    @Queue("fanout_queue_sms"),exchange = 
 24                                    @Exchange(value = "fanout_exchange",type = "fanout")))
 25     public void psubConsumerSmsAno(User user) {
 26         System.out.println("短信业务接收到消息: "+user);
 27     }
 28 }

文件6中,使用@RabbitListener注解及其相关属性定制消息组件消费者,改用与发送消息对应的实体类User作为消息接收参数。在@RabbitListener注解中,使用bindings属性来自动创建并绑定交换器和消息队列组件,在定制交换器时将交换器类型设置为fanout。另外,bindings属性的@QueueBinding注解除了有value、type属性外,还有key属性用于定制路由键routingKey(当前发布订阅模式不需要)。

重启测试方法psubPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息了,效果也与基于API的方式测试效果一样。

至此,在Spring Boot中完成了使用基于API、基于配置类和基于注解这三种方式来实现Publish/Subscribe工作模式的整合讲解。其中,这三种实现消息服务的方式中,基于API的方式相对简单、直观,但容易与业务代码产生耦合;基于配置类的方式相对隔离、容易统一管理、符合Spring Boot框架思想;基于注解的方式清晰明了、方便各自管理,但是也容易与业务代码产生耦合。在实际开发中,使用基于配置类的方式和基于注解的方式定制组件实现消息服务较为常见,使用基于API的方式偶尔使用,具体选择的话需要根据实际情况进行选择。

点击此处
隐藏目录