成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Storm入門教程:一致性事務(wù)

運(yùn)維 系統(tǒng)運(yùn)維
Storm是一個(gè)分布式的流處理系統(tǒng),利用anchor和ack機(jī)制保證所有tuple都被成功處理。如果tuple出錯(cuò),則可以被重傳,但是如何保證出錯(cuò)的tuple只被處理一次呢?Storm提供了一套事務(wù)性組件Transaction Topology,用來(lái)解決這個(gè)問(wèn)題。

Storm是一個(gè)分布式的流處理系統(tǒng),利用anchor和ack機(jī)制保證所有tuple都被成功處理。如果tuple出錯(cuò),則可以被重傳,但是如何保證出錯(cuò)的tuple只被處理一次呢?Storm提供了一套事務(wù)性組件Transaction Topology,用來(lái)解決這個(gè)問(wèn)題。

Transactional Topology目前已經(jīng)不再維護(hù),由Trident來(lái)實(shí)現(xiàn)事務(wù)性topology,但是原理相同。

一、一致性事務(wù)的設(shè)計(jì)

Storm如何實(shí)現(xiàn)即對(duì)tuple并行處理,又保證事務(wù)性。本節(jié)從簡(jiǎn)單的事務(wù)性實(shí)現(xiàn)方法入手,逐步引出Transactional Topology的原理。

1、簡(jiǎn)單設(shè)計(jì)一:強(qiáng)順序流

保證tuple只被處理一次,最簡(jiǎn)單的方法就是將tuple流變成強(qiáng)順序的,并且每次只處理一個(gè)tuple。從1開(kāi)始,給每個(gè)tuple都順序加上一個(gè)id。在處理tuple的時(shí)候,將處理成功的tuple id和計(jì)算結(jié)果存在數(shù)據(jù)庫(kù)中。下一個(gè)tuple到來(lái)的時(shí)候,將其id與數(shù)據(jù)庫(kù)中的id做比較。如果相同,則說(shuō)明這個(gè)tuple已經(jīng)被成功處理過(guò)了,忽略它;如果不同,根據(jù)強(qiáng)順序性,說(shuō)明這個(gè)tuple沒(méi)有被處理過(guò),將它的id及計(jì)算結(jié)果更新到數(shù)據(jù)庫(kù)中。

以統(tǒng)計(jì)消息總數(shù)為例。每來(lái)一個(gè)tuple,如果數(shù)據(jù)庫(kù)中存儲(chǔ)的id 與當(dāng)前tuple id不同,則數(shù)據(jù)庫(kù)中的消息總數(shù)加1,同時(shí)更新數(shù)據(jù)庫(kù)中的當(dāng)前tuple id值。如圖:

 

但是這種機(jī)制使得系統(tǒng)一次只能處理一個(gè)tuple,無(wú)法實(shí)現(xiàn)分布式計(jì)算。

2、簡(jiǎn)單設(shè)計(jì)二:強(qiáng)順序batch流

為了實(shí)現(xiàn)分布式,我們可以每次處理一批tuple,稱為一個(gè)batch。一個(gè)batch中的tuple可以被并行處理。

我們要保證一個(gè)batch只被處理一次,機(jī)制和上一節(jié)類似。只不過(guò)數(shù)據(jù)庫(kù)中存儲(chǔ)的是batch id。batch的中間計(jì)算結(jié)果先存在局部變量中,當(dāng)一個(gè)batch中的所有tuple都被處理完之后,判斷batch id,如果跟數(shù)據(jù)庫(kù)中的id不同,則將中間計(jì)算結(jié)果更新到數(shù)據(jù)庫(kù)中。

如何確保一個(gè)batch里面的所有tuple都被處理完了呢?可以利用Storm提供的CoordinateBolt。如圖:

但是強(qiáng)順序batch流也有局限,每次只能處理一個(gè)batch,batch之間無(wú)法并行。要想實(shí)現(xiàn)真正的分布式事務(wù)處理,可以使用storm提供的Transactional Topology。在此之前,我們先詳細(xì)介紹一下CoordinateBolt的原理。

3、CoordinateBolt原理

