成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

得物自建DTS平臺的技術(shù)演進(jìn)

開發(fā) 架構(gòu)
將現(xiàn)有的DTS能力都遷移到Flink平臺上,保持統(tǒng)一的技術(shù)棧,能夠極大的降低維護(hù)成本。現(xiàn)有遺留的雙向同步、數(shù)據(jù)比對等能力需要做進(jìn)一步的改造和遷移,符合整體技術(shù)收斂的趨勢。

前言

DTS是數(shù)據(jù)傳輸平臺(Data Transfer Platform的縮寫)

隨著得物App的用戶流量增長,業(yè)務(wù)選擇的數(shù)據(jù)庫越來越多樣化,異構(gòu)數(shù)據(jù)源之間的數(shù)據(jù)同步需求也逐漸增多。為了控制成本并更好地支持業(yè)務(wù)發(fā)展,我們決定自建DTS平臺。本文主要從技術(shù)選型、能力支持與演化的角度出發(fā),分享了在DTS平臺升級過程中獲得的經(jīng)驗,并提供一些參考。

1技術(shù)選型

DTS的主要目標(biāo)是支持不同類型的數(shù)據(jù)源之間的數(shù)據(jù)交互,包括關(guān)系型數(shù)據(jù)庫(RDBMS)、NoSQL數(shù)據(jù)庫、OLAP等,同時整合了數(shù)據(jù)庫配置管理、數(shù)據(jù)訂閱、數(shù)據(jù)同步、數(shù)據(jù)遷移、DRC雙活數(shù)據(jù)同步支持、數(shù)據(jù)巡檢、監(jiān)控報警、統(tǒng)一權(quán)限等多個模塊,以構(gòu)建安全、可擴(kuò)展、高可用的數(shù)據(jù)架構(gòu)平臺。

1.1 能力對比

圖片圖片

1.2 DTS 1.0 - 以 canal/otter/datax 作為執(zhí)行引擎

圖片圖片

1.3 為什么要切換到Flink?

為了支持多種讀端數(shù)據(jù)源和寫端數(shù)據(jù)源,需要一個統(tǒng)一數(shù)據(jù)處理框架,以減少重復(fù)組件和提高開發(fā)效率。同時數(shù)據(jù)源類型和組件的維護(hù)難度與復(fù)雜度呈線性增長,現(xiàn)有的組件需要統(tǒng)一維護(hù)到一個項目中。

Canal和Otter等組件的社區(qū)活躍度低,很長時間沒有得到維護(hù)更新。因此,需要選擇一個新的、活躍的框架。此外,現(xiàn)有組件也無法有效支持全量+增量一體化的操作。

因此,使用一個統(tǒng)一的數(shù)據(jù)處理框架,能夠同時支持多種讀端數(shù)據(jù)源和寫端數(shù)據(jù)源,以及全量+增量一體化的功能,是必要的。這樣能夠降低組件的維護(hù)難度和復(fù)雜度,提高開發(fā)效率。

通過DTS 2.0,我們希望將canal/otter/datax演化為一個任務(wù)執(zhí)行框架+管理平臺,能夠為后續(xù)大量數(shù)據(jù)源迭代提速。

1.4 DTS 2.0 以Flink作為執(zhí)行引擎

現(xiàn)有的開發(fā)流程:

  • 統(tǒng)一的任務(wù)執(zhí)行框架,集成flink并引入connectors根據(jù)配置組裝出具體的DTS任務(wù)
  • 維護(hù)并研發(fā)新的 connector

當(dāng)我們需要支持新的數(shù)據(jù)源, 首先將數(shù)據(jù)源相關(guān)插件維護(hù)在connector中,接著在執(zhí)行框架中引入需要的組件,其中存在大量的可復(fù)用的功能,這樣就做到了connector及功能組件復(fù)用的效果。

2DTS 現(xiàn)有能力

圖片圖片

3我們做了什么?

3.1 DTS Connectors框架 - 數(shù)據(jù)源支持提速

在Flink CDC基礎(chǔ)上實現(xiàn)的全量/增量任務(wù)同步框架,基本的架構(gòu)如下

圖片圖片

其中Connector中分別實現(xiàn)了Flink提供的SourceFunction和SinkFunction函數(shù),分別負(fù)責(zé)從讀端讀取數(shù)據(jù),往寫端寫入數(shù)據(jù),因此一個Connector可同時存在于上游或者下游。

任務(wù)的啟動流程:
  • 指定任務(wù)Json配置, 根據(jù)類型加載SourceFunction和SinkFunction構(gòu)建通用能力函數(shù)并啟動

a. 任務(wù)的Main函數(shù)如下所示, 根據(jù)如下的Json文件加載到對應(yīng)的Connector中的SourceFactory或者SinkFactory來構(gòu)造對應(yīng)的DataStream。

DataStream是Flink中提供的數(shù)據(jù)流操作類

public class Main {
    public static void main(String[] args) throws Exception {


        // 解析參數(shù)
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String[] parsedArgs = parseArgs(parameterTool);


        Options options = new OptionParser(parsedArgs).getOptions();
        options.setJobName(options.getJobName());


        // 執(zhí)行任務(wù)
        StreamExecutionEnvironment environment =
                EnvFactory.createStreamExecutionEnvironment(options);
        exeJob(environment, options);
    }

任務(wù)Json配置:

{
  "job":{
    "content":{
      "reader":{
        "name":"binlogreader",
        "parameter":{
          "accessKey":"",
          "binlogOssApiUrl":"",
          "delayBetweenRestartAttempts":2000,
          "fetchSize":1,
          "instanceId":"",
          "rdsPlatform":"",
          "restartAttempts":5,
          "secretKey":"",
          "serverTimezone":"",
          "splitSize":1024,
          "startupMode":"LATEST_OFFSET"
        }
      },
      "writer":{
        "name":"jdbcwriter",
        "parameter":{
          "batchSize":10000,
          "concurrentWrite":true,
          ],
          "dryRun":false,
          "dumpCommitData":false,
          "errorRecord":0,
          "flushIntervalMills":30000,
          "poolSize":10,
          "retries":3,
          "smallBatchSize":200
        }
      }
    },


  }
}

b. 我們提供了兩個抽象工廠類,SourceFactory, SinkFactory, 其中的createSource, createSink便是子工廠需要實現(xiàn)的方法,不同的數(shù)據(jù)源實現(xiàn)不同。

public abstract class SourceFactory<T> {
    public abstract DataStream<T> createSource();
}
public abstract class SinkFactory<T> {
    public abstract void createSink(DataStream<T> rowData) throws Exception;
}

c. 接下來,我們只需要實現(xiàn)對應(yīng)的子工廠方法就可以了

public class BinlogSourceFactory extends AbstractJdbcSourceFactory {
    @Override
    public DataStream<TableRowData> createSource() {


        List<String> tables = this.binlogSourceConf.getConnection().getTable();
        Set<String> databaseList = new HashSet<>(2);


        // 使用對應(yīng)的Connector構(gòu)建DataStream
    }
}

d. 通用能力函數(shù):RateLimitFunction, BinlogPositionFunction 其中分別實現(xiàn)了對應(yīng)的任務(wù)能力,例如限流,任務(wù)位點保存等。

public class RateLimiterMapFunction<T> extends RichMapFunction<T, T> {




    private transient FlinkConnectorRateLimiter rateLimiter;




    @Override
    public T map(T value) throws Exception {
        if (rateLimiterEnabled) {
            rateLimiter.acquire(1);
        }
        return value;
    }

當(dāng)任務(wù)所需的函數(shù)都創(chuàng)建完成后,任務(wù)就真正開始運行了。

收益:

使用一套封裝完善且易擴(kuò)展的框架能夠提高開發(fā)效率并降低后續(xù)代碼的維護(hù)成本。相比于DTS1.0、Canal和Otter等項目,該項目的維護(hù)成本大大降低,同時提供了更好的擴(kuò)展性,使得我們能夠在短期內(nèi)支持PostgreSQL、MongoDB、Hbase、StarRocks等不同的數(shù)據(jù)源。

3.2 RDS日志獲取

DTS通過提供增量和全量同步能力為業(yè)務(wù)提供數(shù)據(jù)同步功能,但在增量訂閱/同步任務(wù)執(zhí)行過程中,可能會遇到一些異常情況。其中,以下三種情況需要特別處理:

  • Binlog可用性

云廠商的數(shù)據(jù)庫實例本地binlog有效期8小時,過期部分進(jìn)行OSS備份。MySQL業(yè)務(wù)高峰期或者DDL變產(chǎn)生大量的binlog,  DTS任務(wù)嘗試獲取過期數(shù)據(jù)失敗,任務(wù)因此中斷。因此,DTS支持了本地binlog+OSS備份binlog的獲取及切換,保障日志可用性。

