成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

DDD 架構,MQ 應該放那一層使用?

開發(fā) 前端
因為我們本章所講解的內(nèi)容是把 RocketMQ 放入 DDD 架構中進行使用,那么也就引申出領域事件定義。所以我們先來了解下,什么是領域事件。

本文的宗旨在于通過簡單干凈實踐的方式教會讀者,使用 Docker 配置 RocketMQ 并在基于 DDD 分層結構的 SpringBoot 工程中使用 RocketMQ 技術。因為大部分 MQ 的發(fā)送都是基于特定業(yè)務場景的,所以本章節(jié)也是基于 《MyBatis 使用教程和插件開發(fā)》 章節(jié)的擴展。

本章也會包括關于 MQ 消息的發(fā)送和接收應該處于 DDD 的哪一層的實踐講解和使用。

本文涉及的工程:

  • xfg-dev-tech-rocketmq:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-rocketmq
  • RocketMQ Docker 安裝:rocketmq-docker-compose-mac-amd-arm.yml
  • 導入測試庫表 road-map.sql

一、案例背景

首先我們要知道,MQ 消息的作用是用于;解耦過長的業(yè)務流程和應對流量沖擊的消峰。如;用戶下單支付完成后,拿到支付消息推動后續(xù)的發(fā)貨流程。也可以是我們基于 《MyBatis 使用教程和插件開發(fā)》 中的案例場景,給雇員提升級別和薪資的時候,也發(fā)送一條MQ消息,用于發(fā)送郵件通知給用戶。

圖片圖片

  • 從薪資調(diào)整到郵件發(fā)送,這里是2個業(yè)務流程,通過 MQ 消息的方式進行連接。
  • 其實MQ消息的使用場景特別多,原來你可能使用多線程的一些操作,現(xiàn)在就擴展為多實例的操作了。發(fā)送 MQ 消息出來,讓應用的各個實例接收并進行消費。

二、領域事件

因為我們本章所講解的內(nèi)容是把 RocketMQ 放入 DDD 架構中進行使用,那么也就引申出領域事件定義。所以我們先來了解下,什么是領域事件。

領域事件,可以說是解耦微服務設計的關鍵。領域事件也是領域模型中非常重要的一部分內(nèi)容,用于標示當前領域模型中發(fā)生的事件行為。一個領域事件會推進業(yè)務流程的進一步操作,在實現(xiàn)業(yè)務解耦的同時,也推動了整個業(yè)務的閉環(huán)。

圖片圖片

  • 首先,我們需要在領域模型層,添加一塊 event 區(qū)域。它的存在是為了定義出于當前領域下所需的事件消息信息。信息的類型可以是model 下的實體對象、聚合對象。
  • 之后,消息的發(fā)送是放在基礎設置層。本身基礎設置層就是依賴倒置于模型層,所以在模型層所定義的 event 對象,可以很方便的在基礎設置層使用。而且大部分開發(fā)的時候,MQ消息的發(fā)送與數(shù)據(jù)庫操作都是關聯(lián)的,采用的方式是,做完數(shù)據(jù)落庫后,推送MQ消息。所以定義在倉儲中實現(xiàn),會更加得心應手、水到渠成。
  • 最后,就是 MQ 的消息,MQ 的消費可以是自身服務所發(fā)出的消息,也可以是外部其他微服務的消息。就在小傅哥所整體講述的這套簡明教程中 DDD 部分的觸發(fā)器層。

三、環(huán)境安裝

本案例涉及了數(shù)據(jù)庫和RocketMQ的使用,都已經(jīng)在工程中提供了安裝腳本,可以按需執(zhí)行。

圖片圖片

這里主要介紹 RocketMQ 的安裝;

1. 執(zhí)行 compose yml

文件:docs/rocketmq/rocketmq-docker-compose-mac-amd-arm.yml - 關于安裝小傅哥提供了不同的鏡像,包括Mac、Mac M1、Windows 可以按需選擇使用。

