成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Redis+Node.js實現一個能處理海量數據的異步任務隊列系統

開發 前端 其他數據庫 Redis
本文通過探索 Redis + NodeJS 結合的方式,構造出了一個異步任務隊列處理系統,能較好地完成最初方案的設想,但依然有很多問題需要改進。

 在最近的業務中,接到了一個需要處理約十萬條數據的需求。這些數據都以字符串的形式給到,并且處理它們的步驟是異步且耗時的(平均處理一條數據需要 25s 的時間)。如果以串行的方式實現,其耗時是相當長的:

總耗時時間 = 數據量 × 單條數據處理時間

T = N * t (N = 100,000; t = 25s)

總耗時時間 = 2,500,000 秒 ≈ 695 小時 ≈ 29 天

顯然,我們不能簡單地把數據一條一條地處理。那么有沒有辦法能夠減少處理的時間呢?經過調研后發現,使用異步任務隊列是個不錯的辦法。

一、異步任務隊列原理

我們可以把“處理單條數據”理解為一個異步任務,因此對這十萬條數據的處理,就可以轉化成有十萬個異步任務等待進行。我們可以把這十萬條數據塞到一個隊列里面,讓任務處理器自發地從隊列里面去取得并完成。

任務處理器可以有多個,它們同時從隊列里面把任務取走并處理。當任務隊列為空,表示所有任務已經被認領完;當所有任務處理器完成任務,則表示所有任務已經被處理完。

其基本原理如下圖所示:

首先來解決任務隊列的問題。在這個需求中,任務隊列里面的每一個任務,都包含了待處理的數據,數據以字符串的形式存在。為了方便起見,我們可以使用 Redis 的 List 數據格式來存放這些任務。

由于項目是基于 NodeJS 的,我們可以利用 PM2 的 Cluster 模式來啟動多個任務處理器,并行地處理任務。以一個 8 核的 CPU 為例,如果完全開啟了多進程,其理論處理時間將提升 8 倍,從 29 天縮短到 3.6 天。

接下來,我們會從實際編碼的角度來講解上述內容的實現過程。

二、使用 NodeJS 操作 Redis

異步任務隊列使用 Redis 來實現,因此我們需要部署一個單獨的 Redis 服務。在本地開發中為了快速完成 Redis 的安裝,我使用了 Docker 的辦法(默認機器已經安裝了 Docker)。

Docker 拉取 Redis 鏡像   

  1. docker pull redis:latest 

Docker 啟動 Redis   

  1. docker run -itd --name redis-local-p 6379:6379 redis 

此時我們已經使用 Docker 啟動了一個 Redis 服務,其對外的 IP 及端口為 127.0.0.1:6379。此外,我們還可以在本地安裝一個名為 Another Redis DeskTop Manager的 Redis 可視化工具,來實時查看、修改 Redis 的內容。

在 NodeJS 中,我們可以使用 node-redis 來操作 Redis。新建一個 mqclient.ts 文件并寫入如下內容: 

  1. import* asRedisfrom'redis'  
  2. const client = Redis.createClient({  
  3.   host: '127.0.0.1',  
  4.   port: 6379  
  5. })  
  6. exportdefault client 

Redis 本質上是一個數據庫,而我們對數據庫的操作無非就是增刪改查。node-redis 支持 Redis 的所有交互操作方式,但是操作結果默認是以回調函數的形式返回。為了能夠使用 async/await,我們可以新建一個 utils.ts 文件,把 node-redis 操作 Redis 的各種操作都封裝成 Promise 的形式,方便我們后續使用。   

  1. import client from'./mqClient'  
  2.    // 獲取 Redis 中某個 key 的內容  
  3.    exportconst getRedisValue = (key: string): Promise<string| null> => newPromise(resolve => client.get(key, (err, reply) => resolve(reply)))  
  4.    // 設置 Redis 中某個 key 的內容  
  5.    exportconst setRedisValue = (key: string, value: string) => newPromise(resolve => client.set(key, value, resolve))  
  6.    // 刪除 Redis 中某個 key 及其內容  
  7.    exportconst delRedisKey = (key: string) => newPromise(resolve => client.del(key, resolve)) 

除此之外,還能在 utils.ts 中放置其他常用的工具方法,以實現代碼的復用、保證代碼的整潔。

為了在 Redis 中創建任務隊列,我們可以單獨寫一個 createTasks.ts 的腳本,用于往隊列中塞入自定義的任務。 

  1. import{ TASK_NAME, TASK_AMOUNT, setRedisValue, delRedisKey } from'./utils'  
  2.   import client from'./mqClient'  
  3.   client.on('ready', async() => {  
  4.   await delRedisKey(TASK_NAME)  
  5.   for(let i = TASK_AMOUNT; i > 0; i--) {  
  6.       client.lpush(TASK_NAME, `task-${i}`)  
  7.   }  
  8.     client.lrange(TASK_NAME, 0, TASK_AMOUNT, async(err, reply) => {  
  9.   if(err) {  
  10.         console.error(err)  
  11.   return  
  12.   }  
  13.       console.log(reply)  
  14.       process.exit()  
  15.   })  
  16.   }) 

在這段腳本中,我們從 utils.ts 中獲取了各個 Redis 操作的方法,以及任務的名稱 TASKNAME (此處為 localtasks)和任務的總數 TASKAMOUNT(此處為 20 個)。通過 LPUSH 方法往 TASKNAME 的 List 當中塞入內容為 task-1 到 task-20 的任務,如圖所示:

三、異步任務處理

首先新建一個 index.ts 文件,作為整個異步任務隊列處理系統的入口文件。   

  1. import taskHandler from'./tasksHandler'  
  2.    import client from'./mqClient'  
  3.    client.on('connect', () => {  
  4.      console.log('Redis is connected!')  
  5.    })  
  6.    client.on('ready', async() => {  
  7.      console.log('Redis is ready!')  
  8.    await taskHandler()  
  9.    })  
  10.    client.on('error', (e) => {  
  11.      console.log('Redis error! '+ e)  
  12.    }) 

在運行該文件時,會自動連接 Redis,并且在 ready 狀態時執行任務處理器 taskHandler()。

在上一節的操作中,我們往任務隊列里面添加了 20 個任務,每個任務都是形如 task-n 的字符串。為了驗證異步任務的實現,我們可以在任務處理器 taskHandler.ts 中寫一段 demo 函數,來模擬真正的異步任務: 

  1. function handleTask(task: string) {  
  2.  returnnewPromise((resolve) => {  
  3.        setTimeout(async() => {  
  4.          console.log(`Handling task: ${task}...`)  
  5.          resolve()  
  6.  }, 2000)  
  7.  })  
  8.  } 

上面這個 handleTask() 函數,將會在執行的 2 秒后打印出當前任務的內容,并返回一個 Promise,很好地模擬了異步函數的實現方式。接下來我們將會圍繞這個函數,來處理隊列中的任務。

其實到了這一步為止,整個異步任務隊列處理系統已經基本完成了,只需要在 taskHandler.ts 中補充一點點代碼即可:   

  1. import{ popTask } from'./utils'  
  2.     import client from'./mqClient'  
  3.     function handleTask(task: string) { /* ... */}  
  4.     exportdefaultasyncfunction tasksHandler() {  
  5.     // 從隊列中取出一個任務  
  6.     const task = await popTask()  
  7.     // 處理任務  
  8.     await handleTask(task)  
  9.     // 遞歸運行  
  10.     await tasksHandler()  
  11.     } 

最后,我們使用 PM2 啟動 4 個進程,來試著跑一下整個項目:   

  1. pm2 start ./dist/index.js -i 4&& pm2 logs 

可以看到,4 個任務處理器分別處理完了隊列中的所有任務,相互之前互不影響。

事到如今已經大功告成了嗎?未必。為了測試我們的這套系統到底提升了多少的效率,還需要統計完成隊列里面所有任務的總耗時。

四、統計任務完成耗時

要統計任務完成的耗時,只需要實現下列的公式即可:

總耗時 = 最后一個任務的完成時間 - 首個任務被取得的時間

首先來解決“獲取首個任務被取得的時間”這個問題。

由于我們是通過 PM2 的 Cluster 模式來啟動應用的,且從 Redis 隊列中讀取任務是個異步操作,因此在多進程運行的情況下無法直接保證從隊列中讀取任務的先后順序,必須通過一個額外的標記來判斷。其原理如下圖:

如圖所示,綠色的 worker 由于無法保證運行的先后順序,所以編號用問號來表示。當第一個任務被取得時,把黃色的標記值從 false 設置成 true。當且僅當黃色的標記值為 false 時才會設置時間。這樣一來,當其他任務被取得時,由于黃色的標記值已經是 true 了,因此無法設置時間,所以我們便能得到首個任務被取得的時間。

在本文的例子中,黃色的標記值和首個任務被取得的時間也被存放在 Redis 中,分別被命名為 localtasksSETFIRST 和 localtasksBEGINTIME。

