成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

自我修復數(shù)據(jù)管道:數(shù)據(jù)工程的下一件大事?

譯文 精選
人工智能 后端
使用GPT-4和Python等人工智能工具在谷歌云平臺上構(gòu)建自我修復的數(shù)據(jù)管道,可以自動檢測和修復錯誤。?

譯者 | 李睿

審校 | 重樓

梅西百貨公司首席數(shù)據(jù)工程師Naresh Erukulla是一位勇于迎接挑戰(zhàn)的數(shù)據(jù)工程師,他擅長用簡潔明了的概念驗證(POC)解決各種問題。最近,Naresh關注到了數(shù)據(jù)工程師日常工作中普遍遭遇的一個難題,并為此采取行動為所有批處理和流數(shù)據(jù)管道設置了警報系統(tǒng)。當錯誤超過閾值或數(shù)據(jù)管道出現(xiàn)故障時,可以迅速通過電子郵件向數(shù)據(jù)工程師發(fā)送故障通知,確保問題能夠得到及時處理。

一切似乎都在順利進行中,直到他注意到一個關鍵數(shù)據(jù)集無法加載到BigQuery中。在調(diào)查了錯誤日志之后,發(fā)現(xiàn)一些“缺少所需數(shù)據(jù)”提示的消息。當看到用戶輸入文件中頻繁出現(xiàn)的原始數(shù)據(jù)問題時,他為此感到困惑

處理數(shù)據(jù)不一致問題,特別是數(shù)據(jù)缺失或格式錯誤,會在分析和運營工作流程的后續(xù)環(huán)節(jié)引發(fā)嚴重的后果。有一個關鍵的下游報告正是建立在這些輸入數(shù)據(jù)的基礎之上。該報告在日常業(yè)務中發(fā)揮著至關重要的作用,它能夠反映出公司在多個領域內(nèi)的關鍵指標表現(xiàn),并且為決策制定提供了不可或缺的數(shù)據(jù)支持。在這份至關重要的報告中,所有高管級別的利益相關者都依賴這些數(shù)據(jù)來展示業(yè)績指標、討論面臨的挑戰(zhàn)以及規(guī)劃未來的發(fā)展路徑。

Erukulla耗費了數(shù)小時檢查源CSV文件,該文件承載了來自另一個上游應用程序的量事務數(shù)據(jù)。準確識別并修正問題行顯得至關重要。然而,當他著手處理這些問題時,發(fā)現(xiàn)已經(jīng)錯過截止日期,這無疑令利益相關者深感失望。Erukulla意識到傳統(tǒng)數(shù)據(jù)管道脆弱。它們很容易出錯,而且往往需要多次人工干預來進行修復,這個過程既耗時又容易出錯。

人們是否也遇到過類似的情況?是否花費了大量時間調(diào)試數(shù)據(jù)管道,結(jié)果卻發(fā)現(xiàn)根本原因只是一個簡單的格式錯誤或缺少必填字段?事實上,世界各地的數(shù)據(jù)工程師每天都在努力應對這些挑戰(zhàn)。那么是否有可以構(gòu)建能夠自我修復”數(shù)據(jù)管道的方法?這正是Erukulla追求目標

自我修復數(shù)據(jù)管道的工作原理

自我修復數(shù)據(jù)管道的想法很簡單:當數(shù)據(jù)處理過程中出現(xiàn)錯誤時,數(shù)據(jù)管道應該自動檢測、分析和糾正錯誤,而無需人工干預。傳統(tǒng)上,解決這些問題需要人工干預,這既耗時又容易出錯。

雖然有多種方法可以實現(xiàn)這一點,,但使用人工智能代理是最好的方法,也是數(shù)據(jù)工程師在未來自我修復故障數(shù)據(jù)管道并動態(tài)自動糾正它們的方法。本文將展示如何使用像GPT-4/DeepSeek R1模型這樣的LLM來自修復數(shù)據(jù)管道的基本實現(xiàn),其方法是使用LLM對失敗記錄進行分析并提出建議,并在數(shù)據(jù)管道運行的過程中應用這些修復措施。所提供的解決方案可以擴展到大型數(shù)據(jù)管道,并將擴展更多的功能。

以下介紹如何利用OpenAI API在云計算環(huán)境中使用GPT-4模型構(gòu)建一個實用的管道。遵循的基本步驟如下:

  • 將源文件上傳到谷歌云存儲桶(Google Cloud Storage Bucket)。如果沒有谷歌云平臺的訪問權限,則可以使用任何本地或其他云存儲。
  • 創(chuàng)建數(shù)據(jù)模型,用于將原始數(shù)據(jù)提取到BigQuery表中,將錯誤記錄提取到錯誤表中。
  • 從CSV中讀取源文件,并從輸入數(shù)據(jù)中識別干凈(Clean)數(shù)據(jù)集和無效記錄錯誤行(Error Rows)數(shù)據(jù)集。
  • 將Clean數(shù)據(jù)集導入BigQuery,并使用提示將Error Rows數(shù)據(jù)集傳遞給LLM。
  • 對于每個錯誤行(Error Rows),OpenAI的GPT API進行分析并提供智能產(chǎn)品ID分配。
  • 使用Google BigQuery動態(tài)存儲和檢索產(chǎn)品信息。
  • 基于Python的自動化無縫集成。

可以參閱ErukullaGitHub上的完整代碼庫。

1.從云存儲讀取輸入數(shù)據(jù)

數(shù)據(jù)管道首先讀取存儲在Cloud Storage中的客戶端上傳的CSV文件可以利用云函數(shù)(無服務器執(zhí)行管道步驟)在新文件上傳到存儲桶時觸發(fā)。該函數(shù)使用谷歌云存儲庫(google-cloud-storage)讀取文件,并將其解析為Pandas DataFrame以供進一步處理。

在將數(shù)據(jù)傳遞到下一步之前,可以實施一些數(shù)據(jù)質(zhì)量檢查。然而,現(xiàn)實世界中的數(shù)據(jù)問題是動態(tài)的,無法預測和編寫所有測試用例,這會使代碼變得復雜且難以閱讀。

在這個用例中,CSV文件包含字段ProductID、Price、name、saleAmount。以下是包含數(shù)據(jù)的示例文件(ProductID和Price字段中也缺少數(shù)據(jù))。

1 # Read CSV from GCS
2 client = storage.Client()
3 bucket = client.bucket(bucket_name)
4 blob = bucket.blob(file_name)
5 data = blob.download_as_text()
6 df = pd.read_csv(io.StringIO(data))
7

2.將數(shù)據(jù)導入BigQuery

接下來,數(shù)據(jù)管道嘗試將數(shù)據(jù)導入到BigQuery中。如果由于模式不匹配、數(shù)據(jù)類型錯誤或缺少字段而導致任何行失敗,則捕獲并記錄它們以供進一步分析。這一步驟對于檢測底層錯誤信息至關重要,這些錯誤信息將用于識別OpenAI API的可能解決方案。

1 # Function to clean and preprocess data
2 def clean_data(df):
3 avg_price = get_average_price()
4 
5 df["Price"] = df["Price"].fillna(avg_price)
6
7 # Log and remove rows with issues
8 error_rows = df[df["ProductID"].isna()]
9 clean_df = df.dropna(subset=["ProductID"])
10
11 return clean_df, error_rows
12
13 # Function to query BigQuery for an average price
14 def get_average_price():
15 client = bigquery.Client()
16 query = f"SELECT AVG(Price) AS avg_price FROM `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.Product_Info`"
17 
18 try:
19 df = client.query(query).to_dataframe()
20 avg_price = df["avg_price"][0]
21 print(f"Fetched Average Price: {avg_price}")
22 return avg_price
23 except Exception as e:
24 print(f"Error fetching average price: {e}")
25 return None
26

注意,avg_price = get_average_price()是從BigQuery查詢中獲取的。

在插入干凈的數(shù)據(jù)集之后如下圖所示:

3.使用LLM分析錯誤

分析錯誤是整個流程中的關鍵步驟,這就是采用LLM的神奇之處。失敗的記錄被發(fā)送到GPT-4或DeepSeek R1等LLM進行分析。LLM檢查錯誤并提出更正建議和修正后的記錄

例如,假設日期字段的格式不正確。在這種情況下,LLM可能會建議從字符串到整數(shù)轉(zhuǎn)換或從字符串到日期/時間戳轉(zhuǎn)換的正確格式記錄,反之亦然。在數(shù)據(jù)是預期的但發(fā)現(xiàn)為空的情況下,根據(jù)代碼強制執(zhí)行的規(guī)則,帶有“平均”(Average)或“默認”(Default)值的缺失值將被修復,以確保數(shù)據(jù)攝取成功。

通過重試機制實現(xiàn)ChatCompletion請求。

為了確保彈性,利用tenacity實現(xiàn)了重試機制。該函數(shù)將錯誤細節(jié)發(fā)送給GPT并檢索建議的修復程序。在本文的示例中,創(chuàng)建了‘functions’(函數(shù))列表,并使用ChatCompletion Request將其傳遞給JSON有效負載。

需要注意,‘functions’列表是使用在管道代碼中創(chuàng)建的Python函數(shù)來修復已知或可能問題的所有函數(shù)的列表。GPT分析輸入提示符和錯誤消息,以確定是否調(diào)用‘functions’列表中列出的特定函數(shù)來修復問題。

因此,GPT的響應提供了指示應該調(diào)用哪個函數(shù)的結(jié)構(gòu)化數(shù)據(jù)。GPT不會直接執(zhí)行函數(shù),而是由數(shù)據(jù)管道來執(zhí)行。

1 @retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
2 def chat_completion_request(messages, functinotallow=None, model=GPT_MODEL):
3 headers = {
4 "Content-Type": "application/json",
5 "Authorization": "Bearer " + openai.api_key,
6 }
7 json_data = {"model": model, "messages": messages}
8 if functions is not None:
9 json_data.update({"functions": functions})
10 try:
11 response = requests.post(
12 "https://api.openai.com/v1/chat/completions",
13 headers=headers,
14 jsnotallow=json_data,
15 )
16 return response.json()
17 except Exception as e:
18 print("Unable to generate ChatCompletion response")
19 print(f"Exception: {e}")
20 return e
21 # Function to send ChatCompletion request to OpenAI API
22 functions = [
23 {
24 "name": "assign_product_id",
25 "description": "assigning a unique ProductID",
26 "parameters": {
27 "type": "object",
28 "properties": {
29 "ProductID": {
30 "type": "integer",
31 "description": "The product ID to assign."
32 },
33 }
34 },
35 }
36 ]
37

assign_product_id是‘functions’列表中列出的函數(shù),GPT可以在需要時調(diào)用它。在這個示例中,CSV文件的最后兩行缺少ProductID。因此,GPT調(diào)用特定的assign_product_id函數(shù)來確定ProductID值。

assign_product_id函數(shù)從BigQuery中獲取最高分配的ProductID,并將其遞增以供后續(xù)使用。如果它是首次運行,或者BigQuery表中沒有可用的數(shù)據(jù),它將分配默認的99999作為ProductID。

1 def assign_product_id():
2 client = bigquery.Client()
3 # table_ref = client.dataset(BQ_DATASET_ID).table(BQ_TABLE_ID)
4
5 query = f"""
6 Select max(ProductID) as max_id from `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}` WHERE ProductID < 99999
7 """
8 df = client
9 try:
10 df = client.query(query).to_dataframe()
11 except Exception as e:
12 print(f"Error fetching max ProductID: {e}")
13 return None
14 return df["max_id"][0] + 1 if not df.empty else 99999
15

4.應用自動更正

數(shù)據(jù)管道將GPT的建議應用于失敗的記錄,并重新嘗試將它們導入到BigQuery中。如果更正成功,數(shù)據(jù)將存儲在主表中。如果沒有,不可修復的記錄將被記錄到一個單獨的錯誤表中,以供人工檢查。

在字段是必需且唯一的情況下,GPT可以從BigQuery獲得唯一的ProductID值,并在此值的基礎上加1,以確保其唯一性。考慮管道中有多個錯誤行的情況;每個記錄都按照GPT響應提供的修復程序順序處理,并用建議值更新錯誤記錄。

在以下的代碼中,ProductID被從assign_product_id()BigQuery表中獲取的值替換。當有多個錯誤行時,每個錯誤行都會通過遞增ProductID獲得一個唯一的數(shù)字。

1 # Function to send error data to GPT-4 for analysis
2 def analyze_errors_with_gpt(error_rows):
3 if error_rows.empty:
4 return error_rows
5
6 new_product_id = assign_product_id()
7
8 for index, row in error_rows.iterrows():
9 prompt = f"""
10 Fix the following ProductID by assigning a unique ProductID from the bigquery table Product_Info:
11 {row.to_json()}
12 """
13 chat_response = chat_completion_request(
14 model=GPT_MODEL,
15 messages=[{"role": "user", "content": prompt}],
16 functions=functions
17 )
18
19 if chat_response is not None:
20 try:
21 if chat_response["choices"][0]["message"]:
22 response_content = chat_response["choices"][0]["message"]
23 else:
24 print("Chat response content is None")
25 continue
26 except json.JSONDecodeError as e:
27 print(f"Error decoding JSON response: {e}")
28 continue
29
30 if 'function_call' in response_content:
31 if response_content['function_call']['name'] == 'assign_product_id':
32 res = json.loads(response_content['function_call']['arguments'])
33 res['product_id'] = new_product_id
34 error_rows.at[index, "ProductID"] = res['product_id']
35 new_product_id += 1 # Increment the ProductID for the next row
36
37 print(f"Assigned ProductID: {res['product_id']}")
38 else:
39 print("Function not supported")
40 else:
41 chat.add_prompt('assistant', response_content['content'])
42 else:
43 print("ChatCompletion request failed. Retrying...")
44
45 return error_rows
46

5.將已修改的行導入到BigQuery表中

main函數(shù)從谷歌云存儲(Google Cloud Storage)讀取數(shù)據(jù)并進行清理,并將有效數(shù)據(jù)導入到BigQuery中,同時動態(tài)修復錯誤。

1 # Main function to execute the pipeline
2 def main():
3 bucket_name = "self-healing-91"
4 file_name = "query_results.csv"
5
6 # Read CSV from GCS
7 client = storage.Client()
8 bucket = client.bucket(bucket_name)
9 blob = bucket.blob(file_name)
10 data = blob.download_as_text()
11 df = pd.read_csv(io.StringIO(data))
12
13 # Clean data and identify errors
14 clean_df, error_rows = clean_data(df)
15
16 # Load valid data into BigQuery
17 load_to_bigquery(clean_df, BQ_TABLE_ID)
18
19 # Process errors if any
20 if not error_rows.empty:
21
22 # Analyze errors with GPT-4
23 error_rows = analyze_errors_with_gpt(error_rows)
24
25 load_to_bigquery(error_rows, BQ_TABLE_ID)
26
27 print("Fixed Errors loaded successfully into BigQuery original table.")
28

在修復數(shù)據(jù)錯誤之后,需要特別檢查第66至68行。從BigQuery表中獲取最大值10000 ProductID后,對這些ID逐一進行遞增處理。此外,錯誤行中缺少信息的Price字段被BigQuery表中的Avg(Price)替換。

6.日志記錄和監(jiān)控

在整個過程中,使用云日志(Cloud Logging)記錄錯誤和數(shù)據(jù)管道的活動。這確保工程師可以監(jiān)控數(shù)據(jù)管道的運行狀況并排查問題

采用的工具和技術

以下是用來構(gòu)建和測試數(shù)據(jù)管道的關鍵工具和技術:

  • 云存儲:用于存儲輸入的CSV文件。
  • 函數(shù):用于無服務器執(zhí)行管道步驟。
  • BigQuery:用于存儲清理過的數(shù)據(jù)和錯誤日志。
  • GPT-4/DeepSeek R1:用于分析失敗記錄并提出更正建議。
  • 云日志:用于監(jiān)視和故障排除。
  • 云編排器:它用于使用Apache氣流編排管道。

面臨的挑戰(zhàn)

1. LLM集成

將LLM集成到數(shù)據(jù)管道中頗具挑戰(zhàn)性。必須確保API調(diào)用是有效的,LLM的響應是準確的。此外,還有成本方面的考慮,由于為LLM配置API對于大型數(shù)據(jù)集來說可能成本高昂。因此,只需知道必須為該服務設置一個API密鑰。

例如,對于OpenAI,必須訪問https://platform.openai.com/來注冊和生成新的API密鑰,并在發(fā)送帶有提示的API調(diào)用時在數(shù)據(jù)管道中使用它。

2.錯誤處理

設計一個穩(wěn)健的錯誤處理機制具有挑戰(zhàn)性。必須考慮各種錯誤,從模式不匹配到網(wǎng)絡問題,并確保數(shù)據(jù)管道能夠優(yōu)雅地處理它們。數(shù)據(jù)管道可能會面臨許多問題,而且所有問題都不能動態(tài)解決,例如文件為空或BigQuery表不存在等問題。

3.可擴展性

隨著數(shù)據(jù)量的增長,必須優(yōu)化數(shù)據(jù)管道以實現(xiàn)可擴展性。這涉及到在BigQuery中對數(shù)據(jù)進行分區(qū),并使用Dataflow進行大規(guī)模處理。

4.成本管理

雖然谷歌云平臺提供了強大的工具,但使用這些工具需要支付費用。因此必須仔細監(jiān)控使用情況并優(yōu)化數(shù)據(jù)管道,以避免額外的成本。OpenAI API成本是需要仔細監(jiān)控的另一個因素。

結(jié)論和要點

對于數(shù)據(jù)工程師來說,構(gòu)建自我修復的數(shù)據(jù)管道是一個改變游戲規(guī)則的方法。它可以減少人工干預,提高效率,保證數(shù)據(jù)質(zhì)量。然而,這并不是靈丹妙藥。雖然自我修復數(shù)據(jù)管道可以節(jié)省時間,但它們會帶來額外的成本,例如LLM API費用和增加的云函數(shù)的使用量。因此,權衡這些成本與收益至關重要。

對于自我修復數(shù)據(jù)管道領域的新手來說,建議從小型項目著手,先嘗試集成大型語言模型(LLM)和處理基本錯誤,然后再逐步擴展。在這一過程中,定期監(jiān)控數(shù)據(jù)管道的性能和成本。使用云監(jiān)控和云日志之類的工具來識別瓶頸并進行相應的優(yōu)化。最后,要與數(shù)據(jù)科學家、分析師和業(yè)務利益相關者緊密合作,了解他們的實際需求,并確保當業(yè)務需求發(fā)生變化時,其數(shù)據(jù)管道能夠持續(xù)創(chuàng)造價值。

總之,自我修復的數(shù)據(jù)管道代表著數(shù)據(jù)工程的未來。通過利用歌云平臺和LLM等工具,可以構(gòu)建健壯、高效、智能的管道,從而最大限度地減少停機時間并提升生產(chǎn)效率如果曾經(jīng)受到脆弱的數(shù)據(jù)管道的困擾,可以探索和采用這一方法,而前期的努力將帶來長期的收益。

原文標題Self-Healing Data Pipelines: The Next Big Thing in Data Engineering?,作者:Naresh Erukulla

責任編輯:姜華 來源: 51CTO
相關推薦

2021-12-06 13:54:05

全息數(shù)據(jù)存儲存儲數(shù)據(jù)存儲

2023-07-24 14:18:04

數(shù)據(jù)中心綜合布線

2021-06-01 11:18:14

云計算機密云云安全

2013-07-08 16:00:58

OpenFlow軟件定義網(wǎng)絡SDN

2022-07-22 09:37:26

BunWebpackJavaScript

2020-06-11 21:46:05

個性化醫(yī)療保健物聯(lián)網(wǎng)IOT

2018-09-27 12:34:33

物聯(lián)網(wǎng)汽車工業(yè)IOT

2018-07-03 16:00:25

無服務器云計算公共云

2020-04-27 10:26:23

網(wǎng)絡安全加密技術

2021-04-20 10:06:57

微軟Nuance公司人工智能

2018-01-31 11:26:54

2017-06-22 15:55:57

運維企業(yè)FreeWheel

2021-01-08 10:16:38

小米Miui

2019-04-13 00:30:12

網(wǎng)絡安全網(wǎng)絡安全大會西湖論劍

2024-04-07 12:57:10

數(shù)據(jù)訓練

2018-01-04 12:09:13

DevOps物聯(lián)網(wǎng)人工智能

2011-06-30 11:23:32

Python

2023-04-25 18:54:13

數(shù)據(jù)數(shù)據(jù)丟失

2015-06-12 10:01:25

程序員代碼

2021-07-02 15:25:40

數(shù)據(jù)中心網(wǎng)絡自動化基于意圖的網(wǎng)絡
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 精品久久久久久亚洲精品 | 在线色网站| 男女羞羞在线观看 | 国产精品免费一区二区三区 | 国内毛片毛片毛片毛片 | 九九综合九九 | 波多野结衣在线观看一区二区三区 | 欧美一区二区在线视频 | 黄色成人在线网站 | 午夜影院网站 | av在线一区二区三区 | 二区三区av | 久久久99国产精品免费 | 日韩一区二区在线视频 | 精品国产乱码久久久久久影片 | 日本精品久久久久 | 久久久久久久久久久久91 | 精品96久久久久久中文字幕无 | 亚洲欧美视频 | 国产乱码精品1区2区3区 | www.日韩欧美 | 成av在线| 久久久久久国产精品免费免费男同 | 午夜在线观看视频 | 午夜成人免费视频 | 欧美日韩中文在线观看 | 一区免费 | 国产91丝袜在线18 | 久久成人人人人精品欧 | 性高湖久久久久久久久aaaaa | 日韩中文字幕一区二区 | 成人影视网址 | 亚洲一区二区在线电影 | 国产免费人成xvideos视频 | 黄色av网站在线免费观看 | 伦理片97| 欧美激情精品久久久久久变态 | caoporn国产| 精品国产一区二区三区久久久蜜月 | 精品久久久久久久久久 | 日韩精品免费视频 |