diff --git a/dhp-workflows/dhp-enrichment/pom.xml b/dhp-workflows/dhp-enrichment/pom.xml
index 644ac21409..0b4269acd0 100644
--- a/dhp-workflows/dhp-enrichment/pom.xml
+++ b/dhp-workflows/dhp-enrichment/pom.xml
@@ -48,6 +48,12 @@
io.github.classgraph
classgraph
+
+ eu.dnetlib.dhp
+ dhp-aggregation
+ 1.2.4-SNAPSHOT
+ compile
+
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/KeyValueSet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/KeyValueSet.java
new file mode 100644
index 0000000000..57ab716b33
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/KeyValueSet.java
@@ -0,0 +1,26 @@
+
+package eu.dnetlib.dhp;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+public class KeyValueSet implements Serializable {
+ private String key;
+ private ArrayList valueSet;
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public ArrayList getValueSet() {
+ return valueSet;
+ }
+
+ public void setValueSet(ArrayList valueSet) {
+ this.valueSet = valueSet;
+ }
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
index 392a5ab44f..02fdcb09b8 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
@@ -1,6 +1,7 @@
package eu.dnetlib.dhp;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -55,6 +56,9 @@ public class PropagationConstant {
public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID = "result:organization:instrepo";
public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME = "Propagation of affiliation to result collected from datasources of type institutional repository";
+ public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID = "result:organization:semrel";
+ public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME = "Propagation of affiliation to result through sematic relations";
+
public static final String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID = "result:project:semrel";
public static final String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME = "Propagation of result to project through semantic relation";
@@ -67,6 +71,13 @@ public class PropagationConstant {
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";
+ public static final String ITERATION_ONE = "ExitAtFirstIteration";
+ public static final String ITERATION_TWO = "ExitAtSecondIteration";
+ public static final String ITERATION_THREE = "ExitAtThirdIteration";
+ public static final String ITERATION_FOUR = "ExitAtFourthIteration";
+ public static final String ITERATION_FIVE = "ExitAtFifthIteration";
+ public static final String ITERATION_NO_PARENT = "ExitAtNoFirstParentReached";
+
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String cfHbforResultQuery = "select distinct r.id, inst.collectedfrom.key cf, inst.hostedby.key hb "
@@ -127,6 +138,39 @@ public class PropagationConstant {
return pa;
}
+ public static ArrayList getOrganizationRelationPair(String orgId,
+ String resultId,
+ String classID,
+ String className
+
+ ) {
+ ArrayList newRelations = new ArrayList();
+ newRelations
+ .add(
+ getRelation(
+ orgId,
+ resultId,
+ ModelConstants.IS_AUTHOR_INSTITUTION_OF,
+ ModelConstants.RESULT_ORGANIZATION,
+ ModelConstants.AFFILIATION,
+ PROPAGATION_DATA_INFO_TYPE,
+ classID,
+ className));
+ newRelations
+ .add(
+ getRelation(
+ resultId,
+ orgId,
+ ModelConstants.HAS_AUTHOR_INSTITUTION,
+ ModelConstants.RESULT_ORGANIZATION,
+ ModelConstants.AFFILIATION,
+ PROPAGATION_DATA_INFO_TYPE,
+ classID,
+ className));
+
+ return newRelations;
+ }
+
public static Relation getRelation(
String source,
String target,
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java
index 0ef5ca1810..50ab997b68 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java
@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.KeyValueSet;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Datasource;
@@ -124,7 +125,7 @@ public class PrepareResultInstRepoAssociation {
private static void prepareAlreadyLinkedAssociation(
SparkSession spark, String alreadyLinkedPath) {
- String query = "Select source resultId, collect_set(target) organizationSet "
+ String query = "Select source key, collect_set(target) valueSet "
+ "from relation "
+ "where datainfo.deletedbyinference = false "
+ "and lower(relClass) = '"
@@ -134,7 +135,7 @@ public class PrepareResultInstRepoAssociation {
spark
.sql(query)
- .as(Encoders.bean(ResultOrganizationSet.class))
+ .as(Encoders.bean(KeyValueSet.class))
// TODO retry to stick with datasets
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java
deleted file mode 100644
index 3bce14cdbf..0000000000
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java
+++ /dev/null
@@ -1,26 +0,0 @@
-
-package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-
-public class ResultOrganizationSet implements Serializable {
- private String resultId;
- private ArrayList organizationSet;
-
- public String getResultId() {
- return resultId;
- }
-
- public void setResultId(String resultId) {
- this.resultId = resultId;
- }
-
- public ArrayList getOrganizationSet() {
- return organizationSet;
- }
-
- public void setOrganizationSet(ArrayList organizationSet) {
- this.organizationSet = organizationSet;
- }
-}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java
index 63824f1a81..0757ebccd4 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java
@@ -18,6 +18,7 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import eu.dnetlib.dhp.KeyValueSet;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -28,7 +29,7 @@ public class SparkResultToOrganizationFromIstRepoJob {
private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob.class);
- private static final String RESULT_ORGANIZATIONSET_QUERY = "SELECT id resultId, collect_set(organizationId) organizationSet "
+ private static final String RESULT_ORGANIZATIONSET_QUERY = "SELECT id key, collect_set(organizationId) valueSet "
+ "FROM ( SELECT id, organizationId "
+ "FROM rels "
+ "JOIN cfhb "
@@ -107,14 +108,14 @@ public class SparkResultToOrganizationFromIstRepoJob {
Dataset dsOrg = readPath(spark, datasourceorganization, DatasourceOrganization.class);
- Dataset potentialUpdates = getPotentialRelations(spark, inputPath, clazz, dsOrg);
+ Dataset potentialUpdates = getPotentialRelations(spark, inputPath, clazz, dsOrg);
- Dataset alreadyLinked = readPath(spark, alreadyLinkedPath, ResultOrganizationSet.class);
+ Dataset alreadyLinked = readPath(spark, alreadyLinkedPath, KeyValueSet.class);
potentialUpdates
.joinWith(
alreadyLinked,
- potentialUpdates.col("resultId").equalTo(alreadyLinked.col("resultId")),
+ potentialUpdates.col("key").equalTo(alreadyLinked.col("key")),
"left_outer")
.flatMap(createRelationFn(), Encoders.bean(Relation.class))
.write()
@@ -123,49 +124,34 @@ public class SparkResultToOrganizationFromIstRepoJob {
.json(outputPath);
}
- private static FlatMapFunction, Relation> createRelationFn() {
+ private static FlatMapFunction, Relation> createRelationFn() {
return value -> {
List newRelations = new ArrayList<>();
- ResultOrganizationSet potentialUpdate = value._1();
- Optional alreadyLinked = Optional.ofNullable(value._2());
- List organizations = potentialUpdate.getOrganizationSet();
+ KeyValueSet potentialUpdate = value._1();
+ Optional alreadyLinked = Optional.ofNullable(value._2());
+ List organizations = potentialUpdate.getValueSet();
alreadyLinked
.ifPresent(
resOrg -> resOrg
- .getOrganizationSet()
+ .getValueSet()
.forEach(organizations::remove));
- String resultId = potentialUpdate.getResultId();
+ String resultId = potentialUpdate.getKey();
organizations
.forEach(
- orgId -> {
- newRelations
- .add(
- getRelation(
- orgId,
- resultId,
- ModelConstants.IS_AUTHOR_INSTITUTION_OF,
- ModelConstants.RESULT_ORGANIZATION,
- ModelConstants.AFFILIATION,
- PROPAGATION_DATA_INFO_TYPE,
- PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
- PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
- newRelations
- .add(
- getRelation(
- resultId,
- orgId,
- ModelConstants.HAS_AUTHOR_INSTITUTION,
- ModelConstants.RESULT_ORGANIZATION,
- ModelConstants.AFFILIATION,
- PROPAGATION_DATA_INFO_TYPE,
- PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
- PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
- });
+ orgId -> newRelations
+ .addAll(
+ getOrganizationRelationPair(
+ orgId,
+ resultId,
+ PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
+ PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME))
+
+ );
return newRelations.iterator();
};
}
- private static Dataset getPotentialRelations(
+ private static Dataset getPotentialRelations(
SparkSession spark,
String inputPath,
Class resultClazz,
@@ -179,7 +165,7 @@ public class SparkResultToOrganizationFromIstRepoJob {
return spark
.sql(RESULT_ORGANIZATIONSET_QUERY)
- .as(Encoders.bean(ResultOrganizationSet.class));
+ .as(Encoders.bean(KeyValueSet.class));
}
}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/Leaves.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/Leaves.java
new file mode 100644
index 0000000000..7984721e8c
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/Leaves.java
@@ -0,0 +1,16 @@
+
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+import java.io.Serializable;
+
+public class Leaves implements Serializable {
+ private String value;
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
new file mode 100644
index 0000000000..707462f24e
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
@@ -0,0 +1,154 @@
+
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import java.io.Serializable;
+import java.util.*;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.KeyValueSet;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
+
+/**
+ * Searches for all the association between result and organization already existing in the graph
+ * Creates also the parenthood hierarchy from the organizations
+ */
+
+public class PrepareInfo implements Serializable {
+
+ private static final Logger log = LoggerFactory.getLogger(PrepareInfo.class);
+
+ // associate orgs with all their parent
+ private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet " +
+ "FROM relation " +
+ "WHERE lower(relclass) = '" + ModelConstants.IS_PARENT_OF.toLowerCase() +
+ "' and datainfo.deletedbyinference = false " +
+ "GROUP BY target";
+
+ //associates results with all the orgs they are affiliated to
+ private static final String RESULT_ORGANIZATION_QUERY = "SELECT source key, collect_set(target) as valueSet " +
+ "FROM relation " +
+ "WHERE lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() +
+ "' and datainfo.deletedbyinference = false " +
+ "GROUP BY source";
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ SparkResultToOrganizationFromIstRepoJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String leavesPath = parser.get("leavesPath");
+ log.info("leavesPath: {}", leavesPath);
+
+ final String childParentPath = parser.get("childParentPath");
+ log.info("childParentPath: {}", childParentPath);
+
+ final String resultOrganizationPath = parser.get("resultOrgPath");
+ log.info("resultOrganizationPath: {}", resultOrganizationPath);
+
+ final String relationPath = parser.get("relationPath");
+ log.info("relationPath: {}", relationPath);
+
+ SparkConf conf = new SparkConf();
+ conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+
+ runWithSparkHiveSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> prepareInfo(
+ spark,
+ graphPath,
+ childParentPath,
+ leavesPath,
+ resultOrganizationPath,
+ relationPath));
+ }
+
+ private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath,
+ String currentIterationPath, String resultOrganizationPath, String relationPath) {
+ Dataset relation = readPath(spark, inputPath + "/relation", Relation.class);
+ relation.createOrReplaceTempView("relation");
+
+ spark
+ .sql(ORGANIZATION_ORGANIZATION_QUERY)
+ .as(Encoders.bean(KeyValueSet.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(childParentOrganizationPath);
+
+ spark
+ .sql(RESULT_ORGANIZATION_QUERY)
+ .as(Encoders.bean(KeyValueSet.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(resultOrganizationPath);
+
+ relation
+ .filter(
+ (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() &&
+ r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression","gzip")
+ .json(relationPath);
+
+ Dataset children = spark
+ .sql(
+ "Select distinct target as child from relation where " +
+ "lower(relclass)='" + ModelConstants.IS_PARENT_OF.toLowerCase() +
+ "' and datainfo.deletedbyinference = false")
+ .as(Encoders.STRING());
+
+ Dataset parent = spark
+ .sql(
+ "Select distinct source as parent from relation " +
+ "where lower(relclass)='" + ModelConstants.IS_PARENT_OF.toLowerCase() +
+ "' and datainfo.deletedbyinference = false")
+ .as(Encoders.STRING());
+
+ // takes from the join the entities having only the left hand side: the leaves. Saves them
+ children
+ .joinWith(parent, children.col("child").equalTo(parent.col("parent")), "left")
+ .map((MapFunction, String>) value -> {
+ if (Optional.ofNullable(value._2()).isPresent()) {
+ return null;
+ }
+
+ return value._1();
+ }, Encoders.STRING())
+ .filter(Objects::nonNull)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .json(currentIterationPath);
+ }
+
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PropagationCounter.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PropagationCounter.java
new file mode 100644
index 0000000000..788eff0e3e
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PropagationCounter.java
@@ -0,0 +1,77 @@
+
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+import java.io.Serializable;
+
+import org.apache.spark.util.LongAccumulator;
+
+public class PropagationCounter implements Serializable {
+ private LongAccumulator iterationOne;
+ private LongAccumulator iterationTwo;
+ private LongAccumulator iterationThree;
+ private LongAccumulator iterationFour;
+ private LongAccumulator iterationFive;
+ private LongAccumulator notReachedFirstParent;
+
+ public PropagationCounter() {
+ }
+
+ public PropagationCounter(LongAccumulator iterationOne, LongAccumulator iterationTwo,
+ LongAccumulator iterationThree, LongAccumulator iterationFour, LongAccumulator iterationFive,
+ LongAccumulator notReachedFirstParent) {
+ this.iterationOne = iterationOne;
+ this.iterationTwo = iterationTwo;
+ this.iterationThree = iterationThree;
+ this.iterationFour = iterationFour;
+ this.iterationFive = iterationFive;
+ this.notReachedFirstParent = notReachedFirstParent;
+ }
+
+ public LongAccumulator getIterationOne() {
+ return iterationOne;
+ }
+
+ public void setIterationOne(LongAccumulator iterationOne) {
+ this.iterationOne = iterationOne;
+ }
+
+ public LongAccumulator getIterationTwo() {
+ return iterationTwo;
+ }
+
+ public void setIterationTwo(LongAccumulator iterationTwo) {
+ this.iterationTwo = iterationTwo;
+ }
+
+ public LongAccumulator getIterationThree() {
+ return iterationThree;
+ }
+
+ public void setIterationThree(LongAccumulator iterationThree) {
+ this.iterationThree = iterationThree;
+ }
+
+ public LongAccumulator getIterationFour() {
+ return iterationFour;
+ }
+
+ public void setIterationFour(LongAccumulator iterationFour) {
+ this.iterationFour = iterationFour;
+ }
+
+ public LongAccumulator getIterationFive() {
+ return iterationFive;
+ }
+
+ public void setIterationFive(LongAccumulator iterationFive) {
+ this.iterationFive = iterationFive;
+ }
+
+ public LongAccumulator getNotReachedFirstParent() {
+ return notReachedFirstParent;
+ }
+
+ public void setNotReachedFirstParent(LongAccumulator notReachedFirstParent) {
+ this.notReachedFirstParent = notReachedFirstParent;
+ }
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
new file mode 100644
index 0000000000..e26276f53f
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
@@ -0,0 +1,206 @@
+
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.KeyValueSet;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+
+public class SparkResultToOrganizationFromSemRel implements Serializable {
+ private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class);
+ private static final int MAX_ITERATION = 5;
+ public static final String NEW_RELATION_PATH = "/newRelation";
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ SparkResultToOrganizationFromIstRepoJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String relationPath = parser.get("relationPath");
+ log.info("relationPath: {}", relationPath);
+
+ final String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ final String leavesPath = parser.get("leavesPath");
+ log.info("leavesPath: {}", leavesPath);
+
+ final String childParentPath = parser.get("childParentPath");
+ log.info("childParentPath: {}", childParentPath);
+
+ final String resultOrganizationPath = parser.get("resultOrgPath");
+ log.info("resultOrganizationPath: {}", resultOrganizationPath);
+
+ final String workingPath = parser.get("workingDir");
+ log.info("workingPath: {}", workingPath);
+
+ SparkConf conf = new SparkConf();
+ conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+
+ runWithSparkHiveSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> execPropagation(
+ spark,
+ leavesPath,
+ childParentPath,
+ resultOrganizationPath,
+ relationPath,
+ workingPath,
+ outputPath));
+ }
+
+ public static void execPropagation(SparkSession spark,
+ String leavesPath,
+ String childParentPath,
+ String resultOrganizationPath,
+ String graphPath,
+ String workingPath,
+ String outputPath) {
+
+ final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE);
+ final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO);
+ final LongAccumulator iterationThree = spark.sparkContext().longAccumulator(ITERATION_THREE);
+ final LongAccumulator iterationFour = spark.sparkContext().longAccumulator(ITERATION_FOUR);
+ final LongAccumulator iterationFive = spark.sparkContext().longAccumulator(ITERATION_FIVE);
+ final LongAccumulator notReachedFirstParent = spark.sparkContext().longAccumulator(ITERATION_NO_PARENT);
+
+ final PropagationCounter propagationCounter = new PropagationCounter(iterationOne,
+ iterationTwo,
+ iterationThree,
+ iterationFour,
+ iterationFive,
+ notReachedFirstParent);
+
+ doPropagate(
+ spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
+ workingPath, outputPath, propagationCounter);
+
+ }
+
+ private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath,
+ String resultOrganizationPath, String graphPath, String workingPath, String outputPath,
+ PropagationCounter propagationCounter) {
+ int iteration = 0;
+ long leavesCount;
+
+ do {
+ iteration++;
+ StepActions
+ .execStep(
+ spark, graphPath, workingPath + NEW_RELATION_PATH,
+ leavesPath, childParentPath, resultOrganizationPath);
+ StepActions
+ .prepareForNextStep(
+ spark, workingPath + NEW_RELATION_PATH, resultOrganizationPath, leavesPath,
+ childParentPath, workingPath + "/leaves", workingPath + "/resOrg");
+ moveOutput(spark, workingPath, leavesPath, resultOrganizationPath);
+ leavesCount = readPath(spark, leavesPath, Leaves.class).count();
+ } while (leavesCount > 0 && iteration < MAX_ITERATION);
+
+ if (leavesCount == 0) {
+ switch (String.valueOf(iteration)) {
+ case "1":
+ propagationCounter.getIterationOne().add(1);
+ break;
+ case "2":
+ propagationCounter.getIterationTwo().add(1);
+ break;
+ case "3":
+ propagationCounter.getIterationThree().add(1);
+ break;
+ case "4":
+ propagationCounter.getIterationFour().add(1);
+ break;
+ case "5":
+ propagationCounter.getIterationFive().add(1);
+ break;
+ default:
+ break;
+ }
+ } else {
+ propagationCounter.getNotReachedFirstParent().add(1);
+ }
+
+ addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
+ }
+
+ private static void moveOutput(SparkSession spark, String workingPath, String leavesPath,
+ String resultOrganizationPath) {
+ readPath(spark, workingPath + "/leaves", Leaves.class)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(leavesPath);
+
+ readPath(spark, workingPath + "/resOrg", KeyValueSet.class)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(resultOrganizationPath);
+
+ }
+
+ private static void addNewRelations(SparkSession spark, String newRelationPath, String outputPath) {
+ Dataset relation = readPath(spark, newRelationPath, Relation.class);
+
+ relation
+ .groupByKey((MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING())
+ .mapGroups(
+ (MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Relation.class))
+ .flatMap(
+ (FlatMapFunction) r -> Arrays
+ .asList(
+ r, getRelation(
+ r.getTarget(), r.getSource(), ModelConstants.IS_AUTHOR_INSTITUTION_OF,
+ ModelConstants.RESULT_ORGANIZATION,
+ ModelConstants.AFFILIATION,
+ PROPAGATION_DATA_INFO_TYPE,
+ PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID,
+ PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME))
+ .iterator()
+
+ , Encoders.bean(Relation.class))
+ .write()
+
+ .mode(SaveMode.Append)
+ .option("compression", "gzip")
+ .json(outputPath);
+ }
+
+
+
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
new file mode 100644
index 0000000000..02444cb152
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
@@ -0,0 +1,206 @@
+
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.PropagationConstant.readPath;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.KeyValueSet;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
+
+public class StepActions implements Serializable {
+
+ public static void execStep(SparkSession spark,
+ String graphPath, String newRelationPath,
+ String leavesPath, String chldParentOrgPath, String resultOrgPath) {
+
+ Dataset relationGraph = readPath(spark, graphPath, Relation.class);
+ // select only the relation source target among those proposed by propagation that are not already existent
+ getNewRels(
+ newRelationPath, relationGraph,
+ getPropagationRelation(spark, leavesPath, chldParentOrgPath, resultOrgPath));
+
+ }
+
+ public static void prepareForNextStep(SparkSession spark, String selectedRelsPath, String resultOrgPath,
+ String leavesPath, String chldParentOrgPath, String leavesOutputPath,
+ String orgOutputPath) {
+ // use of the parents as new leaves set
+ changeLeavesSet(spark, leavesPath, chldParentOrgPath, leavesOutputPath);
+
+ // add the new relations obtained from propagation to the keyvalueset result organization
+ updateResultOrganization(
+ spark, resultOrgPath, readPath(spark, selectedRelsPath, Relation.class), orgOutputPath);
+ }
+
+ private static void updateResultOrganization(SparkSession spark, String resultOrgPath,
+ Dataset selectedRels, String outputPath) {
+ Dataset resultOrg = readPath(spark, resultOrgPath, KeyValueSet.class);
+ resultOrg
+ .joinWith(
+ selectedRels, resultOrg
+ .col("key")
+ .equalTo(selectedRels.col("source")),
+ "left")
+ .groupByKey((MapFunction, String>) mf -> mf._1().getKey(), Encoders.STRING())
+ .mapGroups((MapGroupsFunction, KeyValueSet>) (key, it) -> {
+ Tuple2 first = it.next();
+ if (!Optional.ofNullable(first._2()).isPresent()) {
+ return first._1();
+ }
+ KeyValueSet ret = new KeyValueSet();
+ ret.setKey(first._1().getKey());
+ HashSet hs = new HashSet<>();
+ hs.addAll(first._1().getValueSet());
+ hs.add(first._2().getTarget());
+ it.forEachRemaining(rel -> hs.add(rel._2().getTarget()));
+ ArrayList orgs = new ArrayList<>();
+ orgs.addAll(hs);
+ ret.setValueSet(orgs);
+ return ret;
+ }, Encoders.bean(KeyValueSet.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(outputPath);
+ }
+
+ private static void changeLeavesSet(SparkSession spark, String leavesPath, String chldParentOrgPath,
+ String leavesOutputPath) {
+ Dataset childParent = readPath(spark, chldParentOrgPath, KeyValueSet.class);
+ Dataset leaves = readPath(spark, leavesPath, Leaves.class);
+
+ childParent.createOrReplaceTempView("childParent");
+ leaves.createOrReplaceTempView("leaves");
+
+ spark
+ .sql(
+ "SELECT distinct parent as value " +
+ "FROM leaves " +
+ "JOIN (SELECT key, parent " +
+ " FROM childParent " +
+ " LATERAL VIEW explode(valueSet) kv as parent) tmp " +
+ "ON value = key ")
+ .as(Encoders.bean(Leaves.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(leavesOutputPath);
+ }
+
+ @NotNull
+ private static void getNewRels(String newRelationPath, Dataset relationDataset,
+ Dataset newRels) {
+ // selects new, not already existent relations
+ // union of new propagation relations to the relation set
+ // grouping from sourcetarget (we are sure the only relations are those from result to organization by
+ // construction of the set)
+ // if at least one relation in the set was not produced by propagation no new relation will be returned
+
+ relationDataset
+ .union(newRels)
+ .groupByKey((MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING())
+ .mapGroups((MapGroupsFunction) (k, it) -> {
+
+ ArrayList relationList = new ArrayList<>();
+ relationList.add(it.next());
+ it.forEachRemaining(rel -> relationList.add(rel));
+
+ if (relationList
+ .stream()
+ .filter(
+ rel -> !rel
+ .getDataInfo()
+ .getProvenanceaction()
+ .getClassid()
+ .equals(PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID))
+ .count() > 0) {
+ return null;
+ }
+
+ return new ObjectMapper().writeValueAsString(relationList.get(0));
+
+ }, Encoders.STRING())
+ .filter(Objects::nonNull)
+ .map(
+ (MapFunction) r -> new ObjectMapper().readValue(r, Relation.class),
+ Encoders.bean(Relation.class))
+ .write()
+ .mode(SaveMode.Append)
+ .option("compression", "gzip")
+ .json(newRelationPath);
+
+ }
+
+ // get the possible relations from propagation
+ private static Dataset getPropagationRelation(SparkSession spark,
+ String leavesPath,
+ String chldParentOrgPath,
+ String resultOrgPath) {
+
+ Dataset childParent = readPath(spark, chldParentOrgPath, KeyValueSet.class);
+ Dataset resultOrg = readPath(spark, resultOrgPath, KeyValueSet.class);
+ Dataset leaves = readPath(spark, leavesPath, Leaves.class);
+
+ childParent.createOrReplaceTempView("childParent");
+ resultOrg.createOrReplaceTempView("resultOrg");
+ leaves.createOrReplaceTempView("leaves");
+
+ Dataset resultParent = spark
+ .sql(
+ "SELECT resId as key, " +
+ "collect_set(parent) valueSet " +
+ "FROM (SELECT key as child, parent " +
+ " FROM childParent " +
+ " LATERAL VIEW explode(valueSet) ks as parent) as cp " +
+ "JOIN leaves " +
+ "ON leaves.value = cp.child " +
+ "JOIN (" +
+ "SELECT key as resId, org " +
+ "FROM resultOrg " +
+ "LATERAL VIEW explode (valueSet) ks as org ) as ro " +
+ "ON leaves.value = ro.org " +
+ "GROUP BY resId")
+ .as(Encoders.bean(KeyValueSet.class));
+
+
+ // create new relations from result to organization for each result linked to a leaf
+ return resultParent
+ .flatMap(
+ (FlatMapFunction) v -> v
+ .getValueSet()
+ .stream()
+ .map(
+ orgId -> getRelation(
+ v.getKey(),
+ orgId,
+ ModelConstants.HAS_AUTHOR_INSTITUTION,
+ ModelConstants.RESULT_ORGANIZATION,
+ ModelConstants.AFFILIATION,
+ PROPAGATION_DATA_INFO_TYPE,
+ PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID,
+ PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME))
+ .collect(Collectors.toList())
+ .iterator(),
+ Encoders.bean(Relation.class));
+
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json
new file mode 100644
index 0000000000..c79bfe05d8
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json
@@ -0,0 +1,44 @@
+[
+ {
+ "paramName":"gp",
+ "paramLongName":"graphPath",
+ "paramDescription": "the path of the sequencial file to read",
+ "paramRequired": true
+ },
+ {
+ "paramName":"h",
+ "paramLongName":"hive_metastore_uris",
+ "paramDescription": "the hive metastore uris",
+ "paramRequired": true
+ },
+ {
+ "paramName":"lp",
+ "paramLongName":"leavesPath",
+ "paramDescription": "true if the new version of the graph must be saved",
+ "paramRequired": false
+ },
+ {
+ "paramName":"cp",
+ "paramLongName":"childParentPath",
+ "paramDescription": "path where to store/find association from datasource and organization",
+ "paramRequired": true
+ },
+ {
+ "paramName":"rp",
+ "paramLongName":"resultOrgPath",
+ "paramDescription": "path where to store/find already linked results and organizations",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ssm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "the path where prepared info have been stored",
+ "paramRequired": false
+ },
+ {
+ "paramName": "rep",
+ "paramLongName": "relationPath",
+ "paramDescription": "the path where to store the selected subset of relations",
+ "paramRequired": false
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json
new file mode 100644
index 0000000000..f73cc221ef
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json
@@ -0,0 +1,50 @@
+[
+ {
+ "paramName":"rep",
+ "paramLongName":"relationPath",
+ "paramDescription": "the path of the sequencial file to read",
+ "paramRequired": true
+ },
+ {
+ "paramName":"h",
+ "paramLongName":"hive_metastore_uris",
+ "paramDescription": "the hive metastore uris",
+ "paramRequired": true
+ },
+ {
+ "paramName":"lp",
+ "paramLongName":"leavesPath",
+ "paramDescription": "true if the new version of the graph must be saved",
+ "paramRequired": false
+ },
+ {
+ "paramName":"cp",
+ "paramLongName":"childParentPath",
+ "paramDescription": "path where to store/find association from datasource and organization",
+ "paramRequired": true
+ },
+ {
+ "paramName":"rp",
+ "paramLongName":"resultOrgPath",
+ "paramDescription": "path where to store/find already linked results and organizations",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ssm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "the path where prepared info have been stored",
+ "paramRequired": false
+ },
+ {
+ "paramName": "wd",
+ "paramLongName": "workingDir",
+ "paramDescription": "true if it is a test running",
+ "paramRequired": false
+ },
+ {
+ "paramName": "out",
+ "paramLongName": "outputPath",
+ "paramDescription": "the path used to store temporary output files",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/config-default.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/config-default.xml
new file mode 100644
index 0000000000..2744ea92ba
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/config-default.xml
@@ -0,0 +1,58 @@
+
+
+ jobTracker
+ yarnRM
+
+
+ nameNode
+ hdfs://nameservice1
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
+ hive_metastore_uris
+ thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+
+
+ spark2YarnHistoryServerAddress
+ http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
+
+
+ spark2EventLogDir
+ /user/spark/spark2ApplicationHistory
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+
+
+ sparkExecutorNumber
+ 4
+
+
+ sparkDriverMemory
+ 15G
+
+
+ sparkExecutorMemory
+ 6G
+
+
+ sparkExecutorCores
+ 1
+
+
+ spark2MaxExecutors
+ 50
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml
new file mode 100644
index 0000000000..17502abea0
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml
@@ -0,0 +1,193 @@
+
+
+
+ sourcePath
+ the source path
+
+
+ outputPath
+ sets the outputPath
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ ${wf:conf('resumeFrom') eq 'PrepareInfo'}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${nameNode}/${sourcePath}/relation
+ ${nameNode}/${outputPath}/relation
+
+
+
+
+
+
+
+ ${nameNode}/${sourcePath}/publication
+ ${nameNode}/${outputPath}/publication
+
+
+
+
+
+
+
+ ${nameNode}/${sourcePath}/dataset
+ ${nameNode}/${outputPath}/dataset
+
+
+
+
+
+
+
+ ${nameNode}/${sourcePath}/otherresearchproduct
+ ${nameNode}/${outputPath}/otherresearchproduct
+
+
+
+
+
+
+
+ ${nameNode}/${sourcePath}/software
+ ${nameNode}/${outputPath}/software
+
+
+
+
+
+
+
+ ${nameNode}/${sourcePath}/organization
+ ${nameNode}/${outputPath}/organization
+
+
+
+
+
+
+
+ ${nameNode}/${sourcePath}/project
+ ${nameNode}/${outputPath}/project
+
+
+
+
+
+
+
+ ${nameNode}/${sourcePath}/datasource
+ ${nameNode}/${outputPath}/datasource
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareResultOrganizationAssociation
+ eu.dnetlib.dhp.resulttoorganizationfromsemrel.PrepareInfo
+ dhp-enrichment-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --graphPath${sourcePath}
+ --hive_metastore_uris${hive_metastore_uris}
+ --leavesPath${workingDir}/preparedInfo/leavesPath
+ --childParentPath${workingDir}/preparedInfo/childParentPath
+ --resultOrgPath${workingDir}/preparedInfo/resultOrgPath
+ --relationPath${workingDir}/preparedInfo/relation
+
+
+
+
+
+
+
+ yarn
+ cluster
+ resultToOrganizationFromSemRel
+ eu.dnetlib.dhp.resulttoorganizationfromsemrel.SparkResultToOrganizationFromSemRel
+ dhp-enrichment-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.dynamicAllocation.enabled=true
+ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+
+ --relationPath${workingDir}/preparedInfo/relation
+ --outputPath${outputPath}/relation
+ --leavesPath${workingDir}/preparedInfo/leavesPath
+ --childParentPath${workingDir}/preparedInfo/childParentPath
+ --resultOrgPath${workingDir}/preparedInfo/resultOrgPath
+ --hive_metastore_uris${hive_metastore_uris}
+ --workingDir${workingDir}/working
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
new file mode 100644
index 0000000000..21d99321bf
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
@@ -0,0 +1,578 @@
+
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+import static eu.dnetlib.dhp.PropagationConstant.readPath;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.KeyValueSet;
+import eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class PrepareInfoJobTest {
+
+ private static final Logger log = LoggerFactory.getLogger(PrepareInfoJobTest.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static SparkSession spark;
+
+ private static Path workingDir;
+
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ workingDir = Files.createTempDirectory(PrepareInfoJobTest.class.getSimpleName());
+ log.info("using work dir {}", workingDir);
+
+ SparkConf conf = new SparkConf();
+ conf.setAppName(PrepareInfoJobTest.class.getSimpleName());
+
+ conf.setMaster("local[*]");
+ conf.set("spark.driver.host", "localhost");
+ conf.set("hive.metastore.local", "true");
+ conf.set("spark.ui.enabled", "false");
+ conf.set("spark.sql.warehouse.dir", workingDir.toString());
+ conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+ spark = SparkSession
+ .builder()
+ .appName(PrepareInfoJobTest.class.getSimpleName())
+ .config(conf)
+ .getOrCreate();
+ }
+
+ @AfterAll
+ public static void afterAll() throws IOException {
+ FileUtils.deleteDirectory(workingDir.toFile());
+ spark.stop();
+ }
+
+ @Test
+ public void childParentTest1() throws Exception {
+
+ PrepareInfo
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-graphPath", getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1")
+ .getPath(),
+ "-hive_metastore_uris", "",
+ "-leavesPath", workingDir.toString() + "/currentIteration/",
+ "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+ "-childParentPath", workingDir.toString() + "/childParentOrg/",
+ "-relationPath", workingDir.toString() + "/relation"
+
+ });
+
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/childParentOrg/")
+ .map(item -> OBJECT_MAPPER.readValue(item, KeyValueSet.class));
+
+ Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(KeyValueSet.class));
+
+ Assertions.assertEquals(6, verificationDs.count());
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|dedup_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertEquals(
+ "20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f",
+ verificationDs
+ .filter("key = '20|dedup_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .get(0));
+
+ Assertions
+ .assertEquals(
+ 2, verificationDs
+ .filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"));
+
+ verificationDs
+ .foreach((ForeachFunction) v -> System.out.println(OBJECT_MAPPER.writeValueAsString(v)));
+
+ }
+
+ @Test
+ public void childParentTest2() throws Exception {
+
+ PrepareInfo
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-graphPath", getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2")
+ .getPath(),
+ "-hive_metastore_uris", "",
+ "-leavesPath", workingDir.toString() + "/currentIteration/",
+ "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+ "-childParentPath", workingDir.toString() + "/childParentOrg/",
+ "-relationPath", workingDir.toString() + "/relation"
+
+ });
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/childParentOrg/")
+ .map(item -> OBJECT_MAPPER.readValue(item, KeyValueSet.class));
+
+ Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(KeyValueSet.class));
+
+ Assertions.assertEquals(5, verificationDs.count());
+
+ Assertions
+ .assertEquals(
+ 0, verificationDs.filter("key = '20|dedup_wf_001::2899e571609779168222fdeb59cb916d'").count());
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertEquals(
+ "20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f",
+ verificationDs
+ .filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .get(0));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"));
+
+ }
+
+ @Test
+ public void relationTest()throws Exception {
+
+ PrepareInfo
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-graphPath", getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
+ .getPath(),
+ "-hive_metastore_uris", "",
+ "-leavesPath", workingDir.toString() + "/currentIteration/",
+ "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+ "-childParentPath", workingDir.toString() + "/childParentOrg/",
+ "-relationPath", workingDir.toString() + "/relation"
+
+ });
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/relation")
+ .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+
+ Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
+
+ Assertions.assertEquals(7, verificationDs.count());
+
+ }
+ @Test
+ public void resultOrganizationTest1() throws Exception {
+
+ PrepareInfo
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-graphPath", getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
+ .getPath(),
+ "-hive_metastore_uris", "",
+ "-leavesPath", workingDir.toString() + "/currentIteration/",
+ "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+ "-childParentPath", workingDir.toString() + "/childParentOrg/",
+ "-relationPath", workingDir.toString() + "/relation"
+
+ });
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/resultOrganization/")
+ .map(item -> OBJECT_MAPPER.readValue(item, KeyValueSet.class));
+
+ Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(KeyValueSet.class));
+
+ Assertions.assertEquals(5, verificationDs.count());
+
+ Assertions
+ .assertEquals(
+ 2, verificationDs
+ .filter("key = '50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|pippo_wf_001::2899e571609779168222fdeb59cb916d"));
+
+ Assertions
+ .assertEquals(
+ 2, verificationDs
+ .filter("key = '50|dedup_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '50|dedup_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"));
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '50|dedup_wf_001::2899e571609779168222fdeb59cb916d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|pippo_wf_001::2899e571609779168222fdeb59cb916d"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '50|doajarticles::03748bcb5d754c951efec9700e18a56d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '50|doajarticles::03748bcb5d754c951efec9700e18a56d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '50|openaire____::ec653e804967133b9436fdd30d3ff51d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '50|openaire____::ec653e804967133b9436fdd30d3ff51d'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"));
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("key = '50|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ verificationDs
+ .filter("key = '50|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+ .collectAsList()
+ .get(0)
+ .getValueSet()
+ .contains("20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"));
+
+ verificationDs
+ .foreach((ForeachFunction) v -> System.out.println(OBJECT_MAPPER.writeValueAsString(v)));
+
+ }
+
+ @Test
+ public void foundLeavesTest1() throws Exception {
+
+ PrepareInfo
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-graphPath", getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
+ .getPath(),
+ "-hive_metastore_uris", "",
+ "-leavesPath", workingDir.toString() + "/currentIteration/",
+ "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+ "-childParentPath", workingDir.toString() + "/childParentOrg/",
+ "-relationPath", workingDir.toString() + "/relation"
+
+ });
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/currentIteration/")
+ .map(item -> OBJECT_MAPPER.readValue(item, String.class));
+
+ Assertions.assertEquals(0, tmp.count());
+
+ }
+
+ @Test
+ public void foundLeavesTest2() throws Exception {
+ PrepareInfo
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-graphPath", getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1")
+ .getPath(),
+ "-hive_metastore_uris", "",
+ "-leavesPath", workingDir.toString() + "/currentIteration/",
+ "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+ "-childParentPath", workingDir.toString() + "/childParentOrg/",
+ "-relationPath", workingDir.toString() + "/relation"
+
+ });
+
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/currentIteration/")
+ .map(item -> OBJECT_MAPPER.readValue(item, Leaves.class));
+
+ Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Leaves.class));
+
+ Assertions.assertEquals(3, verificationDs.count());
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("value = '20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0'")
+ .count());
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("value = '20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1'")
+ .count());
+
+ Assertions
+ .assertEquals(
+ 1, verificationDs
+ .filter("value = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+ .count());
+
+ verificationDs.foreach((ForeachFunction) l -> System.out.println(OBJECT_MAPPER.writeValueAsString(l)));
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkJobTest.java
new file mode 100644
index 0000000000..7dd575b660
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkJobTest.java
@@ -0,0 +1,325 @@
+
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged;
+import static eu.dnetlib.dhp.PropagationConstant.readPath;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.KeyValueSet;
+import eu.dnetlib.dhp.PropagationConstant;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class SparkJobTest {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static SparkSession spark;
+
+ private static Path workingDir;
+
+ private static final Logger log = LoggerFactory.getLogger(SparkJobTest.class);
+
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ workingDir = Files.createTempDirectory(StepActionsTest.class.getSimpleName());
+ log.info("using work dir {}", workingDir);
+
+ SparkConf conf = new SparkConf();
+ conf.setAppName(PrepareInfoJobTest.class.getSimpleName());
+
+ conf.setMaster("local[*]");
+ conf.set("spark.driver.host", "localhost");
+ conf.set("hive.metastore.local", "true");
+ conf.set("spark.ui.enabled", "false");
+ conf.set("spark.sql.warehouse.dir", workingDir.toString());
+ conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+ spark = SparkSession
+ .builder()
+ .appName(PrepareInfoJobTest.class.getSimpleName())
+ .config(conf)
+ .getOrCreate();
+ }
+
+ @AfterAll
+ public static void afterAll() throws IOException {
+ FileUtils.deleteDirectory(workingDir.toFile());
+ spark.stop();
+ }
+
+ @Test
+ public void completeExecution() throws Exception {
+
+ final String graphPath = getClass()
+ .getResource("/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep")
+ .getPath();
+ final String leavesPath = getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/currentIteration/")
+ .getPath();
+ final String childParentPath = getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/childParentOrg/")
+ .getPath();
+
+ final String resultOrgPath = getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/resultOrganization/")
+ .getPath();
+
+ readPath(spark, leavesPath, Leaves.class)
+ .write()
+ .option("compression", "gzip")
+ .json(workingDir.toString() + "/leavesInput");
+
+ readPath(spark, resultOrgPath, KeyValueSet.class)
+ .write()
+ .option("compression", "gzip")
+ .json(workingDir.toString() + "/orgsInput");
+
+ SparkResultToOrganizationFromSemRel
+
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-relationPath", graphPath,
+ "-hive_metastore_uris", "",
+ "-outputPath", workingDir.toString() + "/finalrelation",
+ "-leavesPath", workingDir.toString() + "/leavesInput",
+ "-resultOrgPath", workingDir.toString() + "/orgsInput",
+ "-childParentPath", childParentPath,
+ "-workingDir", workingDir.toString()
+ });
+
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/finalrelation")
+ .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+
+ tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
+
+ Assertions.assertEquals(18, tmp.count());
+ tmp.foreach(r -> Assertions.assertEquals(ModelConstants.AFFILIATION, r.getSubRelType()));
+ tmp.foreach(r -> Assertions.assertEquals(ModelConstants.RESULT_ORGANIZATION, r.getRelType()));
+ tmp
+ .foreach(
+ r -> Assertions
+ .assertEquals(
+ PropagationConstant.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
+ tmp
+ .foreach(
+ r -> Assertions
+ .assertEquals(
+ PropagationConstant.PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID,
+ r.getDataInfo().getProvenanceaction().getClassid()));
+ tmp
+ .foreach(
+ r -> Assertions
+ .assertEquals(
+ PropagationConstant.PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME,
+ r.getDataInfo().getProvenanceaction().getClassname()));
+ tmp
+ .foreach(
+ r -> Assertions
+ .assertEquals(
+ "0.85",
+ r.getDataInfo().getTrust()));
+
+ Assertions.assertEquals(9, tmp.filter(r -> r.getSource().substring(0, 3).equals("50|")).count());
+ tmp
+ .filter(r -> r.getSource().substring(0, 3).equals("50|"))
+ .foreach(r -> Assertions.assertEquals(ModelConstants.HAS_AUTHOR_INSTITUTION, r.getRelClass()));
+ Assertions
+ .assertEquals(
+ 2, tmp.filter(r -> r.getSource().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count());
+ Assertions
+ .assertEquals(
+ 3, tmp.filter(r -> r.getSource().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count());
+ Assertions
+ .assertEquals(
+ 2, tmp.filter(r -> r.getSource().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count());
+ Assertions
+ .assertEquals(
+ 1, tmp.filter(r -> r.getSource().equals("50|openaire____::ec653e804967133b9436fdd30d3ff51d")).count());
+ Assertions
+ .assertEquals(
+ 1, tmp.filter(r -> r.getSource().equals("50|doajarticles::03748bcb5d754c951efec9700e18a56d")).count());
+
+ Assertions.assertEquals(9, tmp.filter(r -> r.getSource().substring(0, 3).equals("20|")).count());
+ tmp
+ .filter(r -> r.getSource().substring(0, 3).equals("20|"))
+ .foreach(r -> Assertions.assertEquals(ModelConstants.IS_AUTHOR_INSTITUTION_OF, r.getRelClass()));
+ Assertions
+ .assertEquals(
+ 1, tmp.filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count());
+ Assertions
+ .assertEquals(
+ 1, tmp.filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count());
+ Assertions
+ .assertEquals(
+ 2, tmp.filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count());
+ Assertions
+ .assertEquals(
+ 2, tmp.filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")).count());
+ Assertions
+ .assertEquals(
+ 3, tmp.filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")).count());
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d"));
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d"));
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d"));
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("50|openaire____::ec653e804967133b9436fdd30d3ff51d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d"));
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("50|doajarticles::03748bcb5d754c951efec9700e18a56d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d"));
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("50|openaire____::ec653e804967133b9436fdd30d3ff51d"));
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"));
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("50|doajarticles::03748bcb5d754c951efec9700e18a56d"));
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"))
+ .map(r -> r.getTarget())
+ .collect()
+ .contains("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074"));
+ }
+
+}
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActionsTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActionsTest.java
new file mode 100644
index 0000000000..5c715f3b92
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActionsTest.java
@@ -0,0 +1,411 @@
+
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.KeyValueSet;
+import eu.dnetlib.dhp.PropagationConstant;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class StepActionsTest {
+
+ private static final Logger log = LoggerFactory.getLogger(StepActionsTest.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static SparkSession spark;
+
+ private static Path workingDir;
+
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ workingDir = Files.createTempDirectory(StepActionsTest.class.getSimpleName());
+ log.info("using work dir {}", workingDir);
+
+ SparkConf conf = new SparkConf();
+ conf.setAppName(PrepareInfoJobTest.class.getSimpleName());
+
+ conf.setMaster("local[*]");
+ conf.set("spark.driver.host", "localhost");
+ conf.set("hive.metastore.local", "true");
+ conf.set("spark.ui.enabled", "false");
+ conf.set("spark.sql.warehouse.dir", workingDir.toString());
+ conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+ spark = SparkSession
+ .builder()
+ .appName(PrepareInfoJobTest.class.getSimpleName())
+ .config(conf)
+ .getOrCreate();
+ }
+
+ @AfterAll
+ public static void afterAll() throws IOException {
+ FileUtils.deleteDirectory(workingDir.toFile());
+ spark.stop();
+ }
+
+ @Test
+ public void execStepTest() {
+
+ StepActions
+ .execStep(
+ spark, getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/")
+ .getPath(),
+ workingDir.toString() + "/newRelationPath",
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/currentIteration/")
+ .getPath(),
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/childParentOrg/")
+ .getPath(),
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/resultOrganization/")
+ .getPath());
+
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/newRelationPath")
+ .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+
+ Assertions.assertEquals(4, tmp.count());
+
+ Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
+
+ verificationDs
+ .foreach(
+ (ForeachFunction) r -> Assertions
+ .assertEquals("propagation", r.getDataInfo().getInferenceprovenance()));
+
+ verificationDs
+ .foreach((ForeachFunction) r -> Assertions.assertEquals("0.85", r.getDataInfo().getTrust()));
+
+ verificationDs
+ .foreach((ForeachFunction) r -> Assertions.assertEquals("50|", r.getSource().substring(0, 3)));
+
+ verificationDs
+ .foreach((ForeachFunction) r -> Assertions.assertEquals("20|", r.getTarget().substring(0, 3)));
+
+ verificationDs
+ .foreach(
+ (ForeachFunction) r -> Assertions
+ .assertEquals(ModelConstants.HAS_AUTHOR_INSTITUTION, r.getRelClass()));
+
+ verificationDs
+ .foreach(
+ (ForeachFunction) r -> Assertions
+ .assertEquals(ModelConstants.RESULT_ORGANIZATION, r.getRelType()));
+
+ verificationDs
+ .foreach(
+ (ForeachFunction) r -> Assertions
+ .assertEquals(ModelConstants.AFFILIATION, r.getSubRelType()));
+
+ verificationDs
+ .foreach(
+ (ForeachFunction) r -> Assertions
+ .assertEquals(
+ PropagationConstant.PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID,
+ r.getDataInfo().getProvenanceaction().getClassid()));
+
+ verificationDs
+ .foreach(
+ (ForeachFunction) r -> Assertions
+ .assertEquals(
+ PropagationConstant.PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME,
+ r.getDataInfo().getProvenanceaction().getClassname()));
+
+ verificationDs
+ .filter(
+ (FilterFunction) r -> r
+ .getSource()
+ .equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074"))
+ .foreach(
+ (ForeachFunction) r -> Assertions
+ .assertEquals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074", r.getTarget()));
+
+ verificationDs
+ .filter(
+ (FilterFunction) r -> r
+ .getSource()
+ .equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .foreach(
+ (ForeachFunction) r -> Assertions
+ .assertEquals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f", r.getTarget()));
+
+ Assertions
+ .assertEquals(
+ 2,
+ verificationDs
+ .filter(
+ (FilterFunction) r -> r
+ .getSource()
+ .equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .count());
+
+ Assertions
+ .assertEquals(
+ 1,
+ verificationDs
+ .filter(
+ (FilterFunction) r -> r
+ .getSource()
+ .equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d") &&
+ r.getTarget().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .count());
+
+ Assertions
+ .assertEquals(
+ 1,
+ verificationDs
+ .filter(
+ (FilterFunction) r -> r
+ .getSource()
+ .equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d") &&
+ r.getTarget().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .count());
+
+ tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
+ }
+
+ @Test
+ public void prepareForNextStepLeavesTest() {
+
+ StepActions
+ .prepareForNextStep(
+ spark,
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/relsforiteration1/")
+ .getPath(),
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/resultOrganization/")
+ .getPath(),
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/currentIteration/")
+ .getPath(),
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/childParentOrg/")
+ .getPath(),
+ workingDir.toString() + "/tempLeaves", workingDir.toString() + "/tempOrgs");
+
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/tempLeaves")
+ .map(item -> OBJECT_MAPPER.readValue(item, Leaves.class));
+
+ Assertions.assertEquals(3, tmp.count());
+
+ Assertions
+ .assertEquals(
+ 1, tmp.filter(l -> l.getValue().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count());
+
+ Assertions
+ .assertEquals(
+ 1, tmp.filter(l -> l.getValue().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count());
+
+ Assertions
+ .assertEquals(
+ 1, tmp.filter(l -> l.getValue().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count());
+
+ }
+
+ @Test
+ public void prepareFonNextStepOrgTest() {
+ StepActions
+ .prepareForNextStep(
+ spark,
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/relsforiteration1/")
+ .getPath(),
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/resultOrganization/")
+ .getPath(),
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/currentIteration/")
+ .getPath(),
+ getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/childParentOrg/")
+ .getPath(),
+ workingDir.toString() + "/tempLeaves", workingDir.toString() + "/tempOrgs");
+
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc
+ .textFile(workingDir.toString() + "/tempOrgs")
+ .map(item -> OBJECT_MAPPER.readValue(item, KeyValueSet.class));
+
+ Assertions.assertEquals(5, tmp.count());
+
+ Assertions
+ .assertEquals(
+ 1, tmp
+ .filter(kv -> kv.getKey().equals("50|openaire____::ec653e804967133b9436fdd30d3ff51d"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertEquals(
+ "20|doajarticles::1cae0b82b56ccd97c2db1f698def7074",
+ tmp
+ .filter(kv -> kv.getKey().equals("50|openaire____::ec653e804967133b9436fdd30d3ff51d"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .get(0));
+
+ Assertions
+ .assertEquals(
+ 1, tmp
+ .filter(kv -> kv.getKey().equals("50|doajarticles::03748bcb5d754c951efec9700e18a56d"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertEquals(
+ "20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f",
+ tmp
+ .filter(kv -> kv.getKey().equals("50|doajarticles::03748bcb5d754c951efec9700e18a56d"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .get(0));
+
+ Assertions
+ .assertEquals(
+ 4, tmp
+ .filter(kv -> kv.getKey().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(kv -> kv.getKey().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(kv -> kv.getKey().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(kv -> kv.getKey().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .contains("20|pippo_wf_001::2899e571609779168222fdeb59cb916d"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(kv -> kv.getKey().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"));
+
+ Assertions
+ .assertEquals(
+ 2, tmp
+ .filter(kv -> kv.getKey().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(kv -> kv.getKey().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .contains("20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(kv -> kv.getKey().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"));
+
+ Assertions
+ .assertEquals(
+ 3, tmp
+ .filter(kv -> kv.getKey().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .size());
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(kv -> kv.getKey().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(kv -> kv.getKey().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .contains("20|pippo_wf_001::2899e571609779168222fdeb59cb916d"));
+ Assertions
+ .assertTrue(
+ tmp
+ .filter(kv -> kv.getKey().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"))
+ .collect()
+ .get(0)
+ .getValueSet()
+ .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"));
+
+ }
+}
diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1/relation b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1/relation
new file mode 100644
index 0000000000..c63a2e0acf
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1/relation
@@ -0,0 +1,7 @@
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::03748bcb5d754c951efec9700e18a56d","subRelType":"provision","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|openaire____::ec653e804967133b9436fdd30d3ff51d","subRelType":"provision","target":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074","subRelType":"provision","target":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2/relation b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2/relation
new file mode 100644
index 0000000000..54589de329
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2/relation
@@ -0,0 +1,7 @@
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::03748bcb5d754c951efec9700e18a56d","subRelType":"provision","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|openaire____::ec653e804967133b9436fdd30d3ff51d","subRelType":"provision","target":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074","subRelType":"provision","target":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/childParentOrg/childparent b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/childParentOrg/childparent
new file mode 100644
index 0000000000..7d9ea588b3
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/childParentOrg/childparent
@@ -0,0 +1,6 @@
+{"key":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1","valueSet":["20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"]}
+{"key":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d","valueSet":["20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","20|dedup_wf_001::2899e571609779168222fdeb59cb916d"]}
+{"key":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074","valueSet":["20|openaire____::ec653e804967133b9436fdd30d3ff51d"]}
+{"key":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0","valueSet":["20|dedup_wf_001::2899e571609779168222fdeb59cb916d"]}
+{"key":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","valueSet":["20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"]}
+{"key":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","valueSet":["20|doajarticles::03748bcb5d754c951efec9700e18a56d"]}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/currentIteration/leaves b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/currentIteration/leaves
new file mode 100644
index 0000000000..3be9cae3bf
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/currentIteration/leaves
@@ -0,0 +1,3 @@
+{"value":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"}
+{"value":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"}
+{"value":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/relation b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/relation
new file mode 100644
index 0000000000..db7db8fdd1
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/relation
@@ -0,0 +1,14 @@
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::03748bcb5d754c951efec9700e18a56d","subRelType":"provision","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|openaire____::ec653e804967133b9436fdd30d3ff51d","subRelType":"provision","target":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074","subRelType":"provision","target":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::03748bcb5d754c951efec9700e18a56d","subRelType":"provision","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|openaire____::ec653e804967133b9436fdd30d3ff51d","subRelType":"provision","target":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::1cae0b82b56ccd97c2db1f698def7074","subRelType":"provision","target":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/relsforiteration1/relation b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/relsforiteration1/relation
new file mode 100644
index 0000000000..32b816ef79
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/relsforiteration1/relation
@@ -0,0 +1,4 @@
+{"collectedfrom":null,"dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"0.85","inferenceprovenance":"propagation","provenanceaction":{"classid":"result:organization:semrel","classname":"Propagation of affiliation to result through sematic relations","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":null,"relType":"resultOrganization","subRelType":"affiliation","relClass":"hasAuthorInstitution","source":"50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","validated":false,"validationDate":null,"properties":[]}
+{"collectedfrom":null,"dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"0.85","inferenceprovenance":"propagation","provenanceaction":{"classid":"result:organization:semrel","classname":"Propagation of affiliation to result through sematic relations","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":null,"relType":"resultOrganization","subRelType":"affiliation","relClass":"hasAuthorInstitution","source":"50|dedup_wf_001::2899e571609779168222fdeb59cb916d","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","validated":false,"validationDate":null,"properties":[]}
+{"collectedfrom":null,"dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"0.85","inferenceprovenance":"propagation","provenanceaction":{"classid":"result:organization:semrel","classname":"Propagation of affiliation to result through sematic relations","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":null,"relType":"resultOrganization","subRelType":"affiliation","relClass":"hasAuthorInstitution","source":"50|dedup_wf_001::2899e571609779168222fdeb59cb916d","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","validated":false,"validationDate":null,"properties":[]}
+{"collectedfrom":null,"dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"0.85","inferenceprovenance":"propagation","provenanceaction":{"classid":"result:organization:semrel","classname":"Propagation of affiliation to result through sematic relations","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":null,"relType":"resultOrganization","subRelType":"affiliation","relClass":"hasAuthorInstitution","source":"50|doajarticles::1cae0b82b56ccd97c2db1f698def7074","target":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074","validated":false,"validationDate":null,"properties":[]}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/resultOrganization/resultorganization b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/resultOrganization/resultorganization
new file mode 100644
index 0000000000..b4e227227d
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/execstep/resultOrganization/resultorganization
@@ -0,0 +1,5 @@
+{"key":"50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","valueSet":["20|pippo_wf_001::2899e571609779168222fdeb59cb916d","20|dedup_wf_001::2899e571609779168222fdeb59cb916d"]}
+{"key":"50|doajarticles::1cae0b82b56ccd97c2db1f698def7074","valueSet":["20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"]}
+{"key":"50|dedup_wf_001::2899e571609779168222fdeb59cb916d","valueSet":["20|pippo_wf_001::2899e571609779168222fdeb59cb916d","20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"]}
+{"key":"50|openaire____::ec653e804967133b9436fdd30d3ff51d","valueSet":["20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"]}
+{"key":"50|doajarticles::03748bcb5d754c951efec9700e18a56d","valueSet":["20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"]}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest/relation b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest/relation
new file mode 100644
index 0000000000..5aeabb71b3
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest/relation
@@ -0,0 +1,7 @@
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::03748bcb5d754c951efec9700e18a56d","subRelType":"provision","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|openaire____::ec653e804967133b9436fdd30d3ff51d","subRelType":"provision","target":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::1cae0b82b56ccd97c2db1f698def7074","subRelType":"provision","target":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"}
\ No newline at end of file