成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

如何設(shè)計一個實(shí)時流計算系統(tǒng)?

運(yùn)維 系統(tǒng)運(yùn)維
實(shí)時流計算:業(yè)務(wù)系統(tǒng)根據(jù)實(shí)時的操作,不斷生成事件(消息/調(diào)用),然后引起一系列的處理分析,這個過程是分散在多臺計算機(jī)上并行完成的,看上去就像事件連續(xù)不斷的流經(jīng)多個計算節(jié)點(diǎn)處理,形成一個實(shí)時流計算系統(tǒng)。本文主要講了做一個簡單的流計算系統(tǒng)的方法。

實(shí)時流計算的場景歸納起來多半是:業(yè)務(wù)系統(tǒng)根據(jù)實(shí)時的操作,不斷生成事件(消息/調(diào)用),然后引起一系列的處理分析,這個過程是分散在多臺計算機(jī)上并行完成的,看上去就像事件連續(xù)不斷的流經(jīng)多個計算節(jié)點(diǎn)處理,形成一個實(shí)時流計算系統(tǒng)。

市場上流計算產(chǎn)品有很多,主要是通過消息中樞結(jié)合工人模式實(shí)現(xiàn),大致過程如下:

1、開發(fā)者實(shí)現(xiàn)好流程輸入輸出節(jié)點(diǎn)邏輯,上傳job到任務(wù)生產(chǎn)者

2、任務(wù)生產(chǎn)者將任務(wù)發(fā)送到zookeeper,然后監(jiān)控任務(wù)狀態(tài)

3、任務(wù)消費(fèi)者從zookeeper上獲取任務(wù)

4、任務(wù)消費(fèi)者啟動多個工人進(jìn)程,每個進(jìn)程又啟動多個線程執(zhí)行任務(wù)

5、工人之間通過zeroMQ交互

我們看看如何做一個簡單的流計算系統(tǒng),做法跟上面有些不同:

1、首先不過多依賴zookeerper,任務(wù)的分配最好直接給到工人,并能直接監(jiān)控工人完成狀態(tài),這樣效率會更高。

2、工人之間直接通訊,不依賴zeroMQ轉(zhuǎn)發(fā)。

3、并行管理扁平化,多進(jìn)程下再分多線程意義不大,增加管理成本,實(shí)際上一臺機(jī)器8個進(jìn)程,每個進(jìn)程再開8個線程,總體跟8-10個進(jìn)程或者線程的效果差不多(數(shù)量視機(jī)器性能不同)。

4、做成一個流計算系統(tǒng),而不是平臺。

這里我們借助fourinone提供的api和框架去實(shí)現(xiàn),第一次使用可以參考分布式計算上手demo指南,開發(fā)包下載地址 http://code.google.com/p/fourinone/

大致思路:用工頭去做任務(wù)生產(chǎn)和分配,用工人去做任務(wù)執(zhí)行,為了達(dá)到流的效果,需要在工人里面調(diào)用工頭的方式,將多個工人節(jié)點(diǎn)串起來。

下面程序演示了連續(xù)多個消息先發(fā)到一個工人節(jié)點(diǎn)A處理,然后再發(fā)到兩個工人節(jié)點(diǎn)B并行處理的流計算過程,并且獲取到最后處理結(jié)果打印輸出(如果不需要獲取結(jié)果可以直接返回)。

  • StreamCtorA:工頭A實(shí)現(xiàn),它獲取到線上工人A,然后將消息發(fā)給它處理,并輪循等待結(jié)果。工頭A的main函數(shù)模擬了多個消息的連續(xù)調(diào)用。
  • StreamWorkerA:工人A實(shí)現(xiàn),它接收到工頭A的消息進(jìn)行處理,然后創(chuàng)建一個工頭B,通過工頭B將結(jié)果同時發(fā)給兩個工人B處理,然后將結(jié)果返回工頭A。
  • StreamCtorB:工頭B實(shí)現(xiàn),它獲取到線上兩個工人B,調(diào)用doTaskBatch等待兩個工人處理完成,然后返回結(jié)果給工人A。
  • StreamWorkerB:工人B實(shí)現(xiàn),它接收到任務(wù)消息后模擬處理后返回結(jié)果。

運(yùn)行步驟(在本地模擬): 

1、啟動ParkServerDemo(它的IP端口已經(jīng)在配置文件指定) 

  1. java -cp fourinone.jar; ParkServerDemo 

2、啟動工人A

  1. java  -cp fourinone.jar; StreamWorkerA localhost 2008 

3、啟動兩個工人B

  1. java  -cp fourinone.jar; StreamWorkerB localhost 2009 
  2. java  -cp fourinone.jar; StreamWorkerB localhost 2010 

4、啟動工頭A

  1. java  -cp fourinone.jar; StreamCtorA 

多機(jī)部署說明:StreamCtorA可以單獨(dú)部署一臺機(jī)器,StreamWorkerA和StreamCtorB部署一臺機(jī)器,兩個StreamWorkerB可以部署兩臺機(jī)器。

總結(jié):計算平臺和計算系統(tǒng)的區(qū)別

如果我們只有幾臺機(jī)器,但是每天有人開發(fā)不同的流處理應(yīng)用要在這幾臺機(jī)器上運(yùn)行,我們需要一個計算平臺來管理好job,讓開發(fā)者按照規(guī)范配置好流程和運(yùn)行時節(jié)點(diǎn)申請,打包成job上傳,然后平臺根據(jù)每個job配置動態(tài)分配資源依次執(zhí)行每個job內(nèi)容。

如果我們的幾臺機(jī)器只為一個流處理業(yè)務(wù)服務(wù),比如實(shí)時營銷,我們需要一個流計算系統(tǒng),按照業(yè)務(wù)流程部署好計算節(jié)點(diǎn)即可,不需要運(yùn)行多個job和動態(tài)分配資源,按照計算平臺的方式做只會增加復(fù)雜性,開發(fā)者也不清楚每臺機(jī)器上到底運(yùn)行了什么邏輯。

如果你想實(shí)現(xiàn)一個計算平臺,可以參考動態(tài)部署和進(jìn)程管理功能(開發(fā)包內(nèi)有指南)

//完整源碼

// ParkServerDemo

  1. import com.fourinone.BeanContext; 
  2. public class ParkServerDemo 
  3.     public static void main(String[] args) 
  4.     { 
  5.         BeanContext.startPark(); 
  6.     } 

//StreamCtorA

  1. import com.fourinone.Contractor; 
  2. import com.fourinone.WareHouse; 
  3. import com.fourinone.WorkerLocal; 
  4. import java.util.ArrayList; 
  5. public class StreamCtorA extends Contractor 
  6.  public WareHouse giveTask(WareHouse inhouse) 
  7.  { 
  8.   WorkerLocal[] wks = getWaitingWorkers("StreamWorkerA"); 
  9.   System.out.println("wks.length:"+wks.length); 
  10.   WareHouse result = wks[0].doTask(inhouse); 
  11.   while(true){ 
  12.    if(result.getStatus()!=WareHouse.NOTREADY) 
  13.    { 
  14.     break; 
  15.    } 
  16.   } 
  17.   return result; 
  18.  } 
  19.  public static void main(String[] args) 
  20.  { 
  21.   StreamCtorA sc = new StreamCtorA(); 
  22.   for(int i=0;i<10;i++){ 
  23.     WareHouse msg = new WareHouse(); 
  24.     msg.put("msg","hello"+i); 
  25.     WareHouse wh = sc.giveTask(msg); 
  26.     System.out.println(wh); 
  27.   } 
  28.   sc.exit(); 
  29.  } 

//StreamWorkerA

  1. import com.fourinone.MigrantWorker; 
  2. import com.fourinone.WareHouse; 
  3. public class StreamWorkerA extends MigrantWorker 
  4.  public WareHouse doTask(WareHouse inhouse) 
  5.  { 
  6.   System.out.println(inhouse); 

  //do something

  1. StreamCtorB sc = new StreamCtorB(); 
  2. WareHouse msg = new WareHouse(); 
  3. msg.put("msg",inhouse.getString("msg")+",from StreamWorkerA"); 
  4. WareHouse wh = sc.giveTask(msg); 
  5. sc.exit(); 
  6. return wh; 
  7.  } 
  8. public static void main(String[] args) 
  9.  { 
  10. StreamWorkerA wd = new StreamWorkerA(); 
  11. wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerA"); 
  12.  } 

//StreamCtorB 

  1. import com.fourinone.Contractor; 
  2. import com.fourinone.WareHouse; 
  3. import com.fourinone.WorkerLocal; 
  4. import java.util.ArrayList; 
  5. public class StreamCtorB extends Contractor 
  6.  public WareHouse giveTask(WareHouse inhouse) 
  7.  { 
  8.   WorkerLocal[] wks = getWaitingWorkers("StreamWorkerB"); 
  9.   System.out.println("wks.length:"+wks.length); 
  10.   WareHouse[] hmarr = doTaskBatch(wks, inhouse); 
  11.   WareHouse result = new WareHouse(); 
  12.   result.put("B1",hmarr[0]); 
  13.   result.put("B2",hmarr[1]); 
  14.   return result; 
  15.  } 

//StreamWorkerB 

  1. view sourceprint? 
  2. import com.fourinone.MigrantWorker; 
  3. import com.fourinone.WareHouse; 
  4. public class StreamWorkerB extends MigrantWorker 
  5.  public WareHouse doTask(WareHouse inhouse) 
  6.  { 
  7.   System.out.println(inhouse); 

  //do something

  1. inhouse.put("msg",inhouse.getString("msg")+",from StreamWorkerB"); 
  2. return inhouse; 
  3.  } 
  4.  public static void main(String[] args) 
  5.  { 
  6.   StreamWorkerB wd = new StreamWorkerB(); 
  7.   wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerB"); 
  8.  } 

 

責(zé)任編輯:黃丹 來源: oschina.net
相關(guān)推薦

2020-09-16 11:20:03

流計算基準(zhǔn)測試

2018-09-18 09:38:11

RPC遠(yuǎn)程調(diào)用網(wǎng)絡(luò)通信

2020-11-11 09:49:12

計算架構(gòu)

2023-09-08 08:10:48

2023-09-08 08:22:30

2024-08-27 12:49:20

2020-09-02 07:22:17

JavaScript插件框架

2024-04-24 10:38:22

2024-11-20 13:18:21

2022-07-18 08:02:16

秒殺系統(tǒng)后端

2025-01-22 08:00:00

架構(gòu)秒殺系統(tǒng)Java

2024-06-17 11:59:39

2020-03-26 09:36:06

AB Test平臺的流量

2015-07-28 14:35:40

2019-08-01 08:36:51

緩存系統(tǒng)并發(fā)

2024-06-21 08:15:25

2024-02-28 08:04:15

2013-07-01 11:01:22

API設(shè)計API

2022-09-13 08:01:58

短鏈服務(wù)哈希算法字符串

2025-04-30 08:56:34

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 麻豆久久精品 | 欧美一级在线观看 | 日韩视频中文字幕 | 狠狠涩| 欧美一级黑人aaaaaaa做受 | 国产精品久久久久久久久久免费看 | 黄色播放 | 亚洲免费网址 | 精品国产免费一区二区三区演员表 | 亚洲一一在线 | 亚洲精品91 | 香蕉一区 | 亚洲顶级毛片 | 免费视频一区二区 | 综合五月| 国产欧美一区二区三区在线看 | 亚洲精选一区 | 丁香综合 | 国产在线一区二区三区 | 久久久www成人免费无遮挡大片 | 欧美一级二级在线观看 | 国产国语精品 | 91av视频在线播放 | 亚洲精选久久 | 一区二区三区在线免费观看 | 在线看亚洲 | 一区二区视频 | 天天色天天 | 国产一区二区三区在线 | 精品亚洲一区二区三区 | 久久久国产精品一区 | 欧美不卡在线 | 免费在线a视频 | 蜜桃av鲁一鲁一鲁一鲁 | 久久久国产一区 | 久久久夜色精品亚洲 | 亚洲精品成人在线 | av特级毛片 | 91视频麻豆 | 国产成人免费视频 | 99影视|