Atomikos在微服务场景下的使用

Atomikos是一个轻量级的分布式事务管理器,实现了Java Transaction API (JTA)规范,可以很方便的和Spring Boot集成,支持微服务场景下跨节点的全局事务。

本文为一个微服务的示例应用,通过引入Atomikos增加全局事务能力。

示例代码可以在这里查看。

demo-services

用户访问Business服务,它通过RPC调用分别调用OrderStorage创建订单和减库存。三个服务需要加入到一个全局事务中,要么全部成功,任何一个服务失败,都会造成事务回滚,数据的状态始终保持一致性。

蚂蚁金服开源的Seata就是为了解决这类问题,在微服务架构下提供分布式事务服务。传统的应用服务器通过JTA/JTS也能解决分布式场景下的事务问题,但需要和EJB绑定在一起才能使用。Atomikos是一个独立的分布式事务管理器,原先是为SpringTomcat提供事务服务,让用户不必只为了事务服务而引入应用服务器。

现在Atomikos也能为微服务提供分布式事务服务,这时主要需要两个问题:

  1. 事务上下文如何通过RPC在服务间传播
  2. 微服务如何参与进两阶段提交协议的过程

后面会结合示例应用介绍Atomikos是如何解决这两个问题。示例应用atomkos-sample的结构如下:

  1. api:定义了服务接口OrderServiceStorageService
  2. order-serviceOrderService的具体实现
  3. storage-serviceStorageService的具体实现
  4. business-service:用户访问入口

# 事务上下文的传播

在项目主工程的pom文件中引入Atomikos依赖,注意要包括transactions-remoting,正是它才能让事务上下文在RPC调用时传递。

1
2
3
4
5
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-remoting</artifactId>
    <version>5.0.6</version>
</dependency>

transactions-remoting支持jaxrsSpring RemotingSpring rest等几种RPC方式,我们使用的是Spring Remoting

order-service为例,通过TransactionalHttpInvokerServiceExporterOrderService发布为远程服务

1
2
3
4
5
6
7
@Bean(name = "/services/order")
TransactionalHttpInvokerServiceExporter orderService(OrderServiceImpl orderService) {
    TransactionalHttpInvokerServiceExporter exporter = new TransactionalHttpInvokerServiceExporter();
    exporter.setService(orderService);
    exporter.setServiceInterface(OrderService.class);
    return exporter;
}

OrderService的调用者business-service使用HttpInvokerProxyFactoryBean引入远程服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Bean
public HttpInvokerProxyFactoryBean orderService() {
    HttpInvokerProxyFactoryBean orderService = new HttpInvokerProxyFactoryBean();
    orderService.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor());
    orderService.setServiceUrl("http://localhost:8082/services/order");
    orderService.setServiceInterface(OrderService.class);
    return orderService;
}

@Bean
public TransactionalHttpInvokerRequestExecutor httpInvokerRequestExecutor() {
    TransactionalHttpInvokerRequestExecutor httpInvokerRequestExecutor = new TransactionalHttpInvokerRequestExecutor();
    return httpInvokerRequestExecutor;
}

business-service负责发起全局事务,它使用Spring标准的@Transactional标记方法开启事务

1
2
3
4
5
@Transactional
public void createOrder(String userId, String commodityCode, Integer count) {
    orderService.create(userId, commodityCode, count);
    storageService.deduct(commodityCode, count);
}

Atomikos提供了TransactionalHttpInvokerRequestExecutorTransactionalHttpInvokerServiceExporter拦截请求和响应,利用HTTP header传递事务上下文。

spring-remoting

business-service在调用远程服务OrderService时,请求发送前会经过TransactionalHttpInvokerRequestExecutor.prepareConnection处理,增加HTTP header,携带事务上下文:

1
2
3
4
5
6
7
@Override
protected void prepareConnection(HttpURLConnection con, int contentLength)
		throws IOException {
	String propagation = template.onOutgoingRequest();
	con.setRequestProperty(HeaderNames.PROPAGATION_HEADER_NAME, propagation);
	super.prepareConnection(con, contentLength);
}

OrderService会使用TransactionalHttpInvokerServiceExporter.decorateInputStream进行请求拦截,能从HTTP header中解析出事务上下文:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Override
protected InputStream decorateInputStream(HttpServletRequest request, InputStream is) throws IOException {
	
	try {
		String propagation = request.getHeader(HeaderNames.PROPAGATION_HEADER_NAME);
		template.onIncomingRequest(propagation);
	} catch (IllegalArgumentException e) {
		...
	}
	return super.decorateInputStream(request, is);
}

OrderService处理完成返回响应时,会将该节点加入全局事务包装成Event,放入HTTP header返回给business-service

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Override
	protected OutputStream decorateOutputStream(HttpServletRequest request, HttpServletResponse response,
			OutputStream os) throws IOException {

		...

		response.addHeader(HeaderNames.EXTENT_HEADER_NAME, extent);
        
        ...
		
		return super.decorateOutputStream(request, response, os);
	}

business-service接收到响应,利用TransactionalHttpInvokerRequestExecutor.validateResponse解析出Event,注册进事务管理器,这样在全局事务提交时,可以让该分支参与到两阶段提交协议:

1
2
3
4
5
6
7
@Override
protected void validateResponse(HttpInvokerClientConfiguration config,
		HttpURLConnection con) throws IOException {
	super.validateResponse(config, con);
	String extent = con.getHeaderField(HeaderNames.EXTENT_HEADER_NAME);
	template.onIncomingResponse(extent);
}

# 两阶段提交过程

在处理RPC调用的响应时,Atomikos会将参与到全局事务的远程节点注册为Participants(Extent.addRemoteParticipants),在事务提交时,所有的Participants都会参与到两阶段提交

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
synchronized ( fsm_ ) {
if ( commit ) {
	if ( participants_.size () <= 1 ) {
		commit ( true );
	} else {
		int prepareResult = prepare ();
		// make sure to only do commit if NOT read only
		if ( prepareResult != Participant.READ_ONLY )
			commit ( false );
	}
} else {
	rollback ();
}

可以看出,如果Participants大于1,会走preparecommit两阶段提交的完整过程。那么OrderServiceStorageService如何参与进两阶段提交呢?

Atomikos提供了REST入口com.atomikos.remoting.twopc.AtomikosRestPort,你可以将AtomikosRestPort注册到JAX-RS,例如本示例选择的是Apache CFX,在application.properties进行配置:

1
2
3
cxf.path=/api
cxf.jaxrs.classes-scan=true
cxf.jaxrs.classes-scan-packages=com.atomikos.remoting.twopc

business-service在进行全局事务提交时,会访问所有Participants相应的REST接口进行两阶段提交:

atomikosrestport

business-service是怎么知道AtomikosRestPort的访问地址的呢?上面提到了,business-service在访问OrderService时,返回的响应header中包含了Event,地址就随着Event返回给了调用者。AtomikosRestPort的访问地址配置在jta.properties中:

1
com.atomikos.icatch.rest_port_url=http://localhost:8082/api/atomikos

至此,我们解释清楚了Atomikos如何为微服务提供分布式事务服务的,主要解决了两个问题:事务上下文如何通过RPC在服务间传播,以及微服务如何参与进两阶段提交协议的过程。

下一步我准备为Atomikos增加dubbo的支持,即事务上下文可以通过dubbo进行传播。

comments powered by Disqus