From 59eaccbd87197095c50c902458bf84777932e51f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 15 Jan 2024 17:49:54 +0100 Subject: [PATCH] [enrichment single step] refactoring to fix issue in disappeared result type --- .../main/java/eu/dnetlib/dhp/MoveResult.java | 84 +++++++++++++++++++ ...kResultToCommunityFromOrganizationJob.java | 66 +++++++++------ .../SparkResultToCommunityFromProject.java | 41 ++++----- .../eu/dnetlib/dhp/wf/main/job.properties | 6 +- .../dhp/wf/main/oozie_app/workflow.xml | 2 +- .../input_moveresult_parameters.json | 22 +++++ .../oozie_app/workflow.xml | 29 ++++++- .../oozie_app/workflow.xml | 29 ++++++- 8 files changed, 225 insertions(+), 54 deletions(-) create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java create mode 100644 dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java new file mode 100644 index 0000000000..5ffcf8d3f8 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java @@ -0,0 +1,84 @@ + +package eu.dnetlib.dhp; + +import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged; +import static eu.dnetlib.dhp.PropagationConstant.readPath; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Result; + +/** + * @author miriam.baglioni + * @Date 15/01/24 + */ +public class MoveResult implements Serializable { + private static final Logger log = LoggerFactory.getLogger(MoveResult.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkResultToCommunityFromOrganizationJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + moveResults(spark, inputPath, outputPath); + + }); + } + + public static void moveResults(SparkSession spark, String inputPath, String outputPath) { + + ModelSupport.entityTypes + .keySet() + .parallelStream() + .filter(e -> ModelSupport.isResult(e)) + // .parallelStream() + .forEach(e -> { + Class resultClazz = ModelSupport.entityTypes.get(e); + Dataset resultDataset = readPath(spark, inputPath + e.name(), resultClazz); + if (resultDataset.count() > 0) { + + resultDataset + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + e.name()); + } + + }); + + } + +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index cc87b80e5e..4f755266a1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -76,29 +76,41 @@ public class SparkResultToCommunityFromOrganizationJob { ModelSupport.entityTypes .keySet() .parallelStream() + .filter(e -> ModelSupport.isResult(e)) + // .parallelStream() .forEach(e -> { - if (ModelSupport.isResult(e)) { - Class resultClazz = ModelSupport.entityTypes.get(e); - removeOutputDir(spark, outputPath + e.name()); - Dataset result = readPath(spark, inputPath + e.name(), resultClazz); + // if () { + Class resultClazz = ModelSupport.entityTypes.get(e); + removeOutputDir(spark, outputPath + e.name()); + Dataset result = readPath(spark, inputPath + e.name(), resultClazz); - result - .joinWith( - possibleUpdates, - result.col("id").equalTo(possibleUpdates.col("resultId")), - "left_outer") - .map(resultCommunityFn(), Encoders.bean(resultClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + e.name()); + log.info("executing left join"); + result + .joinWith( + possibleUpdates, + result.col("id").equalTo(possibleUpdates.col("resultId")), + "left_outer") + .map(resultCommunityFn(), Encoders.bean(resultClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + e.name()); - readPath(spark, outputPath + e.name(), resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(inputPath + e.name()); - } +// log +// .info( +// "reading results from " + outputPath + e.name() + " and copying them to " + inputPath +// + e.name()); +// Dataset tmp = readPath(spark, outputPath + e.name(), resultClazz); +// if (tmp.count() > 0){ +// +// tmp +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(inputPath + e.name()); +// } + + // } }); } @@ -115,11 +127,11 @@ public class SparkResultToCommunityFromOrganizationJob { .map(Context::getId) .collect(Collectors.toList()); - @SuppressWarnings("unchecked") - R res = (R) ret.getClass().newInstance(); + // @SuppressWarnings("unchecked") + // R res = (R) ret.getClass().newInstance(); - res.setId(ret.getId()); - List propagatedContexts = new ArrayList<>(); + // res.setId(ret.getId()); + // List propagatedContexts = new ArrayList<>(); for (String cId : communitySet) { if (!contextList.contains(cId)) { Context newContext = new Context(); @@ -133,11 +145,11 @@ public class SparkResultToCommunityFromOrganizationJob { PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME, ModelConstants.DNET_PROVENANCE_ACTIONS))); - propagatedContexts.add(newContext); + ret.getContext().add(newContext); } } - res.setContext(propagatedContexts); - ret.mergeFrom(res); + // res.setContext(propagatedContexts); + // ret.mergeFrom(res); } return ret; }; diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java index dde5340617..bb712d8786 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java @@ -86,29 +86,30 @@ public class SparkResultToCommunityFromProject implements Serializable { ModelSupport.entityTypes .keySet() .parallelStream() + .filter(e -> ModelSupport.isResult(e)) .forEach(e -> { - if (ModelSupport.isResult(e)) { - removeOutputDir(spark, outputPath + e.name()); - Class resultClazz = ModelSupport.entityTypes.get(e); - Dataset result = readPath(spark, inputPath + e.name(), resultClazz); + // if () { + removeOutputDir(spark, outputPath + e.name()); + Class resultClazz = ModelSupport.entityTypes.get(e); + Dataset result = readPath(spark, inputPath + e.name(), resultClazz); - result - .joinWith( - possibleUpdates, - result.col("id").equalTo(possibleUpdates.col("resultId")), - "left_outer") - .map(resultCommunityFn(), Encoders.bean(resultClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + e.name()); + result + .joinWith( + possibleUpdates, + result.col("id").equalTo(possibleUpdates.col("resultId")), + "left_outer") + .map(resultCommunityFn(), Encoders.bean(resultClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + e.name()); - readPath(spark, outputPath + e.name(), resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(inputPath + e.name()); - } + readPath(spark, outputPath + e.name(), resultClazz) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath + e.name()); + // } }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties index 4cb759343c..a84e8ab6b4 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -1,12 +1,12 @@ -sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched -resumeFrom=CountryPropagation +sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched +resumeFrom=CommunityOrganization allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo allowedsemrelsresultproject=isSupplementedBy;isSupplementTo allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|openaire____::fdb035c8b3e0540a8d9a561a6c44f4de;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48 #allowedtypes=pubsrepository::institutional allowedtypes=Institutional -outputPath=/tmp/miriam/enrichment_one_step +outputPath=/tmp/beta_provision/graph/11_graph_orcid pathMap ={"author":"$['author'][*]['fullname']", \ "title":"$['title'][*]['value']",\ "orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\ diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml index 8e91707b6e..9b7fad3255 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml @@ -231,7 +231,7 @@ - + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json new file mode 100644 index 0000000000..4645be435e --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json @@ -0,0 +1,22 @@ +[ + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml index 6aeffb4574..18c5f4f0f6 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app/workflow.xml @@ -69,7 +69,7 @@ yarn cluster - community2resultfromorganization-Publication + community2resultfromorganization eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob dhp-enrichment-${projectVersion}.jar @@ -88,6 +88,33 @@ --sourcePath${sourcePath}/ --outputPath${workingDir}/communityorganization/resulttocommunityfromorganization/ + + + + + + + yarn + cluster + community2resultfromorganization - move results + eu.dnetlib.dhp.MoveResult + dhp-enrichment-${projectVersion}.jar + + --executor-cores=6 + --executor-memory=5G + --conf spark.executor.memoryOverhead=3g + --conf spark.sql.shuffle.partitions=3284 + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + + --sourcePath${workingDir}/communityorganization/resulttocommunityfromorganization/ + --outputPath${sourcePath}/ + + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml index dd845064b2..01e366c02f 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app/workflow.xml @@ -86,12 +86,37 @@ --sourcePath${sourcePath}/ --outputPath${workingDir}/communitythroughproject/ + + + + + + + yarn + cluster + community2resultfromorganization - move results + eu.dnetlib.dhp.MoveResult + dhp-enrichment-${projectVersion}.jar + + --executor-cores=6 + --executor-memory=5G + --conf spark.executor.memoryOverhead=3g + --conf spark.sql.shuffle.partitions=3284 + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + + --sourcePath${workingDir}/communitythroughproject/ + --outputPath${sourcePath}/ + + - - \ No newline at end of file