2026/1/2 4:36:40
网站建设
项目流程
网站下载视频方法,电商旅游网站策划书,网站建设中广告图片尺寸,网站建设咨询电话分布式事务实践#xff1a;从问题到 Seata 解决方案
写在前面
去年我在做一个电商系统重构#xff0c;把原来的单体应用拆成了订单服务、库存服务、支付服务、积分服务。拆分完成后#xff0c;遇到了一个头疼的问题#xff1a;用户下单时#xff0c;需要同时扣库存、创建…分布式事务实践从问题到 Seata 解决方案写在前面去年我在做一个电商系统重构把原来的单体应用拆成了订单服务、库存服务、支付服务、积分服务。拆分完成后遇到了一个头疼的问题用户下单时需要同时扣库存、创建订单、扣积分如果其中一步失败了怎么办这就是分布式事务要解决的问题。这篇文章我会结合 Seata 这个框架深入聊聊我们在实践中是怎么处理这类问题的包括原理分析、实战踩坑、性能优化、生产实践等方方面面。问题从哪来一个典型的场景假设你在做一个电商系统用户下单的流程是这样的用户下单 ↓ 1. 库存服务扣减库存 ↓ 2. 订单服务创建订单 ↓ 3. 积分服务扣减积分看起来很简单但如果第 2 步创建订单成功了第 3 步扣积分失败了这时候订单已经存在了但积分没扣。用户可能会觉得自己占了便宜但对我们来说就是数据不一致。在单体应用时代这个问题不存在。因为所有的操作都在同一个数据库里可以用数据库的本地事务保证TransactionalpublicvoidcreateOrder(OrderRequestrequest){// 扣库存inventoryService.reduceStock(request.getProductId());// 创建订单orderService.create(request);// 扣积分pointService.deduct(request.getUserId());}只要加了Transactional这三步要么全部成功要么全部失败回滚。但在微服务架构下这三个服务各自有独立的数据库。库存服务操作库存库订单服务操作订单库积分服务操作积分库。这时候本地事务就无能为力了因为事务的范围只限于单个数据库。这就是为什么需要分布式事务。常见的解决方案两阶段提交2PC这是最经典的分布式事务协议。它的核心思想是引入一个协调者Coordinator协调者负责询问所有参与者Participant是否准备好提交如果所有人都说准备好了协调者再通知大家提交。详细流程阶段一准备阶段Prepare Phase协调者 参与者1 参与者2 | | | |----- prepare request -----------| | | |---- 执行事务但不提交 --------| | |---- 锁定资源写日志 --------| | | | |---- YES (ready to commit) ------| | | | | | prepare request | |--------------------------------------------------| | | | | | |---- 执行事务 | | |---- 锁定资源 | | | | |------------------------------------------------- YES |阶段二提交阶段Commit Phase如果所有参与者都回复 YES协调者 参与者1 参与者2 | | | |----- commit request ------------| | | |---- 提交事务释放锁 --------| | | | |---- ACK (committed) ------------| | | | | | commit request | |--------------------------------------------------| | | |---- 提交事务 | | |---- 释放锁 | | | | |------------------------------------------------- ACK |如果有任何一个参与者回复 NO协调者 参与者1 参与者2 | | | |----- rollback request ----------| | | |---- 回滚事务释放锁 --------| | | | |---- ACK (rolled back) ----------| |2PC 的问题同步阻塞在阶段一之后参与者的资源会被锁定如果协调者挂了所有参与者都会一直等待直到协调者恢复。这在高并发场景下是不可接受的。单点故障协调者是单点如果协调者挂了事务就无法完成。数据不一致的可能性如果协调者在发送 commit 请求后挂了部分参与者可能已经提交部分还没有导致数据不一致。性能问题需要两轮网络通信延迟较高。三阶段提交3PC的改进3PC 在 2PC 的基础上增加了超时机制和预提交阶段阶段一CanCommit- 协调者询问参与者是否可以提交参与者不锁定资源阶段二PreCommit- 协调者发送预提交请求参与者锁定资源执行事务阶段三DoCommit- 协调者发送提交请求参与者提交或回滚3PC 通过超时机制减少了阻塞时间但仍然无法彻底解决数据一致性问题而且实现更复杂实际应用较少。TCC 模式TCC 是 Try-Confirm-Cancel 的缩写。它要求业务代码实现三个方法Try尝试执行预留资源。比如扣库存不是直接扣而是先冻结。Confirm确认执行提交操作。如果 Try 成功Confirm 就真正扣减库存。Cancel取消执行释放资源。如果 Try 失败或者需要回滚Cancel 就释放冻结的库存。TCC 的执行流程TM (事务管理器) | |--- 开启全局事务 | |--- 调用 Try 阶段 | | | |--- 服务A.try() - 预留资源A | |--- 服务B.try() - 预留资源B | |--- 服务C.try() - 预留资源C | |--- 判断结果 | | | |--- 全部成功 - 调用 Confirm | | |--- 服务A.confirm() - 真正扣减资源A | | |--- 服务B.confirm() - 真正扣减资源B | | |--- 服务C.confirm() - 真正扣减资源C | | | |--- 任何失败 - 调用 Cancel | |--- 服务C.cancel() - 释放资源C | |--- 服务B.cancel() - 释放资源B | |--- 服务A.cancel() - 释放资源ATCC 的设计原则TCC 模式虽然灵活但实现起来有严格的要求幂等性Confirm 和 Cancel 必须保证幂等因为网络重试可能导致重复调用空回滚如果 Try 根本没执行比如网络超时Cancel 也要能正常执行防悬挂如果 Cancel 比 Try 先执行Try 执行时要判断是否已经被 Cancel举个例子扣库存的 TCC 实现需要注意这些问题ServicepublicclassInventoryTccServiceImplimplementsInventoryTccService{AutowiredprivateInventoryMapperinventoryMapper;AutowiredprivateTccActionMappertccActionMapper;// 记录 TCC 操作状态OverrideTransactionalpublicbooleantryReduceStock(LongproductId,Integerquantity){// 1. 检查是否已经被 Cancel防悬挂TccActionactiontccActionMapper.selectByXidAndAction(RootContext.getXID(),reduceStock,productId);if(action!nullaction.getStatus()TccActionStatus.CANCELLED){// 已经被 Cancel直接返回 falsereturnfalse;}// 2. 冻结库存InventoryinventoryinventoryMapper.selectByProductId(productId);if(inventory.getStock()quantity){thrownewRuntimeException(库存不足);}inventory.setFrozenStock(inventory.getFrozenStock()quantity);inventoryMapper.updateById(inventory);// 3. 记录 TCC 操作TccActiontccActionnewTccAction();tccAction.setXid(RootContext.getXID());tccAction.setAction(reduceStock);tccAction.setProductId(productId);tccAction.setQuantity(quantity);tccAction.setStatus(TccActionStatus.TRYING);tccActionMapper.insert(tccAction);returntrue;}OverrideTransactionalpublicbooleanconfirm(BusinessActionContextcontext){LongproductId(Long)context.getActionContext(productId);Integerquantity(Integer)context.getActionContext(quantity);Stringxidcontext.getXid();// 幂等性检查TccActionactiontccActionMapper.selectByXidAndAction(xid,reduceStock,productId);if(actionnull||action.getStatus()TccActionStatus.CONFIRMED){// 已经确认过了直接返回幂等returntrue;}// 真正扣减库存InventoryinventoryinventoryMapper.selectByProductId(productId);inventory.setStock(inventory.getStock()-quantity);inventory.setFrozenStock(inventory.getFrozenStock()-quantity);inventoryMapper.updateById(inventory);// 更新状态action.setStatus(TccActionStatus.CONFIRMED);tccActionMapper.updateById(action);returntrue;}OverrideTransactionalpublicbooleancancel(BusinessActionContextcontext){LongproductId(Long)context.getActionContext(productId);Integerquantity(Integer)context.getActionContext(quantity);Stringxidcontext.getXid();// 空回滚检查TccActionactiontccActionMapper.selectByXidAndAction(xid,reduceStock,productId);if(actionnull){// Try 没执行记录空回滚标记actionnewTccAction();action.setXid(xid);action.setAction(reduceStock);action.setProductId(productId);action.setQuantity(quantity);action.setStatus(TccActionStatus.CANCELLED);tccActionMapper.insert(action);returntrue;// 空回滚直接返回}// 幂等性检查if(action.getStatus()TccActionStatus.CANCELLED){returntrue;// 已经回滚过了}// 释放冻结的库存InventoryinventoryinventoryMapper.selectByProductId(productId);inventory.setFrozenStock(inventory.getFrozenStock()-quantity);inventoryMapper.updateById(inventory);// 更新状态action.setStatus(TccActionStatus.CANCELLED);tccActionMapper.updateById(action);returntrue;}}这样做的好处是业务可控性强不需要数据库支持性能也比较好。但缺点也很明显需要改造大量的业务代码开发成本高而且容易出错。本地消息表Transactional Outbox这个方案的思路是既然跨服务的事务不好搞那就用消息队列来保证最终一致性。基本流程订单服务 | |--- 开启本地事务 | | | |--- 1. 创建订单写入 order 表 | |--- 2. 插入消息写入 message 表 | | |--- 提交事务保证订单和消息的原子性 | |--- 定时任务扫描 message 表 | | | |--- 读取未发送消息 | |--- 发送到消息队列 | |--- 更新消息状态为已发送 | 消息队列 | |--- 库存服务消费消息 - 扣库存 |--- 积分服务消费消息 - 扣积分完整的实现示例首先创建消息表CREATETABLEoutbox_message(idBIGINTPRIMARYKEYAUTO_INCREMENT,aggregate_idVARCHAR(100)NOTNULLCOMMENT聚合ID如订单ID,event_typeVARCHAR(50)NOTNULLCOMMENT事件类型,payloadTEXTNOTNULLCOMMENT消息内容JSON,statusTINYINTNOTNULLDEFAULT0COMMENT0-待发送,1-已发送,2-发送失败,retry_countINTNOTNULLDEFAULT0COMMENT重试次数,created_atDATETIMENOTNULLDEFAULTCURRENT_TIMESTAMP,updated_atDATETIMENOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,INDEXidx_status(status,created_at))ENGINEInnoDBDEFAULTCHARSETutf8mb4;消息实体和 MapperDataTableName(outbox_message)publicclassOutboxMessage{privateLongid;privateStringaggregateId;privateStringeventType;privateStringpayload;privateIntegerstatus;privateIntegerretryCount;privateLocalDateTimecreatedAt;privateLocalDateTimeupdatedAt;}MapperpublicinterfaceOutboxMessageMapperextendsBaseMapperOutboxMessage{}订单服务代码ServiceSlf4jpublicclassOrderServiceImplimplementsOrderService{AutowiredprivateOrderMapperorderMapper;AutowiredprivateOutboxMessageMapperoutboxMessageMapper;AutowiredprivateRabbitTemplaterabbitTemplate;Transactional(rollbackForException.class)OverridepublicvoidcreateOrder(OrderRequestrequest){// 1. 创建订单OrderordernewOrder();order.setUserId(request.getUserId());order.setProductId(request.getProductId());order.setQuantity(request.getQuantity());order.setStatus(OrderStatus.CREATED);orderMapper.insert(order);// 2. 在同一事务中插入消息OutboxMessageinventoryMessagenewOutboxMessage();inventoryMessage.setAggregateId(order.getId().toString());inventoryMessage.setEventType(INVENTORY_REDUCE);inventoryMessage.setPayload(JSON.toJSONString(newInventoryReduceEvent(order.getProductId(),order.getQuantity())));inventoryMessage.setStatus(0);outboxMessageMapper.insert(inventoryMessage);OutboxMessagepointMessagenewOutboxMessage();pointMessage.setAggregateId(order.getId().toString());pointMessage.setEventType(POINT_DEDUCT);pointMessage.setPayload(JSON.toJSONString(newPointDeductEvent(order.getUserId(),order.getAmount())));pointMessage.setStatus(0);outboxMessageMapper.insert(pointMessage);// 事务提交后订单和消息都已持久化}}定时任务扫描并发送消息ComponentSlf4jpublicclassOutboxMessageSender{AutowiredprivateOutboxMessageMapperoutboxMessageMapper;AutowiredprivateRabbitTemplaterabbitTemplate;privatestaticfinalintMAX_RETRY3;privatestaticfinalintBATCH_SIZE100;Scheduled(fixedDelay1000)// 每秒执行一次publicvoidsendPendingMessages(){// 查询待发送的消息ListOutboxMessagemessagesoutboxMessageMapper.selectList(newLambdaQueryWrapperOutboxMessage().eq(OutboxMessage::getStatus,0).lt(OutboxMessage::getRetryCount,MAX_RETRY).orderByAsc(OutboxMessage::getCreatedAt).last(LIMIT BATCH_SIZE));for(OutboxMessagemessage:messages){try{// 发送到消息队列rabbitTemplate.convertAndSend(getExchange(message.getEventType()),getRoutingKey(message.getEventType()),message.getPayload());// 更新状态为已发送message.setStatus(1);outboxMessageMapper.updateById(message);log.info(消息发送成功: id{}, eventType{},message.getId(),message.getEventType());}catch(Exceptione){log.error(消息发送失败: id{}, eventType{},message.getId(),message.getEventType(),e);// 增加重试次数message.setRetryCount(message.getRetryCount()1);if(message.getRetryCount()MAX_RETRY){message.setStatus(2);// 标记为发送失败}outboxMessageMapper.updateById(message);}}}privateStringgetExchange(StringeventType){// 根据事件类型返回对应的 Exchangeswitch(eventType){caseINVENTORY_REDUCE:returninventory.exchange;casePOINT_DEDUCT:returnpoint.exchange;default:returndefault.exchange;}}privateStringgetRoutingKey(StringeventType){// 根据事件类型返回对应的 RoutingKeyreturneventType.toLowerCase().replace(_,.);}}库存服务消费消息ComponentSlf4jpublicclassInventoryEventConsumer{AutowiredprivateInventoryServiceinventoryService;RabbitListener(queuesinventory.reduce.queue)publicvoidhandleInventoryReduce(Stringpayload){try{InventoryReduceEventeventJSON.parseObject(payload,InventoryReduceEvent.class);// 幂等性检查检查订单是否已经处理过if(isProcessed(event.getOrderId())){log.warn(订单已处理跳过: orderId{},event.getOrderId());return;}// 扣减库存inventoryService.reduceStock(event.getProductId(),event.getQuantity());// 记录处理结果用于幂等性检查markAsProcessed(event.getOrderId());log.info(库存扣减成功: productId{}, quantity{},event.getProductId(),event.getQuantity());}catch(Exceptione){log.error(库存扣减失败,e);thrownewAmqpRejectAndDontRequeueException(处理失败,e);}}}本地消息表的优缺点优点实现相对简单保证了消息的可靠投递消息和业务数据在同一事务中可以通过重试机制保证最终一致性缺点只能保证最终一致性不能保证强一致性需要额外的定时任务和消息表增加系统复杂度消息发送有延迟取决于定时任务的执行频率需要处理幂等性问题改进方案CDC Outbox为了减少延迟可以使用 CDCChange Data Capture技术通过监听数据库的 binlog 来实时捕获消息而不需要定时扫描。比如使用 Canal 或者 Debezium。但这个方案有个问题是最终一致不是强一致。从订单创建到库存扣减完成中间可能有延迟。对于实时性要求高的场景可能不太合适。Seata 是什么SeataSimple Extensible Autonomous Transaction Architecture是阿里巴巴开源的分布式事务解决方案。它支持多种事务模式其中 AT 模式对业务代码侵入最小也是最常用的。Seata 的架构Seata 有三个核心角色TCTransaction Coordinator事务协调器独立部署的服务负责维护全局事务的状态协调分支事务的提交或回滚TMTransaction Manager事务管理器通常就是我们的业务代码负责开启、提交或回滚全局事务RMResource Manager资源管理器管理分支事务负责和 TC 通信报告分支事务的状态并执行 TC 的指令提交或回滚┌─────────────┐ │ TC │ (事务协调器) │ (Server) │ └──────┬──────┘ │ ┌──────────────┼──────────────┐ │ │ │ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │ TM │ │ RM │ │ RM │ │(订单服务)│ │(库存服务)│ │(积分服务)│ └─────────┘ └─────────┘ └─────────┘AT 模式的详细工作原理1. 全局事务的开启当我们在方法上加GlobalTransactional时Seata 的拦截器会// Seata 拦截器的简化实现publicclassGlobalTransactionalInterceptorimplementsMethodInterceptor{OverridepublicObjectinvoke(MethodInvocationinvocation)throwsThrowable{// 1. 开启全局事务GlobalTransactiontxGlobalTransactionContext.getCurrentOrCreate();tx.begin(timeout,name);// 2. 将 XID 绑定到当前线程Stringxidtx.getXid();RootContext.bind(xid);try{// 3. 执行业务方法Objectresultinvocation.proceed();// 4. 提交全局事务tx.commit();returnresult;}catch(Throwablee){// 5. 回滚全局事务tx.rollback();throwe;}finally{// 6. 清理上下文RootContext.unbind();}}}全局事务开启后会生成一个全局唯一的 XIDTransaction ID格式类似192.168.1.100:8091:20231201123456782. XID 的传递机制XID 需要在服务间传递这样 TC 才能知道哪些分支事务属于同一个全局事务。Seata 通过拦截器自动处理Feign 调用时的 XID 传递// Seata 的 Feign 拦截器简化版publicclassSeataFeignRequestInterceptorimplementsRequestInterceptor{Overridepublicvoidapply(RequestTemplatetemplate){// 获取当前线程的 XIDStringxidRootContext.getXID();if(StringUtils.isNotBlank(xid)){// 将 XID 添加到请求头template.header(RootContext.KEY_XID,xid);}}}Dubbo 调用时的 XID 传递// Seata 的 Dubbo 过滤器简化版publicclassTransactionPropagationFilterimplementsFilter{OverridepublicResultinvoke(Invoker?invoker,Invocationinvocation){// 从 RPC 上下文中获取 XIDStringxidinvocation.getAttachment(RootContext.KEY_XID);if(StringUtils.isNotBlank(xid)){// 绑定到当前线程RootContext.bind(xid);}try{returninvoker.invoke(invocation);}finally{// 清理上下文RootContext.unbind();}}}3. 分支事务的注册当 RM 执行 SQL 时Seata 的ConnectionProxy会拦截// ConnectionProxy 的简化实现publicclassConnectionProxyimplementsConnection{OverridepublicPreparedStatementprepareStatement(Stringsql){// 1. 解析 SQL判断是否需要全局事务if(needGlobalLock(sql)){// 2. 获取全局锁acquireGlobalLock();}// 3. 执行 SQL 前记录前镜像TableRecordsbeforeImagebuildBeforeImage(sql);// 4. 执行 SQLPreparedStatementstatementtargetConnection.prepareStatement(sql);intupdateCountstatement.executeUpdate();// 5. 执行 SQL 后记录后镜像TableRecordsafterImagebuildAfterImage(sql,beforeImage);// 6. 生成 Undo LogUndoLogundoLogbuildUndoLog(beforeImage,afterImage);insertUndoLog(undoLog);// 7. 注册分支事务registerBranch();returnstatement;}privatevoidregisterBranch(){BranchRegisterRequestrequestnewBranchRegisterRequest();request.setXid(RootContext.getXID());request.setResourceId(dataSourceProxy.getResourceId());request.setBranchType(BranchType.AT);request.setApplicationId(applicationId);// 向 TC 注册分支事务BranchRegisterResponseresponsedefaultRMHandler.getRMChannel().sendSyncRequest(request);// 保存 branchIdthis.branchIdresponse.getBranchId();}}4. Undo Log 的结构Undo Log 存储在undo_log表中结构如下{branchId:123456789,xid:192.168.1.100:8091:2023120112345678,context:serializerjackson,rollbackInfo:{class:io.seata.rm.datasource.undo.BranchUndoLog,xid:192.168.1.100:8091:2023120112345678,branchId:123456789,sqlUndoLogs:[{class:io.seata.rm.datasource.undo.SQLUndoLog,sqlType:UPDATE,tableName:inventory,beforeImage:{class:io.seata.rm.datasource.sql.struct.TableRecords,tableName:inventory,rows:[{class:io.seata.rm.datasource.sql.struct.Row,fields:[{keyType:PrimaryKey,name:id,type:4,value:1},{name:stock,type:4,value:100}]}]},afterImage:{class:io.seata.rm.datasource.sql.struct.TableRecords,tableName:inventory,rows:[{class:io.seata.rm.datasource.sql.struct.Row,fields:[{keyType:PrimaryKey,name:id,type:4,value:1},{name:stock,type:4,value:80}]}]}}]},logStatus:1,logCreated:1701417600000,logModified:1701417600000}5. SQL 解析原理Seata 使用 Druid SQL Parser 来解析 SQL// SQL 解析的简化流程publicclassSQLRecognizer{publicstaticSQLRecognizerparse(Stringsql,StringdbType){// 1. 使用 Druid 解析 SQLSQLStatementstatementSQLUtils.parseSingleStatement(sql,dbType);// 2. 判断 SQL 类型if(statementinstanceofSQLUpdateStatement){returnnewMySQLUpdateRecognizer(sql,statement);}elseif(statementinstanceofSQLInsertStatement){returnnewMySQLInsertRecognizer(sql,statement);}elseif(statementinstanceofSQLDeleteStatement){returnnewMySQLDeleteRecognizer(sql,statement);}thrownewNotSupportYetException(不支持的 SQL 类型);}}// UPDATE 语句的识别器publicclassMySQLUpdateRecognizerextendsBaseMySQLRecognizer{OverridepublicTableRecordsgetBeforeImage(TableMetatableMeta,ListSQLStatementsqlStatements){// 1. 根据 WHERE 条件查询数据前镜像StringselectSQLbuildSelectSQL(tableMeta,sqlStatements);returnexecutor.query(selectSQL);}OverridepublicTableRecordsgetAfterImage(TableRecordsbeforeImage,TableMetatableMeta,ListSQLStatementsqlStatements){// 2. 执行 UPDATE然后根据主键查询数据后镜像executor.update(sqlStatements);StringselectSQLbuildSelectSQLByPK(beforeImage);returnexecutor.query(selectSQL);}}6. 全局提交流程如果所有分支事务都成功TM TC RM | | | |---- commit request ----------| | | | | | |---- branch commit request ---| | | | | |--- ACK (committed) ----------| | | | |--- ACK (committed) ----------| | | | | | |---- delete undo_log ---------| | | |TC 收到 TM 的提交请求后会异步通知各 RM 删除 Undo Log因为此时所有分支事务已经提交不需要回滚了。7. 全局回滚流程如果任何分支事务失败TM TC RM | | | |---- rollback request --------| | | | | | |---- branch rollback request -| | | | | | |---- 查询 undo_log | | |---- 根据前镜像恢复数据 | | |---- 删除 undo_log | | | | |--- ACK (rolled back) --------| | | | |--- ACK (rolled back) --------| |TC 收到 TM 的回滚请求后会同步通知各 RM 根据 Undo Log 回滚数据。关键点在于Seata 自动帮我们处理了回滚逻辑我们只需要在业务方法上加一个GlobalTransactional注解。用 Seata 解决我们的问题环境准备1. 数据库准备Seata Server 需要连接数据库来存储事务日志。我们可以在数据库中执行 Seata 提供的建表脚本-- Seata Server 使用的数据库表CREATEDATABASEseata;USEseata;-- 全局事务表CREATETABLEglobal_table(xidVARCHAR(128)NOTNULL,transaction_idBIGINT,statusTINYINTNOTNULL,application_idVARCHAR(32),transaction_service_groupVARCHAR(32),transaction_nameVARCHAR(128),timeoutINT,begin_timeBIGINT,application_dataVARCHAR(2000),gmt_createDATETIME,gmt_modifiedDATETIME,PRIMARYKEY(xid),KEYidx_gmt_modified_status(gmt_modified,status),KEYidx_transaction_id(transaction_id))ENGINEInnoDBDEFAULTCHARSETutf8;-- 分支事务表CREATETABLEbranch_table(branch_idBIGINTNOTNULL,xidVARCHAR(128)NOTNULL,transaction_idBIGINT,resource_group_idVARCHAR(32),resource_idVARCHAR(256),branch_typeVARCHAR(8),statusTINYINT,client_idVARCHAR(64),application_dataVARCHAR(2000),gmt_createDATETIME(6),gmt_modifiedDATETIME(6),PRIMARYKEY(branch_id),KEYidx_xid(xid))ENGINEInnoDBDEFAULTCHARSETutf8;-- 分布式锁表CREATETABLElock_table(row_keyVARCHAR(128)NOTNULL,xidVARCHAR(96),transaction_idBIGINT,branch_idBIGINTNOTNULL,resource_idVARCHAR(256),table_nameVARCHAR(32),pkVARCHAR(36),gmt_createDATETIME,gmt_modifiedDATETIME,PRIMARYKEY(row_key),KEYidx_branch_id(branch_id))ENGINEInnoDBDEFAULTCHARSETutf8;2. 启动 Seata Server方式一Docker 运行最简单docker run -d\--name seata-server\-p8091:8091\-eSEATA_PORT8091\seataio/seata-server:latest方式二本地部署推荐生产环境从官网下载 Seata Server解压后修改配置文件conf/application.ymlserver:port:7091spring:application:name:seata-serverseata:config:type:dbdb:datasource:druiddb-type:mysqldriver-class-name:com.mysql.cj.jdbc.Driverurl:jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatementstrueuser:rootpassword:your_passwordmin-conn:10max-conn:100global-table:global_tablebranch-table:branch_tablelock-table:lock_tabledistributed-lock-table:distributed_lockquery-limit:100max-wait:5000registry:type:nacosnacos:application:seata-serverserver-addr:127.0.0.1:8848group:SEATA_GROUPnamespace:cluster:defaultusername:nacospassword:nacosstore:mode:dbdb:datasource:druiddb-type:mysqldriver-class-name:com.mysql.cj.jdbc.Driverurl:jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatementstrueuser:rootpassword:your_passwordmin-conn:10max-conn:100global-table:global_tablebranch-table:branch_tablelock-table:lock_tablequery-limit:100max-wait:5000security:secretKey:SeataSecretKey0c382ef121d778043159209298fd40bf3850a017tokenValidityInMilliseconds:1800000ignore:urls:/,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login然后启动shbin/seata-server.sh方式三Kubernetes 部署生产环境高可用如果需要高可用可以在 K8s 中部署apiVersion:apps/v1kind:Deploymentmetadata:name:seata-servernamespace:defaultspec:replicas:3selector:matchLabels:app:seata-servertemplate:metadata:labels:app:seata-serverspec:containers:-name:seata-serverimage:seataio/seata-server:latestports:-containerPort:8091-containerPort:7091env:-name:SEATA_PORTvalue:8091-name:STORE_MODEvalue:db-name:SEATA_CONFIG_NAMEvalue:file:/root/seata-config/registry3. 验证 Seata Server 是否启动成功访问 Seata 控制台http://localhost:7091默认用户名和密码都是seata。或者通过 API 检查curlhttp://localhost:7091/v1/metrics依赖引入在订单服务、库存服务、积分服务的 pom.xml 中都加入 Seata 依赖dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-seata/artifactId/dependency配置 Seata基础配置在 application.yml 中配置 Seataseata:enabled:trueapplication-id:order-service# 服务名用于标识不同的服务tx-service-group:my_test_tx_group# 事务组名# 服务配置service:# 事务组与集群的映射关系vgroup-mapping:my_test_tx_group:default# 事务组名 - 集群名# Seata Server 地址列表grouplist:default:127.0.0.1:8091# 集群名 - Server 地址多个用逗号分隔# 是否启用降级enable-degrade:false# 降级阈值当全局事务失败率达到这个值时会降级degrade-check-period:2000# 降级阈值百分比degrade-check-allow-times:10# 配置中心配置config:type:nacos# 支持 file、nacos、apollo、zk、consul 等nacos:server-addr:127.0.0.1:8848namespace:group:SEATA_GROUPusername:nacospassword:nacosdata-id:seataServer.properties# 注册中心配置registry:type:nacos# 支持 file、nacos、eureka、redis、zk、etcd3、sofa、consul 等nacos:application:seata-serverserver-addr:127.0.0.1:8848group:SEATA_GROUPnamespace:cluster:defaultusername:nacospassword:nacos# 客户端配置client:# RM 配置rm:# 是否异步提交分支事务async-commit-buffer-limit:10000# 报告重试次数report-retry-count:5# 表元数据检查table-meta-check-enable:false# 是否上报成功report-success-enable:false# Saga 分支注册使能saga-branch-register-enable:false# Saga JSON 解析器saga-json-parser:fastjson# Saga 重试持久化模式异步或同步saga-retry-persist-mode-update:false# Saga 补偿持久化模式异步或同步saga-compensate-persist-mode-update:false# 锁重试间隔毫秒lock-retry-interval:10# 锁重试次数lock-retry-times:30# TM 配置tm:# 提交重试次数commit-retry-count:5# 回滚重试次数rollback-retry-count:5# 降级检查degrade-check:false# 降级检查周期毫秒degrade-check-period:2000# 降级检查允许次数degrade-check-allow-times:10# Undo 配置undo:# 是否启用数据验证data-validation:true# 日志序列化方式jackson、kryo、protobuf、fastjsonlog-serialization:jackson# 日志表名log-table:undo_log# 是否只关心更新后的数据only-care-update-columns:true# 负载均衡配置load-balance:# 负载均衡类型RandomLoadBalance、RoundRobinLoadBalance、LeastActiveLoadBalance、ConsistentHashLoadBalancetype:RandomLoadBalance# 虚拟节点数仅适用于 ConsistentHashLoadBalancevirtual-nodes:10使用 Nacos 作为配置中心如果使用 Nacos 作为配置中心需要在 Nacos 中创建配置登录 Nacos 控制台创建命名空间可选创建配置Data IDseataServer.propertiesGroupSEATA_GROUP配置内容# 事务组配置 service.vgroupMapping.my_test_tx_groupdefault service.default.grouplist127.0.0.1:8091 # 事务服务降级策略 service.enableDegradefalse service.disableGlobalTransactionfalse # 客户端与 TC 通信配置 transport.typeTCP transport.serverNIO transport.heartbeattrue transport.enableClientBatchSendRequesttrue transport.threadFactory.bossThreadPrefixNettyBoss transport.threadFactory.workerThreadPrefixNettyServerNIOWorker transport.threadFactory.serverExecutornettyServerHandler transport.threadFactory.shareBossWorkerfalse transport.threadFactory.clientSelectorThreadPrefixNettyClientSelector transport.threadFactory.clientSelectorThreadSize1 transport.threadFactory.clientWorkerThreadPrefixNettyClientWorkerThread transport.threadFactory.bossThreadSize1 transport.threadFactory.workerThreadSizedefault transport.shutdown.wait3 # TC 存储配置 store.modedb store.db.datasourcedruid store.db.dbTypemysql store.db.driverClassNamecom.mysql.cj.jdbc.Driver store.db.urljdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatementstrue store.db.userroot store.db.passwordyour_password store.db.minConn5 store.db.maxConn100 store.db.globalTableglobal_table store.db.branchTablebranch_table store.db.lockTablelock_table store.db.queryLimit100 store.db.maxWait5000使用 Nacos 作为注册中心使用 Nacos 作为注册中心时客户端会自动从 Nacos 获取 Seata Server 的地址不需要手动配置grouplist。数据源代理Seata 需要代理数据源这样才能拦截 SQL 并生成 Undo Log。有多种方式可以实现方式一使用 DataSourceProxy推荐ConfigurationpublicclassDataSourceConfig{BeanConfigurationProperties(spring.datasource)publicDruidDataSourcedruidDataSource(){DruidDataSourcedruidDataSourcenewDruidDataSource();returndruidDataSource;}PrimaryBeanpublicDataSourcedataSource(DruidDataSourcedruidDataSource){// AT 模式使用 DataSourceProxyreturnnewDataSourceProxy(druidDataSource);}}方式二使用 Seata 自动代理Spring Boot Starter如果使用spring-cloud-starter-alibaba-seataSeata 会自动代理数据源不需要手动配置。但需要注意数据源的配置顺序ConfigurationpublicclassDataSourceConfig{BeanConfigurationProperties(spring.datasource)publicDruidDataSourcedruidDataSource(){returnnewDruidDataSource();}// 如果使用自动代理不要手动创建 DataSourceProxy// Seata 会自动拦截 Bean DataSource 类型的方法}方式三使用 SeataDataSourceBeanPostProcessor对于动态数据源如 Druid 的多数据源需要特殊处理ConfigurationpublicclassDynamicDataSourceConfig{BeanpublicDataSourcedataSource(){// 创建动态数据源DynamicRoutingDataSourcedynamicDataSourcenewDynamicRoutingDataSource();// 添加数据源MapString,DataSourcedataSourceMapnewHashMap();dataSourceMap.put(master,masterDataSource());dataSourceMap.put(slave1,slave1DataSource());dynamicDataSource.setTargetDataSources(dataSourceMap);dynamicDataSource.setDefaultTargetDataSource(masterDataSource());returndynamicDataSource;}BeanpublicDataSourcemasterDataSource(){DruidDataSourcedataSourcenewDruidDataSource();// ... 配置数据源returnnewDataSourceProxy(dataSource);}BeanpublicDataSourceslave1DataSource(){DruidDataSourcedataSourcenewDruidDataSource();// ... 配置数据源returnnewDataSourceProxy(dataSource);}}数据源代理的原理DataSourceProxy会拦截所有通过该数据源获取的Connection返回一个ConnectionProxy// DataSourceProxy 的简化实现publicclassDataSourceProxyextendsAbstractDataSourceProxy{OverridepublicConnectiongetConnection()throwsSQLException{ConnectiontargetConnectiontargetDataSource.getConnection();returnnewConnectionProxy(this,targetConnection);}}// ConnectionProxy 会拦截 SQL 执行publicclassConnectionProxyextendsAbstractConnectionProxy{OverridepublicPreparedStatementprepareStatement(Stringsql)throwsSQLException{// 解析 SQLSQLRecognizersqlRecognizerSQLRecognizerFactory.get(sql,getDbType());if(sqlRecognizer!null){// 生成前镜像TableRecordsbeforeImagebuildBeforeImage(sqlRecognizer);// 执行 SQLPreparedStatementtargetStatementtargetConnection.prepareStatement(sql);// 生成后镜像TableRecordsafterImagebuildAfterImage(sqlRecognizer,beforeImage);// 生成 Undo LogUndoLogundoLogbuildUndoLog(beforeImage,afterImage);insertUndoLog(undoLog);returnnewPreparedStatementProxy(this,targetStatement,sqlRecognizer);}returntargetConnection.prepareStatement(sql);}}业务代码改造最关键的一步在开启全局事务的地方加上注解ServicepublicclassOrderServiceImplimplementsOrderService{AutowiredprivateInventoryServiceinventoryService;AutowiredprivateOrderMapperorderMapper;AutowiredprivatePointServicepointService;GlobalTransactional(rollbackForException.class)OverridepublicvoidcreateOrder(OrderRequestrequest){// 1. 扣库存inventoryService.reduceStock(request.getProductId(),request.getQuantity());// 2. 创建订单OrderordernewOrder();order.setUserId(request.getUserId());order.setProductId(request.getProductId());order.setQuantity(request.getQuantity());orderMapper.insert(order);// 3. 扣积分pointService.deduct(request.getUserId(),request.getAmount());}}注意这三个服务的调用需要保证 XID 能够传递。如果是用 Feign 或者 DubboSeata 的 starter 已经帮我们自动处理了 XID 的传递。在库存服务和积分服务中也需要开启本地事务ServicepublicclassInventoryServiceImplimplementsInventoryService{AutowiredprivateInventoryMapperinventoryMapper;TransactionalOverridepublicvoidreduceStock(LongproductId,Integerquantity){InventoryinventoryinventoryMapper.selectByProductId(productId);if(inventory.getStock()quantity){thrownewRuntimeException(库存不足);}inventory.setStock(inventory.getStock()-quantity);inventoryMapper.updateById(inventory);}}Undo Log 表Seata 的 AT 模式需要在每个业务数据库中创建 undo_log 表CREATETABLEundo_log(idBIGINT(20)NOTNULLAUTO_INCREMENT,branch_idBIGINT(20)NOTNULL,xidVARCHAR(100)NOTNULL,contextVARCHAR(128)NOTNULL,rollback_infoLONGBLOBNOTNULL,log_statusINT(11)NOTNULL,log_createdDATETIME(6)NOTNULL,log_modifiedDATETIME(6)NOTNULL,PRIMARYKEY(id),UNIQUEKEYux_undo_log(xid,branch_id))ENGINEInnoDBDEFAULTCHARSETutf8;这个表用来存储回滚日志。当需要回滚时Seata 会从这个表读取数据的前镜像然后恢复数据。实际遇到的问题问题 1回滚失败场景描述我们在测试时发现有时候回滚会失败报错信息类似Cannot get table meta for table xxx, unknown table或者Primary key is null, cannot generate undo log原因分析排查后发现主要有几个原因表没有主键Seata 的 AT 模式需要根据主键来定位要回滚的记录如果没有主键就无法生成回滚 SQL。主键类型不支持Seata 只支持简单的单字段主键不支持复合主键。虽然可以配置支持但建议尽量避免。表元数据缓存未更新如果表结构改了比如新增了字段但 Seata 的元数据缓存没有更新也会导致回滚失败。解决方案所有业务表都要有主键最好是单字段主键-- 好的设计CREATETABLEorder(idBIGINTPRIMARYKEYAUTO_INCREMENT,user_idBIGINTNOTNULL,-- ...);-- 避免的设计CREATETABLEorder_item(order_idBIGINT,item_idBIGINT,-- ... 没有主键);-- 如果确实需要复合主键确保 Seata 配置支持CREATETABLEorder_item(order_idBIGINT,item_idBIGINT,PRIMARYKEY(order_id,item_id));配置表元数据检查开发环境可以开启生产环境建议关闭以提升性能seata:client:rm:table-meta-check-enable:true# 开启表元数据检查手动刷新元数据缓存如果确实需要// 在启动时刷新表元数据ComponentpublicclassTableMetaInitializer{AutowiredprivateDataSourceProxydataSourceProxy;PostConstructpublicvoidinit(){TableMetaCacheFactory.getTableMetaCache(DataSourceType.MYSQL).refresh(dataSourceProxy,order);}}问题 2并发更新导致数据不一致场景描述在高并发场景下多个全局事务同时更新同一条记录时可能会出现数据不一致的问题。比如两个订单同时扣减同一个商品的库存事务1库存 100 - 扣减 10 - 库存 90 事务2库存 100 - 扣减 20 - 库存 80如果这两个事务并发执行最终库存可能是 90 或 80但期望的应该是 70。原因分析Seata 的 AT 模式使用的是全局锁Global Lock但在某些情况下全局锁可能无法及时获取导致并发更新。全局锁的工作原理当 RM 执行 UPDATE 语句时会先获取本地数据库锁然后向 TC 申请全局锁如果全局锁被其他事务占用会重试直到超时但如果两个事务几乎同时执行可能出现事务1 获取了本地锁正在申请全局锁事务2 也获取了本地锁因为事务1还没拿到全局锁也在申请全局锁最终两个事务都成功提交导致数据不一致解决方案使用 SELECT FOR UPDATE在更新前先加行锁ServicepublicclassInventoryServiceImplimplementsInventoryService{TransactionalOverridepublicvoidreduceStock(LongproductId,Integerquantity){// 使用 FOR UPDATE 加锁InventoryinventoryinventoryMapper.selectByIdForUpdate(productId);if(inventory.getStock()quantity){thrownewRuntimeException(库存不足);}inventory.setStock(inventory.getStock()-quantity);inventoryMapper.updateById(inventory);}}// Mapper 中定义Select(SELECT * FROM inventory WHERE id #{id} FOR UPDATE)InventoryselectByIdForUpdate(Longid);使用乐观锁通过版本号控制DataTableName(inventory)publicclassInventory{privateLongid;privateLongproductId;privateIntegerstock;Version// MyBatis-Plus 乐观锁注解privateIntegerversion;}ServicepublicclassInventoryServiceImplimplementsInventoryService{TransactionalOverridepublicvoidreduceStock(LongproductId,Integerquantity){InventoryinventoryinventoryMapper.selectByProductId(productId);if(inventory.getStock()quantity){thrownewRuntimeException(库存不足);}// 使用版本号更新introwsinventoryMapper.updateStock(inventory.getId(),inventory.getVersion(),quantity);if(rows0){// 版本冲突重试或抛异常thrownewRuntimeException(库存更新冲突请重试);}}}// SQLUPDATE inventorySETstockstock-#{quantity},versionversion1WHEREid#{id}ANDversion#{version}使用分布式锁对于热点数据可以使用 Redis 分布式锁ServicepublicclassInventoryServiceImplimplementsInventoryService{AutowiredprivateRedisTemplateString,StringredisTemplate;TransactionalOverridepublicvoidreduceStock(LongproductId,Integerquantity){StringlockKeyinventory:lock:productId;StringlockValueUUID.randomUUID().toString();try{// 尝试获取分布式锁BooleanlockedredisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,10,TimeUnit.SECONDS);if(!locked){thrownewRuntimeException(获取锁失败请重试);}// 执行库存扣减InventoryinventoryinventoryMapper.selectByProductId(productId);if(inventory.getStock()quantity){thrownewRuntimeException(库存不足);}inventory.setStock(inventory.getStock()-quantity);inventoryMapper.updateById(inventory);}finally{// 释放锁Stringscriptif redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end;redisTemplate.execute(newDefaultRedisScript(script,Long.class),Collections.singletonList(lockKey),lockValue);}}}问题 3自定义 SQL 不支持场景描述Seata 的 AT 模式只支持标准的 INSERT、UPDATE、DELETE 语句。如果你的业务代码里有复杂的 SQL比如-- 带子查询的 UPDATEUPDATEorderSETstatus1WHEREuser_idIN(SELECTidFROMuserWHERElevel5);-- 批量更新UPDATEinventorySETstockCASEWHENid1THENstock-10WHENid2THENstock-20ELSEstockENDWHEREidIN(1,2);-- 使用函数UPDATEorderSETtotal_amount(SELECTSUM(amount)FROMorder_itemWHEREorder_idorder.id);这种复杂 SQLSeata 可能无法正确解析和回滚。原因分析Seata 使用 Druid SQL Parser 解析 SQL只支持标准的单表操作。对于复杂 SQL无法准确识别影响的数据范围无法生成准确的回滚 SQL可能导致回滚失败或数据不一致解决方案把复杂 SQL 拆成多条简单 SQL推荐// 不推荐Update(UPDATE order SET status 1 WHERE user_id IN (SELECT id FROM user WHERE level 5))voidupdateOrderStatusByUserLevel();// 推荐publicvoidupdateOrderStatusByUserLevel(){// 1. 先查询用户IDListLonguserIdsuserMapper.selectIdsByLevel(5);// 2. 再更新订单Seata 可以正确处理if(!userIds.isEmpty()){orderMapper.updateStatusByUserIds(userIds,OrderStatus.COMPLETED);}}使用编程式事务手动处理回滚逻辑ServicepublicclassComplexOrderService{GlobalTransactionalpublicvoidcomplexUpdate(OrderRequestrequest){try{// 执行复杂 SQLexecuteComplexSQL(request);}catch(Exceptione){// 手动回滚manualRollback(request);throwe;}}privatevoidmanualRollback(OrderRequestrequest){// 根据业务逻辑手动回滚// 比如记录需要回滚的操作然后补偿}}对于这种特殊情况考虑用 TCC 模式TCC 模式不依赖 SQL 解析可以灵活处理复杂业务逻辑。问题 4性能影响性能测试数据我们在生产环境的监控中发现使用 Seata 后数据库写入量增加约 30-50%每次 UPDATE/DELETE 都会生成 Undo Log事务耗时增加约 20-30ms主要是生成 Undo Log 和注册分支事务的开销数据库连接数略有增加RM 需要额外的连接与 TC 通信具体数据单机 1000 TPS指标未使用 Seata使用 Seata增加幅度平均响应时间50ms72ms44%P99 响应时间200ms280ms40%数据库 QPS3000450050%数据库连接数506530%性能瓶颈分析Undo Log 写入开销每次 UPDATE/DELETE 都需要查询前镜像一次 SELECT执行业务 SQL一次 UPDATE/DELETE查询后镜像一次 SELECT插入 Undo Log一次 INSERT网络通信开销分支事务注册需要与 TC 通信锁竞争全局锁可能导致事务等待优化方案优化 Undo Log 存储seata:client:undo:# 只记录变更的字段减少 Undo Log 大小only-care-update-columns:true# 使用更高效的序列化方式kryo 或 protobuflog-serialization:kryo异步提交分支事务减少提交阶段的耗时seata:client:rm:# 异步提交分支事务async-commit-buffer-limit:10000# 不报告成功减少网络通信report-success-enable:false批量操作优化对于批量更新如果支持尽量使用批量接口// 不推荐循环更新for(Orderorder:orders){orderMapper.updateById(order);// 每次都会生成 Undo Log}// 推荐批量更新如果 MyBatis-Plus 支持批量 Undo LogorderMapper.updateBatchById(orders);// 只生成一个 Undo Log读写分离将只读操作路由到从库减少主库压力DS(slave)// 使用从库不会开启事务也不会生成 Undo LogpublicListOrderqueryOrders(LonguserId){returnorderMapper.selectByUserId(userId);}对于高并发场景考虑用 TCC 模式或者消息队列TCC 模式不生成 Undo Log性能更好。但需要业务改造。监控和调优定期监控关键指标及时发现问题ComponentpublicclassSeataMetrics{AutowiredprivateMeterRegistrymeterRegistry;EventListenerpublicvoidonGlobalTransactionBegin(GlobalTransactionBeginEventevent){meterRegistry.counter(seata.global.transaction.begin).increment();}EventListenerpublicvoidonGlobalTransactionCommit(GlobalTransactionCommitEventevent){Timer.SamplesampleTimer.start(meterRegistry);sample.stop(meterRegistry.timer(seata.global.transaction.commit));}EventListenerpublicvoidonGlobalTransactionRollback(GlobalTransactionRollbackEventevent){meterRegistry.counter(seata.global.transaction.rollback).increment();}}问题 5网络分区和数据不一致场景描述在网络不稳定的环境下可能出现RM 与 TC 网络分区无法通信分支事务已经提交但 TC 不知道全局事务超时后TC 尝试回滚已经提交的分支事务解决方案设置合理的超时时间GlobalTransactional(timeoutMills300000,// 5 分钟rollbackForException.class)publicvoidlongRunningBusiness(){// ...}使用 Saga 模式处理长事务对于执行时间不确定的长事务Saga 模式更合适。实现补偿机制对于可能出现的部分提交情况实现补偿逻辑ServicepublicclassOrderService{GlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){try{inventoryService.reduceStock(request.getProductId());orderService.create(request);pointService.deduct(request.getUserId());}catch(Exceptione){// Seata 会自动回滚但为了保险我们可以记录日志log.error(订单创建失败开始补偿,e);compensate(request);throwe;}}privatevoidcompensate(OrderRequestrequest){// 记录需要补偿的操作CompensationLoglognewCompensationLog();log.setOrderId(request.getOrderId());log.setAction(createOrder);log.setStatus(NEED_COMPENSATE);compensationLogMapper.insert(log);}// 定时任务扫描并执行补偿Scheduled(fixedDelay60000)publicvoidexecuteCompensation(){ListCompensationLoglogscompensationLogMapper.selectList(newLambdaQueryWrapperCompensationLog().eq(CompensationLog::getStatus,NEED_COMPENSATE));for(CompensationLoglog:logs){try{doCompensate(log);log.setStatus(COMPENSATED);}catch(Exceptione){log.error(补偿失败,e);}compensationLogMapper.updateById(log);}}}问题 6超时时间设置场景描述全局事务有个超时时间默认是 60 秒。如果业务执行时间超过这个时间事务会被强制回滚。我们在处理大批量订单时遇到过这个问题GlobalTransactionalpublicvoidbatchCreateOrder(ListOrderRequestrequests){// 如果 requests 有 10000 条处理时间可能超过 60 秒for(OrderRequestrequest:requests){createOrder(request);// 每个订单 10ms总共 100 秒}}解决方案设置合理的超时时间GlobalTransactional(timeoutMills300000)// 设置超时时间为 5 分钟publicvoidbatchCreateOrder(ListOrderRequestrequests){// ...}合理拆分事务把大事务拆成多个小事务// 不推荐一个大事务处理所有订单GlobalTransactionalpublicvoidbatchCreateOrder(ListOrderRequestrequests){for(OrderRequestrequest:requests){createOrder(request);}}// 推荐批量处理每个批次一个事务publicvoidbatchCreateOrder(ListOrderRequestrequests){intbatchSize100;for(inti0;irequests.size();ibatchSize){ListOrderRequestbatchrequests.subList(i,Math.min(ibatchSize,requests.size()));processBatch(batch);}}GlobalTransactional(timeoutMills30000)privatevoidprocessBatch(ListOrderRequestbatch){for(OrderRequestrequest:batch){createOrder(request);}}异步处理对于非实时要求的业务可以异步处理ServicepublicclassOrderService{AutowiredprivateThreadPoolTaskExecutorasyncExecutor;publicvoidbatchCreateOrder(ListOrderRequestrequests){// 异步处理不开启全局事务asyncExecutor.execute(()-{processBatch(requests);});}GlobalTransactionalprivatevoidprocessBatch(ListOrderRequestrequests){// 异步处理中的事务for(OrderRequestrequest:requests){createOrder(request);}}}Seata 的其他模式除了 AT 模式Seata 还支持 TCC 模式和 Saga 模式。每种模式都有其适用场景。TCC 模式详解TCC 模式需要业务代码实现 Try、Confirm、Cancel 三个方法。对于扣库存这个场景可以这样实现接口定义publicinterfaceInventoryTccService{TwoPhaseBusinessAction(namereduceStock,commitMethodconfirm,rollbackMethodcancel)booleantryReduceStock(BusinessActionContextParameter(productId)LongproductId,BusinessActionContextParameter(quantity)Integerquantity,BusinessActionContextParameter(orderId)LongorderId);booleanconfirm(BusinessActionContextcontext);booleancancel(BusinessActionContextcontext);}完整实现包含幂等、空回滚、防悬挂ServiceSlf4jpublicclassInventoryTccServiceImplimplementsInventoryTccService{AutowiredprivateInventoryMapperinventoryMapper;AutowiredprivateTccActionMappertccActionMapper;OverrideTransactionalpublicbooleantryReduceStock(LongproductId,Integerquantity,LongorderId){StringxidRootContext.getXID();// 1. 防悬挂检查是否已经被 CancelTccActionactiontccActionMapper.selectByXidAndAction(xid,reduceStock,productId.toString());if(action!nullaction.getStatus()TccActionStatus.CANCELLED){log.warn(Try 阶段被悬挂xid{}, productId{},xid,productId);returnfalse;}// 2. 幂等性检查if(action!nullaction.getStatus()TccActionStatus.TRYING){log.info(Try 阶段已执行直接返回xid{}, productId{},xid,productId);returntrue;}// 3. 冻结库存InventoryinventoryinventoryMapper.selectByProductId(productId);if(inventory.getStock()quantity){thrownewRuntimeException(库存不足);}inventory.setFrozenStock(inventory.getFrozenStock()quantity);inventoryMapper.updateById(inventory);// 4. 记录 TCC 操作状态if(actionnull){actionnewTccAction();action.setXid(xid);action.setAction(reduceStock);action.setResourceId(productId.toString());action.setContext(JSON.toJSONString(Map.of(productId,productId,quantity,quantity,orderId,orderId)));action.setStatus(TccActionStatus.TRYING);tccActionMapper.insert(action);}log.info(Try 阶段执行成功xid{}, productId{}, quantity{},xid,productId,quantity);returntrue;}OverrideTransactionalpublicbooleanconfirm(BusinessActionContextcontext){Stringxidcontext.getXid();MapString,ObjectparamsJSON.parseObject(context.getActionContext(context).toString(),Map.class);LongproductIdLong.valueOf(params.get(productId).toString());IntegerquantityInteger.valueOf(params.get(quantity).toString());// 1. 幂等性检查TccActionactiontccActionMapper.selectByXidAndAction(xid,reduceStock,productId.toString());if(actionnull){log.warn(Confirm 阶段未找到 Try 记录可能已确认xid{},xid);returntrue;// 可能已经确认过了}if(action.getStatus()TccActionStatus.CONFIRMED){log.info(Confirm 阶段已执行直接返回xid{},xid);returntrue;}// 2. 真正扣减库存InventoryinventoryinventoryMapper.selectByProductId(productId);inventory.setStock(inventory.getStock()-quantity);inventory.setFrozenStock(inventory.getFrozenStock()-quantity);inventoryMapper.updateById(inventory);// 3. 更新状态action.setStatus(TccActionStatus.CONFIRMED);tccActionMapper.updateById(action);log.info(Confirm 阶段执行成功xid{}, productId{}, quantity{},xid,productId,quantity);returntrue;}OverrideTransactionalpublicbooleancancel(BusinessActionContextcontext){Stringxidcontext.getXid();MapString,ObjectparamsJSON.parseObject(context.getActionContext(context).toString(),Map.class);LongproductIdLong.valueOf(params.get(productId).toString());IntegerquantityInteger.valueOf(params.get(quantity).toString());// 1. 空回滚检查TccActionactiontccActionMapper.selectByXidAndAction(xid,reduceStock,productId.toString());if(actionnull){// Try 没执行记录空回滚标记actionnewTccAction();action.setXid(xid);action.setAction(reduceStock);action.setResourceId(productId.toString());action.setContext(context.getActionContext(context).toString());action.setStatus(TccActionStatus.CANCELLED);tccActionMapper.insert(action);log.warn(Cancel 阶段空回滚xid{}, productId{},xid,productId);returntrue;}// 2. 幂等性检查if(action.getStatus()TccActionStatus.CANCELLED){log.info(Cancel 阶段已执行直接返回xid{},xid);returntrue;}// 3. 释放冻结的库存InventoryinventoryinventoryMapper.selectByProductId(productId);inventory.setFrozenStock(inventory.getFrozenStock()-quantity);inventoryMapper.updateById(inventory);// 4. 更新状态action.setStatus(TccActionStatus.CANCELLED);tccActionMapper.updateById(action);log.info(Cancel 阶段执行成功xid{}, productId{}, quantity{},xid,productId,quantity);returntrue;}}TCC 模式的使用ServicepublicclassOrderServiceImplimplementsOrderService{AutowiredprivateInventoryTccServiceinventoryTccService;GlobalTransactionalOverridepublicvoidcreateOrder(OrderRequestrequest){// 1. Try 阶段冻结库存inventoryTccService.tryReduceStock(request.getProductId(),request.getQuantity(),request.getOrderId());// 2. 创建订单OrderordernewOrder();order.setUserId(request.getUserId());order.setProductId(request.getProductId());orderMapper.insert(order);// 如果后续步骤失败Seata 会自动调用 Cancel// 如果全部成功Seata 会自动调用 Confirm}}TCC 模式的优缺点优点性能好不依赖数据库事务不需要 Undo Log数据库压力小业务可控性强缺点需要大量改造业务代码需要处理幂等性、空回滚、防悬挂等复杂逻辑开发成本高容易出错Saga 模式详解Saga 模式适用于长事务场景。它的核心思想是把一个大事务拆成多个小事务每个小事务都有对应的补偿操作。注解方式实现ServicepublicclassOrderSagaService{SagaStartGlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){reduceStock(request);createOrderRecord(request);deductPoint(request);}Compensable(compensationMethodreduceStockCompensate)publicvoidreduceStock(OrderRequestrequest){// 扣库存inventoryService.reduceStock(request.getProductId(),request.getQuantity());}publicvoidreduceStockCompensate(OrderRequestrequest){// 补偿恢复库存inventoryService.restoreStock(request.getProductId(),request.getQuantity());}Compensable(compensationMethodcreateOrderRecordCompensate)publicvoidcreateOrderRecord(OrderRequestrequest){// 创建订单orderService.create(request);}publicvoidcreateOrderRecordCompensate(OrderRequestrequest){// 补偿删除订单orderService.delete(request.getOrderId());}Compensable(compensationMethoddeductPointCompensate)publicvoiddeductPoint(OrderRequestrequest){// 扣积分pointService.deduct(request.getUserId(),request.getAmount());}publicvoiddeductPointCompensate(OrderRequestrequest){// 补偿恢复积分pointService.restore(request.getUserId(),request.getAmount());}}状态机方式实现状态机方式更灵活可以处理复杂的业务流程。需要定义一个 JSON 格式的状态机定义文件{Name:createOrderSaga,Comment:创建订单流程,StartState:ReduceStock,Version:0.0.1,States:{ReduceStock:{Type:ServiceTask,ServiceName:inventoryService,ServiceMethod:reduceStock,CompensateState:ReduceStockCompensate,Next:CreateOrder},ReduceStockCompensate:{Type:CompensationTask,ServiceName:inventoryService,ServiceMethod:restoreStock},CreateOrder:{Type:ServiceTask,ServiceName:orderService,ServiceMethod:create,CompensateState:CreateOrderCompensate,Next:DeductPoint},CreateOrderCompensate:{Type:CompensationTask,ServiceName:orderService,ServiceMethod:delete},DeductPoint:{Type:ServiceTask,ServiceName:pointService,ServiceMethod:deduct,CompensateState:DeductPointCompensate,IsEnd:true},DeductPointCompensate:{Type:CompensationTask,ServiceName:pointService,ServiceMethod:restore}}}Saga 模式的优缺点优点适合长事务不锁定资源性能好缺点只能保证最终一致性可能有短暂的中间状态需要实现补偿逻辑异构系统分布式事务的挑战问题场景在实际项目中我们经常需要与第三方系统集成。比如我们的系统Java Seata ↓ 1. 扣库存本地数据库 ↓ 2. 创建订单本地数据库 ↓ 3. 调用第三方支付接口第三方系统可能是 PHP、Python 等 ↓ 4. 扣积分本地数据库当你使用GlobalTransactional时如果第三方接口返回错误你捕获错误编码后抛出异常发现本地系统的数据并没有回滚。典型错误代码示例很多开发者会这样写代码ServicepublicclassOrderService{GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){// 1. 扣库存inventoryService.reduceStock(request.getProductId(),request.getQuantity());// 2. 创建订单OrderordernewOrder();order.setUserId(request.getUserId());order.setProductId(request.getProductId());orderMapper.insert(order);// 3. 调用第三方接口try{ThirdPartyResponseresponsethirdPartyService.callPayment(request);// 第三方返回错误编码if(!response.isSuccess()){// ❌ 错误只是记录日志没有抛出异常log.error(第三方支付失败: code{}, msg{},response.getCode(),response.getMsg());// 这里没有 throw事务不会回滚}}catch(Exceptione){// ❌ 错误捕获异常后没有重新抛出log.error(调用第三方接口异常,e);// 这里没有 throw事务不会回滚}// 4. 扣积分pointService.deduct(request.getUserId(),request.getAmount());// 结果即使第三方返回错误本地数据也不会回滚}}为什么数据没有回滚原因一异常被吞掉了最常见的问题捕获异常后没有重新抛出。// ❌ 错误写法try{ThirdPartyResponseresponsethirdPartyService.callPayment(request);if(!response.isSuccess()){log.error(支付失败);// 没有 throw事务不会回滚}}catch(Exceptione){log.error(异常,e);// 没有 throw事务不会回滚}// ✅ 正确写法try{ThirdPartyResponseresponsethirdPartyService.callPayment(request);if(!response.isSuccess()){thrownewBusinessException(第三方支付失败: response.getMsg());}}catch(Exceptione){log.error(调用第三方接口异常,e);thrownewBusinessException(第三方接口调用失败,e);// 必须重新抛出}原因二异常类型不在 rollbackFor 范围内如果抛出的异常类型不在rollbackFor指定的范围内事务也不会回滚。// ❌ 错误抛出的异常类型不在 rollbackFor 范围内GlobalTransactional(rollbackFor{BusinessException.class})// 只回滚 BusinessExceptionpublicvoidcreateOrder(OrderRequestrequest){// ...if(!response.isSuccess()){thrownewRuntimeException(支付失败);// RuntimeException 不在 rollbackFor 中// 事务不会回滚}}// ✅ 正确抛出指定类型的异常GlobalTransactional(rollbackForException.class)// 所有异常都回滚publicvoidcreateOrder(OrderRequestrequest){// ...if(!response.isSuccess()){thrownewBusinessException(支付失败);// 或者直接 throw new Exception}}原因三第三方调用在事务提交后执行如果第三方调用是在事务提交后执行的比如用了Async或TransactionalEventListener即使抛出异常也不会回滚。// ❌ 错误异步调用事务已经提交GlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){inventoryService.reduceStock(request.getProductId());orderMapper.insert(order);// 异步调用事务可能已经提交asyncThirdPartyService.callPayment(request);// 这个方法有 Async}// ✅ 正确同步调用在事务内GlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){inventoryService.reduceStock(request.getProductId());orderMapper.insert(order);// 同步调用在事务内thirdPartyService.callPayment(request);}原因四第三方调用被包装成了非异常返回有些开发者喜欢用返回值来表示错误而不是抛异常。// ❌ 错误用返回值表示错误不抛异常GlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){inventoryService.reduceStock(request.getProductId());orderMapper.insert(order);// 返回 Result 对象不抛异常ResultresultthirdPartyService.callPayment(request);if(!result.isSuccess()){log.error(支付失败);return;// 只是返回事务不会回滚}}// ✅ 正确失败时抛出异常GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){inventoryService.reduceStock(request.getProductId());orderMapper.insert(order);ResultresultthirdPartyService.callPayment(request);if(!result.isSuccess()){thrownewBusinessException(支付失败: result.getMsg());}}正确的处理方式方式一直接抛出异常推荐ServiceSlf4jpublicclassOrderService{GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){// 1. 扣库存inventoryService.reduceStock(request.getProductId(),request.getQuantity());// 2. 创建订单OrderordernewOrder();order.setUserId(request.getUserId());order.setProductId(request.getProductId());order.setStatus(OrderStatus.CREATED);orderMapper.insert(order);// 3. 调用第三方接口ThirdPartyResponseresponsethirdPartyService.callPayment(request);// 检查返回码失败直接抛异常if(!response.isSuccess()){log.error(第三方支付失败: code{}, msg{},response.getCode(),response.getMsg());thrownewBusinessException(第三方支付失败: coderesponse.getCode(), msgresponse.getMsg());}// 4. 扣积分pointService.deduct(request.getUserId(),request.getAmount());}}方式二封装第三方调用统一异常处理ServiceSlf4jpublicclassOrderService{AutowiredprivateThirdPartyServicethirdPartyService;GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){// 1. 扣库存inventoryService.reduceStock(request.getProductId(),request.getQuantity());// 2. 创建订单OrderordernewOrder();order.setUserId(request.getUserId());order.setProductId(request.getProductId());orderMapper.insert(order);// 3. 调用第三方接口封装的方法会自动处理异常callThirdPartyWithException(request);// 4. 扣积分pointService.deduct(request.getUserId(),request.getAmount());}/** * 调用第三方接口失败时抛出异常 */privatevoidcallThirdPartyWithException(OrderRequestrequest){try{ThirdPartyResponseresponsethirdPartyService.callPayment(request);if(!response.isSuccess()){// 根据错误码判断是否需要回滚if(isRollbackRequired(response.getCode())){thrownewBusinessException(第三方支付失败: response.getMsg());}else{// 某些错误码不需要回滚比如参数错误本地数据是正确的log.warn(第三方返回错误但不回滚: code{},response.getCode());}}}catch(BusinessExceptione){// 业务异常直接抛出throwe;}catch(Exceptione){// 系统异常包装后抛出log.error(调用第三方接口异常,e);thrownewSystemException(第三方接口调用失败,e);}}/** * 判断错误码是否需要回滚 */privatebooleanisRollbackRequired(StringerrorCode){// 根据业务规则判断// 比如余额不足、系统错误等需要回滚// 参数错误、重复请求等不需要回滚return!PARAM_ERROR.equals(errorCode)!DUPLICATE_REQUEST.equals(errorCode);}}方式三使用自定义异常和异常处理器/** * 第三方调用异常 */publicclassThirdPartyExceptionextendsRuntimeException{privateStringerrorCode;privateStringerrorMsg;publicThirdPartyException(StringerrorCode,StringerrorMsg){super(第三方调用失败: codeerrorCode, msgerrorMsg);this.errorCodeerrorCode;this.errorMsgerrorMsg;}// getter/setter}ServiceSlf4jpublicclassOrderService{GlobalTransactional(rollbackFor{Exception.class,ThirdPartyException.class})publicvoidcreateOrder(OrderRequestrequest){inventoryService.reduceStock(request.getProductId(),request.getQuantity());orderMapper.insert(order);// 调用第三方ThirdPartyResponseresponsethirdPartyService.callPayment(request);if(!response.isSuccess()){// 抛出自定义异常thrownewThirdPartyException(response.getCode(),response.getMsg());}pointService.deduct(request.getUserId(),request.getAmount());}}排查步骤如果遇到抛出异常但数据没有回滚的问题按以下步骤排查检查异常是否真的抛出了GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){try{// 业务逻辑thirdPartyService.callPayment(request);}catch(Exceptione){log.error(异常,e);// 在这里打断点确认异常是否被捕获// 确认这里是否有 throwthrowe;// 必须要有这一行}}检查 rollbackFor 配置// 确认异常类型在 rollbackFor 中GlobalTransactional(rollbackForException.class)// 所有异常都回滚// 或者GlobalTransactional(rollbackFor{BusinessException.class,ThirdPartyException.class})检查事务是否真的开启了GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){// 打印 XID确认事务是否开启StringxidRootContext.getXID();log.info(当前事务 XID: {},xid);if(xidnull){log.error(事务未开启);return;}// 业务逻辑}检查是否有异步调用// 确认第三方调用不是异步的// 如果方法上有 Async事务不会回滚publicvoidcreateOrder(OrderRequestrequest){// 检查 thirdPartyService.callPayment 是否有 Async}检查日志确认 Seata 是否收到回滚请求开启 Seata 的 DEBUG 日志logging:level:io.seata:DEBUGio.seata.tm:DEBUGio.seata.rm:DEBUG查看日志中是否有Global transaction beginGlobal transaction rollbackBranch transaction rollback如果没有看到 rollback 日志说明 Seata 没有收到回滚请求可能是异常没有正确抛出。最佳实践统一异常处理定义业务异常基类统一处理publicclassBusinessExceptionextendsRuntimeException{privateStringerrorCode;publicBusinessException(StringerrorCode,Stringmessage){super(message);this.errorCodeerrorCode;}}GlobalTransactional(rollbackForBusinessException.class)publicvoidcreateOrder(OrderRequestrequest){// 业务逻辑if(!response.isSuccess()){thrownewBusinessException(response.getCode(),response.getMsg());}}使用断言式编程让代码更清晰privatevoidassertThirdPartySuccess(ThirdPartyResponseresponse){if(!response.isSuccess()){thrownewBusinessException(response.getCode(),第三方调用失败: response.getMsg());}}GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){inventoryService.reduceStock(request.getProductId());orderMapper.insert(order);ThirdPartyResponseresponsethirdPartyService.callPayment(request);assertThirdPartySuccess(response);// 断言式失败直接抛异常pointService.deduct(request.getUserId(),request.getAmount());}记录详细日志方便排查问题GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){StringxidRootContext.getXID();log.info(开始创建订单XID: {},xid);try{inventoryService.reduceStock(request.getProductId());log.info(扣库存成功XID: {},xid);orderMapper.insert(order);log.info(创建订单成功XID: {},xid);ThirdPartyResponseresponsethirdPartyService.callPayment(request);if(!response.isSuccess()){log.error(第三方支付失败准备回滚XID: {}, code: {}, msg: {},xid,response.getCode(),response.getMsg());thrownewBusinessException(response.getCode(),response.getMsg());}log.info(第三方支付成功XID: {},xid);pointService.deduct(request.getUserId(),request.getAmount());log.info(扣积分成功XID: {},xid);}catch(Exceptione){log.error(订单创建失败准备回滚XID: {},xid,e);throwe;// 必须重新抛出}}为什么会出现这个问题Seata 的 AT 模式只能管理被 Seata 代理的数据源。对于第三方系统无法控制第三方系统的数据源第三方系统有自己的数据库Seata 无法代理无法生成 Undo Log第三方系统的操作无法被 Seata 拦截无法参与分布式事务第三方系统可能不支持 Seata甚至不知道分布式事务的存在所以当第三方接口返回错误时第三方系统可能已经提交了事务比如已经扣款或者第三方系统根本没有事务比如只是记录日志你的本地系统虽然抛出了异常但 Seata 只能回滚本地数据库的操作第三方系统的状态无法回滚解决方案方案一最大努力通知模式推荐这是处理异构系统分布式事务最常用的方案。核心思想是先完成本地事务保证本地数据的一致性异步通知第三方通过消息队列异步调用第三方接口补偿机制如果第三方调用失败通过定时任务重试或人工补偿ServiceSlf4jpublicclassOrderService{AutowiredprivateOrderMapperorderMapper;AutowiredprivateInventoryServiceinventoryService;AutowiredprivatePointServicepointService;AutowiredprivateThirdPartyServicethirdPartyService;AutowiredprivateCompensationLogMappercompensationLogMapper;AutowiredprivateRabbitTemplaterabbitTemplate;/** * 创建订单不包含第三方调用 */GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){// 1. 扣库存本地事务inventoryService.reduceStock(request.getProductId(),request.getQuantity());// 2. 创建订单本地事务OrderordernewOrder();order.setUserId(request.getUserId());order.setProductId(request.getProductId());order.setQuantity(request.getQuantity());order.setStatus(OrderStatus.CREATED);// 初始状态已创建待支付orderMapper.insert(order);// 3. 扣积分本地事务pointService.deduct(request.getUserId(),request.getAmount());// 4. 发送消息到队列异步调用第三方// 注意这里不直接调用第三方而是发送消息sendThirdPartyNotification(order);// 如果上面的步骤都成功本地事务提交// 如果失败Seata 会回滚本地数据库的操作}/** * 发送第三方通知消息 */privatevoidsendThirdPartyNotification(Orderorder){ThirdPartyNotificationMessagemessagenewThirdPartyNotificationMessage();message.setOrderId(order.getId());message.setOrderNo(order.getOrderNo());message.setUserId(order.getUserId());message.setAmount(order.getAmount());// 发送到消息队列rabbitTemplate.convertAndSend(third-party.exchange,third-party.payment,JSON.toJSONString(message));// 同时记录补偿日志用于失败重试CompensationLoglognewCompensationLog();log.setOrderId(order.getId());log.setOrderNo(order.getOrderNo());log.setAction(THIRD_PARTY_PAYMENT);log.setStatus(CompensationStatus.PENDING);log.setRetryCount(0);log.setPayload(JSON.toJSONString(message));compensationLogMapper.insert(log);}/** * 消费消息调用第三方接口 */RabbitListener(queuesthird-party.payment.queue)publicvoidhandleThirdPartyPayment(StringmessageJson){ThirdPartyNotificationMessagemessageJSON.parseObject(messageJson,ThirdPartyNotificationMessage.class);try{// 调用第三方接口ThirdPartyResponseresponsethirdPartyService.callPaymentAPI(message);if(response.isSuccess()){// 成功更新订单状态updateOrderStatus(message.getOrderId(),OrderStatus.PAID);// 更新补偿日志状态updateCompensationLog(message.getOrderId(),CompensationStatus.SUCCESS);log.info(第三方支付成功: orderId{},message.getOrderId());}else{// 失败标记需要重试handleThirdPartyFailure(message,response);}}catch(Exceptione){log.error(调用第三方接口失败: orderId{},message.getOrderId(),e);handleThirdPartyFailure(message,null);}}/** * 处理第三方调用失败 */privatevoidhandleThirdPartyFailure(ThirdPartyNotificationMessagemessage,ThirdPartyResponseresponse){CompensationLoglogcompensationLogMapper.selectByOrderId(message.getOrderId());if(lognull){// 创建补偿日志lognewCompensationLog();log.setOrderId(message.getOrderId());log.setOrderNo(message.getOrderNo());log.setAction(THIRD_PARTY_PAYMENT);log.setStatus(CompensationStatus.FAILED);log.setRetryCount(0);log.setPayload(JSON.toJSONString(message));log.setErrorMsg(response!null?response.getErrorMsg():调用异常);compensationLogMapper.insert(log);}else{// 增加重试次数log.setRetryCount(log.getRetryCount()1);log.setErrorMsg(response!null?response.getErrorMsg():调用异常);if(log.getRetryCount()3){// 超过最大重试次数标记为需要人工处理log.setStatus(CompensationStatus.MANUAL_REQUIRED);}compensationLogMapper.updateById(log);}// 如果重试次数未超限延迟重试if(log.getRetryCount()3){// 发送延迟消息30秒后重试rabbitTemplate.convertAndSend(third-party.exchange,third-party.payment.retry,JSON.toJSONString(message),msg-{msg.getMessageProperties().setDelay(30000);// 30秒延迟returnmsg;});}}/** * 定时任务扫描需要补偿的记录 */Scheduled(fixedDelay60000)// 每分钟执行一次publicvoidcompensateFailedTransactions(){ListCompensationLoglogscompensationLogMapper.selectList(newLambdaQueryWrapperCompensationLog().eq(CompensationLog::getStatus,CompensationStatus.FAILED).lt(CompensationLog::getRetryCount,3).lt(CompensationLog::getUpdatedAt,LocalDateTime.now().minusMinutes(5))// 5分钟前的失败记录);for(CompensationLoglog:logs){try{ThirdPartyNotificationMessagemessageJSON.parseObject(log.getPayload(),ThirdPartyNotificationMessage.class);// 重新发送消息rabbitTemplate.convertAndSend(third-party.exchange,third-party.payment,JSON.toJSONString(message));log.info(重新发送第三方通知: orderId{},message.getOrderId());}catch(Exceptione){log.error(补偿失败: orderId{},log.getOrderId(),e);}}}}优点本地事务可以正常回滚第三方调用失败不影响本地数据一致性通过重试机制保证最终一致性缺点只能保证最终一致性不能保证强一致性需要实现补偿机制方案二TCC 模式如果第三方支持如果第三方系统也支持 TCC 模式比如也是 Java 系统可以集成 Seata可以使用 TCC 模式ServicepublicclassOrderService{AutowiredprivateInventoryTccServiceinventoryTccService;AutowiredprivateThirdPartyTccServicethirdPartyTccService;GlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){// 1. Try 阶段冻结库存inventoryTccService.tryReduceStock(request.getProductId(),request.getQuantity());// 2. Try 阶段调用第三方 Try 接口冻结资源thirdPartyTccService.tryPayment(request.getUserId(),request.getAmount());// 如果后续步骤失败Seata 会自动调用 Cancel// 如果全部成功Seata 会自动调用 Confirm}}// 第三方 TCC 服务接口publicinterfaceThirdPartyTccService{TwoPhaseBusinessAction(namethirdPartyPayment,commitMethodconfirmPayment,rollbackMethodcancelPayment)booleantryPayment(BusinessActionContextParameter(userId)LonguserId,BusinessActionContextParameter(amount)BigDecimalamount);booleanconfirmPayment(BusinessActionContextcontext);booleancancelPayment(BusinessActionContextcontext);}前提条件第三方系统必须支持 TCC 模式第三方系统必须提供 Try、Confirm、Cancel 三个接口第三方系统必须能够参与 Seata 的分布式事务方案三Saga 模式 补偿接口如果第三方系统提供补偿接口可以使用 Saga 模式ServicepublicclassOrderSagaService{SagaStartGlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){reduceStock(request);createOrderRecord(request);callThirdParty(request);// 调用第三方deductPoint(request);}Compensable(compensationMethodcallThirdPartyCompensate)publicvoidcallThirdParty(OrderRequestrequest){// 调用第三方接口ThirdPartyResponseresponsethirdPartyService.payment(request);if(!response.isSuccess()){thrownewBusinessException(第三方支付失败: response.getErrorMsg());}}publicvoidcallThirdPartyCompensate(OrderRequestrequest){// 补偿调用第三方的退款接口try{thirdPartyService.refund(request.getOrderId());}catch(Exceptione){log.error(第三方退款失败需要人工处理: orderId{},request.getOrderId(),e);// 记录到补偿日志人工处理recordCompensationLog(request.getOrderId(),THIRD_PARTY_REFUND_FAILED);}}}前提条件第三方系统必须提供补偿接口如退款接口补偿接口必须是幂等的方案四本地消息表 两阶段提交伪如果第三方系统支持两阶段提交协议可以实现一个伪两阶段提交ServicepublicclassOrderService{GlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){// 1. 本地操作inventoryService.reduceStock(request.getProductId());orderService.create(request);// 2. 调用第三方第一阶段准备ThirdPartyPrepareResponseprepareResponsethirdPartyService.prepare(request);if(!prepareResponse.isPrepared()){thrownewBusinessException(第三方准备失败);}// 3. 记录第三方事务IDStringthirdPartyTxIdprepareResponse.getTransactionId();orderMapper.updateThirdPartyTxId(request.getOrderId(),thirdPartyTxId);// 如果后续步骤失败需要调用第三方的 Cancel 接口}/** * 事务提交后的回调需要 Seata 的 Hook */TransactionalEventListener(phaseTransactionPhase.AFTER_COMMIT)publicvoidafterCommit(OrderCreatedEventevent){// 事务提交后调用第三方的 Confirm 接口try{thirdPartyService.confirm(event.getThirdPartyTxId());}catch(Exceptione){log.error(第三方确认失败需要补偿: txId{},event.getThirdPartyTxId(),e);// 记录补偿日志recordCompensationLog(event.getOrderId(),THIRD_PARTY_CONFIRM_FAILED);}}/** * 事务回滚后的回调 */TransactionalEventListener(phaseTransactionPhase.AFTER_ROLLBACK)publicvoidafterRollback(OrderCreatedEventevent){// 事务回滚后调用第三方的 Cancel 接口if(event.getThirdPartyTxId()!null){try{thirdPartyService.cancel(event.getThirdPartyTxId());}catch(Exceptione){log.error(第三方取消失败: txId{},event.getThirdPartyTxId(),e);}}}}注意这个方案需要第三方系统支持两阶段提交协议实际项目中很少见。最佳实践建议对于异构系统分布式事务我推荐使用最大努力通知模式本地事务保证强一致性使用 Seata 保证本地数据库操作的原子性第三方调用异步化通过消息队列异步调用第三方避免阻塞本地事务补偿机制如果第三方调用失败通过重试或人工补偿保证最终一致性状态机管理使用状态机管理订单状态清晰标识每个阶段// 订单状态枚举publicenumOrderStatus{CREATED,// 已创建待支付PAYING,// 支付中已发送第三方通知PAID,// 已支付第三方确认成功PAY_FAILED,// 支付失败第三方返回失败CANCELLED,// 已取消REFUNDING,// 退款中REFUNDED// 已退款}总结异构系统分布式事务是 Seata 的局限性因为 Seata 只能管理被它代理的资源。对于第三方系统Seata AT 模式无法直接支持因为无法代理第三方系统的数据源推荐使用最大努力通知模式本地事务 异步通知 补偿机制如果第三方支持 TCC/Saga可以考虑使用对应的模式关键是要有补偿机制保证最终一致性记住分布式事务不是万能的对于异构系统最终一致性往往比强一致性更实际。如何选择模式对比特性AT 模式TCC 模式Saga 模式开发成本低只需加注解高需要实现三个方法中需要实现补偿性能中等需要 Undo Log高无需 Undo Log高无锁一致性强一致强一致最终一致锁机制全局锁业务锁无锁适用场景标准 CRUD 操作高性能核心链路长事务、异步流程数据库要求需要支持事务不需要不需要复杂度低高需要处理幂等等中选择建议使用 AT 模式的情况标准 CRUD 操作INSERT、UPDATE、DELETE 语句开发成本要求低希望快速接入分布式事务性能要求不是特别高TPS 5000强一致性要求需要强一致性保证同构系统所有参与的服务都是 Java 系统使用相同的数据源类型示例场景订单创建创建订单、扣库存、扣积分-都是本地服务支付流程扣款、更新订单状态、发通知-都是本地服务账户操作转账、充值、提现-都是本地服务不适用场景❌ 需要调用第三方系统如第三方支付、第三方物流❌ 需要调用非 Java 系统如 PHP、Python 服务❌ 第三方系统不支持 Seata使用 TCC 模式的情况高性能要求TPS 10000核心链路对性能要求极高的核心业务复杂业务逻辑无法用简单 SQL 实现的业务有充足开发资源可以投入时间实现 TCC 逻辑示例场景秒杀场景库存扣减账户余额操作需要精确控制复杂计算场景需要多步骤计算使用 Saga 模式的情况长事务事务执行时间不确定或很长异步流程可以接受最终一致性业务流程复杂涉及多个步骤需要灵活控制示例场景订单退款涉及多个服务可能需要人工审核数据同步跨系统数据同步工作流多步骤审批流程混合使用在实际项目中我们通常是混合使用ServicepublicclassOrderService{// 标准订单创建用 AT 模式GlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){inventoryService.reduceStock(request.getProductId());orderMapper.insert(order);pointService.deduct(request.getUserId());}// 秒杀订单用 TCC 模式GlobalTransactionalpublicvoidcreateSeckillOrder(OrderRequestrequest){inventoryTccService.tryReduceStock(request.getProductId());orderMapper.insert(order);// TCC Confirm 会在全局事务提交时自动调用}// 退款流程用 Saga 模式SagaStartpublicvoidrefundOrder(LongorderId){// 状态机控制流程sagaEngine.start(refundSaga,orderId);}}我们在项目中主要用 AT 模式只有对性能要求特别高的核心链路如秒杀才考虑用 TCC。对于退款、数据同步等长流程使用 Saga 模式。最佳实践1. 事务边界设计避免长事务// ❌ 不推荐事务范围太大GlobalTransactionalpublicvoidprocessOrder(OrderRequestrequest){// 1. 参数校验不需要事务validate(request);// 2. 业务处理需要事务createOrder(request);// 3. 发送消息不需要事务sendMessage(request);// 4. 记录日志不需要事务logOperation(request);}// ✅ 推荐只对需要的事务操作加注解publicvoidprocessOrder(OrderRequestrequest){// 1. 参数校验validate(request);// 2. 业务处理需要事务的部分doCreateOrder(request);// 3. 发送消息事务外sendMessage(request);// 4. 记录日志事务外logOperation(request);}GlobalTransactionalprivatevoiddoCreateOrder(OrderRequestrequest){createOrder(request);}合理拆分事务// ❌ 不推荐一个大事务处理所有逻辑GlobalTransactionalpublicvoidbatchProcess(ListOrderRequestrequests){for(OrderRequestrequest:requests){createOrder(request);sendNotification(request);updateStatistics(request);}}// ✅ 推荐按业务语义拆分publicvoidbatchProcess(ListOrderRequestrequests){// 每个订单一个事务for(OrderRequestrequest:requests){processSingleOrder(request);}// 统计更新用一个事务updateStatisticsBatch(requests);}GlobalTransactionalprivatevoidprocessSingleOrder(OrderRequestrequest){createOrder(request);sendNotification(request);}2. 异常处理统一异常处理GlobalTransactional(rollbackForException.class)publicvoidcreateOrder(OrderRequestrequest){try{inventoryService.reduceStock(request.getProductId());orderService.create(request);pointService.deduct(request.getUserId());}catch(BusinessExceptione){// 业务异常需要回滚log.error(订单创建失败,e);throwe;// 抛出异常触发回滚}catch(Exceptione){// 系统异常也需要回滚log.error(系统异常,e);thrownewSystemException(订单创建失败,e);}}区分可回滚和不可回滚的异常GlobalTransactional(rollbackFor{BusinessException.class,SystemException.class})publicvoidcreateOrder(OrderRequestrequest){// 如果抛出 BusinessException 或 SystemException会回滚// 如果抛出其他异常如日志记录失败不回滚inventoryService.reduceStock(request.getProductId());orderService.create(request);try{logService.log(request);// 日志失败不影响主事务}catch(Exceptione){log.error(日志记录失败,e);}}3. 幂等性设计使用唯一键保证幂等GlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){// 使用订单号作为唯一键StringorderNogenerateOrderNo(request);// 先检查是否已存在OrderexistOrderorderMapper.selectByOrderNo(orderNo);if(existOrder!null){log.warn(订单已存在直接返回: orderNo{},orderNo);returnexistOrder;}// 创建订单数据库唯一键约束也会保证幂等OrderordernewOrder();order.setOrderNo(orderNo);orderMapper.insert(order);returnorder;}使用分布式锁保证幂等ServicepublicclassOrderService{AutowiredprivateRedisTemplateString,StringredisTemplate;GlobalTransactionalpublicvoidcreateOrder(OrderRequestrequest){StringlockKeyorder:create:request.getOrderNo();StringlockValueUUID.randomUUID().toString();try{// 尝试获取锁BooleanlockedredisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,30,TimeUnit.SECONDS);if(!locked){thrownewBusinessException(订单正在处理中请勿重复提交);}// 执行业务逻辑doCreateOrder(request);}finally{// 释放锁releaseLock(lockKey,lockValue);}}}4. 监控和告警集成 Prometheus 监控ComponentpublicclassSeataMetricsCollector{privatefinalCountertransactionBeginCounter;privatefinalCountertransactionCommitCounter;privatefinalCountertransactionRollbackCounter;privatefinalTimertransactionDurationTimer;publicSeataMetricsCollector(MeterRegistrymeterRegistry){this.transactionBeginCounterCounter.builder(seata.transaction.begin).description(全局事务开始次数).register(meterRegistry);this.transactionCommitCounterCounter.builder(seata.transaction.commit).description(全局事务提交次数).register(meterRegistry);this.transactionRollbackCounterCounter.builder(seata.transaction.rollback).description(全局事务回滚次数).register(meterRegistry);this.transactionDurationTimerTimer.builder(seata.transaction.duration).description(全局事务耗时).register(meterRegistry);}EventListenerpublicvoidonGlobalTransactionBegin(GlobalTransactionBeginEventevent){transactionBeginCounter.increment();}EventListenerpublicvoidonGlobalTransactionCommit(GlobalTransactionCommitEventevent){transactionCommitCounter.increment();transactionDurationTimer.record(Duration.between(event.getBeginTime(),Instant.now()));}EventListenerpublicvoidonGlobalTransactionRollback(GlobalTransactionRollbackEventevent){transactionRollbackCounter.increment();}}告警配置# Prometheus 告警规则groups:-name:seata_alertsrules:-alert:SeataHighRollbackRateexpr:rate(seata_transaction_rollback_total[5m]) / rate(seata_transaction_begin_total[5m])0.1for:5mannotations:summary:Seata 回滚率过高description:过去5分钟回滚率超过10%-alert:SeataHighTransactionDurationexpr:histogram_quantile(0.99,seata_transaction_duration_seconds_bucket)5for:5mannotations:summary:Seata 事务耗时过高description:P99 事务耗时超过5秒5. 压测和性能调优压测脚本示例SpringBootTestpublicclassSeataPerformanceTest{AutowiredprivateOrderServiceorderService;TestpublicvoidtestCreateOrderPerformance(){intthreadCount100;intrequestPerThread100;CountDownLatchlatchnewCountDownLatch(threadCount);AtomicIntegersuccessCountnewAtomicInteger(0);AtomicIntegerfailCountnewAtomicInteger(0);ListLongdurationsnewCopyOnWriteArrayList();ExecutorServiceexecutorExecutors.newFixedThreadPool(threadCount);longstartTimeSystem.currentTimeMillis();for(inti0;ithreadCount;i){executor.submit(()-{try{for(intj0;jrequestPerThread;j){longbeginSystem.currentTimeMillis();try{orderService.createOrder(createTestRequest());successCount.incrementAndGet();}catch(Exceptione){failCount.incrementAndGet();}finally{durations.add(System.currentTimeMillis()-begin);}}}finally{latch.countDown();}});}latch.await();longtotalTimeSystem.currentTimeMillis()-startTime;// 统计结果doubletps(double)(successCount.get()failCount.get())*1000/totalTime;doubleavgDurationdurations.stream().mapToLong(Long::longValue).average().orElse(0);longp99Durationdurations.stream().sorted().skip((long)(durations.size()*0.99)).findFirst().orElse(0L);System.out.println(总请求数: (successCount.get()failCount.get()));System.out.println(成功数: successCount.get());System.out.println(失败数: failCount.get());System.out.println(TPS: tps);System.out.println(平均耗时: avgDurationms);System.out.println(P99耗时: p99Durationms);}}性能调优参数seata:client:rm:# 异步提交缓冲区大小async-commit-buffer-limit:10000# 报告重试次数report-retry-count:5# 锁重试间隔毫秒lock-retry-interval:10# 锁重试次数lock-retry-times:30tm:# 提交重试次数commit-retry-count:5# 回滚重试次数rollback-retry-count:5undo:# 只记录变更字段only-care-update-columns:true# 使用高效的序列化方式log-serialization:kryo6. 故障排查常见问题排查清单事务不回滚检查是否抛出了异常检查rollbackFor配置检查是否有try-catch吞掉了异常Undo Log 清理失败检查数据库连接是否正常检查 Undo Log 表结构是否正确检查是否有长时间未提交的事务性能下降检查 Undo Log 表大小定期清理检查是否有长事务检查数据库连接池配置XID 传递失败检查 Feign/Dubbo 拦截器是否配置检查请求头/上下文是否被过滤检查异步调用是否正确传递 XID日志配置logging:level:io.seata:DEBUGio.seata.rm.datasource:DEBUGio.seata.tm:DEBUGpattern:console:%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n总结分布式事务是一个复杂的问题没有银弹。Seata 提供了一套相对完善的解决方案但在实际使用中还是会遇到各种问题。我的建议是优先考虑是否真的需要强一致性很多时候最终一致性就足够了可以用消息队列、事件驱动等方式实现。如果确实需要分布式事务AT 模式是个不错的起点开发成本低适合大部分场景。注意性能影响做好监控和优化定期检查 Undo Log 表大小监控事务耗时和回滚率。关键业务要做好降级方案比如可以手动补偿数据或者提供数据修复工具。合理设计事务边界避免长事务合理拆分事务。做好幂等性设计使用唯一键、分布式锁等方式保证幂等。最后分布式事务只是解决数据一致性的一种手段但不是唯一手段。在架构设计时可以通过合理拆分服务、减少跨服务事务来降低复杂度。毕竟最好的分布式事务就是没有分布式事务。如果业务允许尽量通过以下方式避免分布式事务数据库拆分时考虑业务边界相关的数据尽量放在同一个数据库中使用最终一致性对于非核心业务可以使用消息队列保证最终一致性业务补偿对于一些场景可以接受短暂不一致通过补偿机制最终达到一致分布式事务是一把双刃剑它在解决一致性问题的同时也带来了复杂性和性能损耗。我们需要在一致性和性能之间找到平衡点。附录A. 完整项目结构示例seata-demo/ ├── seata-server/ # Seata Server 配置 │ ├── conf/ │ │ └── application.yml │ └── script/ │ └── db/ │ └── mysql.sql ├── order-service/ # 订单服务 │ ├── src/main/java/ │ │ └── com/example/order/ │ │ ├── OrderApplication.java │ │ ├── config/ │ │ │ └── DataSourceConfig.java │ │ ├── controller/ │ │ │ └── OrderController.java │ │ ├── service/ │ │ │ └── OrderService.java │ │ └── mapper/ │ │ └── OrderMapper.java │ └── src/main/resources/ │ └── application.yml ├── inventory-service/ # 库存服务 │ └── ... ├── point-service/ # 积分服务 │ └── ... └── common/ # 公共模块 └── ...B. 常用 SQL 脚本创建 Undo Log 表CREATETABLEundo_log(idBIGINT(20)NOTNULLAUTO_INCREMENTCOMMENTincrement id,branch_idBIGINT(20)NOTNULLCOMMENTbranch transaction id,xidVARCHAR(100)NOTNULLCOMMENTglobal transaction id,contextVARCHAR(128)NOTNULLCOMMENTundo_log context,such as serialization,rollback_infoLONGBLOBNOTNULLCOMMENTrollback info,log_statusINT(11)NOTNULLCOMMENT0:normal status,1:defense status,log_createdDATETIME(6)NOTNULLCOMMENTcreate datetime,log_modifiedDATETIME(6)NOTNULLCOMMENTmodify datetime,PRIMARYKEY(id),UNIQUEKEYux_undo_log(xid,branch_id))ENGINEInnoDBAUTO_INCREMENT1DEFAULTCHARSETutf8COMMENTAT transaction mode undo table;清理历史 Undo Log-- 清理7天前的 Undo LogDELETEFROMundo_logWHERElog_createdDATE_SUB(NOW(),INTERVAL7DAY)ANDlog_status1;-- 或者定期归档CREATETABLEundo_log_archiveLIKEundo_log;INSERTINTOundo_log_archiveSELECT*FROMundo_logWHERElog_createdDATE_SUB(NOW(),INTERVAL7DAY);DELETEFROMundo_logWHERElog_createdDATE_SUB(NOW(),INTERVAL7DAY);C. 配置参考Nacos 配置示例在 Nacos 中创建配置Data ID:seataServer.propertiesGroup:SEATA_GROUP# 事务组配置 service.vgroupMapping.my_test_tx_groupdefault service.default.grouplist127.0.0.1:8091 service.enableDegradefalse service.disableGlobalTransactionfalse # 传输配置 transport.typeTCP transport.serverNIO transport.heartbeattrue transport.enableClientBatchSendRequesttrue # 存储配置 store.modedb store.db.datasourcedruid store.db.dbTypemysql store.db.driverClassNamecom.mysql.cj.jdbc.Driver store.db.urljdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatementstrue store.db.userroot store.db.passwordyour_password store.db.minConn5 store.db.maxConn100 store.db.globalTableglobal_table store.db.branchTablebranch_table store.db.lockTablelock_table store.db.queryLimit100 store.db.maxWait5000 # 事务配置 server.recovery.committingRetryPeriod1000 server.recovery.asynCommittingRetryPeriod1000 server.recovery.rollbackingRetryPeriod1000 server.recovery.timeoutRetryPeriod1000 server.maxCommitRetryTimeout-1 server.maxRollbackRetryTimeout-1 server.rollbackRetryTimeoutUnlockEnablefalse # 指标配置 metrics.enabledfalse metrics.registryTypecompact metrics.exporterListprometheus metrics.exporterPrometheusPort9898D. 常见错误及解决方案错误信息原因解决方案Cannot get table meta for table xxx表不存在或表名错误检查表名是否正确是否有权限Primary key is null表没有主键为表添加主键Global lock acquire failed全局锁获取失败检查是否有其他事务持有锁增加重试次数XID is not foundXID 传递失败检查 Feign/Dubbo 拦截器配置Undo log not foundUndo Log 不存在检查 Undo Log 表是否正确创建Branch transaction timeout分支事务超时增加超时时间或优化业务逻辑第三方系统调用失败本地数据未回滚第三方系统无法参与 Seata 事务使用最大努力通知模式异步调用第三方E. 性能基准测试结果测试环境CPU: 8核内存: 16GB数据库: MySQL 5.7Seata Server: 单机部署测试场景3个服务参与分布式事务每个服务执行1次 UPDATE 操作测试结果并发数TPS平均耗时P99耗时成功率100450220ms450ms99.8%5001200420ms850ms99.5%10001800550ms1200ms98.9%20002200910ms2000ms97.5%结论在并发数 1000 时Seata AT 模式性能表现良好当并发数 1000 时建议使用 TCC 模式或优化数据库性能参考资料Seata 官方文档Seata GitHub分布式事务解决方案Seata 源码解析分布式事务理论基础作者注这篇文章是基于我们在生产环境使用 Seata 的实践经验总结而成。如果你在使用过程中遇到问题欢迎交流讨论。也欢迎关注我的技术博客我会持续分享更多分布式系统的实践经验。