diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
index f94d67734a..2e9255ed56 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
@@ -1,37 +1,36 @@
 package eu.dnetlib.dhp.projecttoresult;
 
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.PropagationConstant.getConstraintList;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.List;
-
-import static eu.dnetlib.dhp.PropagationConstant.*;
-import static eu.dnetlib.dhp.PropagationConstant.getConstraintList;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
-
 public class PrepareProjectResultsAssociation {
-    private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
+    private static final Logger log =
+            LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public static void main(String[] args) throws Exception{
+    public static void main(String[] args) throws Exception {
 
-        String jsonConfiguration = IOUtils.toString(PrepareProjectResultsAssociation.class
-                .getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_prepareprojecttoresult_parameters.json"));
+        String jsonConfiguration =
+                IOUtils.toString(
+                        PrepareProjectResultsAssociation.class.getResourceAsStream(
+                                "/eu/dnetlib/dhp/projecttoresult/input_prepareprojecttoresult_parameters.json"));
 
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-                jsonConfiguration);
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 
         parser.parseArgument(args);
 
@@ -53,69 +52,88 @@ public class PrepareProjectResultsAssociation {
         SparkConf conf = new SparkConf();
         conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
 
-        runWithSparkHiveSession(conf, isSparkSessionManaged,
+        runWithSparkHiveSession(
+                conf,
+                isSparkSessionManaged,
                 spark -> {
-//                    removeOutputDir(spark, potentialUpdatePath);
-//                    removeOutputDir(spark, alreadyLinkedPath);
-                    prepareResultProjProjectResults(spark, inputPath, potentialUpdatePath, alreadyLinkedPath, allowedsemrel);
-
+                    // removeOutputDir(spark, potentialUpdatePath);
+                    // removeOutputDir(spark, alreadyLinkedPath);
+                    prepareResultProjProjectResults(
+                            spark,
+                            inputPath,
+                            potentialUpdatePath,
+                            alreadyLinkedPath,
+                            allowedsemrel);
                 });
     }
 
-    private static void prepareResultProjProjectResults(SparkSession spark, String inputPath, String potentialUpdatePath,
-                                                        String alreadyLinkedPath, List<String> allowedsemrel) {
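+    /**
+     * Writes, as compressed JSON, the result-to-project associations that could be added through
+     * the allowed semantic relations (potentialUpdatePath) and the result-to-project links that
+     * already exist in the graph (alreadyLinkedPath).
+     */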
+    private static void prepareResultProjProjectResults(
+            SparkSession spark,
+            String inputPath,
+            String potentialUpdatePath,
+            String alreadyLinkedPath,
+            List<String> allowedsemrel) {
         JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-        Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath )
-                .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
+        Dataset<Relation> relation =
+                spark.createDataset(
+                        sc.textFile(inputPath)
+                                .map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
+                                .rdd(),
+                        Encoders.bean(Relation.class));
 
         relation.createOrReplaceTempView("relation");
 
-        String query = "SELECT source, target " +
-                " FROM relation " +
-                " WHERE datainfo.deletedbyinference = false " +
-                " AND relClass = '" + RELATION_RESULT_PROJECT_REL_CLASS + "'";
+        String query =
+                "SELECT source, target "
+                        + " FROM relation "
+                        + " WHERE datainfo.deletedbyinference = false "
+                        + " AND relClass = '"
+                        + RELATION_RESULT_PROJECT_REL_CLASS
+                        + "'";
 
         Dataset<Row> resproj_relation = spark.sql(query);
         resproj_relation.createOrReplaceTempView("resproj_relation");
 
-        query ="SELECT projectId, collect_set(resId) resultSet " +
-                "FROM (" +
-                " SELECT r1.target resId, r2.target projectId " +
-                " FROM (SELECT source, target " +
-                " FROM relation " +
-                " WHERE datainfo.deletedbyinference = false " +
-                getConstraintList(" relClass = '", allowedsemrel ) + ") r1" +
-                " JOIN resproj_relation r2 " +
-                " ON r1.source = r2.source " +
-                " ) tmp " +
-                "GROUP BY projectId ";
-
-        spark.sql(query).as(Encoders.bean(ProjectResultSet.class))
-                .toJSON()
-                .write()
-                .mode(SaveMode.Overwrite)
-                .option("compression","gzip")
-                .text(potentialUpdatePath);
-//                .toJavaRDD()
-//                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
-//                .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
-
-
-        query = "SELECT target projectId, collect_set(source) resultSet " +
-                "FROM resproj_relation " +
-                "GROUP BY target";
+        query =
+                "SELECT projectId, collect_set(resId) resultSet "
+                        + "FROM ("
+                        + " SELECT r1.target resId, r2.target projectId "
+                        + " FROM (SELECT source, target "
+                        + " FROM relation "
+                        + " WHERE datainfo.deletedbyinference = false "
+                        + getConstraintList(" relClass = '", allowedsemrel)
+                        + ") r1"
+                        + " JOIN resproj_relation r2 "
+                        + " ON r1.source = r2.source "
+                        + " ) tmp "
+                        + "GROUP BY projectId ";
 
         spark.sql(query)
                 .as(Encoders.bean(ProjectResultSet.class))
                 .toJSON()
                 .write()
                 .mode(SaveMode.Overwrite)
-                .option("compression","gzip")
-                .text(alreadyLinkedPath);
-//                .toJavaRDD()
-//                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
-//                .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
+                .option("compression", "gzip")
+                .text(potentialUpdatePath);
+        //                .toJavaRDD()
+        //                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+        //                .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
 
+        query =
+                "SELECT target projectId, collect_set(source) resultSet "
+                        + "FROM resproj_relation "
+                        + "GROUP BY target";
+
+        spark.sql(query)
+                .as(Encoders.bean(ProjectResultSet.class))
+                .toJSON()
+                .write()
+                .mode(SaveMode.Overwrite)
+                .option("compression", "gzip")
+                .text(alreadyLinkedPath);
+        //                .toJavaRDD()
+        //                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+        //                .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
     }
 }
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java
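+    /**
+     * Loads the two prepared datasets and, depending on the flags, writes the potential updates
+     * as compressed JSON (writeUpdate) and/or appends the newly derived relations to the output
+     * graph (saveGraph).
+     */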
index 2125f351fb..45d6976512 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob3.java
@@ -1,31 +1,34 @@
 package eu.dnetlib.dhp.projecttoresult;
 
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import static eu.dnetlib.dhp.PropagationConstant.*;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 public class SparkResultToProjectThroughSemRelJob3 {
-    private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
+    private static final Logger log =
+            LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public static void main(String[] args) throws Exception {
 
-        String jsonConfiguration = IOUtils.toString(SparkResultToProjectThroughSemRelJob3.class
-                .getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"));
+        String jsonConfiguration =
+                IOUtils.toString(
+                        SparkResultToProjectThroughSemRelJob3.class.getResourceAsStream(
+                                "/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"));
 
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-                jsonConfiguration);
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 
         parser.parseArgument(args);
 
@@ -49,84 +52,104 @@ public class SparkResultToProjectThroughSemRelJob3 {
 
         SparkConf conf = new SparkConf();
 
-
-        runWithSparkSession(conf, isSparkSessionManaged,
+        runWithSparkSession(
+                conf,
+                isSparkSessionManaged,
                 spark -> {
-                    if(isTest(parser)) {
+                    if (isTest(parser)) {
                         removeOutputDir(spark, outputPath);
                     }
-                    execPropagation(spark, outputPath, alreadyLinkedPath, potentialUpdatePath, writeUpdates, saveGraph);
+                    execPropagation(
+                            spark,
+                            outputPath,
+                            alreadyLinkedPath,
+                            potentialUpdatePath,
+                            writeUpdates,
+                            saveGraph);
                 });
     }
 
-    private static void execPropagation(SparkSession spark, String outputPath, String alreadyLinkedPath, String potentialUpdatePath,
-                                        Boolean writeUpdate, Boolean saveGraph){
+    private static void execPropagation(
+            SparkSession spark,
+            String outputPath,
+            String alreadyLinkedPath,
+            String potentialUpdatePath,
+            Boolean writeUpdate,
+            Boolean saveGraph) {
 
-        Dataset<ProjectResultSet> toaddrelations = readAssocProjectResults(spark, potentialUpdatePath);
-        Dataset<ProjectResultSet> alreadyLinked = readAssocProjectResults(spark, alreadyLinkedPath);
+        Dataset<ProjectResultSet> toaddrelations =
+                readAssocProjectResults(spark, potentialUpdatePath);
+        Dataset<ProjectResultSet> alreadyLinked = readAssocProjectResults(spark, alreadyLinkedPath);
 
-        if(writeUpdate){
+        if (writeUpdate) {
             toaddrelations
                     .toJSON()
                     .write()
                     .mode(SaveMode.Overwrite)
-                    .option("compression","gzip")
-                    .text(outputPath +"/potential_updates");
+                    .option("compression", "gzip")
+                    .text(outputPath + "/potential_updates");
         }
-        if (saveGraph){
+        if (saveGraph) {
             getNewRelations(alreadyLinked, toaddrelations)
                     .toJSON()
                     .write()
                     .mode(SaveMode.Append)
                     .option("compression", "gzip")
                     .text(outputPath);
-
         }
-
     }
 
-    private static Dataset<Relation> getNewRelations(Dataset<ProjectResultSet> alreadyLinked,
-                                                     Dataset<ProjectResultSet> toaddrelations){
-
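+    /**
+     * Left-joins the potential updates with the already linked associations on projectId and, for
+     * each result not yet linked to the project, emits the two directed relations
+     * (result to project and project to result).
+     */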
"/potential_updates"); } - if (saveGraph){ + if (saveGraph) { getNewRelations(alreadyLinked, toaddrelations) .toJSON() .write() .mode(SaveMode.Append) .option("compression", "gzip") .text(outputPath); - } - } - private static Dataset getNewRelations(Dataset alreadyLinked, - Dataset toaddrelations){ - + private static Dataset getNewRelations( + Dataset alreadyLinked, Dataset toaddrelations) { return toaddrelations - .joinWith(alreadyLinked, toaddrelations.col("projectId").equalTo(alreadyLinked.col("projectId")), "left") - .flatMap(value -> { - List new_relations = new ArrayList<>(); - ProjectResultSet potential_update = value._1(); - ProjectResultSet already_linked = value._2(); - String projId = already_linked.getProjectId(); - potential_update - .getResultSet() - .stream() - .forEach(rId -> { - if (!already_linked.getResultSet().contains(rId)){ - new_relations.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE, - RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); - new_relations.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE, - RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); - } - }); - return new_relations.iterator(); - - } - ,Encoders.bean(Relation.class)); - - - - - } - - private static Dataset readAssocProjectResults(SparkSession spark, String potentialUpdatePath) { - return spark - .read() - .textFile(potentialUpdatePath) - .map(value -> OBJECT_MAPPER.readValue(value, ProjectResultSet.class), Encoders.bean(ProjectResultSet.class)); + .joinWith( + alreadyLinked, + toaddrelations.col("projectId").equalTo(alreadyLinked.col("projectId")), + "left") + .flatMap( + value -> { + List new_relations = new ArrayList<>(); + ProjectResultSet potential_update = value._1(); + ProjectResultSet already_linked = value._2(); + String projId = already_linked.getProjectId(); + potential_update.getResultSet().stream() + .forEach( + rId -> { + if (!already_linked.getResultSet().contains(rId)) { + new_relations.add( + getRelation( + rId, + projId, + RELATION_RESULT_PROJECT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + new_relations.add( + getRelation( + projId, + rId, + RELATION_PROJECT_RESULT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + } + }); + return new_relations.iterator(); + }, + Encoders.bean(Relation.class)); } + private static Dataset readAssocProjectResults( + SparkSession spark, String potentialUpdatePath) { + return spark.read() + .textFile(potentialUpdatePath) + .map( + value -> OBJECT_MAPPER.readValue(value, ProjectResultSet.class), + Encoders.bean(ProjectResultSet.class)); + } }