2026/1/10 9:30:15
网站建设
项目流程
岳阳网站开发商城,太仓违章建设举报网站,今天西安最新通知,温州高端企业网站建设一、前置认知#xff1a;为什么选 RocketMQ#xff1f;#xff08;新手必懂#xff09;
RocketMQ 是阿里开源的分布式消息队列#xff0c;核心优势是高吞吐、高可用、易集成#xff0c;能解决项目中的“解耦、削峰、异步通信”问题#xff08;比如订单下单后#xff0…一、前置认知为什么选 RocketMQ新手必懂RocketMQ 是阿里开源的分布式消息队列核心优势是高吞吐、高可用、易集成能解决项目中的“解耦、削峰、异步通信”问题比如订单下单后异步通知库存、支付、物流系统避免一个模块故障影响全流程。和 Kafka 比它的 Java 生态更友好、部署维护更简单和 RabbitMQ 比吞吐能力更强适合高并发场景。新手不用纠结底层先掌握“怎么用”再深入原理。还有一个点就是RocketMQ属于信创二、第一步搭建 RocketMQ 环境单机版新手首选环境是基础单机版足够满足开发测试需求步骤如下1. 准备环境操作系统Windows/Linux/Mac推荐 Linux/Mac避免Windows端口冲突JDK1.8 及以上必须RocketMQ 基于 Java 开发Maven3.6项目依赖管理2. 下载与安装官网下载安装包RocketMQ 官网选择稳定版比如 4.9.7兼容性最好解压到指定目录以 Linux 为例unziprocketmq-all-4.9.7-bin-release.zip -d /usr/local/rocketmqcd/usr/local/rocketmq3. 启动 RocketMQ 核心服务RocketMQ 依赖两个核心服务NameServer路由中心和 Broker消息存储/转发中心启动顺序先启动 NameServer再启动 Broker。1启动 NameServer# 后台启动日志输出到 logs/namesrv.lognohupshbin/mqnamesrvlogs/namesrv.log21验证启动成功jps命令查看是否有NamesrvStartup进程或查看日志tail -f logs/namesrv.log出现“The Name Server boot success”即为成功。2启动 Broker先修改配置避免内存不足新手必改编辑bin/runbroker.sh和bin/runserver.sh将 JVM 内存配置改小默认配置太大单机可能扛不住# 编辑 runserver.sh修改如下行示例JAVA_OPT${JAVA_OPT}-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m# 编辑 runbroker.sh修改如下行示例JAVA_OPT${JAVA_OPT}-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m然后启动 Broker指定 NameServer 地址# 后台启动指定 NameServer 为本地 9876 端口日志输出到 logs/broker.lognohupshbin/mqbroker -n localhost:9876logs/broker.log21验证启动成功jps查看是否有BrokerStartup进程或日志tail -f logs/broker.log出现“The broker[localhost, 172.17.0.2:10911] boot success”即为成功。4. 关闭服务可选后续需要关闭时执行# 关闭 Brokershbin/mqshutdown broker# 关闭 NameServershbin/mqshutdown namesrv三、第二步项目集成 RocketMQSpringBoot 为例最常用SpringBoot 有官方整合 starter步骤简单新手直接抄就行。1. 引入依赖pom.xml!-- RocketMQ SpringBoot 整合依赖 --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.3/version!-- 对应 RocketMQ 4.9.x 版本版本要匹配 --/dependency!-- lombok 可选简化代码 --dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependency2. 配置 application.yml在项目resources/application.yml中添加 RocketMQ 配置spring:application:name:rocketmq-demo# 项目名称rocketmq:name-server:localhost:9876# NameServer 地址和上面启动的一致producer:group:demo-producer-group# 生产者分组自定义用于标识生产者集群send-message-timeout:3000# 发送超时时间毫秒retry-times-when-send-failed:2# 发送失败重试次数consumer:group:demo-consumer-group# 消费者分组自定义集群消费时同组共享消费进度pull-batch-size:10# 批量拉取消息数量四、第三步核心功能实战生产消费新手必会重点实现“发送普通消息”和“消费普通消息”这是项目中最常用的场景代码直接可运行。1. 定义消息实体可选规范消息格式importlombok.Data;DatapublicclassDemoMessage{privateStringid;// 消息IDprivateStringcontent;// 消息内容privateLongtimestamp;// 发送时间戳}2. 实现生产者发送消息创建生产者类通过RocketMQTemplate发送消息SpringBoot 自动注入无需手动创建importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;ComponentpublicclassDemoProducer{AutowiredprivateRocketMQTemplaterocketMQTemplate;// 发送普通消息同步发送等待 Broker 响应publicvoidsendSyncMessage(DemoMessagemessage){// 参数1topicName主题名需提前创建或自动创建 : tag标签用于过滤// 参数2消息内容可传对象、字符串等rocketMQTemplate.syncSend(demo-topic:tag1,message);System.out.println(同步消息发送成功message);}// 发送异步消息不等待响应回调通知结果publicvoidsendAsyncMessage(DemoMessagemessage){rocketMQTemplate.asyncSend(demo-topic:tag1,message,result-{if(result.isSuccess()){System.out.println(异步消息发送成功message);}else{System.out.println(异步消息发送失败result.getCause());}});}}3. 实现消费者接收消息创建消费者类通过RocketMQMessageListener注解监听指定主题和标签importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.apache.rocketmq.spring.core.RocketMQListener;importorg.springframework.stereotype.Component;// topic监听的主题名consumerGroup消费者分组selectorExpression标签过滤*表示所有标签RocketMQMessageListener(topicdemo-topic,consumerGroup${rocketmq.consumer.group},selectorExpressiontag1)ComponentpublicclassDemoConsumerimplementsRocketMQListenerDemoMessage{// 消息接收后的处理逻辑消息自动ACK无需手动确认OverridepublicvoidonMessage(DemoMessagemessage){System.out.println(收到消息message);// 这里可以写业务逻辑比如更新数据库、调用接口等}}4. 测试运行项目发送消息创建测试类或通过 Controller 触发消息发送importorg.springframework.boot.CommandLineRunner;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.beans.factory.annotation.Autowired;SpringBootApplicationpublicclassRocketmqDemoApplicationimplementsCommandLineRunner{AutowiredprivateDemoProducerdemoProducer;publicstaticvoidmain(String[]args){SpringApplication.run(RocketmqDemoApplication.class,args);}// 项目启动后自动执行Overridepublicvoidrun(String...args)throwsException{// 构造消息DemoMessagemessagenewDemoMessage();message.setId(MSG001);message.setContent(这是 RocketMQ 新手入门的第一条消息);message.setTimestamp(System.currentTimeMillis());// 发送同步消息demoProducer.sendSyncMessage(message);// 发送异步消息可选// demoProducer.sendAsyncMessage(message);}}5. 查看运行结果启动 SpringBoot 项目控制台会输出同步消息发送成功DemoMessage(idMSG001, content这是 RocketMQ 新手入门的第一条消息, timestamp1699999999999) 收到消息DemoMessage(idMSG001, content这是 RocketMQ 新手入门的第一条消息, timestamp1699999999999)说明生产者成功发送消息到 Broker消费者成功监听并消费消息整个流程跑通五、新手避坑指南关键注意点版本匹配rocketmq-spring-boot-starter版本要和 RocketMQ 服务端版本对应比如 2.2.x 对应 4.9.x3.x 对应 5.x否则会出现兼容性问题。端口占用NameServer 默认端口 9876Broker 默认端口 10911若端口被占用需修改配置文件conf/broker.conf。主题创建代码中可以自动创建主题但生产环境建议提前通过命令创建sh bin/mqadmin updateTopic -n localhost:9876 -t demo-topic -c DefaultCluster避免权限问题。消息格式发送对象时RocketMQ 会自动序列化默认 JSON消费者需保证消息实体类结构一致否则反序列化失败。消费组规范同一主题的消费者若属于同一消费组则消息会负载均衡每个消息只被一个消费者消费不同消费组则各自消费全量消息。六、下一步学习方向新手进阶消息类型扩展延迟消息定时任务、顺序消息订单流程、事务消息分布式事务。消费模式广播消费同一消费组所有消费者都收到消息、集群消费默认负载均衡。消息可靠性如何避免消息丢失、重复消费比如基于业务ID去重。监控工具部署 RocketMQ-Console可视化查看主题、消息、消费进度。