
to make it visible to Claudio

This commit is contained in:
Miriam Baglioni 2020-03-30 10:50:03 +02:00
parent 19d7f8b51d
commit b1af90a45f
1 changed file with 356 additions and 23 deletions


@@ -4,32 +4,37 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.QueryInformationSystem;
import eu.dnetlib.dhp.TypedRow;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.*;
import java.util.stream.Collectors;

import static eu.dnetlib.dhp.PropagationConstant.*;
public class SparkResultToCommunityThroughSemRelJob {

    public static void main(String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils
                .toString(SparkResultToCommunityThroughSemRelJob.class
                        .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
        parser.parseArgument(args);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));

        final SparkSession spark = SparkSession
                .builder()
                .appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName())
                .master(parser.get("master"))
                .config(conf)
                .enableHiveSupport()
                .getOrCreate();
@@ -37,29 +42,125 @@ public class SparkResultToCommunityThroughSemRelJob {
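        // NOTE: the JavaSparkContext used below as `sc` is defined in unchanged code
        // outside this hunk, presumably as:
        // final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());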
        final String inputPath = parser.get("sourcePath");
        final String outputPath = "/tmp/provision/propagation/communitytoresultthroughsemrel";

        //final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
        final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");
        //final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
        final List<String> communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp");

        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
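        // The allowed semantic relations and the IS lookup endpoint are hard-wired above,
        // presumably for testing: the commented-out lines read them from the job parameters.
        // Next, load each result type as JSON text, drop records deleted by inference, and
        // keep both the full set (all_*_rdd) and the subset already tagged with one of the
        // known communities (*_rdd).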
        JavaRDD<Publication> all_publication_rdd = sc.textFile(inputPath + "/publication")
                .map(item -> new ObjectMapper().readValue(item, Publication.class))
                .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
        JavaRDD<Publication> publication_rdd = all_publication_rdd
                .filter(p -> relatedToCommunities(p, communityIdList)).cache();

        JavaRDD<Dataset> all_dataset_rdd = sc.textFile(inputPath + "/dataset")
                .map(item -> new ObjectMapper().readValue(item, Dataset.class))
                .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
        JavaRDD<Dataset> dataset_rdd = all_dataset_rdd
                .filter(p -> relatedToCommunities(p, communityIdList)).cache();

        JavaRDD<OtherResearchProduct> all_orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
                .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class))
                .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
        JavaRDD<OtherResearchProduct> orp_rdd = all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();

        JavaRDD<Software> all_software_rdd = sc.textFile(inputPath + "/software")
                .map(item -> new ObjectMapper().readValue(item, Software.class))
                .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
        JavaRDD<Software> software_rdd = all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();

        JavaRDD<Relation> relation_rdd = sc.textFile(inputPath + "/relation")
                .map(item -> new ObjectMapper().readValue(item, Relation.class))
                .filter(r -> !r.getDataInfo().getDeletedbyinference())
                .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache();
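        // Switch to the SQL API: the community-tagged subsets and the filtered relations
        // are registered as temporary views so the propagation can be expressed as joins.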
org.apache.spark.sql.Dataset<Publication> publication = spark.createDataset(publication_rdd.rdd(),
Encoders.bean(Publication.class));
org.apache.spark.sql.Dataset<Dataset> dataset = spark.createDataset(dataset_rdd.rdd(),
Encoders.bean(Dataset.class));
org.apache.spark.sql.Dataset<OtherResearchProduct> other = spark.createDataset(orp_rdd.rdd(),
Encoders.bean(OtherResearchProduct.class));
org.apache.spark.sql.Dataset<Software> software = spark.createDataset(software_rdd.rdd(),
Encoders.bean(Software.class));
org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
Encoders.bean(Relation.class));
publication.createOrReplaceTempView("publication");
relation.createOrReplaceTempView("relation");
dataset.createOrReplaceTempView("dataset");
software.createOrReplaceTempView("software");
other.createOrReplaceTempView("other");
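        // For each result type, pair every community-tagged result (source) with the
        // results it points to (target) through the allowed relations, keeping its contexts.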
org.apache.spark.sql.Dataset<Row> publication_context = getContext(spark, "publication");
publication_context.createOrReplaceTempView("publication_context");
org.apache.spark.sql.Dataset<Row> dataset_context = getContext(spark, "dataset");
dataset_context.createOrReplaceTempView("dataset_context");
org.apache.spark.sql.Dataset<Row> software_context = getContext(spark, "software");
software_context.createOrReplaceTempView("software_context");
org.apache.spark.sql.Dataset<Row> other_context = getContext(spark, "other");
other_context.createOrReplaceTempView("other_context");
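        // Re-register the *full* result sets under the same view names: the updates were
        // computed from the community-tagged subset, but they must be applied to all results.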
publication = spark.createDataset(all_publication_rdd.rdd(),
Encoders.bean(Publication.class));
publication.createOrReplaceTempView("publication");
dataset = spark.createDataset(all_dataset_rdd.rdd(),
Encoders.bean(Dataset.class));
dataset.createOrReplaceTempView("dataset");
other = spark.createDataset(all_orp_rdd.rdd(),
Encoders.bean(OtherResearchProduct.class));
other.createOrReplaceTempView("other");
software = spark.createDataset(all_software_rdd.rdd(),
Encoders.bean(Software.class));
software.createOrReplaceTempView("software");
org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
org.apache.spark.sql.Dataset<Row> toupdatepublicationresult = getUpdateCommunitiesForTable(spark, "publication");
org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");
createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
createUpdateForResultDatasetWrite(toupdatepublicationresult.toJavaRDD(), outputPath, "publication_update",
        PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
        PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
updateForPublicationDataset(toupdatepublicationresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
        PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
/*
        JavaPairRDD<String, TypedRow> resultLinkedToCommunities = publication
                .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication"))
                .filter(p -> !(p == null))
                .mapToPair(toPair())
@@ -93,7 +194,239 @@ public class SparkResultToCommunityThroughSemRelJob {
        updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
        //left outer join result.to_add_result_communities (result = JavaPairRDD of results) [left outer join because we want all results, including the ones that were not updated]
        //for the ones that match, check whether the contexts to add are already present in the result; if not, add them, otherwise do nothing
*/
}
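    /**
     * For every result in <table>, collects the distinct community context ids that reach
     * it through an allowed semantic relation from any of the four result types.
     * Each returned row has the shape (target_id, [context ids to add]).
     */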
    private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(SparkSession spark, String table){
        String query = "SELECT target_id, collect_set(co.id) context_id " +
                "  FROM (SELECT t.id target_id, s.context source_context " +
                "          FROM software_context s " +
                "          JOIN " + table + " t " +
                "            ON s.target = t.id " +
                "         UNION ALL " +
                "        SELECT t.id target_id, d.context source_context " +
                "          FROM dataset_context d " +
                "          JOIN " + table + " t " +
                "            ON d.target = t.id " +
                "         UNION ALL " +
                "        SELECT t.id target_id, p.context source_context " +
                "          FROM publication_context p " +
                "          JOIN " + table + " t " +
                "            ON p.target = t.id " +
                "         UNION ALL " +
                "        SELECT t.id target_id, o.context source_context " +
                "          FROM other_context o " +
                "          JOIN " + table + " t " +
                "            ON o.target = t.id) TMP " +
                " LATERAL VIEW EXPLODE(source_context) MyT AS co " +
                " GROUP BY target_id";
        return spark.sql(query);
    }
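    /**
     * Writes, for each updated result, a minimal Result stub carrying only the id and the
     * newly propagated community contexts, serialised as JSON under the <type>_update
     * directories used in main() above.
     */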
    private static void createUpdateForResultDatasetWrite(JavaRDD<Row> toupdateresult, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
        toupdateresult.map(r -> {
            List<Context> contextList = new ArrayList<>();
            List<String> toAddContext = r.getList(1);
            for (String cId : toAddContext) {
                if (communityIdList.contains(cId)) {
                    Context newContext = new Context();
                    newContext.setId(cId);
                    newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                    contextList.add(newContext);
                }
            }
            if (contextList.size() > 0) {
                Result ret = new Result();
                ret.setId(r.getString(0));
                ret.setContext(contextList);
                return ret;
            }
            return null;
        }).filter(r -> r != null)
                .map(r -> new ObjectMapper().writeValueAsString(r))
                .saveAsTextFile(outputPath + "/" + type);
    }
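    // The four wrappers below are identical except for the concrete result subclass they
    // cast to and serialise; each writes its JSON output under outputPath/<type>.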
private static void updateForSoftwareDataset(JavaRDD<Row> toupdateresult, JavaRDD<Software> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
.map(r -> (Software) r)
.map(s -> new ObjectMapper().writeValueAsString(s))
.saveAsTextFile(outputPath + "/" + type);
}
private static void updateForDatasetDataset(JavaRDD<Row> toupdateresult, JavaRDD<Dataset> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
.map( r-> (Dataset)r)
.map(d -> new ObjectMapper().writeValueAsString(d))
.saveAsTextFile(outputPath + "/" + type);
}
private static void updateForPublicationDataset(JavaRDD<Row> toupdateresult, JavaRDD<Publication> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
.map(r -> (Publication)r)
.map(p -> new ObjectMapper().writeValueAsString(p))
.saveAsTextFile(outputPath + "/" + type);
}
private static void updateForOtherDataset(JavaRDD<Row> toupdateresult, JavaRDD<OtherResearchProduct> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
.map( r -> (OtherResearchProduct)r)
.map( o -> new ObjectMapper().writeValueAsString(o))
.saveAsTextFile(outputPath + "/" + type);
}
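    /**
     * Left outer join between the full result set and the proposed updates: every result
     * is kept, updated or not; for the matching ones, contexts already present on the
     * result are skipped and only the genuinely new community contexts are added.
     */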
    private static JavaRDD<Result> getUpdateForResultDataset(JavaRDD<Row> toupdateresult, JavaPairRDD<String, Result> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
        return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))))
                .map(c -> {
                    Result r = c._2()._1();
                    if(!c._2()._2().isPresent()){
                        return r;
                    }
                    // keep only the proposed community ids that belong to the known communities
                    Set<String> context_set = new HashSet<>();
                    for(Object cId: c._2()._2().get()){
                        String id = (String)cId;
                        if (communityIdList.contains(id)){
                            context_set.add(id);
                        }
                    }
                    // skip the community ids already present in the result's context
                    for (Context context: r.getContext()){
                        context_set.remove(context.getId());
                    }
                    List<Context> contextList = context_set.stream().map(co -> {
                        Context newContext = new Context();
                        newContext.setId(co);
                        newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                        return newContext;
                    }).collect(Collectors.toList());
                    // merge the new contexts into the original record instead of emitting a
                    // bare stub, so no other field of the result is lost
                    r.getContext().addAll(contextList);
                    return r;
                });
// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
// .join(result)
// .map(c -> {
// List<Object> toAddContext = c._2()._1();
// Set<String> context_set = new HashSet<>();
// for(Object cId: toAddContext){
// String id = (String)cId;
// if (communityIdList.contains(id)){
// context_set.add(id);
// }
// }
// for (Context context: c._2()._2().getContext()){
// if(context_set.contains(context)){
// context_set.remove(context);
// }
// }
//
// List<Context> contextList = context_set.stream().map(co -> {
// Context newContext = new Context();
// newContext.setId(co);
// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
// return newContext;
//
// }).collect(Collectors.toList());
//
// if(contextList.size() > 0 ){
// Result r = new Result();
// r.setId(c._1());
// r.setContext(contextList);
// return r;
// }
// return null;
// })
// .filter(r -> r != null);
}
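    // Variant for Software: merges the propagated community ids directly into the full
    // records; contexts already present but not yet marked as inferred get an additional
    // propagation DataInfo. (Not referenced by main() in this version.)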
private static JavaRDD<Software> createUpdateForSoftwareDataset(JavaRDD<Row> toupdateresult, List<String> communityList,
JavaRDD<Software> result, String class_id, String class_name) {
return result
.mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList))
.map(c -> {
Software oaf = c._2()._1();
if (c._2()._2().isPresent()) {
HashSet<String> contexts = new HashSet<>(c._2()._2().get());
                for (Context context : oaf.getContext()) {
                    if (contexts.contains(context.getId())){
                        if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance())
                                .collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
                            context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
                        }
                        //community id already in the context of the result: remove it from the
                        //set that has to be added, whether or not the provenance was just marked
                        contexts.remove(context.getId());
                    }
                }
List<Context> cc = oaf.getContext();
for(String cId: contexts){
Context context = new Context();
context.setId(cId);
context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
cc.add(context);
}
oaf.setContext(cc);
}
return oaf;
});
}
private static JavaPairRDD<String, List<String>> getStringResultJavaPairRDD(JavaRDD<Row> toupdateresult, List<String> communityList) {
return toupdateresult.mapToPair(c -> {
List<String> contextList = new ArrayList<>();
List<String> contexts = c.getList(1);
for (String context : contexts) {
if (communityList.contains(context)) {
contextList.add(context);
}
}
return new Tuple2<>(c.getString(0), contextList);
});
}
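    // Pairs each result in <table> with the relations it is the source of, exposing the
    // (source, context, target) rows consumed by getUpdateCommunitiesForTable.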
private static org.apache.spark.sql.Dataset<Row> getContext(SparkSession spark, String table){
        String query = "SELECT source, context, target " +
                " FROM " + table +
                " JOIN relation " +
                " ON id = source";
return spark.sql(query);
}
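    // True when the result is already tagged with at least one of the given community ids.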
private static Boolean relatedToCommunities(Result r, List<String> communityIdList) {
Set<String> result_communities = r.getContext()
.stream()
.map(c -> c.getId())
.collect(Collectors.toSet());
for (String communityId : result_communities) {
if (communityIdList.contains(communityId)) {
return true;
}
}
return false;
}
private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {