refactoring

Miriam Baglioni 2020-04-27 10:34:03 +02:00
parent 95a54d5460
commit e000754c92
3 changed files with 226 additions and 540 deletions

View File

@@ -1,5 +1,8 @@
 package eu.dnetlib.dhp.resulttocommunityfromsemrel;
 
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import eu.dnetlib.dhp.QueryInformationSystem;
@@ -7,6 +10,8 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.schema.oaf.Result;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
@@ -18,23 +23,18 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.List;
-
-import static eu.dnetlib.dhp.PropagationConstant.*;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
-
 public class PrepareResultCommunitySetStep1 {
 
     private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep1.class);
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public static void main(String[] args) throws Exception {
-        String jsonConfiguration = IOUtils.toString(PrepareResultCommunitySetStep1.class
-                .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_preparecommunitytoresult_parameters.json"));
+        String jsonConfiguration =
+                IOUtils.toString(
+                        PrepareResultCommunitySetStep1.class.getResourceAsStream(
+                                "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_preparecommunitytoresult_parameters.json"));
 
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-                jsonConfiguration);
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 
         parser.parseArgument(args);
@@ -62,30 +62,47 @@ public class PrepareResultCommunitySetStep1 {
         final List<String> communityIdList = QueryInformationSystem.getCommunityList(isLookupUrl);
         log.info("communityIdList: {}", new Gson().toJson(communityIdList));
 
-        final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
+        final String resultType =
+                resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
         log.info("resultType: {}", resultType);
 
-        Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
+        Class<? extends Result> resultClazz =
+                (Class<? extends Result>) Class.forName(resultClassName);
 
-        runWithSparkHiveSession(conf, isSparkSessionManaged,
+        runWithSparkHiveSession(
+                conf,
+                isSparkSessionManaged,
                 spark -> {
                     if (isTest(parser)) {
                         removeOutputDir(spark, outputPath);
                     }
-                    prepareInfo(spark, inputPath, outputPath, allowedsemrel, resultClazz, resultType,
+                    prepareInfo(
+                            spark,
+                            inputPath,
+                            outputPath,
+                            allowedsemrel,
+                            resultClazz,
+                            resultType,
                             communityIdList);
                 });
     }
 
-    private static <R extends Result> void prepareInfo(SparkSession spark, String inputPath, String outputPath,
-            List<String> allowedsemrel, Class<R> resultClazz, String resultType,
-            List<String> communityIdList) {
-        //read the relation table and the table related to the result it is using
+    private static <R extends Result> void prepareInfo(
+            SparkSession spark,
+            String inputPath,
+            String outputPath,
+            List<String> allowedsemrel,
+            Class<R> resultClazz,
+            String resultType,
+            List<String> communityIdList) {
+        // read the relation table and the table related to the result it is using
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-        org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
-                .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
+        org.apache.spark.sql.Dataset<Relation> relation =
+                spark.createDataset(
                        sc.textFile(inputPath + "/relation")
                                .map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
                                .rdd(),
                        Encoders.bean(Relation.class));
         relation.createOrReplaceTempView("relation");
 
         log.info("Reading Graph table from: {}", inputPath + "/" + resultType);
@@ -93,16 +110,18 @@ public class PrepareResultCommunitySetStep1 {
         result.createOrReplaceTempView("result");
 
-        getPossibleResultcommunityAssociation(spark, allowedsemrel, outputPath + "/" + resultType, communityIdList);
+        getPossibleResultcommunityAssociation(
+                spark, allowedsemrel, outputPath + "/" + resultType, communityIdList);
     }
 
-    private static void getPossibleResultcommunityAssociation(SparkSession spark, List<String> allowedsemrel, String outputPath,
-            List<String> communityIdList) {
+    private static void getPossibleResultcommunityAssociation(
+            SparkSession spark,
+            List<String> allowedsemrel,
+            String outputPath,
+            List<String> communityIdList) {
 
         String communitylist = getConstraintList(" co.id = '", communityIdList);
-        String semrellist = getConstraintList(" relClass = '", allowedsemrel );
+        String semrellist = getConstraintList(" relClass = '", allowedsemrel);
 
         /*
        associates to each result the set of community contexts they are associated to
@@ -115,33 +134,38 @@ public class PrepareResultCommunitySetStep1 {
        associates to each target of a relation with allowed semantics the set of community context it could possibly
        inherit from the source of the relation
         */
-        String query = "Select target resultId, community_context " +
-                "from (select id, collect_set(co.id) community_context " +
-                "      from result " +
-                "      lateral view explode (context) c as co " +
-                "      where datainfo.deletedbyinference = false "+ communitylist +
-                "      group by id) p " +
-                "JOIN " +
-                "(select source, target " +
-                "from relation " +
-                "where datainfo.deletedbyinference = false " + semrellist + ") r " +
-                "ON p.id = r.source";
+        String query =
+                "Select target resultId, community_context "
+                        + "from (select id, collect_set(co.id) community_context "
+                        + "       from result "
+                        + "       lateral view explode (context) c as co "
+                        + "       where datainfo.deletedbyinference = false "
+                        + communitylist
+                        + "       group by id) p "
+                        + "JOIN "
+                        + "(select source, target "
+                        + "from relation "
+                        + "where datainfo.deletedbyinference = false "
+                        + semrellist
+                        + ") r "
+                        + "ON p.id = r.source";
 
-        org.apache.spark.sql.Dataset<Row> result_context = spark.sql( query);
+        org.apache.spark.sql.Dataset<Row> result_context = spark.sql(query);
         result_context.createOrReplaceTempView("result_context");
 
-        //( target, (mes, dh-ch-, ni))
+        // ( target, (mes, dh-ch-, ni))
         /*
        a dataset for example could be linked to more than one publication. For each publication linked to that dataset
-       the previous query will produce a row: targetId set of community context the te=arget could possibly inherit
+       the previous query will produce a row: targetId set of community context the target could possibly inherit
        with the following query there will be a single row for each result linked to more than one result of the result type
        currently being used
         */
-        query = "select resultId , collect_set(co) communityList " +
-                "from result_context " +
-                "lateral view explode (community_context) c as co " +
-                "where length(co) > 0 " +
-                "group by resultId";
+        query =
+                "select resultId , collect_set(co) communityList "
+                        + "from result_context "
+                        + "lateral view explode (community_context) c as co "
+                        + "where length(co) > 0 "
+                        + "group by resultId";
 
         spark.sql(query)
                 .as(Encoders.bean(ResultCommunityList.class))
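Note (not part of the commit): the two queries in getPossibleResultcommunityAssociation first collect, per result, the set of community contexts it carries, then join on relations with an allowed semantics so each relation target inherits its source's set. A minimal, collections-based sketch of that computation, using hypothetical toy identifiers (pub1, dat1) and the community ids (mes, dh-ch, ni) mentioned in the inline comment:

    import java.util.*;
    import java.util.stream.*;

    // Sketch of what the two SQL queries compute, on toy data.
    public class PossibleAssociationSketch {
        public static void main(String[] args) {
            // first sub-select: per-result community contexts (result lateral view explode(context))
            Map<String, Set<String>> contextsById = Map.of(
                    "pub1", Set.of("mes", "dh-ch"),
                    "pub2", Set.of("ni"));
            // second sub-select: relations with an allowed semantics, as (source, target) pairs
            List<String[]> relations = List.of(
                    new String[] {"pub1", "dat1"},
                    new String[] {"pub2", "dat1"});

            // JOIN ... ON p.id = r.source yields one row per relation: (target, community_context);
            // the second query's group by resultId + collect_set(co) merges them per target
            Map<String, Set<String>> communityListByResult =
                    relations.stream()
                            .filter(r -> contextsById.containsKey(r[0]))
                            .collect(Collectors.toMap(
                                    r -> r[1],
                                    r -> new HashSet<>(contextsById.get(r[0])),
                                    (a, b) -> { a.addAll(b); return a; }));

            System.out.println(communityListByResult); // {dat1=[ni, mes, dh-ch]}
        }
    }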

View File

@@ -1,8 +1,13 @@
 package eu.dnetlib.dhp.resulttocommunityfromsemrel;
 
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.orcidtoresultfromsemrel.ResultOrcidList;
 import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
@@ -11,27 +16,20 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
 import scala.Tuple2;
 
-import java.util.HashSet;
-import java.util.Set;
-
-import static eu.dnetlib.dhp.PropagationConstant.*;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
 public class PrepareResultCommunitySetStep2 {
 
     private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep2.class);
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public static void main(String[] args) throws Exception {
-        String jsonConfiguration = IOUtils.toString(PrepareResultCommunitySetStep2.class
-                .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_prepareresulttocommunity2_parameters.json"));
+        String jsonConfiguration =
+                IOUtils.toString(
+                        PrepareResultCommunitySetStep2.class.getResourceAsStream(
+                                "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_preparecommunitytoresult2_parameters.json"));
 
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-                jsonConfiguration);
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 
         parser.parseArgument(args);
@@ -46,60 +44,61 @@ public class PrepareResultCommunitySetStep2 {
 
         SparkConf conf = new SparkConf();
 
-        runWithSparkSession(conf, isSparkSessionManaged,
+        runWithSparkSession(
+                conf,
+                isSparkSessionManaged,
                 spark -> {
                     if (isTest(parser)) {
                         removeOutputDir(spark, outputPath);
                     }
                     mergeInfo(spark, inputPath, outputPath);
                 });
     }
 
     private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) {
 
-        Dataset<ResultCommunityList> resultOrcidAssocCommunityList = readResultCommunityList(spark, inputPath + "/publication")
-                .union(readResultCommunityList(spark, inputPath + "/dataset"))
-                .union(readResultCommunityList(spark, inputPath + "/otherresearchproduct"))
-                .union(readResultCommunityList(spark, inputPath + "/software"));
+        Dataset<ResultCommunityList> resultOrcidAssocCommunityList =
+                readResultCommunityList(spark, inputPath + "/publication")
+                        .union(readResultCommunityList(spark, inputPath + "/dataset"))
+                        .union(readResultCommunityList(spark, inputPath + "/otherresearchproduct"))
+                        .union(readResultCommunityList(spark, inputPath + "/software"));
 
         resultOrcidAssocCommunityList
                 .toJavaRDD()
                 .mapToPair(r -> new Tuple2<>(r.getResultId(), r))
-                .reduceByKey((a, b) -> {
-                    if (a == null) {
-                        return b;
-                    }
-                    if (b == null) {
-                        return a;
-                    }
-                    Set<String> community_set = new HashSet<>();
-                    a.getCommunityList().stream().forEach(aa -> community_set.add(aa));
-                    b.getCommunityList().stream().forEach(aa -> {
-                        if (!community_set.contains(aa)) {
-                            a.getCommunityList().add(aa);
-                            community_set.add(aa);
-                        }
-                    });
-                    return a;
-                })
+                .reduceByKey(
+                        (a, b) -> {
+                            if (a == null) {
+                                return b;
+                            }
+                            if (b == null) {
+                                return a;
+                            }
+                            Set<String> community_set = new HashSet<>();
+                            a.getCommunityList().stream().forEach(aa -> community_set.add(aa));
+                            b.getCommunityList().stream()
+                                    .forEach(
+                                            aa -> {
+                                                if (!community_set.contains(aa)) {
+                                                    a.getCommunityList().add(aa);
+                                                    community_set.add(aa);
+                                                }
+                                            });
+                            return a;
+                        })
                 .map(c -> c._2())
                 .map(r -> OBJECT_MAPPER.writeValueAsString(r))
                 .saveAsTextFile(outputPath, GzipCodec.class);
     }
 
-    private static Dataset<ResultCommunityList> readResultCommunityList(SparkSession spark, String relationPath) {
-        return spark
-                .read()
-                .textFile(relationPath)
-                .map(value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class), Encoders.bean(ResultCommunityList.class));
+    private static Dataset<ResultCommunityList> readResultCommunityList(
+            SparkSession spark, String relationPath) {
+        return spark.read()
+                .textFile(relationPath)
+                .map(
+                        value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class),
+                        Encoders.bean(ResultCommunityList.class));
     }
 }
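Note (not part of the commit): the reduceByKey above merges the per-result-type community lists keyed by resultId, deduplicating via a HashSet. A minimal sketch of just that merge, with ResultCommunityList reduced to a plain List<String> for illustration:

    import java.util.*;

    // Sketch of the reduceByKey merge in mergeInfo: combine two community lists
    // for the same resultId without duplicates.
    public class MergeSketch {
        static List<String> merge(List<String> a, List<String> b) {
            if (a == null) return b;
            if (b == null) return a;
            Set<String> seen = new HashSet<>(a);
            for (String community : b) {
                if (seen.add(community)) { // add() returns false when already present
                    a.add(community);
                }
            }
            return a;
        }

        public static void main(String[] args) {
            List<String> fromPublication = new ArrayList<>(List.of("mes", "dh-ch"));
            List<String> fromDataset = new ArrayList<>(List.of("dh-ch", "ni"));
            System.out.println(merge(fromPublication, fromDataset)); // [mes, dh-ch, ni]
        }
    }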

View File

@@ -1,473 +1,136 @@
 package eu.dnetlib.dhp.resulttocommunityfromsemrel;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.QueryInformationSystem;
-import eu.dnetlib.dhp.TypedRow;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.*;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import scala.Tuple2;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import static eu.dnetlib.dhp.PropagationConstant.*;
-
-public class SparkResultToCommunityThroughSemRelJob3 {
-
-    public static void main(String[] args) throws Exception {
-
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils
-                .toString(SparkResultToCommunityThroughSemRelJob3.class
-                        .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
-
-        parser.parseArgument(args);
-
-        SparkConf conf = new SparkConf();
-        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
-        final SparkSession spark = SparkSession
-                .builder()
-                .appName(SparkResultToCommunityThroughSemRelJob3.class.getSimpleName())
-                .master(parser.get("master"))
-                .config(conf)
-                .enableHiveSupport()
-                .getOrCreate();
-
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        final String inputPath = parser.get("sourcePath");
-        final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";
-
-        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
-
-        final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
-
-        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
-
-        JavaRDD<Publication> publication_rdd = sc.textFile(inputPath + "/publication")
-                .map(item -> new ObjectMapper().readValue(item, Publication.class));
-
-        JavaRDD<Dataset> dataset_rdd = sc.textFile(inputPath + "/dataset")
-                .map(item -> new ObjectMapper().readValue(item, Dataset.class));
-
-        JavaRDD<OtherResearchProduct> orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
-                .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
-
-        JavaRDD<Software> software_rdd = sc.textFile(inputPath + "/software")
-                .map(item -> new ObjectMapper().readValue(item, Software.class));
-
-        JavaRDD<Relation> relation_rdd = sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class));
-
-        org.apache.spark.sql.Dataset<Publication> publication = spark.createDataset(publication_rdd.rdd(),
-                Encoders.bean(Publication.class));
-
-        org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
-                Encoders.bean(Relation.class));
-
-        org.apache.spark.sql.Dataset<Dataset> dataset = spark.createDataset(dataset_rdd.rdd(),
-                Encoders.bean(Dataset.class));
-
-        org.apache.spark.sql.Dataset<OtherResearchProduct> other = spark.createDataset(orp_rdd.rdd(),
-                Encoders.bean(OtherResearchProduct.class));
-
-        org.apache.spark.sql.Dataset<Software> software = spark.createDataset(software_rdd.rdd(),
-                Encoders.bean(Software.class));
-
-        publication.createOrReplaceTempView("publication");
-        relation.createOrReplaceTempView("relation");
-        dataset.createOrReplaceTempView("dataset");
-        software.createOrReplaceTempView("software");
-        other.createOrReplaceTempView("other");
-
-        String communitylist = getConstraintList(" co.id = '", communityIdList);
-        String semrellist = getConstraintList(" relClass = '", allowedsemrel );
-
-        String query = "Select source, community_context, target " +
-                "from (select id, collect_set(co.id) community_context " +
-                "from publication " +
-                "lateral view explode (context) c as co " +
-                "where datainfo.deletedbyinference = false "+ communitylist +
-                " group by id) p " +
-                "JOIN " +
-                "(select * " +
-                "from relation " +
-                "where datainfo.deletedbyinference = false " + semrellist + ") r " +
-                "ON p.id = r.source";
-
-        org.apache.spark.sql.Dataset<Row> publication_context = spark.sql( query);
-        publication_context.createOrReplaceTempView("publication_context");
-
-        //( source, (mes, dh-ch-, ni), target )
-        query = "select target , collect_set(co) " +
-                "from (select target, community_context " +
-                "from publication_context pc join publication p on " +
-                "p.id = pc.source) tmp " +
-                "lateral view explode (community_context) c as co " +
-                "group by target";
-
-        org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = spark.sql(query);
-        org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
-        org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
-        org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");
-
-        createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
-                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
-
-        createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
-                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
-
-        createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
-                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
-
-        createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
-                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
-
-        updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
-                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
-
-        updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
-                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
-
-        updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
-                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
-
-        updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
-                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
-        //
-
-        /*
-        JavaPairRDD<String, TypedRow> resultLinkedToCommunities = publication
-                .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication"))
-                .filter(p -> !(p == null))
-                .mapToPair(toPair())
-                .union(datasets
-                        .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset"))
-                        .filter(p -> !(p == null))
-                        .mapToPair(toPair())
-                )
-                .union(software
-                        .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software"))
-                        .filter(p -> !(p == null))
-                        .mapToPair(toPair())
-                )
-                .union(other
-                        .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct"))
-                        .filter(p -> !(p == null))
-                        .mapToPair(toPair())
-                );
-
-        JavaPairRDD<String, TypedRow> to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId()))
-                .mapToPair(toPair());
-
-        JavaPairRDD<String, Result> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
-        JavaPairRDD<String, Result> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
-        JavaPairRDD<String, Result> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
-        JavaPairRDD<String, Result> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
-
-        updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
-        updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
-        updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
-        updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
-        //left outer join result.to_add_result_communities (result = java pair rdd result) [left outer join because we want to keep all results, including the ones that were not updated]
-        //for the ones that match, check whether the contexts to add are already present in the result; if not, add them, otherwise do nothing
-        */
-    }
-
-    private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(SparkSession spark, String table){
-        String query = "SELECT target_id, collect_set(co.id) context_id " +
-                "   FROM (SELECT t.id target_id, s.context source_context " +
-                "         FROM context_software s " +
-                "         JOIN " + table + " t " +
-                "         ON s.target = t.id " +
-                "         UNION ALL " +
-                "         SELECT t.id target_id, d.context source_context " +
-                "         FROM dataset_context d " +
-                "         JOIN " + table + " t" +
-                "         ON s.target = t.id " +
-                "         UNION ALL " +
-                "         SELECT t.id target_id, p.context source_context " +
-                "         FROM publication_context p" +
-                "         JOIN " + table +" t " +
-                "         on p.target = t.id " +
-                "         UNION ALL " +
-                "         SELECT t.id target_id, o.context source_context " +
-                "         FROM other_context o " +
-                "         JOIN " + table + " t " +
-                "         ON o.target = t.id) TMP " +
-                "   LATERAL VIEW EXPLODE(source_context) MyT as co " +
-                "   GROUP BY target_id" ;
-
-        return spark.sql(query);
-    }
-
-    private static JavaRDD<Result> createUpdateForResultDatasetWrite(JavaRDD<Row> toupdateresult, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
-        return toupdateresult.map(r -> {
-            List<Context> contextList = new ArrayList();
-            List<String> toAddContext = r.getList(1);
-            for (String cId : toAddContext) {
-                if (communityIdList.contains(cId)) {
-                    Context newContext = new Context();
-                    newContext.setId(cId);
-                    newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
-                    contextList.add(newContext);
-                }
-            }
-
-            if (contextList.size() > 0) {
-                Result ret = new Result();
-                ret.setId(r.getString(0));
-                ret.setContext(contextList);
-                return ret;
-            }
-            return null;
-        }).filter(r -> r != null);
-    }
-
-    private static void updateForSoftwareDataset(JavaRDD<Row> toupdateresult, JavaRDD<Software> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
-        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
-        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
-                .map(r -> (Software) r)
-                .map(s -> new ObjectMapper().writeValueAsString(s))
-                .saveAsTextFile(outputPath + "/" + type);
-    }
-
-    private static void updateForDatasetDataset(JavaRDD<Row> toupdateresult, JavaRDD<Dataset> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
-        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
-        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
-                .map( r-> (Dataset)r)
-                .map(d -> new ObjectMapper().writeValueAsString(d))
-                .saveAsTextFile(outputPath + "/" + type);
-    }
-
-    private static void updateForPublicationDataset(JavaRDD<Row> toupdateresult, JavaRDD<Publication> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
-        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
-        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
-                .map(r -> (Publication)r)
-                .map(p -> new ObjectMapper().writeValueAsString(p))
-                .saveAsTextFile(outputPath + "/" + type);
-    }
-
-    private static void updateForOtherDataset(JavaRDD<Row> toupdateresult, JavaRDD<OtherResearchProduct> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
-        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
-        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
-                .map( r -> (OtherResearchProduct)r)
-                .map( o -> new ObjectMapper().writeValueAsString(o))
-                .saveAsTextFile(outputPath + "/" + type);
-    }
-
-    private static JavaRDD<Result> getUpdateForResultDataset(JavaRDD<Row> toupdateresult, JavaPairRDD<String, Result> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
-        return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))))
-                .map(c -> {
-                    if(! c._2()._2().isPresent()){
-                        return c._2()._1();
-                    }
-
-                    List<Object> toAddContext = c._2()._2().get();
-                    Set<String> context_set = new HashSet<>();
-                    for(Object cId: toAddContext){
-                        String id = (String)cId;
-                        if (communityIdList.contains(id)){
-                            context_set.add(id);
-                        }
-                    }
-                    for (Context context: c._2()._1().getContext()){
-                        if(context_set.contains(context)){
-                            context_set.remove(context);
-                        }
-                    }
-
-                    List<Context> contextList = context_set.stream().map(co -> {
-                        Context newContext = new Context();
-                        newContext.setId(co);
-                        newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
-                        return newContext;
-                    }).collect(Collectors.toList());
-
-                    if(contextList.size() > 0 ){
-                        Result r = new Result();
-                        r.setId(c._1());
-                        r.setContext(contextList);
-                        return r;
-                    }
-                    return null;
-                }).filter(r -> r != null);
-
-//        return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
-//                .join(result)
-//                .map(c -> {
-//                    List<Object> toAddContext = c._2()._1();
-//                    Set<String> context_set = new HashSet<>();
-//                    for(Object cId: toAddContext){
-//                        String id = (String)cId;
-//                        if (communityIdList.contains(id)){
-//                            context_set.add(id);
-//                        }
-//                    }
-//                    for (Context context: c._2()._2().getContext()){
-//                        if(context_set.contains(context)){
-//                            context_set.remove(context);
-//                        }
-//                    }
-//
-//                    List<Context> contextList = context_set.stream().map(co -> {
-//                        Context newContext = new Context();
-//                        newContext.setId(co);
-//                        newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
-//                        return newContext;
-//
-//                    }).collect(Collectors.toList());
-//
-//                    if(contextList.size() > 0 ){
-//                        Result r = new Result();
-//                        r.setId(c._1());
-//                        r.setContext(contextList);
-//                        return r;
-//                    }
-//                    return null;
-//                })
-//                .filter(r -> r != null);
-    }
-
-    private static JavaRDD<Software> createUpdateForSoftwareDataset(JavaRDD<Row> toupdateresult, List<String> communityList,
-                                                                    JavaRDD<Software> result, String class_id, String class_name) {
-        return result
-                .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList))
-                .map(c -> {
-                    Software oaf = c._2()._1();
-                    if (c._2()._2().isPresent()) {
-
-                        HashSet<String> contexts = new HashSet<>(c._2()._2().get());
-
-                        for (Context context : oaf.getContext()) {
-                            if (contexts.contains(context.getId())){
-                                if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance())
-                                        .collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
-                                    context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
-                                    //community id already in the context of the result. Remove it from the set that has to be added
-                                    contexts.remove(context.getId());
-                                }
-                            }
-                        }
-
-                        List<Context> cc = oaf.getContext();
-                        for(String cId: contexts){
-                            Context context = new Context();
-                            context.setId(cId);
-                            context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
-                            cc.add(context);
-                        }
-                        oaf.setContext(cc);
-
-                    }
-                    return oaf;
-                });
-    }
-
-    private static JavaPairRDD<String, List<String>> getStringResultJavaPairRDD(JavaRDD<Row> toupdateresult, List<String> communityList) {
-        return toupdateresult.mapToPair(c -> {
-
-            List<String> contextList = new ArrayList<>();
-            List<String> contexts = c.getList(1);
-            for (String context : contexts) {
-                if (communityList.contains(context)) {
-                    contextList.add(context);
-                }
-            }
-
-            return new Tuple2<>(c.getString(0) ,contextList);
-        });
-    }
-
-    private static org.apache.spark.sql.Dataset<Row> getContext(SparkSession spark, String table){
-        String query = "SELECT relation.source, " + table +".context , relation.target " +
-                "FROM " + table +
-                " JOIN relation " +
-                "ON id = source" ;
-
-        return spark.sql(query);
-    }
-
-    private static Boolean relatedToCommunities(Result r, List<String> communityIdList) {
-        Set<String> result_communities = r.getContext()
-                .stream()
-                .map(c -> c.getId())
-                .collect(Collectors.toSet());
-        for (String communityId : result_communities) {
-            if (communityIdList.contains(communityId)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
-        results.leftOuterJoin(toupdateresult)
-                .map(p -> {
-                    Result r = p._2()._1();
-                    if (p._2()._2().isPresent()){
-                        Set<String> communityList = p._2()._2().get().getAccumulator();
-                        for(Context c: r.getContext()){
-                            if (communityList.contains(c.getId())){
-                                //verify if the datainfo for this context contains propagation
-                                if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
-                                    c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME));
-                                    //community id already in the context of the result. Remove it from the set that has to be added
-                                    communityList.remove(c.getId());
-                                }
-                            }
-                        }
-                        List<Context> cc = r.getContext();
-                        for(String cId: communityList){
-                            Context context = new Context();
-                            context.setId(cId);
-                            context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)));
-                            cc.add(context);
-                        }
-                        r.setContext(cc);
-                    }
-                    return r;
-                })
-                .map(p -> new ObjectMapper().writeValueAsString(p))
-                .saveAsTextFile(outputPath+"/"+type);
-    }
-
-    private static TypedRow getTypedRow(List<String> communityIdList, List<Context> context, String id, String type) {
-        Set<String> result_communities = context
-                .stream()
-                .map(c -> c.getId())
-                .collect(Collectors.toSet());
-        TypedRow tp = new TypedRow();
-        tp.setSourceId(id);
-        tp.setType(type);
-        for (String communityId : result_communities) {
-            if (communityIdList.contains(communityId)) {
-                tp.add(communityId);
-            }
-        }
-        if (tp.getAccumulator() != null) {
-            return tp;
-        }
-        return null;
-    }
-}
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ximpleware.extended.xpath.parser;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
+import eu.dnetlib.dhp.schema.oaf.*;
+import java.util.*;
+import java.util.stream.Collectors;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkResultToCommunityThroughSemRelJob4 {
+
+    private static final Logger log =
+            LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob4.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration =
+                IOUtils.toString(
+                        SparkResultToCommunityThroughSemRelJob4.class.getResourceAsStream(
+                                "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String preparedInfoPath = parser.get("preparedInfoPath");
+        log.info("preparedInfoPath: {}", preparedInfoPath);
+
+        SparkConf conf = new SparkConf();
+        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+
+        final String resultClassName = parser.get("resultTableName");
+        log.info("resultTableName: {}", resultClassName);
+
+        final Boolean saveGraph =
+                Optional.ofNullable(parser.get("saveGraph"))
+                        .map(Boolean::valueOf)
+                        .orElse(Boolean.TRUE);
+        log.info("saveGraph: {}", saveGraph);
+
+        Class<? extends Result> resultClazz =
+                (Class<? extends Result>) Class.forName(resultClassName);
+
+        runWithSparkHiveSession(
+                conf,
+                isSparkSessionManaged,
+                spark -> {
+                    if (isTest(parser)) {
+                        removeOutputDir(spark, outputPath);
+                    }
+                    if (saveGraph) {
+                        execPropagation(
+                                spark, inputPath, outputPath, preparedInfoPath, resultClazz);
+                    }
+                });
+    }
+
+    private static <R extends Result> void execPropagation(
+            SparkSession spark,
+            String inputPath,
+            String outputPath,
+            String preparedInfoPath,
+            Class<R> resultClazz) {
+
+        org.apache.spark.sql.Dataset<ResultCommunityList> possibleUpdates =
+                readResultCommunityList(spark, preparedInfoPath);
+        org.apache.spark.sql.Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
+
+        result.joinWith(
+                        possibleUpdates,
+                        result.col("id").equalTo(possibleUpdates.col("resultId")),
+                        "left_outer")
+                .map(
+                        value -> {
+                            R ret = value._1();
+                            Optional<ResultCommunityList> rcl = Optional.ofNullable(value._2());
+                            if (rcl.isPresent()) {
+                                Set<String> context_set = new HashSet<>();
+                                ret.getContext().stream().forEach(c -> context_set.add(c.getId()));
+                                List<Context> contextList =
+                                        rcl.get().getCommunityList().stream()
+                                                .map(
+                                                        c -> {
+                                                            if (!context_set.contains(c)) {
+                                                                Context newContext = new Context();
+                                                                newContext.setId(c);
+                                                                newContext.setDataInfo(
+                                                                        Arrays.asList(
+                                                                                getDataInfo(
+                                                                                        PROPAGATION_DATA_INFO_TYPE,
+                                                                                        PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
+                                                                                        PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)));
+                                                                return newContext;
+                                                            }
+                                                            return null;
+                                                        })
+                                                .filter(c -> c != null)
+                                                .collect(Collectors.toList());
+                                Result r = new Result();
+                                r.setId(ret.getId());
+                                r.setContext(contextList);
+                                ret.mergeFrom(r);
+                            }
+                            return ret;
+                        },
+                        Encoders.bean(resultClazz))
+                .toJSON()
+                .write()
+                .mode(SaveMode.Overwrite)
+                .option("compression", "gzip")
+                .text(outputPath);
+    }
+}
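Note (not part of the commit): execPropagation left-outer-joins each result with its prepared ResultCommunityList and adds only the community contexts the result does not already carry, each tagged with the propagation DataInfo before being merged back via mergeFrom. A minimal sketch of that per-row merge, with plain strings standing in for the OAF Context objects:

    import java.util.*;
    import java.util.stream.*;

    // Sketch of the per-row logic inside execPropagation's left_outer join:
    // communities already in the result's context are skipped; the rest would
    // become Context entries carrying getDataInfo(PROPAGATION_DATA_INFO_TYPE, ...).
    public class PropagationMergeSketch {
        public static void main(String[] args) {
            Set<String> existingContextIds = new HashSet<>(List.of("dh-ch"));
            List<String> possibleUpdate = List.of("dh-ch", "mes", "ni");

            List<String> newContexts =
                    possibleUpdate.stream()
                            .filter(c -> !existingContextIds.contains(c))
                            .collect(Collectors.toList());

            // prints [mes, ni]; in the job these become new Context entries
            // merged into the result via ret.mergeFrom(r)
            System.out.println(newContexts);
        }
    }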