  • 數(shù)據(jù)庫實例主從切換

RDS經(jīng)常會發(fā)生主備節(jié)點切換,在切換的過程中要保證數(shù)據(jù)不丟。由于切換前后兩個數(shù)據(jù)庫實例 Binlog 文件一般都是不一致的,此時任務(wù)位點記錄方式是 BinlogPosition 模式,則在切換之后任務(wù)需要自動進(jìn)行 Binlog 對齊操作,進(jìn)而保證數(shù)據(jù)的完整性。將新數(shù)據(jù)實例上的位點查詢時間戳提前1-2分鐘即可。

  • 讀實例訂閱支持

DTS任務(wù)binlog dump連接數(shù)過多造成主庫壓力及影響DDL變更,因此需要支持讀庫訂閱。云廠商的讀庫不提供備份,在讀庫日志過期時需要切換到主庫進(jìn)行讀取。

3.3 全量增量一體化功能

圖片圖片

全量增量一體化是指先同步存量數(shù)據(jù),待存量結(jié)束之后再開始同步增量數(shù)據(jù)。其中也加入了增量階段的OSS備份日志獲取。但存量階段依然存在一些問題,需要進(jìn)一步改造優(yōu)化。

全量模式下新增表先進(jìn)行存量數(shù)據(jù)同步再進(jìn)行增量數(shù)據(jù)同步,該任務(wù)中已存在的表會因此導(dǎo)致數(shù)據(jù)延遲。待新增表數(shù)據(jù)同步完成,任務(wù)延遲則會恢復(fù)正常。

3.4 數(shù)據(jù)源接入- starrocks, postgres等

支持從mysql同步到starrocks和postgres, 在任務(wù)執(zhí)行框架的基礎(chǔ)上,只需要開發(fā)starrocks-connector, postgres connector支持對應(yīng)的數(shù)據(jù)源即可。其中的其他能力,像多表同步、分庫分表等場景都可以達(dá)到復(fù)用的效果。

3.5 JBDC寫入改造

腳本擴(kuò)展和動態(tài)表名路由:

圖片圖片

數(shù)據(jù)合并和多線程寫入:

圖片圖片

3.6 監(jiān)控告警

DTS任務(wù)需要采集flink任務(wù)指標(biāo),主要包括任務(wù)延遲、各個算子階段的寫入速率,算子被壓及使用率等。其中 任務(wù)延遲需要接入告警服務(wù),于是我們選擇了引入redis來緩存任務(wù)的延遲時間,再上報到告警服務(wù)來完成飛書的消息和電話告警。

4最佳實踐

4.1 0000-00-00 00:00:00時間戳的問題

MySQL的時間戳允許為0000-00-00 00:00:00, 在Flink任務(wù)中通常會被轉(zhuǎn)換為null, 導(dǎo)致寫入下游數(shù)據(jù)源失敗, 因此需要做特殊標(biāo)記對于不同的數(shù)據(jù)源做不同的轉(zhuǎn)化保證寫入的正切行。

4.2 Flink CDC任務(wù)serverId唯一性

Flink CDC source 會偽裝成 MySQL slave節(jié)點,為了保證數(shù)據(jù)的準(zhǔn)確性,每個slave必須擁有唯一的serverId來標(biāo)記該slave的唯一性。因此在flink cdc的任務(wù)中我們?yōu)槊恳粋€任務(wù)分配了一個唯一的serverId區(qū)間(范圍區(qū)間是為了支持多并行度)。

4.3 Flink任務(wù)數(shù)據(jù)序列化瓶頸

在flink任務(wù)中使用DataStreamAPI并使用比較復(fù)雜的數(shù)據(jù)結(jié)構(gòu)進(jìn)行傳輸時,算子之間的序列化成本較高,兩個方向,一是建立更為高效的數(shù)據(jù)結(jié)構(gòu)進(jìn)行傳輸,二是開啟flink對象復(fù)用,并盡可能減少不同并行度之間的數(shù)據(jù)傳輸。

5未來演進(jìn)

DTS作為一個數(shù)據(jù)同步平臺主要功能是盡可能提供高效的數(shù)據(jù)源同步功能,助力于多變的業(yè)務(wù)場景。

5.1 基于Flink SQL的ETL任務(wù)管理

