forked from D-Net/dnet-hadoop
new implementation for result-to-project propagation: use the prepared info during the propagation step
This commit is contained in:
parent
ceb1f299bf
commit
cadab9b81d
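Editor's note: the diff below replaces the SQL-based computation of result-to-project links with a job that reads associations prepared by an upstream step (the "prepared info" of the commit message) and materializes only the relations that are still missing. The ProjectResultSet bean it deserializes is not part of this diff; the sketch below is a hypothetical reconstruction of its shape, inferred only from the getProjectId()/getResultSet() calls and from Encoders.bean(ProjectResultSet.class), which requires a Serializable JavaBean with getters and setters.

// Hypothetical sketch of the ProjectResultSet bean assumed by the new code;
// the class is not shown in this diff, so this shape is an assumption.
package eu.dnetlib.dhp.projecttoresult;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProjectResultSet implements Serializable {

    // the project all the result ids below are (or could be) linked to
    private String projectId;

    // result identifiers associated with the project by the preparation step
    private List<String> resultSet = new ArrayList<>();

    public String getProjectId() { return projectId; }

    public void setProjectId(String projectId) { this.projectId = projectId; }

    public List<String> getResultSet() { return resultSet; }

    public void setResultSet(List<String> resultSet) { this.resultSet = resultSet; }
}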
@@ -1,143 +1,160 @@
 package eu.dnetlib.dhp.projecttoresult;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.countrypropagation.DatasourceCountry;
+import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
+import eu.dnetlib.dhp.countrypropagation.ResultCountrySet;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import org.apache.arrow.flatbuf.Bool;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.FlatMapFunction2;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.omg.CORBA.OBJ_ADAPTER;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 public class SparkResultToProjectThroughSemRelJob3 {
 
+    private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public static void main(String[] args) throws Exception {
 
+        String jsonConfiguration = IOUtils.toString(SparkResultToProjectThroughSemRelJob3.class
+                .getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"));
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-                IOUtils.toString(SparkResultToProjectThroughSemRelJob3.class
-                        .getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json")));
+                jsonConfiguration);
 
         parser.parseArgument(args);
 
+        Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath {}: ", outputPath);
+
+        final String potentialUpdatePath = parser.get("potentialUpdatePath");
+        log.info("potentialUpdatePath {}: ", potentialUpdatePath);
+
+        final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
+        log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
+
+        final Boolean writeUpdates = Boolean.valueOf(parser.get("writeUpdate"));
+        log.info("writeUpdate: {}", writeUpdates);
+
+        final Boolean saveGraph = Boolean.valueOf(parser.get("saveGraph"));
+        log.info("saveGraph: {}", saveGraph);
 
         SparkConf conf = new SparkConf();
-        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
-        final SparkSession spark = SparkSession
-                .builder()
-                .appName(SparkResultToProjectThroughSemRelJob3.class.getSimpleName())
-                .master(parser.get("master"))
-                .config(conf)
-                .enableHiveSupport()
-                .getOrCreate();
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        final String inputPath = parser.get("sourcePath");
-        final String outputPath = "/tmp/provision/propagation/projecttoresult";
-        boolean writeUpdates = "true".equals(parser.get("writeUpdate"));
-        boolean saveGraph = "true".equals(parser.get("saveGraph"));
-
-        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
-
-        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
-
-        Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
-
-        relation.createOrReplaceTempView("relation");
-
-        String query = "Select source, target " +
-                "from relation " +
-                "where datainfo.deletedbyinference = false and relClass = '" + RELATION_RESULT_PROJECT_REL_CLASS + "'";
-
-        Dataset<Row> resproj_relation = spark.sql(query);
-
-        query = "Select source, target " +
-                "from relation " +
-                "where datainfo.deletedbyinference = false " + getConstraintList(" relClass = '", allowedsemrel );
-
-        Dataset<Row> resres_relation = spark.sql(query);
-        resres_relation.createOrReplaceTempView("resres_relation");
-        resproj_relation.createOrReplaceTempView("resproj_relation");
-
-        query ="SELECT proj, collect_set(r1target) result_set " +
-                "FROM (" +
-                " SELECT r1.source as source, r1.target as r1target, r2.target as proj " +
-                " FROM resres_relation r1 " +
-                " JOIN resproj_relation r2 " +
-                " ON r1.source = r2.source " +
-                " ) tmp " +
-                "GROUP BY proj ";
-
-        Dataset<Row> toaddrelations = spark.sql(query);
-
-        query = "select target, collect_set(source) result_list from " +
-                "resproj_relation " +
-                "group by target";
-
-        Dataset<Row> project_resultlist = spark.sql(query);
-
-        //if (writeUpdaes){
-        toaddrelations.toJavaRDD().map(r->new ObjectMapper().writeValueAsString(r))
-                .saveAsTextFile(outputPath + "/toupdaterelations");
-        //}
-
-        if(saveGraph){
-            JavaRDD<Relation> new_relations = toaddrelations.toJavaRDD().mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
-                    .leftOuterJoin(project_resultlist.toJavaRDD().mapToPair(pr -> new Tuple2<>(pr.getString(0), pr.getList(1))))
-                    .flatMap(c -> {
-                        List<Object> toAddRel = new ArrayList<>();
-                        toAddRel.addAll(c._2()._1());
-                        if (c._2()._2().isPresent()) {
-                            List<Object> originalRels = c._2()._2().get();
-                            for (Object o : originalRels) {
-                                if (toAddRel.contains(o)) {
-                                    toAddRel.remove(o);
-                                }
-                            }
-                        }
-                        List<Relation> relationList = new ArrayList<>();
-                        String projId = c._1();
-                        for (Object r : toAddRel) {
-                            String rId = (String) r;
-                            relationList.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
-                                    RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
-                                    PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
-                                    PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
-                            relationList.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
-                                    RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
-                                    PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
-                                    PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
-                        }
-                        if(relationList.size()==0){
-                            return null;
-                        }
-                        return relationList.iterator();
-                    }).filter(r -> !(r==null));
-
-            new_relations.map(r-> new ObjectMapper().writeValueAsString(r))
-                    .saveAsTextFile(outputPath + "/new_relations" );
-
-            sc.textFile(inputPath + "/relation")
-                    .map(item -> new ObjectMapper().readValue(item, Relation.class))
-                    .union(new_relations)
-                    .map(r -> new ObjectMapper().writeValueAsString(r))
-                    .saveAsTextFile(outputPath + "/relation");
-        }
-    }
+        runWithSparkSession(conf, isSparkSessionManaged,
+                spark -> {
+                    //createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
+                    if(isTest(parser)) {
+                        removeOutputDir(spark, outputPath);
+                    }
+                    execPropagation(spark, inputPath, outputPath, alreadyLinkedPath, potentialUpdatePath, writeUpdates, saveGraph);
+                });
+    }
+
+    private static void execPropagation(SparkSession spark, String inputPath, String outputPath, String alreadyLinkedPath, String potentialUpdatePath,
+                                        Boolean writeUpdate, Boolean saveGraph){
+
+        Dataset<ProjectResultSet> toaddrelations = readAssocProjectResults(spark, potentialUpdatePath);
+        Dataset<ProjectResultSet> alreadyLinked = readAssocProjectResults(spark, alreadyLinkedPath);
+
+        if(writeUpdate){
+            toaddrelations
+                    .toJSON()
+                    .write()
+                    .mode(SaveMode.Overwrite)
+                    .option("compression","gzip")
+                    .text(outputPath +"/potential_updates");
+            //writeUpdates(toaddrelations.toJavaRDD(), outputPath + "/potential_updates");
+        }
+        if (saveGraph){
+            getNewRelations(alreadyLinked, toaddrelations)
+                    .toJSON()
+                    .write()
+                    .mode(SaveMode.Append)
+                    .option("compression", "gzip")
+                    .text(outputPath);
+//            JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+//            sc.textFile(inputPath)
+//                    .map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
+//                    .union(getNewRelations(alreadyLinked, toaddrelations)
+//                            .toJavaRDD())
+//                    .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+//                    .saveAsTextFile(outputPath , GzipCodec.class);
+        }
+    }
+
+    private static Dataset<Relation> getNewRelations(Dataset<ProjectResultSet> alreadyLinked,
+                                                     Dataset<ProjectResultSet> toaddrelations){
+
+        return toaddrelations
+                .joinWith(alreadyLinked, toaddrelations.col("projectId").equalTo(alreadyLinked.col("projectId")), "left")
+                .flatMap(value -> {
+                    List<Relation> new_relations = new ArrayList<>();
+                    ProjectResultSet potential_update = value._1();
+                    ProjectResultSet already_linked = value._2();
+                    String projId = already_linked.getProjectId();
+                    potential_update
+                            .getResultSet()
+                            .stream()
+                            .forEach(rId -> {
+                                if (!already_linked.getResultSet().contains(rId)){
+                                    new_relations.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
+                                            RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+                                            PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
+                                            PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
+                                    new_relations.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
+                                            RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+                                            PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
+                                            PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
+                                }
+                            });
+                    return new_relations.iterator();
+                }
+                ,Encoders.bean(Relation.class));
+    }
+
+    private static void writeUpdates(JavaRDD<ProjectResultSet> potentialUpdates, String outputPath){
+        potentialUpdates.map(u -> OBJECT_MAPPER.writeValueAsString(u))
+                .saveAsTextFile(outputPath, GzipCodec.class);
+    }
 
 //        JavaPairRDD<String, TypedRow> result_result = getResultResultSemRel(allowedsemrel, relations);
 
 //        JavaPairRDD<String, TypedRow> result_project = relations
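Editor's note on the hunk above: the heart of the new getNewRelations is a left join between the prepared associations (potentialUpdatePath) and the links already present in the graph (alreadyLinkedPath), followed by a per-project set difference; each surviving result id yields two relations, result to project and project to result. A minimal sketch of that difference with plain collections (no Spark, all names hypothetical):

// Plain-Java sketch of the per-project set difference performed inside flatMap.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NewRelationsSketch {

    public static void main(String[] args) {
        // prepared associations for one project (the "potential update" side)
        List<String> potentialUpdate = Arrays.asList("result1", "result2", "result3");

        // links already present in the graph for the same project
        Set<String> alreadyLinked = new HashSet<>(Arrays.asList("result2"));

        List<String> newRelationPairs = new ArrayList<>();
        potentialUpdate.stream()
                .filter(rId -> !alreadyLinked.contains(rId))
                .forEach(rId -> {
                    // in the real job these become two Relation objects via getRelation(...)
                    newRelationPairs.add(rId + " -> project");
                    newRelationPairs.add("project -> " + rId);
                });

        // prints the pairs for result1 and result3; result2 is already linked
        newRelationPairs.forEach(System.out::println);
    }
}

Two things worth flagging in the committed version: value._2() is dereferenced without a null check even though a left join can produce unmatched pairs, and contains() is called on the List returned by getResultSet(), a linear scan per element (a HashSet, as in the sketch, would make it constant time).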
@@ -211,9 +228,14 @@ public class SparkResultToProjectThroughSemRelJob3 {
 //        newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p))
 //                .saveAsTextFile(outputPath + "/relation");
 
+    //}
+
+    private static Dataset<ProjectResultSet> readAssocProjectResults(SparkSession spark, String potentialUpdatePath) {
+        return spark
+                .read()
+                .textFile(potentialUpdatePath)
+                .map(value -> OBJECT_MAPPER.readValue(value, ProjectResultSet.class), Encoders.bean(ProjectResultSet.class));
+    }
 }
 
 }
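The second hunk adds readAssocProjectResults, which reads the prepared info as newline-delimited JSON and maps it into a typed Dataset. Below is a self-contained sketch of the same pattern; the input path is hypothetical, ProjectResultSet is the assumed bean sketched earlier, and in plain Java an explicit MapFunction cast is usually needed to disambiguate the lambda overloads of Dataset.map:

// Standalone sketch: newline-delimited JSON to a typed Dataset via Encoders.bean.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class ReadPreparedInfoSketch {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("read-prepared-info-sketch")
                .master("local[*]") // local master only for this sketch
                .getOrCreate();

        // hypothetical path: one JSON-serialized ProjectResultSet per line
        Dataset<ProjectResultSet> assoc = spark
                .read()
                .textFile("/tmp/projecttoresult/preparedInfo")
                .map((MapFunction<String, ProjectResultSet>) value ->
                                OBJECT_MAPPER.readValue(value, ProjectResultSet.class),
                        Encoders.bean(ProjectResultSet.class));

        assoc.show(false);
        spark.stop();
    }
}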