# Seata 分布式事务
# 概述
分布式事务指事务的操作位于不同的服务节点上,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务。
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 为用户提供了 AT、TCC、XA 和 SAGA(使用较复杂,不建议使用) 四种事务模式,为用户打造一站式的分布式解决方案。
Seata术语
- TC (Transaction Coordinator) - 事务协调者
负责维护全局和分支事务的状态,驱动全局事务提交或回滚,即seata-server端服务。
- TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务,即请求发起方。
- RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚,即请求参与方。
工作流程
1.TM 请求 TC,开始一个新的全局事务,TC 会为这个全局事务生成一个 XID。
2.XID 通过微服务的调用链传递到其他微服务。
3.RM 把本地事务作为这个XID的分支事务注册到TC。
4.TM 请求 TC 对这个 XID 进行提交或回滚。
5.TC 指挥这个 XID 下面的所有分支事务进行提交、回滚。
- 官方文档地址:https://seata.io/zh-cn/docs/overview/what-is-seata.html (opens new window)
- 源码地址:https://github.com/seata/seata (opens new window)
下面详细介绍下每种模式的使用情况。
# AT模式
# AT模式介绍
运行机制
基于两阶段提交协议的演变,一阶段记录业务数据和回滚日志并在同一个本地事务中提交,释放本地锁和连接资源;二阶段异步提交及清理回滚日志或通过一阶段的回滚日志进行反向补偿回滚。
适用场景
基于支持本地 ACID 事务的关系型数据库,适用于公司内部绝大部分业务场景。
优势
业务无侵入,使用简单(使用注解即可)。
劣势
在本地事务提交前,要尝试先拿到该记录的全局锁,对性能有一定影响;AT 需要做 SQL 解析,在 SQL 支持上不及利用本地事务的XA模式,具体 SQL 限制参考官方文档 SQL限制 (opens new window)
# 服务端配置
# windows 版本-适用于本地调试
1.下载安装包点击去下载 (opens new window),下载完成后解压
2.进入解压后的seata-server-1.4.2
的conf
目录,修改registry.conf
文件,这是seata服务端的核心配置文件,可以通过该文件配置服务注册方式、配置读取方式等。
配置如下(需要重点关注注释的配置)
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa-注册中心方式,此处选择nacos
type = "nacos"
nacos {
## seata服务的应用名
application = "seata-server"
## seata服务注册地址
serverAddr = "114.242.246.250:8040"
## seata服务注册群组
group = "SEATA_GROUP"
## seata服务的命名空间ID
namespace = "928cc136-e6e5-4189-a164-9693bc5618c1"
## seata服务集群名称
cluster = "default"
## nacos用户名
username = "nacos"
## nacos密码
password = "nacos"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3-配置中心方式,此处选择nacos
type = "nacos"
nacos {
## 配置中心地址
serverAddr = "114.242.246.250:8040"
## 配置中心命名空间ID
namespace = "928cc136-e6e5-4189-a164-9693bc5618c1"
## 配置中心群组名称
group = "SEATA_GROUP"
## nacos用户名
username = "nacos"
## nacos密码
password = "nacos"
## seata配置文件的dataId
dataId = "seataServer.properties"
}
consul {
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
apollo {
appId = "seata-server"
## apolloConfigService will cover apolloMeta
apolloMeta = "http://192.168.1.204:8801"
apolloConfigService = "http://192.168.1.204:8080"
namespace = "application"
apolloAccesskeySecret = ""
cluster = "seata"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
nodePath = "/seata/seata.properties"
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
3.在nacos上新建命名空间,名称为seata
,ID与步骤2中nacos.namespace
保持一致
在seata
命名空间下新建配置,Data ID与步骤2中config.nacos.dataId
保持一致;Group与config.nacos.group
保持一致
配置如下(重点关注注释部分),完整配置描述请参考官网参数配置 (opens new window)
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
#事务群组,my_test_tx_group为分组(可自定义),配置项值为TC集群名(也可自定义)
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=20
client.rm.lock.retryTimes=60
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=true
client.rm.sagaBranchRegisterEnable=true
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
#事务会话信息存储方式为db数据库
store.mode=db
#store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
#事务会话信息存储数据库地址,创建脚本见后续步骤
store.db.url=jdbc:mysql://localhost:3306/seata?useUnicode=true&rewriteBatchedStatements=true
#事务会话信息存储数据库用户名
store.db.user=root
#事务会话信息存储数据库密码
store.db.password=root
store.db.minConn=5
store.db.maxConn=100
#全局事务的表名,创建脚本见后续步骤
store.db.globalTable=global_table
#分支事务的表名,创建脚本见后续步骤
store.db.branchTable=branch_table
store.db.queryLimit=100
##全局锁的表名,创建脚本见后续步骤
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
#store.redis.sentinel.masterName=
#store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
#store.redis.password=
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=kryo
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
4.在步骤3中配置的store.db.url
所指定的schema下创建以下表
- 全局事务表
--Mysql
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`) USING BTREE,
KEY `idx_status_gmt_modified` (`status`,`gmt_modified`) USING BTREE,
KEY `idx_transaction_id` (`transaction_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
--Oracle
CREATE TABLE global_table
(
xid VARCHAR2(128) NOT NULL,
transaction_id NUMBER(19),
status NUMBER(3) NOT NULL,
application_id VARCHAR2(32),
transaction_service_group VARCHAR2(32),
transaction_name VARCHAR2(128),
timeout NUMBER(10),
begin_time NUMBER(19),
application_data VARCHAR2(2000),
gmt_create TIMESTAMP(0),
gmt_modified TIMESTAMP(0),
PRIMARY KEY (xid)
);
CREATE INDEX idx_gmt_modified_status ON global_table (gmt_modified, status);
CREATE INDEX idx_transaction_id ON global_table (transaction_id);
- 分支事务表
--Mysql
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
PRIMARY KEY (`branch_id`) USING BTREE,
KEY `idx_xid` (`xid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
--Oracle
CREATE TABLE branch_table
(
branch_id NUMBER(19) NOT NULL,
xid VARCHAR2(128) NOT NULL,
transaction_id NUMBER(19),
resource_group_id VARCHAR2(32),
resource_id VARCHAR2(256),
branch_type VARCHAR2(8),
status NUMBER(3),
client_id VARCHAR2(64),
application_data VARCHAR2(2000),
gmt_create TIMESTAMP(6),
gmt_modified TIMESTAMP(6),
PRIMARY KEY (branch_id)
);
CREATE INDEX idx_xid ON branch_table (xid);
- 全局锁表
--Mysql
CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(128) DEFAULT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`branch_id` bigint(20) NOT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` text,
`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`) USING BTREE,
KEY `idx_status` (`status`) USING BTREE,
KEY `idx_branch_id` (`branch_id`) USING BTREE,
KEY `idx_xid_and_branch_id` (`xid`,`branch_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
--Oracle
CREATE TABLE lock_table
(
row_key VARCHAR2(128) NOT NULL,
xid VARCHAR2(128),
transaction_id NUMBER(19),
branch_id NUMBER(19) NOT NULL,
resource_id VARCHAR2(256),
table_name VARCHAR2(32),
pk VARCHAR2(36),
gmt_create TIMESTAMP(0),
gmt_modified TIMESTAMP(0),
PRIMARY KEY (row_key)
);
CREATE INDEX idx_branch_id ON lock_table (branch_id);
5.进入seata-server-1.4.2
的bin
目录,双击seata-server.bat
即可启动seata-server服务。
效果如下即启动成功
# linux 版本-正式环境
1.下载安装包
wget https://github.com/seata/seata/releases/download/v1.4.2/seata-server-1.4.2.tar.gz
如服务器无法访问github,也可下载安装包后手动上传到服务器点击去下载 (opens new window)
下载完成后解压
tar -xvf seata-server-1.4.2.tar.gz
解压后目录如下
2.进入目录/seata/seata-server-1.4.2/conf
,修改registry.conf
文件,具体参考windows版步骤2
3.在nacos上新建命名空间与配置文件,创建过程参考windows版步骤3
4.创建数据库表,脚本参考windows版步骤4。
5.进入目录/seata/seata-server-1.4.2/bin
,执行以下脚本命令启动服务,详细启动参数可参考支持的启动参数 (opens new window)
nohup sh seata-server.sh -p 8092 -h 127.0.0.1 -m db > ../logs/seata-start.log 2>&1 &
查看logs/seata-start.log
日志效果如下即启动成功
# 客户端配置
1.各业务数据库执行脚本
- Mysql数据库
CREATE TABLE `undo_log` (
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(128) NOT NULL COMMENT 'global transaction id',
`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='AT transaction mode undo table';
- Oracle数据库
CREATE TABLE undo_log
(
id NUMBER(19) NOT NULL,
branch_id NUMBER(19) NOT NULL,
xid VARCHAR2(128) NOT NULL,
context VARCHAR2(128) NOT NULL,
rollback_info BLOB NOT NULL,
log_status NUMBER(10) NOT NULL,
log_created TIMESTAMP(0) NOT NULL,
log_modified TIMESTAMP(0) NOT NULL,
PRIMARY KEY (id),
CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);
COMMENT ON TABLE undo_log IS 'AT transaction mode undo table';
-- Generate ID using sequence and trigger
CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;
2.各服务添加seata相关配置
#seata相关配置
seata:
#是否开启spring-boot自动装配,默认true
enabled: true
#是否开启数据源自动代理,默认true,如果使用多租户动态数据源功能则需关闭此按钮
enableAutoDataSourceProxy: true
#数据源代理模式 可选值AT、XA,默认为AT
data-source-proxy-mode: AT
#事务分组配置项,程序会拼接[service.vgroupMapping.事务分组配置项]这样一个完整配置去配置中心上的seata-server
#配置文件seataServer.properties中查找该完整配置的值,其值就是TC集群的名称,然后就可以根据集群名称获取真实的
#TC服务列表。所以要求该配置项必须与seataServer.properties中[service.vgroupMapping.xxx=集群名称]的xxx相同
tx-service-group: my_test_tx_group
registry:
type: nacos
nacos:
#seata服务端应用名称,与registry.conf中保持一致
application: seata-server
#seata服务端在注册中心上的地址,与registry.conf中保持一致
server-addr: 114.242.246.250:8040
#seata服务端在注册中心上的分组,与registry.conf中保持一致
group : SEATA_GROUP
#seata服务端在注册中心上的命名空间,与registry.conf中保持一致
namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
#seata服务端所属注册中心用户名,与registry.conf中保持一致
username: nacos
#seata服务端所属注册中心密码,与registry.conf中保持一致
password: nacos
#TC集群名称,与registry.conf中保持一致
cluster: default
config:
type: nacos
nacos:
#seata服务端在配置中心的地址,与registry.conf中保持一致
server-addr: 114.242.246.250:8040
#seata服务端在配置中心的分组,与registry.conf中保持一致
group : SEATA_GROUP
#seata服务端在配置中心的命名空间,与registry.conf中保持一致
namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
#seata服务端在配置中心的配置文件名称,与registry.conf中保持一致
dataId: seataServer.properties
#seata服务端所属配置中心用户名,与registry.conf中保持一致
username: nacos
#seata服务端所属配置中心密码,与registry.conf中保持一致
password: nacos
3.关闭 Ribbon 的重试机制
# v4.3 之前版本需设置,之后 ribbon 已经移除,无需设置
ribbon:
MaxAutoRetriesNextServer: 0
为什么要关闭服务调用的重试?远程业务调用失败有两种可能:(1)远程业务执行失败 (2)远程业务执行成功,网络失败。对于第2种事务场景可能会进行重试,从而导致某个业务执行两次。如果业务上能够控制某个事务接口的幂等性,则不用关闭重试。
4.添加seata依赖
<dependency>
<groupId>com.mediway.hos</groupId>
<artifactId>hos-framework-seata-starter</artifactId>
</dependency>
5.发起方服务的方法上添加@GlobalTransactional注解即可。
# 注意事项
@GlobalTransactional注解只需要配置在发起方服务上即可,被调用服务无需配置。
异常需要能够保证被发起方服务感知到,如果发生异常但中途被捕获截断则 seata 无法发起回滚操作。
# AT模式使用示例
模拟员工成功签订合同给员工发放奖励的场景,服务间调用链为合同服务->员工服务->账户服务,合同服务为发起方。
1.合同服务controller
/**
* 签订合同-seata-AT模式演示demo
*
* @param staffId 员工id
* @param name 合同名称
* @param signer 合同签订者
* @param amount 奖励金额
* @return
*/
@ApiOperation(value = "签订合同")
@PostMapping("/signContract")
public BaseResponse<Contract> signContract(
@RequestParam("staffId") String staffId,
@RequestParam("name") String name,
@RequestParam("signer") String signer,
@RequestParam("amount") BigDecimal amount) {
return BaseResponse.success(contractService.signContract(staffId, name, signer, amount));
}
2.合同服务service接口及实现类
public interface ContractService extends BaseService<Contract> {
/**
* 签订合同-AT模式
*
* @param staffId 员工id
* @param name 合同名称
* @param signer 合同签订者
* @param amount 合同奖励金额
* @return
*/
Contract signContract(String staffId, String name, String signer, BigDecimal amount);
}
@Slf4j
@Service
public class ContractServiceImpl extends BaseServiceImpl<ContractMapper, Contract> implements ContractService {
@Autowired
private StaffFeignClient staffFeignClient;
/**
* 签订合同-AT模式
*
* @param staffId 员工id
* @param name 合同名称
* @param signer 合同签订者
* @param amount 合同奖励金额
* @return
*/
@Override
@GlobalTransactional(rollbackFor = Exception.class,name = "signContract")
public Contract signContract(String staffId, String name, String signer, BigDecimal amount) {
log.info("[签订合同]开始,请求入参,staffId:{},name:{},signer:{},amount:{}", staffId, name, signer, amount);
log.info("开始全局事务,XID = " + RootContext.getXID());
// 增加一条合同数据
Contract contract = addContract(name, signer);
// 给用户发放签订合同成功奖励
BaseResponse<Staff> staffBaseResponse = staffFeignClient.awardForSignContract(staffId, amount);
log.info("[给用户发放签订合同成功奖励]返回结果:{}", staffBaseResponse);
//此处需要加此判断,否则下游异常会被全局异常处理机制捕获,无法向上传递到TM,导致不回滚
if (!staffBaseResponse.isSuccess()) {
throw new RuntimeException("[给用户发放签订合同成功奖励]失败");
}
log.info("[签订合同]结束。。。。。。。。。。。]");
return contract;
}
/**
* 增加合同记录
*
* @param name 合同名称
* @param signer 合同签订者
* @return 新增的合同记录
*/
private Contract addContract(String name, String signer) {
Contract contract = new Contract();
contract.setName(name);
contract.setSigner(signer);
contract.setIsDeleted(0);
insert(contract);
return contract;
}
}
3.员工服务FeignClient
@FeignClient(value = "hos-user-service", path = "/staff")
public interface StaffFeignClient {
/**
* 给用户发放签订合同成功奖励-AT模式
*
* @param staffId 员工id
* @param amount 奖励金额
* @return
*/
@PostMapping("/awardForSignContract")
BaseResponse<Staff> awardForSignContract(@RequestParam String staffId, @RequestParam BigDecimal amount);
}
4.员工服务controller
@Slf4j
@Api(tags = "员工信息")
@RestController
@RequestMapping("/staff")
public class StaffController extends BaseController<Staff> {
@Autowired
private StaffService staffService;
/**
* 给用户发放签订合同成功奖励-AT模式
*
* @param staffId 员工id
* @param amount 奖励金额
* @return
*/
@ApiOperation(value = "给用户发放签订合同成功奖励-AT模式")
@PostMapping("/awardForSignContract")
public BaseResponse<Staff> awardForSignContract(@RequestParam("staffId") String staffId, @RequestParam("amount") BigDecimal amount) {
return BaseResponse.success(staffService.awardForSignContract(staffId, amount));
}
}
5.员工服务service接口及实现类
@Service
public interface StaffService extends BaseService<Staff> {
/**
* 给用户发放签订合同成功奖励-AT模式
*
* @param staffId 员工id
* @param amount 奖励金额
* @return
*/
Staff awardForSignContract(String staffId, BigDecimal amount);
}
@Slf4j
@Service
public class StaffServiceImpl extends BaseServiceImpl<StaffMapper, Staff> implements StaffService {
@Autowired
private AccountFeignClient accountFeignClient;
/**
* 给用户发放签订合同成功奖励-AT模式
*
* @param staffId 员工id
* @param amount 奖励金额
* @return
*/
@Override
public Staff awardForSignContract(String staffId, BigDecimal amount) {
log.info("[给用户发放签订合同成功奖励]开始,请求入参,staffId:{},amount:{}", staffId, amount);
log.info("开始全局事务,XID = " + RootContext.getXID());
// 修改员工描述为已签订合同
Staff staff = updateStaffDescForContract(staffId);
// 增加员工账户金额
BaseResponse<Account> response = accountFeignClient.addAccountAmount(staffId, amount);
log.info("[给账户增加余额]返回结果:{}", response);
//此处需要加此判断,否则下游异常会被全局异常处理机制捕获,无法向上传递到TM,导致不回滚
if (!response.isSuccess()) {
throw new RuntimeException("[给账户增加余额]失败");
}
log.info("[给用户发放签订合同成功奖励]结束。。。。。。。。。。。]");
return staff;
}
/**
* 更新员工描述为已签订合同
*
* @param staffId 员工id
* @return
*/
private Staff updateStaffDescForContract(String staffId) {
Staff staff = selectById(staffId);
if (staff == null) {
throw new RuntimeException("员工信息为空");
}
staff.setDescription("已成功签订合同");
staff.setUpdateTime(new Date());
updateById(staff);
return staff;
}
}
6.账户服务FeignClient
@FeignClient(value = "hos-account-service", path = "/account")
public interface AccountFeignClient {
/**
* 添加账户余额-AT模式
*
* @param staffId 员工id
* @param addAmount 增加金额
* @return 操作账户
*/
@PostMapping("/addAccountAmount")
BaseResponse<Account> addAccountAmount(@RequestParam String staffId, @RequestParam BigDecimal addAmount);
}
7.账户服务controller
@Slf4j
@RestController
@RequestMapping("/account")
@Api(value = "账户管理")
public class AccountController extends BaseController<Account> {
@Autowired
private AccountService accountService;
/**
* 添加账户余额-AT模式
*
* @param staffId 员工id
* @param addAmount 增加金额
* @return 操作账户
*/
@PostMapping("/addAccountAmount")
public BaseResponse<Account> addAccountAmount(@RequestParam("staffId") String staffId, @RequestParam("addAmount") BigDecimal addAmount) {
return BaseResponse.success(accountService.addAccountAmount(staffId, addAmount));
}
}
8.账户服务service接口及实现类
public interface AccountService extends BaseService<Account> {
/**
* 给账户增加余额-AT模式
*
* @param staffId 员工id
* @param addAmount 增加的金额
* @return
*/
Account addAccountAmount(String staffId, BigDecimal addAmount);
}
@Slf4j
@Service
public class AccountServiceImpl extends BaseServiceImpl<AccountMapper, Account> implements AccountService {
@Autowired
private AccountMapper accountMapper;
/**
* 给账户增加余额-AT/XA模式
*
* @param staffId 员工id
* @param addAmount 增加的金额
* @return
*/
@Override
public Account addAccountAmount(String staffId, BigDecimal addAmount) {
log.info("[添加账户余额]开始,请求入参,staffId:{},amount:{}", staffId, addAmount);
log.info("开始全局事务,XID = " + RootContext.getXID());
Account account = accountMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getStaffId, staffId));
if (account == null) {
throw new RuntimeException("待操作账户不存在");
}
account.setAmount(account.getAmount().add(addAmount));
account.setUpdateTime(new Date());
updateById(account);
// todo 模拟失败
//int i = 10/0;
log.info("[添加账户余额]结束。。。。。。。。。。。]");
return account;
}
}
9.启动三个业务服务及网关服务,正常请求接口
http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bccbb&name=20亿超级订单合同&signer=丽丽&amount=300.00
method:POST
接口响应结果
{
"code": "200",
"msg": "success",
"data": {
"id": "72941a3c269a37a4577c646699984d1b",
"createTime": "2022-06-02 09:59:32",
"updateTime": "2022-06-02 09:59:32",
"current": 0,
"size": 0,
"name": "20亿超级订单合同",
"signer": "丽丽",
"tenantId": null,
"isDeleted": 0,
"remark": null
},
"success": true
}
数据库结果,数据已经正确入库
10.模拟失败情况
修改AccountServiceImpl
,放开//int i = 10/0
前的注释,清空之前测试数据,再次发起请求。
接口响应结果
{
"code": "99001009",
"msg": "业务处理异常",
"data": null,
"success": false
}
代码日志
数据库效果
数据没有入库,进行了回滚,说明分布式事务生效。至此AT模式示例演示完成。
# TCC模式
# TCC模式介绍
运行机制
TCC是Try-Confirm-Cancel的简称,分别对应seata中的预处理Prepare、确认 Confirm、撤销Rollback。Prepare 操作做业务检查及资源预留,Confirm 做业务确认操作,Rollback 实现一个与 Prepare 相反的操作即回滚操作。一阶段执行自定义的 Prepare 操作,如果执行成功则二阶段会执行自定义的 Confirm 操作,否则执行 Rollback 操作进行回滚。
适用场景:
对于无法完全依赖于数据库事务特性的分布式事务,如涉及非关系型数据库与中间件的操作、跨公司服务的调用、跨语言的应用调用就可使用TCC模式。
优势:
使用灵活,不依赖于数据库的事务特性来实现两阶段提交,而是采用代码来实现;同时不需要对数据加全局锁,允许多个事务同时操作数据,因此性能很高。
劣势:
所有分支事务都要手动实现 Prepare、Confirm、Rollback 三个方法, 对业务代码侵入性很强;需要在代码中处理各种异常,所以要将各种情况考虑全面,因为在分布式环境中,出现网络超时、重发,机器宕机等一系列的异常,一旦这些异常情况没有处理或者处理不当,就可能导致业务数据错误。
# 服务端配置
配置流程同AT模式
# 客户端配置
1.各服务添加seata相关配置
#seata相关配置
seata:
#是否开启spring-boot自动装配,默认true
enabled: true
#是否开启数据源自动代理,默认true,如果使用多租户动态数据源功能则需关闭此按钮
enableAutoDataSourceProxy: true
#数据源代理模式 可选值AT、XA,默认为AT
data-source-proxy-mode: AT
#事务分组配置项,程序会拼接[service.vgroupMapping.事务分组配置项]这样一个完整配置去配置中心上的seata-server
#配置文件seataServer.properties中查找该完整配置的值,其值就是TC集群的名称,然后就可以根据集群名称获取真实的
#TC服务列表。所以要求该配置项必须与seataServer.properties中[service.vgroupMapping.xxx=集群名称]的xxx相同
tx-service-group: my_test_tx_group
registry:
type: nacos
nacos:
#seata服务端应用名称,与registry.conf中保持一致
application: seata-server
#seata服务端在注册中心上的地址,与registry.conf中保持一致
server-addr: 114.242.246.250:8040
#seata服务端在注册中心上的分组,与registry.conf中保持一致
group : SEATA_GROUP
#seata服务端在注册中心上的命名空间,与registry.conf中保持一致
namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
#seata服务端所属注册中心用户名,与registry.conf中保持一致
username: nacos
#seata服务端所属注册中心密码,与registry.conf中保持一致
password: nacos
#TC集群名称,与registry.conf中保持一致
cluster: default
config:
type: nacos
nacos:
#seata服务端在配置中心的地址,与registry.conf中保持一致
server-addr: 114.242.246.250:8040
#seata服务端在配置中心的分组,与registry.conf中保持一致
group : SEATA_GROUP
#seata服务端在配置中心的命名空间,与registry.conf中保持一致
namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
#seata服务端在配置中心的配置文件名称,与registry.conf中保持一致
dataId: seataServer.properties
#seata服务端所属配置中心用户名,与registry.conf中保持一致
username: nacos
#seata服务端所属配置中心密码,与registry.conf中保持一致
password: nacos
2.添加seata依赖
<dependency>
<groupId>com.mediway.hos</groupId>
<artifactId>hos-framework-seata-starter</artifactId>
</dependency>
3.定义被调用服务
3.1.TCC接口
TCC模式的接口上需要添加@LocalTCC
注解,表明这是一个TCC接口
示例:
@LocalTCC
public interface AccountTccService {
}
3.2.接口方法
接口需要定义三个方法,分别对应Try、Confirm 和 Cancel三个操作。其中对应Try操作的方法上需要添加@TwoPhaseBusinessAction
注解,该注解含有三个属性如下
name:业务操作唯一标识
commitMethod:提交时执行的方法,与对应Confirm操作的方法名称保持一致,默认值为commit
rollbackMethod:回滚时执行的方法,与对应Cancel操作的方法名称保持一致,默认值为rollback
示例:
/**
* 给账户增加余额-TCC-prepare方法
*
* @param staffId 员工id
* @param addAmount 增加的金额
* @return
*/
@TwoPhaseBusinessAction(name = "addAccountAmountPrepare", commitMethod = "addAccountAmountCommit", rollbackMethod = "addAccountAmountRollBack")
void addAccountAmountPrepare(
@BusinessActionContextParameter(paramName = "staffId") String staffId,
@BusinessActionContextParameter(paramName = "addAmount") BigDecimal addAmount);
/**
* 给账户增加余额-TCC-commit方法
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
boolean addAccountAmountCommit(BusinessActionContext context);
/**
* 给账户增加余额-TCC-rollback方法(与prepare方法相反的操作)
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
boolean addAccountAmountRollBack(BusinessActionContext context);
3.3.参数注解@BusinessActionContextParameter
该注解用来修饰 Try 方法的入参,被修饰的入参可以在 Commit 方法和 Rollback 方法中通过 BusinessActionContext 获取。
3.4.TCC事务上下文对象BusinessActionContext
TCC事务上下文对象,可以通过该对象获取到 Try 方法的入参
4.发起方服务的方法上添加@GlobalTransactional注解。
# 注意事项
- @GlobalTransactional注解只需要配置在发起方服务上即可,被调用服务无需配置。
- 异常需要能够保证被发起方服务感知到,如果发生异常但中途被捕获截断则 seata 无法发起回滚操作。
- Confirm 和 Cancel方法一定要保证执行成功,如果执行异常或返回结果为false,则seata会不断进行重试,直到执行成功。所以建议如果这两个方法一直执行失败,需要人工参与解决失败问题。
- Try、Confirm 和 Cancel三个操作都要保证接口幂等性。
- 空回滚问题:在没用调用Try方法的情况下,先调用了第二阶段的Cancel方法。解决思路是Cancel方法需要识别出这是一个空回滚,然后直接返回成功即可。具体方法例如可以在Try方法执行时在操作记录表中添加一条记录,如果执行Cancel方法时查询该表没有记录,则表明Try方法没有执行,即当前就是空回滚,此时直接返回成功即可。
- 悬挂问题:第二阶段的Cancel方法比第一阶段的Try方法先执行。由于允许空回滚,在Cancel方法先执行后,此时如果再执行Try方法,那么Try方法预留的业务资源后续无人能够处理,导致资源悬挂。解决方法是在执行Try方法时判断操作记录表是否已被Cancel方法修改过,如果被修改过则不再执行后续操作,直接返回即可。
# TCC模式使用示例
模拟员工成功签订合同给员工发放奖励的场景,服务间调用链为合同业务分别调用合同服务(本模块调用)、员工服务(跨模块调用)、账户服务(跨模块调用),合同服务为发起方。
1.合同服务controller
/**
* 签订合同-seata-TCC模式演示demo
*
* @param staffId 员工id
* @param name 合同名称
* @param signer 合同签订者
* @param amount 奖励金额
* @return
*/
@ApiOperation(value = "签订合同")
@PostMapping("/signContractTcc")
public BaseResponse signContractTcc(
@RequestParam("staffId") String staffId,
@RequestParam("name") String name,
@RequestParam("signer") String signer,
@RequestParam("amount") BigDecimal amount) {
contractBiz.signContractTcc(staffId, name, signer, amount);
return BaseResponse.success();
}
2.合同业务biz
@Slf4j
@Component
public class ContractBiz {
@Autowired
private ContractTccService contractTccService;
@Autowired
private StaffFeignClient staffFeignClient;
@Autowired
private AccountFeignClient accountFeignClient;
/**
* 签订合同-TCC模式-TM(服务发起方)
*
* @param staffId 员工id
* @param name 合同名称
* @param signer 合同签订者
* @param amount 合同奖励金额
* @return
*/
@GlobalTransactional(rollbackFor = Exception.class, name = "signContractTcc")
public void signContractTcc(String staffId, String name, String signer, BigDecimal amount) {
log.info("[签订合同-TCC模式]开始执行,staffId:{},name:{},signer:{},amount:{}", staffId, name, signer, amount);
// 合同业务
contractTccService.signContractPrepare(name, signer);
// 员工业务
BaseResponse staffBaseResponse = staffFeignClient.awardForSignContractTcc(staffId);
log.info("[给用户发放签订合同成功奖励]返回结果:{}", staffBaseResponse);
if (!staffBaseResponse.isSuccess()) {
throw new RuntimeException("[给用户发放签订合同成功奖励]失败");
}
// 账户业务
BaseResponse accountBaseResponse = accountFeignClient.addAccountAmountTcc(staffId, amount);
log.info("[给账户增加余额]返回结果:{}", accountBaseResponse);
if (!accountBaseResponse.isSuccess()) {
throw new RuntimeException("[给账户增加余额]失败");
}
log.info("[签订合同-TCC模式]执行结束,staffId:{},name:{},signer:{},amount:{}", staffId, name, signer, amount);
}
}
3.合同服务service接口及实现类
@LocalTCC
public interface ContractTccService {
/**
* 签订合同-TCC-prepare方法
*
* @param name 合同名称
* @param signer 合同签订者
* @return
*/
@TwoPhaseBusinessAction(name = "signContractPrepare", commitMethod = "signContractCommit", rollbackMethod = "signContractRollBack")
void signContractPrepare(
@BusinessActionContextParameter(paramName = "name") String name,
@BusinessActionContextParameter(paramName = "signer") String signer);
/**
* 签订合同-TCC-commit方法
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
boolean signContractCommit(BusinessActionContext context);
/**
* 签订合同-TCC-rollback方法(与prepare方法相反的操作)
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
boolean signContractRollBack(BusinessActionContext context);
}
@Slf4j
@Service
public class ContractTccServiceImpl implements ContractTccService {
@Autowired
private ContractMapper contractMapper;
/**
* 签订合同-TCC-prepare方法
*
* @param name 合同名称
* @param signer 合同签订者
* @return
*/
@Override
public void signContractPrepare(String name, String signer) {
log.info("--------->XID =" + RootContext.getXID() + " 合同服务prepare操作准备执行!");
// 全局事务id
String xid = RootContext.getXID();
// 幂等性校验
Contract contract = contractMapper.selectOne(Wrappers.<Contract>lambdaQuery()
.eq(Contract::getName, name).eq(Contract::getSigner, signer).eq(Contract::getRemark, xid));
if (contract != null) {
return;
}
// 添加合同记录
contract = new Contract();
contract.setName(name);
contract.setSigner(signer);
contract.setIsDeleted(0);
contract.setCreateTime(new Date());
// 备注字段存储全局事务id
contract.setRemark(xid);
contractMapper.insert(contract);
log.info("--------->XID =" + RootContext.getXID() + " 合同服务prepare成功!");
}
/**
* 签订合同-TCC-commit方法
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
@Override
public boolean signContractCommit(BusinessActionContext context) {
log.info("--------->XID =" + RootContext.getXID() + " 合同服务commit操作准备执行!");
// 合同名称
String name = context.getActionContext("name").toString();
// 合同签订者
String signer = context.getActionContext("signer").toString();
// 全局事务id
String xid = context.getXid();
Contract contract = contractMapper.selectOne(Wrappers.<Contract>lambdaQuery()
.eq(Contract::getName, name).eq(Contract::getSigner, signer).eq(Contract::getRemark, xid));
if (contract != null) {
log.info("--------->XID =" + context.getXid() + " 合同服务commit成功!");
}
return contract != null;
}
/**
* 签订合同-TCC-rollback方法(与prepare方法相反的操作)
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
@Override
public boolean signContractRollBack(BusinessActionContext context) {
log.info("--------->XID =" + RootContext.getXID() + " 合同服务rollback操作准备执行!");
// 合同名称
String name = context.getActionContext("name").toString();
// 合同签订者
String signer = context.getActionContext("signer").toString();
// 全局事务id
String xid = context.getXid();
// 幂等性校验+空回滚校验
LambdaQueryWrapper<Contract> queryWrapper = Wrappers.<Contract>lambdaQuery()
.eq(Contract::getName, name).eq(Contract::getSigner, signer).eq(Contract::getRemark, xid);
Contract contract = contractMapper.selectOne(queryWrapper);
if (contract == null) {
return true;
}
// 删除合同记录
contractMapper.delete(queryWrapper);
log.info("--------->XID =" + context.getXid() + " 合同服务rollback成功!");
return true;
}
}
4.员工服务FeignClient
@FeignClient(value = "hos-user-service", path = "/staff")
public interface StaffFeignClient {
/**
* 给用户发放签订合同成功奖励-TCC模式
*
* @param staffId 员工id
* @return
*/
@PostMapping("/awardForSignContractTcc")
BaseResponse awardForSignContractTcc(@RequestParam String staffId);
}
5.员工服务controller
@Slf4j
@Api(tags = "员工信息")
@RestController
@RequestMapping("/staff")
public class StaffController extends BaseController<Staff> {
@Autowired
private StaffTccService staffTccService;
/**
* 给用户发放签订合同成功奖励-TCC模式
*
* @param staffId 员工id
* @return
*/
@ApiOperation(value = "给用户发放签订合同成功奖励-TCC模式")
@PostMapping("/awardForSignContractTcc")
public BaseResponse awardForSignContractTcc(@RequestParam("staffId") String staffId) {
staffTccService.awardForSignContractPrepare(staffId);
return BaseResponse.success();
}
}
6.员工服务service接口及实现类
@Service
@LocalTCC
public interface StaffTccService {
/**
* 给用户发放签订合同成功奖励-TCC-prepare方法
*
* @param staffId 员工id
* @return
*/
@TwoPhaseBusinessAction(name = "awardForSignContractPrepare", commitMethod = "awardForSignContractCommit", rollbackMethod = "awardForSignContractRollBack")
void awardForSignContractPrepare(@BusinessActionContextParameter(paramName = "staffId") String staffId);
/**
* 给用户发放签订合同成功奖励-TCC-commit方法
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
boolean awardForSignContractCommit(BusinessActionContext context);
/**
* 给用户发放签订合同成功奖励-TCC-rollback方法(与prepare方法相反的操作)
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
boolean awardForSignContractRollBack(BusinessActionContext context);
}
@Slf4j
@Service
public class StaffTccServiceImpl implements StaffTccService {
@Autowired
private StaffMapper staffMapper;
/**
* 给用户发放签订合同成功奖励-TCC-prepare方法
*
* @param staffId 员工id
* @return
*/
@Override
public void awardForSignContractPrepare(String staffId) {
log.info("--------->XID =" + RootContext.getXID() + " 员工服务prepare操作准备执行!");
Staff staff = staffMapper.selectById(staffId);
if (staff == null) {
throw new RuntimeException("员工不存在");
}
// 幂等性校验
if (StringUtil.isNotBlank(staff.getDescription())) {
return;
}
// 更新员工描述为已签订合同
staff.setDescription("已成功签订合同");
staff.setUpdateTime(new Date());
staffMapper.updateById(staff);
log.info("--------->XID =" + RootContext.getXID() + " 员工服务prepare成功!");
}
/**
* 给用户发放签订合同成功奖励-TCC-commit方法
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
@Override
public boolean awardForSignContractCommit(BusinessActionContext context) {
log.info("--------->XID =" + RootContext.getXID() + " 员工服务commit操作准备执行!");
// 员工名称
String staffId = context.getActionContext("staffId").toString();
Staff staff = staffMapper.selectById(staffId);
if (staff == null) {
throw new RuntimeException("员工不存在");
}
if (StringUtil.isNotBlank(staff.getDescription())) {
log.info("--------->XID =" + RootContext.getXID() + " 员工服务commit成功!");
}
return StringUtil.isNotBlank(staff.getDescription());
}
/**
* 给用户发放签订合同成功奖励-TCC-rollback方法(与prepare方法相反的操作)
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
@Override
public boolean awardForSignContractRollBack(BusinessActionContext context) {
log.info("--------->XID =" + RootContext.getXID() + " 员工服务rollback操作准备执行!");
// 员工名称
String staffId = context.getActionContext("staffId").toString();
Staff staff = staffMapper.selectById(staffId);
if (staff == null) {
throw new RuntimeException("员工不存在");
}
// 幂等性校验+空回滚校验
if (StringUtil.isBlank(staff.getDescription())) {
return true;
}
// 更新员工描述为""
staff.setDescription("");
staff.setUpdateTime(new Date());
staffMapper.updateById(staff);
log.info("--------->XID =" + RootContext.getXID() + " 员工服务rollback成功!");
return true;
}
}
7.账户服务FeignClient
@FeignClient(value = "hos-account-service", path = "/account")
public interface AccountFeignClient {
/**
* 添加账户余额-TCC模式
*
* @param staffId 员工id
* @param addAmount 增加金额
*/
@PostMapping("/addAccountAmountTcc")
BaseResponse addAccountAmountTcc(@RequestParam String staffId, @RequestParam BigDecimal addAmount);
}
8.账户服务controller
@Slf4j
@RestController
@RequestMapping("/account")
@Api(value = "账户管理")
public class AccountController extends BaseController<Account> {
@Autowired
private AccountTccService accountTccService;
/**
* 添加账户余额-TCC模式
*
* @param staffId 员工id
* @param addAmount 增加金额
* @return 操作账户
*/
@PostMapping("/addAccountAmountTcc")
public BaseResponse addAccountAmountTcc(@RequestParam("staffId") String staffId, @RequestParam("addAmount") BigDecimal addAmount) {
accountTccService.addAccountAmountPrepare(staffId, addAmount);
return BaseResponse.success();
}
}
9.账户服务service接口及实现类
@LocalTCC
public interface AccountTccService {
/**
* 给账户增加余额-TCC-prepare方法
*
* @param staffId 员工id
* @param addAmount 增加的金额
* @return
*/
@TwoPhaseBusinessAction(name = "addAccountAmountPrepare", commitMethod = "addAccountAmountCommit", rollbackMethod = "addAccountAmountRollBack")
void addAccountAmountPrepare(
@BusinessActionContextParameter(paramName = "staffId") String staffId,
@BusinessActionContextParameter(paramName = "addAmount") BigDecimal addAmount);
/**
* 给账户增加余额-TCC-commit方法
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
boolean addAccountAmountCommit(BusinessActionContext context);
/**
* 给账户增加余额-TCC-rollback方法(与prepare方法相反的操作)
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
boolean addAccountAmountRollBack(BusinessActionContext context);
}
@Slf4j
@Service
public class AccountTccServiceImpl implements AccountTccService {
@Autowired
private AccountMapper accountMapper;
/**
* 给账户增加余额-TCC-prepare方法
*
* @param staffId 员工id
* @param addAmount 增加的金额
* @return
*/
@Override
public void addAccountAmountPrepare(String staffId, BigDecimal addAmount) {
log.info("--------->XID =" + RootContext.getXID() + " 账户服务prepare操作准备执行!");
Account account = accountMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getStaffId, staffId));
if (account == null) {
throw new RuntimeException("待操作账户不存在");
}
//幂等校验,账户状态为正常状态
if (account.getStatus() == 1) {
// 冻结账户
account.setStatus(2);
// 给账户增加金额
account.setAmount(account.getAmount().add(addAmount));
account.setUpdateTime(new Date());
accountMapper.updateById(account);
}
//todo 模拟失败
//int i = 10/0;
log.info("--------->XID =" + RootContext.getXID() + " 账户服务prepare成功!");
}
/**
* 给账户增加余额-TCC-commit方法
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
@Override
public boolean addAccountAmountCommit(BusinessActionContext context) {
log.info("--------->XID =" + RootContext.getXID() + " 账户服务commit操作准备执行!");
// 员工id
String staffId = context.getActionContext("staffId").toString();
Account account = accountMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getStaffId, staffId));
if (account == null) {
throw new RuntimeException("待操作账户不存在");
}
// 幂等校验,判断账户如果处在非冻结状态(当前方法重复执行)则直接返回
if (account.getStatus() != 2) {
return true;
}
// 更新账户状态为正常
account.setStatus(1);
account.setUpdateTime(new Date());
accountMapper.updateById(account);
log.info("--------->XID =" + context.getXid() + " 账户服务commit成功!");
return true;
}
/**
* 给账户增加余额-TCC-rollback方法(与prepare方法相反的操作)
*
* @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
* @return
*/
@Override
public boolean addAccountAmountRollBack(BusinessActionContext context) {
log.info("--------->XID =" + RootContext.getXID() + " 账户服务rollback操作准备执行!");
// 员工id
String staffId = context.getActionContext("staffId").toString();
// 奖励金额
BigDecimal addAmount = new BigDecimal(context.getActionContext("addAmount").toString());
Account account = accountMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getStaffId, staffId));
if (account == null) {
throw new RuntimeException("待操作账户不存在");
}
// 幂等+空回滚校验,判断账户如果处在非冻结状态(没有执行prepare方法或当前方法重复执行)则直接返回
if (account.getStatus() != 2) {
return true;
}
// 更新账户状态为正常
account.setStatus(1);
// 给账户减去奖励金额
account.setAmount(account.getAmount().subtract(addAmount));
account.setUpdateTime(new Date());
accountMapper.updateById(account);
log.info("--------->XID =" + context.getXid() + " 账户服务rollback成功!");
return true;
}
}
10.启动三个业务服务及网关服务,正常请求接口
http://localhost:7100/contract/contract/signContractTcc?staffId=e2ab1960cd737111154d46878b5bffdd&name=2022年北京大厦建筑合同&signer=黎明&amount=10000.50
method:POST
接口响应结果
{
"code": "200",
"msg": "success",
"data": null,
"success": true
}
数据库结果,数据已经正确入库
seata日志显示全局事务提交成功
11.模拟失败情况
修改AccountTccServiceImpl
,放开//int i = 10/0
前的注释,清空之前测试数据,再次发起请求。
接口响应结果
{
"code": "99001009",
"msg": "业务处理异常",
"data": null,
"success": false
}
seata日志显示全局回滚成功
业务服务日志
数据库效果
数据没有入库,进行了回滚,至此TCC模式示例演示完成。
# XA模式
# XA模式介绍
运行机制
利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种事务模式。执行阶段业务 SQL 操作放在 XA 分支中进行,完成后执行 XA prepare,由资源对 XA 协议的支持来保证可回滚和持久化;完成阶段执行 XA 分支的提交和回滚操作。
适用场景:
适用于对数据一致性要求较高的场景,要求数据库能够支持 XA 协议(Mysql和Oracle都支持)。
优势:
业务无侵入,使用简单(使用注解即可);对数据一致性有较高的保障。
劣势:
数据在整个事务处理过程结束前都被锁定,锁的粒度大导致可能锁定更多无辜数据,并且由于协议阻塞容易产生死锁,性能较差。
# 服务端配置
配置流程同AT模式
# 客户端配置
1.各服务添加seata相关配置
#seata相关配置
seata:
#是否开启spring-boot自动装配,默认true
enabled: true
#是否开启数据源自动代理,默认true,如果使用多租户动态数据源功能则需关闭此按钮
enableAutoDataSourceProxy: true
#数据源代理模式 可选值AT、XA,默认为AT
data-source-proxy-mode: XA
#事务分组配置项,程序会拼接[service.vgroupMapping.事务分组配置项]这样一个完整配置去配置中心上的seata-server
#配置文件seataServer.properties中查找该完整配置的值,其值就是TC集群的名称,然后就可以根据集群名称获取真实的
#TC服务列表。所以要求该配置项必须与seataServer.properties中[service.vgroupMapping.xxx=集群名称]的xxx相同
tx-service-group: my_test_tx_group
registry:
type: nacos
nacos:
#seata服务端应用名称,与registry.conf中保持一致
application: seata-server
#seata服务端在注册中心上的地址,与registry.conf中保持一致
server-addr: 114.242.246.250:8040
#seata服务端在注册中心上的分组,与registry.conf中保持一致
group : SEATA_GROUP
#seata服务端在注册中心上的命名空间,与registry.conf中保持一致
namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
#seata服务端所属注册中心用户名,与registry.conf中保持一致
username: nacos
#seata服务端所属注册中心密码,与registry.conf中保持一致
password: nacos
#TC集群名称,与registry.conf中保持一致
cluster: default
config:
type: nacos
nacos:
#seata服务端在配置中心的地址,与registry.conf中保持一致
server-addr: 114.242.246.250:8040
#seata服务端在配置中心的分组,与registry.conf中保持一致
group : SEATA_GROUP
#seata服务端在配置中心的命名空间,与registry.conf中保持一致
namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
#seata服务端在配置中心的配置文件名称,与registry.conf中保持一致
dataId: seataServer.properties
#seata服务端所属配置中心用户名,与registry.conf中保持一致
username: nacos
#seata服务端所属配置中心密码,与registry.conf中保持一致
password: nacos
2.关闭 Ribbon 的重试机制
# v4.3 之前版本需设置,之后 ribbon 已经移除,无需设置
ribbon:
MaxAutoRetriesNextServer: 0
为什么要关闭服务调用的重试?远程业务调用失败有两种可能:(1)远程业务执行失败 (2)远程业务执行成功,网络失败。对于第2种事务场景可能会进行重试,从而导致某个业务执行两次。如果业务上能够控制某个事务接口的幂等性,则不用关闭重试。
3.添加seata依赖
<dependency>
<groupId>com.mediway.hos</groupId>
<artifactId>hos-framework-seata-starter</artifactId>
</dependency>
4.发起方服务的方法上添加@GlobalTransactional注解。
# 注意事项
- 注意事项参考AT模式
# XA模式使用示例
代码示例同AT模式一致
1.启动三个业务服务及网关服务,正常请求接口
http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bccbb&name=20亿超级订单合同&signer=丽丽&amount=300.00
method:POST
接口响应结果
{
"code": "200",
"msg": "success",
"data": {
"id": "723dd2c05959a9011265c32f2c3952b2",
"createTime": "2022-06-06 16:45:14",
"updateTime": "2022-06-06 16:45:14",
"current": 0,
"size": 0,
"name": "20亿超级订单合同",
"signer": "丽丽",
"tenantId": null,
"isDeleted": 0,
"remark": null
},
"success": true
}
数据库结果,数据已经正确入库
seata日志显示全局事务提交成功
2.模拟失败情况
修改AccountServiceImpl
,放开//int i = 10/0
前的注释,清空之前测试数据,再次发起请求。
接口响应结果
{
"code": "99001009",
"msg": "业务处理异常",
"data": null,
"success": false
}
seata日志显示全局回滚成功
业务服务日志
数据库效果
数据没有入库,进行了回滚,至此XA模式示例演示完成。
# 与多数据源结合使用
# 概述
seata可以和多数据源结合使用,能够对租户绑定的数据源所产生的分布式事务进行控制。大体流程为当请求到达服务时,根据当前租户绑定的数据源进行切换,seata对切换后的数据源进行代理,实现分布式事务的控制。需要注意的是只有AT
模式和XA
模式支持与多数据源结合使用,TCC模式结合多数据源使用时应用程序会直接报错,这一点需要注意。
# 服务端配置
配置流程同普通AT模式一致
# 客户端配置
1.租户绑定数据源,具体流程可参考数据源隔离章节
2.在租户绑定的数据源中初始化seata脚本(AT模式需要)
- Mysql数据库
CREATE TABLE `undo_log` (
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(128) NOT NULL COMMENT 'global transaction id',
`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='AT transaction mode undo table';
- Oracle数据库
CREATE TABLE undo_log
(
id NUMBER(19) NOT NULL,
branch_id NUMBER(19) NOT NULL,
xid VARCHAR2(128) NOT NULL,
context VARCHAR2(128) NOT NULL,
rollback_info BLOB NOT NULL,
log_status NUMBER(10) NOT NULL,
log_created TIMESTAMP(0) NOT NULL,
log_modified TIMESTAMP(0) NOT NULL,
PRIMARY KEY (id),
CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);
COMMENT ON TABLE undo_log IS 'AT transaction mode undo table';
-- Generate ID using sequence and trigger
CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;
3.各服务添加seata与多数据源相关配置
#多数据源相关配置
spring:
datasource:
dynamic:
#是否开启seata支持
seata: true
#使用seata事务模式,支持AT/XA,默认为AT模式
seata-mode: AT
#多租户相关配置
framework:
multi-tenant:
#多租户开关,默认关闭
enable: true
#租户条件列名,默认为tenant_id
column: tenant_id
#忽略表名,默认为空,多个以中划线分隔
ignore-table:
#- staff
#- contract
#- organization
#动态数据源开关,默认关闭
dynamic-datasource: true
#seata相关配置
seata:
#是否开启spring-boot自动装配,默认true
enabled: true
#是否开启数据源自动代理,默认true,如果使用多租户动态数据源功能则需关闭此按钮
enableAutoDataSourceProxy: false
#数据源代理模式 可选值AT、XA(与多数据源结合使用也仅支持这两种模式),默认为AT
data-source-proxy-mode: AT
#事务分组配置项,程序会拼接[service.vgroupMapping.事务分组配置项]这样一个完整配置去配置中心上的seata-server
#配置文件seataServer.properties中查找该完整配置的值,其值就是TC集群的名称,然后就可以根据集群名称获取真实的
#TC服务列表。所以要求该配置项必须与seataServer.properties中[service.vgroupMapping.xxx=集群名称]的xxx相同
tx-service-group: my_test_tx_group
registry:
type: nacos
nacos:
#seata服务端应用名称,与registry.conf中保持一致
application: seata-server
#seata服务端在注册中心上的地址,与registry.conf中保持一致
server-addr: 114.242.246.250:8040
#seata服务端在注册中心上的分组,与registry.conf中保持一致
group : SEATA_GROUP
#seata服务端在注册中心上的命名空间,与registry.conf中保持一致
namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
#seata服务端所属注册中心用户名,与registry.conf中保持一致
username: nacos
#seata服务端所属注册中心密码,与registry.conf中保持一致
password: nacos
#TC集群名称,与registry.conf中保持一致
cluster: default
config:
type: nacos
nacos:
#seata服务端在配置中心的地址,与registry.conf中保持一致
server-addr: 114.242.246.250:8040
#seata服务端在配置中心的分组,与registry.conf中保持一致
group : SEATA_GROUP
#seata服务端在配置中心的命名空间,与registry.conf中保持一致
namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
#seata服务端在配置中心的配置文件名称,与registry.conf中保持一致
dataId: seataServer.properties
#seata服务端所属配置中心用户名,与registry.conf中保持一致
username: nacos
#seata服务端所属配置中心密码,与registry.conf中保持一致
password: nacos
# 关闭ribbon失败重试,v4.3 之前版本需设置,之后 ribbon 已经移除,无需设置
ribbon:
MaxAutoRetriesNextServer: 0
4.各服务添加seata与多数据源依赖
<dependency>
<groupId>com.mediway.hos</groupId>
<artifactId>hos-framework-seata-starter</artifactId>
</dependency>
<dependency>
<groupId>com.mediway.hos</groupId>
<artifactId>hos-framework-tenant-starter</artifactId>
</dependency>
下面步骤5、6、7、8皆是多租户模块hos-framework-tenant-starter
所需配置,如之前服务已引入过该模块并对步骤5、6、7、8进行过实现,则此处无需再次配置。
5.定义租户ID过滤器(内容仅供参考)
@WebFilter(urlPatterns = "/*")
public class TenantIdFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) servletRequest;
String tenantId = req.getHeader("tenantId");
if (StringUtil.isNotBlank(tenantId)) {
TenantUtil.setTenantId(tenantId);
}
filterChain.doFilter(servletRequest, servletResponse);
}
@Override
public void destroy() {
}
}
应用程序需要事先定义好过滤器将租户id从请求header中取出放到上下文中,在单体版只需拦截单个应用放入一次即可,而微服务版每个服务都要将租户id放入上下文,所以需要定义公共的过滤器被所有微服务引用。
6.各服务启动类添加注解@ServletComponentScan
使定义的过滤器生效
7.定义租户实体类Tenant
(内容仅供参考,但是必须含有数据源ID字段)
@Data
public class Tenant {
private String id;
private String tenantName;
private String datasourceId;
private Date createTime;
private String updateTime;
private Integer isDeleted;
}
8.定义DataSourceIdProvider
(内容仅供参考)
@Component
public class DemoDataSourceIdProvider implements DataSourceIdProvider {
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 根据租户id获取数据源id
*
* @param tenantId 租户id
* @return 数据源id
*/
@Override
public String getDataSourceId(String tenantId) throws Exception {
//查询sql
String queryDataSourceByIdSql = "SELECT datasource_id AS datasourceId FROM tenant WHERE id = ?";
List<Tenant> dataSources = this.jdbcTemplate.query(queryDataSourceByIdSql, new String[]{tenantId}, new BeanPropertyRowMapper(Tenant.class));
if (CollectionUtil.isEmpty(dataSources)) {
return null;
}
return dataSources.get(0).getDatasourceId();
}
}
9.发起方服务的方法上添加@GlobalTransactional注解
# 注意事项
各个微服务的主数据源(配置在nacos中的数据源)应该保持一致,数据源表
t_datasource
及租户表tenant
都存于主数据源中。异常需要能够保证被发起方服务感知到,如果发生异常但中途被捕获截断则 seata 无法发起回滚操作。
# 使用示例
我们以租户请求合同签订接口为例演示seata的AT模式结合多数据源如何使用。
1.数据准备
- 准备两个数据源,作为租户绑定数据源,数据如下
- 准备两个租户,租户1绑定数据源1,租户2绑定数据源2,数据如下
- 在租户数据源1、数据源2中分别新建
undo_log
表,结果如下
- 数据源1中租户1的业务数据如下
- 数据源2中租户2的业务数据如下
2.配置
按照客户端配置模块配置即可。
3.代码示例
同AT模式使用示例代码一致。
4.启动三个业务服务及网关服务,我们以租户1的身份来正常请求接口
http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bccbb&name=20亿超级订单合同&signer=丽丽&amount=300.00
header:tenantId-111111
method:POST
接口响应结果
{
"code": "200",
"msg": "success",
"data": {
"id": "4953ed1abbc55de0d3410916e5a7d720",
"createTime": "2022-06-07 14:20:24",
"updateTime": "2022-06-07 14:20:24",
"current": 0,
"size": 0,
"name": "20亿超级订单合同",
"signer": "丽丽",
"tenantId": null,
"isDeleted": 0,
"remark": null
},
"success": true
}
租户1绑定的Mysql数据库demo_dynamic
结果显示数据已经正确入库
seata日志显示AT模式全局事务提交成功
应用程序日志显示确实使用了数据源demo_dynamic
,同时也开启了多租户的字段隔离机制
我们换租户2来请求其绑定的数据源看下效果
http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bffdd&name=粮食订单合同&signer=黎明&amount=12000.00
header:tenantId-222222
method:POST
接口响应结果
{
"code": "200",
"msg": "success",
"data": {
"id": "0d423ce4917b1782c36dd460168c0185",
"createTime": "2022-06-07 14:35:52",
"updateTime": "2022-06-07 14:35:52",
"current": 0,
"size": 0,
"name": "粮食订单合同",
"signer": "黎明",
"tenantId": null,
"isDeleted": 0,
"remark": null
},
"success": true
}
租户2绑定的Oracle数据库HOS
结果显示数据已经正确入库
seata日志显示AT模式全局事务提交成功
应用程序日志显示确实使用了数据源HOS
,同时也开启了多租户的字段隔离机制
我们发现seata的AT模式与多数据源结合使用时正向流程是没有问题的,下面演示下异常情况,验证事务是否会回滚。
5.模拟失败情况
修改AccountServiceImpl
,放开//int i = 10/0
前的注释,清空之前测试数据,再次以租户1发起请求
http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bccbb&name=20亿超级订单合同&signer=丽丽&amount=300.00
header:tenantId-111111
method:POST
接口响应结果
{
"code": "500",
"msg": "[给用户发放签订合同成功奖励]失败",
"data": null,
"success": false
}
seata日志显示全局回滚成功
业务服务日志
数据库效果
数据没有入库,进行了回滚,我们再以租户2请求一次
http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bffdd&name=粮食订单合同&signer=黎明&amount=12000.00
header:tenantId-222222
method:POST
接口响应结果
{
"code": "500",
"msg": "[给用户发放签订合同成功奖励]失败",
"data": null,
"success": false
}
seata日志显示全局回滚成功
业务服务日志
数据库效果
我们发现无论在Mysql数据库下还是Oracle数据库下,seata的AT模式都是支持与多数据源结合使用的,至于XA模式大家可以自己尝试下效果。