学科分类
目录
Spring Cloud

stream的消费组

在实际开发中,我们的每一个微服务应用为了实现高可用和负载均衡,会部署多个实例。默认情况下,当消息提供者发出一条消息到绑定通道上,这条消息会被每个消费者实例接收和处理(出现了重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费。这时,为了防止对消息的重复处理,Spring Cloud Stream提供了消费组的概念。

在Spring Cloud Stream应用程序开发中,如果在同一主题上的应用需要启动多个实例时,为防止对消息的重复处理,我们可以通过spring.cloud.stream.bindings.input. group属性为应用指定一个组名,这样一个应用的多个实例在接收到消息时,只会有一个实例真正收到消息并进行处理。图1描述了使用消费组的流程图。

图1 消费组流程图

在图1中,定义了两个消费组Group-A和Group-B,通过网络传递过来的消息通过

Topic主题,按照分组名传递到这两个消费组中,这两个订阅该主题的消费组都会收到消息,且每个组中只有一个成员会收到该消息。

为了更好的理解Stream消费组,我们先搭建一个没有实现Stream消费组的案例,看一下出现的问题。下面,我们将在前面小节项目的基础上进行改造,具体步骤如下:

(1)搭建一个消费者项目stream-rabbitmq-consumer2,搭建步骤与stream- rabbitmq-consumer项目一致,唯一不同的是,这里我们将stream-rabbitmq-consumer2的端口号设为9899。

(2)依次启动stream-rabbitmq-provider,stream-rabbitmq-consumer,stream- rabbitmq-consumer2,并在浏览器访问http://localhost:8898/send,在RabbitMQ控制台的 Queues中我们可看到有两个minestream.anonymous.*的队列都绑定了minestream这个Exchange,如图2所示。

图2 RabbitMQ控制台中的消息队列

分别查看消费者项目stream-rabbitmq-consumer和stream-rabbitmq-consumer2的控制台的打印日志,会发现两个消费者的控制台都打印了“接收到MQ消息:Hello World!”说明stream-rabbitmq-provider发送的这条消息被消费者stream-rabbitmq- consumer2和stream-rabbitmq-consumer同时消费了,也就是说,出现了消息被重复消费的情况。

接下来,我们使用Stream消费组解决上述案例出现的消息被重复消费的情况。在上面案例的基础上进行改造,详细讲解Stream消费组的应用。改造的具体步骤如下:

(1)在stream-rabbitmq-provider项目的application.yml配置文件中添加 spring.cloud.stream.bindings.input.group属性来指定消费组名称,如例2所示。

例2 stream-rabbitmq-provider\src\main\resources\application.yml

 1 spring:
 2            application:
 3               name: stream-rabbitmq-provider
 4            rabbitmq:
 5               host: localhost
 6               port: 5672
 7               username: guest
 8               password: guest
 9            cloud:
 10          stream:
 11            bindings:
 12              output:
 13                destination: minestream
 14                group: stream                 
 15     server:
 16       port: 8898

(2)启动项目stream-rabbitmq-provider,stream-rabbitmq-consumer和stream- rabbitMQ-consumer2,访问http://localhost:8898/send,在控制台中可看到,每发送一条请求,两个项目中只有其中一个接收到了消息,说明我们的消费组设置成功。

如果使用RabbitMQ控制台的Queues查看消费组Stream,可以看到现在只有 minestream.stream这一个队列了,再次印证了我们的消费组设置成功。如图3所示。

图3 RabbitMQ控制台中的Stream消费组

点击此处
隐藏目录