Atomikos是一个轻量级的分布式事务管理器,实现了Java Transaction API (JTA)规范,可以很方便的和Spring Boot集成,支持微服务场景下跨节点的全局事务。
本文为一个微服务的示例应用,通过引入Atomikos
增加全局事务能力。
示例代码可以在这里查看。
用户访问Business
服务,它通过RPC
调用分别调用Order
和Storage
创建订单和减库存。三个服务需要加入到一个全局事务中,要么全部成功,任何一个服务失败,都会造成事务回滚,数据的状态始终保持一致性。
蚂蚁金服开源的Seata就是为了解决这类问题,在微服务架构下提供分布式事务服务。传统的应用服务器通过JTA/JTS
也能解决分布式场景下的事务问题,但需要和EJB
绑定在一起才能使用。Atomikos
是一个独立的分布式事务管理器,原先是为Spring
和Tomcat
提供事务服务,让用户不必只为了事务服务而引入应用服务器。
现在Atomikos
也能为微服务提供分布式事务服务,这时主要需要两个问题:
- 事务上下文如何通过RPC在服务间传播
- 微服务如何参与进两阶段提交协议的过程
后面会结合示例应用介绍Atomikos
是如何解决这两个问题。示例应用atomkos-sample
的结构如下:
- api:定义了服务接口
OrderService
和StorageService
- order-service:
OrderService
的具体实现
- storage-service:
StorageService
的具体实现
- business-service:用户访问入口
事务上下文的传播
在项目主工程的pom文件中引入Atomikos
依赖,注意要包括transactions-remoting
,正是它才能让事务上下文在RPC
调用时传递。
1
2
3
4
5
|
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-remoting</artifactId>
<version>5.0.6</version>
</dependency>
|
transactions-remoting
支持jaxrs
,Spring Remoting
和Spring rest
等几种RPC
方式,我们使用的是Spring Remoting
。
以order-service为例,通过TransactionalHttpInvokerServiceExporter
将OrderService
发布为远程服务:
1
2
3
4
5
6
7
|
@Bean(name = "/services/order")
TransactionalHttpInvokerServiceExporter orderService(OrderServiceImpl orderService) {
TransactionalHttpInvokerServiceExporter exporter = new TransactionalHttpInvokerServiceExporter();
exporter.setService(orderService);
exporter.setServiceInterface(OrderService.class);
return exporter;
}
|
OrderService
的调用者business-service使用HttpInvokerProxyFactoryBean
引入远程服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Bean
public HttpInvokerProxyFactoryBean orderService() {
HttpInvokerProxyFactoryBean orderService = new HttpInvokerProxyFactoryBean();
orderService.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor());
orderService.setServiceUrl("http://localhost:8082/services/order");
orderService.setServiceInterface(OrderService.class);
return orderService;
}
@Bean
public TransactionalHttpInvokerRequestExecutor httpInvokerRequestExecutor() {
TransactionalHttpInvokerRequestExecutor httpInvokerRequestExecutor = new TransactionalHttpInvokerRequestExecutor();
return httpInvokerRequestExecutor;
}
|
business-service负责发起全局事务,它使用Spring
标准的@Transactional
标记方法开启事务:
1
2
3
4
5
|
@Transactional
public void createOrder(String userId, String commodityCode, Integer count) {
orderService.create(userId, commodityCode, count);
storageService.deduct(commodityCode, count);
}
|
Atomikos
提供了TransactionalHttpInvokerRequestExecutor
和TransactionalHttpInvokerServiceExporter
拦截请求和响应,利用HTTP header
传递事务上下文。
business-service在调用远程服务OrderService
时,请求发送前会经过TransactionalHttpInvokerRequestExecutor.prepareConnection处理,增加HTTP header
,携带事务上下文:
1
2
3
4
5
6
7
|
@Override
protected void prepareConnection(HttpURLConnection con, int contentLength)
throws IOException {
String propagation = template.onOutgoingRequest();
con.setRequestProperty(HeaderNames.PROPAGATION_HEADER_NAME, propagation);
super.prepareConnection(con, contentLength);
}
|
OrderService
会使用TransactionalHttpInvokerServiceExporter.decorateInputStream进行请求拦截,能从HTTP header
中解析出事务上下文:
1
2
3
4
5
6
7
8
9
10
11
|
@Override
protected InputStream decorateInputStream(HttpServletRequest request, InputStream is) throws IOException {
try {
String propagation = request.getHeader(HeaderNames.PROPAGATION_HEADER_NAME);
template.onIncomingRequest(propagation);
} catch (IllegalArgumentException e) {
...
}
return super.decorateInputStream(request, is);
}
|
OrderService
处理完成返回响应时,会将该节点加入全局事务包装成Event
,放入HTTP header
返回给business-service:
1
2
3
4
5
6
7
8
9
10
11
12
|
@Override
protected OutputStream decorateOutputStream(HttpServletRequest request, HttpServletResponse response,
OutputStream os) throws IOException {
...
response.addHeader(HeaderNames.EXTENT_HEADER_NAME, extent);
...
return super.decorateOutputStream(request, response, os);
}
|
business-service接收到响应,利用TransactionalHttpInvokerRequestExecutor.validateResponse解析出Event
,注册进事务管理器,这样在全局事务提交时,可以让该分支参与到两阶段提交协议:
1
2
3
4
5
6
7
|
@Override
protected void validateResponse(HttpInvokerClientConfiguration config,
HttpURLConnection con) throws IOException {
super.validateResponse(config, con);
String extent = con.getHeaderField(HeaderNames.EXTENT_HEADER_NAME);
template.onIncomingResponse(extent);
}
|
两阶段提交过程
在处理RPC
调用的响应时,Atomikos
会将参与到全局事务的远程节点注册为Participants
(Extent.addRemoteParticipants),在事务提交时,所有的Participants
都会参与到两阶段提交:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
synchronized ( fsm_ ) {
if ( commit ) {
if ( participants_.size () <= 1 ) {
commit ( true );
} else {
int prepareResult = prepare ();
// make sure to only do commit if NOT read only
if ( prepareResult != Participant.READ_ONLY )
commit ( false );
}
} else {
rollback ();
}
|
可以看出,如果Participants
大于1,会走prepare
和commit
两阶段提交的完整过程。那么OrderService
和StorageService
如何参与进两阶段提交呢?
Atomikos
提供了REST
入口com.atomikos.remoting.twopc.AtomikosRestPort,你可以将AtomikosRestPort
注册到JAX-RS
,例如本示例选择的是Apache CFX,在application.properties
进行配置:
1
2
3
|
cxf.path=/api
cxf.jaxrs.classes-scan=true
cxf.jaxrs.classes-scan-packages=com.atomikos.remoting.twopc
|
business-service在进行全局事务提交时,会访问所有Participants
相应的REST
接口进行两阶段提交:
business-service是怎么知道AtomikosRestPort
的访问地址的呢?上面提到了,business-service在访问OrderService
时,返回的响应header
中包含了Event
,地址就随着Event
返回给了调用者。AtomikosRestPort
的访问地址配置在jta.properties
中:
1
|
com.atomikos.icatch.rest_port_url=http://localhost:8082/api/atomikos
|
至此,我们解释清楚了Atomikos
如何为微服务提供分布式事务服务的,主要解决了两个问题:事务上下文如何通过RPC在服务间传播,以及微服务如何参与进两阶段提交协议的过程。
下一步我准备为Atomikos
增加dubbo的支持,即事务上下文可以通过dubbo
进行传播。