流式數(shù)據(jù)處理除了現(xiàn)有的DataStream API還存在SQL的形式,SQL作為一種通用的語言,對于數(shù)據(jù)相關(guān)的業(yè)務(wù)同學(xué)極大的降低了學(xué)習(xí)成本。而通過Flink SQL可以做到的ETL流式數(shù)據(jù)加工也能解決一些復(fù)雜業(yè)務(wù)場景的處理邏輯,將業(yè)務(wù)邏輯轉(zhuǎn)化為DAG的流式處理圖,通過拖拽的方式也能方便使用,F(xiàn)LINK SQL的演進(jìn)方向能夠和現(xiàn)有的Flink DataStream API互補(bǔ)。

應(yīng)用場景:ETL強(qiáng)大的流式數(shù)據(jù)轉(zhuǎn)換處理能力大幅提升數(shù)據(jù)集成效率,也能建實時報表體系,提高分析效率,同時也可以應(yīng)用于一些實時大屏的場景。

5.2 統(tǒng)一技術(shù)棧

將現(xiàn)有的DTS能力都遷移到Flink平臺上,保持統(tǒng)一的技術(shù)棧,能夠極大的降低維護(hù)成本。現(xiàn)有遺留的雙向同步、數(shù)據(jù)比對等能力需要做進(jìn)一步的改造和遷移,符合整體技術(shù)收斂的趨勢。

6總結(jié)

本文主要分享了以下幾個方面:Flink相比現(xiàn)有的技術(shù)棧帶來的收益,切換到Flink以后的迭代方向及架構(gòu)功能上的變更、帶來新的問題如何解決,以及未來的一些迭代方向,希望能讓大家有所收獲。

責(zé)任編輯:武曉燕 來源: 得物技術(shù)
相關(guān)推薦

2025-04-22 00:00:55

2023-01-11 18:34:22

推薦精排模型

2019-11-19 11:51:54

固件更新物聯(lián)網(wǎng)物聯(lián)網(wǎng)平臺

2019-05-23 08:29:42

物聯(lián)網(wǎng)平臺物聯(lián)網(wǎng)IOT

2024-11-12 14:19:53

2014-02-27 14:14:20

第三技術(shù)平臺梭子魚

2015-11-03 09:35:42

物聯(lián)網(wǎng)核心技術(shù)

2023-11-29 18:41:35

模型數(shù)據(jù)

2023-05-15 18:33:09

得物前端巡檢

2022-10-20 14:35:48

用戶畫像離線

2025-03-20 10:47:15

2023-12-06 11:28:48

工業(yè)物聯(lián)網(wǎng)IIoT

2022-12-09 18:58:10

2023-02-01 18:33:44

得物商家客服

2023-12-27 18:46:05

云原生容器技術(shù)

2023-02-06 18:35:05

架構(gòu)探測技術(shù)

2020-06-04 16:52:31

物聯(lián)網(wǎng)安防技術(shù)

2024-12-03 11:59:53

2023-05-12 18:42:13

得物AI平臺

2019-01-23 07:41:27

私有云企業(yè)虛擬化
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 中文字幕在线视频免费观看 | 91观看| 黄色在线观看国产 | 国产日韩欧美在线一区 | 日本欧美视频 | 天堂av中文在线 | 国产一区在线免费 | 亚洲成av人影片在线观看 | 国产精品一二三区 | 久久国产精品99久久久久久丝袜 | 一级毛片成人免费看a | 国户精品久久久久久久久久久不卡 | 国产激情视频网址 | 国产第一页在线播放 | 一区二区在线不卡 | 亚洲视频在线观看 | 超碰520 | 亚洲欧美日韩精品久久亚洲区 | 欧美日韩在线高清 | 欧美激情视频一区二区三区在线播放 | 日韩精品一区二区三区中文字幕 | 国产专区免费 | 亚洲精品大全 | 精品久久久久久 | 国产区免费视频 | 国产精品美女久久久久aⅴ国产馆 | 久久99深爱久久99精品 | 欧美成人精品一区二区男人看 | 国产精品一区在线观看 | 日韩精品一区二区三区中文字幕 | 欧美一级免费看 | 亚洲欧美视频 | 日韩在线欧美 | 超碰在线影院 | 亚洲欧美激情视频 | 欧美日韩视频在线第一区 | 91精品国产综合久久久久久丝袜 | 91中文在线观看 | 人人人人干 | 成人激情视频免费在线观看 | 九九在线精品视频 |