Stream快速入门
通过上一小节的讲解,希望大家能够了解Spring Cloud Stream与消息中间件的通信过程,下面我们通过一个快速入门的案例,加深大家对Spring Cloud Stream的认识。本案例的主要目标是构建一个基于Spring Boot的微服务应用,这个微服务应用通过使用消息中间件RabbitMQ来接收消息并将消息打印到控制台。所以,在构建案例之前请先确认在本地已经安装了RabbitMQ。关于案例的具体开发步骤如下:
(1)创建项目。使用Spring Initializr方式创建一个名为stream-hello的 Spring Boot项目,这里我们将Group命名为com.itheima,将Artifact命名为stream-hello,添加Spring Cloud Stream对RabbitMQ支持的依赖和Web、Test依赖,如下所示。
1 <dependencies>
2 <dependency>
3 <groupId>org.springframework.boot</groupId>
4 <artifactId>spring-boot-starter-web</artifactId>
5 </dependency>
6 <dependency>
7 <groupId>org.springframework.boot</groupId>
8 <artifactId>spring-boot-starter-test</artifactId>
9 <scope>test</scope>
10 </dependency>
11 <dependency>
12 <groupId>org.springframework.cloud</groupId>
13 <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
14 <version>2.1.3.RELEASE</version>
15 </dependency>
16 </dependencies>
上述代码中,第11-15行引入了spring-cloud-starter-stream-rabbit依赖,该依赖包用于提供Spring Cloud Stream对RabbitMQ的支持,包含了Spring Cloud Stream对 RabbitMQ的自动化配置等内容。
(2)创建消息消费者类。创建rabittmq包,并在rabittmq包中创建SinkReceiver类,该类用于接收RabbitMQ发送的消息,如例1所示。
例1 stream-hello\src\main\java\com\itheima\streamhello\rabittmq\SinkReceiver.java
1 import com.itheima.streamhello.StreamHelloApplication;
2 import org.slf4j.Logger;
3 import org.slf4j.LoggerFactory;
4 import org.springframework.cloud.stream.annotation.EnableBinding;
5 import org.springframework.cloud.stream.annotation.StreamListener;
6 import org.springframework.cloud.stream.messaging.Sink;
7 @EnableBinding(Sink.class)
8 public class SinkReceiver {
9 private static Logger logger=
10 LoggerFactory.getLogger(StreamHelloApplication.class);
11 @StreamListener(Sink.INPUT)
12 public void receive(Object payload){
13 logger.info("Received:"+payload);
14 }
15 }
在例1中,第7行代码的@EnableBinding注解用来指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定。@EnableBinding()注解的属性值设为Sink.class表示绑定Sink接口,Sink接口是Spring Cloud Stream默认实现的对输入消息通道绑定的接口。
第11行代码的@StreamListener注解主要是修饰方法,用于将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名,这里我们通过@StreamListener(Sink.INPUT)注解将receive()方法注册为INPUT消息通道的监听处理器,所以当我们在RabbitMQ的控制页面中发布消息的时候,receive()方法会监听到此消息并做出对应的响应动作。
(3)启动测试。先启动RabbitMQ,再启动项目stream-hello,查看stream-hello应用是如何通过使用消息中间件RabbitMQ接收消息并将消息打印到控制台的。
stream-hello项目启动成功后,会在控制台打印以下启动日志内容。
1 2019-10-25 11:09:50.931 INFO 13660 --- [ main]
2 c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for
3 inbound: input.anonymous.Uni8EbkdTMWdEDe852UyIA, bound to: input
4 2019-10-25 11:09:50.967 INFO 13660 --[ main]
5 o.s.a.r.c.CachingConnectionFactory : Created new connection:
6 rabbitConnectionFactory#30501e60:0/SimpleConnection@2a3194c6
7 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 52460]
上述日志信息中,第2-3行代码表示声明了一个名为input.anonymous.Uni8EbkdTMWdEDe852UyIA(随机生成)的队列,第5-7行代码表示使用guest用户创建了一个指向
127.0.0.1:5672地址的RabbitMQ连接。
(4)在RabbitMQ控制台查看消息。使用浏览器访问http://localhost:15672
地址,登录成功后,选择导航栏中的Connections选项,我们可以在RabbitMQ控制台可以看到一条指向127.0.0.1:5672地址的RabbitMQ连接,表明应用程序已和RabbitMQ建立连接。如图1所示。
图1 应用程序与RabbitMQ连接列表图
单击图1中的Queues选项,可以看到Queues列表中新生成了一条名为input.anonymous.Uni8EbkdTMWdE852UyIA(随机生成)的队列,如图2所示。
图2 RabbitMQ控制台中的消息队列
应用程序在声明了一个随机队列之后,通过RabbitMessageChannelBinder将自己绑定为127.0.0.1:5672的消费者。这些信息我们可以在RabbitMQ的控制台单击导航栏中的Channels选项查看,如图2所示。
图3 RabbitMQ控制台中消息队列的消费者
选择图3导航栏中的Queues选项,双击input.anonymous.Uni8EbkdTMWdEde852UyIA这条队列,进入这条队列的管理页面,通过Publish message功能来发送一条消息到该队列中,如图4所示。
图4 通过Publish message发送一条消息
在图4中,在RabbitMQ控制台的队列管理页面使用Publish message功能发送了一条内容为“Send a message”的消息到队列中,我们可以在正在运行的stream-hello项目的控制台中看到如下日志内容。
INFO 6452 --- [bebsbZztvJREQ-1]
c.i.streamhello.StreamHelloApplication : Received:Send a message
上述日志内容“Send a message”正是刚刚发送到消息队列的内容。“Send a message”是通过stream-hello应用程序的SinkReceiver类中的receive()方法输出的,说明这条消息被stream-hello应用程序所消费。
至此,Stream快速入门案例就完成了,但是我们会发现,此案例中并没有在配置文件
application.yml中进行任何属性设置,原因在于Spring Cloud Stream会为消息中间件
RabbitMQ提供默认的自动化配置。当然我们也可以在Spring Boot支持的全局配置文件
application.properties或application.yml中修改相关配置。