CoordinateBolt具體原理如下:

  • 真正執(zhí)行計(jì)算的bolt外面封裝了一個(gè)CoordinateBolt。真正執(zhí)行任務(wù)的bolt我們稱為real bolt。
  • 每個(gè)CoordinateBolt記錄兩個(gè)值:有哪些task給我發(fā)送了tuple(根據(jù)topology的grouping信息);我要給哪些tuple發(fā)送信息(同樣根據(jù)groping信息)
  • Real bolt發(fā)出一個(gè)tuple后,其外層的CoordinateBolt會(huì)記錄下這個(gè)tuple發(fā)送給哪個(gè)task了。
  • 等所有的tuple都發(fā)送完了之后,CoordinateBolt通過(guò)另外一個(gè)特殊的stream以emitDirect的方式告訴所有它發(fā)送過(guò)tuple的task,它發(fā)送了多少tuple給這個(gè)task。下游task會(huì)將這個(gè)數(shù)字和自己已經(jīng)接收到的tuple數(shù)量做對(duì)比,如果相等,則說(shuō)明處理完了所有的tuple。
  • 下游CoordinateBolt會(huì)重復(fù)上面的步驟,通知其下游。

整個(gè)過(guò)程如圖所示:

CoordinateBolt主要用于兩個(gè)場(chǎng)景:

  • DRPC
  • Transactional Topology

CoordinatedBolt對(duì)于業(yè)務(wù)是有侵入的,要使用CoordinatedBolt提供的功能,你必須要保證你的每個(gè)bolt發(fā)送的每個(gè)tuple的第一個(gè)field是request-id。 所謂的“我已經(jīng)處理完我的上游”的意思是說(shuō)當(dāng)前這個(gè)bolt對(duì)于當(dāng)前這個(gè)request-id所需要做的工作做完了。這個(gè)request-id在DRPC里面代表一個(gè)DRPC請(qǐng)求;在Transactional Topology里面代表一個(gè)batch。

4、Trasactional Topology

Storm提供的Transactional Topology將batch計(jì)算分為process和commit兩個(gè)階段。Process階段可以同時(shí)處理多個(gè)batch,不用保證順序性;commit階段保證batch的強(qiáng)順序性,并且一次只能處理一個(gè)batch,第1個(gè)batch成功提交之前,第2個(gè)batch不能被提交。

還是以統(tǒng)計(jì)消息總數(shù)為例,以下代碼來(lái)自storm-starter里面的TransactionalGlobalCount。

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields(“word“), PARTITION_TAKE_PER_BATCH);

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(“global-count“, “spout“, spout, 3);

builder.setBolt(“partial-count“, new BatchCount(), 5).noneGrouping(“spout“);

builder.setBolt(“sum“, new UpdateGlobalCount()).globalGrouping(“partial-count“);

TransactionalTopologyBuilder共接收四個(gè)參數(shù)。

  • 這個(gè)Transactional Topology的id。Id用來(lái)在Zookeeper中保存當(dāng)前topology的進(jìn)度,如果這個(gè)topology重啟,可以繼續(xù)之前的進(jìn)度執(zhí)行。
  • Spout在這個(gè)topology中的id
  • 一個(gè)TransactionalSpout。一個(gè)Trasactional Topology中只能有一個(gè)TrasactionalSpout.在本例中是一個(gè)MemoryTransactionalSpout,從一個(gè)內(nèi)存變量(DATA)中讀取數(shù)據(jù)。
  • TransactionalSpout的并行度(可選)。

