成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

HBase Compaction 原理與線上調優實踐

大數據
本文對 HBase Compaction 的原理、流程以及限流的策略進行了詳細的介紹,列舉了幾個線上進行調優的案例,最后對 Compaction 的相關參數進行了總結。

一、Compaction 介紹

HBase 是基于一種 LSM-Tree(Log-Structured Merge Tree)體系架構的存儲模型設計的,寫入時先寫入 WAL(Write-Ahead-Log)日志,再寫入 Memstore 緩存,滿足一定條件后,會執行 Flush 操作將緩存數據刷寫到磁盤,生成一個 HFile 數據文件。隨著數據不斷寫入,HFile 文件會越來越多,文件太多導致查詢數據時 IO 次數增加,進而影響到 HBase 的查詢性能。為了優化讀的性能,采用合并小 HFile 的方法來減少文件數量,這種合并 HFile 的操作就稱為 Compaction。Compaction 是從一個 Region 的一個 Store 中選擇部分 HFile 文件進行合并的過程。合并原理是從這些待合并的數據文件中依次讀出 KeyValue,由小到大排序后寫入一個新的文件中。之后這個新生成的文件就會取代之前已合并的所有文件對外提供服務。

1.1 Compaction 的分類

HBase 根據合并規模將 Compaction 分為兩類:Minor Compaction 和 Major Compaction。

  • Minor Compaction 是指選取部分小的、相鄰的 HFile,將它們合并成一個更大的 HFile;
  • Major Compaction 是指將一個Store 中所有的 HFile 合并成一個 HFile,這個過程會清理三種無意義的數據:TTL 過期數據、被刪除的數據與版本號超過設定版本號的數據。

下圖形象的描述了2種 Compaction 的區別:

圖片

一般情況下,Major Compaction 持續時間比較長,整個過程消耗大量系統資源,因此線上數據量較大的業務通常推薦關閉自動觸發 Major Compaction 功能,改為在業務低峰期手動觸發(或設置策略自動在低峰期觸發)。

1.2 Compaction的意義

  1. 合并小文件,減少文件數,提升讀取性能,穩定隨機讀延遲;
  2. 合并的時候會讀取遠程 DataNode 上的文件寫入本地 DataNode,提高數據的本地化率;
  3. 清除過期數據和被刪除的數據,減少表的存儲量。

1.3 Compaction觸發時機

HBase 中觸發 Compaction 的時機有很多種,最常見的觸發時機有三種:后臺線程周期性檢查時觸發、MemStore Flush 觸發以及手動觸發。

(1)后臺線程周期性檢查:后臺線程 CompactionChecker 會定期檢查是否需要執行 Compaction,檢查周期為

hbase.server.thread.wakefrequency *hbase.server.compactchecker.interval.multiplier,這里主要考慮的是一段時間內沒有寫入請求導致 Flush 觸發不了 Compaction 的情況。其中參數 hbase.server.thread.wakefrequency 默認值是10s,是 HBase 服務端線程喚醒時間間隔,參數 hbase.server.compactchecker.interval.multiplier 默認值1000,是 Compaction 操作周期性檢查乘數因子。10 * 1000 s 約等于 2hrs 46mins 40sec。

(2)MemStore Flush:Compaction 的根源在于 Flush,MemStore 達到一定閾值就會觸發 Flush ,將內存中的數據刷寫到磁盤生成 HFile 文件,隨著 HFile 文件越來越多就需要執行 Compaction。HBase 每次 Flush之后,都會判斷是否需要進行 Compaction,一旦滿足 Minor Compaction 或 Major Compaction 的條件便會觸發執行。

(3)手動:是指通過 HBase API、HBase Shell 或者 Master UI 界面等方式執行 compact、major_compact 等命令。

二、Compaction流程

了解完基本的背景后,接下來介紹 Compaction 的整個過程。

  • RegionServer 啟動一個 Compaction 檢查線程,定期對 Region 的 Store 進行檢查;
  • Compaction 始于特定的觸發條件。一旦觸發,HBase 會將該 Compaction 交由一個獨立的線程處理;
  • 從對應的 Store 中選擇合適的 HFile 文件,這步是整個 Compaction 的核心,選取文件時需要遵循很多條件,比如文件數既不能太多也不能太少、文件大小不能太大等,盡可能地選取承載 IO 負載重的文件集。基于此,HBase 實現了多種文件選取策略:常用的有
    RatioBasedCompactionPolicy、
    ExploringCompactionPolicy
    StripeCompactionPolicy 等,也支持自定義的 Compaction 算法;
  • 選出待合并的文件后,會根據這些 HFile 文件的總大小選擇對應的線程池來進行處理;
  • 對這些文件執行具體的 Compaction 操作。

下圖簡單的描述了上述流程。

圖片

下面對圖2中具體的每一步進行詳細說明。

2.1 啟動 Compaction 定時線程

在 RegionServer 啟動時,會初始化 CompactSplitThread 線程以及定時檢查的 CompactionChecker ,默認10s執行一次。

// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);
// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);

其中 CompactSplitThread 是用來實現 Compaction 以及 Split 流程的類,而 CompactChecker 是用來周期性檢查是否執行 Compaction 的。

CompactionChecker 是 ScheduledChore 類型,而 ScheduledChore 是 HBase定期執行的一個 Task。

2.2 觸發 Compaction

Compaction 的觸發時機在上面已經介紹過,下面對這3種觸發機制進行詳細的介紹。

2.2.1 后臺線程周期性檢查

