成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

使用基于Snowflake的Snowpark DataFrames進行數據處理

譯文
數據庫 SQL Server
Snowpark是Snowflake一個新的開發庫,它提供了一個API讓用戶可以使用Scala(后續也會有Java和Python)等編程語言來代替SQL進行數據處理。

簡介

Snowpark是Snowflake一個新的開發庫,它提供了一個API讓用戶可以使用編程語言像Scala(后續也會有Java和Python)來代替SQL進行數據處理。

Snowpark的核心概念是DataFrame(數據框),它表示一組數據,就比如說一些數據庫表的行,我們可以用最喜歡的工具通過面向對象或者函數式編程的方式處理。Snowpark DataFrames的概念類似于Apache Spark或者Python中Pandas包的DataFrames的含義,是一種表格型的數據結構。

開發者也可以創建自定義函數推送到Snowflake服務器,來更方便地處理數據。Snowpark的代碼執行采用了惰性計算的方式,這減少了從Snowpark倉庫到客戶端之間的數據流轉。

當前版本的Snowpark可以運行在Scala 2.12和JDK 8、9、10或11上。它現在處于公開預覽階段,可用于所有賬戶。

架構特點

從架構的角度來看,Snowpark客戶端類似于Apache Spark Driver程序。它執行用戶在客戶端編寫的代碼并轉為SQL語句推送給Snowpark數據倉庫,等Snowpark計算服務端處理完數據后,接收以DataFrame格式組成的返回結果。

廣義的說,Snowpark數據倉庫的操作可以分為兩類:轉換和執行。由于轉換是延遲執行的,因此它們不會觸發DataFrames數據的計算處理過程。像select(查詢),filter(過濾),sort(排序),groupBy(分組)等等都屬于轉換范疇的操作。而執行是正好相反的,它們會觸發對DataFrames數據的計算。Snowpark將針對DataFrame數據的SQL語句發送到服務端進行計算,然后將結果返回給客戶端內存。show,collect,take等都屬于執行操作。

Snowpark執行

在我們可以執行任何Snowpark轉換和執行之前,我們需要先連接到Snowpark數據倉庫并建立會話。

Scala
object Main {
def main(args: Array[String]): Unit = {
// Replace the <placeholders> below.
val configs = Map (
"URL" -> "https://<SNOWFLAKE-INSTANCE>.snowflakecomputing.com:443",
"USER" -> "<USERNAME>",
"PASSWORD" -> "<PASSWORD>",
"ROLE" -> "SYSADMIN",
"WAREHOUSE" -> "SALESFORCE_ACCOUNT",
"DB" -> "SALESFORCE_DB",
"SCHEMA" -> "SALESFORCE"
)
val session = Session.builder.configs(configs).create
session.sql("show tables").show()
}
}


從Snowpark管理頁面上看,我們有一個SALESFORCE_DB數據庫和一個有3個表的SALESFORCE:SALESFORCE_ACCOUNT表表示來自Salesforce實例的賬戶,SALESFORCE_ORDER表存儲由這些賬戶發起的訂單,SALESFORCE_ACCOUNT_ORDER是一個關聯表,存儲關聯的查詢結果(我們在這篇文章的后面會再論述這點)。

要檢索Salesforce_Account表的前10行,我們可以簡單地執行以下DataFrame方法:

Scala

 // Create a DataFrame from the data in the "salesforce_account" table.
val dfAccount = session.table("salesforce_account")
// To print out the first 10 rows, call:
     dfAccount.show()


Snowpark會把代碼轉換成SQL語句并交給Snowflake執行:

Scala

[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX]  SELECT  *  FROM ( SELECT  *  FROM (salesforce_account)) LIMIT 10

在我們的VSCode IDE中的輸出看起來像這樣:

我們也可以過濾某些行并執行DataFrame的轉換(例如,選擇指定的列):

Scala

    val dfFilter = session.table("salesforce_account").filter(col("type") === "Customer - Direct")
dfFilter.show()
val dfSelect = session.table("salesforce_account").select(col("accountname"), col("phone"))
     dfSelect.show()

Snowpark將生成相應的SQL查詢,并將它們交給Snowflake計算服務器執行:

[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX]  SELECT  *  FROM ( SELECT  *  FROM ( SELECT  *  FROM (salesforce_account)) WHERE ("TYPE" = 'Customer - Direct')) LIMIT 10

 [main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX]  SELECT  *  FROM ( SELECT "ACCOUNTNAME", "PHONE" FROM ( SELECT  *  FROM (salesforce_account))) LIMIT 10

下面是在VSCode中的輸出:

Snowpark DataFrame API也允許DataFrames數據間的拼接關聯。在這個例子中,我們有SALESFORCE_ORDER表,記錄了由Salesforce賬戶產生的賬單數據,我們可以將這些數據拉到DataFrame中,并將它們與賬戶記錄連接起來:

Scala

    val dfOrder = session.table("salesforce_order")
