From 7fd89566c2c79908aa040932743e2d582c8b7465 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 8 Mar 2023 12:43:00 +0100 Subject: [PATCH] [aggregator graph] handle paths including wildcards --- .../graph/raw/GenerateEntitiesApplication.java | 10 ++++------ .../oa/graph/raw/VerifyRecordsApplication.java | 11 +++++------ .../raw/common/AbstractMigrationApplication.java | 16 ++++++++++++++++ 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 5f9d980739..f85f940468 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -9,6 +9,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; @@ -36,7 +37,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; -public class GenerateEntitiesApplication { +public class GenerateEntitiesApplication extends AbstractMigrationApplication { private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class); @@ -112,15 +113,12 @@ public class GenerateEntitiesApplication { final boolean shouldHashId, final Mode mode) { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final List<String> existingSourcePaths = Arrays .stream(sourcePaths.split(",")) .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) .collect(Collectors.toList()); + final List<String> 
existingSourcePaths = listEntityPaths(spark, sourcePaths); log.info("Generate entities from files:"); existingSourcePaths.forEach(log::info); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD<Oaf> inputRdd = sc.emptyRDD(); for (final String sp : existingSourcePaths) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java index a8eb871c89..82d7b07295 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java @@ -9,6 +9,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -28,7 +29,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; -public class VerifyRecordsApplication { +public class VerifyRecordsApplication extends AbstractMigrationApplication { private static final Logger log = LoggerFactory.getLogger(VerifyRecordsApplication.class); @@ -69,15 +70,13 @@ public class VerifyRecordsApplication { private static void validateRecords(SparkSession spark, String sourcePaths, String invalidPath, VocabularyGroup vocs) { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final List<String> existingSourcePaths = Arrays .stream(sourcePaths.split(",")) .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) .collect(Collectors.toList()); + final List<String> existingSourcePaths = listEntityPaths(spark, sourcePaths); log.info("Verify records in files:"); 
existingSourcePaths.forEach(log::info); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + for (final String sp : existingSourcePaths) { RDD<String> invalidRecords = sc .sequenceFile(sp, Text.class, Text.class) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index b0dc7d7e76..947f6615b5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -3,9 +3,14 @@ package eu.dnetlib.dhp.oa.graph.raw.common; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import eu.dnetlib.dhp.common.HdfsSupport; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -18,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; public class AbstractMigrationApplication implements Closeable { @@ -94,6 +101,15 @@ public class AbstractMigrationApplication implements Closeable { } } + protected static List<String> listEntityPaths(final SparkSession spark, final String paths) { + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + return Arrays + .stream(paths.split(",")) + .filter(StringUtils::isNotBlank) + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()) || p.contains("/*")) + 
.collect(Collectors.toList()); + } + public ObjectMapper getObjectMapper() { return objectMapper; }