成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Hive、MapReduce、Spark分布式生成唯一數(shù)值型ID

大數(shù)據(jù) 分布式 Spark
Spark中生成這樣的非連續(xù)唯一數(shù)值型ID,非常簡(jiǎn)單,直接使用zipWithUniqueId()即可。

[[188237]]

在實(shí)際業(yè)務(wù)場(chǎng)景下,經(jīng)常會(huì)遇到在Hive、MapReduce、Spark中需要生成唯一的數(shù)值型ID。

一般常用的做法有:

MapReduce中使用1個(gè)Reduce來(lái)生成;

Hive中使用row_number分析函數(shù)來(lái)生成,其實(shí)也是1個(gè)Reduce;

借助HBase或Redis或Zookeeper等其它框架的計(jì)數(shù)器來(lái)生成;

數(shù)據(jù)量不大的情況下,可以直接使用1和2方法來(lái)生成,但如果數(shù)據(jù)量巨大,1個(gè)Reduce處理起來(lái)就非常慢。

在數(shù)據(jù)量非常大的情況下,如果你僅僅需要唯一的數(shù)值型ID,注意:不是需要”連續(xù)的唯一的數(shù)值型ID”,那么可以考慮采用本文中介紹的方法,否則,請(qǐng)使用第3種方法來(lái)完成。

Spark中生成這樣的非連續(xù)唯一數(shù)值型ID,非常簡(jiǎn)單,直接使用zipWithUniqueId()即可。

參考zipWithUniqueId()的方法,在MapReduce和Hive中,實(shí)現(xiàn)如下:

 

在Spark中,zipWithUniqueId是通過(guò)使用分區(qū)Index作為每個(gè)分區(qū)ID的開始值,在每個(gè)分區(qū)內(nèi),ID增長(zhǎng)的步長(zhǎng)為該RDD的分區(qū)數(shù),那么在MapReduce和Hive中,也可以照此思路實(shí)現(xiàn),Spark中的分區(qū)數(shù),即為MapReduce中的Map數(shù),Spark分區(qū)的Index,即為Map Task的ID。Map數(shù),可以通過(guò)JobConf的getNumMapTasks(),而Map Task ID,可以通過(guò)參數(shù)mapred.task.id獲取,格式如:attempt_1478926768563_0537_m_000004_0,截取m_000004_0中的4,再加1,作為該Map Task的ID起始值。注意:這兩個(gè)只均需要在Job運(yùn)行時(shí)才能獲取。另外,從圖中也可以看出,每個(gè)分區(qū)/Map Task中的數(shù)據(jù)量不是絕對(duì)一致的,因此,生成的ID不是連續(xù)的。

下面的UDF可以在Hive中直接使用:

  1. package com.lxw1234.hive.udf; 
  2.   
  3. import org.apache.hadoop.hive.ql.exec.MapredContext; 
  4. import org.apache.hadoop.hive.ql.exec.UDFArgumentException; 
  5. import org.apache.hadoop.hive.ql.metadata.HiveException; 
  6. import org.apache.hadoop.hive.ql.udf.UDFType; 
  7. import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; 
  8. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 
  9. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 
  10. import org.apache.hadoop.io.LongWritable; 
  11.   
  12. @UDFType(deterministic = false, stateful = true
  13. public class RowSeq2 extends GenericUDF { 
  14.      
  15.     private static LongWritable result = new LongWritable(); 
  16.     private static final char SEPARATOR = '_'
  17.     private static final String ATTEMPT = "attempt"
  18.     private long initID = 0l; 
  19.     private int increment = 0; 
  20.      
  21.      
  22.     @Override 
  23.     public void configure(MapredContext context) { 
  24.         increment = context.getJobConf().getNumMapTasks(); 
  25.         if(increment == 0) { 
  26.             throw new IllegalArgumentException("mapred.map.tasks is zero"); 
  27.         } 
  28.          
  29.         initID = getInitId(context.getJobConf().get("mapred.task.id"),increment); 
  30.         if(initID == 0l) { 
  31.             throw new IllegalArgumentException("mapred.task.id"); 
  32.         } 
  33.          
  34.         System.out.println("initID : " + initID + "  increment : " + increment); 
  35.     } 
  36.      
  37.     @Override 
  38.     public ObjectInspector initialize(ObjectInspector[] arguments) 
  39.             throws UDFArgumentException { 
  40.         return PrimitiveObjectInspectorFactory.writableLongObjectInspector; 
  41.     } 
  42.   
  43.     @Override 
  44.     public Object evaluate(DeferredObject[] arguments) throws HiveException { 
  45.         result.set(getValue()); 
  46.         increment(increment); 
  47.         return result; 
  48.     } 
  49.      
  50.     @Override 
  51.     public String getDisplayString(String[] children) { 
  52.         return "RowSeq-func()"
  53.     } 
  54.      
  55.     private synchronized void increment(int incr) { 
  56.         initID += incr; 
  57.     } 
  58.      
  59.     private synchronized long getValue() { 
  60.         return initID; 
  61.     } 
  62.      
  63.     //attempt_1478926768563_0537_m_000004_0 // return 0+1 
  64.     private long getInitId (String taskAttemptIDstr,int numTasks) 
  65.             throws IllegalArgumentException { 
  66.         try { 
  67.             String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR)); 
  68.             if(parts.length == 6) { 
  69.                 if(parts[0].equals(ATTEMPT)) { 
  70.                     if(!parts[3].equals("m") && !parts[3].equals("r")) { 
  71.                         throw new Exception(); 
  72.                     } 
  73.                     long result = Long.parseLong(parts[4]); 
  74.                     if(result >= numTasks) { //if taskid >= numtasks 
  75.                         throw new Exception("TaskAttemptId string : " + taskAttemptIDstr 
  76.                                 + "  parse ID [" + result + "] >= numTasks[" + numTasks + "] .."); 
  77.                     } 
  78.                     return result + 1; 
  79.                 } 
  80.             } 
  81.         } catch (Exception e) {} 
  82.         throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr 
  83.                 + " is not properly formed"); 
  84.     } 
  85.      
  86.   

