本地消息表:Spring Boot 實(shí)現(xiàn)分布式事務(wù)的優(yōu)雅方案
前言
在微服務(wù)架構(gòu)中,分布式事務(wù)一直是一個(gè)棘手的問題,常見的分布式事務(wù)解決方案包括:
2PC(兩階段提交)
2PC即兩階段提交協(xié)議,是將整個(gè)事務(wù)流程分為兩個(gè)階段,準(zhǔn)備階段(Prepare phase)、提交階段(commit phase):
- 準(zhǔn)備階段(Prepare phase):事務(wù)管理器給每個(gè)參與者發(fā)送Prepare消息,每個(gè)數(shù)據(jù)庫參與者在本地執(zhí)行事務(wù),并寫本地的Undo/Redo日志,此時(shí)事務(wù)沒有提交。 (Undo日志是記錄修改前的數(shù)據(jù),用于數(shù)據(jù)庫回滾,Redo日志是記錄修改后的數(shù)據(jù),用于提交事務(wù)后寫入數(shù)據(jù)文件)
- 提交階段(commit phase):如果事務(wù)管理器收到了參與者的執(zhí)行失敗或者超時(shí)消息時(shí),直接給每個(gè)參與者發(fā)送回滾(Rollback)消息;否則,發(fā)送提交(Commit)消息;參與者根據(jù)事務(wù)管理器的指令執(zhí)行提交或者回滾操作,并釋放事務(wù)處理過程中使用的鎖資源。注意:必須在最后階段釋放鎖資源。
當(dāng)所有參與者均反饋yes,提交事務(wù)
圖片
當(dāng)任何階段1一個(gè)參與者反饋no,中斷事務(wù)
圖片
TCC(Try-Confirm-Cancel)
TCC要求每個(gè)分支事務(wù)實(shí)現(xiàn)三個(gè)操作:預(yù)處理Try、確認(rèn)Confirm、撤銷Cancel。Try操作做業(yè)務(wù)檢查及資源預(yù)留,Confirm做業(yè)務(wù)確認(rèn)操作,Cancel實(shí)現(xiàn)一個(gè)與Try相反的操作即回滾操作。TM首先發(fā)起所有的分支事務(wù)的try操作,任何一個(gè)分支事務(wù)的try操作執(zhí)行失敗,TM將會發(fā)起所有分支事務(wù)的Cancel操作,若try操作全部成功,TM將會發(fā)起所有分支事務(wù)的Confirm操作,其中Confirm/Cancel操作若執(zhí)行失敗,TM會進(jìn)行重試。
- Try階段是做業(yè)務(wù)檢查(一致性)及資源預(yù)留(隔離),此階段僅是一個(gè)初步操作,它和后續(xù)的Confirm一起才能真正構(gòu)成一個(gè)完整的業(yè)務(wù)邏輯。
- Confirm階段是做確認(rèn)提交,Try階段所有分支事務(wù)執(zhí)行成功后開始執(zhí)行Confirm。通常情況下,采用TCC則認(rèn)為Confirm階段是不會出錯(cuò)的。即:只要Try成功,Confirm一定成功。若Confirm階段真的出錯(cuò)了,需引入重試機(jī)制或人工處理。
- Cancel階段是在業(yè)務(wù)執(zhí)行錯(cuò)誤需要回滾的狀態(tài)下執(zhí)行分支事務(wù)的業(yè)務(wù)取消,預(yù)留資源釋放。通常情況下,采用TCC則認(rèn)為Cancel階段也是一定成功的。若Cancel階段真的出錯(cuò)了,需引入重試機(jī)制或人工處理。
- TM事務(wù)管理器可以實(shí)現(xiàn)為獨(dú)立的服務(wù),也可以讓全局事務(wù)發(fā)起方充當(dāng)TM的角色,TM獨(dú)立出來是為了成為公用組件,是為了考慮系統(tǒng)結(jié)構(gòu)和軟件復(fù)用。
當(dāng)Try階段服務(wù)全部正常執(zhí)行, 執(zhí)行確認(rèn)業(yè)務(wù)邏輯操作
圖片
當(dāng)Try階段存在服務(wù)執(zhí)行失敗, 進(jìn)入Cancel階段
圖片
可靠消息最終一致性
可靠消息最終一致性方案是指當(dāng)事務(wù)發(fā)起方執(zhí)行完成本地事務(wù)后并發(fā)出一條消息,事務(wù)參與方(消息消費(fèi)者)一定能夠接收消息并處理事務(wù)成功,此方案強(qiáng)調(diào)的是只要消息發(fā)給事務(wù)參與方最終事務(wù)要達(dá)到一致。
正常情況——事務(wù)主動(dòng)方發(fā)消息
圖片
異常情況——事務(wù)主動(dòng)方消息恢復(fù)
圖片
本地消息表
本地消息表這個(gè)方案最初是eBay提出的,此方案的核心是通過本地事務(wù)保證數(shù)據(jù)業(yè)務(wù)操作和消息的一致性,然后通過定時(shí)任務(wù)將消息發(fā)送至消息中間件,待確認(rèn)消息發(fā)送給消費(fèi)方成功再將消息刪除。
本地消息表的核心思想是:將分布式事務(wù)拆分為多個(gè)本地事務(wù),并通過消息表記錄事務(wù)狀態(tài)。具體流程如下:
- 業(yè)務(wù)操作與消息記錄:在同一個(gè)本地事務(wù)中,執(zhí)行業(yè)務(wù)操作并記錄消息到本地?cái)?shù)據(jù)庫的消息表中
- 消息發(fā)送:事務(wù)提交后,通過定時(shí)任務(wù)或事件監(jiān)聽機(jī)制,將消息發(fā)送到消息隊(duì)列
- 消息消費(fèi):下游服務(wù)從消息隊(duì)列消費(fèi)消息并執(zhí)行業(yè)務(wù)操作
- 消息確認(rèn):下游服務(wù)處理完成后,通過某種機(jī)制確認(rèn)消息已處理
圖片
這種方案的最大優(yōu)勢是將分布式事務(wù)轉(zhuǎn)換為本地事務(wù),利用數(shù)據(jù)庫的ACID特性保證業(yè)務(wù)操作和消息記錄的原子性。
實(shí)現(xiàn)步驟
本地消息表因其實(shí)現(xiàn)簡單、可靠性高、性能良好,成為中小型項(xiàng)目的首選方案,本文將詳細(xì)介紹本地消息表的原理,并結(jié)合Spring Boot提供完整的實(shí)現(xiàn)方案。
數(shù)據(jù)庫設(shè)計(jì)
-- 消息表結(jié)構(gòu)
CREATE TABLE message (
id VARCHAR(32) PRIMARY KEY,
content TEXT NOT NULL,
topic VARCHAR(100) NOT NULL,
status VARCHAR(20) NOT NULL COMMENT 'INIT, SENDING, SENT, FAILED',
retry_count INT DEFAULT 0,
next_retry_time DATETIME,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
INDEX idx_status (status),
INDEX idx_next_retry_time (next_retry_time)
);
-- 業(yè)務(wù)表示例(如訂單表)
CREATE TABLE orders (
id VARCHAR(32) PRIMARY KEY,
user_id VARCHAR(32) NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
status VARCHAR(20) NOT NULL,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL
);
消息結(jié)構(gòu)
public enum MessageStatus {
INIT("初始化"),
SENDING("發(fā)送中"),
SENT("已發(fā)送"),
FAILED("發(fā)送失敗");
private final String description;
MessageStatus(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
public interface MessageRepository extends JpaRepository<Message, String> {
List<Message> findByStatusAndNextRetryTimeLessThanEqual(String status, LocalDateTime now);
@Transactional
@Modifying
@Query("UPDATE Message m SET m.status = ?2, m.updateTime = ?3 WHERE m.id = ?1")
int updateStatus(String id, String status, LocalDateTime updateTime);
@Transactional
@Modifying
@Query("UPDATE Message m SET m.status = ?2, m.retryCount = m.retryCount + 1, " +
"m.nextRetryTime = ?3, m.updateTime = ?3 WHERE m.id = ?1")
int updateForRetry(String id, String status, LocalDateTime nextRetryTime);
@Transactional
@Modifying
@Query("DELETE FROM Message m WHERE m.createTime < ?1")
int deleteOlderThan(LocalDateTime threshold);
}
- MessageStatus枚舉:定義消息的四種狀態(tài)
- MessageRepository接口:繼承JPA的JpaRepository,提供基本CRUD操作,并定義自定義查詢方法
消息服務(wù)實(shí)現(xiàn)
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
private MessageRepository messageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${message.retry.interval:60000}")
private long retryInterval;
@Value("${message.max.retry:10}")
private int maxRetry;
@Value("${message.retention.days:30}")
private int retentionDays;
@Override
@Transactional
public Message saveMessage(String content, String topic) {
Message message = new Message();
message.setId(UUID.randomUUID().toString().replace("-", ""));
message.setContent(content);
message.setTopic(topic);
message.setStatus(MessageStatus.INIT.name());
message.setCreateTime(LocalDateTime.now());
message.setUpdateTime(LocalDateTime.now());
return messageRepository.save(message);
}
@Override
public void sendMessage(Message message) {
try {
// 更新消息狀態(tài)為發(fā)送中
messageRepository.updateStatus(message.getId(), MessageStatus.SENDING.name(), LocalDateTime.now());
// 發(fā)送消息到RabbitMQ
rabbitTemplate.convertAndSend(message.getTopic(), message.getContent());
// 更新消息狀態(tài)為已發(fā)送
messageRepository.updateStatus(message.getId(), MessageStatus.SENT.name(), LocalDateTime.now());
} catch (Exception e) {
// 發(fā)送失敗,更新重試信息
handleSendFailure(message, e);
}
}
@Override
public void processPendingMessages() {
List<Message> pendingMessages = messageRepository.findByStatusAndNextRetryTimeLessThanEqual(
MessageStatus.INIT, LocalDateTime.now());
for (Message message : pendingMessages) {
sendMessage(message);
}
}
@Override
public void retryFailedMessages() {
List<Message> failedMessages = messageRepository.findByStatusAndNextRetryTimeLessThanEqual(
MessageStatus.FAILED, LocalDateTime.now());
for (Message message : failedMessages) {
if (message.getRetryCount() < maxRetry) {
try {
// 更新消息狀態(tài)為發(fā)送中
messageRepository.updateStatus(message.getId(), MessageStatus.SENDING.name(), LocalDateTime.now());
// 重試發(fā)送消息
rabbitTemplate.convertAndSend(message.getTopic(), message.getContent());
// 更新消息狀態(tài)為已發(fā)送
messageRepository.updateStatus(message.getId(), MessageStatus.SENT.name(), LocalDateTime.now());
} catch (Exception e) {
// 重試失敗,更新重試信息
handleSendFailure(message, e);
}
} else {
// 超過最大重試次數(shù),記錄日志并標(biāo)記為永久失敗
// 可以添加告警機(jī)制
System.err.println("Message exceeded max retry count: " + message.getId());
}
}
}
@Override
public void cleanOldMessages() {
LocalDateTime threshold = LocalDateTime.now().minusDays(retentionDays);
int deletedCount = messageRepository.deleteOlderThan(threshold);
System.out.println("Deleted " + deletedCount + " old messages");
}
@Override
public void updateMessageStatus(String messageId, MessageStatus status) {
messageRepository.updateStatus(messageId, status.name(), LocalDateTime.now());
}
private void handleSendFailure(Message message, Exception e) {
// 計(jì)算下一次重試時(shí)間
LocalDateTime nextRetryTime = LocalDateTime.now().plusSeconds(retryInterval);
// 更新消息狀態(tài)為失敗并增加重試次數(shù)
messageRepository.updateForRetry(
message.getId(),
MessageStatus.FAILED.name(),
nextRetryTime
);
// 記錄錯(cuò)誤日志
System.err.println("Failed to send message: " + message.getId() + ", error: " + e.getMessage());
}
}
- saveMessage:創(chuàng)建并保存新消息,設(shè)置初始狀態(tài)為INIT
- sendMessage:發(fā)送消息到消息隊(duì)列,處理發(fā)送成功和失敗的情況
- processPendingMessages:處理待發(fā)送的消息
- retryFailedMessages:重試發(fā)送失敗的消息,實(shí)現(xiàn)最大重試次數(shù)控制
- cleanOldMessages:清理過期消息,防止消息表過大
- handleSendFailure:處理發(fā)送失敗的情況,更新重試信息
定時(shí)任務(wù)配置
@Component
public class MessageScheduler {
@Autowired
private MessageService messageService;
/**
* 定時(shí)處理待發(fā)送的消息
*/
@Scheduled(fixedRate = 10000) // 每10秒執(zhí)行一次
public void processPendingMessages() {
messageService.processPendingMessages();
}
/**
* 定時(shí)重試失敗的消息
*/
@Scheduled(fixedRate = 30000) // 每30秒執(zhí)行一次
public void retryFailedMessages() {
messageService.retryFailedMessages();
}
/**
* 定時(shí)清理過期消息
*/
@Scheduled(fixedRateString = "${message.clean.interval}")
public void cleanOldMessages() {
messageService.cleanOldMessages();
}
}
- processPendingMessages:每 10 秒檢查一次待發(fā)送的消息
- retryFailedMessages:每 30 秒檢查一次需要重試的消息
- cleanOldMessages:按配置的間隔清理過期消息
業(yè)務(wù)邏輯
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private MessageService messageService;
@Override
@Transactional
public Order createOrder(String userId, Double amount) {
// 創(chuàng)建訂單
Order order = new Order();
order.setId(UUID.randomUUID().toString().replace("-", ""));
order.setUserId(userId);
order.setAmount(amount);
order.setStatus("CREATED");
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
// 保存訂單
orderRepository.save(order);
// 在同一事務(wù)中保存消息(關(guān)鍵:確保訂單創(chuàng)建和消息記錄在同一個(gè)本地事務(wù)中)
String messageContent = String.format("{\"orderId\":\"%s\",\"userId\":\"%s\",\"amount\":%.2f}",
order.getId(), userId, amount);
messageService.saveMessage(messageContent, "order.created");
return order;
}
}
消息消費(fèi)處理
@Component
public class OrderCreatedConsumer {
@Autowired
private PaymentService paymentService;
@Autowired
private MessageService messageService;
@Autowired
private ObjectMapper objectMapper;
@RabbitListener(queues = "order.created")
public void handleOrderCreated(String message) {
try {
// 解析消息內(nèi)容
Map<String, Object> messageData = objectMapper.readValue(message, Map.class);
String orderId = (String) messageData.get("orderId");
String userId = (String) messageData.get("userId");
Double amount = Double.parseDouble(messageData.get("amount").toString());
// 處理訂單創(chuàng)建事件(例如:創(chuàng)建支付記錄)
paymentService.createPayment(orderId, userId, amount);
// 可以在這里添加其他業(yè)務(wù)邏輯,如扣減庫存、發(fā)送通知等
} catch (Exception e) {
// 消費(fèi)失敗,記錄日志
System.err.println("Failed to process order created message: " + e.getMessage());
// 注意:RabbitMQ默認(rèn)會重試,可能導(dǎo)致消息重復(fù)消費(fèi),需要在業(yè)務(wù)層處理冪等性
}
}
}
@Service
public class PaymentServiceImpl implements PaymentService {
@Override
@Transactional
public void createPayment(String orderId, String userId, Double amount) {
// 實(shí)現(xiàn)支付邏輯
// 例如:創(chuàng)建支付記錄、調(diào)用支付網(wǎng)關(guān)、更新訂單狀態(tài)等
System.out.println("Creating payment for order: " + orderId + ", amount: " + amount);
// 這里只是示例,實(shí)際應(yīng)用中需要根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)具體邏輯
}
}
- @RabbitListener注解:監(jiān)聽指定隊(duì)列的消息
- 業(yè)務(wù)處理:根據(jù)消息內(nèi)容執(zhí)行業(yè)務(wù)邏輯
?
冪等性設(shè)計(jì):消費(fèi)端需要確保業(yè)務(wù)操作的冪等性,防止重復(fù)消費(fèi)導(dǎo)致的數(shù)據(jù)不一致。
解決接口冪等問題,只需要記住一句口令"一鎖、二判、三更新",只要嚴(yán)格遵守這個(gè)過程,那么就可以解決并發(fā)問題。
//一鎖:先加一個(gè)分布式鎖
@DistributeLock(scene = "OEDER", keyExpression = "#request.identifier", expire = 3000)
public OrderResponse apply(OrderRequest request) {
OrderResponse response = new OrderResponse();
//二判:判斷請求是否執(zhí)行成功過
OrderDTO orderDTO = orderService.queryOrder(request.getProduct(), request.getIdentifier());
if (orderDTO != null) {
response.setSuccess(true);
response.setResponseCode("DUPLICATED");
return response;
}
//三更新:執(zhí)行更新的業(yè)務(wù)邏輯
return orderService.order(request);
}
總結(jié)
本地消息表方案的核心在于:
- 將業(yè)務(wù)操作和消息記錄放在同一個(gè)本地事務(wù)中,確保原子性
- 通過定時(shí)任務(wù)異步發(fā)送消息,避免阻塞業(yè)務(wù)流程
- 實(shí)現(xiàn)消息重試機(jī)制,提高消息發(fā)送成功率
- 消費(fèi)端實(shí)現(xiàn)冪等性,保證數(shù)據(jù)一致性