From 7167673a58ada940619b4a42545bdf3c6e3918b6 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 19 Feb 2020 14:54:18 +0100 Subject: [PATCH 1/2] implementation and configuration for propagation of project to result through semantic relation: P -> R1 and R1 -> supplemented by -> R2 => P -> R2 --- .../SparkResultToProjectThroughSemRelJob.java | 120 ++++++++++++++++++ .../input_projecttoresult_parameters.json | 21 +++ .../oozie_app/config-default.xml | 18 +++ .../projecttoresult/oozie_app/workflow.xml | 55 ++++++++ 4 files changed, 214 insertions(+) create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java new file mode 100644 index 000000000..1d99b3ec8 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob.java @@ -0,0 +1,120 @@ +package eu.dnetlib.dhp.projecttoresult; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.TypedRow; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.PropagationConstant.toPair; + +public class SparkResultToProjectThroughSemRelJob { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToProjectThroughSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkResultToProjectThroughSemRelJob.class.getSimpleName()) + .master(parser.get("master")) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/projecttoresult"; + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + + File directory = new File(outputPath); + + if (!directory.exists()) { + directory.mkdirs(); + } + + JavaRDD relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)).cache(); + + JavaPairRDD result_result = relations + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()); + + JavaPairRDD result_project = relations + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()); + + //relationships from project to result. One pair for each relationship for results having allowed semantics relation with another result + JavaPairRDD project_result = result_project.join(result_result) + .map(c -> { + String projectId = c._2()._1().getTargetId(); + String resultId = c._2()._2().getTargetId(); + return new TypedRow().setSourceId(projectId).setTargetId(resultId); + }) + .mapToPair(toPair()); + + //relationships from project to result. One Pair for each project => project id list of results related to the project + JavaPairRDD project_results = relations + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()) + .reduceByKey((a, b) -> { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + a.addAll(b.getAccumulator()); + return a; + }); + + + + JavaRDD newRels = project_result.join(project_results) + .flatMap(c -> { + String resId = c._2()._1().getTargetId(); + + if (c._2()._2().getAccumulator().contains(resId)) { + return null; + } + String progId = c._2()._1().getSourceId(); + List rels = new ArrayList(); + + rels.add(getRelation(progId, resId, RELATION_PROJECT_RESULT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + rels.add(getRelation(resId, progId, RELATION_RESULT_PROJECT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + return rels.iterator(); + }) + .cache(); + + newRels.map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/relation_new"); + + newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/relation"); + + } + + +} diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json new file mode 100644 index 000000000..695dc176c --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json @@ -0,0 +1,21 @@ +[ + { + "paramName":"mt", + "paramLongName":"master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + + { + "paramName":"asr", + "paramLongName":"allowedsemrels", + "paramDescription": "the types of the allowed datasources. Split by ;", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/config-default.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml new file mode 100644 index 000000000..4c073f0a2 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml @@ -0,0 +1,55 @@ + + + + sourcePath + the source path + + + allowedsemrels + the allowed semantics + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ProjectToResultPropagation + eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + dhp-propagation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + + -mt yarn-cluster + --sourcePath${sourcePath} + --allowedsemrels${allowedsemrels} + + + + + + + \ No newline at end of file From 8aa3b4d7c048592668f5b3dac2ee3437d1ce7c90 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 19 Feb 2020 14:55:54 +0100 Subject: [PATCH 2/2] adding to propagation constants the ones needed for propagation of project to result and addition of new accumulator Set in typed row to collect values of a type --- .../eu/dnetlib/dhp/PropagationConstant.java | 11 ++++++++ .../main/java/eu/dnetlib/dhp/TypedRow.java | 26 ++++++++++--------- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index 6957490bc..7cd8c54d1 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -23,6 +23,9 @@ public class PropagationConstant { public final static String PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME = "Propagation of country to result collected from datasources of type institutional repositories"; public final static String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID = "result:organization:instrepo"; public final static String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME = "Propagation of affiliation to result collected from datasources of type institutional repository"; + public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID = "result:project:semrel"; + public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME = "Propagation of result to project through semantic relation"; + public final static String RELATION_DATASOURCEORGANIZATION_REL_TYPE = "datasourceOrganization"; public final static String RELATION_DATASOURCEORGANIZATION_SUBREL_TYPE = "provision"; @@ -34,6 +37,14 @@ public class PropagationConstant { public final static String RELATION_ORGANIZATION_RESULT_REL_CLASS = "isAuthorInstitutionOf"; public final static String RELATION_RESULT_ORGANIZATION_REL_CLASS = "hasAuthorInstitution"; + public static final String RELATION_RESULTRESULT_REL_TYPE = "resultResult"; + public static final String RELATION_RESULTRESULT_SUBREL_TYPE = "supplement"; + + public static final String RELATION_RESULTPROJECT_REL_TYPE = "resultProject"; + public static final String RELATION_RESULTPROJECT_SUBREL_TYPE = "outcome"; + public static final String RELATION_RESULT_PROJECT_REL_CLASS = "isProducedBy"; + public static final String RELATION_PROJECT_RESULT_REL_CLASS = "produces"; + public static Country getCountry(String country){ Country nc = new Country(); diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java index d08b0e33d..13f3ca611 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java @@ -2,31 +2,33 @@ package eu.dnetlib.dhp; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.util.*; public class TypedRow implements Serializable { private String sourceId; private String targetId; private String type; - private String country; + private String value; + private Set accumulator; - public List getAccumulator() { + public Set getAccumulator() { return accumulator; } - public TypedRow setAccumulator(List accumulator) { + public TypedRow setAccumulator(Set accumulator) { this.accumulator = accumulator; return this; } - private List accumulator; + + public void addAll(Set toadd){ + this.accumulator.addAll(toadd); + } public void add(String a){ if (accumulator == null){ - accumulator = new ArrayList<>(); + accumulator = new HashSet<>(); } accumulator.add(a); } @@ -35,12 +37,12 @@ public class TypedRow implements Serializable { return accumulator.iterator(); } - public String getCountry() { - return country; + public String getValue() { + return value; } - public TypedRow setCountry(String country) { - this.country = country; + public TypedRow setValue(String value) { + this.value = value; return this; }