dfOrder.show()
val dfJoin = dfAccount.join(dfOrder, col("sfdcid") === col("accountid")).select(col("accountname"), col("phone"),col("productname"), col("amount"))
dfJoin.show()

Snowflake把DataFrame方法轉換為SQL語句,然后推送給Snowflake數據倉庫進行計算。在VSCode中輸出如下:

如果我們想持久化保存計算結果,可以使用saveAsTable這個方法:

Scala

 dfJoin.write.mode(SaveMode.Overwrite).saveAsTable("salesforce_account_order")

生成的SQL語句看起來就像這樣:

Scala

[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: XXXX]  CREATE  OR  REPLACE  TABLE salesforce_account_order AS  SELECT  *  FROM ( SELECT "ACCOUNTNAME", "PHONE", "PRODUCTNAME", "AMOUNT" FROM ( SELECT  *  FROM (( SELECT "ACCOUNTNAME" AS "ACCOUNTNAME", "PHONE" AS "PHONE", "TYPE" AS "TYPE", "SFDCID" AS "SFDCID" FROM ( SELECT  *  FROM (salesforce_account))) AS SNOWPARK_TEMP_TABLE_UKKLR6UCHN6POXL INNER JOIN ( SELECT "ACCOUNTID" AS "ACCOUNTID", "PRODUCTNAME" AS "PRODUCTNAME", "AMOUNT" AS "AMOUNT" FROM ( SELECT  *  FROM (salesforce_order))) AS SNOWPARK_TEMP_TABLE_36DEOZXTQJUYKLD ON ("SFDCID" = "ACCOUNTID"))))

隨后,Snowpark會創建一個新表或者替換掉已存在的舊表,來存儲生成的數據:

結語

Snowpark為數據處理提供了豐富的操作和工具。它允許用戶創建非常復雜的高級數據處理管道操作。將用戶自定義的代碼推到Snowflake數據倉庫服務端,并通過減少不必要的數據傳輸,在數據端執行,這是Snowpark的一個非常強大的特性。

譯者介紹

盧鑫旺,51CTO社區編輯,半路出家的九零后程序員。做過前端頁面,寫過業務接口,搞過爬蟲,研究過JS,有幸接觸Golang,參與微服務架構轉型。目前主寫Java,負責公司可定制化低代碼平臺的數據引擎層設計開發工作。

原文標題:Snowflake Data Processing With Snowpark DataFrames,作者:Istvan Szegedi


責任編輯:華軒 來源: 51CTO
相關推薦

2023-09-27 15:34:48

數據編程

2022-01-21 13:53:29

云計算邊緣計算數據

2021-07-08 09:51:18

MaxCompute SQL數據處理

2021-07-17 22:41:53

Python數據技術

2023-05-05 19:29:41

2017-10-31 11:55:46

sklearn數據挖掘自動化

2023-10-11 14:37:21

工具開發

2024-10-30 10:00:00

Python函數

2017-02-16 08:41:09

數據Vlookup匹配

2019-09-30 10:12:21

機器學習數據映射

2009-09-08 16:50:12

使用LINQ進行數據轉

2022-11-02 14:45:24

Python數據分析工具

2009-03-16 10:29:45

數據挖掘過濾器Access

2023-12-12 11:06:37

PythonPandas數據

2023-08-15 16:20:42

Pandas數據分析

2022-03-28 14:08:02

Python數據清洗數據集

2022-05-24 09:52:37

Spark SQL大數據處理Hive

2024-05-08 14:05:03

時間序列數據

2009-07-16 14:46:48

jdbc statem
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美视频第三页 | 国产精品96久久久久久 | 久久精品青青大伊人av | 欧美一级视频免费看 | 久草网站 | 国产精品无码专区在线观看 | 欧美日韩精品一区二区三区视频 | 黄免费在线 | 欧美成人一区二区三区 | 国产欧美一区二区三区在线看 | 中文欧美日韩 | 欧美激情视频一区二区三区在线播放 | 91大片| 国产精品久久久亚洲 | 亚洲精品久久久久久久久久久久久 | 精品日本久久久久久久久久 | 中文字幕不卡在线观看 | 欧美精品一区二区三区在线播放 | 人和拘一级毛片c | 国产在线精品一区二区三区 | 中文字幕日韩专区 | 黑人粗黑大躁护士 | 欧美不卡在线 | 国产精品亚洲欧美日韩一区在线 | 97免费在线观看视频 | 密色视频| 狠狠操电影| 精品国产一区二区在线 | 日韩福利片 | 6996成人影院网在线播放 | av天天干 | 日韩精品1区2区3区 国产精品国产成人国产三级 | 亚洲婷婷六月天 | 亚洲欧美自拍偷拍视频 | 午夜在线影院 | 久久久久亚洲 | 天堂在线91| 欧美一区二区三区在线看 | 精品网站999 | 亚洲欧美日韩在线 | 亚洲国产精品日韩av不卡在线 |