Stream的发布——订阅模式
Spring Cloud Stream中的消息通信方式遵循的是发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享主题的方式进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中,主题可能对应不同的概念,比如,在RabbitMQ中,对应的是Exchange,在Kafka中对应的是Topic。
在上一小节中,我们直接从RabbitMQ中给服务发送消息,使用了Stream框架后,应用结构变为:提供者发送消息到RabbitMQ等消息中间件,消费者通过订阅的方式从消息中间件获取消息,如图1所示。
图1 使用了Spring Cloud Stream后的应用结构
本节将通过一个案例来详细讲解如何使用提供者发布消息到RabbitMQ以及如何使用消费者订阅消息。具体步骤如下:
1、 创建提供者和消费者
使用Spring Initializr方式分别创建一个名称为stream-rabbitmq-provider和一个名称为stream-rabbitmq-consumer的Spring Boot项目。这里将两个项目的Group命名为com.itheima,Artifact分别命名为stream-rabbitmq-provider和stream-rabbitmq- consumer。这两个项目都添加了Spring Cloud Stream对RabbitMQ的支持的依赖spring-cloud-starter-stream-rabbit和Web、Test的依赖,如下所示。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
</dependencies>
2、 在提供者和消费者项目中添加RabbitMQ的配置
(1)在提供者stream-rabbitmq-provider项目的配置文件application.yml中编写 RabbitMQ的主机地址、端口号、用户名、密码等相关配置信息,如例2所示。
例2 stream-rabbitmq-provider\src\main\resources\application.yml
1 server:
2 port: 8898
3 spring:
4 application:
5 name: stream-rabbitmq-provider
6 rabbitmq:
7 host: localhost
8 port: 5672
9 username: guest
10 password: guest
(2)同样的,在消费者stream-rabbitmq-consumer项目的配置文件application.yml中编写RabbitMQ的主机地址、端口号、用户名、密码等相关信息,如例3所示。
例3 stream-rabbitmq-consumer\src\main\resources\application.yml
1 server:
2 port: 9898
3 spring:
4 application:
5 name: stream-rabbitmq-consumer
6 rabbitmq:
7 host: localhost
8 port: 5672
9 username: guest
10 password: guest
3、 启动运行,测试RabbitMQ与服务是否连接成功
依次启动stream-rabbitmq-provider和stream-rabbitmq-consumer,启动日志中会出现类似以下日志,说明与RabbitMQ连接成功。
Attempting to connect to: [127.0.0.1:5672]
Created new connection:rabbitConnectionFactory#28068327:0/SimpleConnection@55fbebba [delegate=amqp://guest@127.0.0.1:5672/, localPort= 51415]
4、 创建提供者类Provider发布消息
在提供者stream-rabbitmq-provider中创建一个Provider类,用于发布消息,如例4所示。
例4 stream-rabbitmq-provider\src\main\java\com\itheima\streamrabbitmqprovider\Provider.java
1 import org.springframework.beans.factory.annotation.Autowired;
2 import org.springframework.cloud.stream.annotation.EnableBinding;
3 import org.springframework.cloud.stream.annotation.Output;
4 import org.springframework.cloud.stream.messaging.Source;
5 import org.springframework.messaging.MessageChannel;
6 import org.springframework.web.bind.annotation.RestController;
7 import org.springframework.web.bind.annotation.GetMapping;
8 import org.springframework.messaging.support.MessageBuilder;
9 @EnableBinding(Source.class)
10 @RestController
11 public class Provider {
12 @Autowired
13 @Output(Source.OUTPUT)
14 private MessageChannel channel;
15 @GetMapping("send")
16 public void send() {
17 channel.send(MessageBuilder.withPayload("Hello World!").build());
18 }
19 }
在例4中,第8行代码将@EnableBinding()注解的属性值设为Source.class,表示绑定Source接口,该接口我们用的是Stream提供的消息通道接口。
5、 创建消费者类Consumer订阅消息
在消费者stream-rabbitmq-consumer中创建一个用于订阅消息的类Consumer,并在类上添加@EnableBinding注解,用于绑定Sink消息通道接口。在Consumer类中添加一个receiver()方法,用于接收RabbitMQ消息,并在receiver()方法上添加@StreamListener注解,将receive方法注册为INPUT消息通道的监听处理器,如例5所示。
例5 stream-rabbitmq-consumer\src\main\java\com\itheima\streamrabbitmqconsumer\Consumer.java
1 import org.springframework.cloud.stream.annotation.EnableBinding;
2 import org.springframework.cloud.stream.annotation.StreamListener;
3 import org.springframework.cloud.stream.messaging.Sink;
4 @EnableBinding(Sink.class)
5 public class Consumer {
6 @StreamListener(Sink.INPUT)
7 public void receiver(String message) {
8 System.out.println("接收到MQ消息:" + message);
9 }
10 }
在例5中,第4行代码将@EnableBinding()注解的属性值设为Sink.class来绑定
Sink接口,该接口使用的是Stream提供的消息通道接口。
需要说明的是,@EnableBinding注解中指定的消息通道类,可以是Stream提供的默认类,也可以自定义。如果消息通道类是自定义的,可以使用注解@EnableBinding注解指定自定义消息通道类,例如,自定义的消息通道类的类名为Sink,就必须在@EnableBinding注解中指定Sink接口为消息通道类,指定格式为@EnableBinding(Sink.class)。这样,在应用启动的时候会自动实现对自定义消息通道类的绑定,Spring Cloud Stream会为其创建具体的实例,而我们只需要通过注入的方式来获取这些实例并直接使用即可。
6、 指定输入通道对应的destination值
在提供者stream-rabbitmq-provider的配置文件application.yml中指定输入通道的destination值,具体代码如下所示。
Spring:
cloud:
stream:
bindings:
myOutput:
destination: minestream #指定输入通道对应的主题名
7、 指定输出通道对应的destination值
在消费者stream-rabbitmq-consumer的配置文件application.yml中指定输入通道的destination值,具体代码如下所示。
spring
cloud:
stream:
bindings:
myIntput:
destination: minestream
注意:
INPUT和OUTPUT的destination值要在配置文件中做配置,不然你会发现项目没报错,消息也发出去了,但是收不到。这里非常重要,官方文档也有说明:
setting the application property spring.cloud.stream.bindings.input.destination to raw-sensor-data will cause it to read from the raw-sensor-data Kafka topic, or from a queue bound to the raw-sensor-data RabbitMQ exchange
8、 测试运行
依次启动RabbitMQ、stream-rabbitmq-provider、stream-rabbitmq-consumer,访问http://localhost:8898/send
地址发送消息,在消费者项目stream-rabbitmq-consumer的控制台可以看到打印日志“接收到MQ消息:Hello World!”,说明消息已成功被接收。
接下来,我们查看RabbitMQ控制台,如图2所示。
图2 RabbitMQ控制台队列信息
从图2中,有一个名为minestream.anonymous.KmI00K8uRyOw70mWIzrsmg的队列,双击这条队列进入队列的管理界面,在Bindings下可以看到该队列绑定了minestream这个Exchange,如图3所示。
图3 RabbitMQ控制台的Bindings绑定
也就是说,发布者项目stream-rabbitmq-provider通过发布消息到RabbitMQ之后,共享了minestream这个Exchange,而消费者项目stream-rabbitmq-consumer通过订阅 minestream这个Exchange来接收提供者项目stream-rabbitmq-provider发布的消息。