From e28d395e87cce86c8f59f35f138c48661a58022c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 8 Mar 2023 21:16:52 +0100 Subject: [PATCH] [aggregator graph] using dedicated path to sync claims, adjusted paths with wildcards --- .../oa/graph/raw/GenerateEntitiesApplication.java | 2 +- .../dhp/oa/graph/raw/VerifyRecordsApplication.java | 2 +- .../raw/common/AbstractMigrationApplication.java | 14 +++++++------- .../dhp/oa/graph/raw_all/oozie_app/workflow.xml | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index f85f940468..79ee5c5b65 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -9,7 +9,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; @@ -30,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java index 82d7b07295..de0003bd18 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java @@ -9,7 +9,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -24,6 +23,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index 947f6615b5..950abdcc65 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -9,7 +9,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import eu.dnetlib.dhp.common.HdfsSupport; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -17,14 +16,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.DHPUtils; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; public class AbstractMigrationApplication implements Closeable { @@ -104,10 +104,10 @@ public class AbstractMigrationApplication implements Closeable { protected static List listEntityPaths(final SparkSession spark, final String paths) { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); return Arrays - .stream(paths.split(",")) - .filter(StringUtils::isNotBlank) - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()) || p.contains("/*")) - .collect(Collectors.toList()); + .stream(paths.split(",")) + .filter(StringUtils::isNotBlank) + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()) || p.contains("/*")) + .collect(Collectors.toList()); } public ObjectMapper getObjectMapper() { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 048fa839e5..b74562284f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -456,7 +456,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims + --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims/*/*,${contentPath}/odf_claims/*/* --invalidPath${workingDir}/invalid_records_claim --isLookupUrl${isLookupUrl} @@ -480,7 +480,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims + --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims/*/*,${contentPath}/odf_claims/*/* --targetPath${workingDir}/entities_claim --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId}