成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

本地消息表:Spring Boot 實(shí)現(xiàn)分布式事務(wù)的優(yōu)雅方案

云計(jì)算 分布式
TCC要求每個(gè)分支事務(wù)實(shí)現(xiàn)三個(gè)操作:預(yù)處理Try、確認(rèn)Confirm、撤銷Cancel。Try操作做業(yè)務(wù)檢查及資源預(yù)留,Confirm做業(yè)務(wù)確認(rèn)操作,Cancel實(shí)現(xiàn)一個(gè)與Try相反的操作即回滾操作。

前言

在微服務(wù)架構(gòu)中,分布式事務(wù)一直是一個(gè)棘手的問題,常見的分布式事務(wù)解決方案包括:

2PC(兩階段提交)

2PC即兩階段提交協(xié)議,是將整個(gè)事務(wù)流程分為兩個(gè)階段,準(zhǔn)備階段(Prepare phase)、提交階段(commit phase):

  • 準(zhǔn)備階段(Prepare phase):事務(wù)管理器給每個(gè)參與者發(fā)送Prepare消息,每個(gè)數(shù)據(jù)庫參與者在本地執(zhí)行事務(wù),并寫本地的Undo/Redo日志,此時(shí)事務(wù)沒有提交。 (Undo日志是記錄修改前的數(shù)據(jù),用于數(shù)據(jù)庫回滾,Redo日志是記錄修改后的數(shù)據(jù),用于提交事務(wù)后寫入數(shù)據(jù)文件)
  • 提交階段(commit phase):如果事務(wù)管理器收到了參與者的執(zhí)行失敗或者超時(shí)消息時(shí),直接給每個(gè)參與者發(fā)送回滾(Rollback)消息;否則,發(fā)送提交(Commit)消息;參與者根據(jù)事務(wù)管理器的指令執(zhí)行提交或者回滾操作,并釋放事務(wù)處理過程中使用的鎖資源。注意:必須在最后階段釋放鎖資源。
當(dāng)所有參與者均反饋yes,提交事務(wù)

圖片圖片

當(dāng)任何階段1一個(gè)參與者反饋no,中斷事務(wù)

圖片圖片

TCC(Try-Confirm-Cancel)

TCC要求每個(gè)分支事務(wù)實(shí)現(xiàn)三個(gè)操作:預(yù)處理Try、確認(rèn)Confirm、撤銷Cancel。Try操作做業(yè)務(wù)檢查及資源預(yù)留,Confirm做業(yè)務(wù)確認(rèn)操作,Cancel實(shí)現(xiàn)一個(gè)與Try相反的操作即回滾操作。TM首先發(fā)起所有的分支事務(wù)的try操作,任何一個(gè)分支事務(wù)的try操作執(zhí)行失敗,TM將會發(fā)起所有分支事務(wù)的Cancel操作,若try操作全部成功,TM將會發(fā)起所有分支事務(wù)的Confirm操作,其中Confirm/Cancel操作若執(zhí)行失敗,TM會進(jìn)行重試。

  • Try階段是做業(yè)務(wù)檢查(一致性)及資源預(yù)留(隔離),此階段僅是一個(gè)初步操作,它和后續(xù)的Confirm一起才能真正構(gòu)成一個(gè)完整的業(yè)務(wù)邏輯。
  • Confirm階段是做確認(rèn)提交,Try階段所有分支事務(wù)執(zhí)行成功后開始執(zhí)行Confirm。通常情況下,采用TCC則認(rèn)為Confirm階段是不會出錯(cuò)的。即:只要Try成功,Confirm一定成功。若Confirm階段真的出錯(cuò)了,需引入重試機(jī)制或人工處理。
  • Cancel階段是在業(yè)務(wù)執(zhí)行錯(cuò)誤需要回滾的狀態(tài)下執(zhí)行分支事務(wù)的業(yè)務(wù)取消,預(yù)留資源釋放。通常情況下,采用TCC則認(rèn)為Cancel階段也是一定成功的。若Cancel階段真的出錯(cuò)了,需引入重試機(jī)制或人工處理。
  • TM事務(wù)管理器可以實(shí)現(xiàn)為獨(dú)立的服務(wù),也可以讓全局事務(wù)發(fā)起方充當(dāng)TM的角色,TM獨(dú)立出來是為了成為公用組件,是為了考慮系統(tǒng)結(jié)構(gòu)和軟件復(fù)用。
當(dāng)Try階段服務(wù)全部正常執(zhí)行, 執(zhí)行確認(rèn)業(yè)務(wù)邏輯操作

圖片圖片

當(dāng)Try階段存在服務(wù)執(zhí)行失敗, 進(jìn)入Cancel階段

圖片圖片

可靠消息最終一致性

可靠消息最終一致性方案是指當(dāng)事務(wù)發(fā)起方執(zhí)行完成本地事務(wù)后并發(fā)出一條消息,事務(wù)參與方(消息消費(fèi)者)一定能夠接收消息并處理事務(wù)成功,此方案強(qiáng)調(diào)的是只要消息發(fā)給事務(wù)參與方最終事務(wù)要達(dá)到一致。

正常情況——事務(wù)主動(dòng)方發(fā)消息

圖片圖片

異常情況——事務(wù)主動(dòng)方消息恢復(fù)

圖片圖片

本地消息表

本地消息表這個(gè)方案最初是eBay提出的,此方案的核心是通過本地事務(wù)保證數(shù)據(jù)業(yè)務(wù)操作和消息的一致性,然后通過定時(shí)任務(wù)將消息發(fā)送至消息中間件,待確認(rèn)消息發(fā)送給消費(fèi)方成功再將消息刪除。

本地消息表的核心思想是:將分布式事務(wù)拆分為多個(gè)本地事務(wù),并通過消息表記錄事務(wù)狀態(tài)。具體流程如下:

  • 業(yè)務(wù)操作與消息記錄:在同一個(gè)本地事務(wù)中,執(zhí)行業(yè)務(wù)操作并記錄消息到本地?cái)?shù)據(jù)庫的消息表中
  • 消息發(fā)送:事務(wù)提交后,通過定時(shí)任務(wù)或事件監(jiān)聽機(jī)制,將消息發(fā)送到消息隊(duì)列
  • 消息消費(fèi):下游服務(wù)從消息隊(duì)列消費(fèi)消息并執(zhí)行業(yè)務(wù)操作
  • 消息確認(rèn):下游服務(wù)處理完成后,通過某種機(jī)制確認(rèn)消息已處理

圖片圖片

這種方案的最大優(yōu)勢是將分布式事務(wù)轉(zhuǎn)換為本地事務(wù),利用數(shù)據(jù)庫的ACID特性保證業(yè)務(wù)操作和消息記錄的原子性。

實(shí)現(xiàn)步驟

本地消息表因其實(shí)現(xiàn)簡單、可靠性高、性能良好,成為中小型項(xiàng)目的首選方案,本文將詳細(xì)介紹本地消息表的原理,并結(jié)合Spring Boot提供完整的實(shí)現(xiàn)方案。

數(shù)據(jù)庫設(shè)計(jì)

-- 消息表結(jié)構(gòu)
CREATE TABLE message (
    id VARCHAR(32) PRIMARY KEY,
    content TEXT NOT NULL,
    topic VARCHAR(100) NOT NULL,
    status VARCHAR(20) NOT NULL COMMENT 'INIT, SENDING, SENT, FAILED',
    retry_count INT DEFAULT 0,
    next_retry_time DATETIME,
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL,
    INDEX idx_status (status),
    INDEX idx_next_retry_time (next_retry_time)
);

-- 業(yè)務(wù)表示例(如訂單表)
CREATE TABLE orders (
    id VARCHAR(32) PRIMARY KEY,
    user_id VARCHAR(32) NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,
    status VARCHAR(20) NOT NULL,
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL
);

消息結(jié)構(gòu)

public enum MessageStatus {
    INIT("初始化"),
    SENDING("發(fā)送中"),
    SENT("已發(fā)送"),
    FAILED("發(fā)送失敗");

    private final String description;

    MessageStatus(String description) {
        this.description = description;
    }

    public String getDescription() {
        return description;
    }
}

public interface MessageRepository extends JpaRepository<Message, String> {

    List<Message> findByStatusAndNextRetryTimeLessThanEqual(String status, LocalDateTime now);

    @Transactional
    @Modifying
    @Query("UPDATE Message m SET m.status = ?2, m.updateTime = ?3 WHERE m.id = ?1")
    int updateStatus(String id, String status, LocalDateTime updateTime);

