2026/1/14 17:55:16
网站建设
项目流程
同ip多域名做网站,百度游戏排行榜,看颜色应该搜索哪些词汇,网站推广咋做的目录 参考美团技术团队博客
项目中实现思路 参考美团技术团队博客 两种通用的解决方案#xff1a; 1. 版本号。 2. 状态机。 版本号 举个简单的例子#xff0c;一个产品的状态有上线/下线状态。如果消息1是下线#xff0c;消息2是上线。不巧消息1判重失败#xff0c;被投递…目录参考美团技术团队博客项目中实现思路参考美团技术团队博客两种通用的解决方案 1. 版本号。 2. 状态机。版本号举个简单的例子一个产品的状态有上线/下线状态。如果消息1是下线消息2是上线。不巧消息1判重失败被投递了两次且第二次发生在2之后如果不做重复性判断显然最终状态是错误的。 但是如果每个消息自带一个版本号。上游发送的时候标记消息1版本号是1消息2版本号是2。如果再发送下线消息则版本号标记为3。下游对于每次消息的处理同时维护一个版本号。 每次只接受比当前版本号大的消息。初始版本为0当消息1到达时将版本号更新为1。消息2到来时因为版本号1.可以接收同时更新版本号为2.当另一条下线消息到来时如果版本号是3.则是真实的下线消息。如果是1则是重复投递的消息。 如果业务方只关心消息重复不重复那么问题就已经解决了。但很多时候另一个头疼的问题来了就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12到来的顺序是21。则最后会发生状态错误。 参考TCP/IP协议如果想让乱序的消息最后能够正确的被组织那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。 如果到来的顺序是21则先把2存起来待1到来后先处理1再处理2这样重复性和顺序性要求就都达到了。。。。基于版本号来处理重复和顺序消息听起来是个不错的主意但凡事总有瑕疵。使用版本号的最大问题是对发送方必须要求消息带业务版本号。下游必须存储消息的版本号对于要严格保证顺序的。https://tech.meituan.com/2016/07/01/mq-design.htmlhttps://tech.meituan.com/2016/07/01/mq-design.html其中参考主要思想参考TCP/IP协议如果想让乱序的消息最后能够正确的被组织那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。 如果到来的顺序是21则先把2存起来待1到来后先处理1再处理2这样重复性和顺序性要求就都达到了。项目中实现思路消息的顺序性保障不靠中间件去实施在消费者这一侧来实现消息的顺序性。一个broker的topicxxxx 是专门用来发顺序消息的消费者这边收到消息之后全部落库不处理落库成功之后返回ack。利用mysql存消息数据库这边business_key version有唯一索引。重复消息被唯一索引幂等掉。假设发的时 2 1 3 1 4数据库真实存储的是2 1 3 4。消费者这边处理的逻辑伪代码while(ture) { // 查询所有未处理的消息按bussiness_key 和 version 升序排列 ListMessage messages DB.query(SELECT * FROM messages WHERE processed FALSE ORDER BY business_key, version ASC) for(Message msg : messages) { String businessKey msg.getBusinessKey(); int version msg.getVersion(); // 检查是否有上一版本号未处理 boolean preVersionProcessed DB.esists( SELECT 1 FROM messages WHERE business_key ? AND version ? AND processed TRUE, businessKey, version - 1 ); // 如果是version1,直接处理;或者上一个版本已经处理了也可以处理当前版本 if (version 1 || preVersionProcessed) { processMessage(msg); // 标记消息已处理 DB.update(UPDATE message SET processed TRUE WHERE id ?, msg.getId()); } else { continue; } } sleep(5000); }优化点避免重复读取可以使用分布式锁或者加行级锁FOR UPDATE。加上事务机制。ListMessage messages DB.query(SELECT * FROM messages WHERE processed FALSE ORDER BY business_key, version ASC)