有一張去重后的用戶id(字符串類型)表,需要位每個(gè)用戶id生成一個(gè)唯一的數(shù)值型seq:

  1. ADD jar file:///tmp/udf.jar; 
  2. CREATE temporary function seq2 as 'com.lxw1234.hive.udf.RowSeq2'
  3.   
  4. hive>> desc lxw_all_ids; 
  5. OK 
  6. id                      string                                       
  7. Time taken: 0.074 seconds, Fetched: 1 row(s) 
  8. hive> select * from lxw_all_ids limit 5; 
  9. OK 
  10. 01779E7A06ABF5565A4982_cookie 
  11. 031E2D2408C29556420255_cookie 
  12. 03371ADA0B6E405806FFCD_cookie 
  13. 0517C4B701BC1256BFF6EC_cookie 
  14. 05F12ADE0E880455931C1A_cookie 
  15. Time taken: 0.215 seconds, Fetched: 5 row(s) 
  16. hive> select count(1) from lxw_all_ids; 
  17. 253402337 
  18.   
  19. hive> create table lxw_all_ids2 as select id,seq2() as seq from lxw_all_ids; 
  20. … 
  21. Hadoop job information for Stage-1: number of mappers: 27; number of reducers: 0 
  22. … 
  23.   
  24.   
  25.   

該Job使用了27個(gè)Map Task,沒有使用Reduce,那么將會(huì)產(chǎn)生27個(gè)結(jié)果文件。

再看結(jié)果表中的數(shù)據(jù):

  1. hive> select * from lxw_all_ids2 limit 10; 
  2. OK 
  3. 766CA2770527B257D332AA_cookie   1 
  4. 5A0492DB0000C557A81383_cookie   28 
  5. 8C06A5770F176E58301EEF_cookie   55 
  6. 6498F47B0BCAFE5842B83A_cookie   82 
  7. 6DA33CB709A23758428A44_cookie   109 
  8. B766347B0D27925842AC2D_cookie   136 
  9. 5794357B050C99584251AC_cookie   163 
  10. 81D67A7B011BEA5842776C_cookie   190 
  11. 9D2F8EB40AEA525792347D_cookie   217 
  12. BD21077B09F9E25844D2C1_cookie   244 
  13.   
  14. hive> select count(1),count(distinct seq) from lxw_all_ids2; 
  15. 253402337       253402337 
  16.   

limit 10只從第一個(gè)結(jié)果文件,即MapTaskId為0的結(jié)果文件中拿了10條,這個(gè)Map中,start=1,increment=27,因此生成的ID如上所示。

count(1),count(distinct seq)的值相同,說(shuō)明seq沒有重復(fù)值,你可以試試max(seq),結(jié)果必然大于253402337,說(shuō)明seq是”非連續(xù)唯一數(shù)值型ID“.

責(zé)任編輯:武曉燕 來(lái)源: lxw的大數(shù)據(jù)田地
相關(guān)推薦

2022-02-23 07:09:30

分布式ID雪花算法

2021-06-28 14:45:07

分布式框架操作

2021-11-08 19:25:37

Go生成系統(tǒng)

2020-07-21 11:35:21

開發(fā)技能代碼

2023-09-03 22:14:23

分布式ID

2024-02-02 10:57:12

Java分布式算法

2017-07-01 16:02:39

分布式ID生成器

2021-06-05 07:33:09

ID分布式架構(gòu)

2023-12-13 09:35:52

算法分布式

2019-09-05 13:06:08

雪花算法分布式ID

2024-10-31 13:51:58

2023-12-12 07:13:39

雪花算法分布式ID

2023-01-12 17:46:37

分庫(kù)分表id如何生成

2021-07-02 06:54:43

分布式環(huán)境ID

2016-11-29 09:12:21

數(shù)據(jù)庫(kù)分布式ID

2015-10-15 14:05:51

StormSparkMapReduce

2016-09-01 13:48:18

2022-09-28 07:58:06

MongoDB分布式ID

2019-09-03 09:22:08

數(shù)據(jù)庫(kù)Redis算法

2025-03-28 10:27:29

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: a免费视频 | 黄色欧美在线 | 一区二区三区四区五区在线视频 | 日韩久久久久 | 伊人精品在线视频 | 色黄爽 | 久久国产成人 | 国产乱码精品一品二品 | 91视频在线观看免费 | 国产精成人 | 亚洲国产精品久久久久 | 亚洲一二三区在线观看 | 精品无码久久久久久久动漫 | 欧美一区二区三区久久精品视 | 久久久日韩精品一区二区三区 | 91porn国产成人福利 | 国产精品激情 | 91综合网 | 国产7777 | 性一交一乱一透一a级 | 天天操夜夜拍 | 毛片网站在线观看 | 久久久久久国产精品久久 | 91精品国产91久久久久久吃药 | 国产 欧美 日韩 一区 | 国产精品123区| 精品日韩一区二区 | 久久久久国产精品一区 | 91视频进入 | 国产精品片| 中文字幕一区在线观看视频 | 国产精品欧美一区二区三区不卡 | 久久一区二区三区四区 | 日韩欧美在线播放 | 亚洲高清视频在线观看 | 亚洲国产一区二区三区 | 91成人免费电影 | 欧美国产日韩精品 | 免费毛片网 | 欧美综合久久久 | 久久成人一区 |