后臺線程 CompactionChecker 定期檢查是否需要執行 Compaction,檢查周期為:hbase.regionserver.compaction.check.period(默認10s)。

(1)首先檢查文件數是否大于可執行 Compaction 的文件數,一旦大于就會觸發 Compaction。

(2)如果不滿足,會接著檢查是否到了 Major Compaction 的執行周期。如果當前 Store 中 HFile 的最早更新時間早于某個值 mcTime,就會觸發 Major Compaction,其中 mcTime 是一個浮動值,浮動區間默認為[7-7*0.2,7+7*0.2],其中7為配置項 hbase.hregion.majorcompaction 設置,0.2為配置項 hbase.hregion.

majorcompaction.jitter,所以在7天左右就會執行一次 Major Compaction。用戶如果想禁用 Major Compaction,只需要將參數hbase.hregion

.majorcompaction 設為0。

(3)如果到了 Major Compaction 的執行周期:

  • 首先判斷有幾個 HFile 文件,如果只有1個文件,會判斷是否有過期數據、本地化率是否比較低,如果都不滿足就不做 Major
    Compaction;
  • 如果大于1個文件,也會做 Major
    Compaction。

后臺線程周期性檢查的流程如圖3所示。

圖片

下面是該線程的關鍵代碼:

//ScheduledChore的run方法會一直調用chore函數
@Override
protected void chore() {
  // 遍歷instance下的所有online的region 進行循環檢測
  // onlineRegions是HRegionServer上存儲的所有能夠提供有效服務的在線Region集合;
  for (HRegion r : this.instance.onlineRegions.values()) {
    if (r == null)
      continue;
    // 取出每個region的store
    for (Store s : r.getStores().values()) {
      try {
        // 檢查是否需要compact的時間間隔 hbase.server.compactchecker.interval.multiplier * hbase.server.thread.wakefrequency,multiplier默認1000;
        long multiplier = s.getCompactionCheckMultiplier();
        assert multiplier > 0;
        // 未到multiplier的倍數跳過,每當迭代因子iteration為合并檢查倍增器multiplier的整數倍時,才會發起檢查
        if (iteration % multiplier != 0) continue;
        // 需要合并的話,發起SystemCompaction請求,此處最終比較的是是否當前hfile數量減去正在compacting的文件數大于設置的compact min值。若滿足則執行systemcompact
        if (s.needsCompaction()) {
          // Queue a compaction. Will recognize if major is needed.
          this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
              + " requests compaction");
        } else if (s.isMajorCompaction()) {
          if (majorCompactPriority == DEFAULT_PRIORITY
              || majorCompactPriority > r.getCompactPriority()) {
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
                + " requests major compaction; use default priority", null);
          } else {
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
                + " requests major compaction; use configured priority",
              this.majorCompactPriority, null);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed major compaction check on " + r, e);
      }
    }
  }
  iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
}

2.2.2 Memstore Flush 觸發

Memstore Flush 會產生 HFile 文件,文件越來越多就需要 Compaction。因此在每次執行完 Flush 操作之后,都會對當前 Store 中的文件數進行判斷,一旦文件數超過 Compaction 的閾值 ,就會觸發 Compaction。這里需要強調的是,Compaction 是以 Store 為單位進行的,而在 Flush 觸發條件下,整個 Region 的所有 Store 都會執行 Compaction,所以會在短時間內可能會執行多次 Compaction。下面是 Flush 操作觸發 Compaction 的代碼。

