成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

SpringBoot分布式事務(wù)之可靠消息最終一致性

開發(fā) 前端
如果broker未收到消息(如果執(zhí)行本地事務(wù)突然宕機(jī)了,相當(dāng)執(zhí)行本地事務(wù)(executeLocalTransaction)執(zhí)行結(jié)果返回unknow,則和broker未收到確認(rèn)消息的情況一樣處理。

環(huán)境:springboot2.3.9 + RocketMQ4.8.0

可靠消息最終一致性原理

圖片

  • 執(zhí)行流程
  1. Producer發(fā)送Prepare message到broker。
  2. Prepare Message發(fā)送成功后開始執(zhí)行本地事務(wù)。
  3. 如果本地事務(wù)執(zhí)行成功的話則返回commit,如果執(zhí)行失敗則返回rollback。(這個是在事務(wù)消息的回調(diào)方法里由開發(fā)者自己決定commit or rollback)
  4. Producer發(fā)送上一步的commit還是rollback到broker,這里有以下兩種情況:

1、如果broker收到了commit/rollback消息 :

如果收到了commit,則broker認(rèn)為整個事務(wù)是沒問題的,執(zhí)行成功的。那么會下發(fā)消息給Consumer端消費(fèi)。

如果收到了rollback,則broker認(rèn)為本地事務(wù)執(zhí)行失敗了,broker將會刪除Half Message,不下發(fā)給Consumer端。

2、如果broker未收到消息(如果執(zhí)行本地事務(wù)突然宕機(jī)了,相當(dāng)執(zhí)行本地事務(wù)(executeLocalTransaction)執(zhí)行結(jié)果返回unknow,則和broker未收到確認(rèn)消息的情況一樣處理。):

broker會定時回查本地事務(wù)的執(zhí)行結(jié)果:如果回查結(jié)果是本地事務(wù)已經(jīng)執(zhí)行則返回commit,若未執(zhí)行,則返回unknow。

Producer端回查的結(jié)果發(fā)送給Broker。Broker接收到的如果是commit,則broker視為整個事務(wù)執(zhí)行成功,如果是rollback,則broker視為本地事務(wù)執(zhí)行失敗,broker刪除Half Message,不下發(fā)給consumer。如果broker未接收到回查的結(jié)果(或者查到的是unknow),則broker會定時進(jìn)行重復(fù)回查,以確保查到最終的事務(wù)結(jié)果。重復(fù)回查的時間間隔和次數(shù)都可配。

工程結(jié)構(gòu)

圖片圖片

建立父子工程,兩個子項(xiàng)目account-manager,integral-manager。

依賴

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>

Account子模塊

  • 配置文件
server:
  port: 8081
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: pack-mq
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/account?serverTimeznotallow=GMT%2B8
    username: root
    password: ******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1
  • 業(yè)務(wù)實(shí)體類
// 用戶表
@Entity
@Table(name = "t_account")
public class Account {
  @Id
  private Long id;
  private String name ;
}
// 業(yè)務(wù)記錄表(用來查詢?nèi)ブ兀?@Entity
@Table(name = "t_account_log")
public class AccountLog {
  @Id
  private Long txid;
  private Date createTime ;
}
  • DAO相關(guān)類
public interface AccountRepository extends JpaRepository<Account, Long> {
}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {
}
  • Service相關(guān)類
@Resource
private AccountRepository accountRepository ;
@Resource
private AccountLogRepository accountLogRepository ;
  
// 該方法保存業(yè)務(wù)數(shù)據(jù),同時保存操作記錄;操作記錄用來回查。
@Transactional
public boolean register(Account account) {
  accountRepository.save(account) ;
  AccountLog accountLog = new AccountLog(account.getId(), new Date()) ;
  accountLogRepository.save(accountLog) ;
  return true ;
}
  
public AccountLog existsTxId(Long txid) {
  return accountLogRepository.findById(txid).orElse(null) ;
}
  • 發(fā)送消息方法
@Resource
private RocketMQTemplate rocketMQTemplate ;
  
public String sendTx(String topic, String tags, Account account) {
  String uuid = UUID.randomUUID().toString().replaceAll("-", "") ;
  TransactionSendResult result =rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(account).
      setHeader("tx_id", uuid).build(), uuid) ;
  return result.getSendStatus().name() ;
}
  • 消息監(jiān)聽(生產(chǎn)者監(jiān)聽)