原理已經弄懂,但是在實踐中還有一個地方值得注意。我們知道,從 Redis 中讀寫數據也是一個異步操作。由于我們有多個 worker 但只有一個 Redis,那么在讀取黃色標記值的時候很可能會出現“沖突”的問題。舉個例子,當 worker-1 修改標記值為 true 的同時, worker-2 正好在讀取標記值。由于時間的關系,可能 worker-2 讀到的標記值依然是 false,那么這就沖突了。為了解決這個問題,我們可以使用 node-redlock 這個工具來實現“鎖”的操作。

顧名思義,“鎖”的操作可以理解為當 worker-1 讀取并修改標記值的時候,不允許其他 worker 讀取該值,也就是把標記值給鎖住了。當 worker-1 完成標記值的修改時會釋放鎖,此時才允許其他的 worker 去讀取該標記值。

node-redlock 是 Redis 分布式鎖 Redlock 算法的 JavaScript 實現,關于該算法的講解可參考 https://redis.io/topics/distlock。值得注意的是,在 node-redlock 在使用的過程中,如果要鎖一個已存在的 key,就必須為該 key 添加一個前綴 locks:,否則會報錯。

回到 utils.ts,編寫一個 setBeginTime() 的工具函數: 

  1. exportconst setBeginTime = async(redlock: Redlock) => {  
  2.  // 讀取標記值前先把它鎖住  
  3.  constlockawait redlock.lock(`lock:${TASK_NAME}_SET_FIRST`, 1000)  
  4.  const setFirst = await getRedisValue(`${TASK_NAME}_SET_FIRST`)  
  5.  // 當且僅當標記值不等于 true 時,才設置起始時間  
  6.  if(setFirst !== 'true') {  
  7.      console.log(`${pm2tips} Get the first task!`)  
  8.  await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'true')  
  9.  await setRedisValue(`${TASK_NAME}_BEGIN_TIME`, `${new Date().getTime()}`)  
  10.  }  
  11.  // 完成標記值的讀寫操作后,釋放鎖  
  12.  awaitlock.unlock().catch(e => e)  
  13.  } 

然后把它添加到 taskHandler() 函數里面即可: 

  1. exportdefaultasyncfunction tasksHandler() {  
  2.   +  // 獲取第一個任務被取得的時間  
  3.   +  await setBeginTime(redlock)  
  4.   // 從隊列中取出一個任務  
  5.   const task = await popTask()  
  6.   // 處理任務  
  7.   await handleTask(task)  
  8.   // 遞歸運行  
  9.   await tasksHandler()  
  10.   } 

接下來解決“最后一個任務的完成時間”這個問題。

類似上一個問題,由于任務執行的先后順序無法保證,異步操作的完成時間也無法保證,因此我們也需要一個額外的標識來記錄任務的完成情況。在 Redis 中創建一個初始值為 0 的標識 localtasksCURINDEX,當 worker 完成一個任務就讓標識加。由于任務隊列的初始長度是已知的(為 TASKAMOUNT 常量,也寫入了 Redis 的 localtasksTOTAL 中),因此當標識的值等于隊列初始長度的值時,即可表明所有任務都已經完成。

如圖所示,被完成的任務都會讓黃色的標識加一,任何時候只要判斷到標識的值等于隊列的初始長度值,即可表明任務已經全部完成。