    @Transactional
    @Modifying
    @Query("UPDATE Message m SET m.status = ?2, m.retryCount = m.retryCount + 1, " +
            "m.nextRetryTime = ?3, m.updateTime = ?3 WHERE m.id = ?1")
    int updateForRetry(String id, String status, LocalDateTime nextRetryTime);

    @Transactional
    @Modifying
    @Query("DELETE FROM Message m WHERE m.createTime < ?1")
    int deleteOlderThan(LocalDateTime threshold);
}
  • MessageStatus枚舉:定義消息的四種狀態(tài)
  • MessageRepository接口:繼承JPA的JpaRepository,提供基本CRUD操作,并定義自定義查詢方法

消息服務(wù)實(shí)現(xiàn)

@Service
public class MessageServiceImpl implements MessageService {

    @Autowired
    private MessageRepository messageRepository;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${message.retry.interval:60000}")
    private long retryInterval;

    @Value("${message.max.retry:10}")
    private int maxRetry;

    @Value("${message.retention.days:30}")
    private int retentionDays;

    @Override
    @Transactional
    public Message saveMessage(String content, String topic) {
        Message message = new Message();
        message.setId(UUID.randomUUID().toString().replace("-", ""));
        message.setContent(content);
        message.setTopic(topic);
        message.setStatus(MessageStatus.INIT.name());
        message.setCreateTime(LocalDateTime.now());
        message.setUpdateTime(LocalDateTime.now());
        return messageRepository.save(message);
    }

    @Override
    public void sendMessage(Message message) {
        try {
            // 更新消息狀態(tài)為發(fā)送中
            messageRepository.updateStatus(message.getId(), MessageStatus.SENDING.name(), LocalDateTime.now());
            
            // 發(fā)送消息到RabbitMQ
            rabbitTemplate.convertAndSend(message.getTopic(), message.getContent());
            
            // 更新消息狀態(tài)為已發(fā)送
            messageRepository.updateStatus(message.getId(), MessageStatus.SENT.name(), LocalDateTime.now());
        } catch (Exception e) {
            // 發(fā)送失敗,更新重試信息
            handleSendFailure(message, e);
        }
    }

    @Override
    public void processPendingMessages() {
        List<Message> pendingMessages = messageRepository.findByStatusAndNextRetryTimeLessThanEqual(
                MessageStatus.INIT, LocalDateTime.now());
        
        for (Message message : pendingMessages) {
            sendMessage(message);
        }
    }

    @Override
    public void retryFailedMessages() {
        List<Message> failedMessages = messageRepository.findByStatusAndNextRetryTimeLessThanEqual(
                MessageStatus.FAILED, LocalDateTime.now());
        
        for (Message message : failedMessages) {
            if (message.getRetryCount() < maxRetry) {
                try {
                    // 更新消息狀態(tài)為發(fā)送中
                    messageRepository.updateStatus(message.getId(), MessageStatus.SENDING.name(), LocalDateTime.now());
                    
                    // 重試發(fā)送消息
                    rabbitTemplate.convertAndSend(message.getTopic(), message.getContent());
                    
                    // 更新消息狀態(tài)為已發(fā)送
                    messageRepository.updateStatus(message.getId(), MessageStatus.SENT.name(), LocalDateTime.now());
                } catch (Exception e) {
                    // 重試失敗,更新重試信息
                    handleSendFailure(message, e);
                }
            } else {
                // 超過最大重試次數(shù),記錄日志并標(biāo)記為永久失敗
                // 可以添加告警機(jī)制
                System.err.println("Message exceeded max retry count: " + message.getId());
            }
        }
    }

    @Override
    public void cleanOldMessages() {
        LocalDateTime threshold = LocalDateTime.now().minusDays(retentionDays);
        int deletedCount = messageRepository.deleteOlderThan(threshold);
        System.out.println("Deleted " + deletedCount + " old messages");
    }

    @Override
    public void updateMessageStatus(String messageId, MessageStatus status) {
        messageRepository.updateStatus(messageId, status.name(), LocalDateTime.now());
    }