/**
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
   * @param forceFlushAllStores whether we want to flush all store.
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(final Region region, final boolean emergencyFlush,
      boolean forceFlushAllStores) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }
    lock.readLock().lock();
    try {
      // flush
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flush(forceFlushAllStores);
     // 檢查是否需要compact
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      // 檢查是否需要split
      boolean shouldSplit = ((HRegion)region).checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        // 發起compact請求
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" + (region != null ? (" for region " +
          Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
    }
    return true;
  }

2.2.3 手動觸發

手動觸發就是通過命令或者 API 接口手動觸發 Compaction,手動觸發的原因有三個:

  • 很多業務擔心自動 Major Compaction 影響讀寫性能,因此會選擇低峰期手動觸發;
  • 用戶在執行完修改ttl的屬性后希望立刻生效,執行手動觸發 Major Compaction;
  • 硬盤容量不夠的情況下手動觸發 Major Compaction 刪除大量過期數據。

大多數都是基于第1點原因進行手動觸發。

2.3 選擇待合并的文件

Compaction 的核心就是選擇合適的文件進行合并,因為合并文件的大小以及其當前承載的 IO 直接決定了 Compaction 的效果。希望能找到這樣的文件:承載了大量 IO 請求但是文件大小很小,這樣 Compaction 本身不會消耗太多 IO,而且合并完成之后對讀的性能會有顯著提升。現實情況可能大部分都不會是這樣。目前 HBase 提供了多種 Minor Compaction 文件選擇策略,通過配置項 hbase.hstore.engine.class 設置。不管哪種策略,在執行之前都要做對文件做一些篩選操作,排除不符合條件的文件,以減少 Compaction 的工作量,減少對讀寫的影響。

  • 排除當前正在執行 Compaction 的文件;
  • 如果一個文件所有的記錄都已經過期,則直接將文件刪除;
  • 排除過大的單個文件,如果文件大小大于 hbase.hstore.compaction.max.size(默認Long最大值)則被排除,不排除會產生大量 IO 消耗。

排除完后剩下的文件稱為候選文件,接下來會再判斷是否滿足 Major Compaction 條件,如果滿足,就會選擇全部文件進行合并。判斷條件有下面三條,只要滿足其中一條就會執行 Major Compaction:

  • 到了 Compaction 自動執行的周期且候選文件數小于 hbase.hstore.compaction
    .max(默認10),如果關掉自動 Major Compaction 執行則不適用;
  • Store 中含有 Reference 文件,Reference 文件是 Split Region 產生的臨時引用文件,在 Compaction 過程中刪除;
  • 用戶手動執行的 Major Compaction。

如果不滿足上述執行條件,則為 Minor compaction。Minor Compaction 的策略有很多種,下面重點介紹 

RationBasedCompactionPolicy(0.98之前的版本)、ExploringCompactionPolicy(0.98之后默認的版本) 和 StripeCompactionPolicy 的執行策略。

2.3.1 Compaction文件選擇策略的建模

所謂的 Compaction 文件選擇策略可以建模為下面的問題:

圖片

圖中的每個數字表示了文件的 Sequence ID,數字越大則文件越新,很有可能剛剛Flush而成,意味著文件 Size 也可能越小。這樣的文件在 Compaction 時優先選擇,因此 Store下的 Storefile 文件會依據 Sequence ID 從小到大排序,依次標記為 f[0]、f[1]。。。。f[n-1],篩選策略就是要確定一個連續范圍 [Start, End] 內的 Storefile 參與 Compaction。

Compaction 的目的是減少文件數量和刪除無用的數據,優化讀性能,Compaction 實現是將原文件的內容重寫到新的文件,如果文件過大意味著 Compaction 的時間長,Compaction 過程中產生的 IO 放大越明顯,因此文件篩選的準則是用最小的 IO 代價去合并減少最多的文件數。

Compaction 依賴兩個先決條件:

  • 所有 StoreFile 按照順序進行排序(此順序為:老文件在前,新文件在后);
  • 參與 Compaction 的文件必須是連續的。

2.3.2 RationBasedCompactionPolicy

基本思想就是選擇在固定 End 為最后一個文件的前提下(一般情況),從隊列頭開始滑動尋找 Start,直到 Start 滿足下面的條件之一便停止掃描:

  1. 當前文件大小 < 比當前文件新的所有文件大小總和 * ratio,就是滿足公式 f[start].size <= ratio * (f[start+1].size +.......+ f[end-1].size)。其中 ration 是一個可變的比例,高峰期 ration 為1.2,非高峰期 ration 為5,非高峰期允許合并更大的文件。
    可以通過參數 hbase.offpeak.start.hour 和 hbase.offpeak.end.hour 設置高峰期時間段。
  2. 當前所剩候選文件數 >= hbase.store.compaction.min(默認為3),因為要保證本次 Compaction 的時候文件個數要大于配置的 Compaction 最小值。

下面附上 RationBasedCompactionPolicy 的具體邏輯代碼。

/**
  * @param candidates pre-filtrate
  * @return filtered subset
  * -- Default minor compaction selection algorithm:
  * choose CompactSelection from candidates --
  * First exclude bulk-load files if indicated in configuration.
  * Start at the oldest file and stop when you find the first file that
  * meets compaction criteria:
  * (1) a recently-flushed, small file (i.e. <= minCompactSize)
  * OR
  * (2) within the compactRatio of sum(newer_files)
  * Given normal skew, any newer files will also meet this criteria
  * <p/>
  * Additional Note:
  * If fileSizes.size() >> maxFilesToCompact, we will recurse on
  * compact().  Consider the oldest files first to avoid a
  * situation where we always compact [end-threshold,end).  Then, the
  * last file becomes an aggregate of the previous compactions.
  *
  * normal skew:
  *
  *         older ----> newer (increasing seqID)
  *     _
  *    | |   _
  *    | |  | |   _
  *  --|-|- |-|- |-|---_-------_-------  minCompactSize
  *    | |  | |  | |  | |  _  | |
  *    | |  | |  | |  | | | | | |
  *    | |  | |  | |  | | | | | |
  */
ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
  if (candidates.isEmpty()) {
    return candidates;
  }
  // we're doing a minor compaction, let's see what files are applicable
  int start = 0;
  // 獲取文件合并比例:取參數hbase.hstore.compaction.ratio,默認為1.2
  double ratio = comConf.getCompactionRatio();
  if (mayUseOffPeak) {
    // 取參數hbase.hstore.compaction.ratio.offpeak,默認為5.0
    ratio = comConf.getCompactionRatioOffPeak();
    LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
  }
  // get store file sizes for incremental compacting selection.
  final int countOfFiles = candidates.size();
  long[] fileSizes = new long[countOfFiles];
  long[] sumSize = new long[countOfFiles];
  for (int i = countOfFiles - 1; i >= 0; --i) {
    StoreFile file = candidates.get(i);
    fileSizes[i] = file.getReader().length();
    // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
    // tooFar表示后移動最大文件數位置的文件大小,也就是剛剛滿足達到最大文件數位置的那個文件,從i至tooFar數目為合并時允許的最大文件數
    int tooFar = i + comConf.getMaxFilesToCompact() - 1;
    sumSize[i] = fileSizes[i]
      + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
      - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
  }
  // 倒序循環,如果文件數目滿足最小合并時允許的最小文件數,且該位置的文件大小大于合并時允許的文件最小大小與下一個文件窗口文件總大小乘以一定比例中的較大者,則繼續;
  // 實際上就是選擇出一個文件窗口內能最小能滿足的文件大小的一組文件
  while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
    fileSizes[start] > Math.max(comConf.getMinCompactSize(),
        (long) (sumSize[start + 1] * ratio))) {
    ++start;
  }
  if (start < countOfFiles) {
    LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
      + " files from " + countOfFiles + " candidates");
  } else if (mayBeStuck) {
    // We may be stuck. Compact the latest files if we can.保證最小文件數目的要求
    int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
    if (filesToLeave >= 0) {
      start = filesToLeave;
    }
  }
  candidates.subList(0, start).clear();
  return candidates;
}

