学科分类
目录
Spring Cloud

Stream的发布——订阅模式

Spring Cloud Stream中的消息通信方式遵循的是发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享主题的方式进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中,主题可能对应不同的概念,比如,在RabbitMQ中,对应的是Exchange,在Kafka中对应的是Topic。

在上一小节中,我们直接从RabbitMQ中给服务发送消息,使用了Stream框架后,应用结构变为:提供者发送消息到RabbitMQ等消息中间件,消费者通过订阅的方式从消息中间件获取消息,如图1所示。

图1 使用了Spring Cloud Stream后的应用结构

本节将通过一个案例来详细讲解如何使用提供者发布消息到RabbitMQ以及如何使用消费者订阅消息。具体步骤如下:

1、 创建提供者和消费者

使用Spring Initializr方式分别创建一个名称为stream-rabbitmq-provider和一个名称为stream-rabbitmq-consumer的Spring Boot项目。这里将两个项目的Group命名为com.itheima,Artifact分别命名为stream-rabbitmq-provider和stream-rabbitmq- consumer。这两个项目都添加了Spring Cloud Stream对RabbitMQ的支持的依赖spring-cloud-starter-stream-rabbit和Web、Test的依赖,如下所示。

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
 </dependencies>

2、 在提供者和消费者项目中添加RabbitMQ的配置

(1)在提供者stream-rabbitmq-provider项目的配置文件application.yml中编写 RabbitMQ的主机地址、端口号、用户名、密码等相关配置信息,如例2所示。

例2 stream-rabbitmq-provider\src\main\resources\application.yml

 1     server:
 2         port: 8898
 3     spring:
 4         application:
 5            name: stream-rabbitmq-provider
 6         rabbitmq:
 7             host: localhost
 8             port: 5672
 9             username: guest
 10         password: guest

(2)同样的,在消费者stream-rabbitmq-consumer项目的配置文件application.yml中编写RabbitMQ的主机地址、端口号、用户名、密码等相关信息,如例3所示。

例3 stream-rabbitmq-consumer\src\main\resources\application.yml

 1     server:
 2         port: 9898
 3     spring:
 4         application:
 5            name: stream-rabbitmq-consumer
 6         rabbitmq:
 7             host: localhost
 8             port: 5672
 9             username: guest
 10        password: guest

3、 启动运行,测试RabbitMQ与服务是否连接成功

依次启动stream-rabbitmq-provider和stream-rabbitmq-consumer,启动日志中会出现类似以下日志,说明与RabbitMQ连接成功。

Attempting to connect to: [127.0.0.1:5672]
Created new     connection:rabbitConnectionFactory#28068327:0/SimpleConnection@55fbebba     [delegate=amqp://guest@127.0.0.1:5672/, localPort= 51415]

4、 创建提供者类Provider发布消息

在提供者stream-rabbitmq-provider中创建一个Provider类,用于发布消息,如例4所示。

例4 stream-rabbitmq-provider\src\main\java\com\itheima\streamrabbitmqprovider\Provider.java

 1     import org.springframework.beans.factory.annotation.Autowired;
 2     import org.springframework.cloud.stream.annotation.EnableBinding;
 3     import org.springframework.cloud.stream.annotation.Output;
 4     import org.springframework.cloud.stream.messaging.Source;
 5     import org.springframework.messaging.MessageChannel;
 6     import org.springframework.web.bind.annotation.RestController;
 7     import org.springframework.web.bind.annotation.GetMapping;
 8     import org.springframework.messaging.support.MessageBuilder;
 9     @EnableBinding(Source.class)
 10     @RestController
 11     public class Provider {
 12         @Autowired
 13         @Output(Source.OUTPUT)
 14         private MessageChannel channel;
 15         @GetMapping("send")
 16         public void send() {
 17         channel.send(MessageBuilder.withPayload("Hello World!").build());
 18         }
 19     }

