From 964b22d41806228542f9497d3dc4a2b9a0e2faf1 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 15 Apr 2020 12:32:01 +0200
Subject: [PATCH] modified the writing of the new relations. Before: read the
 old relations, add the new ones to them, and write all the relations to the
 new location. Now: the first step of the workflow copies the old relations to
 the new location; if new relations are found, they are saved to the new
 location in append mode.

---
 ...SparkResultToProjectThroughSemRelJob2.java | 156 +++++++++---------
 1 file changed, 78 insertions(+), 78 deletions(-)

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob2.java
index 563fcb3bcb..9dbbf140bf 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob2.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob2.java
@@ -3,10 +3,9 @@ package eu.dnetlib.dhp.projecttoresult;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.TypedRow;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.Relation;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -17,59 +16,65 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;
 
-import java.io.File;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
-import static eu.dnetlib.dhp.PropagationConstant.toPair;
 
-public class SparkResultToProjectThroughSemRelJob {
+public class SparkResultToProjectThroughSemRelJob2 {
 
     public static void main(String[] args) throws Exception {
 
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToProjectThroughSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json")));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+                IOUtils.toString(SparkResultToProjectThroughSemRelJob2.class
+                        .getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json")));
         parser.parseArgument(args);
+
+        parser.getObjectMap().keySet().stream().forEach(k -> System.out.println(k + " = " + parser.getObjectMap().get(k)));
 
         SparkConf conf = new SparkConf();
         conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
 
         final SparkSession spark = SparkSession
                 .builder()
-                .appName(SparkResultToProjectThroughSemRelJob.class.getSimpleName())
+                .appName(SparkResultToProjectThroughSemRelJob2.class.getSimpleName())
                 .master(parser.get("master"))
                 .config(conf)
                 .enableHiveSupport()
                 .getOrCreate();
 
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
        final String inputPath = parser.get("sourcePath");
        final String outputPath = "/tmp/provision/propagation/projecttoresult";
+        boolean writeUpdates = "true".equals(parser.get("writeUpdate"));
+        boolean saveGraph = "true".equals(parser.get("saveGraph"));
 
        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
 
-        JavaRDD<Relation> all_relations = sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class));
+        Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
+                .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
+
+        relation.createOrReplaceTempView("relation");
 
-        JavaRDD<Relation> relations = all_relations.filter(r -> !r.getDataInfo().getDeletedbyinference()).cache();
+        String query = "Select source, target " +
+                "from relation " +
+                "where datainfo.deletedbyinference = false and relClass = '" + RELATION_RESULT_PROJECT_REL_CLASS + "'";
 
-        JavaRDD<Relation> result_result = relations
-                .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType()));
-
-        org.apache.spark.sql.Dataset<Relation> resres_relation = spark.createDataset(result_result.rdd(),
-                Encoders.bean(Relation.class));
-
-        JavaRDD<Relation> result_project = relations
-                .filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass())
-                        && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType()));
-
-        org.apache.spark.sql.Dataset<Relation> resproj_relation = spark.createDataset(result_project.rdd(),
-                Encoders.bean(Relation.class));
+        Dataset<Row> resproj_relation = spark.sql(query);
+
+        query = "Select source, target " +
+                "from relation " +
+                "where datainfo.deletedbyinference = false " + getConstraintList(" relClass = '", allowedsemrel );
+        Dataset<Row> resres_relation = spark.sql(query);
 
        resres_relation.createOrReplaceTempView("resres_relation");
        resproj_relation.createOrReplaceTempView("resproj_relation");
 
-        String query ="SELECT proj, collect_set(r1target) result_set " +
+        query ="SELECT proj, collect_set(r1target) result_set " +
                "FROM (" +
-                " SELECT r1.source as sourcer, r1.relclass as r1rel, r1.target as r1target, r2.target as proj " +
+                " SELECT r1.source as source, r1.target as r1target, r2.target as proj " +
                " FROM resres_relation r1 " +
                " JOIN resproj_relation r2 " +
                " ON r1.source = r2.source " +
@@ -77,68 +82,63 @@ public class SparkResultToProjectThroughSemRelJob {
                "GROUP BY proj ";
 
        Dataset<Row> toaddrelations = spark.sql(query);
+
+        query = "select target, collect_set(source) result_list from " +
+                "resproj_relation " +
+                "group by target";
+        Dataset<Row> project_resultlist = spark.sql(query);
 
-        JavaPairRDD<String, TypedRow> project_resultlist = relations
-                .filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass()))
-                .map(r -> {
-                    TypedRow tp = new TypedRow();
-                    tp.setSourceId(r.getSource());
-                    tp.add(r.getTarget());
-                    return tp;
-                }).mapToPair(toPair())
-                .reduceByKey((a, b) -> {
-                    if (a == null) {
-                        return b;
-                    }
-                    if (b == null) {
-                        return a;
-                    }
+        //if (writeUpdaes){
+            toaddrelations.toJavaRDD().map(r->new ObjectMapper().writeValueAsString(r))
+                    .saveAsTextFile(outputPath + "/toupdaterelations");
+        //}
 
-                    a.addAll(b.getAccumulator());
-                    return a;
-                }).cache();
-
-
-        JavaRDD<Relation> new_relations = toaddrelations.toJavaRDD().mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
-                .leftOuterJoin(project_resultlist)
-                .flatMap(c -> {
-                    List toAddRel = new ArrayList<>();
-                    toAddRel.addAll(c._2()._1());
-                    if (c._2()._2().isPresent()) {
-                        Set<String> originalRels = c._2()._2().get().getAccumulator();
-                        for (String o : originalRels) {
-                            if (toAddRel.contains(o)) {
-                                toAddRel.remove(o);
+        if(saveGraph){
+            JavaRDD<Relation> new_relations = toaddrelations.toJavaRDD().mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
+                    .leftOuterJoin(project_resultlist.toJavaRDD().mapToPair(pr -> new Tuple2<>(pr.getString(0), pr.getList(1))))
+                    .flatMap(c -> {
+                        List toAddRel = new ArrayList<>();
+                        toAddRel.addAll(c._2()._1());
+                        if (c._2()._2().isPresent()) {
+                            List originalRels = c._2()._2().get();
+                            for (Object o : originalRels) {
+                                if (toAddRel.contains(o)) {
+                                    toAddRel.remove(o);
+                                }
                            }
                        }
-                    }
-                    List<Relation> relationList = new ArrayList<>();
-                    String projId = c._1();
-                    for (Object r : toAddRel) {
-                        String rId = (String) r;
-                        relationList.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
-                                RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
-                                PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
-                                PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
-                        relationList.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
-                                RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
-                                PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
-                                PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
+                        List<Relation> relationList = new ArrayList<>();
+                        String projId = c._1();
+                        for (Object r : toAddRel) {
+                            String rId = (String) r;
+                            relationList.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
+                                    RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+                                    PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
+                                    PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
+                            relationList.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
+                                    RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+                                    PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
+                                    PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
 
-                    }
-                    return relationList.iterator();
-                }).cache();
+                        }
+                        if(relationList.size()==0){
+                            return null;
+                        }
+                        return relationList.iterator();
+                    }).filter(r -> !(r==null));
 
-        toaddrelations.toJavaRDD().map(r->new ObjectMapper().writeValueAsString(r))
-                .saveAsTextFile(outputPath + "/toupdaterelations");
-        new_relations.map(r-> new ObjectMapper().writeValueAsString(r))
-                .saveAsTextFile(outputPath + "/new_relations" );
+            new_relations.map(r-> new ObjectMapper().writeValueAsString(r))
+                    .saveAsTextFile(outputPath + "/new_relations" );
 
-        all_relations.union(new_relations)
-                .map(r -> new ObjectMapper().writeValueAsString(r))
-                .saveAsTextFile(outputPath + "/relation");
+            sc.textFile(inputPath + "/relation")
+                    .map(item -> new ObjectMapper().readValue(item, Relation.class))
+                    .union(new_relations)
+                    .map(r -> new ObjectMapper().writeValueAsString(r))
+                    .saveAsTextFile(outputPath + "/relation");
+
+        }
 
        //JavaPairRDD result_result = getResultResultSemRel(allowedsemrel, relations);
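
Note (not part of the patch): a minimal sketch of the append-mode write described in the commit message, assuming the new_relations RDD, outputPath, spark session and Relation bean used above, and assuming a previous workflow step has already copied the original relations into outputPath + "/relation". The local name newRels is illustrative only.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SaveMode;

    // Convert the newly inferred relations to a Dataset and append them to the
    // relations already copied into the new location, instead of re-reading and
    // rewriting the whole relation set.
    Dataset<Relation> newRels = spark.createDataset(new_relations.rdd(), Encoders.bean(Relation.class));
    newRels
            .toJSON()                          // one JSON line per Relation, matching the text layout of the graph
            .write()
            .mode(SaveMode.Append)             // append instead of overwriting the existing directory
            .text(outputPath + "/relation");

The Dataset writer is used here because RDD.saveAsTextFile cannot write into an existing directory, so it cannot append to the relations copied by the first step.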