SpringCloud进阶--RabbitMQ消息队列(完结)

张开发
2026/6/8 6:11:24 15 分钟阅读
SpringCloud进阶--RabbitMQ消息队列(完结)
abbitMQ消息队列什么是消息队列一般我们在进行远程调用时可以使用发送HTTP请求来完成现在可以使用第二种方式那就是消息队列。他能将发送方发送的消息放入队列中当新消息入队时会通知接收方进行处理一般消息发送方称为生产者接收方称为消费者。这样所有的请求都直接丢到消息队列中再由消费者取出不再是直接连接消费者的形式了而是增加了一个中间商这也时一种很好的解耦方案并且在高并发情况下由于消费者能力有限消息队列能起到削峰填谷的作用堆积一部分请求再由消费者来慢慢处理而不会像直接调用那样请求蜂拥而至消息队列的具体实现有哪些RabbitMQ老牌、稳定、功能全适合传统企业、小中型项目、复杂路由Kafka吞吐量爆炸、高吞吐、日志 / 大数据专用不适合做业务消息RocketMQ阿里出品高吞吐 业务可靠电商 / 金融 / 互联网主流那到底要怎么选择呢做电商 / 订单 / 支付→RocketMQ做日志 / 大数据 / 流处理→Kafka做企业系统 / 微服务解耦→RabbitMQ整体对比如下对比维度RabbitMQKafkaRocketMQ开发语言ErlangScala/JavaJava协议AMQP、MQTT、STOMP自定义 TCP 协议自定义 TCP 协议吞吐量万级 TPS中等并发几十万 TPS超高吞吐十万级 TPS高吞吐延迟微秒级极低延迟毫秒级毫秒级可靠性高持久化 消息确认中配置不当易丢消息最高金融级可靠功能丰富度最全复杂路由、死信、优先级、延迟极简只做发布订阅、流处理很全事务、定时、重试、顺序消息集群依赖自带集群依赖 Zookeeper依赖 NameServer运维难度一般Erlang 问题难排查较高依赖 ZK参数多较低轻量、易部署社区生态国外成熟、文档全大数据生态极强国内生态好、阿里系完善典型场景企业系统、微服务解耦、复杂路由日志收集、大数据流、埋点电商、金融、订单、秒杀、支付安装消息队列RabbitMQ运行需要Erlang环境所以需要先安装Erlang环境安装Erlang环境sudo apt install erlang安装RabbitMQsudo apt install rabbitmq-server安装完成后可输入sudo rabbitmqctl status 查看当前RabbitMQ的运行状态可以看到有两个端口一会使用的就是amqp协议那个端口来连接。25672时集群化端口。开启RabbitMQ管理面板这样就可以在浏览器上进行实时访问和监控了sudo rabbitmq-plugins enable rabbitmq_management输入网址http://127.0.0.1:15672 就可以访问了默认用户和密码都是guestguest只能在本地进行登录如果要从远程服务器访问管理面板要创建一个新的管理员账号不能使用guest。创建命令如下 # sudo rabbitmqctl add_user 用户名 密码 sudo rabbitmqctl add_user admin admin然后将管理员权限授予创建的用户sudo rabbitmqctl set_user_tags admin administrator然后就可以登录了。RabbitMQ的设计架构如下Channel客户端连接会使用Channel再通过Channel访问Rabbit MQ服务器这里的通信协议不是http而是amqp协议。Exchange类似于交换机会根据请求转发给对应的消息队列每个队列都可以绑定到Exchange上这样Exchange就可以将数据转发给队列。可以存在多个。不同的Exchange类型可以用于实现不同的消息模式。Queue消息队列生产者所有的消息都存放再消息队列中等待消费者取出。Virtual Host:类似于环境隔离不同环境可以单独配置一个Virtual Host每个Virtual Host可以包含多个Exchange和Queue每个Virtual Host互不影响。使用消息队列简单模式一个生产者--------输入数据-----》消息队列---------取出数据-----------》一个消费者先进入管理页面创建一个新的实验环境只需新建一个Virtual Host即可主页选择admin标签右侧选择Virtual Host选项然后创建一个新的Virtual Host这时系统会自动增加对应的Exchange这里先说明前面2个direct类型的交换机。(AMQP default)和amq.direct他们都是直连模式的交换机。第一个交换机(AMQP default)这个交换机是所有虚拟主机都会自带的一个默认交换机此交换机不可删除默认绑定到所有的消息队列如果通过默认交换机发送消息那么会根据消息的routingKey决定发送给哪个同名的消息队列同时也不能显示地将消息队列绑定或解绑到此交换机。当前交换机的特性是持久化的如果不持久化那么一重启增加的交换机就会消失。所有带D字样的。都表示是持久化的。所有自动生成的交换机都是持久化的。第二个交换机amq.direct此交换机和默认交换机类型一致并且也是持久化但是它具有绑定关系。如果没指定消息队列绑定到此交换机那么此交换机无法将信息存放到指定队列。消息队列Queues此时没有消息队列需要添加一个。第一行选择刚创建的虚拟主机。类型选择Classic持久化可以选择Transient 暂时的也可以选择持久化自动删除选择No(需要至少一个消费者连接到此队列之后一旦这个队列没有消费者连接就会自动删除此队列)然后就可以保存了。然后将此消息队列绑定到第二个交换机上。然后可以在页面上直接发送消息。这时可以在页面上获取队列中的消息Ack Mode 应答模式选择一共4个选项Nack message requeue true:拒绝消息不会将消息从队列中取出并且重新排队一次可以拒绝多个消息。Ack message requeue false:确认应答确认后会从队列中移除一次可以确认多个消息。Reject message requeue true/false: 拒绝消息可以指定是否重新排队。这里使用默认的只会查看消息。不会取出。第二个参数是编码格式使用默认就行第三个参数是指定取出消息的个数使用Java操作消息队列使用简单的maven项目整合消息队列先引入依赖dependency groupIdcom.rabbitmq/groupId artifactIdamqp-client/artifactId version5.14.2/version /dependency实现生产者和消费者。首先是生产者。生产者复制把消息发送到消息队列public static void main( String[] args ) { // 使用 RabbitMQ Java 客户端连接到 RabbitMQ 服务器 ConnectionFactory factory new ConnectionFactory(); // 连接基础配置 factory.setHost(localhost); factory.setPort(5672); // 5672 是 RabbitMQ 的默认 AMQP 端口 factory.setUsername(admin); factory.setPassword(admin); factory.setVirtualHost(/test); try { // 创建连接 Connection connection factory.newConnection(); // 创建频道 Channel channel connection.createChannel(); // 声明队列并绑定它们 channel.queueDeclare(test, false, false, false, null); // 将队列绑定到交换机 channel.queueBind(yyds, amq.direct, my-yyds); // 发送消息 消息需要转换成字节数组 channel.basicPublish(amq.direct, my-yyds, null, Hello World.getBytes()); } catch (Exception e) { throw new RuntimeException(e); } }其中queueDeclare方法的参数如下queue队列名称默认创建后routingKey和队列名称一致durable是否持久化exclusive是否排他如果一个队列被声明为排他队列该队列仅对首次声明它的连接可见并在连接断开时自动删除。排他队列是基于Connection可见同一个Connection的不同Channel是可以同时访问同一个排他队列并且如果一个Connection已经声明了一个排他队列其他的Connection不允许建立同名的排他队列即使该队列是持久化的一旦Connection关闭或者客户端退出该排他队列会被自动删除。autoDelete是否自动删除arguments设置队列的其他一些参数这里暂时不需要设置。其中queueBind方法参数如下queue需要绑定的队列名称exchange需要绑定的交换机名称routingKey指定routingKey其中basicPublish方法的参数如下exchange对应的exchange名称我们这里使用第二个直连交换机routingKey绑定时指定的routingKeyprops其他配置body消息本体现在可以创建消费者来获取消息了public static void main(String[] args) throws IOException, TimeoutException { // 使用 RabbitMQ Java 客户端连接到 RabbitMQ 服务器 ConnectionFactory factory new ConnectionFactory(); // 连接基础配置 factory.setHost(localhost); factory.setPort(5672); // 5672 是 RabbitMQ 的默认 AMQP 端口 factory.setUsername(admin); factory.setPassword(admin); factory.setVirtualHost(/test); // 创建连接 // 这里不使用try-with-resources因为消费者需要持续监听消息不能在try块结束后关闭连接 Connection connection factory.newConnection(); // 创建频道 Channel channel connection.createChannel(); // 声明队列并绑定它们 channel.queueDeclare(test, false, false, false, null); // 将队列绑定到交换机 channel.queueBind(yyds, amq.direct, my-yyds); // 创建消息消费者监听队列并处理消息 channel.basicConsume(yyds, false, (s, delivery) - { String msg new String(delivery.getBody()); System.out.println(Received: msg); // basicAck确认应答 // 第一个参数是消息的标签 // 第二个参数是是否批量确认如果为true则会一次性确认所有小于等于该标签的消息如果为false则只确认当前消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // basicNack是拒绝应答前两个参数与basicAck相同 // 第三个参数表示是否重新入队如果为true则消息会重新放回队列等待被消费如果为false则消息会被丢弃 //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); // 跟上面一样最后一个参数是false表示不重新入队而是直接丢弃消息只不过这里省略了 //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true); }, s - {}); }其中basicConsume方法参数如下queue队列名称autoAck自动应答消费者从消息队列取出数据后需要跟服务器进行确认应答当服务器确认后会自动将消息删除如果开启自动应答那么消息发出后会直接删除。deliver消息接收后的回调函数可以在回调中对消息进行处理处理完成后需要给服务器确认应答。cancel当消费者取消订阅时进行的函数回调这里暂时用不到。spring boot整合消息队列先引入依赖dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency添加配置信息spring: rabbitmq: addresses: localhost username: admin password: admin virtual-host: /test创建一个配置类Configuration public class RabbitConfiguration { Bean(directExchange)// 定义交换机可以很多个 public Exchange exchange() { return ExchangeBuilder.directExchange(amq.direct).build(); } Bean(yydsQueue) // 定义消息队列 public Queue queue() { return QueueBuilder.nonDurable(yyds) // 非持久化队列 .build(); } Bean(binding) public Binding binding(Qualifier(directExchange) Exchange exchange,Qualifier(yydsQueue) Queue queue) { return BindingBuilder.bind(queue)// 绑定队列 .to(exchange)// 绑定交换机 .with(my-yyds) // 绑定路由键routingKey .noargs(); } }创建一个生产者// RabbitTemplate封装了大量的方法来简化RabbitMQ的使用 Resource RabbitTemplate rabbitTemplate; Test void publisher() { // 最后一个消息本体可以是Object类型 rabbitTemplate.convertAndSend(amqp.direct, my-yyds, Hello, RabbitMQ!); }创建一个消费者实际上就是创建一个监听器Component // 注册为bean public class TestLinster { // 定义此方法为队列yyds的监听器一旦监听到消息就会接收并处理 RabbitListener(queues yyds) public void test(Message message) { System.out.println(new String(message.getBody())); } }那如何获取消费者的处理结果呢生产者需要这样写Test void publisher() { Object res rabbitTemplate.convertSendAndReceive(amqp.direct, my-yyds, Hello, RabbitMQ!); System.out.println(消费者的相应res); }消费者这样写RabbitListener(queues yyds) public String test1(Message message) { System.out.println(new String(message.getBody())); return 相应成功; }那如果我需要直接接收一个json格式的消息并且希望直接获取到实体对象呢在配置类里面加一个消息转换器// 创建一个消息转换器使用Jackson2JsonMessageConverter将消息转换为JSON格式 Bean(jacksonConverter) public Jackson2JsonMessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }消费者的RabbitListener注解里指定这个消息转换器RabbitListener(queues yyds,messageConverter jacksonConverter) public String test1(User user) { System.out.println(user); return 相应成功; }生产者可以这样直接发送对象消息Test void publisher() { Object res rabbitTemplate.convertSendAndReceive(amqp.direct, my-yyds, new User(1, 张三, 123456)); System.out.println(消费者的相应res); }死信队列如果队列中的数据迟迟没有消费者处理就会一直占用队列的空间。比如抢车票。如果用户下单后不付款这张票就会一直被这个用户占用直到超时后才可以被他人购买这时可以使用死信队列将用户超时未付款或者主动取消的订单进行处理以下类型的消息都会被判定为死信消息被拒绝basic.reject/basic.nack并且requeue false消息TTL过期队列达到最大长度那么如何构建这样的模式呢其实本质上就是一个死信交换机死信队列当正常队列中的消息被判定为死信时会被发送到对应的死信交换机然后通过交换机发送到死信队列死信队列也有对应的消费者去处理信息具体实现步骤如下在配置类中创建一个死信交换机和死信队列并进行绑定Bean(directDLExchange) public Exchange dlExchange() { // 创建一个死信交换机类型为direct名称为dlx.direct return ExchangeBuilder.directExchange(dlx.direct).build(); } Bean(yydsDLQueue) // 创建一个死信队列名称为dl-yyds public Queue dlQueue() { return QueueBuilder.nonDurable(dl-yyds) // 非持久化队列 .build(); } Bean(dlBinding) // 创建一个绑定将死信队列绑定到死信交换机上使用路由键dl-yyds public Binding dlBingding(Qualifier(directDLExchange) Exchange exchange,Qualifier(yydsDLQueue) Queue queue) { return BindingBuilder.bind(queue)// 绑定队列 .to(exchange)// 绑定交换机 .with(dl-yyds) // 绑定路由键routingKey .noargs(); } Bean(yydsQueue) public Queue queue() { return QueueBuilder.nonDurable(yyds) // 非持久化队列 .deadLetterExchange(dlx.direct) // 设置死信交换机为dlx.direct .deadLetterRoutingKey(dl-yyds) // 设置死信路由键为dl-yyds .build(); }创建一个死信队列监听器RabbitListener(queues dl-yyds,messageConverter jacksonConverter) public String test2(Message message) { System.out.println(new String(message.getBody())); return 相应成功; }另外Rabbit MQ支持将超过一定时间没被消费的消息自动删除这需要消息队列设定TTL值如果消息的存货时间超过了TIme To Live 值就会被自动删除然后消息进入死信队列。Bean(yydsQueue) public Queue queue() { return QueueBuilder.nonDurable(yyds) // 非持久化队列 .deadLetterExchange(dlx.direct) // 设置死信交换机为dlx.direct .deadLetterRoutingKey(dl-yyds) // 设置死信路由键为dl-yyds .ttl(5000) // 设置消息的过期时间为5000毫秒5秒超时自动删除 .build(); }当消息队列长度达到最大会把先进入队列的消息放进死信队列Bean(yydsQueue) public Queue queue() { return QueueBuilder.nonDurable(yyds) // 非持久化队列 .deadLetterExchange(dlx.direct) // 设置死信交换机为dlx.direct .deadLetterRoutingKey(dl-yyds) // 设置死信路由键为dl-yyds .ttl(5000) // 设置消息的过期时间为5000毫秒5秒 .maxLength(3) // 设置队列的最大长度为3条消息 .build(); }工作队列模式这种模式非常适合多个工人等待新的任务到来的场景。当我们把多个任务丢进消息队列而此时工人有多个可以将这些任务分配给各个工人!实现起来非常简单只需要创建两个监听器即可RabbitListener(queues yyds) public void test1(String data) { System.out.println(一号队列监听器data); } RabbitListener(queues yyds) public void test1_1(String data) { System.out.println(二号队列监听器data); }这些监听器会自动轮询获取队列中的消息。如果一开始队列中就有一部分消息这时再开始启用消费者还是轮询获取队列中的消息码不是如果一开就存在部分消息会被一个消费者一次性全部消耗因为我们没有对消费者的Prefetch Count预获取数量一次性获取消息的最大数量进行限制。如果想要轮询获取就需要将这个值设置为1即消费者一次只能拿一个消息而不是将所有消息全部获取。那如何对消费者的预获取数量进行设置呢需要再配置类中定义一个自定义的 在这里设置消费者Channel的Prefetch CountResource private CachingConnectionFactory cachingConnectionFactory; Bean(listenerContainer) // 创建一个RabbitListenerContainerFactory设置连接工厂和预取计数 public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(cachingConnectionFactory); factory.setPrefetchCount(1); // 设置预取计数为1表示每次只处理一条消息处理完后再处理下一条消息 return factory; }然后在监听器指定刚才设置的监听工厂RabbitListener(queues yyds,containerFactory listenerContainer) public void test1(String data) { System.out.println(一号队列监听器data); } RabbitListener(queues yyds,containerFactory listenerContainer) public void test1_1(String data) { System.out.println(二号队列监听器data); }除了去定义两个相同的监听器之外我们也可以在注解中定义来开启多个消费者比如现在开启10个相同的消费者RabbitListener(queues yyds,containerFactory listenerContainerFactory,concurrency 10) public void test1_1(String data) { System.out.println(二号队列监听器data); }发布订阅模式比如当购买的云服务器快到期时就会给手机、邮箱发送续费消息但是手机短信和邮件发送并不是同一个业务提供的但是希望能够都去执行这时就需要用到发布订阅模式简而言之就是发布一个消费多个。实现这种模式也非常简单这里需要用到另一种交换机fanout 扇出类型这是一种广播类型消息会被广播到所有与此交换机绑定的消息队列中。具体实现步骤如下在配置类中设置2个消息队列绑定到fanout交换机Bean(fanoutExchange) public Exchange exchange() { return ExchangeBuilder.fanoutExchange(amq.fanout).build(); } Bean(yydsQueue1) public Queue queue() { return QueueBuilder.nonDurable(yyds1) // 非持久化队列 .build(); } Bean(yydsQueue2) public Queue queue2() { return QueueBuilder.nonDurable(yyds2) // 非持久化队列 .build(); } Bean(binding) public Binding binding(Qualifier(fanoutExchange) Exchange exchange,Qualifier(yydsQueue1) Queue queue) { return BindingBuilder.bind(queue)// 绑定队列 .to(exchange)// 绑定交换机 .with(yyds1) // 绑定路由键routingKey .noargs(); } Bean(binding2) public Binding binding2(Qualifier(fanoutExchange) Exchange exchange,Qualifier(yydsQueue2) Queue queue) { return BindingBuilder.bind(queue)// 绑定队列 .to(exchange)// 绑定交换机 .with(yyds2) // 绑定路由键routingKey .noargs(); }设置监听器RabbitListener(queues yyds1) public void test1(String data) { System.out.println(一号队列监听器data); } RabbitListener(queues yyds2) public void test1_1(String data) { System.out.println(二号队列监听器data); }发送到队列中的消息这两个监听器都能获取到。这样就实现了发布订阅模式。路由模式路由模式就是在绑定时指定想要的routingKey只有生产者发送时指定了routingKey带能到达对应的队列。除了之前的一次绑定外同一个队列可以多次绑定到交换机并且使用不同的routingKey这样只要满足其中一个routingKey都可以将消息发送到此队列中。先在配置类中进行设置Bean(directExchange) public Exchange exchange() { return ExchangeBuilder.directExchange(amq.direct).build(); } Bean(yydsQueue) public Queue queue() { return QueueBuilder.nonDurable(yyds) // 非持久化队列 .build(); } Bean(binding) // routingKey是yyds1 public Binding binding(Qualifier(directExchange) Exchange exchange,Qualifier(yydsQueue) Queue queue) { return BindingBuilder.bind(queue)// 绑定队列 .to(exchange)// 绑定交换机 .with(yyds1) // 绑定路由键routingKey .noargs(); } Bean(binding2) // routingKey是yyds2 public Binding binding2(Qualifier(directExchange) Exchange exchange,Qualifier(yydsQueue) Queue queue) { return BindingBuilder.bind(queue)// 绑定队列 .to(exchange)// 绑定交换机 .with(yyds2) // 绑定路由键routingKey .noargs(); }监听器正常监听即可RabbitListener(queues yyds) public void test1(String data) { System.out.println(一号队列监听器data); }主题模式实际上就是一种模糊匹配的模式将routingKey以模糊匹配的方式去进行转发。使用* 或# 来表示*表示任意一个单词,比如*.test可以匹配 a.test#表示0或多个单词,比如#.test可以匹配test、aaa.test具体使用如下现在配置类中进行设置Bean(topicExchange) // 使用topic类型交换机 public Exchange exchange() { return ExchangeBuilder.topicExchange(amq.topic).build(); } Bean(yydsQueue) public Queue queue() { return QueueBuilder.nonDurable(yyds) // 非持久化队列 .build(); } Bean(binding) public Binding binding(Qualifier(topicExchange) Exchange exchange,Qualifier(yydsQueue) Queue queue) { return BindingBuilder.bind(queue)// 绑定队列 .to(exchange)// 绑定交换机 .with(*.test.*) // 绑定路由键routingKey .noargs(); }除了使用默认的主题交换机amqp.topic之外还有一个叫做amq.rabbitmq.trace交换机这个交换机也是topic类型那这个交换机是做什么的呢它是帮助我们记录和追踪生产者和消费者使用消息队列的它是一个内部交换机记录消息进入哪个队列被哪个消费者消费了具体的使用方法如下先在控制台将虚拟主机/test 的追踪功能开启sudo rabbitmqctl trace_on -p /test开启后将队列trace绑定到上面的交换机上publish.# 表示记录所有生产者无论绑定的是哪个交换机发布的消息deliver.# 表示记录所有消费者无论绑定的是哪个队列消费的消息由于发送到此交换机上的routingKey为publish.交换机名称和deliver.队列名称分别对应生产者投递到交换机的消息和消费者从队列上获取的消息因此这里使用通配符进行绑定。此时生产者给某个队列发送消息后trace队列会记录这些消息此时trace队列中的消息如下

更多文章