2.3.3 ExploringCompactionPolicy

該策略繼承自 RatioBasedCompactionPolicy,不同的是 Ration 策略在找到一個合適的文件集合之后就停止掃描了,而 Exploring 策略會把 Storefile 列表劃分成多個子隊列,從中找出一個最優解參與 Compaction。最優解可以理解為:待合并文件數最多或者待合并文件數相同的情況下文件較小,這樣有利于減少 Compaction 帶來的 IO 消耗。算法流程可以描述為:

  1. 從頭到尾遍歷文件,判斷所有符合條件的組合;
  2. 選擇組合內文件數 >= minFiles,且 <= maxFiles;
  3. 計算各組合文件的總大小 size,選擇組合 size <= MaxCompactSize,且 >= minCompactSize;
  4. 每個組合里面的每一個文件大小都必須滿足 FileSize(i) <= (sum(0,N,FileSize(_)) - FileSize(i)) * ration,意義在于去掉很大的文件,每次 Compaction 時應該盡量合并一些大小較小的文件;
  5. 滿足以上 1-4 條件的組合里面選擇文件數最多,文件數一樣多時進一步選擇文件總 size 最小的,目的在于盡可能多地合并文件并且 Compaction 帶來的 IO 壓力越小越好。

下面附上 ExploringCompactionPolicy 的具體邏輯代碼。

public List<StoreFile> applyCompactionPolicy(final List<StoreFile> candidates,
       boolean mightBeStuck, boolean mayUseOffPeak, int minFiles, int maxFiles) {
  
    final double currentRatio = mayUseOffPeak
        ? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
    // Start off choosing nothing.
    List<StoreFile> bestSelection = new ArrayList<StoreFile>(0);
    List<StoreFile> smallest = mightBeStuck ? new ArrayList<StoreFile>(0) : null;
    long bestSize = 0;
    long smallestSize = Long.MAX_VALUE;
    int opts = 0, optsInRatio = 0, bestStart = -1; // for debug logging
    // Consider every starting place. 從頭到尾遍歷文件
    for (int start = 0; start < candidates.size(); start++) {
      // Consider every different sub list permutation in between start and end with min files.
      for (int currentEnd = start + minFiles - 1;
          currentEnd < candidates.size(); currentEnd++) {
        List<StoreFile> potentialMatchFiles = candidates.subList(start, currentEnd + 1);
        // Sanity checks
        if (potentialMatchFiles.size() < minFiles) {
          continue;
        }
        if (potentialMatchFiles.size() > maxFiles) {
          continue;
        }
        // Compute the total size of files that will
        // have to be read if this set of files is compacted. 計算文件大小
        long size = getTotalStoreSize(potentialMatchFiles);
  
        // Store the smallest set of files.  This stored set of files will be used
        // if it looks like the algorithm is stuck. 總size最小的
        if (mightBeStuck && size < smallestSize) {
          smallest = potentialMatchFiles;
          smallestSize = size;
        }
        if (size > comConf.getMaxCompactSize(mayUseOffPeak)) {
          continue;
        }
        ++opts;
        if (size >= comConf.getMinCompactSize()
            && !filesInRatio(potentialMatchFiles, currentRatio)) {
          continue;
        }
        ++optsInRatio;
        if (isBetterSelection(bestSelection, bestSize, potentialMatchFiles, size, mightBeStuck)) {
          bestSelection = potentialMatchFiles;
          bestSize = size;
          bestStart = start;
        }
      }
    }
    if (bestSelection.size() == 0 && mightBeStuck) {
      LOG.debug("Exploring compaction algorithm has selected " + smallest.size()
          + " files of size "+ smallestSize + " because the store might be stuck");
      return new ArrayList<StoreFile>(smallest);
    }
    LOG.debug("Exploring compaction algorithm has selected " + bestSelection.size()
        + " files of size " + bestSize + " starting at candidate #" + bestStart +
        " after considering " + opts + " permutations with " + optsInRatio + " in ratio");
    return new ArrayList<StoreFile>(bestSelection);
  }

2.3.4 StripeCompactionPolicy

Stripe Compaction (HBASE-7667)還是為了減少 Major Compaction 的壓力而提出的。其思想是:減少 Major Compaction 壓力最直接辦法是減少 Region 的大小,最好整個集群都是由很多小 Region 組成,這樣參與 Compaction 的文件總大小就必然不會太大。可是 Region 設置過小會導致 Region 數量很多,這一方面會導致 HBase 管理 Region 的開銷很大,另一方面 Region 過多也要求 HBase 能夠分配更多的內存作為 Memstore 使用,否則有可能導致整個 RegionServer 級別的 Flush,進而引起長時間的寫阻塞。因此單純地通過將 Region 大小設置過小并不能本質解決問題。

(1) Level Compaction

