refactoring

This commit is contained in:
Miriam Baglioni 2020-04-23 12:40:44 +02:00
parent d8dc31d4af
commit 769aa8178a
2 changed files with 163 additions and 122 deletions

View File

@ -1,37 +1,36 @@
package eu.dnetlib.dhp.projecttoresult;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.PropagationConstant.getConstraintList;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.PropagationConstant.getConstraintList;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
public class PrepareProjectResultsAssociation {
private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
private static final Logger log =
LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception{
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils.toString(PrepareProjectResultsAssociation.class
.getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_prepareprojecttoresult_parameters.json"));
String jsonConfiguration =
IOUtils.toString(
PrepareProjectResultsAssociation.class.getResourceAsStream(
"/eu/dnetlib/dhp/projecttoresult/input_prepareprojecttoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
jsonConfiguration);
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
@ -53,69 +52,88 @@ public class PrepareProjectResultsAssociation {
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(conf, isSparkSessionManaged,
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
// removeOutputDir(spark, potentialUpdatePath);
// removeOutputDir(spark, alreadyLinkedPath);
prepareResultProjProjectResults(spark, inputPath, potentialUpdatePath, alreadyLinkedPath, allowedsemrel);
// removeOutputDir(spark, potentialUpdatePath);
// removeOutputDir(spark, alreadyLinkedPath);
prepareResultProjProjectResults(
spark,
inputPath,
potentialUpdatePath,
alreadyLinkedPath,
allowedsemrel);
});
}
private static void prepareResultProjProjectResults(SparkSession spark, String inputPath, String potentialUpdatePath,
String alreadyLinkedPath, List<String> allowedsemrel) {
private static void prepareResultProjProjectResults(
SparkSession spark,
String inputPath,
String potentialUpdatePath,
String alreadyLinkedPath,
List<String> allowedsemrel) {
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath )
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
Dataset<Relation> relation =
spark.createDataset(
sc.textFile(inputPath)
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
.rdd(),
Encoders.bean(Relation.class));
relation.createOrReplaceTempView("relation");
String query = "SELECT source, target " +
" FROM relation " +
" WHERE datainfo.deletedbyinference = false " +
" AND relClass = '" + RELATION_RESULT_PROJECT_REL_CLASS + "'";
String query =
"SELECT source, target "
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ " AND relClass = '"
+ RELATION_RESULT_PROJECT_REL_CLASS
+ "'";
Dataset<Row> resproj_relation = spark.sql(query);
resproj_relation.createOrReplaceTempView("resproj_relation");
query ="SELECT projectId, collect_set(resId) resultSet " +
"FROM (" +
" SELECT r1.target resId, r2.target projectId " +
" FROM (SELECT source, target " +
" FROM relation " +
" WHERE datainfo.deletedbyinference = false " +
getConstraintList(" relClass = '", allowedsemrel ) + ") r1" +
" JOIN resproj_relation r2 " +
" ON r1.source = r2.source " +
" ) tmp " +
"GROUP BY projectId ";
spark.sql(query).as(Encoders.bean(ProjectResultSet.class))
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.text(potentialUpdatePath);
// .toJavaRDD()
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
query = "SELECT target projectId, collect_set(source) resultSet " +
"FROM resproj_relation " +
"GROUP BY target";
query =
"SELECT projectId, collect_set(resId) resultSet "
+ "FROM ("
+ " SELECT r1.target resId, r2.target projectId "
+ " FROM (SELECT source, target "
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ getConstraintList(" relClass = '", allowedsemrel)
+ ") r1"
+ " JOIN resproj_relation r2 "
+ " ON r1.source = r2.source "
+ " ) tmp "
+ "GROUP BY projectId ";
spark.sql(query)
.as(Encoders.bean(ProjectResultSet.class))
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.text(alreadyLinkedPath);
// .toJavaRDD()
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
.option("compression", "gzip")
.text(potentialUpdatePath);
// .toJavaRDD()
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
query =
"SELECT target projectId, collect_set(source) resultSet "
+ "FROM resproj_relation "
+ "GROUP BY target";
spark.sql(query)
.as(Encoders.bean(ProjectResultSet.class))
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(alreadyLinkedPath);
// .toJavaRDD()
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}
}

View File

@ -1,31 +1,34 @@
package eu.dnetlib.dhp.projecttoresult;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class SparkResultToProjectThroughSemRelJob3 {
private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
private static final Logger log =
LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils.toString(SparkResultToProjectThroughSemRelJob3.class
.getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"));
String jsonConfiguration =
IOUtils.toString(
SparkResultToProjectThroughSemRelJob3.class.getResourceAsStream(
"/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
jsonConfiguration);
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
@ -49,84 +52,104 @@ public class SparkResultToProjectThroughSemRelJob3 {
SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged,
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
if(isTest(parser)) {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
execPropagation(spark, outputPath, alreadyLinkedPath, potentialUpdatePath, writeUpdates, saveGraph);
execPropagation(
spark,
outputPath,
alreadyLinkedPath,
potentialUpdatePath,
writeUpdates,
saveGraph);
});
}
private static void execPropagation(
SparkSession spark,
String outputPath,
String alreadyLinkedPath,
String potentialUpdatePath,
Boolean writeUpdate,
Boolean saveGraph) {
private static void execPropagation(SparkSession spark, String outputPath, String alreadyLinkedPath, String potentialUpdatePath,
Boolean writeUpdate, Boolean saveGraph){
Dataset<ProjectResultSet> toaddrelations =
readAssocProjectResults(spark, potentialUpdatePath);
Dataset<ProjectResultSet> alreadyLinked = readAssocProjectResults(spark, alreadyLinkedPath);
Dataset<ProjectResultSet> toaddrelations = readAssocProjectResults(spark, potentialUpdatePath);
Dataset<ProjectResultSet> alreadyLinked = readAssocProjectResults(spark, alreadyLinkedPath);
if(writeUpdate){
if (writeUpdate) {
toaddrelations
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.text(outputPath +"/potential_updates");
.option("compression", "gzip")
.text(outputPath + "/potential_updates");
}
if (saveGraph){
if (saveGraph) {
getNewRelations(alreadyLinked, toaddrelations)
.toJSON()
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.text(outputPath);
}
}
private static Dataset<Relation> getNewRelations(Dataset<ProjectResultSet> alreadyLinked,
Dataset<ProjectResultSet> toaddrelations){
private static Dataset<Relation> getNewRelations(
Dataset<ProjectResultSet> alreadyLinked, Dataset<ProjectResultSet> toaddrelations) {
return toaddrelations
.joinWith(alreadyLinked, toaddrelations.col("projectId").equalTo(alreadyLinked.col("projectId")), "left")
.flatMap(value -> {
List<Relation> new_relations = new ArrayList<>();
ProjectResultSet potential_update = value._1();
ProjectResultSet already_linked = value._2();
String projId = already_linked.getProjectId();
potential_update
.getResultSet()
.stream()
.forEach(rId -> {
if (!already_linked.getResultSet().contains(rId)){
new_relations.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
new_relations.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
}
});
return new_relations.iterator();
}
,Encoders.bean(Relation.class));
}
private static Dataset<ProjectResultSet> readAssocProjectResults(SparkSession spark, String potentialUpdatePath) {
return spark
.read()
.textFile(potentialUpdatePath)
.map(value -> OBJECT_MAPPER.readValue(value, ProjectResultSet.class), Encoders.bean(ProjectResultSet.class));
.joinWith(
alreadyLinked,
toaddrelations.col("projectId").equalTo(alreadyLinked.col("projectId")),
"left")
.flatMap(
value -> {
List<Relation> new_relations = new ArrayList<>();
ProjectResultSet potential_update = value._1();
ProjectResultSet already_linked = value._2();
String projId = already_linked.getProjectId();
potential_update.getResultSet().stream()
.forEach(
rId -> {
if (!already_linked.getResultSet().contains(rId)) {
new_relations.add(
getRelation(
rId,
projId,
RELATION_RESULT_PROJECT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
new_relations.add(
getRelation(
projId,
rId,
RELATION_PROJECT_RESULT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
}
});
return new_relations.iterator();
},
Encoders.bean(Relation.class));
}
private static Dataset<ProjectResultSet> readAssocProjectResults(
SparkSession spark, String potentialUpdatePath) {
return spark.read()
.textFile(potentialUpdatePath)
.map(
value -> OBJECT_MAPPER.readValue(value, ProjectResultSet.class),
Encoders.bean(ProjectResultSet.class));
}
}