diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java index c68bfa13a..b30658feb 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java @@ -92,7 +92,7 @@ public class PrepareH2020ProgrammeTest { Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count()); - //tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme))); + // tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme))); Assertions .assertEquals( diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java index 4be09c4b7..0d92c48a8 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java @@ -98,7 +98,7 @@ public class ReadProjectsTest { Assertions.assertEquals("H2020-EU.1.3.", project.getLegalBasis()); Assertions.assertEquals("MSCA-IF-2019", project.getTopics()); - //tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); + // tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java index bdb0cc3a1..82a9e6aed 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java @@ -93,7 +93,7 @@ public class ReadTopicTest { Assertions.assertEquals("Individual Fellowships", topic.getTitle()); Assertions.assertEquals("MSCA-IF-2019", topic.getTopic()); - //tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); + // tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 5f9d98073..79ee5c5b6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; @@ -36,7 +37,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; -public class GenerateEntitiesApplication { +public class GenerateEntitiesApplication extends AbstractMigrationApplication { private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class); @@ -112,15 +113,12 @@ public class GenerateEntitiesApplication { final boolean shouldHashId, final Mode mode) { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final List existingSourcePaths = Arrays - .stream(sourcePaths.split(",")) - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) - .collect(Collectors.toList()); + final List existingSourcePaths = listEntityPaths(spark, sourcePaths); log.info("Generate entities from files:"); existingSourcePaths.forEach(log::info); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD inputRdd = sc.emptyRDD(); for (final String sp : existingSourcePaths) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java index a8eb871c8..de0003bd1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java @@ -23,12 +23,13 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; -public class VerifyRecordsApplication { +public class VerifyRecordsApplication extends AbstractMigrationApplication { private static final Logger log = LoggerFactory.getLogger(VerifyRecordsApplication.class); @@ -69,15 +70,13 @@ public class VerifyRecordsApplication { private static void validateRecords(SparkSession spark, String sourcePaths, String invalidPath, VocabularyGroup vocs) { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final List existingSourcePaths = Arrays - .stream(sourcePaths.split(",")) - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) - .collect(Collectors.toList()); + final List existingSourcePaths = listEntityPaths(spark, sourcePaths); log.info("Verify records in files:"); existingSourcePaths.forEach(log::info); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + for (final String sp : existingSourcePaths) { RDD invalidRecords = sc .sequenceFile(sp, Text.class, Text.class) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index b0dc7d7e7..950abdcc6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -3,19 +3,26 @@ package eu.dnetlib.dhp.oa.graph.raw.common; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.DHPUtils; @@ -94,6 +101,15 @@ public class AbstractMigrationApplication implements Closeable { } } + protected static List listEntityPaths(final SparkSession spark, final String paths) { + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + return Arrays + .stream(paths.split(",")) + .filter(StringUtils::isNotBlank) + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()) || p.contains("/*")) + .collect(Collectors.toList()); + } + public ObjectMapper getObjectMapper() { return objectMapper; } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index a47b42c90..b74562284 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -215,7 +215,7 @@ eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - --hdfsPath${contentPath}/mdstore + --hdfsPath${contentPath}/odf_claims --mongoBaseUrl${mongoURL} --mongoDb${mongoDb} --mdFormatODF @@ -238,7 +238,7 @@ eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - --hdfsPath${contentPath}/mdstore + --hdfsPath${contentPath}/oaf_claims --mongoBaseUrl${mongoURL} --mongoDb${mongoDb} --mdFormatOAF @@ -456,7 +456,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims + --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims/*/*,${contentPath}/odf_claims/*/* --invalidPath${workingDir}/invalid_records_claim --isLookupUrl${isLookupUrl} @@ -480,7 +480,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims + --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims/*/*,${contentPath}/odf_claims/*/* --targetPath${workingDir}/entities_claim --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId}