成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Hudi Java Client總結之讀取Hive寫Hudi代碼

開發 前端
initTable主要是根據一些配置信息,生成.hoodie元數據路徑,并生成hoodie.properties元數據文件,該文件里持久化保存了Hudi的一些配置信息。

前言

Hudi除了支持Spark、Fink寫Hudi外,還支持Java客戶端。本文總結Hudi Java Client如何使用,主要為代碼示例,可以實現讀取Hive表寫Hudi表。當然也支持讀取其他數據源,比如mysql,實現讀取mysql的歷史數據和增量數據寫Hudi。

版本

Hudi 0.12.0

功能支持

支持insert/upsert/delete,暫不支持bulkInsert目前僅支持COW表支持完整的寫Hudi操作,包括rollback、clean、archive等。

代碼

完整代碼已上傳GitHub:https://github.com/dongkelun/hudi-demo/tree/master/java-client。

其中HoodieJavaWriteClientExample是從Hudi源碼里拷貝的,包含了insert/upsert/delte/的代碼示例,JavaClientHive2Hudi是我自己的寫的代碼示例總結,實現了kerberos認證、讀取Hive表Schema作為寫hudi的Schema、讀取Hive表數據寫hudi表,并同步hudi元數據至hive元數據,實現自動創建Hive元數據,當然也支持讀取其他數據源,比如mysql,實現歷史和增量寫。

相比于HoodieJavaWriteClientExample,JavaClientHive2Hudi加了很多配置參數,更貼近實際使用,比如HoodieJavaWriteClientExample的payload為HoodieAvroPayload這只能作為示例使用,JavaClientHive2Hudi使用的為DefaultHoodieRecordPayload它支持預合并和歷史值比較,關于這一點可以參考我之前寫的文章:Hudi preCombinedField 總結(二)-源碼分析,如果只需要預合并功能,可以使用OverwriteWithLatestAvroPayload,這倆分別是Spark SQL 和 Spark DF的默認值,當然都不需要的話,也支持HoodieAvroPayload,代碼里是根據條件判斷需要用哪個payloadClassName。

String payloadClassName = shouldOrdering ? DefaultHoodieRecordPayload.class.getName() :
shouldCombine ? OverwriteWithLatestAvroPayload.class.getName() : HoodieAvroPayload.class.getName();

然后利用反射構造payload,其實這里反射的邏輯就是Hudi Spark源碼里的邏輯。

另一個它更貼近實際使用的原因就是我們項目上就是將Hudi Java Client封裝成了一個NIFI processor,然后用NIFI調度,其性能和穩定性都能夠滿足項目需求,這里的核心邏輯和實際項目中的邏輯是差不多的。關于我們使用Java客戶端的原因是由于歷史原因造成的,因為我們之前還沒有調度Spark、Flink的開發工具(之前用的NIFI),而開發一個新的開發工具的話是需要時間成本的,所以選擇了Java客戶端,我們現在已經將Apache DolphinScheduler作為自己的開發調度工具了,后面會主要使用Spark/Flink,所以現在總結一下Hudi Java Client的使用以及源碼,避免遺忘,也希望對大家有所幫助。

初始化Hudi表

Java Client的代碼更貼近源碼。

initTable主要是根據一些配置信息,生成.hoodie元數據路徑,并生成hoodie.properties元數據文件,該文件里持久化保存了Hudi的一些配置信息。

if (!(fs.exists(path) && fs.exists(hoodiePath))) { //根據Hudi路徑存不存在,判斷Hudi表需不需要初始化
if (Arrays.asList(INSERT_OPERATION, UPSERT_OPERATION).contains(writeOperationType)) {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(TABLE_TYPE)
.setTableName(targetTable)
.setPayloadClassName(payloadClassName)
.setRecordKeyFields(recordKeyFields)
.setPreCombineField(preCombineField)
.setPartitionFields(partitionFields)
.setBootstrapIndexClass(NoOpBootstrapIndex.class.getName())
.initTable(hadoopConf, tablePath);
} else if (writeOperationType.equals(DELETE_OPERATION)) { //Delete操作,Hudi表必須已經存在
throw new TableNotFoundException(tablePath);
}
}

