基于PySpark SQL的媒體瀏覽日志ETL作業
pyspark除了官方的文檔,網上的教程資料一直很少,但基于調度平臺下,使用pyspark編寫代碼非常高效,程序本身是提交到spark集群中,性能上也是毫無問題的,在本文中,我們將深入探討基于Spark的媒體瀏覽日志ETL(提取、轉換、加載)流水線的詳細實現,在展示如何使用PySpark SQL處理大規模的媒體瀏覽日志數據,包括IP地址轉換、數據清洗、時間維度補充、碼表關聯等關鍵步驟。
一、環境配置
首先,我們需要創建一個SparkSession并導入必要的庫和設置默認參數,包括與IP-to-Location數據庫的交互以及其他相關的配置。
如果pyspark僅僅是本地運行而不是提交集群時,可以使用findspark庫,它能夠幫助我們快速初始化Spark環境。在開始之前,確保您已經成功安裝了findspark庫,并已經下載并解壓了Spark二進制文件。將Spark的安裝路徑和Python解釋器路徑指定為變量。
import findspark
# 指定Spark的安裝路徑
spark_home = "/usr/local/spark"
# 指定用于Spark的Python解釋器路徑
python_path = "/home/hadoop/.conda/envs/sparkbox/bin/python3.6"
# 使用findspark.init方法初始化Spark環境
findspark.init(spark_home, python_path)
findspark.init方法將幫助設置PYSPARK_PYTHON和SPARK_HOME環境變量,確保正確的Spark庫和配置文件被加載。其簡化了Spark環境的初始化過程,避免手動配置環境變量。
二、數據處理
接下來,我們定義了一個NewsEtl類,用于執行數據處理和轉換的各個步驟。這包括從HDFS中讀取媒體瀏覽日志數據,進行IP地址轉、換,清洗數據,添加時間維度,補充碼表信息等。
在spark_function中,我們詳細說明了數據處理的邏輯。這包括讀取媒體瀏覽日志數據、進行IP地址轉換、添加時間維度、補充碼表信息、數據清洗和最終寫入HDFS等步驟。
1.數據讀取
首先,我們使用PySpark的read方法從HDFS中讀取媒體瀏覽日志數據。我們指定了數據的schema,以確保正確地解析每一列。
df = spark.read.schema(schema).parquet(
"hdfs://xxx:8020/user/hive/warehouse/xxx.db/ods_media_browse_log").filter(
"dt in ({})".format(",".join(["'{}'".format(partition) for partition in latest_partitions])))
2.IP地址轉換
接下來,我們通過iptranslate函數將IP地址轉換為地理位置信息。這使用了XdbSearcher類,該類負責讀取xdb文件并執行IP地址的二分查找。
# 根據IP地址獲取地點信息
from_ip_get_place_udf = udf(action.iptranslate, struct_schema)
df = df.withColumn('country', from_ip_get_place_udf(col('ip'), lit('country')))
df = df.withColumn("place", from_ip_get_place_udf(col('ip')))
df = df.withColumn("country", df["place"]["country"])
df = df.withColumn("city", df["place"]["city"])
df = df.withColumn("province", df["place"]["province"])
df = df.drop('place')
3.時間維度添加
我們生成當前時間的時間戳,并添加各種時間格式的列,包括年、季度、月、日、小時等。
# 生成當前時間的時間戳
df = df.withColumn("current_timestamp", from_unixtime(df["operation_time"] / 1000))
# 添加各種時間格式的列
df = df.withColumn("year", date_format("current_timestamp", "yyyy"))
df = df.withColumn("quarter", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("month", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("day", date_format("current_timestamp", "dd"))
df = df.withColumn("hour_time", date_format("current_timestamp", "yyyy-MM-dd HH"))
df = df.withColumn("dt", date_format("current_timestamp", "yyyy-MM-dd"))
df = df.withColumn("hour", date_format("current_timestamp", "HH"))
df = df.drop('current_timestamp')
4.數據清洗
最后,我們對數據進行清洗,包括將空值替換為默認值、字符串去除空格、數據類型轉換等。
# 數據清洗
newdf = newdf.withColumn("media_type", when(col("media_type").isNull(), 0).otherwise(col("media_type")))
newdf = newdf.withColumn("news_type", when(col("news_type").isNull(), 99).otherwise(col("news_type")))
newdf = newdf.withColumn("original_type", when(col("original_type").isNull(), 99).otherwise(col("original_type")))
# ...
5.最終寫入HDFS
最終,我們將處理后的數據寫入HDFS,采用分區方式存儲,以便更高效地管理和查詢。
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
newdf.write.partitionBy("dt", "hour").mode("overwrite").option('user', 'hive').parquet(
"hdfs://xxxx:8020/user/hive/warehouse/xxx.db/dwd_media_browse_log")
通過這一系列步驟,我們完成了對媒體瀏覽日志數據的全面處理,包括數據轉換、地理位置信息的添加、時間維度的補充和數據清洗等關鍵步驟。
三、結論
通過詳細的實現步驟,深入解析了基于Spark的媒體瀏覽日志ETL任務的構建過程。這個任務可以根據具體需求進行調整和擴展,為大規模數據處理任務提供了一種高效而靈活的解決方案。