社區開發者借鑒了 Leveldb 的 Compaction 策略 Level Compaction。Level Compaction 設計思路是將 Store 中的所有數據劃分為很多層,每一層都會有一部分數據,如下圖所示:

圖片

數據組織形式不再按照時間前后進行組織,而是按照 KeyRange 進行組織,每個 KeyRange 中會包含多個文件,這些文件所有數據的 Key 必須分布在同一個范圍。比如 Key 分布在 Key0~KeyN 之間的所有數據都會落在第一個 KeyRange 區間的文件中,Key 分布在 KeyN+1~KeyT 之間的所有數據會分布在第二個區間的文件中,以此類推。

整個數據體系會被劃分為很多層,最上層(Level 0)表示最新數據,最下層(Level 6)表示最舊數據。每一層都由大量 KeyRange 塊組成(Level 0除外),KeyRange 之間沒有 Key 重合。而且層數越大,對應層的每個 KeyRange 塊大小越大,下層 KeyRange 塊大小是上一層大小的10倍。圖中 Range 顏色越深,對應的 Range 塊越大。

數據從 Memstore 中 Flush 之后,會首先落入 Level 0,此時落入 Level 0 的數據可能包含所有可能的 Key。此時如果需要執行 Compaction,只需要將 Level 0 中的 KV 一個一個讀出來,然后按照 Key 的分布分別插入 Level 1 中對應 KeyRange 塊的文件中,如果此時剛好 Level 1 中的某個KeyRange 塊大小超過了一定閾值,就會繼續往下一層合并。

Level Compaction 依然會有 Major Compaction 的概念,發生 Major Compaction 只需要將 Range 塊內的文件執行合并就可以,而不需要合并整個 Region 內的數據文件。

可見,這種 Compaction 在合并的過程中,從上到下只需要部分文件參與,而不需要對所有文件執行 Compaction 操作。另外,Level Compaction 還有另外一個好處,對于很多只讀最近寫入數據’的業務來說,大部分讀請求都會落到 Level 0,這樣可以使用 SSD 作為上層 Level 存儲介質,進一步優化讀。然而,這種 Compaction 因為 Level 層數太多導致 Compaction 的次數明顯增多,經過測試,發現這種 Compaction 并沒有對 IO 利用率有任何提升。

(2)Stripe Compaction

雖然原生的 Level Compaction 并不適用于 HBase,但是這種 Compaction 的思想卻激發了HBase 研發者的靈感,再結合之前提到的小 Region 策略,就形成了 Stripe Compaction。

同 Level Compaction 相同,Stripe Compaction 會將整個 Store 中的文件按照 Key 劃分為多個 Range,稱為 Stripe,Stripe 的數量可以通過參數設定,相鄰的 Stripe 之間 Key 不會重合。Stripe 類似于 Sub-Region 的概念,即將一個大 Region 切分成了很多小的 Sub-Region。

隨著數據寫入,Memstore 執行 Flush 之后形成 HFile,這些 HFile 并不會馬上寫入對應的 Stripe,而是放到一個稱為 L0 的地方,用戶可以配置 L0 可以放置 HFile 的數量。一旦 L0 放置的文件數超過設定值,系統就會將這些 HFile 寫入對應的 Stripe:首先讀出 HFile 的 KVs,再根據每個 KV 的 Key 定位到具體的 Stripe,將該 KV 插入對應 Stripe 的文件中即可,如圖6所示。由于 Stripe 是個小的 Region,所以 Compaction 并不會太多消耗系統資源。另外,讀取數據時,根據對應的 Key 查找到對應的 Stripe,然后在 Stripe 內部執行查找,因為 Stripe 內數據量相對很小,所以一定程度上也可以提升數據查找性能。

圖片

2.4 執行 Compaction 操作

挑選好待合并文件后,就是執行真正的合并。合并流程主要分為以下幾步:

  1. 按順序讀出待合并所有 HFile 文件的 KV,并順序寫到位于./tmp 目錄下的臨時文件中;
  2. 將臨時文件移動到對應 Region 的正式數據目錄中;
  3. 將 Compaction 的輸入文件路徑和輸出文件路徑封裝為 KV 寫入 WAL 日志,并打上 Compaction 標記,最后強制執行 sync;
  4. 將對應 Region 數據目錄下的 Compaction 的輸入文件全部刪除。

HBase對整個 Compaction 的考慮是非常全面的,上述4個步驟的每一步發生錯誤,都具有很強的容錯性和冪等性(執行一次和多次的結果相同)。

  • 如果 RS 在步驟2或步驟2之前發生異常,本次 Compaction 會被認為失敗,如果繼續進行同樣的 Compaction,上次異常對接下來的 Compaction不會有任何影響,也不會對讀寫有影響,唯一的影響就是多了一份冗余的數據;
  • 如果 RS 在步驟2之后、步驟3或步驟3之前發生異常,也僅僅會多一份冗余數據;
  • 如果在步驟3之后、步驟4之前發生異常,則 RS 在重新打開 Region 之后就會從 WAL 中看到上次 Compaction 的日志。因為此時輸入文件和輸出文件已經持久化到 HDFS,因此只需要根據 WAL 日志移除掉 Compaction 的輸入文件即可。

下面附上 Store 的 compact 方法。

