re-implemented inverting the couple: from (projectId, relatedResultList) to (resultId, relatedProjectList)

This commit is contained in:
Miriam Baglioni 2020-04-27 10:26:55 +02:00
parent adcbf0e29a
commit 8802e4126b
5 changed files with 112 additions and 105 deletions

View File

@ -12,6 +12,7 @@ import eu.dnetlib.dhp.schema.oaf.Relation;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
@ -95,45 +96,57 @@ public class PrepareProjectResultsAssociation {
resproj_relation.createOrReplaceTempView("resproj_relation");
query =
"SELECT projectId, collect_set(resId) resultSet "
+ "FROM ("
+ " SELECT r1.target resId, r2.target projectId "
"SELECT resultId, collect_set(projectId) projectSet "
+ "FROM ( "
+ "SELECT r1.target resultId, r2.target projectId "
+ " FROM (SELECT source, target "
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ getConstraintList(" relClass = '", allowedsemrel)
+ ") r1"
+ " ) r1"
+ " JOIN resproj_relation r2 "
+ " ON r1.source = r2.source "
+ " ) tmp "
+ "GROUP BY projectId ";
+ "GROUP BY resultId ";
// query =
// "SELECT projectId, collect_set(resId) resultSet "
// + "FROM ("
// + " SELECT r1.target resId, r2.target projectId "
// + " FROM (SELECT source, target "
// + " FROM relation "
// + " WHERE datainfo.deletedbyinference = false "
// + getConstraintList(" relClass = '", allowedsemrel)
// + ") r1"
// + " JOIN resproj_relation r2 "
// + " ON r1.source = r2.source "
// + " ) tmp "
// + "GROUP BY projectId ";
spark.sql(query)
.as(Encoders.bean(ProjectResultSet.class))
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(potentialUpdatePath);
// .toJavaRDD()
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
.as(Encoders.bean(ResultProjectSet.class))
// .toJSON()
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .text(potentialUpdatePath);
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(potentialUpdatePath, GzipCodec.class);
query =
"SELECT target projectId, collect_set(source) resultSet "
"SELECT source resultId, collect_set(target) projectSet "
+ "FROM resproj_relation "
+ "GROUP BY target";
+ "GROUP BY source";
spark.sql(query)
.as(Encoders.bean(ProjectResultSet.class))
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(alreadyLinkedPath);
// .toJavaRDD()
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
.as(Encoders.bean(ResultProjectSet.class))
// .toJSON()
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .text(alreadyLinkedPath);
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}
}

View File

@ -1,25 +0,0 @@
package eu.dnetlib.dhp.projecttoresult;
import java.io.Serializable;
import java.util.ArrayList;
public class ProjectResultSet implements Serializable {
private String projectId;
private ArrayList<String> resultSet;
public String getProjectId() {
return projectId;
}
public void setProjectId(String projectId) {
this.projectId = projectId;
}
public ArrayList<String> getResultSet() {
return resultSet;
}
public void setResultSet(ArrayList<String> resultSet) {
this.resultSet = resultSet;
}
}

View File

@ -0,0 +1,25 @@
package eu.dnetlib.dhp.projecttoresult;
import java.io.Serializable;
import java.util.ArrayList;
public class ResultProjectSet implements Serializable {
private String resultId;
private ArrayList<String> projectSet;
public String getResultId() {
return resultId;
}
public void setResultId(String resultId) {
this.resultId = resultId;
}
public ArrayList<String> getProjectSet() {
return projectSet;
}
public void setProjectSet(ArrayList<String> project) {
this.projectSet = project;
}
}

View File

@ -9,6 +9,7 @@ import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
@ -44,9 +45,6 @@ public class SparkResultToProjectThroughSemRelJob3 {
final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
final Boolean writeUpdates = Boolean.valueOf(parser.get("writeUpdate"));
log.info("writeUpdate: {}", writeUpdates);
final Boolean saveGraph = Boolean.valueOf(parser.get("saveGraph"));
log.info("saveGraph: {}", saveGraph);
@ -60,12 +58,7 @@ public class SparkResultToProjectThroughSemRelJob3 {
removeOutputDir(spark, outputPath);
}
execPropagation(
spark,
outputPath,
alreadyLinkedPath,
potentialUpdatePath,
writeUpdates,
saveGraph);
spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph);
});
}
@ -74,21 +67,12 @@ public class SparkResultToProjectThroughSemRelJob3 {
String outputPath,
String alreadyLinkedPath,
String potentialUpdatePath,
Boolean writeUpdate,
Boolean saveGraph) {
Dataset<ProjectResultSet> toaddrelations =
readAssocProjectResults(spark, potentialUpdatePath);
Dataset<ProjectResultSet> alreadyLinked = readAssocProjectResults(spark, alreadyLinkedPath);
Dataset<ResultProjectSet> toaddrelations =
readAssocResultProjects(spark, potentialUpdatePath);
Dataset<ResultProjectSet> alreadyLinked = readAssocResultProjects(spark, alreadyLinkedPath);
if (writeUpdate) {
toaddrelations
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outputPath + "/potential_updates");
}
if (saveGraph) {
getNewRelations(alreadyLinked, toaddrelations)
.toJSON()
@ -100,56 +84,66 @@ public class SparkResultToProjectThroughSemRelJob3 {
}
private static Dataset<Relation> getNewRelations(
Dataset<ProjectResultSet> alreadyLinked, Dataset<ProjectResultSet> toaddrelations) {
Dataset<ResultProjectSet> alreadyLinked, Dataset<ResultProjectSet> toaddrelations) {
return toaddrelations
.joinWith(
alreadyLinked,
toaddrelations.col("projectId").equalTo(alreadyLinked.col("projectId")),
"left")
toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")),
"left_outer")
.flatMap(
value -> {
List<Relation> new_relations = new ArrayList<>();
ProjectResultSet potential_update = value._1();
ProjectResultSet already_linked = value._2();
String projId = already_linked.getProjectId();
potential_update.getResultSet().stream()
ResultProjectSet potential_update = value._1();
Optional<ResultProjectSet> already_linked =
Optional.ofNullable(value._2());
if (already_linked.isPresent()) {
already_linked.get().getProjectSet().stream()
.forEach(
(p -> {
if (potential_update
.getProjectSet()
.contains(p)) {
potential_update.getProjectSet().remove(p);
}
}));
}
String resId = potential_update.getResultId();
potential_update.getProjectSet().stream()
.forEach(
rId -> {
if (!already_linked.getResultSet().contains(rId)) {
new_relations.add(
getRelation(
rId,
projId,
RELATION_RESULT_PROJECT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
new_relations.add(
getRelation(
projId,
rId,
RELATION_PROJECT_RESULT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
}
pId -> {
new_relations.add(
getRelation(
resId,
pId,
RELATION_RESULT_PROJECT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
new_relations.add(
getRelation(
pId,
resId,
RELATION_PROJECT_RESULT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
});
return new_relations.iterator();
},
Encoders.bean(Relation.class));
}
private static Dataset<ProjectResultSet> readAssocProjectResults(
private static Dataset<ResultProjectSet> readAssocResultProjects(
SparkSession spark, String potentialUpdatePath) {
return spark.read()
.textFile(potentialUpdatePath)
.map(
value -> OBJECT_MAPPER.readValue(value, ProjectResultSet.class),
Encoders.bean(ProjectResultSet.class));
value -> OBJECT_MAPPER.readValue(value, ResultProjectSet.class),
Encoders.bean(ResultProjectSet.class));
}
}