From 7501e823ed90cbb5eb61bc16da349c7ecefff6b5 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 13 Apr 2022 17:46:22 +0200
Subject: [PATCH] [Enrichment Step] get rid of hive

---
 .../PrepareResultOrcidAssociationStep0.java   | 90 +++++++++++++++++++
 .../PrepareResultOrcidAssociationStep1.java   | 21 ++---
 .../PrepareResultOrcidAssociationStep2.java   |  2 +-
 ...nput_prepareorcidtoresult0_parameters.json | 26 ++++++
 ...input_prepareorcidtoresult_parameters.json |  4 +-
 .../oozie_app/workflow.xml                    | 54 ++++++++---
 6 files changed, 168 insertions(+), 29 deletions(-)
 create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep0.java
 create mode 100644 dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult0_parameters.json

diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep0.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep0.java
new file mode 100644
index 000000000..12808b950
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep0.java
@@ -0,0 +1,90 @@
+
+package eu.dnetlib.dhp.orcidtoresultfromsemrel;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.PropagationConstant.readPath;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Author;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import scala.Tuple2;
+
+public class PrepareResultOrcidAssociationStep0 implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep0.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConf = IOUtils
+            .toString(
+                PrepareResultOrcidAssociationStep0.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult0_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConf);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final List<String> allowedsemrel = Arrays
+            .stream(parser.get("allowedsemrels").split(";"))
+            .map(s -> s.toLowerCase())
+            .collect(Collectors.toList());
+
+        log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+
+                selectRelations(
+                    spark, inputPath, outputPath, allowedsemrel);
+            });
+    }
+
+    private static void selectRelations(SparkSession spark, String inputPath, String outputPath,
+        List<String> allowedsemrel) {
+
+        readPath(spark, inputPath, Relation.class)
+            .filter(
+                (FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
+                    && allowedsemrel.contains(r.getRelClass().toLowerCase()))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java
index e1a72a705..d17d87fed 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java
@@ -53,7 +53,7 @@
         String inputPath = parser.get("sourcePath");
         log.info("inputPath: {}", inputPath);
 
-        final String outputPath = parser.get("outputPath");
+        final String outputPath = parser.get("workingPath");
         log.info("outputPath: {}", outputPath);
 
         final String resultClassName = parser.get("resultTableName");
@@ -76,15 +76,6 @@
 
         SparkConf conf = new SparkConf();
 
-        String inputRelationPath = inputPath + "/relation";
-        log.info("inputRelationPath: {}", inputRelationPath);
-
-        String inputResultPath = inputPath + "/" + resultType;
-        log.info("inputResultPath: {}", inputResultPath);
-
-        String outputResultPath = outputPath + "/" + resultType;
-        log.info("outputResultPath: {}", outputResultPath);
-
         runWithSparkSession(
             conf,
             isSparkSessionManaged,
@@ -112,12 +103,14 @@
             .write()
             .mode(SaveMode.Overwrite)
             .option("compression", "gzip")
-            .json(outputPath + "/relationSubset");
+            .json(outputPath + "/" + resultType + "/relationSubset");
 
         Dataset<Relation> relation = readPath(spark, outputPath + "/relationSubset", Relation.class);
 
         log.info("Reading Graph table from: {}", inputResultPath);
 
+        final String resultOutputPath = outputPath + "/resultSubset/" + resultType;
+
         readPath(spark, inputResultPath, resultClazz)
             .filter(
                 (FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() && !r.getDataInfo().getInvisible())
@@ -135,11 +128,11 @@
             .write()
             .mode(SaveMode.Overwrite)
             .option("compression", "gzip")
-            .json(outputPath + "/resultSubset");
+            .json(resultOutputPath);
 
-        Dataset<R> result = readPath(spark, outputPath + "/resultSubset", resultClazz);
+        Dataset<R> result = readPath(spark, resultOutputPath, resultClazz);
 
-        result.foreach((ForeachFunction<R>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
+        // result.foreach((ForeachFunction<R>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
 
         result
             .joinWith(relation, result.col("id").equalTo(relation.col("source")))
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java
index ad59f65cb..29dd67e3f 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java
@@ -52,7 +52,7 @@
             conf,
             isSparkSessionManaged,
             spark -> {
-                removeOutputDir(spark, outputPath);
+                // removeOutputDir(spark, outputPath);
                 mergeInfo(spark, inputPath, outputPath);
             });
     }
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult0_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult0_parameters.json
new file mode 100644
index 000000000..78b629ae6
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult0_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName":"s",
+    "paramLongName":"sourcePath",
+    "paramDescription": "the path of the sequential file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName":"as",
+    "paramLongName":"allowedsemrels",
+    "paramDescription": "the allowed semantic relations for propagation",
+    "paramRequired": true
+  },
+  {
+    "paramName": "out",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path used to store temporary output files",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ssm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json
index 5354399fb..2818785db 100644
--- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json
@@ -18,8 +18,8 @@
     "paramRequired": true
   },
   {
-    "paramName": "out",
-    "paramLongName": "outputPath",
+    "paramName": "wp",
+    "paramLongName": "workingPath",
     "paramDescription": "the path used to store temporary output files",
     "paramRequired": true
   },
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml
index aa341aef6..6bdce3cd5 100644
--- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml
@@ -14,7 +14,7 @@
     </parameters>
 
-    <start to="…"/>
+    <start to="…"/>
 
     <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@@ -80,7 +80,37 @@
         </fs>
-        <ok to="…"/>
+        <ok to="…"/>
         <error to="Kill"/>
     </action>
 
+    <action name="…">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>ORCIDPropagation-PreparePhase0-SelectRELATIONS</name>
+            <class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep0</class>
+            <jar>dhp-enrichment-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.dynamicAllocation.enabled=true
+                --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.speculation=false
+                --conf spark.hadoop.mapreduce.map.speculative=false
+                --conf spark.hadoop.mapreduce.reduce.speculative=false
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/orcidprop/relationSubset</arg>
+            <arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
+        </spark>
+        <ok to="…"/>
+        <error to="Kill"/>
+    </action>
+
     <fork name="…">
         <path start="…"/>
@@ -113,7 +143,7 @@
             </spark-opts>
             <arg>--sourcePath</arg><arg>${sourcePath}</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
-            <arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
+            <arg>--workingPath</arg><arg>${workingDir}/orcidprop</arg>
             <arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
             <arg>--allowedpids</arg><arg>${allowedpids}</arg>
         </spark>
@@ -141,7 +171,7 @@
             </spark-opts>
             <arg>--sourcePath</arg><arg>${sourcePath}</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
-            <arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
+            <arg>--workingPath</arg><arg>${workingDir}/orcidprop</arg>
             <arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
             <arg>--allowedpids</arg><arg>${allowedpids}</arg>
         </spark>
@@ -169,7 +199,7 @@
             </spark-opts>
             <arg>--sourcePath</arg><arg>${sourcePath}</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
-            <arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
+            <arg>--workingPath</arg><arg>${workingDir}/orcidprop</arg>
             <arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
             <arg>--allowedpids</arg><arg>${allowedpids}</arg>
         </spark>
@@ -197,7 +227,7 @@
             </spark-opts>
             <arg>--sourcePath</arg><arg>${sourcePath}</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
-            <arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
+            <arg>--workingPath</arg><arg>${workingDir}/orcidprop</arg>
             <arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
             <arg>--allowedpids</arg><arg>${allowedpids}</arg>
         </spark>
@@ -225,8 +255,8 @@
                 --conf spark.dynamicAllocation.enabled=true
                 --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
             </spark-opts>
-            <arg>--sourcePath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
-            <arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
+            <arg>--sourcePath</arg><arg>${workingDir}/orcidprop</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
         </spark>
         <ok to="…"/>
         <error to="Kill"/>
@@ -261,7 +291,7 @@
                 --conf spark.hadoop.mapreduce.reduce.speculative=false
                 --conf spark.sql.shuffle.partitions=3840
             </spark-opts>
-            <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
+            <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
             <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
             <arg>--outputPath</arg><arg>${outputPath}/publication</arg>
@@ -291,7 +321,7 @@
                 --conf spark.hadoop.mapreduce.map.speculative=false
                 --conf spark.hadoop.mapreduce.reduce.speculative=false
             </spark-opts>
-            <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
+            <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
             <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
             <arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
@@ -321,7 +351,7 @@
                 --conf spark.hadoop.mapreduce.map.speculative=false
                 --conf spark.hadoop.mapreduce.reduce.speculative=false
             </spark-opts>
-            <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
+            <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
             <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
             <arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
@@ -351,7 +381,7 @@
                 --conf spark.hadoop.mapreduce.map.speculative=false
                 --conf spark.hadoop.mapreduce.reduce.speculative=false
             </spark-opts>
-            <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
+            <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
             <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
             <arg>--outputPath</arg><arg>${outputPath}/software</arg>