New implementation for result-to-project propagation: prepare the information to be used in the propagation step

This commit is contained in:
Miriam Baglioni 2020-04-11 16:26:23 +02:00
parent 90469789b9
commit 7783b09c5b
7 changed files with 226 additions and 69 deletions

View File

@ -0,0 +1,113 @@
package eu.dnetlib.dhp.projecttoresult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
public class PrepareResultProjectAssociation {
private static final Logger log = LoggerFactory.getLogger(PrepareResultProjectAssociation.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception{
String jsonConfiguration = IOUtils.toString(PrepareDatasourceCountryAssociation.class
.getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_prepareprojecttoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String potentialUpdatePath = parser.get("potentialUpdatePath");
log.info("potentialUpdatePath {}: ", potentialUpdatePath);
String alreadyLinkedPath = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath: {} ", alreadyLinkedPath);
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(conf, isSparkSessionManaged,
spark -> {
createOutputDirs(potentialUpdatePath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
prepareResultProjAssoc(spark, inputPath, potentialUpdatePath, alreadyLinkedPath, allowedsemrel);
});
}
private static void prepareResultProjAssoc(SparkSession spark, String inputPath, String potentialUpdatePath,
String alreadyLinkedPath, List<String> allowedsemrel) {
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// load the graph relations and register them as a temporary table
Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
relation.createOrReplaceTempView("relation");
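// select the existing result -> project relations (not deleted by inference)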
String query = "SELECT source, target " +
" FROM relation " +
" WHERE datainfo.deletedbyinference = false " +
" AND relClass = '" + RELATION_RESULT_PROJECT_REL_CLASS + "'";
Dataset<Row> resproj_relation = spark.sql(query);
resproj_relation.createOrReplaceTempView("resproj_relation");
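// for each project, collect the results reachable from an already linked result through one of the
// allowed semantic relations: these are the candidate new links (potential updates)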
query ="SELECT projectId, collect_set(r1target) resultSet " +
"FROM (" +
" SELECT r1.source as source, r1.target as r1target, r2.target as proj " +
" FROM (SELECT source, target " +
" FROM relation " +
" WHERE datainfo.deletedbyinference = false " +
getConstraintList(" relClass = '", allowedsemrel ) + ") r1" +
" JOIN resproj_relation r2 " +
" ON r1.source = r2.source " +
" ) tmp " +
"GROUP BY proj ";
spark.sql(query).as(Encoders.bean(ProjectResultSet.class))
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(potentialUpdatePath, GzipCodec.class);
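// for each project, collect the results already linked to it, so that existing links are not proposed again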
query = "SELECT target, collect_set(source) result_list " +
"FROM resproj_relation " +
"GROUP BY target";
spark.sql(query)
.as(Encoders.bean(ProjectResultSet.class))
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}
}
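
The two datasets prepared above feed the subsequent propagation step: for each project, the candidate results (potentialUpdatePath) minus the results already linked to it (alreadyLinkedPath) are the new result-to-project links to create. Below is a minimal sketch of that set difference, assuming both sides are available as sets of result ids per project; the class and method names are hypothetical and not part of this commit.

package eu.dnetlib.dhp.projecttoresult;

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not part of this commit: illustrates how the prepared info is meant to be used.
public class ProjectPropagationSketch {

    // Given the candidate results for a project and the results already linked to it,
    // return the result ids for which a new result -> project relation should be created.
    public static Set<String> newLinksForProject(Set<String> potentialUpdates, Set<String> alreadyLinked) {
        Set<String> newLinks = new HashSet<>(potentialUpdates);
        newLinks.removeAll(alreadyLinked);
        return newLinks;
    }
}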

View File

@ -1,69 +0,0 @@
package eu.dnetlib.dhp.projecttoresult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskResultLost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import static eu.dnetlib.dhp.PropagationConstant.TRUE;
import static eu.dnetlib.dhp.PropagationConstant.createOutputDirs;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
public class PrepareResultProjectAssociation {
private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception{
String jsonConfiguration = IOUtils.toString(PrepareDatasourceCountryAssociation.class
.getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_projecttoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
boolean writeUpdates = TRUE.equals(parser.get("writeUpdate"));
log.info("writeUpdates: {} ", writeUpdates);
boolean saveGraph = TRUE.equals(parser.get("saveGraph"));
log.info("saveGraph {}", saveGraph);
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(conf, isSparkSessionManaged,
spark -> {
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
});
}
}

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.projecttoresult;
public class ProjectResultSet {
}
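
ProjectResultSet is only a stub at this point. For Encoders.bean(ProjectResultSet.class) in PrepareResultProjectAssociation to map the projectId and resultSet columns produced by the prepare queries, the bean will need matching properties. A minimal sketch of what it could look like; the field names and types are assumptions, not part of this commit.

package eu.dnetlib.dhp.projecttoresult;

import java.io.Serializable;
import java.util.ArrayList;

// Sketch of the bean assumed by the prepare queries (columns projectId and resultSet); assumption only.
public class ProjectResultSet implements Serializable {
    private String projectId;            // the project identifier (the GROUP BY key)
    private ArrayList<String> resultSet; // ids of the results associated to the project

    public String getProjectId() { return projectId; }
    public void setProjectId(String projectId) { this.projectId = projectId; }

    public ArrayList<String> getResultSet() { return resultSet; }
    public void setResultSet(ArrayList<String> resultSet) { this.resultSet = resultSet; }
}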

View File

@ -0,0 +1,62 @@
[
{
"paramName":"mt",
"paramLongName":"master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName":"h",
"paramLongName":"hive_metastore_uris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName":"wu",
"paramLongName":"writeUpdate",
"paramDescription": "true if the update must be written. No double check if information is already present",
"paramRequired": false
},
{
"paramName":"sg",
"paramLongName":"saveGraph",
"paramDescription": "true if the new version of the graph must be saved",
"paramRequired": false
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": false
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": false
},
{
"paramName": "p",
"paramLongName": "preparedInfoPath",
"paramDescription": "the path where prepared info have been stored",
"paramRequired": false
},
{
"paramName": "w",
"paramLongName": "whitelist",
"paramDescription": "the datasource having a type different from the allowed ones but that we want to add anyway",
"paramRequired": true
},
{
"paramName": "at",
"paramLongName": "allowedtypes",
"paramDescription": "the allowed datasource types for country propagation",
"paramRequired": true
}
]

View File

@ -0,0 +1,39 @@
[
{
"paramName":"mt",
"paramLongName":"master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName":"asr",
"paramLongName":"allowedsemrels",
"paramDescription": "the types of the allowed datasources. Split by ;",
"paramRequired": true
},
{
"paramName":"h",
"paramLongName":"hive_metastore_uris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName":"wu",
"paramLongName":"writeUpdate",
"paramDescription": "true if the update must be writte. No double check if information is already present",
"paramRequired": true
},
{
"paramName":"sg",
"paramLongName":"saveGraph",
"paramDescription": "true if the new version of the graph must be saved",
"paramRequired": true
}
]

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.countrypropagation;
public class CountryPropagationJobTest {
}

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.countrypropagation;
public class PrepareDataForTest {
}