From c0bebb7c359904364e2c27aad703443ced3a2370 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 14 Apr 2020 15:31:26 +0200
Subject: [PATCH] code to compute the prepared information used in the actual
 propagation step. This step will produce two files: one with the potential
 updates (the association between projects and a list of results), the other
 with the already linked entities (the association between projects and the
 list of results already linked to them)

---
 .../PrepareProjectResultsAssociation.java     | 30 +++++++++----------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
index 5daaceeea6..39adf92a43 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
@@ -9,8 +9,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.TaskResultLost;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
@@ -18,20 +16,19 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.PropagationConstant.getConstraintList;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
-public class PrepareResultProjectAssociation {
+public class PrepareProjectResultsAssociation {
     private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public static void main(String[] args) throws Exception{
 
-        String jsonConfiguration = IOUtils.toString(PrepareDatasourceCountryAssociation.class
-                .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_prepareprojecttoresult_parameters.json"));
+        String jsonConfiguration = IOUtils.toString(PrepareProjectResultsAssociation.class
+                .getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_prepareprojecttoresult_parameters.json"));
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                 jsonConfiguration);
@@ -58,17 +55,18 @@ public class PrepareResultProjectAssociation {
 
         runWithSparkHiveSession(conf, isSparkSessionManaged,
                 spark -> {
-                    createOutputDirs(potentialUpdatePath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
-                    prepareResultProjAssoc(spark, inputPath, potentialUpdatePath, alreadyLinkedPath, allowedsemrel);
+                    removeOutputDir(spark, potentialUpdatePath);
+                    removeOutputDir(spark, alreadyLinkedPath);
+                    prepareResultProjProjectResults(spark, inputPath, potentialUpdatePath, alreadyLinkedPath, allowedsemrel);
                 });
     }
 
-    private static void prepareResultProjAssoc(SparkSession spark, String inputPath, String potentialUpdatePath,
-                                               String alreadyLinkedPath, List allowedsemrel) {
+    private static void prepareResultProjProjectResults(SparkSession spark, String inputPath, String potentialUpdatePath,
+                                                        String alreadyLinkedPath, List allowedsemrel) {
         JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-        Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
+        Dataset relation = spark.createDataset(sc.textFile(inputPath )
+                .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
 
         relation.createOrReplaceTempView("relation");
 
@@ -80,9 +78,9 @@ public class PrepareResultProjectAssociation {
         Dataset resproj_relation = spark.sql(query);
         resproj_relation.createOrReplaceTempView("resproj_relation");
 
-        query ="SELECT projectId, collect_set(r1target) resultSet " +
+        query ="SELECT projectId, collect_set(resId) resultSet " +
                "FROM (" +
-               "      SELECT r1.source as source, r1.target as r1target, r2.target as proj " +
+               "      SELECT r1.target resId, r2.target projectId " +
                "      FROM (SELECT source, target " +
                "             FROM relation " +
                "             WHERE datainfo.deletedbyinference = false " +
@@ -90,7 +88,7 @@
                "             JOIN resproj_relation r2 " +
                "             ON r1.source = r2.source " +
                "      ) tmp " +
-               "GROUP BY proj ";
+               "GROUP BY projectId ";
 
         spark.sql(query).as(Encoders.bean(ProjectResultSet.class))
                 .toJavaRDD()
                 .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
@@ -98,7 +96,7 @@
 
-        query = "SELECT target, collect_set(source) result_list " +
+        query = "SELECT target projectId, collect_set(source) resultSet " +
                 "FROM resproj_relation " +
                 "GROUP BY target";
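
Both outputs are produced through Encoders.bean(ProjectResultSet.class), but the ProjectResultSet bean itself is not part of this patch. Below is a minimal sketch of the shape that bean would need for the queries above to map onto it, assuming standard Spark bean-encoder conventions; the field names are inferred from the SQL aliases projectId and resultSet, and the actual class in the module may differ:

    package eu.dnetlib.dhp.projecttoresult;

    import java.io.Serializable;
    import java.util.ArrayList;

    // Hypothetical sketch of the bean behind Encoders.bean(ProjectResultSet.class):
    // one instance per project, carrying the collected set of result identifiers.
    // Field names mirror the SQL aliases "projectId" and "resultSet".
    public class ProjectResultSet implements Serializable {
        private String projectId;
        private ArrayList<String> resultSet;

        public String getProjectId() {
            return projectId;
        }

        public void setProjectId(String projectId) {
            this.projectId = projectId;
        }

        public ArrayList<String> getResultSet() {
            return resultSet;
        }

        public void setResultSet(ArrayList<String> resultSet) {
            this.resultSet = resultSet;
        }
    }

With a bean of this shape, spark.sql(query).as(Encoders.bean(ProjectResultSet.class)) turns each grouped row into one bean instance, so each record written under potentialUpdatePath (and, by the same pattern, alreadyLinkedPath) holds one project together with its set of result identifiers.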