From bead34d11a889b716a256fb0b763995d2c220f0f Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo <sandro.labruzzo@isti.cnr.it>
Date: Mon, 1 Feb 2021 14:58:06 +0100
Subject: [PATCH] code refactor

---
 .../GenerateNativeStoreSparkJob.java          | 38 +++++++++++--------
 1 file changed, 23 insertions(+), 15 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
index 466ddcd21..553a3dc5f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -157,8 +157,9 @@ public class GenerateNativeStoreSparkJob {

 				final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
-				Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
+				final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);

+				final String targetPath = currentVersion.getHdfsPath() + DATASET_NAME;
 				if (readMdStoreVersion != null) {
 					// INCREMENTAL MODE
@@ -168,28 +169,35 @@ public class GenerateNativeStoreSparkJob {
 						.as(encoder);
 					TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn();

-					mdstore = currentMdStoreVersion
-						.union(mdstore)
-						.groupByKey(
-							(MapFunction<MetadataRecord, String>) MetadataRecord::getId,
-							Encoders.STRING())
-						.agg(aggregator)
-						.map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder);
+					saveDataset(
+						currentMdStoreVersion
+							.union(mdstore)
+							.groupByKey(
+								(MapFunction<MetadataRecord, String>) MetadataRecord::getId,
+								Encoders.STRING())
+							.agg(aggregator)
+							.map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder),
+						targetPath);
+				} else {
+					saveDataset(mdstore, targetPath);
 				}
-				mdstore
-					.write()
-					.mode(SaveMode.Overwrite)
-					.format("parquet")
-					.save(currentVersion.getHdfsPath() + DATASET_NAME);
-				mdstore = spark.read().load(currentVersion.getHdfsPath() + DATASET_NAME).as(encoder);

-				final Long total = mdstore.count();
+				final Long total = spark.read().load(targetPath).count();

 				AggregationUtility.writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size");
 			});
 	}

+	private static void saveDataset(final Dataset<MetadataRecord> currentMdStore, final String targetPath) {
+		currentMdStore
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(targetPath);
+
+	}
+
 	public static MetadataRecord parseRecord(
 		final String input,
 		final String xpath,