成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

基于PySpark SQL的媒體瀏覽日志ETL作業

數據庫 其他數據庫
pyspark除了官方的文檔,網上的教程資料一直很少,但基于調度平臺下,使用pyspark編寫代碼非常高效,程序本身是提交到spark集群中,性能上也是毫無問題的

pyspark除了官方的文檔,網上的教程資料一直很少,但基于調度平臺下,使用pyspark編寫代碼非常高效,程序本身是提交到spark集群中,性能上也是毫無問題的,在本文中,我們將深入探討基于Spark的媒體瀏覽日志ETL(提取、轉換、加載)流水線的詳細實現,在展示如何使用PySpark SQL處理大規模的媒體瀏覽日志數據,包括IP地址轉換、數據清洗、時間維度補充、碼表關聯等關鍵步驟。

一、環境配置

首先,我們需要創建一個SparkSession并導入必要的庫和設置默認參數,包括與IP-to-Location數據庫的交互以及其他相關的配置。

如果pyspark僅僅是本地運行而不是提交集群時,可以使用findspark庫,它能夠幫助我們快速初始化Spark環境。在開始之前,確保您已經成功安裝了findspark庫,并已經下載并解壓了Spark二進制文件。將Spark的安裝路徑和Python解釋器路徑指定為變量。

import findspark

# 指定Spark的安裝路徑
spark_home = "/usr/local/spark"

# 指定用于Spark的Python解釋器路徑
python_path = "/home/hadoop/.conda/envs/sparkbox/bin/python3.6"

# 使用findspark.init方法初始化Spark環境
findspark.init(spark_home, python_path)

findspark.init方法將幫助設置PYSPARK_PYTHON和SPARK_HOME環境變量,確保正確的Spark庫和配置文件被加載。其簡化了Spark環境的初始化過程,避免手動配置環境變量。

二、數據處理

接下來,我們定義了一個NewsEtl類,用于執行數據處理和轉換的各個步驟。這包括從HDFS中讀取媒體瀏覽日志數據,進行IP地址轉、換,清洗數據,添加時間維度,補充碼表信息等。

在spark_function中,我們詳細說明了數據處理的邏輯。這包括讀取媒體瀏覽日志數據、進行IP地址轉換、添加時間維度、補充碼表信息、數據清洗和最終寫入HDFS等步驟。

1.數據讀取

首先,我們使用PySpark的read方法從HDFS中讀取媒體瀏覽日志數據。我們指定了數據的schema,以確保正確地解析每一列。

df = spark.read.schema(schema).parquet(
"hdfs://xxx:8020/user/hive/warehouse/xxx.db/ods_media_browse_log").filter(
"dt in ({})".format(",".join(["'{}'".format(partition) for partition in latest_partitions])))

2.IP地址轉換

接下來,我們通過iptranslate函數將IP地址轉換為地理位置信息。這使用了XdbSearcher類,該類負責讀取xdb文件并執行IP地址的二分查找。

# 根據IP地址獲取地點信息
from_ip_get_place_udf = udf(action.iptranslate, struct_schema)
df = df.withColumn('country', from_ip_get_place_udf(col('ip'), lit('country')))
df = df.withColumn("place", from_ip_get_place_udf(col('ip')))
df = df.withColumn("country", df["place"]["country"])
df = df.withColumn("city", df["place"]["city"])
df = df.withColumn("province", df["place"]["province"])
df = df.drop('place')

3.時間維度添加

我們生成當前時間的時間戳,并添加各種時間格式的列,包括年、季度、月、日、小時等。

