成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

商家下載中心設計演進之路

開發 前端
在初始階段,系統功能較少,通常只有一兩個簡單的導入導出功能,此時使用流程擴展是最輕量、最靈活的選擇,能夠快速滿足商家的基本需求。

一、背景

在電商平臺上,二八定律尤為明顯,20%的高價值商家往往創造了80%以上的銷售額。而這些商家通常擁有大量的訂單、商品、出價等管理需求,推動了他們對批量操作功能的迫切需求。批量操作能夠幫助這些商家高效地處理商品信息、庫存和訂單管理,顯著提升運營效率。

通過批量操作,商戶可以在短時間內對多個產品進行修改,如統一調價、調整促銷策略等,從而快速響應市場變化,優化用戶體驗。此外,批量操作還降低了人工出錯的風險,確保了數據的一致性,讓商家能夠更加專注于戰略規劃和客戶關系管理。總之,對于這些商戶而言,批量操作不僅是提升管理效率的關鍵工具,也是實現業務增長的重要保障。

在得物的商家后臺中,商家的所有批量操作都承載在批處理系統(批處理中心),商家可以通過在功能頁面操作批量導入或是批量導出來完成批量操作。操作后的文件將展示在下載中心。

此外,批處理中心還維護了交易后臺、客服、匯金、門店等多個域的批量操作任務。截止目前,批處理中心維護了十個域的上千種批量任務,日均處理數萬個相關任務,數億條相關數據。

隨著得物體量的不斷上升,批處理系統也在不斷演進。簡單來說,批處理系統經歷了從分散到耦合、再到集中與隔離的多個發展階段。接下來,我們以批處理的開發者小王的視角,介紹批處理系統的這三種設計,并探討它們各自的特點與適用場景。

二、集中式:流程擴展

假設小王接到了一個批量操作的需求,要求在商家后臺能進行批量出價。需求很簡單,小王僅用時兩天半就完成了基本流程的搭建。

圖片圖片

業務上線后,商家反饋非常好,產品要求立刻上線一個批量修改出價的需求。于是小王照葫蘆畫瓢寫又寫了一條流程。

圖片圖片

兩條幾乎一樣的流程,有代碼潔癖的小王表示無法接受。經過分析后,小王發現,不管是什么導入流程,有些步驟總是固定的,因此決定代碼復用。 

圖片圖片

代碼復用后,出價和修改之間只有格式校驗和業務邏輯不同。其余的文件下載、內容解析、結果保存和上傳均使用相同的節點。既然各個業務之間的差異主要集中在數據處理,小王決定直接將其開成擴展點。不同的業務場景只需要實現各自的數據處理擴展,就能無縫接入批處理流程。業務擴展的示意圖如下:

圖片圖片

在具體實現的時候,小王在代碼里面通過業務身份來進行擴展點的選擇,可以建立一個相關的工廠類進行。

@Component
public class BpcProcessHandlerFactory {


    @Autowired
    private ApplicationContext applicationContext;


    private static ConcurrentHashMap<String, BpcProcessDefine> templateMap = new ConcurrentHashMap<>();


    @PostConstruct
    private void init() {


        Map<String, ImportService> importServiceMap = applicationContext.getBeansOfType(ImportService.class);


        for (ImportService importService : importServiceMap.values()) {
            initImportService(importService);
        }
    }


    private void initImportService(ImportService importService) {
         // ...   
    }


    public BpcProcessHandler getBpcProcessHandler(String templateCode) {
        if (StringUtils.isBlank(templateCode)) {
            return null;
        }


        if(!templateMap.containsKey(templateCode)) {
            return null;
        }
        
        return templateMap.get(templateCode).newProcessHandler();
   }
}

對于導入的任務處理,簡化的代碼流程如下:

@Service
public class BpcProcessService {


    @Autowired
    private BpcProcessHandlerFactory bpcProcessHandlerFactory;


    public String doBpcProcess(BpcProcessReq req) throws BpcProcessException {
        
        // 獲取擴展點 
        BpcProcessHandler bpcProcessHandler = bpcProcessHandlerFactory.getBpcProcessHandler(req.getTaskTemplateCode());
        if (bpcProcessHandler == null) {
            throw new BpcProcessException("找不到模版定義");
        }


        // 1. 創建任務 
        createTask();
        
        // 2. 文件下載 && 文件保存 
        downloadFromOss();
        
        // 3. 數據解析 
        int loopCnt = 0;
        int maxLoopCnt = bpcProcessHandler.getMaxLoopCnt();
        while(loopCnt++ < maxLoopCnt) {
            // 調用擴展點處理 
            bpcProcessHandler.process();
            
            // 更新任務
            updateTaskProcess();
        }


        // 更新任務
        updateTaskStatus();
        
        return taskId;
    }
}

