From 83132ee99afda95926c25d3d087c7bc857350460 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 14 Jun 2021 11:57:00 +0200 Subject: [PATCH 1/2] fixed a problem with empty mdstore list --- .../raw/MigrateHdfsMdstoresApplication.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index f4e783edc..31527283c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -14,6 +14,8 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -52,9 +54,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - MigrateHdfsMdstoresApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); + .toString(MigrateHdfsMdstoresApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -94,14 +95,18 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) .toArray(size -> new String[size]); - spark - .read() - .parquet(validPaths) - .map((MapFunction) r -> enrichRecord(r), Encoders.STRING()) - .toJavaRDD() - .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) - // .coalesce(1) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + if (validPaths.length > 0) { + spark + .read() + .parquet(validPaths) + .map((MapFunction) r -> enrichRecord(r), Encoders.STRING()) + .toJavaRDD() + .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) + // .coalesce(1) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + } else { + FileSystem.get(sc.hadoopConfiguration()).createNewFile(new Path(outputPath)); + } } private static String enrichRecord(final Row r) { From ada063ce702373ffc8afc7e5fc41a3188c42a9ec Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 14 Jun 2021 12:04:47 +0200 Subject: [PATCH 2/2] fixed a problem with empty mdstore list (2) --- .../dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index 31527283c..5109a0763 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -14,8 +14,6 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -105,7 +103,10 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication // .coalesce(1) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } else { - FileSystem.get(sc.hadoopConfiguration()).createNewFile(new Path(outputPath)); + spark.emptyDataFrame() + .toJavaRDD() + .mapToPair(xml -> new Tuple2<>(new Text(), new Text())) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } }