2026/1/14 11:51:04
网站建设
项目流程
聊城网站推广公司,成都网站建设开,建筑网挂兼职,创建网页费用RabbitMQ快速入门Spring AMQP快速入门一个队列绑定多个消费者交换机作用Direct 交换机Topic 交换机代码声明队列和交换机消息转换器消息的可靠性生产者的可靠性生产者重连生产者确认(尽量不要用)MQ 的可靠性数据持久化Lazy Queue(惰性队列)消费者可靠性消费者确认快速入门
创建…RabbitMQ快速入门Spring AMQP快速入门一个队列绑定多个消费者交换机作用Direct 交换机Topic 交换机代码声明队列和交换机消息转换器消息的可靠性生产者的可靠性生产者重连生产者确认(尽量不要用)MQ 的可靠性数据持久化Lazy Queue(惰性队列)消费者可靠性消费者确认快速入门创建用户(每个项目创建一个用户)创建虚拟主机创建队列发送消息Spring AMQP快速入门# 配置类spring:rabbitmq:host:192.168.16.128port:5672username:lwypassword:123456virtual-host:/lwy!-- 导入依赖 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency// 发送消息StringqueueNamemqtest1;// 队列名字Stringmessagehello world;rabbitTemplate.convertAndSend(queueName,message);// 接收消息并处理RabbitListener(queuesmqtest1)// 可对应多个队列publicvoidlisten(Stringmessage){System.out.println(mqtest1 收到消息message);}一个队列绑定多个消费者默认情况下RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息可能出现消息堆积(当消费者1 一秒处理 50 条消息消费者 2 一秒处理 5 条消息有 50 条消息的时候 mq 仍然会让消费者 1 处理 25 条消费者 2 处理 25 条就会导致消费者 1 处理完了不工作了消费者 2 还在工作)因此我们需要修改application.yml设置preFetch值为1确保同一时刻最多投递给消费者1条消息spring:rabbitmq:host:192.168.16.128port:5672username:lwypassword:123456virtual-host:/lwylistener:simple:prefetch:1交换机作用接收 publisher 发送的消息将消息按照规则路由到与之绑定的队列Fanout 交换机FanoutExchange会将接收到的消息广播到每一个跟其绑定的queue所以也叫广播模式// 发送消息到交换机StringexchangeNamelwy.fanout;Stringmessagehello world;rabbitTemplate.convertAndSend(exchangeName,,message);Direct 交换机DirectExchange会将接收到的消息根据规则路由到指定的Queue因此称为定向路由每一个Queue都与Exchange设置一个BindingKey发布者发送消息时指定消息的RoutingkeyExchange将消息路由到BindingKey与消息RoutingKey一致的队列StringexchangeNamelwy.direct;Stringmessageblue;// 第二个参数是路由关键字rabbitTemplate.convertAndSend(exchangeName,yellow,message);Topic 交换机TopicExchange与DirectExchange类似区别在于routingKey可以是多个单词的列表并且以“ . ”分割Queue与Exchange指定BindingKey时可以使用通配符#代指0个或多个单词。例china.#可以匹配 china.news、china.weather*代指一个单词。china.*可以匹配 china.news 不可匹配 china.news.sports代码声明队列和交换机// 如果没有指定的队列和交换机会自动创建RabbitListener(bindingsQueueBinding(valueQueue(namedirect1),exchangeExchange(namelwy.direct,typeExchangeTypes.DIRECT),key{red,blue}// 集合))publicvoiddirect1(Stringmessage)throwsInterruptedException{System.out.println(direct1 收到消息message);}消息转换器rabbitMQ 本身只接受和传输字节流传输对象的时候会乱码通常需要消息转换器自动序列化和反序列化dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-databind/artifactId /dependencyBeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}消息的可靠性生产者的可靠性生产者重连有的时候由于网络波动可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制spring:rabbitmq:connection-timeout:1s# 连接超时时间template:retry:enabled:true# 开启超时重试initial-interval:1000ms# 初始重试间隔时间multiplier:1# 重试间隔时间倍数max-attempts:3# 最大重试次数注意当网络不稳定的时候利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试也就是说多次重试等待的过程中当前线程是被阻塞的会影响业务性能。如果对于业务性能有要求建议禁用重试机制。如果一定要使用请合理配置等待时长和重试次数当然也可以考虑使用异步线程来执行发送消息的代码。生产者确认(尽量不要用)RabbitMQ 有 PublisherConfirm和PublisherReturn两种确认机制。开启确认机制认后在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况消息投递到了MQ但是路由失败。此时会通过PublisherReturn返回路由异常原因然后返回ACK告知投递成功(路由失败一般是代码有问题)临时消息投递到了MQ并且入队成功返回ACK告知投递成功持久消息投递到了MQ并且入队完成持久化返回ACK告知投递成功其它情况都会返回NACK告知投递失败MQ 的可靠性数据持久化非持久的消息会存储到内存中如果 mq 重启消息就会消失持久化的会存到磁盘中注意如果开启了持久化和生产者确认只有在消息 持久化完成后才会给生产者返回 ACK 回执Lazy Queue(惰性队列)特性接收到消息后直接存入磁盘而非内存内存中只保留最近的消息默认2048条消费者要消费消息时才会从磁盘中读取并加载到内存支持数百万条的消息存储RabbitListener(queuesToDeclareQueue(namelazy.queue,durabletrue,// 持久化队列argumentsArgument(namex-queue-mode,valuelazy)// lazy队列))publicvoidlazy(Stringmessage){System.out.println(lazy.queue 收到消息message);}消费者可靠性消费者确认当消费者处理消息结束后应该向RabbitMQ发送一个回执告知RabbitMQ自己消息处理状态。回执有三种可选值ack成功处理消息RabbitMQ从队列中删除该消息nack消息处理失败RabbitMQ需要再次投递消息reject消息处理失败并拒绝该消息RabbitMQ从队列中删除该消息可通过配置文件选择 ACK 处理方式none不处理。即消息投递给消费者后立刻ack消息会立刻从MQ删除。非常不安全不建议使用manual手动模式。需要自己在业务代码中调用api发送ack或reject存在业务入侵但更灵活auto自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强当业务正常执行时则自动返回ack当业务出现异常时如果是业务异常会自动返回nack如果是消息处理或校验异常自动返回 rejectspring:rabbitmq:listener:simple:prefetch:1acknowledge-mode:none