在完成了流程擴展點后,小王心想,這下可算是高枕無憂了。后續有新的導入場景,只需要實現自己的校驗邏輯和處理邏輯即可。

但是好景不長,隨著商家體量的增長,小王發現對接的業務越來越多了;先是出價、再是商品然后是其他逆向、服務費的批量服務,小王一個人實在是寫不過來了,只能讓各個業務的開發到批處理系統開發自己的業務。各個人的編碼習慣不一樣,批處理系統對接的Jar也越來越多,系統已經變成了一個大雜燴。

怎么才能改變這個現狀呢?

三、平臺化:配置注冊

在集中式架構中:所有的業務處理流程是共用的,不同的業務通過實現各自的擴展點來完成各個業務的邏輯。這帶來了一個最明顯的問題,即系統的邊界模糊,業務耦合重。

這個擴展點能不能寫在外部呢?

小王靈光一現:SPI不就可以嗎。Java的SPI機制能幫助我們獲取各個業務的實現,因此批處理系統只需要基于SPI抽象出一套核心的導入/導出流程即可。由于各個業務要能準確找到SPI,還需要加入一定業務配置能力。

圖片圖片

和集中式架構對比,配置化方案的可擴展性更強,但是也不可避免的帶來了一個缺點:開發人員需要去創建配置。

而批處理配置至少需要包含以下內容:

  • Excel格式。
  • 流程調用的SPI信息。
  • 數據對象和Excel字段之間的映射關系。

其中字段的映射關系和SPI等信息的維護成本較高,為了減輕開發人員的工作量,小王還維護了一個IDEA插件。用于一鍵上傳配置。

后端開發人員可以僅通過注解的方法一鍵上報自身的配置,大大減輕了業務的配置上傳的工作量。

同步執行-通用配置處理

在創建完配置后,可以利用dubbo的泛化調用來執行各個SPI的實現:

@Override
public String invoke(ServiceDefinition serviceDefinition, Object inputParam) {
    GenericService genericService = DubboConfig.buildService(serviceDefinition.getInterfaceName(), serviceDefinition.getTimeout());


    //參數list轉換處理,由請求參數key轉換成內部參數
    String[] parameterTypes = new String[] {serviceDefinition.getRequestType().getClassName()};
    Object[] args = new Object[] {inputParam};
    long startTime = System.currentTimeMillis();


    Object result;
    try {
        log.info("invoke service={}#{} with request={}", serviceDefinition.getInterfaceName(), serviceDefinition.getMethod(), JSON.toJSONString(args));
        result = genericService.$invoke(serviceDefinition.getMethod(), parameterTypes, args);
        long endTime = System.currentTimeMillis();
        digestLog(serviceDefinition,  true, endTime - startTime);
        log.info("invoke service={}#{} with result={}", serviceDefinition.getInterfaceName(), serviceDefinition.getMethod(), JSON.toJSONString(result));
    } catch (Exception ex) {
        long endTime = System.currentTimeMillis();
        digestLog(serviceDefinition,  false, endTime - startTime);
        log.info("failed to dubbo invoke:" + serviceDefinition.getInterfaceName() + "#" +serviceDefinition.getMethod() + " with error " + ex.getMessage());
        throw new DependencyException(ErrorCodeEnum.DEFAULT_DEPENDENCY_ERROR.getCode(), ex.getMessage(), ex);
    }


    if (result == null) {
        throw new DependencyException(ErrorCodeEnum.DEFAULT_BIZ_ERROR.getCode(), "the result is null");
    }


    Map resultMap = JSON.parseObject(JSON.toJSONString(result), Map.class);
    processError(resultMap);
    Object data = resultMap.get("data");
    return JSON.toJSONString(data);
}

簡化版的執行流程如下所示:

@Service
public class BpcProcessService {


    @Autowired
    private BpcProcessHandlerFactory bpcProcessHandlerFactory;


    public String doBpcProcess(BpcProcessReq req) throws BpcProcessException {
        
        // 1. 獲取配置 
        TaskTemplate template = getTemplate();
       
        // 2. 創建任務 
        Task task = createTask();
        
        // 3. 文件下載 && 文件保存 
        downloadFromOss();
        
        // 4. 數據解析 
        int loopCnt = 0;
        int maxLoopCnt = template.getMaxLoopCnt();
        while(loopCnt++ < maxLoopCnt) {
            // 調用SPI處理 
            invoke(template, task)
            // 更新任務
            updateTaskProcess();
        }


        // 更新任務
        updateTaskStatus();
        
        return taskId;
    }
}

可以看到配置化后的執行策略和之前流程擴展的執行策略是類似的,主要的變化就是從調用本地擴展點,切換成了調用配置后的SPI。

調度執行-業務針對調整

配置化完成之后,小王松了一口氣,這下系統總算是干凈了。業務的歸業務,流程的歸流程,兩者互不打擾。然而凡事總不順利、沒過多久批處理系統就出了一次冒煙。簡單來說,這次冒煙是由于批處理系統同時處理了大量任務導致的內存溢出。

針對這次冒煙,小王仔細分析系統數據后發現,商家下載中心的業務有著自己的業務特點:

  1. 不同任務之間的數量差異巨大(如,運營任務和商家任務的差距);
  2. 商家操作的流量時間上分布不均,大部分商家操作集中在剛上班(10點左右)和快下班(17點左右);
  3. 任務流量在商家上分布不均,重點商家會創建大量任務。

以下是小王分析的部分數據來源圖:

  • 任務流量分布不均,下面是各個任務類型的執行統計,其中不同顏色代表不同類型的任務。

圖片圖片

  • 時間流量分布不均,下面是導入導出任務流量的時間分布

圖片圖片

  • 商家流量分布不均

圖片圖片

這些特點在批處理系統中表現為:

  1. 系統穩定性風險高,出現過一次線上冒煙。因為系統資源是有限的,高峰期的大流量任務可能會占用過多系統內存,導致OOM。
  2. 商家體驗得不到保證,運營操作可能會導致商家長時間等待。

不就是資源導致的風險嗎,小王覺得這是小case 了,加個限流就搞定了,然后就對創建任務加上了限流。結果上線后情況不僅沒有好轉,還因為限流“誤殺”了好多比較重要的導入任務,經過分析后小王終于找到了原因。在商家下載中心的業務中,限流并不能滿足資源保護訴求。這實際上是由系統本身的內部架構決定的、因為批處理在大部分情況下是一個低CPU高內存占用的系統。如果對任務的提交進行限流,一方面容易誤傷核心的訂單/出價任務,另一個方面忽略了高耗時任務的影響。如下圖所示: 

圖片圖片

  • 運營任務"恰好"占用了流控的窗口,導致后續提交的商家任務都被限流。
  • 長耗時任務會跨越多個時間窗口,導致限流不生效。

不能限流,那只能自己來了。只要把一切都拿到手里,任務啥時候執行不就是自己說了算了嘛。于是小王打算轉變身份,從被動式執行到主動式調度。換言之,就是從同步流程切換成異步調度流程,由系統自己來解決資源的分配,并對業務進行隔離。小王很快畫好了自己的核心流程。

圖片圖片

流程很簡單,創建任務的時候不再直接執行,而是等待系統調度后執行。然而小王在調度和隔離這里又犯了難,這倆該怎么做呢?

業務隔離

隔離主要分為兩大類,物理隔離和邏輯隔離。

物理隔離:不同的機器執行不同業務的調度。

  • 集群隔離:類似于應用發布時的藍綠集群,我們可以把集群分為核心集群+非核心集群,用核心集群來保障商家訂單,出價等相關動作的穩定性,用非核心集群來保障其他鏈路;
  • 機器隔離:機器隔離相較于集群隔離,其粒度更小。通過指定IP來控制不同業務之間的調度;

邏輯隔離:通過使用不同線程池的方式來完成業務的隔離。

凡事先易后難,小王決定先采用簡單的方式來對業務進行隔離,線程池的方式已經能分離開可能造成資損的任務和不會造成資損的任務了。在調度方面,小王列舉了業內常見的帶優先級的調度方法

任務調度

1.優先隊列:利用線程池的等待隊列來完成優先級的調度。 

圖片圖片

優點:代碼簡單,易維護,只需要維護一個優先級隊列即可。

缺點:需要額外增加一個狀態來代表等待調度,有饑餓問題,存在一定穩定性風險,因為對線程池的等待隊列缺少管控手段。

2.老化策略:利用老化策略,動態提升任務優先級。

圖片圖片

優點:較大程度上避免饑餓問題,優先級的可擴展性高,對任務的管控能力強,狀態機侵入少。

缺點:需要考慮并發問題,代碼較復雜。

3.多級隊列:利用多級隊列來完成任務優先級。

圖片圖片

優點:較大程度上避免饑餓問題,代碼較為簡潔,任務管控能力強,狀態機改動少。

缺點:任務優先級可擴展性較差,如果新增一個優先級需要改動調度代碼,沒有高優任務時,系統吞吐性較差。

綜合以上各種方案后,小王最終采用了多級隊列 + 線程隔離的方式來進行任務的調度。在調度的具體實現上,采用定時任務來進行流程的觸發。

此外,為了支持大任務量場景臨時增加系統吞吐,小王還增加了分片的能力,通過接受分片參數,每臺機器只取自己的分片。簡化版本的代碼如下:

@Service
@Slf4j
public class TaskScheduleServiceImpl implements TaskScheduleService {


    @Override
    @LogAnnotation
    public void schedule(int shared, int all) {


        StopWatch stopWatch = new StopWatch();
        stopWatch.start();


        // 丟線程池執行
        List<Long> highTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.HIGH, all * arkConfig.highSize);
        highTaskIds = highTaskIds.stream().filter((id) -> id % all == shared).collect(Collectors.toList());
        log.info("優先級調度任務,待執行高優任務 Ids = {}", highTaskIds);
        process(highTaskIds, (id) -> taskThreadPool.executeHigh(() -> process(id)));


        // 丟線程池執行
        List<Long> mediumTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.MEDIUM, all * arkConfig.mediumSize);
        mediumTaskIds = mediumTaskIds.stream().filter((id) -> id % all == shared).collect(Collectors.toList());
        log.info("優先級調度任務,待執行中優任務 Ids = {}", mediumTaskIds);
        process(mediumTaskIds, (id) -> taskThreadPool.executeMedium(() -> process(id)));


        // 丟線程池執行
        List<Long> lowTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.LOW, all * arkConfig.lowSize);
        lowTaskIds = lowTaskIds.stream().filter((id) -> id % all == shared).collect(Collectors.toList());
        log.info("優先級調度任務,待執行低優任務 Ids = {}", lowTaskIds);
        process(lowTaskIds, (id) -> taskThreadPool.executeLow(() -> process(id)));


        log.info("優先級調度任務,執行完畢, cost = {}", stopWatch.getTime());
    }


    private void process(List<Long> idList, Consumer<Long> consumer) {
        if (CollectionUtils.isEmpty(idList)) {
            return;
        }


        for (Long id : idList) {
            consumer.accept(id);
        }
    }


    private void process(Long id) {
        // 任務處理邏輯。。。
    }
}

干完了這些事后,小王突然想起來,測試環境還需要走染色呢。

于是又在調度上增加了染色環境的路由。

這回總算是徹底解決了系統的穩定性問題了,以后系統存在吞吐風險時,只需要動態調整召回數量就好了。

四、本地化:任務上報

作為上面的一切后,小王打開了APM的監控,發現系統的內存占用還是很高。明明復雜的業務流程都放到外面了,為啥性能還是一般呢?

小王思考了現在批處理存在的缺點:

  1. 配置維護成本高、業務需要上報SPI信息和映射關系,且配置完成后更改風險高。
  2. 全局資源利用率低,一份業務數據在多個系統都需要占用內存。
  3. 調度的隔離是基于線程池or物理機器,粒度較粗,無法完全避免業務之間的互相干擾。

此外,還有一個令他最難受的問題,就是業務咨詢很多。很多業務雖然對接了他的系統,但是在執行失敗時他們經常找不到錯誤原因,需要小王配合排查。如何解決這幾個問題呢?

小王決定返璞歸真,回歸本源。批處理中心是為了解決商家批量導入導出的問題而生的,其產生的主要目的在于幫助業務平臺減少文件解析、文件生成、文件上傳、頁面展示的成本。

這些問題一定需要一個系統來支持嗎?文件解析和生成實際上是用EasyExcel的SDK完成的,文件上傳是用Oss的SDK完成的,還有一個頁面展示的功能是一個非常輕量的邏輯。換言之,完全可以在業務系統把前幾件事都做了。構建一個批處理插件來完成批處理中心的大部分能力,批處理系統僅作為展示使用。小王產生了一個新的想法:把邏輯放到批處理SDK中去,批處理僅維護一兩臺機器用于承載展示邏輯即可。

整體的架構設計如下圖所示:

圖片圖片

在本地化的思路下:批處理中心類似于一個中心節點,各個業務系統作為其的葉子節點,只需要定時上報任務相關情況即可。批處理系統只負責頁面的展示,和業務完全解耦。

