Spring Cloud + Seata: Handling Distributed Transactions
What Is a Distributed Transaction Problem?
In a microservice architecture, a distributed transaction problem is any problem caused by the inability to guarantee global data consistency. Put simply, as soon as a single business operation has to touch multiple data sources or make remote calls, a distributed transaction problem can arise.
Introduction to Seata
Seata is an open-source distributed transaction solution from Alibaba that aims to provide high-performance and easy-to-use distributed transaction services. This article walks through its usage with a simple order-placement scenario.
For a detailed introduction to Seata, see:
https://whwtree.com/archives/seata-introduce.html
Installing and Configuring seata-server
First, download seata-server from the official releases; the version used here is seata-server-1.0.0.zip.
Download address:
https://github.com/seata/seata/releases
After downloading, unzip the seata-server package into a directory of your choice and edit the file.conf file in the conf directory.
The main changes are the custom transaction group name, switching the transaction log store mode to db, and the database connection information:
service {
  #transaction service group mapping
  #set the transaction group name to my_test_tx_group; it must match the name configured on the client side
  vgroup_mapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #disable seata
  disableGlobalTransaction = false
}
## transaction log store, only used in seata-server
store {
  ## store mode: file, db
  #change this so that transaction information is stored in the database
  mode = "db"
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
  }
  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    db-type = "mysql"
    driver-class-name = "com.mysql.jdbc.Driver"
    #change the database connection URL
    url = "jdbc:mysql://127.0.0.1:3306/seata-server?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&allowMultiQueries=true&useSSL=false&tinyInt1isBit=false&serverTimezone=GMT%2B8"
    #change the database username
    user = "test"
    #change the database password
    password = "123456"
  }
}
Because db mode is used to store the transaction log, we need to create a seata-server database; the table-creation SQL can be found in seata-server/conf/db_store.sql.
Next, edit the registry.conf file in the conf directory, set the registry type to nacos, and adjust the Nacos connection information:
registry {
  # file, nacos, eureka, redis, zk, consul, etcd3, sofa
  type = "nacos" #change to nacos
  nacos {
    serverAddr = "localhost:8848" #change to the Nacos server address
    namespace = ""
    cluster = "default"
  }
}
Start Nacos first, then start seata-server with /bin/seata-server.bat -p 8091 from the seata-server directory.
Database Preparation
Create the business databases:
- seata-order: the database that stores order data;
- seata-storage: the database that stores stock data;
- seata-account: the database that stores account data;
Initialize the business tables.
The order table:
CREATE TABLE `order` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(11) DEFAULT NULL COMMENT 'user id',
  `product_id` bigint(11) DEFAULT NULL COMMENT 'product id',
  `count` int(11) DEFAULT NULL COMMENT 'quantity',
  `money` decimal(11,0) DEFAULT NULL COMMENT 'amount',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
ALTER TABLE `seata-order`.`order` ADD COLUMN `status` int(1) DEFAULT NULL COMMENT 'order status: 0 = creating; 1 = completed' AFTER `money`;
The storage table:
CREATE TABLE `storage` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(11) DEFAULT NULL COMMENT 'product id',
  `total` int(11) DEFAULT NULL COMMENT 'total stock',
  `used` int(11) DEFAULT NULL COMMENT 'used stock',
  `residue` int(11) DEFAULT NULL COMMENT 'remaining stock',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO `seata-storage`.`storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');
The account table:
CREATE TABLE `account` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(11) DEFAULT NULL COMMENT 'user id',
  `total` decimal(10,0) DEFAULT NULL COMMENT 'total credit',
  `used` decimal(10,0) DEFAULT NULL COMMENT 'used amount',
  `residue` decimal(10,0) DEFAULT '0' COMMENT 'remaining available credit',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO `seata-account`.`account` (`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');
Create the undo-log tables
Seata also requires an undo-log table in each business database; the table-creation SQL can be found in seata-server/conf/db_undo_log.sql. In AT mode, Seata records before and after images of every row a branch transaction changes into this undo_log table, so the changes can be compensated if the global transaction rolls back.
Overview of the complete set of databases:
Reproducing a Distributed Transaction Problem
We create three services: an order service, a storage (stock) service, and an account service. When a user places an order, the order service creates an order, then makes a remote call to the storage service to deduct the stock of the purchased product, then makes another remote call to the account service to deduct the user's balance, and finally updates the order status to completed in the order service.
The operation spans three databases and involves two remote calls, so it clearly has a distributed transaction problem. The two remote calls are made through Feign clients, sketched below.
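For reference, the remote calls can be declared as OpenFeign client interfaces in seata-order-service. The following is only a minimal sketch: the service names come from the module list at the end of this article, while the request paths and parameter names are hypothetical and simply mirror how the interfaces are used in OrderServiceImpl further below.

import java.math.BigDecimal;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

//hypothetical Feign client for the storage service (each interface lives in its own file)
@FeignClient("seata-storage-service")
public interface StorageService {
    //deduct the stock of the given product
    @PostMapping("/storage/decrease")
    void decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}

//hypothetical Feign client for the account service
@FeignClient("seata-account-service")
public interface AccountService {
    //deduct the balance of the given user's account
    @PostMapping("/account/decrease")
    void decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}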
Client Configuration
- seata-order-service
- seata-storage-service
- seata-account-service
The configuration of the three Seata clients is largely the same; seata-order-service is used as the example here.
Edit application.yml and set the custom transaction group name:
spring:
  application:
    name: seata-order-service
  cloud:
    alibaba:
      seata:
        #the custom transaction group name must match the one configured in seata-server
        tx-service-group: my_test_tx_group
Add a file.conf configuration file and adjust it; the main change is the custom transaction group name:
transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  #thread factory for netty
  thread-factory {
    boss-thread-prefix = "NettyBoss"
    worker-thread-prefix = "NettyServerNIOWorker"
    server-executor-thread-prefix = "NettyServerBizHandler"
    share-boss-worker = false
    client-selector-thread-prefix = "NettyClientSelector"
    client-selector-thread-size = 1
    client-worker-thread-prefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    boss-thread-size = 1
    #auto default pin or 8
    worker-thread-size = 8
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #vgroup->rgroup
  vgroup_mapping.my_test_tx_group = "default"
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
  disableGlobalTransaction = false
}
client {
  async.commit.buffer.limit = 10000
  lock {
    retry.internal = 10
    retry.times = 30
  }
  report.retry.count = 5
  tm.commit.retry.count = 1
  tm.rollback.retry.count = 1
}
transaction {
  undo.data.validation = true
  undo.log.serialization = "jackson"
  undo.log.save.days = 7
  #schedule delete expired undo_log in milliseconds
  undo.log.delete.period = 86400000
  undo.log.table = "undo_log"
}
support {
  ## spring
  spring {
    # auto proxy the DataSource bean
    datasource.autoproxy = false
  }
}
Add a registry.conf configuration file and adjust it; the main change is switching the registry to Nacos:
registry {
  # file, nacos, eureka, redis, zk
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6381"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  file {
    name = "file.conf"
  }
}
config {
  # file, nacos, apollo, zk
  type = "file"
  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  apollo {
    app.id = "fescar-server"
    apollo.meta = "http://192.168.1.204:8801"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  file {
    name = "file.conf"
  }
}
Disable automatic data source creation in the startup class:
@EnableFeignClients
@EnableDiscoveryClient
@MapperScan(basePackages = {"com.whw.springcloud.seataorderservice.mapper"})
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class SeataOrderServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataOrderServiceApplication.class, args);
    }
}
Create a configuration class that proxies the data source with Seata. Since the default data source auto-configuration was excluded above, we define the Druid data source ourselves, wrap it in Seata's DataSourceProxy, and hand the proxy to the MyBatis SqlSessionFactory; the proxy is what lets Seata intercept SQL statements to write undo logs and register branch transactions:
@Configuration
public class DataSourceProxyConfig {

    @Value("${mybatis.mapper-locations}")
    private String mapperLocations;

    //the actual Druid data source, bound to the spring.datasource.* properties
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    //wrap the data source in Seata's DataSourceProxy
    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    //make MyBatis use the proxied data source
    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }
}
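The OrderMapper injected into the service below is an ordinary MyBatis mapper interface picked up by the @MapperScan annotation on the startup class, with its SQL defined in the XML files referenced by mybatis.mapper-locations. A minimal sketch of what it might look like follows; the method signatures are only inferred from how OrderServiceImpl uses them, and the exact semantics of update are an assumption.

import org.apache.ibatis.annotations.Param;

//hypothetical MyBatis mapper interface; the SQL lives in the corresponding XML mapper file
public interface OrderMapper {
    //insert a new order record
    void create(Order order);
    //update the given user's order from the given current status to "completed"
    void update(@Param("userId") Long userId, @Param("status") Integer status);
}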
Use the @GlobalTransactional annotation to open a distributed (global) transaction:
@Service
public class OrderServiceImpl implements OrderService {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderServiceImpl.class);

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private StorageService storageService;
    @Autowired
    private AccountService accountService;

    /**
     * Create order -> call storage service to deduct stock -> call account service to deduct balance -> update order status
     */
    @Override
    @GlobalTransactional(name = "my-test-create-order", rollbackFor = Exception.class)
    public void create(Order order) {
        LOGGER.info("------->order creation starts");
        //create the order locally
        orderMapper.create(order);

        //remote call: deduct stock in the storage service
        LOGGER.info("------->order-service: start deducting stock");
        storageService.decrease(order.getProductId(), order.getCount());
        LOGGER.info("------->order-service: finish deducting stock");

        //remote call: deduct balance in the account service
        LOGGER.info("------->order-service: start deducting balance");
        accountService.decrease(order.getUserId(), order.getMoney());
        LOGGER.info("------->order-service: finish deducting balance");

        //update the order status to completed
        LOGGER.info("------->order-service: start updating order status");
        orderMapper.update(order.getUserId(), 0);
        LOGGER.info("------->order-service: finish updating order status");

        LOGGER.info("------->order creation ends");
    }
}
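The /order/create endpoint used in the demo below can be exposed through a plain Spring MVC controller. The following is just a minimal, self-contained sketch: the real project probably returns a common result wrapper, which is replaced here with a simple Map, and Order is assumed to be a POJO whose fields match the request parameters.

import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    //matches the demo URL: /order/create?userId=1&productId=1&count=10&money=100
    @GetMapping("/create")
    public Map<String, Object> create(Order order) {
        orderService.create(order);
        Map<String, Object> result = new HashMap<>();
        result.put("data", null);
        result.put("message", "Order created successfully!");
        result.put("code", 200);
        return result;
    }
}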
Distributed Transaction Demo
Start the seata-order-service, seata-storage-service and seata-account-service services.
Once seata-server and the three services above are up, the Nacos service list looks like this:
The seata-server console output looks like this:
Initial state of the databases:
Seata Distributed Transaction: the Normal Commit Case
Call the order-creation interface and then check the databases:
http://localhost:8180/order/create?userId=1&productId=1&count=10&money=100
The response:
{"data":null,"message":"订单创建成功!","code":200}
The seata-order-service log looks like this:
22:45:46.106 INFO 17312 - [nio-8180-exec-5] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.225.1:8091:2127515862]
22:45:46.106 INFO 17312 - [nio-8180-exec-5] c.w.s.s.OrderServiceImpl : ------->order creation starts
22:45:46.142 INFO 17312 - [nio-8180-exec-5] c.w.s.s.OrderServiceImpl : ------->order-service: start deducting stock
22:45:46.183 INFO 17312 - [nio-8180-exec-5] c.w.s.s.OrderServiceImpl : ------->order-service: finish deducting stock
22:45:46.184 INFO 17312 - [nio-8180-exec-5] c.w.s.s.OrderServiceImpl : ------->order-service: start deducting balance
22:45:46.225 INFO 17312 - [nio-8180-exec-5] c.w.s.s.OrderServiceImpl : ------->order-service: finish deducting balance
22:45:46.225 INFO 17312 - [nio-8180-exec-5] c.w.s.s.OrderServiceImpl : ------->order-service: start updating order status
22:45:46.257 INFO 17312 - [nio-8180-exec-5] c.w.s.s.OrderServiceImpl : ------->order-service: finish updating order status
22:45:46.257 INFO 17312 - [nio-8180-exec-5] c.w.s.s.OrderServiceImpl : ------->order creation ends
22:45:46.280 INFO 17312 - [nio-8180-exec-5] i.seata.tm.api.DefaultGlobalTransaction : [192.168.225.1:8091:2127515862] commit status: Committed
22:45:46.987 INFO 17312 - [atch_RMROLE_1_8] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=192.168.225.1:8091:2127515862,branchId=2127515864,branchType=AT,resourceId=jdbc:mysql://localhost:3306/seata-order,applicationData=null
22:45:46.987 INFO 17312 - [atch_RMROLE_1_8] io.seata.rm.AbstractRMHandler : Branch committing: 192.168.225.1:8091:2127515862 2127515864 jdbc:mysql://localhost:3306/seata-order null
22:45:46.987 INFO 17312 - [atch_RMROLE_1_8] io.seata.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
22:45:47.014 INFO 17312 - [atch_RMROLE_1_8] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=192.168.225.1:8091:2127515862,branchId=2127515873,branchType=AT,resourceId=jdbc:mysql://localhost:3306/seata-order,applicationData=null
22:45:47.014 INFO 17312 - [atch_RMROLE_1_8] io.seata.rm.AbstractRMHandler : Branch committing: 192.168.225.1:8091:2127515862 2127515873 jdbc:mysql://localhost:3306/seata-order null
22:45:47.014 INFO 17312 - [atch_RMROLE_1_8] io.seata.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
The database data after the call:
Seata Distributed Transaction: the Timeout Rollback Case
We now introduce a timeout exception in seata-account-service and call the order-creation interface again:
@Service
public class AccountServiceImpl implements AccountService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AccountServiceImpl.class);

    @Autowired
    private AccountMapper accountMapper;

    /**
     * Deduct the account balance
     */
    @Override
    public void decrease(Long userId, BigDecimal money) {
        LOGGER.info("------->account-service: start deducting account balance");
        //simulate a timeout exception so that the global transaction rolls back
        try {
            Thread.sleep(30 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        accountMapper.decrease(userId, money);
        LOGGER.info("------->account-service: finish deducting account balance");
    }
}
This time the database data does not change at all after placing the order: the 30-second sleep exceeds the timeout of the Feign/Hystrix call, so the order service receives a timeout exception and the global transaction is rolled back.
The seata-order-service log looks like this:
23:19:18.545 INFO 17312 - [nio-8180-exec-7] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.225.1:8091:2127515923]
23:19:18.545 INFO 17312 - [nio-8180-exec-7] c.w.s.s.OrderServiceImpl : ------->order creation starts
23:19:18.588 INFO 17312 - [nio-8180-exec-7] c.w.s.s.OrderServiceImpl : ------->order-service: start deducting stock
23:19:18.637 INFO 17312 - [nio-8180-exec-7] c.w.s.s.OrderServiceImpl : ------->order-service: finish deducting stock
23:19:18.637 INFO 17312 - [nio-8180-exec-7] c.w.s.s.OrderServiceImpl : ------->order-service: start deducting balance
23:19:19.743 INFO 17312 - [atch_RMROLE_1_8] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=192.168.225.1:8091:2127515923,branchId=2127515925,branchType=AT,resourceId=jdbc:mysql://localhost:3306/seata-order,applicationData=null
23:19:19.743 INFO 17312 - [atch_RMROLE_1_8] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 192.168.225.1:8091:2127515923 2127515925 jdbc:mysql://localhost:3306/seata-order
23:19:19.754 INFO 17312 - [atch_RMROLE_1_8] i.s.r.d.undo.AbstractUndoLogManager : xid 192.168.225.1:8091:2127515923 branch 2127515925, undo_log deleted with GlobalFinished
23:19:19.755 INFO 17312 - [atch_RMROLE_1_8] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
23:19:19.784 INFO 17312 - [nio-8180-exec-7] i.seata.tm.api.DefaultGlobalTransaction : [192.168.225.1:8091:2127515923] rollback status: Rollbacked
23:19:19.786 ERROR 17312 - [nio-8180-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.netflix.hystrix.exception.HystrixRuntimeException: AccountService#decrease(Long,BigDecimal) timed-out and no fallback available.] with root cause
java.util.concurrent.TimeoutException: null
at com.netflix.hystrix.AbstractCommand.handleTimeoutViaFallback(AbstractCommand.java:997) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.netflix.hystrix.AbstractCommand.access$500(AbstractCommand.java:60) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:609) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:601) ~[hystrix-core-1.5.18.jar:1.5.18]
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.3.8.jar:1.3.8]
at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$1.run(AbstractCommand.java:1142) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:41) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:37) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.alibaba.cloud.seata.feign.hystrix.SeataHystrixConcurrencyStrategy$SeataContextCallable.call(SeataHystrixConcurrencyStrategy.java:72) ~[spring-cloud-alibaba-seata-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable.run(HystrixContextRunnable.java:57) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$2.tick(AbstractCommand.java:1159) ~[hystrix-core-1.5.18.jar:1.5.18]
at com.netflix.hystrix.util.HystrixTimer$1.run(HystrixTimer.java:99) ~[hystrix-core-1.5.18.jar:1.5.18]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_181]
at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) ~[na:1.8.0_181]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) ~[na:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
As the log shows, because the balance deduction timed out, the branch transactions that had already created the order and deducted the stock were rolled back, and with them the whole global transaction. Seata thus keeps the global data consistent.
Without Seata: the Timeout Case
We can comment out @GlobalTransactional in seata-order-service and restart the service to see what happens without Seata's distributed transaction management:
Because seata-account-service times out, the order status is never set to completed even though the stock and the account balance have already been deducted; moreover, because the remote call is retried, the account balance may be deducted several times.
Without Seata's distributed transactions the data ends up globally inconsistent; the comparison above shows that Seata solves the global data-consistency problem of a microservice architecture rather neatly.
Modules used in this article:
springcloud-learning
├── seata-order-service -- order service integrated with Seata
├── seata-storage-service -- stock service integrated with Seata
└── seata-account-service -- account service integrated with Seata
Original article: https://blog.csdn.net/ThinkWon/article/details/103786102
(End)