学科分类
目录
Spring Cloud

Stream快速入门

通过上一小节的讲解,希望大家能够了解Spring Cloud Stream与消息中间件的通信过程,下面我们通过一个快速入门的案例,加深大家对Spring Cloud Stream的认识。本案例的主要目标是构建一个基于Spring Boot的微服务应用,这个微服务应用通过使用消息中间件RabbitMQ来接收消息并将消息打印到控制台。所以,在构建案例之前请先确认在本地已经安装了RabbitMQ。关于案例的具体开发步骤如下:

(1)创建项目。使用Spring Initializr方式创建一个名为stream-hello的 Spring Boot项目,这里我们将Group命名为com.itheima,将Artifact命名为stream-hello,添加Spring Cloud Stream对RabbitMQ支持的依赖和Web、Test依赖,如下所示。

 1         <dependencies>
 2             <dependency>
 3                 <groupId>org.springframework.boot</groupId>
 4                 <artifactId>spring-boot-starter-web</artifactId>
 5             </dependency>
 6             <dependency>
 7                 <groupId>org.springframework.boot</groupId>
 8                 <artifactId>spring-boot-starter-test</artifactId>
 9                 <scope>test</scope>
 10         </dependency>
 11         <dependency>
 12            <groupId>org.springframework.cloud</groupId>
 13            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
 14            <version>2.1.3.RELEASE</version>
 15         </dependency>
 16     </dependencies>

上述代码中,第11-15行引入了spring-cloud-starter-stream-rabbit依赖,该依赖包用于提供Spring Cloud Stream对RabbitMQ的支持,包含了Spring Cloud Stream对 RabbitMQ的自动化配置等内容。

(2)创建消息消费者类。创建rabittmq包,并在rabittmq包中创建SinkReceiver类,该类用于接收RabbitMQ发送的消息,如例1所示。

例1 stream-hello\src\main\java\com\itheima\streamhello\rabittmq\SinkReceiver.java

 1     import com.itheima.streamhello.StreamHelloApplication;
 2     import org.slf4j.Logger;
 3     import org.slf4j.LoggerFactory;
 4     import org.springframework.cloud.stream.annotation.EnableBinding;
 5     import org.springframework.cloud.stream.annotation.StreamListener;
 6     import org.springframework.cloud.stream.messaging.Sink;
 7     @EnableBinding(Sink.class)
 8     public class SinkReceiver {
 9         private static Logger logger= 
 10     LoggerFactory.getLogger(StreamHelloApplication.class);
 11         @StreamListener(Sink.INPUT)
 12         public void receive(Object payload){
 13         logger.info("Received:"+payload);
 14      }
 15     }

在例1中,第7行代码的@EnableBinding注解用来指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定。@EnableBinding()注解的属性值设为Sink.class表示绑定Sink接口,Sink接口是Spring Cloud Stream默认实现的对输入消息通道绑定的接口。

第11行代码的@StreamListener注解主要是修饰方法,用于将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名,这里我们通过@StreamListener(Sink.INPUT)注解将receive()方法注册为INPUT消息通道的监听处理器,所以当我们在RabbitMQ的控制页面中发布消息的时候,receive()方法会监听到此消息并做出对应的响应动作。

(3)启动测试。先启动RabbitMQ,再启动项目stream-hello,查看stream-hello应用是如何通过使用消息中间件RabbitMQ接收消息并将消息打印到控制台的。

stream-hello项目启动成功后,会在控制台打印以下启动日志内容。

 1      2019-10-25 11:09:50.931  INFO 13660 --- [           main] 
 2      c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for 
 3      inbound: input.anonymous.Uni8EbkdTMWdEDe852UyIA, bound to: input
 4      2019-10-25 11:09:50.967  INFO 13660 --[           main] 
 5      o.s.a.r.c.CachingConnectionFactory       : Created new connection: 
 6      rabbitConnectionFactory#30501e60:0/SimpleConnection@2a3194c6 
 7      [delegate=amqp://guest@127.0.0.1:5672/, localPort= 52460]

上述日志信息中,第2-3行代码表示声明了一个名为input.anonymous.Uni8EbkdTMWdEDe852UyIA(随机生成)的队列,第5-7行代码表示使用guest用户创建了一个指向

127.0.0.1:5672地址的RabbitMQ连接。

(4)在RabbitMQ控制台查看消息。使用浏览器访问http://localhost:15672地址,登录成功后,选择导航栏中的Connections选项,我们可以在RabbitMQ控制台可以看到一条指向127.0.0.1:5672地址的RabbitMQ连接,表明应用程序已和RabbitMQ建立连接。如图1所示。

图1 应用程序与RabbitMQ连接列表图

单击图1中的Queues选项,可以看到Queues列表中新生成了一条名为input.anonymous.Uni8EbkdTMWdE852UyIA(随机生成)的队列,如图2所示。

image-20200623155504995

图2 RabbitMQ控制台中的消息队列

应用程序在声明了一个随机队列之后,通过RabbitMessageChannelBinder将自己绑定为127.0.0.1:5672的消费者。这些信息我们可以在RabbitMQ的控制台单击导航栏中的Channels选项查看,如图2所示。

图3 RabbitMQ控制台中消息队列的消费者

选择图3导航栏中的Queues选项,双击input.anonymous.Uni8EbkdTMWdEde852UyIA这条队列,进入这条队列的管理页面,通过Publish message功能来发送一条消息到该队列中,如图4所示。

图4 通过Publish message发送一条消息

在图4中,在RabbitMQ控制台的队列管理页面使用Publish message功能发送了一条内容为“Send a message”的消息到队列中,我们可以在正在运行的stream-hello项目的控制台中看到如下日志内容。

INFO 6452 --- [bebsbZztvJREQ-1]
c.i.streamhello.StreamHelloApplication  :  Received:Send a message

上述日志内容“Send a message”正是刚刚发送到消息队列的内容。“Send a message”是通过stream-hello应用程序的SinkReceiver类中的receive()方法输出的,说明这条消息被stream-hello应用程序所消费。

至此,Stream快速入门案例就完成了,但是我们会发现,此案例中并没有在配置文件

application.yml中进行任何属性设置,原因在于Spring Cloud Stream会为消息中间件

RabbitMQ提供默认的自动化配置。当然我们也可以在Spring Boot支持的全局配置文件

application.properties或application.yml中修改相关配置。

点击此处
隐藏目录