使用基于Snowflake的Snowpark DataFrames進行數據處理
譯文簡介
Snowpark是Snowflake一個新的開發庫,它提供了一個API讓用戶可以使用編程語言像Scala(后續也會有Java和Python)來代替SQL進行數據處理。
Snowpark的核心概念是DataFrame(數據框),它表示一組數據,就比如說一些數據庫表的行,我們可以用最喜歡的工具通過面向對象或者函數式編程的方式處理。Snowpark DataFrames的概念類似于Apache Spark或者Python中Pandas包的DataFrames的含義,是一種表格型的數據結構。
開發者也可以創建自定義函數推送到Snowflake服務器,來更方便地處理數據。Snowpark的代碼執行采用了惰性計算的方式,這減少了從Snowpark倉庫到客戶端之間的數據流轉。
當前版本的Snowpark可以運行在Scala 2.12和JDK 8、9、10或11上。它現在處于公開預覽階段,可用于所有賬戶。
架構特點
從架構的角度來看,Snowpark客戶端類似于Apache Spark Driver程序。它執行用戶在客戶端編寫的代碼并轉為SQL語句推送給Snowpark數據倉庫,等Snowpark計算服務端處理完數據后,接收以DataFrame格式組成的返回結果。
廣義的說,Snowpark數據倉庫的操作可以分為兩類:轉換和執行。由于轉換是延遲執行的,因此它們不會觸發DataFrames數據的計算處理過程。像select(查詢),filter(過濾),sort(排序),groupBy(分組)等等都屬于轉換范疇的操作。而執行是正好相反的,它們會觸發對DataFrames數據的計算。Snowpark將針對DataFrame數據的SQL語句發送到服務端進行計算,然后將結果返回給客戶端內存。show,collect,take等都屬于執行操作。
Snowpark執行
在我們可以執行任何Snowpark轉換和執行之前,我們需要先連接到Snowpark數據倉庫并建立會話。
Scala
object Main {
def main(args: Array[String]): Unit = {
// Replace the <placeholders> below.
val configs = Map (
"URL" -> "https://<SNOWFLAKE-INSTANCE>.snowflakecomputing.com:443",
"USER" -> "<USERNAME>",
"PASSWORD" -> "<PASSWORD>",
"ROLE" -> "SYSADMIN",
"WAREHOUSE" -> "SALESFORCE_ACCOUNT",
"DB" -> "SALESFORCE_DB",
"SCHEMA" -> "SALESFORCE"
)
val session = Session.builder.configs(configs).create
session.sql("show tables").show()
}
}
從Snowpark管理頁面上看,我們有一個SALESFORCE_DB數據庫和一個有3個表的SALESFORCE:SALESFORCE_ACCOUNT表表示來自Salesforce實例的賬戶,SALESFORCE_ORDER表存儲由這些賬戶發起的訂單,SALESFORCE_ACCOUNT_ORDER是一個關聯表,存儲關聯的查詢結果(我們在這篇文章的后面會再論述這點)。
要檢索Salesforce_Account表的前10行,我們可以簡單地執行以下DataFrame方法:
Scala
// Create a DataFrame from the data in the "salesforce_account" table.
val dfAccount = session.table("salesforce_account")
// To print out the first 10 rows, call:
dfAccount.show()
Snowpark會把代碼轉換成SQL語句并交給Snowflake執行:
Scala
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX] SELECT * FROM ( SELECT * FROM (salesforce_account)) LIMIT 10
在我們的VSCode IDE中的輸出看起來像這樣:
我們也可以過濾某些行并執行DataFrame的轉換(例如,選擇指定的列):
Scala
val dfFilter = session.table("salesforce_account").filter(col("type") === "Customer - Direct")
dfFilter.show()
val dfSelect = session.table("salesforce_account").select(col("accountname"), col("phone"))
dfSelect.show()
Snowpark將生成相應的SQL查詢,并將它們交給Snowflake計算服務器執行:
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX] SELECT * FROM ( SELECT * FROM ( SELECT * FROM (salesforce_account)) WHERE ("TYPE" = 'Customer - Direct')) LIMIT 10
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX] SELECT * FROM ( SELECT "ACCOUNTNAME", "PHONE" FROM ( SELECT * FROM (salesforce_account))) LIMIT 10
下面是在VSCode中的輸出:
Snowpark DataFrame API也允許DataFrames數據間的拼接關聯。在這個例子中,我們有SALESFORCE_ORDER表,記錄了由Salesforce賬戶產生的賬單數據,我們可以將這些數據拉到DataFrame中,并將它們與賬戶記錄連接起來:
Scala
val dfOrder = session.table("salesforce_order")
dfOrder.show()
val dfJoin = dfAccount.join(dfOrder, col("sfdcid") === col("accountid")).select(col("accountname"), col("phone"),col("productname"), col("amount"))
dfJoin.show()
Snowflake把DataFrame方法轉換為SQL語句,然后推送給Snowflake數據倉庫進行計算。在VSCode中輸出如下:
如果我們想持久化保存計算結果,可以使用saveAsTable這個方法:
Scala
dfJoin.write.mode(SaveMode.Overwrite).saveAsTable("salesforce_account_order")
生成的SQL語句看起來就像這樣:
Scala
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX] CREATE OR REPLACE TABLE salesforce_account_order AS SELECT * FROM ( SELECT "ACCOUNTNAME", "PHONE", "PRODUCTNAME", "AMOUNT" FROM ( SELECT * FROM (( SELECT "ACCOUNTNAME" AS "ACCOUNTNAME", "PHONE" AS "PHONE", "TYPE" AS "TYPE", "SFDCID" AS "SFDCID" FROM ( SELECT * FROM (salesforce_account))) AS SNOWPARK_TEMP_TABLE_UKKLR6UCHN6POXL INNER JOIN ( SELECT "ACCOUNTID" AS "ACCOUNTID", "PRODUCTNAME" AS "PRODUCTNAME", "AMOUNT" AS "AMOUNT" FROM ( SELECT * FROM (salesforce_order))) AS SNOWPARK_TEMP_TABLE_36DEOZXTQJUYKLD ON ("SFDCID" = "ACCOUNTID"))))
隨后,Snowpark會創建一個新表或者替換掉已存在的舊表,來存儲生成的數據:
結語
Snowpark為數據處理提供了豐富的操作和工具。它允許用戶創建非常復雜的高級數據處理管道操作。將用戶自定義的代碼推到Snowflake數據倉庫服務端,并通過減少不必要的數據傳輸,在數據端執行,這是Snowpark的一個非常強大的特性。
譯者介紹
盧鑫旺,51CTO社區編輯,半路出家的九零后程序員。做過前端頁面,寫過業務接口,搞過爬蟲,研究過JS,有幸接觸Golang,參與微服務架構轉型。目前主寫Java,負責公司可定制化低代碼平臺的數據引擎層設計開發工作。
原文標題:Snowflake Data Processing With Snowpark DataFrames,作者:Istvan Szegedi