public List<StoreFile> compact(CompactionContext compaction,
   CompactionThroughputController throughputController, User user) throws IOException {
   assert compaction != null;
   List<StoreFile> sfs = null;
   CompactionRequest cr = compaction.getRequest();
   try {
     // Do all sanity checking in here if we have a valid CompactionRequest
     // because we need to clean up after it on the way out in a finally
     // block below
     long compactionStartTime = EnvironmentEdgeManager.currentTime();
     assert compaction.hasSelection();
     Collection<StoreFile> filesToCompact = cr.getFiles();
     assert !filesToCompact.isEmpty();
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
       // TODO: change this to LOG.error() after more debugging
       // 再次檢查
       Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
     }
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
         + this + " of " + this.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + fs.getTempDir() + ", totalSize="
         + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
     // Commence the compaction.  開始compact,newFiles是合并后的新文件
     List<Path> newFiles = compaction.compact(throughputController, user);
     long outputBytes = 0L;
     // TODO: get rid of this!
     if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
       LOG.warn("hbase.hstore.compaction.complete is set to false");
       sfs = new ArrayList<StoreFile>(newFiles.size());
       final boolean evictOnClose =
           cacheConf != null? cacheConf.shouldEvictOnClose(): true;
       for (Path newFile : newFiles) {
         // Create storefile around what we wrote with a reader on it.
         StoreFile sf = createStoreFileAndReader(newFile);
         sf.closeReader(evictOnClose);
         sfs.add(sf);
       }
       return sfs;
     }
     // Do the steps necessary to complete the compaction.
     // 將newFiles移動到新的位置,返回StoreFile列表
     sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
     // 在WAL中寫入Compaction記錄
     writeCompactionWalRecord(filesToCompact, sfs);
     // 將新生成的StoreFile列表替換到StoreFileManager的storefile中
     replaceStoreFiles(filesToCompact, sfs);
     // 根據compact類型,累加相應計數器
     if (cr.isMajor()) {
       majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
       majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
     } else {
       compactedCellsCount += getCompactionProgress().totalCompactingKVs;
       compactedCellsSize += getCompactionProgress().totalCompactedSize;
     }
     for (StoreFile sf : sfs) {
       outputBytes += sf.getReader().length();
     }
     // At this point the store will use new files for all new scanners.
     // 歸檔舊文件
     completeCompaction(filesToCompact, true); // Archive old files & update store size.
     long now = EnvironmentEdgeManager.currentTime();
     if (region.getRegionServerServices() != null
         && region.getRegionServerServices().getMetrics() != null) {
       region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
         now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
         outputBytes);
     }
     // 記錄日志信息并返回
     logCompactionEndMessage(cr, sfs, now, compactionStartTime);
     return sfs;
   } finally {
     finishCompactionRequest(cr);
   }
 }

三、Compaction 的限流

上述幾種策略都是根據不同的業務場景設置對應的文件選擇策略,核心都是減少參與 Compaction 的文件數,縮短整個 Compaction 執行的時間,間接降低 Compaction 的 IO 放大效應,減少對業務讀寫的延遲影響。但是,如果不對 Compaction 執行階段的讀寫吞吐量進行限制的話也會引起短時間大量系統資源消耗,影響用戶讀寫延遲。HBase 通過限制 Compaction 速度 和 Compaction 的帶寬來對 Compaction 進行限流。

3.1 Limit Compaction Speed

該優化方案通過感知 Compaction 的壓力情況自動調節系統的 Compaction 吞吐量,在壓力大的時候降低合并吞吐量,壓力小的時候增加合并吞吐量。

基本原理為:

在正常情況下,用戶需要設置吞吐量下限參數 hbase.hstore.compaction.throughput.lower.bound (默認10MB/sec) 和上限參數 hbase.hstore.compaction.throughput.higher.bound (默認20MB/sec),實際會工作時吞吐量為 lower + (higer – lower) * ratio,其中 ratio 是一個取值范圍在0到1的小數,它由當前 Store 中待參與 Compation 的 HFile 數量決定,數量越多,ratio 越小,反之越大。

如果當前 Store中 HFile 的數量太多,并且超過了參數 blockingFileCount,此時所有寫請求就會阻塞等待 Compaction 完成,這種場景下上述限制會自動失效。

3.2 Compaction BandWidth Limit

原理其實和 Limit Compaction Speed 思路基本一致,它主要涉及兩個參數:compactBwLimit 和 numOfFilesDisableCompactLimit。

作用分別如下:

  • compactBwLimit:一次 Compaction 的最大帶寬使用量,如果 Compaction 所使用的帶寬高于該值,就會強制令其 sleep 一段時間。
  • numOfFilesDisableCompactLimit:很顯然,在寫請求非常大的情況下,限制 Compaction 帶寬的使用量必然會導致 HFile 堆積,進而會影響到讀請求響應延時。因此該值意義就很明顯,一旦 Store 中 HFile 數量超過該設定值,帶寬限制就會失效。
// 該方法進行Compaction的動態限制
private void tune(double compactionPressure) {
    double maxThroughputToSet;
    // 壓力大于1,最大限速不受限制
    if (compactionPressure > 1.0) {
      // set to unlimited if some stores already reach the blocking store file count
      maxThroughputToSet = Double.MAX_VALUE;
     // 空閑時間,最大限速為設置的Compaction最大吞吐量
    } else if (offPeakHours.isOffPeakHour()) {
      maxThroughputToSet = maxThroughputOffpeak;
    } else {
      // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
      // calculate the throughput limitation.
      // lower + (higher - lower) * ratio
      maxThroughputToSet =
          maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
              * compactionPressure;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
          + throughputDesc(maxThroughputToSet));
    }
    this.maxThroughput = maxThroughputToSet;
  }

