?使用Python做數據處理的數據科學家或數據從業者,對數據科學包pandas并不陌生,也不乏像云朵君一樣的pandas重度使用者,項目開始寫的第一行代碼,大多是 import pandas as pd。pandas做數據處理可以說是yyds!而他的缺點也是非常明顯,pandas 只能單機處理,它不能隨數據量線性伸縮。例如,如果 pandas 試圖讀取的數據集大于一臺機器的可用內存,則會因內存不足而失敗。
另外 ?pandas 在處理大型?數據方面非常慢,雖然有像Dask 或 Vaex 等其他庫來優化提升數據處理速度,但在大數據處理神之框架Spark面前,也是小菜一碟。
幸運的是,在新的 Spark 3.2 版本中,出現了一個新的Pandas API,將pandas大部分功能都集成到PySpark中,使用pandas的接口,就能使用Spark,因為 Spark 上的 Pandas API 在后臺使用 Spark,這樣就能達到強強聯手的效果,可以說是非常強大,非常方便。
這一切都始于 2019 年 Spark + AI 峰會。Koalas 是一個開源項目,可以在 Spark 之上使用 Pandas。一開始,它只覆蓋了 Pandas 的一小部分功能,但后來逐漸壯大起來。現在,在新的 Spark 3.2 版本中,Koalas 已合并到 PySpark。
Spark 現在集成了 Pandas API,因此可以在 Spark 上運行 Pandas。只需要更改一行代碼:
import pyspark.pandas as ps
由此我們可以獲得諸多的優勢?:
- 如果我們熟悉使用Python 和 Pandas,但不熟悉 Spark,可以省略了需復雜的學習過程而立即使用PySpark。
- 可以為所有內容使用一個代碼庫:無論是小數據和大數據,還是單機和分布式機器。
- 可以在Spark分布式框架上,更快地運行 Pandas 代碼。?
最后一點尤其值得注意。
一方面,可以將分布式計算應用于在 Pandas 中的代碼。且借助 Spark 引擎,代碼即使在單臺機器上也會更快!下圖展示了在一臺機器(具有 96 個 vCPU 和 384 GiBs 內存)上運行 Spark 和單獨調用 pandas 分析 130GB 的 CSV 數據集的性能對比。