    private void handleSendFailure(Message message, Exception e) {
        // 計(jì)算下一次重試時(shí)間
        LocalDateTime nextRetryTime = LocalDateTime.now().plusSeconds(retryInterval);
        
        // 更新消息狀態(tài)為失敗并增加重試次數(shù)
        messageRepository.updateForRetry(
                message.getId(), 
                MessageStatus.FAILED.name(), 
                nextRetryTime
        );
        
        // 記錄錯(cuò)誤日志
        System.err.println("Failed to send message: " + message.getId() + ", error: " + e.getMessage());
    }
}
  • saveMessage:創(chuàng)建并保存新消息,設(shè)置初始狀態(tài)為INIT
  • sendMessage:發(fā)送消息到消息隊(duì)列,處理發(fā)送成功和失敗的情況
  • processPendingMessages:處理待發(fā)送的消息
  • retryFailedMessages:重試發(fā)送失敗的消息,實(shí)現(xiàn)最大重試次數(shù)控制
  • cleanOldMessages:清理過期消息,防止消息表過大
  • handleSendFailure:處理發(fā)送失敗的情況,更新重試信息

定時(shí)任務(wù)配置

@Component
public class MessageScheduler {

    @Autowired
    private MessageService messageService;


    /**
     * 定時(shí)處理待發(fā)送的消息
     */
    @Scheduled(fixedRate = 10000) // 每10秒執(zhí)行一次
    public void processPendingMessages() {
        messageService.processPendingMessages();
    }

    /**
     * 定時(shí)重試失敗的消息
     */
    @Scheduled(fixedRate = 30000) // 每30秒執(zhí)行一次
    public void retryFailedMessages() {
        messageService.retryFailedMessages();
    }

    /**
     * 定時(shí)清理過期消息
     */
    @Scheduled(fixedRateString = "${message.clean.interval}")
    public void cleanOldMessages() {
        messageService.cleanOldMessages();
    }
}
  • processPendingMessages:每 10 秒檢查一次待發(fā)送的消息
  • retryFailedMessages:每 30 秒檢查一次需要重試的消息
  • cleanOldMessages:按配置的間隔清理過期消息

業(yè)務(wù)邏輯

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private MessageService messageService;

    @Override
    @Transactional
    public Order createOrder(String userId, Double amount) {
        // 創(chuàng)建訂單
        Order order = new Order();
        order.setId(UUID.randomUUID().toString().replace("-", ""));
        order.setUserId(userId);
        order.setAmount(amount);
        order.setStatus("CREATED");
        order.setCreateTime(LocalDateTime.now());
        order.setUpdateTime(LocalDateTime.now());
        
        // 保存訂單
        orderRepository.save(order);
        
        // 在同一事務(wù)中保存消息(關(guān)鍵:確保訂單創(chuàng)建和消息記錄在同一個(gè)本地事務(wù)中)
        String messageContent = String.format("{\"orderId\":\"%s\",\"userId\":\"%s\",\"amount\":%.2f}", 
                order.getId(), userId, amount);
        messageService.saveMessage(messageContent, "order.created");
        
        return order;
    }
}

消息消費(fèi)處理

@Component
public class OrderCreatedConsumer {

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private MessageService messageService;

    @Autowired
    private ObjectMapper objectMapper;

    @RabbitListener(queues = "order.created")
    public void handleOrderCreated(String message) {
        try {
            // 解析消息內(nèi)容
            Map<String, Object> messageData = objectMapper.readValue(message, Map.class);
            String orderId = (String) messageData.get("orderId");
            String userId = (String) messageData.get("userId");
            Double amount = Double.parseDouble(messageData.get("amount").toString());
            
            // 處理訂單創(chuàng)建事件(例如:創(chuàng)建支付記錄)
            paymentService.createPayment(orderId, userId, amount);
            
            // 可以在這里添加其他業(yè)務(wù)邏輯,如扣減庫存、發(fā)送通知等
            
        } catch (Exception e) {
            // 消費(fèi)失敗,記錄日志
            System.err.println("Failed to process order created message: " + e.getMessage());
            // 注意:RabbitMQ默認(rèn)會重試,可能導(dǎo)致消息重復(fù)消費(fèi),需要在業(yè)務(wù)層處理冪等性
        }
    }
}

@Service
public class PaymentServiceImpl implements PaymentService {

    @Override
    @Transactional
    public void createPayment(String orderId, String userId, Double amount) {
        // 實(shí)現(xiàn)支付邏輯
        // 例如:創(chuàng)建支付記錄、調(diào)用支付網(wǎng)關(guān)、更新訂單狀態(tài)等
        
        System.out.println("Creating payment for order: " + orderId + ", amount: " + amount);
        
        // 這里只是示例,實(shí)際應(yīng)用中需要根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)具體邏輯
    }
}
  • @RabbitListener注解:監(jiān)聽指定隊(duì)列的消息
  • 業(yè)務(wù)處理:根據(jù)消息內(nèi)容執(zhí)行業(yè)務(wù)邏輯

