From 93665773eacc4d11b3d1631f405cfbd61349b998 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 25 Feb 2020 15:59:21 +0100 Subject: [PATCH] Fixed a problem with JavaRDD Union --- .../migration/ExtractEntitiesFromHDFSJob.java | 46 +++++++++++++++---- .../dhp/migration/oozie_app/workflow.xml | 17 +++++-- 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java index 22b61798e..3b6fc9b5d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java @@ -1,10 +1,16 @@ package eu.dnetlib.dhp.migration; +import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -23,6 +29,8 @@ import scala.Tuple2; public class ExtractEntitiesFromHDFSJob { + private static final Log log = LogFactory.getLog(ExtractEntitiesFromHDFSJob.class); + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils.toString(MigrateMongoMdstoresApplication.class @@ -35,10 +43,11 @@ public class ExtractEntitiesFromHDFSJob { .master(parser.get("master")) .getOrCreate(); - final List sourcePaths = Arrays.asList(parser.get("sourcePaths").split(",")); - final String targetPath = parser.get("graphRawPath"); - try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { + + final List sourcePaths = Arrays.stream(parser.get("sourcePaths").split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList()); + final String targetPath = parser.get("graphRawPath"); + processEntity(sc, Publication.class, sourcePaths, targetPath); processEntity(sc, Dataset.class, sourcePaths, targetPath); processEntity(sc, Software.class, sourcePaths, targetPath); @@ -53,16 +62,33 @@ public class ExtractEntitiesFromHDFSJob { private static void processEntity(final JavaSparkContext sc, final Class clazz, final List sourcePaths, final String targetPath) { final String type = clazz.getSimpleName().toLowerCase(); - final JavaRDD inputRdd = sc.emptyRDD(); - sourcePaths.forEach(sourcePath -> inputRdd.union(sc.sequenceFile(sourcePath, Text.class, Text.class) - .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) - .filter(k -> isEntityType(k._1(), type)) - .map(Tuple2::_2))); + log.info(String.format("Processing entities (%s) in files:", type)); + sourcePaths.forEach(log::info); + + JavaRDD inputRdd = sc.emptyRDD(); + + for (final String sp : sourcePaths) { + inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class) + .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) + .filter(k -> isEntityType(k._1(), type)) + .map(Tuple2::_2)); + } inputRdd.saveAsTextFile(targetPath + "/" + type); + } - private static boolean isEntityType(final String item, final String entity) { - return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity); + private static boolean isEntityType(final String item, final String type) { + return StringUtils.substringAfter(item, ":").equalsIgnoreCase(type); + } + + private static boolean exists(final JavaSparkContext context, final String pathToFile) { + try { + final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration()); + final Path path = new Path(pathToFile); + return hdfs.exists(path); + } catch (final IOException e) { + throw new RuntimeException(e); + } } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml index b11cddfcf..658963321 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml @@ -43,7 +43,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -87,7 +87,7 @@ -dbpasswd${postgresPassword} -aclaims - + @@ -171,11 +171,20 @@ -pguser${postgresUser} -pgpasswd${postgresPassword} - + - + + + + + + + + + + ${jobTracker} ${nameNode}