From 59eaccbd87197095c50c902458bf84777932e51f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 15 Jan 2024 17:49:54 +0100 Subject: [PATCH 1/3] [enrichment single step] refactoring to fix issue in disappeared result type --- .../main/java/eu/dnetlib/dhp/MoveResult.java | 84 +++++++++++++++++++ ...kResultToCommunityFromOrganizationJob.java | 66 +++++++++------ .../SparkResultToCommunityFromProject.java | 41 ++++----- .../eu/dnetlib/dhp/wf/main/job.properties | 6 +- .../dhp/wf/main/oozie_app/workflow.xml | 2 +- .../input_moveresult_parameters.json | 22 +++++ .../oozie_app/workflow.xml | 29 ++++++- .../oozie_app/workflow.xml | 29 ++++++- 8 files changed, 225 insertions(+), 54 deletions(-) create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java create mode 100644 dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java new file mode 100644 index 000000000..5ffcf8d3f --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java @@ -0,0 +1,84 @@ + +package eu.dnetlib.dhp; + +import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged; +import static eu.dnetlib.dhp.PropagationConstant.readPath; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Result; + +/** + * @author miriam.baglioni + * @Date 15/01/24 + */ +public class MoveResult implements Serializable { + private static final Logger log = LoggerFactory.getLogger(MoveResult.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkResultToCommunityFromOrganizationJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + moveResults(spark, inputPath, outputPath); + + }); + } + + public static void moveResults(SparkSession spark, String inputPath, String outputPath) { + + ModelSupport.entityTypes + .keySet() + .parallelStream() + .filter(e -> ModelSupport.isResult(e)) + // .parallelStream() + .forEach(e -> { + Class resultClazz = ModelSupport.entityTypes.get(e); + Dataset resultDataset = readPath(spark, inputPath + e.name(), resultClazz); + if (resultDataset.count() > 0) { + + resultDataset + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + e.name()); + } + + }); + + } + +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index cc87b80e5..4f755266a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -76,29 +76,41 @@ public class SparkResultToCommunityFromOrganizationJob { ModelSupport.entityTypes .keySet() .parallelStream() + .filter(e -> ModelSupport.isResult(e)) + // .parallelStream() .forEach(e -> { - if (ModelSupport.isResult(e)) { - Class resultClazz = ModelSupport.entityTypes.get(e); - removeOutputDir(spark, outputPath + e.name()); - Dataset result = readPath(spark, inputPath + e.name(), resultClazz); + // if () { + Class resultClazz = ModelSupport.entityTypes.get(e); + removeOutputDir(spark, outputPath + e.name()); + Dataset result = readPath(spark, inputPath + e.name(), resultClazz); - result - .joinWith( - possibleUpdates, - result.col("id").equalTo(possibleUpdates.col("resultId")), - "left_outer") - .map(resultCommunityFn(), Encoders.bean(resultClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + e.name()); + log.info("executing left join"); + result + .joinWith( + possibleUpdates, + result.col("id").equalTo(possibleUpdates.col("resultId")), + "left_outer") + .map(resultCommunityFn(), Encoders.bean(resultClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + e.name()); - readPath(spark, outputPath + e.name(), resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(inputPath + e.name()); - } +// log +// .info( +// "reading results from " + outputPath + e.name() + " and copying them to " + inputPath +// + e.name()); +// Dataset tmp = readPath(spark, outputPath + e.name(), resultClazz); +// if (tmp.count() > 0){ +// +// tmp +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(inputPath + e.name()); +// } + + // } }); } @@ -115,11 +127,11 @@ public class SparkResultToCommunityFromOrganizationJob { .map(Context::getId) .collect(Collectors.toList()); - @SuppressWarnings("unchecked") - R res = (R) ret.getClass().newInstance(); + // @SuppressWarnings("unchecked") + // R res = (R) ret.getClass().newInstance(); - res.setId(ret.getId()); - List propagatedContexts = new ArrayList<>(); + // res.setId(ret.getId()); + // List propagatedContexts = new ArrayList<>(); for (String cId : communitySet) { if (!contextList.contains(cId)) { Context newContext = new Context(); @@ -133,11 +145,11 @@ public class SparkResultToCommunityFromOrganizationJob { PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME, ModelConstants.DNET_PROVENANCE_ACTIONS))); - propagatedContexts.add(newContext); + ret.getContext().add(newContext); } } - res.setContext(propagatedContexts); - ret.mergeFrom(res); + // res.setContext(propagatedContexts); + // ret.mergeFrom(res); } return ret; }; diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java index dde534061..bb712d878 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java @@ -86,29 +86,30 @@ public class SparkResultToCommunityFromProject implements Serializable { ModelSupport.entityTypes .keySet() .parallelStream() + .filter(e -> ModelSupport.isResult(e)) .forEach(e -> { - if (ModelSupport.isResult(e)) { - removeOutputDir(spark, outputPath + e.name()); - Class resultClazz = ModelSupport.entityTypes.get(e); - Dataset result = readPath(spark, inputPath + e.name(), resultClazz); + // if () { + removeOutputDir(spark, outputPath + e.name()); + Class resultClazz = ModelSupport.entityTypes.get(e); + Dataset result = readPath(spark, inputPath + e.name(), resultClazz); - result - .joinWith( - possibleUpdates, - result.col("id").equalTo(possibleUpdates.col("resultId")), - "left_outer") - .map(resultCommunityFn(), Encoders.bean(resultClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + e.name()); + result + .joinWith( + possibleUpdates, + result.col("id").equalTo(possibleUpdates.col("resultId")), + "left_outer") + .map(resultCommunityFn(), Encoders.bean(resultClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + e.name()); - readPath(spark, outputPath + e.name(), resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(inputPath + e.name()); - } + readPath(spark, outputPath + e.name(), resultClazz) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath + e.name()); + // } }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties index 4cb759343..a84e8ab6b 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -1,12 +1,12 @@ -sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched -resumeFrom=CountryPropagation +sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched +resumeFrom=CommunityOrganization allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo allowedsemrelsresultproject=isSupplementedBy;isSupplementTo allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|openaire____::fdb035c8b3e0540a8d9a561a6c44f4de;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48 #allowedtypes=pubsrepository::institutional allowedtypes=Institutional -outputPath=/tmp/miriam/enrichment_one_step +outputPath=/tmp/beta_provision/graph/11_graph_orcid pathMap ={"author":"$['author'][*]['fullname']", \ "title":"$['title'][*]['value']",\ "orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\ diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml index 8e91707b6..9b7fad325 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml @@ -231,7 +231,7 @@ - + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json new file mode 100644 index 000000000..4645be435 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json @@ -0,0 +1,22 @@ +[ + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml index 6aeffb457..18c5f4f0f 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml @@ -69,7 +69,7 @@ yarn cluster - community2resultfromorganization-Publication + community2resultfromorganization eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob dhp-enrichment-${projectVersion}.jar @@ -88,6 +88,33 @@ --sourcePath${sourcePath}/ --outputPath${workingDir}/communityorganization/resulttocommunityfromorganization/ + + + + + + + yarn + cluster + community2resultfromorganization - move results + eu.dnetlib.dhp.MoveResult + dhp-enrichment-${projectVersion}.jar + + --executor-cores=6 + --executor-memory=5G + --conf spark.executor.memoryOverhead=3g + --conf spark.sql.shuffle.partitions=3284 + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + + --sourcePath${workingDir}/communityorganization/resulttocommunityfromorganization/ + --outputPath${sourcePath}/ + + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml index dd845064b..01e366c02 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml @@ -86,12 +86,37 @@ --sourcePath${sourcePath}/ --outputPath${workingDir}/communitythroughproject/ + + + + + + + yarn + cluster + community2resultfromorganization - move results + eu.dnetlib.dhp.MoveResult + dhp-enrichment-${projectVersion}.jar + + --executor-cores=6 + --executor-memory=5G + --conf spark.executor.memoryOverhead=3g + --conf spark.sql.shuffle.partitions=3284 + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + + --sourcePath${workingDir}/communitythroughproject/ + --outputPath${sourcePath}/ + + - - \ No newline at end of file From 67ce2d54be4019d3d1aa157cbc1d50eb03f1ea59 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 17 Jan 2024 16:50:00 +0100 Subject: [PATCH 2/3] [enrichment single step] refactoring to fix issues in disappeared result type --- .../SparkCountryPropagationJob.java | 6 -- .../SparkResultToCommunityFromProject.java | 10 +- ...parkResultToCommunityThroughSemRelJob.java | 21 +---- .../eu/dnetlib/dhp/wf/main/job.properties | 4 +- .../dhp/wf/main/oozie_app/workflow.xml | 2 +- .../bulktag/oozie_app/config-default.xml | 12 ++- .../bulktag/oozie_app/workflow.xml | 18 +++- .../oozie_app/config-default.xml | 4 +- .../countrypropagation/oozie_app/workflow.xml | 92 ++++++++++++++----- .../oozie_app/workflow.xml | 15 ++- .../oozie_app/workflow.xml | 20 ++-- .../projecttoresult/oozie_app/workflow.xml | 15 ++- .../input_moveresult_parameters.json | 0 .../oozie_app/workflow.xml | 14 ++- .../oozie_app/workflow.xml | 16 +++- .../oozie_app/workflow.xml | 49 +++++++++- .../oozie_app/workflow.xml | 14 ++- 17 files changed, 229 insertions(+), 83 deletions(-) rename dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/{ => resulttocommunityfromorganization}/input_moveresult_parameters.json (100%) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 92930c18b..a0cc4c84a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -97,12 +97,6 @@ public class SparkCountryPropagationJob { .mode(SaveMode.Overwrite) .json(outputPath); - readPath(spark, outputPath, resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(sourcePath); - } private static MapFunction, R> getCountryMergeFn() { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java index bb712d878..f9c36d7ca 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java @@ -104,11 +104,11 @@ public class SparkResultToCommunityFromProject implements Serializable { .option("compression", "gzip") .json(outputPath + e.name()); - readPath(spark, outputPath + e.name(), resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(inputPath + e.name()); +// readPath(spark, outputPath + e.name(), resultClazz) +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(inputPath + e.name()); // } }); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index 4929c7582..3cf2f73c3 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -101,11 +101,6 @@ public class SparkResultToCommunityThroughSemRelJob { .option("compression", "gzip") .json(outputPath); - readPath(spark, outputPath, resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(inputPath); } private static MapFunction, R> contextUpdaterFn() { @@ -115,11 +110,11 @@ public class SparkResultToCommunityThroughSemRelJob { if (rcl.isPresent()) { Set contexts = new HashSet<>(); ret.getContext().forEach(c -> contexts.add(c.getId())); - List contextList = rcl + rcl .get() .getCommunityList() .stream() - .map( + .forEach( c -> { if (!contexts.contains(c)) { Context newContext = new Context(); @@ -133,19 +128,11 @@ public class SparkResultToCommunityThroughSemRelJob { PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, ModelConstants.DNET_PROVENANCE_ACTIONS))); - return newContext; + ret.getContext().add(newContext); } - return null; - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - @SuppressWarnings("unchecked") - R r = (R) ret.getClass().newInstance(); + }); - r.setId(ret.getId()); - r.setContext(contextList); - ret.mergeFrom(r); } return ret; diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties index a84e8ab6b..7e82d9b2c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -1,12 +1,12 @@ sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched -resumeFrom=CommunityOrganization +resumeFrom=CommunitySemanticRelation allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo allowedsemrelsresultproject=isSupplementedBy;isSupplementTo allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|openaire____::fdb035c8b3e0540a8d9a561a6c44f4de;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48 #allowedtypes=pubsrepository::institutional allowedtypes=Institutional -outputPath=/tmp/beta_provision/graph/11_graph_orcid +outputPath=/tmp/miriam/graph/11_graph_orcid pathMap ={"author":"$['author'][*]['fullname']", \ "title":"$['title'][*]['value']",\ "orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\ diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml index 9b7fad325..8e91707b6 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml @@ -231,7 +231,7 @@ - + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml index fe82ae194..2695253e6 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml @@ -45,10 +45,18 @@ sparkExecutorMemory - 6G + 5G sparkExecutorCores - 1 + 4 + + + memoryOverhead + 3G + + + partitions + 3284 \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml index 6c5163448..c7a9e8a26 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml @@ -12,6 +12,10 @@ baseURL The URL to access the community APIs + + startFrom> + undelete + @@ -26,12 +30,20 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + ${wf:conf('startFrom') eq 'undelete'} + + + + + @@ -45,7 +57,7 @@ yarn-cluster cluster - bulkTagging-publication + bulkTagging eu.dnetlib.dhp.bulktag.SparkBulkTagJob dhp-enrichment-${projectVersion}.jar @@ -53,6 +65,8 @@ --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=${memoryOverhead} + --conf spark.sql.shuffle.partitions=${partitions} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/config-default.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/config-default.xml index 2744ea92b..1cb0b8a5e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/config-default.xml @@ -45,11 +45,11 @@ sparkExecutorMemory - 6G + 5G sparkExecutorCores - 1 + 4 spark2MaxExecutors diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/workflow.xml index 81d6dc3dc..3a6e3edfb 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/workflow.xml @@ -12,6 +12,10 @@ allowedtypes the allowed types + + startFrom + undelete + @@ -25,7 +29,15 @@ - + + + + + ${wf:conf('startFrom') eq 'undelete'} + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -61,7 +73,7 @@ --sourcePath${sourcePath} --whitelist${whitelist} --allowedtypes${allowedtypes} - --outputPath${workingDir}/preparedInfo + --outputPath${workingDir}/country/preparedInfo @@ -95,10 +107,10 @@ --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath}/publication - --outputPath${workingDir}/publication - --workingPath${workingDir}/workingP + --outputPath${workingDir}/country/publication + --workingPath${workingDir}/country/workingP --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication - --preparedInfoPath${workingDir}/preparedInfo + --preparedInfoPath${workingDir}/country/preparedInfo @@ -125,10 +137,10 @@ --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath}/dataset - --outputPath${workingDir}/dataset - --workingPath${workingDir}/workingD + --outputPath${workingDir}/country/dataset + --workingPath${workingDir}/country/workingD --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset - --preparedInfoPath${workingDir}/preparedInfo + --preparedInfoPath${workingDir}/country/preparedInfo @@ -155,10 +167,10 @@ --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath}/otherresearchproduct - --outputPath${workingDir}/otherresearchproduct - --workingPath${workingDir}/workingO + --outputPath${workingDir}/country/otherresearchproduct + --workingPath${workingDir}/country/workingO --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --preparedInfoPath${workingDir}/preparedInfo + --preparedInfoPath${workingDir}/country/preparedInfo @@ -185,10 +197,10 @@ --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath}/software - --outputPath${workingDir}/software - --workingPath${workingDir}/workingS + --outputPath${workingDir}/country/software + --workingPath${workingDir}/country/workingS --resultTableNameeu.dnetlib.dhp.schema.oaf.Software - --preparedInfoPath${workingDir}/preparedInfo + --preparedInfoPath${workingDir}/country/preparedInfo @@ -224,9 +236,9 @@ --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath}/publication - --preparedInfoPath${workingDir}/publication + --preparedInfoPath${workingDir}/country/publication --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication - --outputPath${workingDir}/country/publication + --outputPath${workingDir}/country/country/publication @@ -253,9 +265,9 @@ --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath}/dataset - --preparedInfoPath${workingDir}/dataset + --preparedInfoPath${workingDir}/country/dataset --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset - --outputPath${workingDir}/country/dataset + --outputPath${workingDir}/country/country/dataset @@ -282,9 +294,9 @@ --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath}/otherresearchproduct - --preparedInfoPath${workingDir}/otherresearchproduct + --preparedInfoPath${workingDir}/country/otherresearchproduct --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --outputPath${workingDir}/country/otherresearchproduct + --outputPath${workingDir}/country/country/otherresearchproduct @@ -311,15 +323,49 @@ --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath}/software - --preparedInfoPath${workingDir}/software + --preparedInfoPath${workingDir}/country/software --resultTableNameeu.dnetlib.dhp.schema.oaf.Software - --outputPath${workingDir}/country/software + --outputPath${workingDir}/country/country/software - + + + + + yarn + cluster + community2resultfromorganization - move results + eu.dnetlib.dhp.MoveResult + dhp-enrichment-${projectVersion}.jar + + --executor-cores=6 + --executor-memory=5G + --conf spark.executor.memoryOverhead=3g + --conf spark.sql.shuffle.partitions=3284 + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + + --sourcePath${workingDir}/country/country/ + --outputPath${sourcePath}/ + + + + + + + + ${wf:conf('startFrom') eq 'undelete'} + + + + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/entitytoorganizationfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/entitytoorganizationfromsemrel/oozie_app/workflow.xml index 05824d209..ecec3579b 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/entitytoorganizationfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/entitytoorganizationfromsemrel/oozie_app/workflow.xml @@ -4,7 +4,10 @@ sourcePath the source path - + + startFrom + undelete + @@ -18,7 +21,15 @@ - + + + + + ${wf:conf('startFrom') eq 'undelete'} + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml index 483a805b1..bab1e55df 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml @@ -114,7 +114,7 @@ --sourcePath${sourcePath} --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication - --outputPath${workingDir}/preparedInfo/targetOrcidAssoc + --outputPath${workingDir}/orcid/preparedInfo/targetOrcidAssoc --allowedsemrels${allowedsemrels} @@ -142,7 +142,7 @@ --sourcePath${sourcePath} --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset - --outputPath${workingDir}/preparedInfo/targetOrcidAssoc + --outputPath${workingDir}/orcid/preparedInfo/targetOrcidAssoc --allowedsemrels${allowedsemrels} @@ -170,7 +170,7 @@ --sourcePath${sourcePath} --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --outputPath${workingDir}/preparedInfo/targetOrcidAssoc + --outputPath${workingDir}/orcid/preparedInfo/targetOrcidAssoc --allowedsemrels${allowedsemrels} @@ -198,7 +198,7 @@ --sourcePath${sourcePath} --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Software - --outputPath${workingDir}/preparedInfo/targetOrcidAssoc + --outputPath${workingDir}/orcid/preparedInfo/targetOrcidAssoc --allowedsemrels${allowedsemrels} @@ -225,8 +225,8 @@ --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} - --sourcePath${workingDir}/orcidprop - --outputPath${workingDir}/orcidprop/mergedOrcidAssoc + --sourcePath${workingDir}/orcid/orcidprop + --outputPath${workingDir}/orcid/orcidprop/mergedOrcidAssoc @@ -261,7 +261,7 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.sql.shuffle.partitions=3840 - --possibleUpdatesPath${workingDir}/orcidprop/mergedOrcidAssoc + --possibleUpdatesPath${workingDir}/orcid/orcidprop/mergedOrcidAssoc --sourcePath${sourcePath}/publication --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication --outputPath${outputPath}/publication @@ -291,7 +291,7 @@ --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false - --possibleUpdatesPath${workingDir}/orcidprop/mergedOrcidAssoc + --possibleUpdatesPath${workingDir}/orcid/orcidprop/mergedOrcidAssoc --sourcePath${sourcePath}/dataset --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset --outputPath${outputPath}/dataset @@ -321,7 +321,7 @@ --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false - --possibleUpdatesPath${workingDir}/orcidprop/mergedOrcidAssoc + --possibleUpdatesPath${workingDir}/orcid/orcidprop/mergedOrcidAssoc --sourcePath${sourcePath}/otherresearchproduct --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputPath${outputPath}/otherresearchproduct @@ -351,7 +351,7 @@ --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false - --possibleUpdatesPath${workingDir}/orcidprop/mergedOrcidAssoc + --possibleUpdatesPath${workingDir}/orcid/orcidprop/mergedOrcidAssoc --sourcePath${sourcePath}/software --resultTableNameeu.dnetlib.dhp.schema.oaf.Software --outputPath${outputPath}/software diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app/workflow.xml index f0db9c777..f26f3f98b 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app/workflow.xml @@ -8,7 +8,10 @@ allowedsemrels the allowed semantics - + + startFrom + undelete + @@ -22,7 +25,15 @@ - + + + + + ${wf:conf('startFrom') eq 'undelete'} + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/input_moveresult_parameters.json similarity index 100% rename from dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json rename to dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/input_moveresult_parameters.json diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml index 18c5f4f0f..aa5357eea 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml @@ -8,6 +8,10 @@ baseURL the baseURL from where to reach the community APIs + + startFrom + undelete + @@ -21,7 +25,15 @@ - + + + + + ${wf:conf('startFrom') eq 'undelete'} + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml index 01e366c02..0ceee5a7e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml @@ -8,6 +8,10 @@ baseURL the base URL to use to select the right community APIs + + startFrom + undelete + @@ -21,7 +25,15 @@ - + + + + + ${wf:conf('startFrom') eq 'undelete'} + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -94,7 +106,7 @@ yarn cluster - community2resultfromorganization - move results + move results eu.dnetlib.dhp.MoveResult dhp-enrichment-${projectVersion}.jar diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/oozie_app/workflow.xml index 773c7fba7..b5e6fbf05 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/oozie_app/workflow.xml @@ -16,9 +16,21 @@ outputPath the output path + + startFrom + undelete + - + + + + + ${wf:conf('startFrom') eq 'undelete'} + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -209,9 +221,9 @@ dhp-enrichment-${projectVersion}.jar --executor-cores=6 - --executor-memory=5G - --conf spark.executor.memoryOverhead=3g - --conf spark.sql.shuffle.partitions=3284 + --executor-memory=4G + --conf spark.executor.memoryOverhead=5G + --conf spark.sql.shuffle.partitions=15000 --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} @@ -324,7 +336,34 @@ - + + + + + yarn + cluster + move results + eu.dnetlib.dhp.MoveResult + dhp-enrichment-${projectVersion}.jar + + --executor-cores=6 + --executor-memory=5G + --conf spark.executor.memoryOverhead=3g + --conf spark.sql.shuffle.partitions=3284 + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + + --sourcePath${workingDir}/communitysemrel/ + --outputPath${sourcePath}/ + + + + + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttoorganizationfrominstrepo/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttoorganizationfrominstrepo/oozie_app/workflow.xml index e963453da..ca76a0e85 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttoorganizationfrominstrepo/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttoorganizationfrominstrepo/oozie_app/workflow.xml @@ -8,6 +8,10 @@ blacklist The list of institutional repositories that should not be used for the propagation + + startFrom + undelete + @@ -21,7 +25,15 @@ - + + + + + ${wf:conf('startFrom') eq 'undelete'} + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] From 82e9e262ee12e4cd55f1f8593893fc8e41b82a07 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 17 Jan 2024 17:38:03 +0100 Subject: [PATCH 3/3] [enrichment single step] remove parameter from execution --- .../SparkResultToProjectThroughSemRelJob.java | 29 +++++++++---------- .../eu/dnetlib/dhp/wf/main/job.properties | 2 +- .../projecttoresult/oozie_app/workflow.xml | 9 ------ 3 files changed, 15 insertions(+), 25 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java index e7518673d..a6466716a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java @@ -64,7 +64,7 @@ public class SparkResultToProjectThroughSemRelJob { removeOutputDir(spark, outputPath); } execPropagation( - spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph); + spark, outputPath, alreadyLinkedPath, potentialUpdatePath); }); } @@ -72,24 +72,23 @@ public class SparkResultToProjectThroughSemRelJob { SparkSession spark, String outputPath, String alreadyLinkedPath, - String potentialUpdatePath, - Boolean saveGraph) { + String potentialUpdatePath) { Dataset toaddrelations = readPath(spark, potentialUpdatePath, ResultProjectSet.class); Dataset alreadyLinked = readPath(spark, alreadyLinkedPath, ResultProjectSet.class); - if (saveGraph) { - toaddrelations - .joinWith( - alreadyLinked, - toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")), - "left_outer") - .flatMap(mapRelationRn(), Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Append) - .option("compression", "gzip") - .json(outputPath); - } + // if (saveGraph) { + toaddrelations + .joinWith( + alreadyLinked, + toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")), + "left_outer") + .flatMap(mapRelationRn(), Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(outputPath); + // } } private static FlatMapFunction, Relation> mapRelationRn() { diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties index 7e82d9b2c..05db04090 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -1,5 +1,5 @@ sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched -resumeFrom=CommunitySemanticRelation +resumeFrom=ResultProject allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo allowedsemrelsresultproject=isSupplementedBy;isSupplementTo allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app/workflow.xml index f26f3f98b..21cc5522f 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app/workflow.xml @@ -97,17 +97,8 @@ --potentialUpdatePath${workingDir}/resultproject/preparedInfo/potentialUpdates --alreadyLinkedPath${workingDir}/resultproject/preparedInfo/alreadyLinked - - - - - - - - - \ No newline at end of file