stream的消费组
在实际开发中,我们的每一个微服务应用为了实现高可用和负载均衡,会部署多个实例。默认情况下,当消息提供者发出一条消息到绑定通道上,这条消息会被每个消费者实例接收和处理(出现了重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费。这时,为了防止对消息的重复处理,Spring Cloud Stream提供了消费组的概念。
在Spring Cloud Stream应用程序开发中,如果在同一主题上的应用需要启动多个实例时,为防止对消息的重复处理,我们可以通过spring.cloud.stream.bindings.input. group属性为应用指定一个组名,这样一个应用的多个实例在接收到消息时,只会有一个实例真正收到消息并进行处理。图1描述了使用消费组的流程图。
图1 消费组流程图
在图1中,定义了两个消费组Group-A和Group-B,通过网络传递过来的消息通过
Topic主题,按照分组名传递到这两个消费组中,这两个订阅该主题的消费组都会收到消息,且每个组中只有一个成员会收到该消息。
为了更好的理解Stream消费组,我们先搭建一个没有实现Stream消费组的案例,看一下出现的问题。下面,我们将在前面小节项目的基础上进行改造,具体步骤如下:
(1)搭建一个消费者项目stream-rabbitmq-consumer2,搭建步骤与stream- rabbitmq-consumer项目一致,唯一不同的是,这里我们将stream-rabbitmq-consumer2的端口号设为9899。
(2)依次启动stream-rabbitmq-provider,stream-rabbitmq-consumer,stream- rabbitmq-consumer2,并在浏览器访问http://localhost:8898/send
,在RabbitMQ控制台的 Queues中我们可看到有两个minestream.anonymous.*的队列都绑定了minestream这个Exchange,如图2所示。
图2 RabbitMQ控制台中的消息队列
分别查看消费者项目stream-rabbitmq-consumer和stream-rabbitmq-consumer2的控制台的打印日志,会发现两个消费者的控制台都打印了“接收到MQ消息:Hello World!”说明stream-rabbitmq-provider发送的这条消息被消费者stream-rabbitmq- consumer2和stream-rabbitmq-consumer同时消费了,也就是说,出现了消息被重复消费的情况。
接下来,我们使用Stream消费组解决上述案例出现的消息被重复消费的情况。在上面案例的基础上进行改造,详细讲解Stream消费组的应用。改造的具体步骤如下:
(1)在stream-rabbitmq-provider项目的application.yml配置文件中添加 spring.cloud.stream.bindings.input.group属性来指定消费组名称,如例2所示。
例2 stream-rabbitmq-provider\src\main\resources\application.yml
1 spring:
2 application:
3 name: stream-rabbitmq-provider
4 rabbitmq:
5 host: localhost
6 port: 5672
7 username: guest
8 password: guest
9 cloud:
10 stream:
11 bindings:
12 output:
13 destination: minestream
14 group: stream
15 server:
16 port: 8898
(2)启动项目stream-rabbitmq-provider,stream-rabbitmq-consumer和stream- rabbitMQ-consumer2,访问http://localhost:8898/send
,在控制台中可看到,每发送一条请求,两个项目中只有其中一个接收到了消息,说明我们的消费组设置成功。
如果使用RabbitMQ控制台的Queues查看消费组Stream,可以看到现在只有 minestream.stream这一个队列了,再次印证了我们的消费组设置成功。如图3所示。
图3 RabbitMQ控制台中的Stream消费组