From 7fd89566c2c79908aa040932743e2d582c8b7465 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 8 Mar 2023 12:43:00 +0100 Subject: [PATCH 1/4] [aggregator graph] handle paths including wildcards --- .../graph/raw/GenerateEntitiesApplication.java | 10 ++++------ .../oa/graph/raw/VerifyRecordsApplication.java | 11 +++++------ .../raw/common/AbstractMigrationApplication.java | 16 ++++++++++++++++ 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 5f9d98073..f85f94046 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -9,6 +9,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; @@ -36,7 +37,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; -public class GenerateEntitiesApplication { +public class GenerateEntitiesApplication extends AbstractMigrationApplication { private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class); @@ -112,15 +113,12 @@ public class GenerateEntitiesApplication { final boolean shouldHashId, final Mode mode) { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final List existingSourcePaths = Arrays - .stream(sourcePaths.split(",")) - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) - .collect(Collectors.toList()); + final List existingSourcePaths = listEntityPaths(spark, sourcePaths); log.info("Generate entities from files:"); existingSourcePaths.forEach(log::info); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD inputRdd = sc.emptyRDD(); for (final String sp : existingSourcePaths) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java index a8eb871c8..82d7b0729 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java @@ -9,6 +9,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -28,7 +29,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; -public class VerifyRecordsApplication { +public class VerifyRecordsApplication extends AbstractMigrationApplication { private static final Logger log = LoggerFactory.getLogger(VerifyRecordsApplication.class); @@ -69,15 +70,13 @@ public class VerifyRecordsApplication { private static void validateRecords(SparkSession spark, String sourcePaths, String invalidPath, VocabularyGroup vocs) { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final List existingSourcePaths = Arrays - .stream(sourcePaths.split(",")) - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) - .collect(Collectors.toList()); + final List existingSourcePaths = listEntityPaths(spark, sourcePaths); log.info("Verify records in files:"); existingSourcePaths.forEach(log::info); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + for (final String sp : existingSourcePaths) { RDD invalidRecords = sc .sequenceFile(sp, Text.class, Text.class) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index b0dc7d7e7..947f6615b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -3,9 +3,14 @@ package eu.dnetlib.dhp.oa.graph.raw.common; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import eu.dnetlib.dhp.common.HdfsSupport; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -18,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; public class AbstractMigrationApplication implements Closeable { @@ -94,6 +101,15 @@ public class AbstractMigrationApplication implements Closeable { } } + protected static List listEntityPaths(final SparkSession spark, final String paths) { + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + return Arrays + .stream(paths.split(",")) + .filter(StringUtils::isNotBlank) + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()) || p.contains("/*")) + .collect(Collectors.toList()); + } + public ObjectMapper getObjectMapper() { return objectMapper; } From 5b8fd373143b46c866399754926326dcf1a84069 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 8 Mar 2023 15:28:14 +0100 Subject: [PATCH 2/4] [aggregator graph] using dedicated path to sync claims --- .../eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index a47b42c90..048fa839e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -215,7 +215,7 @@ eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - --hdfsPath${contentPath}/mdstore + --hdfsPath${contentPath}/odf_claims --mongoBaseUrl${mongoURL} --mongoDb${mongoDb} --mdFormatODF @@ -238,7 +238,7 @@ eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - --hdfsPath${contentPath}/mdstore + --hdfsPath${contentPath}/oaf_claims --mongoBaseUrl${mongoURL} --mongoDb${mongoDb} --mdFormatOAF From e28d395e87cce86c8f59f35f138c48661a58022c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 8 Mar 2023 21:16:52 +0100 Subject: [PATCH 3/4] [aggregator graph] using dedicated path to sync claims, adjusted paths with wildcards --- .../oa/graph/raw/GenerateEntitiesApplication.java | 2 +- .../dhp/oa/graph/raw/VerifyRecordsApplication.java | 2 +- .../raw/common/AbstractMigrationApplication.java | 14 +++++++------- .../dhp/oa/graph/raw_all/oozie_app/workflow.xml | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index f85f94046..79ee5c5b6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -9,7 +9,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; @@ -30,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java index 82d7b0729..de0003bd1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java @@ -9,7 +9,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -24,6 +23,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index 947f6615b..950abdcc6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -9,7 +9,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import eu.dnetlib.dhp.common.HdfsSupport; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -17,14 +16,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.DHPUtils; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; public class AbstractMigrationApplication implements Closeable { @@ -104,10 +104,10 @@ public class AbstractMigrationApplication implements Closeable { protected static List listEntityPaths(final SparkSession spark, final String paths) { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); return Arrays - .stream(paths.split(",")) - .filter(StringUtils::isNotBlank) - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()) || p.contains("/*")) - .collect(Collectors.toList()); + .stream(paths.split(",")) + .filter(StringUtils::isNotBlank) + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()) || p.contains("/*")) + .collect(Collectors.toList()); } public ObjectMapper getObjectMapper() { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 048fa839e..b74562284 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -456,7 +456,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims + --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims/*/*,${contentPath}/odf_claims/*/* --invalidPath${workingDir}/invalid_records_claim --isLookupUrl${isLookupUrl} @@ -480,7 +480,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims + --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims/*/*,${contentPath}/odf_claims/*/* --targetPath${workingDir}/entities_claim --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} From 24e2fd828b1a231e24a0390d2a1208611f31657e Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 8 Mar 2023 21:17:08 +0100 Subject: [PATCH 4/4] code formatting --- .../dhp/actionmanager/project/PrepareH2020ProgrammeTest.java | 2 +- .../eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java | 2 +- .../eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java index c68bfa13a..b30658feb 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java @@ -92,7 +92,7 @@ public class PrepareH2020ProgrammeTest { Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count()); - //tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme))); + // tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme))); Assertions .assertEquals( diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java index 4be09c4b7..0d92c48a8 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java @@ -98,7 +98,7 @@ public class ReadProjectsTest { Assertions.assertEquals("H2020-EU.1.3.", project.getLegalBasis()); Assertions.assertEquals("MSCA-IF-2019", project.getTopics()); - //tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); + // tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java index bdb0cc3a1..82a9e6aed 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java @@ -93,7 +93,7 @@ public class ReadTopicTest { Assertions.assertEquals("Individual Fellowships", topic.getTitle()); Assertions.assertEquals("MSCA-IF-2019", topic.getTopic()); - //tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); + // tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); } }