hoodie.properties

#Properties saved on 2022-10-24T07:40:36.530Z
#Mon Oct 24 15:40:36 CST 2022
hoodie.table.name=test_hudi_target
hoodie.archivelog.folder=archived
hoodie.table.type=COPY_ON_WRITE
hoodie.table.version=5
hoodie.timeline.layout.version=1
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.checksum=1749434190

創建HoodieJavaWriteClient

首先要創建HoodieWriteConfig,主要是hudi的一些配置,比如Schema、表名、payload、索引、clean等一些參數,具體可以自己去了解。

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(writeSchema.toString())
.withParallelism(2, 2).withDeleteParallelism(2)
.forTable(targetTable)
.withWritePayLoad(payloadClassName)
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.BLOOM)
// .bloomIndexPruneByRanges(false) // 1000萬總體時間提升1分鐘
.bloomFilterFPP(0.000001) // 1000萬總體時間提升3分鐘
.fromProperties(indexProperties)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(smallFileLimit)
.approxRecordSize(recordSizeEstimate).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(150, 200).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(100).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(maxFileSize).build())
.build();

writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)

startCommit

返回commitTime,首先會執行rollback,然后創建一個.commit.request,再將commitTime返回。

String newCommitTime = writeClient.startCommit();

generateRecord

這里主要是構造寫hudi需要的數據結構,包含HoodieKey和payLoad,其中delete操作只需要HoodieKey。

public static List<HoodieRecord<HoodieRecordPayload>> generateRecord(ResultSet rs,
Schema writeSchema,
String payloadClassName,
boolean shouldCombine) throws IOException, SQLException {
List<HoodieRecord<HoodieRecordPayload>> list = new ArrayList<>();

while (rs.next()) {
GenericRecord rec = new GenericData.Record(writeSchema);

writeSchema.getFields().forEach(field -> {
try {
rec.put(field.name(), convertValueType(rs, field.name(), field.schema().getType()));
} catch (SQLException e) {
throw new RuntimeException(e);
}
});

String partitionPath = partitionFields == null ? "" : getRecordPartitionPath(rs, writeSchema);
System.out.println(partitionPath);
String rowKey = recordKeyFields == null && writeOperationType.equals(INSERT_OPERATION) ? UUID.randomUUID().toString() : getRecordKey(rs, writeSchema);
HoodieKey key = new HoodieKey(rowKey, partitionPath);
if (shouldCombine) {
Object orderingVal = HoodieAvroUtils.getNestedFieldVal(rec, preCombineField, false, false);
list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec, (Comparable) orderingVal)));
} else {
list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec)));
}

}
return list;
}

寫Hudi

最后執行寫Hudi的操作,常用upsert/insert/delete,Java Client也是默認開啟clean等操作的,具體的實現是在HoodieJavaCopyOnWriteTable中。目前還不支持bulkInsert等操作,后面如果我有能力的話,會嘗試提交PR支持。

writeClient.upsert(records, newCommitTime);
writeClient.insert(records, newCommitTime);
writeClient.delete(records, newCommitTime);

同步Hive

最后是同步元數據至Hive,實現在hive中建表,這一步是可選的。這樣可以利用Hive SQL和Spark SQL查詢Hudi表。

/**
* 利用HiveSyncTool同步Hive元數據
* Spark寫Hudi同步hive元數據的源碼就是這樣同步的
*
* @param properties
* @param hiveConf
*/
public static void syncHive(TypedProperties properties, HiveConf hiveConf) {
HiveSyncTool hiveSyncTool = new HiveSyncTool(properties, hiveConf);
hiveSyncTool.syncHoodieTable();
}

public static HiveConf getHiveConf(String hiveSitePath, String coreSitePath, String hdfsSitePath) {
HiveConf configuration = new HiveConf();
configuration.addResource(new Path(hiveSitePath));
configuration.addResource(new Path(coreSitePath));
configuration.addResource(new Path(hdfsSitePath));

return configuration;
}