# 生成當前時間的時間戳
df = df.withColumn("current_timestamp", from_unixtime(df["operation_time"] / 1000))
# 添加各種時間格式的列
df = df.withColumn("year", date_format("current_timestamp", "yyyy"))
df = df.withColumn("quarter", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("month", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("day", date_format("current_timestamp", "dd"))
df = df.withColumn("hour_time", date_format("current_timestamp", "yyyy-MM-dd HH"))
df = df.withColumn("dt", date_format("current_timestamp", "yyyy-MM-dd"))
df = df.withColumn("hour", date_format("current_timestamp", "HH"))
df = df.drop('current_timestamp')

4.數據清洗

最后,我們對數據進行清洗,包括將空值替換為默認值、字符串去除空格、數據類型轉換等。

# 數據清洗
newdf = newdf.withColumn("media_type", when(col("media_type").isNull(), 0).otherwise(col("media_type")))
newdf = newdf.withColumn("news_type", when(col("news_type").isNull(), 99).otherwise(col("news_type")))
newdf = newdf.withColumn("original_type", when(col("original_type").isNull(), 99).otherwise(col("original_type")))
# ...

5.最終寫入HDFS

最終,我們將處理后的數據寫入HDFS,采用分區方式存儲,以便更高效地管理和查詢。

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
newdf.write.partitionBy("dt", "hour").mode("overwrite").option('user', 'hive').parquet(
"hdfs://xxxx:8020/user/hive/warehouse/xxx.db/dwd_media_browse_log")

通過這一系列步驟,我們完成了對媒體瀏覽日志數據的全面處理,包括數據轉換、地理位置信息的添加、時間維度的補充和數據清洗等關鍵步驟。

三、結論

通過詳細的實現步驟,深入解析了基于Spark的媒體瀏覽日志ETL任務的構建過程。這個任務可以根據具體需求進行調整和擴展,為大規模數據處理任務提供了一種高效而靈活的解決方案。

責任編輯:華軒 來源: 口袋大數據
相關推薦

2010-10-19 12:11:15

SQL Server定

2009-04-16 08:32:43

Cooliris瀏覽器媒體中心

2023-04-04 12:38:50

GPT機器人LLM

2010-10-20 15:11:53

SQL Server作

2019-07-08 09:10:48

TigGitLinux

2022-06-28 08:40:16

LokiPromtail日志報警

2010-10-20 17:00:51

SQL Server代

2011-04-06 14:16:49

SQL Server自動備份

2009-04-13 08:40:30

AMD瀏覽器Fusion Medi

2009-02-04 16:11:45

2009-01-15 10:43:21

SCDMA通信產業McWiLL

2010-10-27 11:12:39

2009-04-03 09:09:21

瀏覽器網絡辦公室

2012-04-16 10:04:08

Eclipse瀏覽器IDE

2012-04-11 10:16:02

EclipseIDE

2013-07-08 14:45:52

2019-07-16 07:15:42

瀏覽器網絡威脅網絡安全

2010-09-13 14:12:21

SQL Server日

2013-05-20 09:36:32

Hadoop大數據分析工具大數據

2010-09-06 09:36:51

SQL語句
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美激情在线观看一区二区三区 | 一区二区三区四区不卡 | 国产精品久久久久久久久久久久午夜片 | 国产成人av电影 | 东京av男人的天堂 | 九色91视频 | 日韩欧美国产一区二区三区 | 国产精品久久 | 国产一区二区视频免费在线观看 | 中文字幕免费视频 | 中文字幕乱码一区二区三区 | 日韩精品免费一区二区在线观看 | 亚洲一区导航 | 午夜精品久久久久久久99黑人 | 国产精品久久久久久久久久了 | 在线欧美一区 | 日韩中文一区 | 亚洲国产精品一区二区久久 | 国产 91 视频 | 亚州av| 久久久亚洲一区 | 国产欧美日韩综合精品一 | 午夜影院在线观看 | 久久久91精品国产一区二区三区 | 亚洲一二三区免费 | 亚洲精品4 | 欧美午夜视频 | 欧美在线日韩 | 国产视频福利 | 欧美高清成人 | 日韩欧美在线视频一区 | 国产乱码精品1区2区3区 | 色毛片 | 性视频网 | 亚洲天堂一区 | 亚洲欧美综合 | 久久av一区 | 中文字幕日韩在线观看 | 国产精品自产拍在线观看蜜 | 精久久久 | 毛片一区二区三区 |