From 7167673a58ada940619b4a42545bdf3c6e3918b6 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 19 Feb 2020 14:54:18 +0100 Subject: [PATCH] implementation and configuration for propagation of project to result through semantic relation: P -> R1 and R1 -> supplemented by -> R2 => P -> R2 --- .../SparkResultToProjectThroughSemRelJob.java | 120 ++++++++++++++++++ .../input_projecttoresult_parameters.json | 21 +++ .../oozie_app/config-default.xml | 18 +++ .../projecttoresult/oozie_app/workflow.xml | 55 ++++++++ 4 files changed, 214 insertions(+) create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java new file mode 100644 index 000000000..1d99b3ec8 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java @@ -0,0 +1,120 @@ +package eu.dnetlib.dhp.projecttoresult; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.TypedRow; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.PropagationConstant.toPair; + +public class SparkResultToProjectThroughSemRelJob { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToProjectThroughSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkResultToProjectThroughSemRelJob.class.getSimpleName()) + .master(parser.get("master")) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/projecttoresult"; + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + + File directory = new File(outputPath); + + if (!directory.exists()) { + directory.mkdirs(); + } + + JavaRDD relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)).cache(); + + JavaPairRDD result_result = relations + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()); + + JavaPairRDD result_project = relations + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()); + + //relationships from project to result. One pair for each relationship for results having allowed semantics relation with another result + JavaPairRDD project_result = result_project.join(result_result) + .map(c -> { + String projectId = c._2()._1().getTargetId(); + String resultId = c._2()._2().getTargetId(); + return new TypedRow().setSourceId(projectId).setTargetId(resultId); + }) + .mapToPair(toPair()); + + //relationships from project to result. One Pair for each project => project id list of results related to the project + JavaPairRDD project_results = relations + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()) + .reduceByKey((a, b) -> { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + a.addAll(b.getAccumulator()); + return a; + }); + + + + JavaRDD newRels = project_result.join(project_results) + .flatMap(c -> { + String resId = c._2()._1().getTargetId(); + + if (c._2()._2().getAccumulator().contains(resId)) { + return null; + } + String progId = c._2()._1().getSourceId(); + List rels = new ArrayList(); + + rels.add(getRelation(progId, resId, RELATION_PROJECT_RESULT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + rels.add(getRelation(resId, progId, RELATION_RESULT_PROJECT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + return rels.iterator(); + }) + .cache(); + + newRels.map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/relation_new"); + + newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/relation"); + + } + + +} diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json new file mode 100644 index 000000000..695dc176c --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json @@ -0,0 +1,21 @@ +[ + { + "paramName":"mt", + "paramLongName":"master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + + { + "paramName":"asr", + "paramLongName":"allowedsemrels", + "paramDescription": "the types of the allowed datasources. Split by ;", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/config-default.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml new file mode 100644 index 000000000..4c073f0a2 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml @@ -0,0 +1,55 @@ + + + + sourcePath + the source path + + + allowedsemrels + the allowed semantics + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ProjectToResultPropagation + eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + dhp-propagation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + + -mt yarn-cluster + --sourcePath${sourcePath} + --allowedsemrels${allowedsemrels} + + + + + + + \ No newline at end of file