diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java index 2e9255ed56..cf970048d7 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java @@ -12,6 +12,7 @@ import eu.dnetlib.dhp.schema.oaf.Relation; import java.util.Arrays; import java.util.List; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.*; @@ -95,45 +96,57 @@ public class PrepareProjectResultsAssociation { resproj_relation.createOrReplaceTempView("resproj_relation"); query = - "SELECT projectId, collect_set(resId) resultSet " - + "FROM (" - + " SELECT r1.target resId, r2.target projectId " + "SELECT resultId, collect_set(projectId) projectSet " + + "FROM ( " + + "SELECT r1.target resultId, r2.target projectId " + " FROM (SELECT source, target " + " FROM relation " + " WHERE datainfo.deletedbyinference = false " + getConstraintList(" relClass = '", allowedsemrel) - + ") r1" + + " ) r1" + " JOIN resproj_relation r2 " + " ON r1.source = r2.source " + " ) tmp " - + "GROUP BY projectId "; + + "GROUP BY resultId "; + // query = + // "SELECT projectId, collect_set(resId) resultSet " + // + "FROM (" + // + " SELECT r1.target resId, r2.target projectId " + // + " FROM (SELECT source, target " + // + " FROM relation " + // + " WHERE datainfo.deletedbyinference = false " + // + getConstraintList(" relClass = '", allowedsemrel) + // + ") r1" + // + " JOIN resproj_relation r2 " + // + " ON r1.source = r2.source " + // + " ) tmp " + // + "GROUP BY projectId "; spark.sql(query) - .as(Encoders.bean(ProjectResultSet.class)) - .toJSON() - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .text(potentialUpdatePath); - // .toJavaRDD() - // .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - // .saveAsTextFile(potentialUpdatePath, GzipCodec.class); + .as(Encoders.bean(ResultProjectSet.class)) + // .toJSON() + // .write() + // .mode(SaveMode.Overwrite) + // .option("compression", "gzip") + // .text(potentialUpdatePath); + .toJavaRDD() + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(potentialUpdatePath, GzipCodec.class); query = - "SELECT target projectId, collect_set(source) resultSet " + "SELECT source resultId, collect_set(target) projectSet " + "FROM resproj_relation " - + "GROUP BY target"; + + "GROUP BY source"; spark.sql(query) - .as(Encoders.bean(ProjectResultSet.class)) - .toJSON() - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .text(alreadyLinkedPath); - // .toJavaRDD() - // .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - // .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); - + .as(Encoders.bean(ResultProjectSet.class)) + // .toJSON() + // .write() + // .mode(SaveMode.Overwrite) + // .option("compression", "gzip") + // .text(alreadyLinkedPath); + .toJavaRDD() + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/ProjectResultSet.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/ProjectResultSet.java deleted file mode 100644 index da1be3b69d..0000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/ProjectResultSet.java +++ /dev/null @@ -1,25 +0,0 @@ -package eu.dnetlib.dhp.projecttoresult; - -import java.io.Serializable; -import java.util.ArrayList; - -public class ProjectResultSet implements Serializable { - private String projectId; - private ArrayList resultSet; - - public String getProjectId() { - return projectId; - } - - public void setProjectId(String projectId) { - this.projectId = projectId; - } - - public ArrayList getResultSet() { - return resultSet; - } - - public void setResultSet(ArrayList resultSet) { - this.resultSet = resultSet; - } -} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/ResultProjectSet.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/ResultProjectSet.java new file mode 100644 index 0000000000..183ae14896 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/ResultProjectSet.java @@ -0,0 +1,25 @@ +package eu.dnetlib.dhp.projecttoresult; + +import java.io.Serializable; +import java.util.ArrayList; + +public class ResultProjectSet implements Serializable { + private String resultId; + private ArrayList projectSet; + + public String getResultId() { + return resultId; + } + + public void setResultId(String resultId) { + this.resultId = resultId; + } + + public ArrayList getProjectSet() { + return projectSet; + } + + public void setProjectSet(ArrayList project) { + this.projectSet = project; + } +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java index 45d6976512..e32242a90f 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java @@ -9,6 +9,7 @@ import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation; import eu.dnetlib.dhp.schema.oaf.Relation; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.*; @@ -44,9 +45,6 @@ public class SparkResultToProjectThroughSemRelJob3 { final String alreadyLinkedPath = parser.get("alreadyLinkedPath"); log.info("alreadyLinkedPath {}: ", alreadyLinkedPath); - final Boolean writeUpdates = Boolean.valueOf(parser.get("writeUpdate")); - log.info("writeUpdate: {}", writeUpdates); - final Boolean saveGraph = Boolean.valueOf(parser.get("saveGraph")); log.info("saveGraph: {}", saveGraph); @@ -60,12 +58,7 @@ public class SparkResultToProjectThroughSemRelJob3 { removeOutputDir(spark, outputPath); } execPropagation( - spark, - outputPath, - alreadyLinkedPath, - potentialUpdatePath, - writeUpdates, - saveGraph); + spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph); }); } @@ -74,21 +67,12 @@ public class SparkResultToProjectThroughSemRelJob3 { String outputPath, String alreadyLinkedPath, String potentialUpdatePath, - Boolean writeUpdate, Boolean saveGraph) { - Dataset toaddrelations = - readAssocProjectResults(spark, potentialUpdatePath); - Dataset alreadyLinked = readAssocProjectResults(spark, alreadyLinkedPath); + Dataset toaddrelations = + readAssocResultProjects(spark, potentialUpdatePath); + Dataset alreadyLinked = readAssocResultProjects(spark, alreadyLinkedPath); - if (writeUpdate) { - toaddrelations - .toJSON() - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .text(outputPath + "/potential_updates"); - } if (saveGraph) { getNewRelations(alreadyLinked, toaddrelations) .toJSON() @@ -100,56 +84,66 @@ public class SparkResultToProjectThroughSemRelJob3 { } private static Dataset getNewRelations( - Dataset alreadyLinked, Dataset toaddrelations) { + Dataset alreadyLinked, Dataset toaddrelations) { return toaddrelations .joinWith( alreadyLinked, - toaddrelations.col("projectId").equalTo(alreadyLinked.col("projectId")), - "left") + toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")), + "left_outer") .flatMap( value -> { List new_relations = new ArrayList<>(); - ProjectResultSet potential_update = value._1(); - ProjectResultSet already_linked = value._2(); - String projId = already_linked.getProjectId(); - potential_update.getResultSet().stream() + ResultProjectSet potential_update = value._1(); + Optional already_linked = + Optional.ofNullable(value._2()); + if (already_linked.isPresent()) { + already_linked.get().getProjectSet().stream() + .forEach( + (p -> { + if (potential_update + .getProjectSet() + .contains(p)) { + potential_update.getProjectSet().remove(p); + } + })); + } + String resId = potential_update.getResultId(); + potential_update.getProjectSet().stream() .forEach( - rId -> { - if (!already_linked.getResultSet().contains(rId)) { - new_relations.add( - getRelation( - rId, - projId, - RELATION_RESULT_PROJECT_REL_CLASS, - RELATION_RESULTPROJECT_REL_TYPE, - RELATION_RESULTPROJECT_SUBREL_TYPE, - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); - new_relations.add( - getRelation( - projId, - rId, - RELATION_PROJECT_RESULT_REL_CLASS, - RELATION_RESULTPROJECT_REL_TYPE, - RELATION_RESULTPROJECT_SUBREL_TYPE, - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); - } + pId -> { + new_relations.add( + getRelation( + resId, + pId, + RELATION_RESULT_PROJECT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + new_relations.add( + getRelation( + pId, + resId, + RELATION_PROJECT_RESULT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); }); return new_relations.iterator(); }, Encoders.bean(Relation.class)); } - private static Dataset readAssocProjectResults( + private static Dataset readAssocResultProjects( SparkSession spark, String potentialUpdatePath) { return spark.read() .textFile(potentialUpdatePath) .map( - value -> OBJECT_MAPPER.readValue(value, ProjectResultSet.class), - Encoders.bean(ProjectResultSet.class)); + value -> OBJECT_MAPPER.readValue(value, ResultProjectSet.class), + Encoders.bean(ResultProjectSet.class)); } } diff --git a/dhp-workflows/dhp-propagation/src/test/resources/eu/dnetlib/dhp/projecttoresult/preparedInfo/tenupdates/potentialUpdates/potentialUpdates_1.json.gz b/dhp-workflows/dhp-propagation/src/test/resources/eu/dnetlib/dhp/projecttoresult/preparedInfo/tenupdates/potentialUpdates/potentialUpdates_1.json.gz deleted file mode 100644 index 0280131de5..0000000000 Binary files a/dhp-workflows/dhp-propagation/src/test/resources/eu/dnetlib/dhp/projecttoresult/preparedInfo/tenupdates/potentialUpdates/potentialUpdates_1.json.gz and /dev/null differ