Citus 分布式 PostgreSQL 集群-SQL Reference(查詢處理)
一個 Citus 集群由一個 coordinator 實例和多個 worker 實例組成。數據在 worker 上進行分片和復制,而 coordinator 存儲有關這些分片的元數據。向集群發出的所有查詢都通過 coordinator 執行。 coordinator 將查詢劃分為更小的查詢片段,其中每個查詢片段可以在分片上獨立運行。然后協調器將查詢片段分配給 worker,監督他們的執行,合并他們的結果,并將最終結果返回給用戶。查詢處理架構可以通過下圖進行簡要描述。
Citus 的查詢處理管道涉及兩個組件:
- 分布式查詢計劃器和執行器
- PostgreSQL 計劃器和執行器
我們將在后續部分中更詳細地討論它們。
分布式查詢計劃器
Citus 的分布式查詢計劃器接收 SQL 查詢并規劃它以進行分布式執行。
對于 SELECT 查詢,計劃器首先創建輸入查詢的計劃樹,并將其轉換為可交換和關聯形式,以便可以并行化。它還應用了一些優化以確保以可擴展的方式執行查詢,并最大限度地減少網絡 I/O。
接下來,計劃器將查詢分為兩部分 - 在 coordinator 上運行的 coordinator 查詢和在 worker 上的各個分片上運行的 worker 查詢片段。然后,計劃器將這些查詢片段分配給 worker,以便有效地使用他們的所有資源。在這一步之后,分布式查詢計劃被傳遞給分布式執行器執行。
分布列上的鍵值查找或修改查詢的規劃過程略有不同,因為它們恰好命中一個分片。一旦計劃器收到傳入的查詢,它需要決定查詢應該路由到的正確分片。為此,它提取傳入行中的分布列并查找元數據以確定查詢的正確分片。然后,計劃器重寫該命令的 SQL 以引用分片表而不是原始表。然后將該重寫的計劃傳遞給分布式執行器。
分布式查詢執行器
Citus 的分布式執行器運行分布式查詢計劃并處理故障。執行器非常適合快速響應涉及過濾器、聚合和共置連接的查詢,以及運行具有完整 SQL 覆蓋的單租戶查詢。它根據需要為每個分片打開一個與 woker 的連接,并將所有片段查詢發送給他們。然后它從每個片段查詢中獲取結果,合并它們,并將最終結果返回給用戶。
子查詢/CTE Push-Pull 執行
如有必要,Citus 可以將來自子查詢和 CTE 的結果收集到 coordinator 節點中,然后將它們推送回 worker 以供外部查詢使用。這允許 Citus 支持更多種類的 SQL 構造。
例如,在 WHERE 子句中包含子查詢有時不能與主查詢同時執行內聯,而必須單獨執行。假設 Web 分析應用程序維護一個按 page_id 分區的 page_views 表。要查詢前 20 個訪問量最大的頁面上的訪問者主機數,我們可以使用子查詢來查找頁面列表,然后使用外部查詢來計算主機數。
SELECT page_id, count(distinct host_ip)
FROM page_views
WHERE page_id IN (
SELECT page_id
FROM page_views
GROUP BY page_id
ORDER BY count(*) DESC
LIMIT 20
)
GROUP BY page_id;
執行器希望通過 page_id 對每個分片運行此查詢的片段,計算不同的 host_ips,并在 coordinator 上組合結果。但是,子查詢中的 LIMIT 意味著子查詢不能作為片段的一部分執行。通過遞歸規劃查詢,Citus 可以單獨運行子查詢,將結果推送給所有 worker,運行主片段查詢,并將結果拉回 coordinator。 push-pull(推拉) 設計支持上述子查詢。
讓我們通過查看此查詢的 EXPLAIN 輸出來了解這一點。它相當參與:
GroupAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: remote_scan.page_id
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
-> Distributed Subplan 6_1
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=54.70..56.70 rows=200 width=12)
Group Key: page_id
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=4)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=84.50..86.75 rows=225 width=36)
Group Key: page_views.page_id, page_views.host_ip
-> Hash Join (cost=17.00..78.88 rows=1124 width=36)
Hash Cond: (page_views.page_id = intermediate_result.page_id)
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=36)
-> Hash (cost=14.50..14.50 rows=200 width=4)
-> HashAggregate (cost=12.50..14.50 rows=200 width=4)
Group Key: intermediate_result.page_id
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=4)
讓我們把它拆開并檢查每一塊。
GroupAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: remote_scan.page_id
樹的 root 是 coordinator 節點對 worker 的結果所做的事情。在這種情況下,它正在對它們進行分組,并且 GroupAggregate 要求首先對它們進行排序。
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
-> Distributed Subplan 6_1
.
自定義掃描有兩個大子樹,從“分布式子計劃”開始。
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=54.70..56.70 rows=200 width=12)
Group Key: page_id
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=4)
.
工作節點為 32 個分片中的每一個運行上述內容(Citus 正在選擇一個代表進行顯示)。我們可以識別 IN (...) 子查詢的所有部分:排序、分組和限制。當所有 worker 完成此查詢后,他們會將其輸出發送回 coordinator,coordinator 將其組合為“中間結果”。
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=84.50..86.75 rows=225 width=36)
Group Key: page_views.page_id, page_views.host_ip
-> Hash Join (cost=17.00..78.88 rows=1124 width=36)
Hash Cond: (page_views.page_id = intermediate_result.page_id)
.
Citus 在第二個子樹中啟動另一個執行器作業。它將在 page_views 中計算不同的主機。它使用 JOIN 連接中間結果。中間結果將幫助它限制在前二十頁。
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=36)
-> Hash (cost=14.50..14.50 rows=200 width=4)
-> HashAggregate (cost=12.50..14.50 rows=200 width=4)
Group Key: intermediate_result.page_id
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=4)
.
工作人員使用 read_intermediate_result 函數在內部檢索中間結果,該函數從 coordinator 節點復制的文件中加載數據。
這個例子展示了 Citus 如何使用分布式子計劃在多個步驟中執行查詢,以及如何使用 EXPLAIN 來了解分布式查詢執行。
PostgreSQL 計劃器和執行器
一旦分布式執行器將查詢片段發送給 worker,它們就會像常規 PostgreSQL 查詢一樣被處理。該 worker 上的 PostgreSQL 計劃程序選擇在相應分片表上本地執行該查詢的最佳計劃。 PostgreSQL 執行器然后運行該查詢并將查詢結果返回給分布式執行器。您可以從 PostgreSQL 手冊中了解有關 PostgreSQL 計劃器和執行器的更多信息。最后,分布式執行器將結果傳遞給 coordinator 進行最終聚合。
- 計劃器
http://www.postgresql.org/docs/current/static/planner-optimizer.html
- 執行器
http://www.postgresql.org/docs/current/static/executor.html