
code to compute the prepared information used in the actual propagation step. This step will produce two files: one with the potential updates (an association between each project and the list of results that could be linked to it), the other with the already linked entities (an association between each project and the list of results already linked to it)
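
For orientation, a minimal sketch of the association record that both output files serialize, assuming a plain bean whose fields match the SQL aliases used in the diff below (projectId, resultSet). The actual ProjectResultSet class is referenced by the code but is not shown in this commit.

import java.io.Serializable;
import java.util.List;

// Hypothetical stand-in for the ProjectResultSet bean referenced in the diff;
// Encoders.bean(...) requires exactly this getter/setter shape.
public class ProjectResultSetSketch implements Serializable {
    private String projectId;       // project identifier
    private List<String> resultSet; // ids of the results associated with the project

    public String getProjectId() { return projectId; }
    public void setProjectId(String projectId) { this.projectId = projectId; }

    public List<String> getResultSet() { return resultSet; }
    public void setResultSet(List<String> resultSet) { this.resultSet = resultSet; }
}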

Miriam Baglioni 2020-04-14 15:31:26 +02:00
parent f47ee5b78e
commit c0bebb7c35
1 changed file with 14 additions and 16 deletions


@@ -9,8 +9,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.TaskResultLost;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
@@ -18,20 +16,19 @@ import org.slf4j.LoggerFactory;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.PropagationConstant.getConstraintList;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
-public class PrepareResultProjectAssociation {
+public class PrepareProjectResultsAssociation {
     private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public static void main(String[] args) throws Exception {
-        String jsonConfiguration = IOUtils.toString(PrepareDatasourceCountryAssociation.class
-                .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_prepareprojecttoresult_parameters.json"));
+        String jsonConfiguration = IOUtils.toString(PrepareProjectResultsAssociation.class
+                .getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_prepareprojecttoresult_parameters.json"));
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                 jsonConfiguration);
@@ -58,17 +55,18 @@ public class PrepareResultProjectAssociation {
         runWithSparkHiveSession(conf, isSparkSessionManaged,
                 spark -> {
-                    createOutputDirs(potentialUpdatePath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
-                    prepareResultProjAssoc(spark, inputPath, potentialUpdatePath, alreadyLinkedPath, allowedsemrel);
+                    removeOutputDir(spark, potentialUpdatePath);
+                    removeOutputDir(spark, alreadyLinkedPath);
+                    prepareResultProjProjectResults(spark, inputPath, potentialUpdatePath, alreadyLinkedPath, allowedsemrel);
                 });
     }
 
-    private static void prepareResultProjAssoc(SparkSession spark, String inputPath, String potentialUpdatePath,
+    private static void prepareResultProjProjectResults(SparkSession spark, String inputPath, String potentialUpdatePath,
                                                String alreadyLinkedPath, List<String> allowedsemrel) {
         JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-        Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
+        Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath)
+                .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
 
         relation.createOrReplaceTempView("relation");
@@ -80,9 +78,9 @@ public class PrepareResultProjectAssociation {
         Dataset<Row> resproj_relation = spark.sql(query);
         resproj_relation.createOrReplaceTempView("resproj_relation");
 
-        query = "SELECT projectId, collect_set(r1target) resultSet " +
+        query = "SELECT projectId, collect_set(resId) resultSet " +
                 "FROM (" +
-                "   SELECT r1.source as source, r1.target as r1target, r2.target as proj " +
+                "   SELECT r1.target resId, r2.target projectId " +
                 "   FROM (SELECT source, target " +
                 "         FROM relation " +
                 "         WHERE datainfo.deletedbyinference = false " +
@@ -90,7 +88,7 @@ public class PrepareResultProjectAssociation {
                 "   JOIN resproj_relation r2 " +
                 "   ON r1.source = r2.source " +
                 "   ) tmp " +
-                "GROUP BY proj ";
+                "GROUP BY projectId ";
 
         spark.sql(query).as(Encoders.bean(ProjectResultSet.class))
                 .toJavaRDD()
@@ -98,7 +96,7 @@ public class PrepareResultProjectAssociation {
                 .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
 
-        query = "SELECT target, collect_set(source) result_list " +
+        query = "SELECT target projectId, collect_set(source) resultSet " +
                 "FROM resproj_relation " +
                 "GROUP BY target";