Commit 44fab140de (parent 769aa8178a)
Miriam Baglioni, 2020-04-23 12:42:07 +02:00
3 changed files with 1626 additions and 1081 deletions

File: SparkResultToCommunityThroughSemRelJob.java (package eu.dnetlib.dhp.resulttocommunityfromsemrel)

@@ -1,10 +1,14 @@
package eu.dnetlib.dhp.resulttocommunityfromsemrel;

import static eu.dnetlib.dhp.PropagationConstant.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.QueryInformationSystem;
import eu.dnetlib.dhp.TypedRow;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
@@ -16,28 +20,24 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class SparkResultToCommunityThroughSemRelJob {

    public static void main(String[] args) throws Exception {

        final ArgumentApplicationParser parser =
                new ArgumentApplicationParser(
                        IOUtils.toString(
                                SparkResultToCommunityThroughSemRelJob.class.getResourceAsStream(
                                        "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json")));

        parser.parseArgument(args);

        for (String key : parser.getObjectMap().keySet()) {
            System.out.println(key + " = " + parser.get(key));
        }

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));

        final SparkSession spark =
                SparkSession.builder()
                        .appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName())
                        .master(parser.get("master"))
                        .config(conf)
@@ -48,56 +48,74 @@ public class SparkResultToCommunityThroughSemRelJob {
        final String inputPath = parser.get("sourcePath");
        final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";

        // final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
        final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");
        // final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
        final List<String> communityIdList =
                QueryInformationSystem.getCommunityList(
                        "http://beta.services.openaire.eu:8280/is/services/isLookUp");

        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));

        JavaRDD<Publication> all_publication_rdd =
                sc.textFile(inputPath + "/publication")
                        .map(item -> new ObjectMapper().readValue(item, Publication.class))
                        .filter(p -> !p.getDataInfo().getDeletedbyinference())
                        .cache();
        JavaRDD<Publication> publication_rdd =
                all_publication_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();

        JavaRDD<Dataset> all_dataset_rdd =
                sc.textFile(inputPath + "/dataset")
                        .map(item -> new ObjectMapper().readValue(item, Dataset.class))
                        .filter(p -> !p.getDataInfo().getDeletedbyinference())
                        .cache();
        JavaRDD<Dataset> dataset_rdd =
                all_dataset_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();

        JavaRDD<OtherResearchProduct> all_orp_rdd =
                sc.textFile(inputPath + "/otherresearchproduct")
                        .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class))
                        .filter(p -> !p.getDataInfo().getDeletedbyinference())
                        .cache();
        JavaRDD<OtherResearchProduct> orp_rdd =
                all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();

        JavaRDD<Software> all_software_rdd =
                sc.textFile(inputPath + "/software")
                        .map(item -> new ObjectMapper().readValue(item, Software.class))
                        .filter(p -> !p.getDataInfo().getDeletedbyinference())
                        .cache();
        JavaRDD<Software> software_rdd =
                all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();

        JavaRDD<Relation> relation_rdd =
                sc.textFile(inputPath + "/relation")
                        .map(item -> new ObjectMapper().readValue(item, Relation.class))
                        .filter(r -> !r.getDataInfo().getDeletedbyinference())
                        .filter(r -> allowedsemrel.contains(r.getRelClass())
                                && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType()))
                        .cache();

        org.apache.spark.sql.Dataset<Publication> publication =
                spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class));

        org.apache.spark.sql.Dataset<Dataset> dataset =
                spark.createDataset(dataset_rdd.rdd(), Encoders.bean(Dataset.class));

        org.apache.spark.sql.Dataset<OtherResearchProduct> other =
                spark.createDataset(orp_rdd.rdd(), Encoders.bean(OtherResearchProduct.class));

        org.apache.spark.sql.Dataset<Software> software =
                spark.createDataset(software_rdd.rdd(), Encoders.bean(Software.class));

        org.apache.spark.sql.Dataset<Relation> relation =
                spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class));

        publication.createOrReplaceTempView("publication");
        relation.createOrReplaceTempView("relation");
@@ -105,14 +123,17 @@ public class SparkResultToCommunityThroughSemRelJob {
        software.createOrReplaceTempView("software");
        other.createOrReplaceTempView("other");

        // org.apache.spark.sql.Dataset<Row> publication_context = getContext(spark, "publication");
        // publication_context.createOrReplaceTempView("publication_context");

        org.apache.spark.sql.Dataset<Row> publication_context =
                spark.sql(
                        "SELECT relation.source, "
                                + "publication.context , relation.target "
                                + "FROM publication "
                                + " JOIN relation "
                                + "ON id = source");

        org.apache.spark.sql.Dataset<Row> dataset_context = getContext(spark, "dataset");
        dataset_context.createOrReplaceTempView("dataset_context");
@@ -123,53 +144,95 @@ public class SparkResultToCommunityThroughSemRelJob {
        org.apache.spark.sql.Dataset<Row> other_context = getContext(spark, "other");
        other_context.createOrReplaceTempView("other_context");

        publication = spark.createDataset(all_publication_rdd.rdd(), Encoders.bean(Publication.class));
        publication.createOrReplaceTempView("publication");

        dataset = spark.createDataset(all_dataset_rdd.rdd(), Encoders.bean(Dataset.class));
        dataset.createOrReplaceTempView("dataset");

        other = spark.createDataset(all_orp_rdd.rdd(), Encoders.bean(OtherResearchProduct.class));
        other.createOrReplaceTempView("other");

        software = spark.createDataset(all_software_rdd.rdd(), Encoders.bean(Software.class));
        software.createOrReplaceTempView("software");

        org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
        org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
        org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication");
        org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");

        createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);

        createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);

        createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);

        createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);

        updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);

        updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);

        updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);

        updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);
        /*
        JavaPairRDD<String, TypedRow> resultLinkedToCommunities = publication
@@ -209,45 +272,67 @@
        */
    }
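    /**
     * For every result in the given table, collects the ids of the community contexts it should
     * inherit: the four *_context views are joined with the table on the relation target, the
     * source contexts are exploded and the community ids are grouped by the target result id.
     */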
    private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(
            SparkSession spark, String table) {
        String query =
                "SELECT target_id, collect_set(co.id) context_id "
                        + " FROM (SELECT t.id target_id, s.context source_context "
                        + " FROM context_software s "
                        + " JOIN " + table + " t "
                        + " ON s.target = t.id "
                        + " UNION ALL "
                        + " SELECT t.id target_id, d.context source_context "
                        + " FROM dataset_context d "
                        + " JOIN " + table + " t"
                        + " ON s.target = t.id "
                        + " UNION ALL "
                        + " SELECT t.id target_id, p.context source_context "
                        + " FROM publication_context p"
                        + " JOIN " + table + " t "
                        + " on p.target = t.id "
                        + " UNION ALL "
                        + " SELECT t.id target_id, o.context source_context "
                        + " FROM other_context o "
                        + " JOIN " + table + " t "
                        + " ON o.target = t.id) TMP "
                        + " LATERAL VIEW EXPLODE(source_context) MyT as co "
                        + " GROUP BY target_id";

        return spark.sql(query);
    }
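    /**
     * Turns each (target_id, context ids) row into a Result that carries only the community
     * contexts to be added, keeping the ids that belong to the configured community list and
     * tagging every new context with the propagation data info; rows that add nothing are dropped.
     */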
    private static JavaRDD<Result> createUpdateForResultDatasetWrite(
            JavaRDD<Row> toupdateresult, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        return toupdateresult
                .map(r -> {
                    List<Context> contextList = new ArrayList();
                    List<String> toAddContext = r.getList(1);
                    for (String cId : toAddContext) {
                        if (communityIdList.contains(cId)) {
                            Context newContext = new Context();
                            newContext.setId(cId);
                            newContext.setDataInfo(
                                    Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                            contextList.add(newContext);
                        }
                    }

                    if (contextList.size() > 0) {
@@ -257,46 +342,110 @@
                        return ret;
                    }
                    return null;
                })
                .filter(r -> r != null);
    }
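    // The four helpers below differ only in the result subtype: each one indexes the results by
    // id, merges in the communities to add via getUpdateForResultDataset, and writes the updated
    // records as JSON text files under outputPath/type.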
    private static void updateForSoftwareDataset(
            JavaRDD<Row> toupdateresult, JavaRDD<Software> result, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (Software) r)
                .map(s -> new ObjectMapper().writeValueAsString(s))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static void updateForDatasetDataset(
            JavaRDD<Row> toupdateresult, JavaRDD<Dataset> result, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (Dataset) r)
                .map(d -> new ObjectMapper().writeValueAsString(d))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static void updateForPublicationDataset(
            JavaRDD<Row> toupdateresult, JavaRDD<Publication> result, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (Publication) r)
                .map(p -> new ObjectMapper().writeValueAsString(p))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static void updateForOtherDataset(
            JavaRDD<Row> toupdateresult, JavaRDD<OtherResearchProduct> result, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (OtherResearchProduct) r)
                .map(o -> new ObjectMapper().writeValueAsString(o))
                .saveAsTextFile(outputPath + "/" + type);
    }
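    /**
     * Left-joins the results with the rows of community ids to add. Results without an update are
     * returned unchanged; for the others a Result carrying the new community contexts (tagged with
     * the propagation data info) is emitted, or null (then filtered out) when nothing is added.
     */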
    private static JavaRDD<Result> getUpdateForResultDataset(
            JavaRDD<Row> toupdateresult, JavaPairRDD<String, Result> result, String outputPath,
            String type, String class_id, String class_name, List<String> communityIdList) {
        return result.leftOuterJoin(
                        toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))))
                .map(c -> {
                    if (!c._2()._2().isPresent()) {
                        return c._2()._1();
                    }
@@ -315,13 +464,21 @@
                        }
                    }

                    List<Context> contextList =
                            context_set.stream()
                                    .map(co -> {
                                        Context newContext = new Context();
                                        newContext.setId(co);
                                        newContext.setDataInfo(
                                                Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                                        return newContext;
                                    })
                                    .collect(Collectors.toList());

                    if (contextList.size() > 0) {
                        Result r = new Result();
@@ -330,8 +487,8 @@
                        return r;
                    }
                    return null;
                })
                .filter(r -> r != null);

        // return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
        //         .join(result)
@@ -353,7 +510,9 @@
        //             List<Context> contextList = context_set.stream().map(co -> {
        //                 Context newContext = new Context();
        //                 newContext.setId(co);
        //                 newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
        //                 return newContext;
        //
        //             }).collect(Collectors.toList());
@@ -369,11 +528,16 @@
        //         .filter(r -> r != null);
    }
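    // Alternative update path for software records: existing contexts that match a community to
    // add get the propagation data info appended, and the remaining community ids are added as
    // new contexts before the record is returned.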
    private static JavaRDD<Software> createUpdateForSoftwareDataset(
            JavaRDD<Row> toupdateresult, List<String> communityList,
            JavaRDD<Software> result, String class_id, String class_name) {
        return result.mapToPair(s -> new Tuple2<>(s.getId(), s))
                .leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList))
                .map(c -> {
                    Software oaf = c._2()._1();
                    if (c._2()._2().isPresent()) {
@@ -381,10 +545,18 @@
                        for (Context context : oaf.getContext()) {
                            if (contexts.contains(context.getId())) {
                                if (!context.getDataInfo().stream()
                                        .map(di -> di.getInferenceprovenance())
                                        .collect(Collectors.toSet())
                                        .contains(PROPAGATION_DATA_INFO_TYPE)) {
                                    context.getDataInfo()
                                            .add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
                                    // community id already in the context of the result.
                                    // Remove it from the set that has to be added
                                    contexts.remove(context.getId());
                                }
                            }
@@ -393,19 +565,24 @@
                        for (String cId : contexts) {
                            Context context = new Context();
                            context.setId(cId);
                            context.setDataInfo(
                                    Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                            cc.add(context);
                        }
                        oaf.setContext(cc);
                    }
                    return oaf;
                });
    }
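    // Maps each update row to a pair of (result id, community ids to add), restricted to the
    // allowed community list.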
    private static JavaPairRDD<String, List<String>> getStringResultJavaPairRDD(
            JavaRDD<Row> toupdateresult, List<String> communityList) {
        return toupdateresult.mapToPair(c -> {
            List<String> contextList = new ArrayList<>();
            List<String> contexts = c.getList(1);
            for (String context : contexts) {
@@ -418,21 +595,22 @@
        });
    }
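    // For every result in the given table, emits (source id, source contexts, target id) by
    // joining the table with the relation view on the source id.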
    private static org.apache.spark.sql.Dataset<Row> getContext(SparkSession spark, String table) {
        String query =
                "SELECT relation.source, "
                        + table + ".context , relation.target "
                        + "FROM " + table
                        + " JOIN relation "
                        + "ON id = source";

        return spark.sql(query);
    }
    private static Boolean relatedToCommunities(Result r, List<String> communityIdList) {
        Set<String> result_communities =
                r.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet());
        for (String communityId : result_communities) {
            if (communityIdList.contains(communityId)) {
                return true;
@@ -441,18 +619,33 @@
        return false;
    }
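    /**
     * Merges the accumulated community ids into each result's context: the propagation data info
     * is added to contexts that already exist (when not already tagged), missing communities are
     * appended as new contexts, and the updated results are written as JSON under outputPath/type.
     */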
    private static void updateResult(
            JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult,
            String outputPath, String type) {
        results.leftOuterJoin(toupdateresult)
                .map(p -> {
                    Result r = p._2()._1();
                    if (p._2()._2().isPresent()) {
                        Set<String> communityList = p._2()._2().get().getAccumulator();
                        for (Context c : r.getContext()) {
                            if (communityList.contains(c.getId())) {
                                // verify if the datainfo for this context contains propagation
                                if (!c.getDataInfo().stream()
                                        .map(di -> di.getInferenceprovenance())
                                        .collect(Collectors.toSet())
                                        .contains(PROPAGATION_DATA_INFO_TYPE)) {
                                    c.getDataInfo()
                                            .add(getDataInfo(PROPAGATION_DATA_INFO_TYPE,
                                                    PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
                                                    PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME));
                                    // community id already in the context of the result.
                                    // Remove it from the set that has to be added
                                    communityList.remove(c.getId());
                                }
                            }
@@ -461,7 +654,12 @@
                        for (String cId : communityList) {
                            Context context = new Context();
                            context.setId(cId);
                            context.setDataInfo(
                                    Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE,
                                            PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
                                            PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)));
                            cc.add(context);
                        }
                        r.setContext(cc);
@@ -472,13 +670,10 @@
                .saveAsTextFile(outputPath + "/" + type);
    }
    private static TypedRow getTypedRow(
            List<String> communityIdList, List<Context> context, String id, String type) {
        Set<String> result_communities =
                context.stream().map(c -> c.getId()).collect(Collectors.toSet());
        TypedRow tp = new TypedRow();
        tp.setSourceId(id);
        tp.setType(type);

File: SparkResultToCommunityThroughSemRelJob2.java (package eu.dnetlib.dhp.resulttocommunityfromsemrel)

@@ -1,10 +1,14 @@
package eu.dnetlib.dhp.resulttocommunityfromsemrel;

import static eu.dnetlib.dhp.PropagationConstant.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.QueryInformationSystem;
import eu.dnetlib.dhp.TypedRow;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
@@ -16,23 +20,20 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class SparkResultToCommunityThroughSemRelJob2 {

    public static void main(String[] args) throws Exception {

        final ArgumentApplicationParser parser =
                new ArgumentApplicationParser(
                        IOUtils.toString(
                                SparkResultToCommunityThroughSemRelJob2.class.getResourceAsStream(
                                        "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json")));

        parser.parseArgument(args);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));

        final SparkSession spark =
                SparkSession.builder()
                        .appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName())
                        .master(parser.get("master"))
                        .config(conf)
@@ -45,50 +46,59 @@ public class SparkResultToCommunityThroughSemRelJob2 {
        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
        // final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");

        final List<String> communityIdList =
                QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
        // final List<String> communityIdList =
        //         QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp");

        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));

        JavaRDD<Publication> publication_rdd =
                sc.textFile(inputPath + "/publication")
                        .map(item -> new ObjectMapper().readValue(item, Publication.class));

        System.out.println(publication_rdd.count());
        // JavaRDD<Dataset> dataset_rdd = sc.textFile(inputPath + "/dataset")
        //         .map(item -> new ObjectMapper().readValue(item, Dataset.class));
        //
        // JavaRDD<OtherResearchProduct> orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
        //         .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
        //
        // JavaRDD<Software> software_rdd = sc.textFile(inputPath + "/software")
        //         .map(item -> new ObjectMapper().readValue(item, Software.class));

        JavaRDD<Relation> relation_rdd =
                sc.textFile(inputPath + "/relation")
                        .map(item -> new ObjectMapper().readValue(item, Relation.class));

        System.out.println(relation_rdd.count());
        // .filter(r -> !r.getDataInfo().getDeletedbyinference())
        // .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache();

        org.apache.spark.sql.Dataset<Publication> publication =
                spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class));

        org.apache.spark.sql.Dataset<Relation> relation =
                spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class));

        // org.apache.spark.sql.Dataset<Dataset> dataset = spark.createDataset(dataset_rdd.rdd(),
        //         Encoders.bean(Dataset.class));
        //
        // org.apache.spark.sql.Dataset<OtherResearchProduct> other = spark.createDataset(orp_rdd.rdd(),
        //         Encoders.bean(OtherResearchProduct.class));
        //
        // org.apache.spark.sql.Dataset<Software> software = spark.createDataset(software_rdd.rdd(),
        //         Encoders.bean(Software.class));
        //
        // org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
        //         Encoders.bean(Relation.class));

        publication.createOrReplaceTempView("publication");
@@ -102,39 +112,40 @@ public class SparkResultToCommunityThroughSemRelJob2 {
        String semrellist = getConstraintList(" relClass = '", allowedsemrel);
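        // First query: pair each non-deleted publication's community contexts with the
        // publications it reaches through isSupplementedBy / isSupplementTo relations.
        // Second query: regroup those contexts by the related (target) publication, producing
        // the set of community ids every target should inherit.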
        String query =
                "Select source, community_context, target "
                        + "from (select id, collect_set(co.id) community_context "
                        + "from publication "
                        + "lateral view explode (context) c as co "
                        + "where datainfo.deletedbyinference = false " + communitylist
                        + " group by id) p "
                        + "JOIN "
                        + "(select * "
                        + "from relation "
                        + "where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r "
                        + "ON p.id = r.source";

        org.apache.spark.sql.Dataset<Row> publication_context = spark.sql(query);
        publication_context.createOrReplaceTempView("publication_context");

        // ( source, (mes, dh-ch-, ni), target )
        query =
                "select target , collect_set(co) "
                        + "from (select target, community_context "
                        + "from publication_context pc join publication p on "
                        + "p.id = pc.source) tmp "
                        + "lateral view explode (community_context) c as co "
                        + "group by target";

        org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = spark.sql(query);
        System.out.println(toupdatepublicationreresult.count());

        toupdatepublicationreresult
                .toJavaRDD()
                .map(r -> {
                    TypedRow tp = new TypedRow();
                    tp.setSourceId(r.getString(0));
                    r.getList(1).stream().forEach(c -> tp.add((String) c));
@@ -164,35 +175,55 @@ public class SparkResultToCommunityThroughSemRelJob2 {
        //                .saveAsTextFile(outputPath + "/community2semrel");
        //
        // org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
        // org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
        // org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication");
        // org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");

        // createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
        //         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
        //
        // createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
        //         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
        //
        // createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
        //         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
        //
        // createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
        //         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
        //
        //
        // updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
        //         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
        //
        // updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
        //         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
        //
        // updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
        //         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
        //
        // updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
        //         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
        //

        /*
@@ -233,45 +264,67 @@ public class SparkResultToCommunityThroughSemRelJob2 {
        */
    }
    private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(
            SparkSession spark, String table) {
        String query =
                "SELECT target_id, collect_set(co.id) context_id "
                        + " FROM (SELECT t.id target_id, s.context source_context "
                        + " FROM context_software s "
                        + " JOIN " + table + " t "
                        + " ON s.target = t.id "
                        + " UNION ALL "
                        + " SELECT t.id target_id, d.context source_context "
                        + " FROM dataset_context d "
                        + " JOIN " + table + " t"
                        + " ON s.target = t.id "
                        + " UNION ALL "
                        + " SELECT t.id target_id, p.context source_context "
                        + " FROM publication_context p"
                        + " JOIN " + table + " t "
                        + " on p.target = t.id "
                        + " UNION ALL "
                        + " SELECT t.id target_id, o.context source_context "
                        + " FROM other_context o "
                        + " JOIN " + table + " t "
                        + " ON o.target = t.id) TMP "
                        + " LATERAL VIEW EXPLODE(source_context) MyT as co "
                        + " GROUP BY target_id";

        return spark.sql(query);
    }
    private static JavaRDD<Result> createUpdateForResultDatasetWrite(
            JavaRDD<Row> toupdateresult, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        return toupdateresult
                .map(r -> {
                    List<Context> contextList = new ArrayList();
                    List<String> toAddContext = r.getList(1);
                    for (String cId : toAddContext) {
                        if (communityIdList.contains(cId)) {
                            Context newContext = new Context();
                            newContext.setId(cId);
                            newContext.setDataInfo(
                                    Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                            contextList.add(newContext);
                        }
                    }

                    if (contextList.size() > 0) {
@@ -281,46 +334,110 @@
                        return ret;
                    }
                    return null;
                })
                .filter(r -> r != null);
    }
    private static void updateForSoftwareDataset(
            JavaRDD<Row> toupdateresult, JavaRDD<Software> result, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (Software) r)
                .map(s -> new ObjectMapper().writeValueAsString(s))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static void updateForDatasetDataset(
            JavaRDD<Row> toupdateresult, JavaRDD<Dataset> result, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (Dataset) r)
                .map(d -> new ObjectMapper().writeValueAsString(d))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static void updateForPublicationDataset(
            JavaRDD<Row> toupdateresult, JavaRDD<Publication> result, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (Publication) r)
                .map(p -> new ObjectMapper().writeValueAsString(p))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static void updateForOtherDataset(
            JavaRDD<Row> toupdateresult, JavaRDD<OtherResearchProduct> result, String outputPath, String type,
            String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (OtherResearchProduct) r)
                .map(o -> new ObjectMapper().writeValueAsString(o))
                .saveAsTextFile(outputPath + "/" + type);
    }
private static JavaRDD<Result> getUpdateForResultDataset(
JavaRDD<Row> toupdateresult,
private static JavaRDD<Result> getUpdateForResultDataset(JavaRDD<Row> toupdateresult, JavaPairRDD<String, Result> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){ JavaPairRDD<String, Result> result,
return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) String outputPath,
.map(c -> { String type,
String class_id,
String class_name,
List<String> communityIdList) {
return result.leftOuterJoin(
toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))))
.map(
c -> {
if (!c._2()._2().isPresent()) { if (!c._2()._2().isPresent()) {
return c._2()._1(); return c._2()._1();
} }
@@ -339,13 +456,21 @@ public class SparkResultToCommunityThroughSemRelJob2 {
                        }
                    }
                    List<Context> contextList = context_set.stream()
                            .map(co -> {
                                Context newContext = new Context();
                                newContext.setId(co);
                                newContext.setDataInfo(Arrays.asList(
                                        getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                                return newContext;
                            })
                            .collect(Collectors.toList());

                    if (contextList.size() > 0) {
                        Result r = new Result();
@@ -354,8 +479,8 @@ public class SparkResultToCommunityThroughSemRelJob2 {
                        return r;
                    }
                    return null;
                })
                .filter(r -> r != null);

        //        return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
        //                .join(result)
@@ -377,7 +502,9 @@ public class SparkResultToCommunityThroughSemRelJob2 {
        //            List<Context> contextList = context_set.stream().map(co -> {
        //                Context newContext = new Context();
        //                newContext.setId(co);
        //                newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
        //                return newContext;
        //
        //            }).collect(Collectors.toList());
@@ -393,11 +520,16 @@ public class SparkResultToCommunityThroughSemRelJob2 {
        //                .filter(r -> r != null);
    }

    private static JavaRDD<Software> createUpdateForSoftwareDataset(JavaRDD<Row> toupdateresult,
            List<String> communityList, JavaRDD<Software> result, String class_id, String class_name) {
        return result.mapToPair(s -> new Tuple2<>(s.getId(), s))
                .leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList))
                .map(c -> {
                    Software oaf = c._2()._1();
                    if (c._2()._2().isPresent()) {
@@ -405,10 +537,18 @@ public class SparkResultToCommunityThroughSemRelJob2 {
                        for (Context context : oaf.getContext()) {
                            if (contexts.contains(context.getId())) {
                                if (!context.getDataInfo().stream()
                                        .map(di -> di.getInferenceprovenance())
                                        .collect(Collectors.toSet())
                                        .contains(PROPAGATION_DATA_INFO_TYPE)) {
                                    context.getDataInfo()
                                            .add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
                                    // community id already in the context of the result.
                                    // Remove it from the set that has to be added
                                    contexts.remove(context.getId());
                                }
                            }
@@ -417,19 +557,24 @@ public class SparkResultToCommunityThroughSemRelJob2 {
                        for (String cId : contexts) {
                            Context context = new Context();
                            context.setId(cId);
                            context.setDataInfo(Arrays.asList(
                                    getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                            cc.add(context);
                        }
                        oaf.setContext(cc);
                    }
                    return oaf;
                });
    }

    private static JavaPairRDD<String, List<String>> getStringResultJavaPairRDD(JavaRDD<Row> toupdateresult,
            List<String> communityList) {
        return toupdateresult.mapToPair(c -> {
            List<String> contextList = new ArrayList<>();
            List<String> contexts = c.getList(1);
            for (String context : contexts) {
@@ -442,21 +587,22 @@ public class SparkResultToCommunityThroughSemRelJob2 {
        });
    }

    private static org.apache.spark.sql.Dataset<Row> getContext(SparkSession spark, String table) {
        String query = "SELECT relation.source, " + table + ".context , relation.target "
                + "FROM " + table
                + " JOIN relation "
                + "ON id = source";
        return spark.sql(query);
    }

    private static Boolean relatedToCommunities(Result r, List<String> communityIdList) {
        Set<String> result_communities =
                r.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet());
        for (String communityId : result_communities) {
            if (communityIdList.contains(communityId)) {
                return true;
@@ -465,18 +611,33 @@ public class SparkResultToCommunityThroughSemRelJob2 {
        return false;
    }

    private static void updateResult(JavaPairRDD<String, Result> results,
            JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
        results.leftOuterJoin(toupdateresult)
                .map(p -> {
                    Result r = p._2()._1();
                    if (p._2()._2().isPresent()) {
                        Set<String> communityList = p._2()._2().get().getAccumulator();
                        for (Context c : r.getContext()) {
                            if (communityList.contains(c.getId())) {
                                // verify if the datainfo for this context contains propagation
                                if (!c.getDataInfo().stream()
                                        .map(di -> di.getInferenceprovenance())
                                        .collect(Collectors.toSet())
                                        .contains(PROPAGATION_DATA_INFO_TYPE)) {
                                    c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE,
                                            PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
                                            PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME));
                                    // community id already in the context of the result.
                                    // Remove it from the set that has to be added
                                    communityList.remove(c.getId());
                                }
                            }
@@ -485,7 +646,12 @@ public class SparkResultToCommunityThroughSemRelJob2 {
                        for (String cId : communityList) {
                            Context context = new Context();
                            context.setId(cId);
                            context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE,
                                    PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
                                    PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)));
                            cc.add(context);
                        }
                        r.setContext(cc);
@@ -496,13 +662,10 @@ public class SparkResultToCommunityThroughSemRelJob2 {
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static TypedRow getTypedRow(List<String> communityIdList, List<Context> context, String id, String type) {
        Set<String> result_communities =
                context.stream().map(c -> c.getId()).collect(Collectors.toSet());
        TypedRow tp = new TypedRow();
        tp.setSourceId(id);
        tp.setType(type);

@@ -1,10 +1,14 @@
package eu.dnetlib.dhp.resulttocommunityfromsemrel;

import static eu.dnetlib.dhp.PropagationConstant.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.QueryInformationSystem;
import eu.dnetlib.dhp.TypedRow;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
@@ -16,23 +20,20 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class SparkResultToCommunityThroughSemRelJob3 {

    public static void main(String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                IOUtils.toString(
                        SparkResultToCommunityThroughSemRelJob3.class.getResourceAsStream(
                                "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json")));
        parser.parseArgument(args);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
        final SparkSession spark = SparkSession.builder()
                .appName(SparkResultToCommunityThroughSemRelJob3.class.getSimpleName())
                .master(parser.get("master"))
                .config(conf)
@@ -45,42 +46,48 @@ public class SparkResultToCommunityThroughSemRelJob3 {
        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
        final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));

        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));

        JavaRDD<Publication> publication_rdd = sc.textFile(inputPath + "/publication")
                .map(item -> new ObjectMapper().readValue(item, Publication.class));
        JavaRDD<Dataset> dataset_rdd = sc.textFile(inputPath + "/dataset")
                .map(item -> new ObjectMapper().readValue(item, Dataset.class));
        JavaRDD<OtherResearchProduct> orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
                .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
        JavaRDD<Software> software_rdd = sc.textFile(inputPath + "/software")
                .map(item -> new ObjectMapper().readValue(item, Software.class));
        JavaRDD<Relation> relation_rdd = sc.textFile(inputPath + "/relation")
                .map(item -> new ObjectMapper().readValue(item, Relation.class));

        org.apache.spark.sql.Dataset<Publication> publication =
                spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class));
        org.apache.spark.sql.Dataset<Relation> relation =
                spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class));
        org.apache.spark.sql.Dataset<Dataset> dataset =
                spark.createDataset(dataset_rdd.rdd(), Encoders.bean(Dataset.class));
        org.apache.spark.sql.Dataset<OtherResearchProduct> other =
                spark.createDataset(orp_rdd.rdd(), Encoders.bean(OtherResearchProduct.class));
        org.apache.spark.sql.Dataset<Software> software =
                spark.createDataset(software_rdd.rdd(), Encoders.bean(Software.class));

        publication.createOrReplaceTempView("publication");
        relation.createOrReplaceTempView("relation");
@@ -92,61 +99,109 @@ public class SparkResultToCommunityThroughSemRelJob3 {
        String semrellist = getConstraintList(" relClass = '", allowedsemrel);

        String query = "Select source, community_context, target "
                + "from (select id, collect_set(co.id) community_context "
                + "from publication "
                + "lateral view explode (context) c as co "
                + "where datainfo.deletedbyinference = false " + communitylist
                + " group by id) p "
                + "JOIN "
                + "(select * "
                + "from relation "
                + "where datainfo.deletedbyinference = false " + semrellist + ") r "
                + "ON p.id = r.source";

        org.apache.spark.sql.Dataset<Row> publication_context = spark.sql(query);
        publication_context.createOrReplaceTempView("publication_context");

        // ( source, (mes, dh-ch-, ni), target )
        query = "select target , collect_set(co) "
                + "from (select target, community_context "
                + "from publication_context pc join publication p on "
                + "p.id = pc.source) tmp "
                + "lateral view explode (community_context) c as co "
                + "group by target";

        org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = spark.sql(query);
        org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
        org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
        org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");

        createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);
        createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);
        createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);
        createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);

        updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);
        updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);
        updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
                communityIdList);
        updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath,
                "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
                PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);

        //
        /*
@@ -187,45 +242,67 @@ public class SparkResultToCommunityThroughSemRelJob3 {
        */
    }

    private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(SparkSession spark, String table) {
        String query = "SELECT target_id, collect_set(co.id) context_id "
                + " FROM (SELECT t.id target_id, s.context source_context "
                + " FROM context_software s "
                + " JOIN " + table + " t "
                + " ON s.target = t.id "
                + " UNION ALL "
                + " SELECT t.id target_id, d.context source_context "
                + " FROM dataset_context d "
                + " JOIN " + table + " t"
                + " ON s.target = t.id "
                + " UNION ALL "
                + " SELECT t.id target_id, p.context source_context "
                + " FROM publication_context p"
                + " JOIN " + table + " t "
                + " on p.target = t.id "
                + " UNION ALL "
                + " SELECT t.id target_id, o.context source_context "
                + " FROM other_context o "
                + " JOIN " + table + " t "
                + " ON o.target = t.id) TMP "
                + " LATERAL VIEW EXPLODE(source_context) MyT as co "
                + " GROUP BY target_id";
        return spark.sql(query);
    }

    private static JavaRDD<Result> createUpdateForResultDatasetWrite(JavaRDD<Row> toupdateresult, String outputPath,
            String type, String class_id, String class_name, List<String> communityIdList) {
        return toupdateresult.map(r -> {
                    List<Context> contextList = new ArrayList();
                    List<String> toAddContext = r.getList(1);
                    for (String cId : toAddContext) {
                        if (communityIdList.contains(cId)) {
                            Context newContext = new Context();
                            newContext.setId(cId);
                            newContext.setDataInfo(Arrays.asList(
                                    getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                            contextList.add(newContext);
                        }
                    }
                    if (contextList.size() > 0) {
@@ -235,46 +312,110 @@ public class SparkResultToCommunityThroughSemRelJob3 {
                        return ret;
                    }
                    return null;
                })
                .filter(r -> r != null);
    }

    private static void updateForSoftwareDataset(JavaRDD<Row> toupdateresult, JavaRDD<Software> result,
            String outputPath, String type, String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (Software) r)
                .map(s -> new ObjectMapper().writeValueAsString(s))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static void updateForDatasetDataset(JavaRDD<Row> toupdateresult, JavaRDD<Dataset> result,
            String outputPath, String type, String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (Dataset) r)
                .map(d -> new ObjectMapper().writeValueAsString(d))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static void updateForPublicationDataset(JavaRDD<Row> toupdateresult, JavaRDD<Publication> result,
            String outputPath, String type, String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (Publication) r)
                .map(p -> new ObjectMapper().writeValueAsString(p))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static void updateForOtherDataset(JavaRDD<Row> toupdateresult, JavaRDD<OtherResearchProduct> result,
            String outputPath, String type, String class_id, String class_name, List<String> communityIdList) {
        JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
        getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
                .map(r -> (OtherResearchProduct) r)
                .map(o -> new ObjectMapper().writeValueAsString(o))
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static JavaRDD<Result> getUpdateForResultDataset(JavaRDD<Row> toupdateresult,
            JavaPairRDD<String, Result> result, String outputPath, String type, String class_id, String class_name,
            List<String> communityIdList) {
        return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))))
                .map(c -> {
                    if (!c._2()._2().isPresent()) {
                        return c._2()._1();
                    }
@@ -293,13 +434,21 @@ public class SparkResultToCommunityThroughSemRelJob3 {
                        }
                    }
                    List<Context> contextList = context_set.stream()
                            .map(co -> {
                                Context newContext = new Context();
                                newContext.setId(co);
                                newContext.setDataInfo(Arrays.asList(
                                        getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                                return newContext;
                            })
                            .collect(Collectors.toList());

                    if (contextList.size() > 0) {
                        Result r = new Result();
@@ -308,8 +457,8 @@ public class SparkResultToCommunityThroughSemRelJob3 {
                        return r;
                    }
                    return null;
                })
                .filter(r -> r != null);

        //        return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
        //                .join(result)
@@ -331,7 +480,9 @@ public class SparkResultToCommunityThroughSemRelJob3 {
        //            List<Context> contextList = context_set.stream().map(co -> {
        //                Context newContext = new Context();
        //                newContext.setId(co);
        //                newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
        //                return newContext;
        //
        //            }).collect(Collectors.toList());
@@ -347,11 +498,16 @@ public class SparkResultToCommunityThroughSemRelJob3 {
        //                .filter(r -> r != null);
    }

    private static JavaRDD<Software> createUpdateForSoftwareDataset(JavaRDD<Row> toupdateresult,
            List<String> communityList, JavaRDD<Software> result, String class_id, String class_name) {
        return result.mapToPair(s -> new Tuple2<>(s.getId(), s))
                .leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList))
                .map(c -> {
                    Software oaf = c._2()._1();
                    if (c._2()._2().isPresent()) {
@@ -359,10 +515,18 @@ public class SparkResultToCommunityThroughSemRelJob3 {
                        for (Context context : oaf.getContext()) {
                            if (contexts.contains(context.getId())) {
                                if (!context.getDataInfo().stream()
                                        .map(di -> di.getInferenceprovenance())
                                        .collect(Collectors.toSet())
                                        .contains(PROPAGATION_DATA_INFO_TYPE)) {
                                    context.getDataInfo()
                                            .add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
                                    // community id already in the context of the result.
                                    // Remove it from the set that has to be added
                                    contexts.remove(context.getId());
                                }
                            }
@@ -371,19 +535,24 @@ public class SparkResultToCommunityThroughSemRelJob3 {
                        for (String cId : contexts) {
                            Context context = new Context();
                            context.setId(cId);
                            context.setDataInfo(Arrays.asList(
                                    getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
                            cc.add(context);
                        }
                        oaf.setContext(cc);
                    }
                    return oaf;
                });
    }

    private static JavaPairRDD<String, List<String>> getStringResultJavaPairRDD(JavaRDD<Row> toupdateresult,
            List<String> communityList) {
        return toupdateresult.mapToPair(c -> {
            List<String> contextList = new ArrayList<>();
            List<String> contexts = c.getList(1);
            for (String context : contexts) {
@@ -396,21 +565,22 @@ public class SparkResultToCommunityThroughSemRelJob3 {
        });
    }

    private static org.apache.spark.sql.Dataset<Row> getContext(SparkSession spark, String table) {
        String query = "SELECT relation.source, " + table + ".context , relation.target "
                + "FROM " + table
                + " JOIN relation "
                + "ON id = source";
        return spark.sql(query);
    }

    private static Boolean relatedToCommunities(Result r, List<String> communityIdList) {
        Set<String> result_communities =
                r.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet());
        for (String communityId : result_communities) {
            if (communityIdList.contains(communityId)) {
                return true;
@@ -419,18 +589,33 @@ public class SparkResultToCommunityThroughSemRelJob3 {
        return false;
    }

    private static void updateResult(JavaPairRDD<String, Result> results,
            JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
        results.leftOuterJoin(toupdateresult)
                .map(p -> {
                    Result r = p._2()._1();
                    if (p._2()._2().isPresent()) {
                        Set<String> communityList = p._2()._2().get().getAccumulator();
                        for (Context c : r.getContext()) {
                            if (communityList.contains(c.getId())) {
                                // verify if the datainfo for this context contains propagation
                                if (!c.getDataInfo().stream()
                                        .map(di -> di.getInferenceprovenance())
                                        .collect(Collectors.toSet())
                                        .contains(PROPAGATION_DATA_INFO_TYPE)) {
                                    c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE,
                                            PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
                                            PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME));
                                    // community id already in the context of the result.
                                    // Remove it from the set that has to be added
                                    communityList.remove(c.getId());
                                }
                            }
@@ -439,7 +624,12 @@ public class SparkResultToCommunityThroughSemRelJob3 {
                        for (String cId : communityList) {
                            Context context = new Context();
                            context.setId(cId);
                            context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE,
                                    PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
                                    PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)));
                            cc.add(context);
                        }
                        r.setContext(cc);
@@ -450,13 +640,10 @@ public class SparkResultToCommunityThroughSemRelJob3 {
                .saveAsTextFile(outputPath + "/" + type);
    }

    private static TypedRow getTypedRow(List<String> communityIdList, List<Context> context, String id, String type) {
        Set<String> result_communities =
                context.stream().map(c -> c.getId()).collect(Collectors.toSet());
        TypedRow tp = new TypedRow();
        tp.setSourceId(id);
        tp.setType(type);