下面是BatchCount的定義:

  1. public static class BatchCount extends BaseBatchBolt { 
  2. Object _id; 
  3. BatchOutputCollector _collector; 
  4. int _count = 0
  5. @Override 
  6. public void prepare(Map conf, TopologyContext context, 
  7. BatchOutputCollector collector, Object id) { 
  8. _collector = collector; 
  9. _id = id; 
  10. @Override 
  11. public void execute(Tuple tuple) { 
  12. _count++; 
  13. @Override 
  14. public void finishBatch() { 
  15. _collector.emit(new Values(_id, _count)); 
  16. @Override 
  17. public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  18. declarer.declare(new Fields(“id“, “count“)); 

BatchCount的prepare方法的最后一個(gè)參數(shù)是batch id,在Transactional Tolpoloyg里面這id是一個(gè)TransactionAttempt對(duì)象。

Transactional Topology里發(fā)送的tuple都必須以TransactionAttempt作為第一個(gè)field,storm根據(jù)這個(gè)field來(lái)判斷tuple屬于哪一個(gè)batch。

TransactionAttempt包含兩個(gè)值:一個(gè)transaction id,一個(gè)attempt id。transaction id的作用就是我們上面介紹的對(duì)于每個(gè)batch中的tuple是唯一的,而且不管這個(gè)batch replay多少次都是一樣的。attempt id是對(duì)于每個(gè)batch唯一的一個(gè)id, 但是對(duì)于同一個(gè)batch,它replay之后的attempt id跟replay之前就不一樣了, 我們可以把a(bǔ)ttempt id理解成replay-times, storm利用這個(gè)id來(lái)區(qū)別一個(gè)batch發(fā)射的tuple的不同版本。

execute方法會(huì)為batch里面的每個(gè)tuple執(zhí)行一次,你應(yīng)該把這個(gè)batch里面的計(jì)算狀態(tài)保持在一個(gè)本地變量里面。對(duì)于這個(gè)例子來(lái)說(shuō), 它在execute方法里面遞增tuple的個(gè)數(shù)。

最后, 當(dāng)這個(gè)bolt接收到某個(gè)batch的所有的tuple之后, finishBatch方法會(huì)被調(diào)用。這個(gè)例子里面的BatchCount類會(huì)在這個(gè)時(shí)候發(fā)射它的局部數(shù)量到它的輸出流里面去。

下面是UpdateGlobalCount類的定義:

  1. public static class UpdateGlobalCount extends BaseTransactionalBolt 
  2. implements ICommitter { 
  3. TransactionAttempt _attempt; 
  4. BatchOutputCollector _collector; 
  5. int _sum = 0
  6. @Override 
  7. public void prepare(Map conf, TopologyContext context, 
  8. BatchOutputCollector collector, TransactionAttempt attempt) { 
  9. _collector = collector; 
  10. _attempt = attempt; 
  11. @Override 
  12. public void execute(Tuple tuple) { 
  13. _sum+=tuple.getInteger(1); 
  14. @Override 
  15. public void finishBatch() { 
  16. Value val = DATABASE.get(GLOBAL_COUNT_KEY); 
  17. Value newval; 
  18. if(val == null || !val.txid.equals(_attempt.getTransactionId())) { 
  19. newnewval = new Value(); 
  20. newval.txid = _attempt.getTransactionId(); 
  21. if(val==null) { 
  22. newval.count = _sum
  23. } else { 
  24. newval.count = _sum + val.count; 
  25. DATABASE.put(GLOBAL_COUNT_KEY, newval); 
  26. } else { 
  27. newval = val; 
  28. _collector.emit(new Values(_attempt, newval.count)); 
  29. @Override 
  30. public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  31. declarer.declare(new Fields(“id“, “sum“)); 

UpdateGlobalCount實(shí)現(xiàn)了ICommitter接口,所以storm只會(huì)在commit階段執(zhí)行finishBatch方法。而execute方法可以在任何階段完成。

在UpdateGlobalCount的finishBatch方法中,將當(dāng)前的transaction id與數(shù)據(jù)庫(kù)中存儲(chǔ)的id做比較。如果相同,則忽略這個(gè)batch;如果不同,則把這個(gè)batch的計(jì)算結(jié)果加到總結(jié)果中,并更新數(shù)據(jù)庫(kù)。

Transactional Topolgy運(yùn)行示意圖如下:

下面總結(jié)一下Transactional Topology的一些特性:

  • Transactional Topology將事務(wù)性機(jī)制都封裝好了,其內(nèi)部使用CoordinateBolt來(lái)保證一個(gè)batch中的tuple被處理完。
  • TransactionalSpout只能有一個(gè),它將所有tuple分為一個(gè)一個(gè)的batch,而且保證同一個(gè)batch的transaction id始終一樣。
  • BatchBolt處理batch在一起的tuples。對(duì)于每一個(gè)tuple調(diào)用execute方法,而在整個(gè)batch處理完成的時(shí)候調(diào)用finishBatch方法。
  • 如果BatchBolt被標(biāo)記成Committer,則只能在commit階段調(diào)用finishBolt方法。一個(gè)batch的commit階段由storm保證只在前一個(gè)batch成功提交之后才會(huì)執(zhí)行。并且它會(huì)重試直到topology里面的所有bolt在commit完成提交。
  • Transactional Topology隱藏了anchor/ack框架,它提供一個(gè)不同的機(jī)制來(lái)fail一個(gè)batch,從而使得這個(gè)batch被replay。

二、Trident介紹

Trident是Storm之上的高級(jí)抽象,提供了joins,grouping,aggregations,fuctions和filters等接口。如果你使用過(guò)Pig或Cascading,對(duì)這些接口就不會(huì)陌生。

Trident將stream中的tuples分成batches進(jìn)行處理,API封裝了對(duì)這些batches的處理過(guò)程,保證tuple只被處理一次。處理batches中間結(jié)果存儲(chǔ)在TridentState對(duì)象中。

Trident事務(wù)性原理這里不詳細(xì)介紹,有興趣的讀者請(qǐng)自行查閱資料。

參考:http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/

http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/

https://github.com/nathanmarz/storm/wiki/Trident-tutorial

責(zé)任編輯:黃丹 來(lái)源: 量子恒道官方博客
相關(guān)推薦

2014-01-16 16:53:53

storm事務(wù)一致性

2013-08-29 14:12:52

Storm分布式實(shí)時(shí)計(jì)算

2013-08-29 14:28:09

StormHadoop

2022-08-29 08:38:00

事務(wù)一致性

2014-01-13 11:22:28

storm

2021-08-13 07:56:13

Raft算法日志

2022-08-11 07:55:05

數(shù)據(jù)庫(kù)Mysql

2013-12-12 16:14:21

storm入門教程storm消息處理

2017-07-25 14:38:56

數(shù)據(jù)庫(kù)一致性非鎖定讀一致性鎖定讀

2013-09-18 14:46:32

StormStorm集群

2009-06-18 09:18:08

Oracle檢索數(shù)據(jù)數(shù)據(jù)一致性事務(wù)恢復(fù)

2023-12-01 13:51:21

數(shù)據(jù)一致性數(shù)據(jù)庫(kù)

2019-09-18 08:41:53

并發(fā)扣減一致性redis

2021-03-04 06:49:53

RocketMQ事務(wù)

2022-12-14 08:23:30

2013-04-03 10:01:42

JavaequalsObject

2021-02-05 08:00:48

哈希算法?機(jī)器

2021-02-02 12:40:50

哈希算法數(shù)據(jù)

2014-01-16 11:14:37

StormTopology
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 91亚洲精品国偷拍自产在线观看 | 污污免费网站 | 黑人性hd| 国产午夜精品一区二区三区嫩草 | 国精品一区 | 欧美激情综合网 | a级性视频 | 免费在线看a | 成人在线精品 | 四虎在线播放 | 日韩激情在线 | 九九在线视频 | 美国一级黄色片 | 亚洲日本一区二区三区四区 | 久久久久久久久久久久久久av | 成人在线小视频 | 一区二区三区在线 | 欧美视频 | 欧美一区在线视频 | a级大片免费观看 | 欧美在线一区二区三区 | 精品国产青草久久久久福利 | av天天澡天天爽天天av | 亚洲成人在线视频播放 | 亚洲成人久久久 | 国产一级在线视频 | 久久一区二区视频 | av免费网站在线观看 | 亚洲午夜电影 | 欧美日本亚洲 | 欧美成人精品在线 | 欧美xxxx黑人又粗又长 | 超碰在线免费公开 | 中文在线a在线 | 欧美日韩成人 | 精品伦精品一区二区三区视频 | 成人免费一区二区三区牛牛 | 欧美视频免费在线观看 | 午夜丁香视频在线观看 | 国产97在线视频 | 中国av在线免费观看 |