/**
* 同步Hive元數據的一些屬性配置
* @param basePath
* @return
*/
public static TypedProperties getHiveSyncProperties(String basePath) {
TypedProperties properties = new TypedProperties();
properties.put(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());
properties.put(HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key(), true);
properties.put(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), dbName);
properties.put(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), targetTable);
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), basePath);
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getName());
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);
if (partitionFields != null && !partitionFields.isEmpty()) {
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);
}

return properties;
}

與0.9.0版本差異

之前是基于0.9.0版本開發的,本文代碼示例基于0.12.0,核心代碼是一樣的,差異的地方有兩處。

1、0.9.0 clean、archive的參數都是在withCompactionConfig中,現在單獨拎出來2、0.9.0 HiveSyncTool的參數為HiveSyncConfig,現在為TypedProperties。

總結

Hudi Java Client和Spark、Flink一樣都可以實現完整的寫Hudi的邏輯,但是目前功能支持還不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,畢竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起來比較方便,集成到NIFI的好處是,可以通過拖來拽配置參數的形式完成歷史數據和增量數據寫入Hudi。也可以自己實現多線程,提升性能,我們目前測試的性能是Insert可以達到10000條/s,而upsert因為需要讀取索引,還有歷史數據的更新,可能需要重寫整個表,所以當歷史數據比較大且更新占比比較高時,單線程的性能會非常差,但是我們基于源碼改造,將布隆索引和寫數據的部分改為多線程后,性能就會提升很多,當然這也取決于機器的性能,和CPU、內存有關。對于數據量不是很大的ZF數據,一般大表幾十億,性能還是可以滿足要求的。

責任編輯:武曉燕 來源: 倫少的博客
相關推薦

2022-11-01 07:43:30

2022-11-03 07:22:42

2023-02-26 00:12:10

Hadoop數據湖存儲

2022-10-17 07:51:31

Hudi異常HDFS

2024-04-26 07:36:42

Hudi 1.0數據湖倉數據查詢

2022-10-24 00:26:51

大數據Hadoop存儲層

2021-08-31 10:07:16

Flink Hud數據湖阿里云

2022-12-08 07:17:49

2022-12-23 16:52:22

Lakehouse數據湖

2022-07-20 11:47:18

數據

2022-10-17 10:48:50

Hudi大數據Hadoop

2023-07-19 16:22:00

Hudi機器學習

2020-03-26 10:05:18

大數據IT互聯網

2021-09-13 14:19:03

HudiLakehouse阿里云

2023-12-14 13:01:00

Hudivivo

2023-09-05 07:22:17

Hudi數據存儲

2025-06-09 09:57:16

2022-06-08 13:25:51

數據

2021-09-13 13:46:29

Apache HudiB 站數據湖

2022-06-09 14:19:46

順豐數據集成Flink
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文字幕一区在线观看视频 | 九九热精品在线 | 操亚洲| 国产精品久久久久久久7777 | 国产精品一区二 | 亚洲97| 黄色毛片在线播放 | 国产成人免费视频网站视频社区 | 成人在线精品视频 | 成人在线观看免费视频 | 99久久久无码国产精品 | 四虎av电影| 日韩欧美三级 | 一区二区三区视频免费观看 | 国产91一区 | 久久国产精品一区 | 精品一区二区久久久久久久网站 | 欧美激情精品久久久久久变态 | 久久久久久久久国产 | 欧美在线a | 99re66在线观看精品热 | 亚洲xx在线 | 日韩精品免费在线观看 | 久久久久久成人 | 一级大黄| 国产成人在线观看免费 | 巨大黑人极品videos精品 | 欧美一级二级三级视频 | 农村真人裸体丰满少妇毛片 | 精品久久久久久久久久久久久久久久久 | 亚洲精品 在线播放 | 波多野吉衣久久 | 久久99精品国产自在现线小黄鸭 | 亚洲成人蜜桃 | av网站在线看 | 1级毛片| 特级黄一级播放 | 欧美精品一区二区三区四区五区 | 日本欧美视频 | 精品久久久久久亚洲精品 | 久久伊人精品 |