多線程和 Spark SQL Catalyst Optimizer 都有助于優化性能。例如,Join count 操作在整個階段代碼生成時快 4 倍:沒有代碼生成時為 5.9 秒,代碼生成時為 1.6 秒。
Spark 在鏈式操作(chaining operations)中具有特別顯著的優勢。Catalyst 查詢優化器可以識別過濾器以明智地過濾數據并可以應用基于磁盤的連接(disk-based joins),而 Pandas 傾向于每一步將所有數據加載到內存中。
現在是不是迫不及待的想嘗試如何在 Spark 上使用 Pandas API 編寫一些代碼?我們現在就開始吧!?
在 Pandas ?/ Pandas-on-Spark / Spark 之間切換
需要知道的第一件事是我們到底在使用什么。在使用 Pandas 時,使用類pandas.core.frame.DataFrame?。在 Spark 中使用 pandas API 時,使用pyspark.pandas.frame.DataFrame。雖然?兩者相似,但不相同。主要區別在于前者在單機中,而后者是分布式的。
?可以使用 Pandas-on-Spark 創建一個 Dataframe 并將其轉換為 Pandas,反之亦然:
# import Pandas-on-Spark
import pyspark.pandas as ps
# 使用 Pandas-on-Spark 創建一個 DataFrame
ps_df = ps.DataFrame(range(10))
# 將 Pandas-on-Spark Dataframe 轉換為 Pandas Dataframe
pd_df = ps_df.to_pandas()
# 將 Pandas Dataframe 轉換為 Pandas-on-Spark Dataframe
ps_df = ps.from_pandas(pd_df)
注意,如果使用多臺機器,則在將 Pandas-on-Spark Dataframe 轉換為 Pandas Dataframe 時,數據會從多臺機器傳輸到一臺機器,反之亦然(可參閱PySpark 指南[1])。
還可以將 Pandas-on-Spark Dataframe 轉換為 Spark DataFrame,反之亦然:
# 使用 Pandas-on-Spark 創建一個 DataFrame
ps_df = ps.DataFrame(range(10))
# 將 Pandas-on-Spark Dataframe 轉換為 Spark Dataframe
spark_df = ps_df.to_spark()
# 將 Spark Dataframe 轉換為 Pandas-on-Spark Dataframe
ps_df_new = spark_df.to_pandas_on_spark()
數據類型如何改變??
在使用 Pandas-on-Spark 和 Pandas 時,數據類型基本相同。將 Pandas-on-Spark DataFrame 轉換為 Spark DataFrame 時,數據類型會自動轉換為適當的類型(請參閱PySpark 指南[2])
下面的示例顯示了在轉換時是?如何將數據類型從 PySpark DataFrame 轉換為 pandas-on-Spark DataFrame。
>>> sdf = spark.createDataFrame([
... (1, Decimal(1.0), 1., 1., 1, 1, 1, datetime(2020, 10, 27), "1", True, datetime(2020, 10, 27)),
... ], 'tinyint tinyint, decimal decimal, float float, double double, integer integer, long long, short short, timestamp timestamp, string string, boolean boolean, date date')
>>> sdf
DataFrame[tinyint: tinyint, decimal: decimal(10,0),
float: float, double: double, integer: int,
long: bigint, short: smallint, timestamp: timestamp,
string: string, boolean: boolean, date: date]
psdf = sdf.pandas_api()
psdf.dtypes
tinyint int8
decimal object
float float32
double float64
integer int32
long int64
short int16
timestamp datetime64[ns]
string object
boolean bool
date object
dtype: object
Pandas-on-Spark vs Spark 函數
在 Spark 中的 DataFrame 及其在 Pandas-on-Spark 中的最常用函數。注意,Pandas-on-Spark 和 Pandas 在語法上的唯一區別就是 import pyspark.pandas as ps 一行。
當你看完如下內容后,你會發現,即使您不熟悉 Spark,也可以通過 Pandas API 輕松使用。
導入庫
# 運行Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Spark") \
.getOrCreate()
# 在Spark上運行Pandas
import pyspark.pandas as ps
讀取數據
以 old dog iris 數據集為例。
# SPARK
sdf = spark.read.options(inferSchema='True',
header='True').csv('iris.csv')
# PANDAS-ON-SPARK
pdf = ps.read_csv('iris.csv')
選擇
# SPARK
sdf.select("sepal_length","sepal_width").show()
# PANDAS-ON-SPARK
pdf[["sepal_length","sepal_width"]].head()
刪除列
# SPARK
sdf.drop('sepal_length').show()# PANDAS-ON-SPARK
pdf.drop('sepal_length').head()
刪除重復項
# SPARK
sdf.dropDuplicates(["sepal_length","sepal_width"]).show()
# PANDAS-ON-SPARK
pdf[["sepal_length", "sepal_width"]].drop_duplicates()
篩選
# SPARK
sdf.filter( (sdf.flower_type == "Iris-setosa") & (sdf.petal_length > 1.5) ).show()
# PANDAS-ON-SPARK
pdf.loc[ (pdf.flower_type == "Iris-setosa") & (pdf.petal_length > 1.5) ].head()
計數
# SPARK
sdf.filter(sdf.flower_type == "Iris-virginica").count()
# PANDAS-ON-SPARK
pdf.loc[pdf.flower_type == "Iris-virginica"].count()
唯一值
# SPARK
sdf.select("flower_type").distinct().show()
# PANDAS-ON-SPARK
pdf["flower_type"].unique()
排序
# SPARK
sdf.sort("sepal_length", "sepal_width").show()
# PANDAS-ON-SPARK
pdf.sort_values(["sepal_length", "sepal_width"]).head()
分組
# SPARK
sdf.groupBy("flower_type").count().show()
# PANDAS-ON-SPARK
pdf.groupby("flower_type").count()
替換
# SPARK
sdf.replace("Iris-setosa", "setosa").show()
# PANDAS-ON-SPARK
pdf.replace("Iris-setosa", "setosa").head()
連接
#SPARK
sdf.union(sdf)
# PANDAS-ON-SPARK
pdf.append(pdf)
transform 和 apply 函數應用
有許多 API 允許用戶針對 pandas-on-Spark DataFrame 應用函數,例如:
DataFrame.transform()
DataFrame.apply()
DataFrame.pandas_on_spark.transform_batch()
DataFrame.pandas_on_spark.apply_batch()
Series.pandas_on_spark.transform_batch()
每個 API 都有不同的用途,并且在內部工作方式不同。
transform 和 apply
DataFrame.transform()和DataFrame.apply()之間的主要區別在于,前者需要返回相同長度的輸入,而后者不需要。
# transform
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
return pser + 1 # 應該總是返回與輸入相同的長度。
psdf.transform(pandas_plus)
# apply
psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})
def pandas_plus(pser):
return pser[pser % 2 == 1] # 允許任意長度
psdf.apply(pandas_plus)
在這種情況下,每個函數采用一個 pandas Series,Spark 上的 pandas API 以分布式方式計算函數,如下所示。

在“列”軸的情況下,該函數將每一行作為一個熊貓系列。
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
return sum(pser) # 允許任意長度
psdf.apply(pandas_plus, axis='columns')
上面的示例將每一行的總和計算為pands Series

pandas_on_spark.transform_batch和pandas_on_spark.apply_batch
batch 后綴表示 pandas-on-Spark DataFrame 或 Series 中的每個塊。API 對 pandas-on-Spark DataFrame 或 Series 進行切片,然后以 pandas DataFrame 或 Series 作為輸入和輸出應用給定函數。請參閱以下示例:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pdf):
return pdf + 1 # 應該總是返回與輸入相同的長度。
psdf.pandas_on_spark.transform_batch(pandas_plus)
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pdf):
return pdf[pdf.a > 1] # 允許任意長度
psdf.pandas_on_spark.apply_batch(pandas_plus)
兩個示例中的函數都將 pandas DataFrame 作為 pandas-on-Spark DataFrame 的一個塊,并輸出一個 pandas DataFrame。Spark 上的 Pandas API 將 pandas 數據幀組合為 pandas-on-Spark 數據幀。