本地化帶來了以下幾個明顯的好處:

  1. 效率高,不再需要跨系統之間的邏輯調用,既能節約系統資源,又能減少網絡傳輸時間。
  2. 維護成本低,業務方可隨時調整業務映射,批處理只需要維護極小的配置(模版和對應的展示名稱、展示地方)。
  3. 迭代升級容易,平臺化的改造由于影響面比較大,風險高。而SDK的升級是單應用升級的,因此影響小,風險可控。
  4. 流程擴展相對簡單,SDK可以提供相對較多的鉤子函數。

當然,凡事沒有銀彈。本地化也不可避免帶來了一些缺點:

  1. 業務需要維護部分配置,這其中主要是一些oss相關的配置。

本地化后,批處理中心不需要維護業務邏輯、也不需要任務調度、任務的隔離粒度最細。小王總算是能安心睡個好覺了。

五、總結

上面我們以一個批處理系統普通開發者的視角,回歸了商家批處理系統發展的三個階段(本地化正在進行中)。這三個階段,體現了從厚到薄、從業務耦合到業務隔離的演進過程。從本地到平臺再到本地,頗有種天下大勢,分久必合合久必分的感覺。這三種方式并沒有絕對的優劣之分,而是隨著業務需求的變化而逐步演化的。

在初始階段,系統功能較少,通常只有一兩個簡單的導入導出功能,此時使用流程擴展是最輕量、最靈活的選擇,能夠快速滿足商家的基本需求。隨著業務量的增長,系統之間的隔離性變得越來越重要,這時引入配置注冊成為必要措施,以確保不同模塊之間的自主性和穩定性。

進一步發展后,平臺化改造的初步實現通常采用同步調用方式,但隨之而來的穩定性要求推動了異步調度的引入。然而,到了后期,即使是異步調度也可能面臨系統吞吐量不足的問題。因此,業務系統本地執行狀態上報的模式逐漸成為更優的選擇,能夠有效提升系統的響應速度和處理能力。

隨著業務的不斷發展,商家的批處理系統必然會進行更新與迭代,以適應新的需求和挑戰。系統設計沒有銀彈,所有設計的迭代實際上就是開發人員遇見問題、解決問題的能力體現。

責任編輯:武曉燕 來源: 得物技術
相關推薦

2014-01-15 09:09:56

2015-08-17 13:49:59

數據中心二層封裝技術

2023-07-02 11:14:21

工具TypeScript框架

2018-05-02 11:16:27

數據中心

2018-03-27 10:06:26

對象存儲演進

2009-08-05 16:14:32

CDMA網絡的演進無線網絡發展

2024-03-29 13:25:12

互動玩法直播

2023-01-03 17:43:39

網易郵箱數倉

2024-07-17 11:40:58

2022-08-08 13:24:28

整潔架構架構前端

2015-07-17 08:23:06

品高云計算

2016-03-15 16:24:47

集群調度框架演進

2012-11-19 11:36:16

PTNLTE網絡承載

2024-08-14 08:11:41

2023-05-18 22:44:09

2023-02-01 10:11:06

轉轉容器日志

2023-10-10 15:32:09

veImageXWeb 圖片

2016-08-16 17:44:19

華為

2023-11-01 18:06:46

彩虹橋架構性能

2023-01-13 14:35:00

攜程實踐
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 在线a视频网站 | 亚洲电影一区 | 久久艹免费视频 | 欧美一级高潮片免费的 | 久久人人爽人人爽人人片av免费 | 国产精品一区二区三区99 | 色婷婷久久久久swag精品 | 欧美福利| 人人干人人舔 | 自拍视频在线观看 | 中文日韩在线 | 伊人久久精品一区二区三区 | 一级全黄少妇性色生活免费看 | 99热在线播放 | 伊人热久久 | 精品自拍视频在线观看 | 国产一区二区在线播放 | 国产精品18久久久久久久 | 成人小视频在线观看 | 国产在线播 | 久久成人免费视频 | 国产精品区一区二 | 久久国产精品一区二区三区 | 亚洲高清视频在线观看 | 在线观看精品视频网站 | 国产探花在线精品一区二区 | 一级片免费视频 | 欧美一区二区免费视频 | 国产精品久久久久久久午夜片 | 亚洲天堂一区 | 国产精品99久久久久久久vr | 黄色国产区 | 99精品久久99久久久久 | 国产精品资源在线观看 | 亚洲欧美一区二区三区国产精品 | 99精品久久 | 亚洲乱码国产乱码精品精的特点 | 天天夜天天操 | 男女羞羞免费网站 | 欧美日韩一区二区电影 | www.日本国产 |