回到 taskHandler() 函數,加入下列內容:   

  1. exportdefaultasyncfunction tasksHandler() {  
  2.    +  // 獲取標識值和隊列初始長度  
  3.    +  let curIndex = Number(await getRedisValue(`${TASK_NAME}_CUR_INDEX`))  
  4.    +  const taskAmount = Number(await getRedisValue(`${TASK_NAME}_TOTAL`))  
  5.    +  // 等待新任務  
  6.    +  if(taskAmount === 0) {  
  7.    +    console.log(`${pm2tips} Wating new tasks...`)  
  8.    +    await sleep(2000)  
  9.    +    await tasksHandler()  
  10.    +    return  
  11.    +  }  
  12.    +  // 判斷所有任務已經完成  
  13.    +  if(curIndex === taskAmount) {  
  14.    +    const beginTime = await getRedisValue(`${TASK_NAME}_BEGIN_TIME`)  
  15.    +    // 獲取總耗時  
  16.    +    const cost = newDate().getTime() - Number(beginTime)  
  17.    +    console.log(`${pm2tips} All tasks were completed! Time cost: ${cost}ms. ${beginTime}`)  
  18.    +    // 初始化 Redis 的一些標識值  
  19.    +    await setRedisValue(`${TASK_NAME}_TOTAL`, '0')  
  20.    +    await setRedisValue(`${TASK_NAME}_CUR_INDEX`, '0')  
  21.    +    await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'false')  
  22.    +    await delRedisKey(`${TASK_NAME}_BEGIN_TIME`)  
  23.    +    await sleep(2000)  
  24.    +    await tasksHandler()  
  25.    }  
  26.    // 獲取第一個任務被取得的時間  
  27.    await setBeginTime(redlock)  
  28.    // 從隊列中取出一個任務  
  29.    const task = await popTask()  
  30.    // 處理任務  
  31.    await handleTask(task)  
  32.    + // 任務完成后需要為標識位加一  
  33.    +  try{  
  34.    +    constlockawait redlock.lock(`lock:${TASK_NAME}_CUR_INDEX`, 1000)  
  35.    +    curIndex = await getCurIndex()  
  36.    +    await setCurIndex(curIndex + 1)  
  37.    +    awaitlock.unlock().catch((e) => e)  
  38.    +  } catch(e) {  
  39.    +    console.log(e)  
  40.    +  }  
  41.    +  // recursion  
  42.    +  await tasksHandler()  
  43.    +}  
  44.    // 遞歸運行  
  45.    await tasksHandler()  
  46.    } 

到這一步為止,我們已經解決了獲取“最后一個任務的完成時間”的問題,再結合前面的首個任務被取得的時間,便能得出運行的總耗時。

最后來看一下實際的運行效果。我們循例往隊列里面添加了 task-1 到 task-20 這 20 個任務,然后啟動 4 個進程來跑:

運行狀況良好。從運行結果來看,4 個進程處理 20 個平均耗時 2 秒的任務,只需要 10 秒的時間,完全符合設想。

五、小結

當面對海量的異步任務需要處理的時候,多進程 + 任務隊列的方式是一個不錯的解決方式。本文通過探索 Redis + NodeJS 結合的方式,構造出了一個異步任務隊列處理系統,能較好地完成最初方案的設想,但依然有很多問題需要改進。比如說當任務出錯了應該怎么辦,系統能否支持不同類型的任務,能否運行多個隊列等等,都是值得思考的問題。 

 

責任編輯:龐桂玉 來源: 前端教程
相關推薦

2011-10-25 09:28:30

Node.js

2020-08-07 10:40:56

Node.jsexpress前端

2023-03-01 09:39:40

調度系統

2022-01-05 12:09:16

異步隊列集群

2021-12-25 22:29:57

Node.js 微任務處理事件循環

2013-03-18 10:31:22

JS異常

2025-06-27 10:41:04

Redis數據庫集群

2021-09-07 07:53:43

工具

2024-03-15 15:20:10

并發服務IP

2022-09-21 12:01:22

消息隊列任務隊列任務調度

2010-12-01 14:34:59

AsyncTask異步處理任務Android

2011-06-01 10:59:59

Oceanbase海量數據庫

2021-04-06 10:15:29

Node.jsHooks前端

2011-12-23 13:58:57

node.js

2025-01-13 00:00:00

2012-06-26 10:03:06

海量數據處理

2024-12-09 09:25:30

2020-03-02 17:00:24

程序員數據庫MySQL

2023-09-16 18:16:57

Python系統

2022-04-25 15:01:07

系統程序員調度
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲一区二区三区在线播放 | 在线一区二区三区 | 国产丝袜人妖cd露出 | 色偷偷噜噜噜亚洲男人 | 亚洲综合日韩精品欧美综合区 | 99色播| 四虎最新地址 | 亚洲区一区二 | 狠狠av| 国产视频久 | 国产精品一区二区视频 | 国产欧美日韩一区 | 久久精品这里 | 午夜成人在线视频 | 亚洲午夜av | 在线播放一区二区三区 | 国产日韩精品视频 | 精品国产乱码久久久久久闺蜜 | 成人午夜毛片 | 99av成人精品国语自产拍 | 日本一区二区在线视频 | 国产免费看 | 国产精品一二三区在线观看 | 日韩精品免费在线 | 玖玖视频 | 亚洲精品电影 | 在线成人| 亚洲有码转帖 | 亚洲精品久久久一区二区三区 | 国产一区二区 | 亚洲欧美日韩精品久久亚洲区 | 一区中文字幕 | 久草影视在线 | 99re视频在线免费观看 | 99爱在线视频 | 超碰免费在线观看 | 亚洲高清在线视频 | 国产精品久久精品 | www.99热.com | av手机在线看 | 成年网站在线观看 |