@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
  
  @Resource
  private AccountService accountService ;


  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    try {
      Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
      accountService.register(account) ;
    } catch (Exception e) {
      e.printStackTrace() ;
      return RocketMQLocalTransactionState.ROLLBACK ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  // 這里檢查本地事務(wù)是否執(zhí)行成功
    try {
      Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
      System.out.println("執(zhí)行查詢ID為:" + account.getId() + " 的數(shù)據(jù)是否存在") ;
      AccountLog accountLog = accountService.existsTxId(account.getId()) ;
      if (accountLog == null) {
        return RocketMQLocalTransactionState.UNKNOWN ;
      }
    } catch (Exception e) {
      e.printStackTrace() ;
      return RocketMQLocalTransactionState.UNKNOWN ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


}
  • Controller接口
@RestController
@RequestMapping("/accounts")
public class AccountController {
  @Resource
  private ProducerMessageService messageService ;
  @PostMapping("/send")
  public Object sendMessage(@RequestBody Account account) {
    return messageService.sendTx("tx-topic", "mks", account) ;
  }
}

Integral子模塊

  • 業(yè)務(wù)實(shí)體類
@Entity
@Table(name = "t_integral")
public class Integral {
  @Id
  private Long id;
  private Integer score ;
  private Long acccountId ;
}
  • DAO相關(guān)類
public interface IntegralRepository extends JpaRepository<Integral, Long> {
}
  • Service相關(guān)類
@Resource
private IntegralRepository integralRepository ;
  
@Transactional
public Integral saveIntegral(Integral integral) {
  return integralRepository.save(integral) ;
}
  • 消息監(jiān)聽
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {


  @Resource
  private IntegralService integralService ;
  
  @SuppressWarnings("unchecked")
  @Override
  public void onMessage(String message) {
    System.out.println("Integral接收到消息:" + message) ;
    try {
      Map<String, Object> jsonMap = new JsonMapper().readValue(message, Map.class) ;
      Integer id = (Integer) jsonMap.get("id") ;
      integralService.saveIntegral(new Integral(1L, 1000, id + 0L)) ;
    } catch (Exception e) {
      throw new RuntimeException(e) ;
    }
  }


}

測試

分別啟動兩個子模塊

  • 初始數(shù)據(jù)表

圖片圖片


  • Postman測試

圖片圖片

Account模塊

圖片圖片

Integral模塊

圖片圖片


當(dāng)子模塊Account執(zhí)行本地事務(wù)發(fā)生錯誤時,事務(wù)會回滾并且刪除消息。子模塊Integral并不會收到消息。

責(zé)任編輯:武曉燕 來源: 實(shí)戰(zhàn)案例錦集
相關(guān)推薦

2021-06-16 08:33:02

分布式事務(wù)ACID

2022-07-21 06:54:28

微服務(wù)系統(tǒng)RocketMQ

2022-12-19 19:12:17

分布式事務(wù)

2019-10-11 23:27:19

分布式一致性算法開發(fā)

2019-09-05 08:43:34

微服務(wù)分布式一致性數(shù)據(jù)共享

2021-11-22 16:30:30

分布式一致性分布式系統(tǒng)

2015-10-19 10:42:37

分布式一致性應(yīng)用系統(tǒng)

2024-01-31 09:54:51

Redis分布式

2020-05-11 10:30:57

2PC分布式協(xié)議

2024-06-04 10:58:30

2017-09-21 10:59:36

分布式系統(tǒng)線性一致性測試

2024-11-28 10:56:55

2021-07-28 08:39:25

分布式架構(gòu)系統(tǒng)

2022-06-07 12:08:10

Paxos算法

2021-06-03 15:27:31

RaftSOFAJRaft

2025-06-09 08:00:37

分布式文件系統(tǒng)

2021-06-06 12:45:41

分布式CAPBASE

2017-09-22 12:08:01

數(shù)據(jù)庫分布式系統(tǒng)互聯(lián)網(wǎng)

2020-10-28 11:15:24

EPaxos分布式性算法

2023-11-06 09:06:54

分布式一致性數(shù)據(jù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: a级片在线观看 | 国产99视频精品免费视频7 | 亚洲精品www久久久久久广东 | 国产精品女人久久久 | 成人伊人 | 久久69精品久久久久久久电影好 | 久久精品国产99国产精品 | 精品伊人| 日韩一区二区三区在线 | 狠狠久| 毛片链接| 亚洲二区精品 | 99精品国产一区二区三区 | 亚洲午夜精品久久久久久app | 欧美国产日韩在线观看 | 欧美性大战久久久久久久蜜臀 | 国产黄色在线观看 | 国产一区 在线视频 | 国产精品久久久久久中文字 | 国产一级片 | 午夜欧美a级理论片915影院 | 在线国产中文字幕 | 91免费视频观看 | 免费特级黄毛片 | 亚洲一区 中文字幕 | 欧美亚洲国产一区 | 久久伊人一区 | a免费视频 | 国产精品视频一区二区三区四蜜臂 | 久草福利| 91精品一区二区 | 日韩欧美精品 | 午夜免费在线观看 | 欧洲一区二区三区 | 日韩欧美电影在线 | 久久综合一区二区三区 | 福利片在线看 | 亚洲欧美激情精品一区二区 | 国产h视频| 资源首页二三区 | 欧美精品在线视频 |