1. JTA (Java Transaction API) 与 Atomikos
JTA是一种标准的XA事务管理API,Atomikos是一个流行的JTA实现。
配置
@Configuration
@EnableTransactionManagement
public class JtaConfig {
@Bean(initMethod = "init", destroyMethod = "close")
public AtomikosDataSourceBean dataSourceA() {
AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
// 配置数据源A
return dataSource;
}
@Bean(initMethod = "init", destroyMethod = "close")
public AtomikosDataSourceBean dataSourceB() {
AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
// 配置数据源B
return dataSource;
}
@Bean
public JtaTransactionManager transactionManager() {
UserTransactionManager userTransactionManager = new UserTransactionManager();
UserTransaction userTransaction = new UserTransactionImp();
return new JtaTransactionManager(userTransaction, userTransactionManager);
}
}
使用
@Service
public class UserService {
@Transactional
public void createUserWithOrder(User user, Order order) {
userRepository.save(user);
orderRepository.save(order);
// 如果发生异常,两个操作都会回滚
}
}
2. Seata 框架
Seata是一个开源的分布式事务解决方案,支持AT、TCC、SAGA和XA模式。
配置
@Configuration
public class SeataConfig {
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("your-application-name", "your-tx-service-group");
}
}
使用 (AT模式)
@Service
public class OrderService {
@GlobalTransactional
public void createOrder(Order order) {
orderRepository.save(order);
accountService.deduct(order.getUserId(), order.getAmount());
inventoryService.deduct(order.getProductId(), order.getQuantity());
}
}
3. TCC (Try-Confirm-Cancel) 模式
TCC是一种补偿型事务模式,需要为每个操作实现try、confirm和cancel方法。
实现
@Service
public class OrderService {
@Transactional
@TccTransaction(confirmMethod = "confirmCreate", cancelMethod = "cancelCreate")
public void tryCreate(Order order) {
// 预留资源,创建订单
order.setStatus("PENDING");
orderRepository.save(order);
}
public void confirmCreate(Order order) {
// 确认订单创建
order.setStatus("CONFIRMED");
orderRepository.save(order);
}
public void cancelCreate(Order order) {
// 取消订单创建
orderRepository.delete(order);
}
}
使用
@Service
public class BusinessService {
@Autowired
private OrderService orderService;
@Autowired
private PaymentService paymentService;
@Transactional
public void createOrderWithPayment(Order order, Payment payment) {
orderService.tryCreate(order);
paymentService.tryCreate(payment);
// 如果任何一个操作失败,会触发cancel方法
}
}
4. 本地消息表 + 消息队列
这种方式结合了本地事务和消息队列,实现最终一致性。
实现
@Service
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void createOrder(Order order) {
// 创建订单
jdbcTemplate.update("INSERT INTO orders (id, user_id, amount) VALUES (?, ?, ?)",
order.getId(), order.getUserId(), order.getAmount());
// 插入本地消息表
jdbcTemplate.update("INSERT INTO message_table (message_id, message_body, status) VALUES (?, ?, ?)",
UUID.randomUUID().toString(), JSON.toJSONString(order), "NEW");
}
@Scheduled(fixedDelay = 10000)
public void sendMessage() {
List<Message> messages = jdbcTemplate.query(
"SELECT * FROM message_table WHERE status = 'NEW'",
(rs, rowNum) -> new Message(rs.getString("message_id"), rs.getString("message_body"))
);
for (Message message : messages) {
try {
rabbitTemplate.convertAndSend("orderExchange", "orderCreated", message.getBody());
jdbcTemplate.update("UPDATE message_table SET status = 'SENT' WHERE message_id = ?",
message.getId());
} catch (Exception e) {
// 处理发送失败的情况
}
}
}
}
消费者端
@Service
public class InventoryService {
@RabbitListener(queues = "orderCreatedQueue")
public void handleOrderCreated(String orderJson) {
Order order = JSON.parseObject(orderJson, Order.class);
// 更新库存
// 实现幂等性处理,防止重复消费
}
}
总结
- JTA/XA: 适用于强一致性要求的场景,但性能开销较大。
- Seata: 提供了多种模式,适应性强,但需要额外部署Seata服务器。
- TCC: 灵活性高,但实现复杂,需要仔细处理各种异常情况。
- 本地消息表 + MQ: 实现简单,适合最终一致性场景,但一致性保证相对较弱。
选择合适的方案需要根据具体的业务需求、性能要求和系统架构来决定。在实际应用中,可能需要组合使用多种方案来满足复杂的业务需求。