code refactor

This commit is contained in:
Sandro La Bruzzo 2021-02-01 14:58:06 +01:00
parent 6ff234d81b
commit bead34d11a
1 changed file with 23 additions and 15 deletions

View File

@ -157,8 +157,9 @@ public class GenerateNativeStoreSparkJob {
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder); final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
final String targetPath = currentVersion.getHdfsPath() + DATASET_NAME;
if (readMdStoreVersion != null) { if (readMdStoreVersion != null) {
// INCREMENTAL MODE // INCREMENTAL MODE
@ -168,28 +169,35 @@ public class GenerateNativeStoreSparkJob {
.as(encoder); .as(encoder);
TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn(); TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn();
mdstore = currentMdStoreVersion saveDataset(
.union(mdstore) currentMdStoreVersion
.groupByKey( .union(mdstore)
(MapFunction<MetadataRecord, String>) MetadataRecord::getId, .groupByKey(
Encoders.STRING()) (MapFunction<MetadataRecord, String>) MetadataRecord::getId,
.agg(aggregator) Encoders.STRING())
.map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder); .agg(aggregator)
.map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder),
targetPath);
} else {
saveDataset(mdstore, targetPath);
} }
mdstore
.write()
.mode(SaveMode.Overwrite)
.format("parquet")
.save(currentVersion.getHdfsPath() + DATASET_NAME);
mdstore = spark.read().load(currentVersion.getHdfsPath() + DATASET_NAME).as(encoder);
final Long total = mdstore.count(); final Long total = spark.read().load(targetPath).count();
AggregationUtility.writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size"); AggregationUtility.writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size");
}); });
} }
/**
 * Persists a dataset of {@link MetadataRecord}s as parquet files,
 * overwriting any data already present at the target location.
 *
 * @param records    the metadata records to persist
 * @param targetPath the filesystem path (e.g. HDFS) to write the parquet output to
 */
private static void saveDataset(final Dataset<MetadataRecord> records, final String targetPath) {
	records.write().format("parquet").mode(SaveMode.Overwrite).save(targetPath);
}
public static MetadataRecord parseRecord( public static MetadataRecord parseRecord(
final String input, final String input,
final String xpath, final String xpath,