在例4中,第8行代码将@EnableBinding()注解的属性值设为Source.class,表示绑定Source接口,该接口我们用的是Stream提供的消息通道接口。

5、 创建消费者类Consumer订阅消息

在消费者stream-rabbitmq-consumer中创建一个用于订阅消息的类Consumer,并在类上添加@EnableBinding注解,用于绑定Sink消息通道接口。在Consumer类中添加一个receiver()方法,用于接收RabbitMQ消息,并在receiver()方法上添加@StreamListener注解,将receive方法注册为INPUT消息通道的监听处理器,如例5所示。

例5 stream-rabbitmq-consumer\src\main\java\com\itheima\streamrabbitmqconsumer\Consumer.java

 1     import org.springframework.cloud.stream.annotation.EnableBinding;
 2     import org.springframework.cloud.stream.annotation.StreamListener;
 3     import org.springframework.cloud.stream.messaging.Sink;
 4     @EnableBinding(Sink.class)
 5     public class Consumer {
 6         @StreamListener(Sink.INPUT)
 7         public void receiver(String message) {
 8             System.out.println("接收到MQ消息:" + message);
 9         }
 10     }

在例5中,第4行代码将@EnableBinding()注解的属性值设为Sink.class来绑定

Sink接口,该接口使用的是Stream提供的消息通道接口。

需要说明的是,@EnableBinding注解中指定的消息通道类,可以是Stream提供的默认类,也可以自定义。如果消息通道类是自定义的,可以使用注解@EnableBinding注解指定自定义消息通道类,例如,自定义的消息通道类的类名为Sink,就必须在@EnableBinding注解中指定Sink接口为消息通道类,指定格式为@EnableBinding(Sink.class)。这样,在应用启动的时候会自动实现对自定义消息通道类的绑定,Spring Cloud Stream会为其创建具体的实例,而我们只需要通过注入的方式来获取这些实例并直接使用即可。

6、 指定输入通道对应的destination值

在提供者stream-rabbitmq-provider的配置文件application.yml中指定输入通道的destination值,具体代码如下所示。

Spring:
  cloud:
    stream:
      bindings:
        myOutput:
          destination: minestream                       #指定输入通道对应的主题名

7、 指定输出通道对应的destination值

在消费者stream-rabbitmq-consumer的配置文件application.yml中指定输入通道的destination值,具体代码如下所示。

spring
  cloud:
    stream:
      bindings:
        myIntput:
          destination: minestream

注意:

INPUT和OUTPUT的destination值要在配置文件中做配置,不然你会发现项目没报错,消息也发出去了,但是收不到。这里非常重要,官方文档也有说明:

setting the application property     spring.cloud.stream.bindings.input.destination to raw-sensor-data will     cause it to read from the raw-sensor-data Kafka topic, or from a queue     bound to the raw-sensor-data RabbitMQ exchange

8、 测试运行

依次启动RabbitMQ、stream-rabbitmq-provider、stream-rabbitmq-consumer,访问http://localhost:8898/send 地址发送消息,在消费者项目stream-rabbitmq-consumer的控制台可以看到打印日志“接收到MQ消息:Hello World!”,说明消息已成功被接收。

接下来,我们查看RabbitMQ控制台,如图2所示。

图2 RabbitMQ控制台队列信息

从图2中,有一个名为minestream.anonymous.KmI00K8uRyOw70mWIzrsmg的队列,双击这条队列进入队列的管理界面,在Bindings下可以看到该队列绑定了minestream这个Exchange,如图3所示。

图3 RabbitMQ控制台的Bindings绑定

也就是说,发布者项目stream-rabbitmq-provider通过发布消息到RabbitMQ之后,共享了minestream这个Exchange,而消费者项目stream-rabbitmq-consumer通过订阅 minestream这个Exchange来接收提供者项目stream-rabbitmq-provider发布的消息。

点击此处
隐藏目录