07-消息驱动详解

张开发
2026/6/9 4:41:04 15 分钟阅读
07-消息驱动详解
Spring Cloud Stream 消息驱动详解一、知识概述Spring Cloud Stream 是构建消息驱动微服务的框架,它提供了统一的编程模型,屏蔽了底层消息中间件的差异。开发者可以使用统一的 API 进行消息的生产和消费,支持 RabbitMQ、Kafka、RocketMQ 等多种消息中间件。消息驱动的核心概念:Binder:消息中间件绑定器Binding:消息通道绑定Message:消息体Channel:消息通道理解消息驱动的原理,是构建异步、解耦微服务系统的重要技能。二、知识点详细讲解2.1 架构模型应用层 │ ├── Input Channel(输入通道) │ │ │ ▼ │ Binder(绑定器) │ │ │ ▼ │ 消息中间件(RabbitMQ/Kafka/RocketMQ) │ │ │ ▼ ├── Output Channel(输出通道) │ 应用层2.2 核心概念Binder负责与消息中间件交互,提供:连接管理消息发送/接收序列化/反序列化Binding连接应用与 Binder 的桥梁:Input Binding:消费者绑定Output Binding:生产者绑定Message消息的基本单元:Header:消息头Payload:消息体2.3 消息中间件对比特性RabbitMQKafkaRocketMQ吞吐量中高高延迟低中低顺序性❌✅✅事务消息❌✅✅延迟消息✅✅✅消息回溯❌✅✅2.4 消费模型发布订阅(Publish-Subscribe)一条消息被多个消费者消费每个消费者独立消费消费者组(Consumer Group)一条消息只被组内一个消费者消费实现负载均衡三、代码示例3.1 基础配置!-- pom.xml --!-- RabbitMQ Binder --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependency!-- Kafka Binder --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-kafka/artifactId/dependency# application.ymlspring:cloud:stream:# 绑定器配置binders:rabbit-binder:type:rabbitenvironment:spring:rabbitmq:host:localhostport:5672username:guestpassword:guestkafka-binder:type:kafkaenvironment:spring:kafka:bootstrap-servers:localhost:9092# 绑定配置bindings:# 输出通道userOutput:destination:user-topiccontent-type:application/jsonbinder:rabbit-binder# 输入通道userInput:destination:user-topicgroup:user-groupbinder:rabbit-binder3.2 定义消息通道importorg.springframework.cloud.stream.annotation.*;importorg.springframework.messaging.*;importorg.springframework.stereotype.Component;// 消息通道定义publicinterfaceUserChannels{// 输出通道(生产者)StringUSER_OUTPUT="userOutput";@Output(USER_OUTPUT)MessageChanneluserOutput();// 输入通道(消费者)StringUSER_INPUT="userInput";@Input(USER_INPUT)SubscribableChanneluserInput();}// 多通道定义publicinterfaceOrderChannels{@Output("orderOutput")MessageChannelorderOutput();@Input("orderInput")SubscribableChannelorderInput();@Output("paymentOutput")MessageChannelpaymentOutput();@Input("paymentInput")SubscribableChannelpaymentInput();}3.3 启用消息通道importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.cloud.stream.annotation.EnableBinding;@SpringBootApplication@EnableBinding({UserChannels.class,OrderChannels.class})publicclassStreamApplication{publicstaticvoidmain(String[]args){SpringApplication.run(StreamApplication.class,args);}}3.4 消息生产者importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.cloud.stream.annotation.

更多文章