version: '3'
services:
  # https://hub.docker.com/r/xuchengen/rocketmq
  # 注意修改項;
  # 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1
  # 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口
  rocketmq:
    image: livinphp/rocketmq:5.1.0
    container_name: rocketmq
    ports:
      - 9009:9009
      - 9876:9876
      - 10909:10909
      - 10911:10911
      - 10912:10912
    volumes:
      - ./data:/home/app/data
    environment:
      TZ: "Asia/Shanghai"
      NAMESRV_ADDR: "rocketmq:9876"
  • 在 IDEA 中打開 rocketmq-docker-compose-mac-amd-arm.yml 你會看到一個綠色的按鈕在左側(cè)側(cè)邊欄,點擊即可安裝。或者你也可以使用命令安裝:# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d - 比較適合在云服務器上執(zhí)行。
  • 首次安裝可能使用不了,一個原因是 brokerIP1 未配置IP,另外一個是默認的 8080 端口占用。可以按照如下小傅哥說的方式修改。

2. 修改默認配合

  1. 打開 data/rocketmq/conf/broker.conf 添加一條 brokerIP1=127.0.0.1 在結尾
# 集群名稱
brokerClusterName = DefaultCluster
# BROKER 名稱
brokerName = broker-a
# 0 表示 Master, > 0 表示 Slave
brokerId = 0
# 刪除文件時間點,默認凌晨 4 點
deleteWhen = 04
# 文件保留時間,默認 48 小時
fileReservedTime = 48
# BROKER 角色 ASYNC_MASTER為異步主節(jié)點,SYNC_MASTER為同步主節(jié)點,SLAVE為從節(jié)點
brokerRole = ASYNC_MASTER
# 刷新數(shù)據(jù)到磁盤的方式,ASYNC_FLUSH 刷新
flushDiskType = ASYNC_FLUSH
# 存儲路徑
storePathRootDir = /home/app/data/rocketmq/store
# IP地址
brokerIP1 = 127.0.0.1
  1. 打開 ``data/console/config/application.properties修改server.port=9009` 端口。
server.address=0.0.0.0
server.port=9009
  • 修改配置后,重啟服務。

3. RockMQ登錄與配置

3.1 登錄

RocketMQ 此鏡像,會在安裝后在控制臺打印登錄賬號信息,你可以查看使用。

圖片圖片

圖片圖片

登錄:http://localhost:9009/

3.2 創(chuàng)建Topic

圖片圖片

  • 也可以使用命令創(chuàng)建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq

3.3 創(chuàng)建消費者組

圖片圖片

  • 也可以使用命令創(chuàng)建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group

四、工程實現(xiàn)

1. 工程結構

圖片圖片

  • MQ 的使用無論是 RocketMQ 還是 Kafka 等,都很簡單。但在使用之前,要考慮好怎么在架構中合理的使用。如果最初沒有定義好這些,那么胡亂的任何地方都能發(fā)送和接收MQ,最后的工程將非常難以維護。
  • 所以這里整個MQ的生產(chǎn)和消費,是按照整個 DDD 領域事件結構進行設計。分為在 domain 使用基礎層生產(chǎn)消息,再有 trigger 層接收消息。

2. 配置文件

引入POM

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

添加配置

# RocketMQ 配置
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: xfg-group
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消費最大值
    pull-batch-size: 10
  producer:
    # 發(fā)送同一類消息的設置為同一個group,保證唯一
    group: xfg-group
    # 發(fā)送消息超時時間,默認3000
    sendMessageTimeout: 10000
    # 發(fā)送消息失敗重試次數(shù),默認2
    retryTimesWhenSendFailed: 2
    # 異步消息重試此處,默認2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大長度,默認1024 * 1024 * 4(默認4M)
    maxMessageSize: 4096
    # 壓縮消息閾值,默認4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在內(nèi)部發(fā)送失敗時重試另一個broker,默認false
    retryNextServer: false

3. 定義領域事件

源碼:cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent

圖片圖片

@EqualsAndHashCode(callSuper = true)
@Data
public class SalaryAdjustEvent extends BaseEvent<AdjustSalaryApplyOrderAggregate> {

    public static String TOPIC = "xfg-mq";

    public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
        SalaryAdjustEvent event = new SalaryAdjustEvent();
        event.setId(RandomStringUtils.randomNumeric(11));
        event.setTimestamp(new Date());
        event.setData(adjustSalaryApplyOrderAggregate);
        return event;
    }

}
  • 每個領域的消息,都有領域自己定義。發(fā)送的時候再交給基礎設施層來發(fā)送。

4. 消息發(fā)送

源碼:cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher

圖片圖片

@Component
@Slf4j
public class EventPublisher {

    @Setter(onMethod_ = @Autowired)
    private RocketMQTemplate rocketmqTemplate;

    /**
     * 普通消息
     *
     * @param topic   主題
     * @param message 消息
     */
    public void publish(String topic, BaseEvent<?> message) {
        try {
            String mqMessage = JSON.toJSONString(message);
            log.info("發(fā)送MQ消息 topic:{} message:{}", topic, mqMessage);
            rocketmqTemplate.convertAndSend(topic, mqMessage);
        } catch (Exception e) {
            log.error("發(fā)送MQ消息失敗 topic:{} message:{}", topic, JSON.toJSONString(message), e);
            // 大部分MQ發(fā)送失敗后,會需要任務補償
        }
    }

    /**
     * 延遲消息
     *
     * @param topic          主題
     * @param message        消息
     * @param delayTimeLevel 延遲時長
     */
    public void publishDelivery(String topic, BaseEvent<?> message, int delayTimeLevel) {
        try {
            String mqMessage = JSON.toJSONString(message);
            log.info("發(fā)送MQ延遲消息 topic:{} message:{}", topic, mqMessage);
            rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel);
        } catch (Exception e) {
            log.error("發(fā)送MQ延遲消息失敗 topic:{} message:{}", topic, JSON.toJSONString(message), e);
            // 大部分MQ發(fā)送失敗后,會需要任務補償
        }
    }

}
  • 在基礎設施層提供 event 事件的處理,也就是 MQ 消息的發(fā)送。

源碼:cn.bugstack.xfg.dev.tech.infrastructure.repository.SalaryAdjustRepository

@Resource
private EventPublisher eventPublisher;
    
@Override
@Transactional(rollbackFor = Exception.class, timeout = 350, propagation = Propagation.REQUIRED, isolation = Isolation.DEFAULT)
public String adjustSalary(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
   
  // ... 省略部分代碼 

    eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate));
    return orderId;
}

在 SalaryAdjustRepository 倉儲的實現(xiàn)中,做完業(yè)務流程開始發(fā)送 MQ 消息。這里有2點要注意;

  1. 消息發(fā)送,不要寫在數(shù)據(jù)庫事務中。因為事務一直占用數(shù)據(jù)庫連接,需要快速釋放。
  2. 對于一些強MQ要求的場景,需要在發(fā)送MQ前,寫入一條數(shù)據(jù)庫 Task 記錄,發(fā)送消息后更新 Task 狀態(tài)為成功。如果長時間未更新數(shù)據(jù)庫狀態(tài)或者為失敗的,則需要由任務補償進行處理。

5. 消費消息

源碼:cn.bugstack.xfg.dev.tech.trigger.mq.SalaryAdjustMQListener

圖片圖片

@Component
@Slf4j
@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
public class SalaryAdjustMQListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("接收到MQ消息 {}", s);
    }

}
  • 消費消息,配置消費者組合消費的主題,之后就可以接收到消息了。接收以后你可以做自己的業(yè)務,如果拋出異常,消息會進行重新接收處理。

六、測試驗證

1. 單獨發(fā)送消息測試

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {

    @Setter(onMethod_ = @Autowired)
    private RocketMQTemplate rocketmqTemplate;

    @Test
    public void test() throws InterruptedException {
        while (true) {
            rocketmqTemplate.convertAndSend("xfg-mq", "我是測試消息");
            Thread.sleep(3000);
        }
    }

}
  • 這里方便你來發(fā)送消息,驗證流程。

2. 業(yè)務流程消息驗證

@Test
public void test_execSalaryAdjust() throws InterruptedException {
    AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder()
            .employeeNumber("10000001")
            .orderId("100908977676003")
            .employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build())
            .employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder()
                    .adjustTotalAmount(new BigDecimal(100))
                    .adjustBaseAmount(new BigDecimal(80))
                    .adjustMeritAmount(new BigDecimal(20)).build())
            .build();
    String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate);
    log.info("調(diào)薪測試 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId);
    Thread.sleep(Integer.MAX_VALUE);
}
23-07-29.15:40:52.307 [main            ] INFO  HikariDataSource       - HikariPool-1 - Start completed.
23-07-29.15:40:52.445 [main            ] INFO  EventPublisher         - 發(fā)送MQ消息 topic:xfg-mq message:{"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
23-07-29.15:40:52.517 [main            ] INFO  ISalaryAdjustApplyServiceTest - 調(diào)薪測試 req: {"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"} res: 100908977676004
23-07-29.15:40:52.520 [ConsumeMessageThread_1] INFO  SalaryAdjustMQListener - 接收到MQ消息 {"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
  • 當執(zhí)行一次加薪調(diào)整后,就會接收到MQ消息了。
責任編輯:武曉燕 來源: bugstack蟲洞棧
相關推薦

2023-11-24 07:16:10

DDD微服務

2020-09-07 06:38:54

HA高可用協(xié)議

2024-05-21 09:26:54

微服務DDD建模架構

2019-01-18 16:39:08

系統(tǒng)層中間件層應用層

2022-01-10 13:01:32

指針Struct內(nèi)存

2021-10-29 21:26:39

前端引擎層類型

2025-01-15 08:46:55

2025-02-05 09:46:13

OracleDBA投資

2023-02-15 13:50:58

DDD戰(zhàn)略設計

2022-01-11 20:43:16

TCPIP模型

2021-10-26 16:20:34

比特幣區(qū)塊鏈加密貨幣

2025-01-16 10:38:31

2009-06-10 09:58:14

程序員職場層次

2021-03-18 13:20:52

Linux MintLinuxLinux發(fā)行版

2011-04-19 13:53:41

三層架構

2024-04-11 10:01:29

2010-11-10 10:39:19

2024-06-20 13:22:13

C++11C++模板

2023-08-06 23:31:36

架構系統(tǒng)RPC

2019-08-26 14:53:32

數(shù)據(jù)中心運維管理宕機
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 热久久久久 | 99视频在线看 | 国产久| 成人片在线看 | 久久不卡日韩美女 | 尤物在线视频 | 99免费精品视频 | 免费看的黄网站 | 成人午夜免费在线视频 | 欧美性生活视频 | 综合久久综合久久 | 欧美精品中文字幕久久二区 | 99热最新网址 | 国产精品成人国产乱 | 久久精品色欧美aⅴ一区二区 | 久操av在线| 精品国产黄a∨片高清在线 成人区精品一区二区婷婷 日本一区二区视频 | 欧美一级免费看 | 精品国产一区一区二区三亚瑟 | 亚洲精品一区二三区不卡 | 国产精品国产三级国产aⅴ入口 | 国产一区91精品张津瑜 | 一区二区三区四区不卡视频 | 午夜视频一区二区 | wwwxxx日本在线观看 | 2019精品手机国产品在线 | 久久成人高清视频 | 精精国产xxxx视频在线播放 | 欧美99久久精品乱码影视 | 国产美女在线播放 | 亚洲成人一区 | 男人久久天堂 | 欧美不卡一区二区三区 | 韩日一区二区 | 国产精品久久久久久婷婷天堂 | 欧美综合一区二区三区 | 国产亚洲一区二区三区 | 国产精品日韩在线 | 精品乱人伦一区二区三区 | 国产精品久久久亚洲 | 国产精品伦理一区二区三区 |