?

冪等性設(shè)計(jì):消費(fèi)端需要確保業(yè)務(wù)操作的冪等性,防止重復(fù)消費(fèi)導(dǎo)致的數(shù)據(jù)不一致。

解決接口冪等問題,只需要記住一句口令"一鎖、二判、三更新",只要嚴(yán)格遵守這個(gè)過程,那么就可以解決并發(fā)問題。

//一鎖:先加一個(gè)分布式鎖
@DistributeLock(scene = "OEDER", keyExpression = "#request.identifier", expire = 3000)
public OrderResponse apply(OrderRequest request) {
    OrderResponse response = new OrderResponse();   
    //二判:判斷請求是否執(zhí)行成功過
    OrderDTO orderDTO = orderService.queryOrder(request.getProduct(), request.getIdentifier());
    if (orderDTO != null) {
        response.setSuccess(true);
        response.setResponseCode("DUPLICATED");
        return response;
    }
    //三更新:執(zhí)行更新的業(yè)務(wù)邏輯   
    return orderService.order(request);
}

總結(jié)

本地消息表方案的核心在于:

  • 將業(yè)務(wù)操作和消息記錄放在同一個(gè)本地事務(wù)中,確保原子性
  • 通過定時(shí)任務(wù)異步發(fā)送消息,避免阻塞業(yè)務(wù)流程
  • 實(shí)現(xiàn)消息重試機(jī)制,提高消息發(fā)送成功率
  • 消費(fèi)端實(shí)現(xiàn)冪等性,保證數(shù)據(jù)一致性
責(zé)任編輯:武曉燕 來源: 一安未來
相關(guān)推薦

2024-06-07 08:06:36

2025-04-29 04:00:00

分布式事務(wù)事務(wù)消息

2024-06-13 09:25:14

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2023-01-06 09:19:12

Seata分布式事務(wù)

2020-07-15 16:50:57

Spring BootRedisJava

2025-01-26 00:00:40

Seata分布式事務(wù)

2021-02-03 10:49:34

JTA分布式事務(wù)

2025-04-28 00:44:04

2025-06-11 08:01:06

2023-09-14 15:44:46

分布式事務(wù)數(shù)據(jù)存儲

2020-05-28 09:35:05

分布式事務(wù)方案

2023-11-06 13:15:32

分布式事務(wù)Seata

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2022-06-21 08:27:22

Seata分布式事務(wù)

2020-03-31 08:05:23

分布式開發(fā)技術(shù)

2023-09-04 08:12:16

分布式鎖Springboot

2021-06-16 08:33:02

分布式事務(wù)ACID

2024-08-19 09:05:00

Seata分布式事務(wù)

2022-06-28 08:37:07

分布式服務(wù)器WebSocket
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 国产在线视频网 | 手机av免费在线 | 久久亚洲欧美日韩精品专区 | 99久久婷婷国产综合精品电影 | 九九久久久 | 91大神在线资源观看无广告 | 国产精品毛片一区二区三区 | 亚洲精品久久久久久宅男 | 91av在线免费 | 国产精品视频 | 亚洲天堂一区 | 午夜一区 | 亚洲精品乱码久久久久久久久 | 91精品国产综合久久久亚洲 | 中文字幕国产一区 | 国产精品久久在线 | 精品国产一区二区三区成人影院 | 一区二区视频在线 | 中文字幕精品视频 | 亚洲视频欧美视频 | 亚洲精品国产一区 | 欧美精品一区二区三区在线 | 欧美精品1区 | 日韩av在线一区 | 欧日韩在线 | 性一爱一乱一交一视频 | 奇色影视| 亚洲一区二区精品视频 | 成人在线免费观看av | 91av免费观看 | 国产一区二区三区在线 | 久久se精品一区精品二区 | 国产精品毛片无码 | 久久久久国产精品一区三寸 | 色在线免费 | 国产91在线播放 | 久久中文免费视频 | 91伦理片 | 午夜免费视频 | 亚洲一区二区三区在线免费 | 韩日精品视频 |