在 Spark 上使用 pandas API的注意事項
避免shuffle
某些操作,例如sort_values在并行或分布式環境中比在單臺機器上的內存中更難完成,因為它需要將數據發送到其他節點,并通過網絡在多個節點之間交換數據。
避免在單個分區上計算
另一種常見情況是在單個分區上進行計算。目前, DataFrame.rank 等一些 API 使用 PySpark 的 Window 而不指定分區規范。這會將所有數據移動到單個機器中的單個分區中,并可能導致嚴重的性能下降。對于非常大的數據集,應避免使用此類 API。
不要使用重復的列名
不允許使用重復的列名,因為 Spark SQL 通常不允許這樣做。Spark 上的 Pandas API 繼承了這種行為。例如,見下文:
import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]})
psdf.columns = ["a", "a"]
Reference 'a' is ambiguous, could be: a, a.;
此外,強烈建議不要使用區分大小寫的列名。Spark 上的 Pandas API 默認不允許它。
import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
Reference 'a' is ambiguous, could be: a, a.;
但可以在 Spark 配置spark.sql.caseSensitive中打開以啟用它,但需要自己承擔風險。
from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("pandas-on-spark")
builder = builder.config("spark.sql.caseSensitive", "true")
builder.getOrCreate()
import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
psdf
使用默認索引
pandas-on-Spark 用戶面臨的一個常見問題是默認索引導致性能下降。當索引未知時,Spark 上的 Pandas API 會附加一個默認索引,例如 Spark DataFrame 直接轉換為 pandas-on-Spark DataFrame。
如果計劃在生產中處理大數據,請通過將默認索引配置為distributed或distributed-sequence來使其確保為分布式。
有關配置默認索引的更多詳細信息,請參閱默認索引類型[3]。
在 Spark 上使用 pandas API
盡管 Spark 上的 pandas API 具有大部分與 pandas 等效的 API,但仍有一些 API 尚未實現或明確不受支持。因此盡可能直接在 Spark 上使用 pandas API。
例如,Spark 上的 pandas API 沒有實現__iter__(),阻止用戶將所有數據從整個集群收集到客戶端(驅動程序)端。不幸的是,許多外部 API,例如 min、max、sum 等 Python 的內置函數,都要求給定參數是可迭代的。對于 pandas,它開箱即用,如下所示:
>>> import pandas as pd
>>> max(pd.Series([1, 2, 3]))
3
>>> min(pd.Series([1, 2, 3]))
1
>>> sum(pd.Series([1, 2, 3]))
6
Pandas 數據集存在于單臺機器中,自然可以在同一臺機器內進行本地迭代。但是,pandas-on-Spark 數據集存在于多臺機器上,并且它們是以分布式方式計算的。很難在本地迭代,很可能用戶在不知情的情況下將整個數據收集到客戶端。因此,最好堅持使用 pandas-on-Spark API。上面的例子可以轉換如下:
>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]).max()
3
>>> ps.Series([1, 2, 3]).min()
1
>>> ps.Series([1, 2, 3]).sum()
6
pandas 用戶的另一個常見模式可能是依賴列表推導式或生成器表達式。但是,它還假設數據集在引擎蓋下是本地可迭代的。因此,它可以在 pandas 中無縫運行,如下所示:
import pandas as pd
data = []
countries = ['London', 'New York', 'Helsinki']
pser = pd.Series([20., 21., 12.], index=countries)
for temperature in pser:
assert temperature > 0
if temperature > 1000:
temperature = None
data.append(temperature ** 2)
pd.Series(data, index=countries)
London 400.0
New York 441.0
Helsinki 144.0
dtype: float64
但是,對于 Spark 上的 pandas API,它的工作原理與上述相同。上面的示例也可以更改為直接使用 pandas-on-Spark API,如下所示:
import pyspark.pandas as ps
import numpy as np
countries = ['London', 'New York', 'Helsinki']
psser = ps.Series([20., 21., 12.], index=countries)
def square(temperature) -> np.float64:
assert temperature > 0
if temperature > 1000:
temperature = None
return temperature ** 2
psser.apply(square)
London 400.0
New York 441.0
Helsinki 144.0
減少對不同 DataFrame 的操作
Spark 上的 Pandas API 默認不允許對不同 DataFrame(或 Series)進行操作,以防止昂貴的操作。只要有可能,就應該避免這種操作。
寫在最后
到目前為止,我們將能夠在 Spark 上使用 Pandas。這將會導致Pandas 速度的大大提高,遷移到 Spark 時學習曲線的減少,以及單機計算和分布式計算在同一代碼庫中的合并。
參考資料
[1]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/pandas_pyspark.html
[2]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html
[3]默認索引類型: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type