From cadab9b81dd0cd1f6eee02d621e593e50a4f1f4f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 14 Apr 2020 16:46:07 +0200 Subject: [PATCH] new implementation for result to project propagation. Use the prepared info in propagation --- ...SparkResultToProjectThroughSemRelJob3.java | 218 ++++++++++-------- 1 file changed, 120 insertions(+), 98 deletions(-) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java index f85eb2bed7..01c6e90896 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java @@ -1,143 +1,160 @@ package eu.dnetlib.dhp.projecttoresult; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.countrypropagation.DatasourceCountry; +import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation; +import eu.dnetlib.dhp.countrypropagation.ResultCountrySet; import eu.dnetlib.dhp.schema.oaf.Relation; +import org.apache.arrow.flatbuf.Bool; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.FlatMapFunction2; +import org.apache.spark.api.java.function.MapFunction; +import 
org.apache.spark.sql.*; +import org.omg.CORBA.OBJ_ADAPTER; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; public class SparkResultToProjectThroughSemRelJob3 { + + private static final Logger log = LoggerFactory.getLogger(SparkResultToProjectThroughSemRelJob3.class); /* logger is named after this job so its output is attributed to the right class */ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils.toString(SparkResultToProjectThroughSemRelJob3.class + .getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString(SparkResultToProjectThroughSemRelJob3.class - .getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"))); + jsonConfiguration); + parser.parseArgument(args); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + final String potentialUpdatePath = parser.get("potentialUpdatePath"); + log.info("potentialUpdatePath {}: ", potentialUpdatePath); + + final String alreadyLinkedPath = parser.get("alreadyLinkedPath"); + log.info("alreadyLinkedPath {}: ", alreadyLinkedPath); + + final Boolean writeUpdates = Boolean.valueOf(parser.get("writeUpdate")); + log.info("writeUpdate: {}", writeUpdates); + + final Boolean saveGraph = Boolean.valueOf(parser.get("saveGraph")); + log.info("saveGraph: {}", saveGraph); SparkConf 
conf = new SparkConf(); - conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = SparkSession - .builder() - .appName(SparkResultToProjectThroughSemRelJob3.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/projecttoresult"; - boolean writeUpdates = "true".equals(parser.get("writeUpdate")); - boolean saveGraph = "true".equals(parser.get("saveGraph")); + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + //createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + if(isTest(parser)) { + removeOutputDir(spark, outputPath); + } + execPropagation(spark, inputPath, outputPath, alreadyLinkedPath, potentialUpdatePath, writeUpdates, saveGraph); + }); + } - final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + private static void execPropagation(SparkSession spark, String inputPath, String outputPath, String alreadyLinkedPath, String potentialUpdatePath, + Boolean writeUpdate, Boolean saveGraph){ - Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); - - relation.createOrReplaceTempView("relation"); + Dataset toaddrelations = readAssocProjectResults(spark, potentialUpdatePath); + Dataset alreadyLinked = readAssocProjectResults(spark, alreadyLinkedPath); - String query = "Select source, target " + - "from relation " + - "where datainfo.deletedbyinference = false and relClass = '" + RELATION_RESULT_PROJECT_REL_CLASS + "'"; + if(writeUpdate){ + toaddrelations + .toJSON() + .write() + 
.mode(SaveMode.Overwrite) + .option("compression","gzip") + .text(outputPath +"/potential_updates"); + //writeUpdates(toaddrelations.toJavaRDD(), outputPath + "/potential_updates"); + } + if (saveGraph){ + getNewRelations(alreadyLinked, toaddrelations) + .toJSON() + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(outputPath); +// JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); +// sc.textFile(inputPath) +// .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)) +// .union(getNewRelations(alreadyLinked, toaddrelations) +// .toJavaRDD()) +// .map(r -> OBJECT_MAPPER.writeValueAsString(r)) +// .saveAsTextFile(outputPath , GzipCodec.class); + } - Dataset resproj_relation = spark.sql(query); - - query = "Select source, target " + - "from relation " + - "where datainfo.deletedbyinference = false " + getConstraintList(" relClass = '", allowedsemrel ); + } - Dataset resres_relation = spark.sql(query); - resres_relation.createOrReplaceTempView("resres_relation"); - resproj_relation.createOrReplaceTempView("resproj_relation"); + private static Dataset getNewRelations(Dataset alreadyLinked, + Dataset toaddrelations){ - query ="SELECT proj, collect_set(r1target) result_set " + - "FROM (" + - " SELECT r1.source as source, r1.target as r1target, r2.target as proj " + - " FROM resres_relation r1 " + - " JOIN resproj_relation r2 " + - " ON r1.source = r2.source " + - " ) tmp " + - "GROUP BY proj "; - Dataset toaddrelations = spark.sql(query); - - query = "select target, collect_set(source) result_list from " + - "resproj_relation " + - "group by target"; - - Dataset project_resultlist = spark.sql(query); - - //if (writeUpdaes){ - toaddrelations.toJavaRDD().map(r->new ObjectMapper().writeValueAsString(r)) - .saveAsTextFile(outputPath + "/toupdaterelations"); - //} - - if(saveGraph){ - JavaRDD new_relations = toaddrelations.toJavaRDD().mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) - 
.leftOuterJoin(project_resultlist.toJavaRDD().mapToPair(pr -> new Tuple2<>(pr.getString(0), pr.getList(1)))) - .flatMap(c -> { - List toAddRel = new ArrayList<>(); - toAddRel.addAll(c._2()._1()); - if (c._2()._2().isPresent()) { - List originalRels = c._2()._2().get(); - for (Object o : originalRels) { - if (toAddRel.contains(o)) { - toAddRel.remove(o); + return toaddrelations + .joinWith(alreadyLinked, toaddrelations.col("projectId").equalTo(alreadyLinked.col("projectId")), "left") + .flatMap(value -> { + List new_relations = new ArrayList<>(); + ProjectResultSet potential_update = value._1(); + ProjectResultSet already_linked = value._2(); /* null when the left join finds no already-linked entry for this project */ + String projId = potential_update.getProjectId(); /* the join key is read from the left side, which is always present */ + potential_update + .getResultSet() + .stream() + .forEach(rId -> { + if (already_linked == null || !already_linked.getResultSet().contains(rId)){ /* with no existing links every candidate result is new */ + new_relations.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + new_relations.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); } - } - } - List relationList = new ArrayList<>(); - String projId = c._1(); - for (Object r : toAddRel) { - String rId = (String) r; - relationList.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE, - RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); - relationList.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE, - RELATION_RESULTPROJECT_SUBREL_TYPE, 
PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + }); + return new_relations.iterator(); } - if(relationList.size()==0){ - return null; - } - return relationList.iterator(); - }).filter(r -> !(r==null)); + ,Encoders.bean(Relation.class)); - new_relations.map(r-> new ObjectMapper().writeValueAsString(r)) - .saveAsTextFile(outputPath + "/new_relations" ); - sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)) - .union(new_relations) - .map(r -> new ObjectMapper().writeValueAsString(r)) - .saveAsTextFile(outputPath + "/relation"); } - + private static void writeUpdates(JavaRDD potentialUpdates, String outputPath){ + potentialUpdates.map(u -> OBJECT_MAPPER.writeValueAsString(u)) + .saveAsTextFile(outputPath, GzipCodec.class); + } //JavaPairRDD result_result = getResultResultSemRel(allowedsemrel, relations); // JavaPairRDD result_project = relations @@ -211,9 +228,14 @@ public class SparkResultToProjectThroughSemRelJob3 { // newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p)) // .saveAsTextFile(outputPath + "/relation"); + //} + + + private static Dataset readAssocProjectResults(SparkSession spark, String potentialUpdatePath) { + return spark + .read() + .textFile(potentialUpdatePath) + .map(value -> OBJECT_MAPPER.readValue(value, ProjectResultSet.class), Encoders.bean(ProjectResultSet.class)); } - - - }