再來看下獲取 R S的 Compaction 壓力的 getCompactionPressure 方法,其實就是遍歷每個 Region 的每個 Store,取壓力最大的。

@Override
public double getCompactionPressure() {
  double max = 0;
  for (Region region : onlineRegions.values()) {
    for (Store store : region.getStores()) {
      double normCount = store.getCompactionPressure();
      if (normCount > max) {
        max = normCount;
      }
    }
  }
  return max;
}
@Override
public double getCompactionPressure() {
  int storefileCount = getStorefileCount();
  int minFilesToCompact = comConf.getMinFilesToCompact();
  if (storefileCount <= minFilesToCompact) {
    return 0.0;
  }
  return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
}

HBase 的限流方案通過感知 Compaction 的壓力情況自動調節系統的 Compaction 吞吐量,在壓力大的時候降低合并吞吐量,壓力小的時候增加合并吞吐量。

基本原理為:

在正常情況下,用戶需要設置吞吐量下限參數 hbase.hstore.compaction.throughput.lower.bound (默認10MB/sec)   和上限參數 hbase.hstore.compaction.throughput.higher.bound(默認20MB/sec),而實際會工作在吞吐量為 lower + (higer – lower) * ratio的情況下,其中 ratio 是一個取值范圍在0到1的小數,它由當前 Store 中待參與 Compation 的 HFile 數量決定,數量越多,ratio 越小,反之越大。

如果當前 Store中 HFile 的數量太多,并且超過了 blockingFileCount 的值,該值由參數 hbase.hstore.blockingStoreFiles 配置,此時所有寫請求就會阻塞等待 Compaction 完成,這種場景下,上述限制會自動失效。

四、線上遇到的問題及調優方法

由于線上環境的復雜性,對 Compaction 模塊做了較多的優化,下面選取兩個典型案例進行說明。

4.1 關閉了自動觸發 Major Compaction,但是監控中 Major Compaction 隊列仍然有值進而影響讀寫性能

線上集群都是關閉自動觸發 Major Compaction 的功能,在業務低峰期由定時任務手動觸發 Major Compaction。在某次故障中,業務反饋讀寫性能在非執行 Major Compaction 的時段延遲比較大。查看監控發現,監控中的 Major Compaction 隊列的值比較大。

下面是當時的 Major Compaction 隊列長度和讀寫調用平均耗時的監控圖,從圖中可以很明顯地看出下面幾點:

  • Major Compaction 的隊列長度比較大的時候,讀寫的耗時也比較大;
  • Major Compaction 的隊列長度跟入流量有關系,入流量比較大的時候,Major Compaction 的隊列長度就比較大。

這里就產生了疑問,關閉了自動 Major Compaction,是什么條件觸發了 Major Compaction ?

圖片


圖片


帶著上面的疑問,我們從源碼的層面對問題進行分析。

1)首先查看了 Major Compaction 隊列長度這個指標的含義,該指標表示 longCompaction 線程池的工作隊列中等待的個數。

@Override
public int getLargeCompactionQueueSize() {
  //The thread could be zero.  if so assume there is no queue.
  if (this.regionServer.compactSplitThread == null) {
    return 0;
  }
  return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
}
public int getLargeCompactionQueueSize() {
  return longCompactions.getQueue().size();
}

2)查看 HBase 日志,發現確實有做 Major Compaction 的行為。

圖片

3)進一步排查什么時候會去調用 long Compaction 的線程池,查看 Compaction 選擇 long Compaction 和 small Compaction 隊列相關的源碼。

/**
 * @param candidateFiles candidate files, ordered from oldest to newest. All files in store.
 * @return subset copy of candidate list that meets compaction criteria
 * @throws java.io.IOException
 */
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
    final List<StoreFile> filesCompacting, final boolean isUserCompaction,
    final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
  // Preliminary compaction subject to filters
  ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
  // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
  // able to compact more if stuck and compacting, because ratio policy excludes some
  // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
  int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
  boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
      >= storeConfigInfo.getBlockingFileCount();
  candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
  LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
      filesCompacting.size() + " compacting, " + candidateSelection.size() +
      " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
 
  // If we can't have all files, we cannot do major anyway
  boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
  if (!(forceMajor && isAllFiles)) {
    // 過濾掉大文件
    candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
    isAllFiles = candidateFiles.size() == candidateSelection.size();
  }
  ...
}

其中 skipLargeFiles 方法對待合并文件進行過濾,去掉大文件,該閾值是由 

maxCompactSize =

conf.getLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, Long.MAX_VALUE)配置,默認是 Long.MAX_VALUE。

/**
 * @param candidates pre-filtrate
 * @return filtered subset
 * exclude all files above maxCompactSize
 * Also save all references. We MUST compact them
 */
private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
  boolean mayUseOffpeak) {
  int pos = 0;
  while (pos < candidates.size() && !candidates.get(pos).isReference()
    && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
    ++pos;
  }
  if (pos > 0) {
    LOG.debug("Some files are too large. Excluding " + pos
        + " files from compaction candidates");
    candidates.subList(0, pos).clear();
  }
  return candidates;
}

之后再通過待合并文件的大小來選擇 long Compaction 線程池還是 small Compaction 的線程池。

@Override
public boolean throttleCompaction(long compactionSize) {
  return compactionSize > comConf.getThrottlePoint();
}

這個閾值的計算方法如下,默認是2.5G,就是說如果待合并的文件大小大于2.5G,就會放到 long Compaction 的線程池中去執行。

throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
          2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());

4)查看 ReigonServer 該時間段的日志,發現有大量大于 2.5G 的文件在 Compaction,這就解釋了為什么RS日志中該時間段并沒有做 Major Compaction 的日志但是 long Compaction 隊列有值的問題。

圖片

至此,問題原因就找到了,入流量的增加導致單個 HFile 文件比較大,Flush 之后做 Minor Compaction 的時候如果待合并文件總大小大于2.5G(默認值)的時候,會將此次 Minor Compaction 放入到 long Compaction 的線程池中執行。待合并的文件比較大導致磁盤 IO 消耗比較高,進而影響到讀寫性能。

5)措施

我們調整了 Compaction 的參數 

hbase.hstore.compaction.max.size 將該值修改為2G,表示在 Minor Compaction 的時候大于 2G 的 HFile 將會被排除,等到業務低峰期的時候再對大于2G的文件合并,減少 Compaction 對磁盤 IO 的影響。

6)效果

調整之后,在非手動觸發 Major Compaction 期間就很少有占用 long Compaction 線程池的情況出現了,讀寫平均耗時也降到了50ms以下。

圖片


圖片

4.2 定時手動觸發的 Major Compation 任務執行時間過長

業務反饋某張表的讀寫性能最近有點慢,通過監控查看到該表的存儲一直在增長,存儲單副本達到了578TB。查看表信息,該表的TTL設置的15天,該表的輸入流量也沒有明顯的增加。監控圖如下:

圖片


圖片

于是懷疑每天的 Compaction 任務沒有做完,導致過期數據未能完全刪除。查看線上配置,Major Compaction 的線程池大小是1,該表的數據量又比較大。于是調整了 Compaction 線程池的大小為10,并且設置了集群的空閑時間 hbase.offpeak.start.hour 與 hbase.offpeak

.end.hour,在這個時間段內 Compaction 的時候可以增加待合并文件大小。調整完成后,通過監控查看 Compaction 的效果對比圖,可以看到 Compaction 的工作量明顯增大了。

圖片

查看該表所占存儲的大小,可以看到該表已經從 578T 下降到了 349T,下降幅度達到了40%。業務的讀寫耗時也恢復正常。Compaction 的參數比較重要, 在調整的時候需要考慮對業務是否有影響,調整之后要多觀察業務的耗時情況,可以循序漸進的對參數進行調整。

五、Compaction相關參數介紹

下面附上 Compaction 相關的參數,線上環境可以根據實際情況進行調整。

圖片

六、總結

Compaction 是 HBase 提升讀寫性能非常重要的手段,而 Compaction 的邏輯又比較復雜,并且使用不當,會導致寫放大,進而會影響到正常的讀寫請求。本文重點介紹了 Compaction 的觸發機制、Compaction 發展過程中出現的多種合并策略、待合并文件的選擇算法、 Compaction 的限流以及 Compaction 相關的參數做了詳細的描述,最后選擇線上的2個案例,介紹了具體的分析思路和調優的方法,經調優后,性能得到了成倍的提升,保障了業務高效、穩定的運行。

責任編輯:龐桂玉 來源: vivo互聯網技術
相關推薦

2021-07-12 09:17:54

Memory Comp系統內存

2021-11-21 23:03:38

jvm調優虛擬機

2020-05-22 09:12:46

HTTP3網絡協議

2025-02-06 08:24:25

AQS開發Java

2009-06-08 16:52:00

2017-04-17 15:48:15

Cinder備份實踐

2024-05-10 11:35:22

Redis延時隊列數據庫

2025-02-08 08:10:00

2017-06-16 09:39:32

優酷實踐阿里云

2023-02-26 11:50:04

Hbase程序Oracle

2017-12-20 15:10:09

HBaseHadoop數據

2021-12-20 00:03:38

Webpack運行機制

2020-09-03 14:30:40

Tomcat 拆解調優

2017-05-04 16:35:45

2023-10-29 16:26:27

Python動查重

2011-07-08 16:02:54

HBase

2009-07-24 13:54:39

MVVM模式

2023-02-22 07:04:05

自動機原理優化實踐

2010-03-30 14:16:32

無線上網卡原理

2021-11-07 23:49:19

SQL數據庫工具
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费av手机在线观看 | 亚洲精品1区2区3区 91免费看片 | 91精品国产欧美一区二区成人 | 黑人精品xxx一区一二区 | 亚洲三区在线观看 | 成年人在线观看视频 | 国产精品久久久久久238 | 夜夜操天天干 | 一区二区三区中文字幕 | 久久久网| 国产精品一区二区福利视频 | 日韩三 | 日日噜噜夜夜爽爽狠狠 | 欧美一区二不卡视频 | 色999日韩 | 九色91视频 | 国产精品91视频 | www久久久 | 四虎成人免费电影 | 精品成人佐山爱一区二区 | 欧美日韩三区 | 喷潮网站 | 色综合久久天天综合网 | 亚洲精品456| 日本色婷婷 | 久久久久无码国产精品一区 | 欧美精品一| 亚洲精品国产精品国自产在线 | 日韩一区中文字幕 | www日韩高清 | 免费观看羞羞视频网站 | 国产有码 | 成人午夜视频在线观看 | 久久久999免费视频 999久久久久久久久6666 | 亚洲欧美在线视频 | 国产日产精品一区二区三区四区 | 中文字幕中文字幕 | 国产美女视频黄 | 免费av观看 | 自拍 亚洲 欧美 老师 丝袜 | 国产精品久久网 |