From 44fab140de0cb06323385be07eb99e5983e00000 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 23 Apr 2020 12:42:07 +0200 Subject: [PATCH] - --- ...parkResultToCommunityThroughSemRelJob.java | 895 ++++++++++------- ...arkResultToCommunityThroughSemRelJob2.java | 943 ++++++++++-------- ...arkResultToCommunityThroughSemRelJob3.java | 869 +++++++++------- 3 files changed, 1626 insertions(+), 1081 deletions(-) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index f5c859280..cc90feac2 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -1,10 +1,14 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; +import static eu.dnetlib.dhp.PropagationConstant.*; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.QueryInformationSystem; import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; +import java.util.*; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.SparkConf; @@ -16,88 +20,102 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple2; -import java.util.*; -import java.util.stream.Collectors; - -import static eu.dnetlib.dhp.PropagationConstant.*; - public class SparkResultToCommunityThroughSemRelJob { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils - .toString(SparkResultToCommunityThroughSemRelJob.class - .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); + final ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + SparkResultToCommunityThroughSemRelJob.class.getResourceAsStream( + "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"))); parser.parseArgument(args); - for(String key : parser.getObjectMap().keySet()){ + for (String key : parser.getObjectMap().keySet()) { System.out.println(key + " = " + parser.get(key)); } - - SparkConf conf = new SparkConf(); + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = SparkSession - .builder() - .appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); + final SparkSession spark = + SparkSession.builder() + .appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String inputPath = parser.get("sourcePath"); final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; - //final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + // final List allowedsemrel = + // Arrays.asList(parser.get("allowedsemrels").split(";")); final 
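// ---- Illustrative sketch (not part of the patch): the argument-parsing and
// session-bootstrap pattern used by all three jobs above, in isolation. The
// parameter names ("hive_metastore_uris", "master") come from this code; the
// app name below is a hypothetical placeholder.
SparkConf sketchConf = new SparkConf();
sketchConf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
SparkSession sketchSpark =
        SparkSession.builder()
                .appName("ResultToCommunityPropagationSketch") // hypothetical
                .master(parser.get("master"))
                .config(sketchConf)
                .enableHiveSupport()
                .getOrCreate();
// ---- end sketch ----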
List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); - //final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); - final List communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); + // final List communityIdList = + // QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); + final List communityIdList = + QueryInformationSystem.getCommunityList( + "http://beta.services.openaire.eu:8280/is/services/isLookUp"); createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + JavaRDD all_publication_rdd = + sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()) + .cache(); + JavaRDD publication_rdd = + all_publication_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); - JavaRDD all_publication_rdd = sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); - JavaRDD publication_rdd = all_publication_rdd - .filter(p -> relatedToCommunities(p, communityIdList)).cache(); + JavaRDD all_dataset_rdd = + sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, Dataset.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()) + .cache(); + JavaRDD dataset_rdd = + all_dataset_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); - JavaRDD all_dataset_rdd = sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, Dataset.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); - JavaRDD dataset_rdd = all_dataset_rdd - .filter(p -> relatedToCommunities(p, communityIdList)).cache(); + JavaRDD all_orp_rdd = + sc.textFile(inputPath + "/otherresearchproduct") + .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()) + .cache(); + JavaRDD orp_rdd = + all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); - JavaRDD all_orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") - .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); - JavaRDD orp_rdd = all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); + JavaRDD all_software_rdd = + sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, Software.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()) + .cache(); + JavaRDD software_rdd = + all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); - JavaRDD all_software_rdd = sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); - JavaRDD software_rdd = all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); + JavaRDD relation_rdd = + sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)) + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter( + r -> + allowedsemrel.contains(r.getRelClass()) + && RELATION_RESULTRESULT_REL_TYPE.equals( + r.getRelType())) + .cache(); - JavaRDD relation_rdd = sc.textFile(inputPath + "/relation") - .map(item -> new 
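// ---- Illustrative sketch (hypothetical helper, not in the patch): the
// read/deserialize/filter/cache sequence repeated above for publication,
// dataset, otherresearchproduct and software can be expressed once for any
// result type:
private static <T extends Result> JavaRDD<T> readEntities(
        JavaSparkContext sc, String path, Class<T> clazz) {
    return sc.textFile(path)
            .map(item -> new ObjectMapper().readValue(item, clazz))
            .filter(r -> !r.getDataInfo().getDeletedbyinference())
            .cache();
}
// e.g. readEntities(sc, inputPath + "/publication", Publication.class)
// ---- end sketch ----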
ObjectMapper().readValue(item, Relation.class)) - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); + org.apache.spark.sql.Dataset publication = + spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class)); + org.apache.spark.sql.Dataset dataset = + spark.createDataset(dataset_rdd.rdd(), Encoders.bean(Dataset.class)); - org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(), - Encoders.bean(Publication.class)); + org.apache.spark.sql.Dataset other = + spark.createDataset(orp_rdd.rdd(), Encoders.bean(OtherResearchProduct.class)); - org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), - Encoders.bean(Dataset.class)); + org.apache.spark.sql.Dataset software = + spark.createDataset(software_rdd.rdd(), Encoders.bean(Software.class)); - org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), - Encoders.bean(OtherResearchProduct.class)); - - org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), - Encoders.bean(Software.class)); - - org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), - Encoders.bean(Relation.class)); + org.apache.spark.sql.Dataset relation = + spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); publication.createOrReplaceTempView("publication"); relation.createOrReplaceTempView("relation"); @@ -105,14 +123,17 @@ public class SparkResultToCommunityThroughSemRelJob { software.createOrReplaceTempView("software"); other.createOrReplaceTempView("other"); -// org.apache.spark.sql.Dataset publication_context = getContext(spark, "publication"); -// publication_context.createOrReplaceTempView("publication_context"); + // org.apache.spark.sql.Dataset publication_context = getContext(spark, + // "publication"); + // publication_context.createOrReplaceTempView("publication_context"); - org.apache.spark.sql.Dataset publication_context = spark.sql( "SELECT relation.source, " + - "publication.context , relation.target " + - "FROM publication " + - " JOIN relation " + - "ON id = source"); + org.apache.spark.sql.Dataset publication_context = + spark.sql( + "SELECT relation.source, " + + "publication.context , relation.target " + + "FROM publication " + + " JOIN relation " + + "ON id = source"); org.apache.spark.sql.Dataset dataset_context = getContext(spark, "dataset"); dataset_context.createOrReplaceTempView("dataset_context"); @@ -123,316 +144,473 @@ public class SparkResultToCommunityThroughSemRelJob { org.apache.spark.sql.Dataset other_context = getContext(spark, "other"); other_context.createOrReplaceTempView("other_context"); - publication = spark.createDataset(all_publication_rdd.rdd(), - Encoders.bean(Publication.class)); + publication = + spark.createDataset(all_publication_rdd.rdd(), Encoders.bean(Publication.class)); publication.createOrReplaceTempView("publication"); - dataset = spark.createDataset(all_dataset_rdd.rdd(), - Encoders.bean(Dataset.class)); + dataset = spark.createDataset(all_dataset_rdd.rdd(), Encoders.bean(Dataset.class)); dataset.createOrReplaceTempView("dataset"); - other = spark.createDataset(all_orp_rdd.rdd(), - Encoders.bean(OtherResearchProduct.class)); + other = spark.createDataset(all_orp_rdd.rdd(), Encoders.bean(OtherResearchProduct.class)); other.createOrReplaceTempView("other"); - software = spark.createDataset(all_software_rdd.rdd(), - Encoders.bean(Software.class)); + 
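// ---- Illustrative sketch: how an RDD is lifted into Spark SQL above. A typed
// Dataset is created with a bean encoder and registered as a temp view, so the
// propagation step can be written as a plain SQL join (view names and query as
// in the patch; variable names here are hypothetical):
org.apache.spark.sql.Dataset<Publication> pubDs =
        spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class));
pubDs.createOrReplaceTempView("publication");
org.apache.spark.sql.Dataset<Row> pubContext =
        spark.sql(
                "SELECT relation.source, publication.context, relation.target "
                        + "FROM publication JOIN relation ON id = source");
// ---- end sketch ----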
software = spark.createDataset(all_software_rdd.rdd(), Encoders.bean(Software.class)); software.createOrReplaceTempView("software"); + org.apache.spark.sql.Dataset toupdatesoftwareresult = + getUpdateCommunitiesForTable(spark, "software"); + org.apache.spark.sql.Dataset toupdatedatasetresult = + getUpdateCommunitiesForTable(spark, "dataset"); + org.apache.spark.sql.Dataset toupdatepublicationreresult = + getUpdateCommunitiesForTable(spark, "publication"); + org.apache.spark.sql.Dataset toupdateotherresult = + getUpdateCommunitiesForTable(spark, "other"); - org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); - org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); - org.apache.spark.sql.Dataset toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication"); - org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); + createUpdateForResultDatasetWrite( + toupdatesoftwareresult.toJavaRDD(), + outputPath, + "software_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + createUpdateForResultDatasetWrite( + toupdatedatasetresult.toJavaRDD(), + outputPath, + "dataset_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + createUpdateForResultDatasetWrite( + toupdatepublicationreresult.toJavaRDD(), + outputPath, + "publication_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + createUpdateForResultDatasetWrite( + toupdateotherresult.toJavaRDD(), + outputPath, + "other_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + updateForDatasetDataset( + toupdatedatasetresult.toJavaRDD(), + dataset.toJavaRDD(), + outputPath, + "dataset", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); + updateForOtherDataset( + toupdateotherresult.toJavaRDD(), + other.toJavaRDD(), + outputPath, + "otherresearchproduct", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + updateForSoftwareDataset( + toupdatesoftwareresult.toJavaRDD(), + software.toJavaRDD(), + outputPath, 
+ "software", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + updateForPublicationDataset( + toupdatepublicationreresult.toJavaRDD(), + publication.toJavaRDD(), + outputPath, + "publication", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - -/* - JavaPairRDD resultLinkedToCommunities = publication - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - .union(datasets - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) + /* + JavaPairRDD resultLinkedToCommunities = publication + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) .filter(p -> !(p == null)) .mapToPair(toPair()) - ) - .union(software - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(other - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ); + .union(datasets + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(software + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(other + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ); - JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) - .mapToPair(toPair()); + JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) + .mapToPair(toPair()); - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", 
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join because I want all of them, including the ones I did not update] - //for those that match, check in the result whether the contexts to add are already present. If they are missing, add them; otherwise do nothing -*/ + updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join because I want all of them, including the ones I did not update] + //for those that match, check in the result whether the contexts to add are already present. If they are missing, add them; otherwise do nothing + */ } - private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){ - String query = "SELECT target_id, collect_set(co.id) context_id " + - " FROM (SELECT t.id target_id, s.context source_context " + - " FROM context_software s " + - " JOIN " + table + " t " + - " ON s.target = t.id " + - " UNION ALL " + - " SELECT t.id target_id, d.context source_context " + - " FROM dataset_context d " + - " JOIN " + table + " t" + - " ON s.target = t.id " + - " UNION ALL " + - " SELECT t.id target_id, p.context source_context " + - " FROM publication_context p" + - " JOIN " + table +" t " + - " on p.target = t.id " + - " UNION ALL " + - " SELECT t.id target_id, o.context source_context " + - " FROM other_context o " + - " JOIN " + table + " t " + - " ON o.target = t.id) TMP " + - " LATERAL VIEW EXPLODE(source_context) MyT as co " + - " GROUP BY target_id" ; + private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable( + SparkSession spark, String table) { + String query = + "SELECT target_id, collect_set(co.id) context_id " + + " FROM (SELECT t.id target_id, s.context source_context " + + " FROM context_software s " + + " JOIN " + + table + + " t " + + " ON s.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, d.context source_context " + + " FROM dataset_context d " + + " JOIN " + + table + + " t" + + " ON s.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, p.context source_context " + + " FROM publication_context p" + + " JOIN " + + table + + " t " + + " on p.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, o.context source_context " + + " FROM other_context o " + + " JOIN " + + table + + " t " + + " ON o.target = t.id) TMP " + + " LATERAL 
VIEW EXPLODE(source_context) MyT as co " + + " GROUP BY target_id"; return spark.sql(query); } - private static JavaRDD createUpdateForResultDatasetWrite(JavaRDD toupdateresult, String outputPath, String type, String class_id, String class_name, List communityIdList){ - return toupdateresult.map(r -> { - List contextList = new ArrayList(); - List toAddContext = r.getList(1); - for (String cId : toAddContext) { - if (communityIdList.contains(cId)) { - Context newContext = new Context(); - newContext.setId(cId); - newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - contextList.add(newContext); - } + private static JavaRDD createUpdateForResultDatasetWrite( + JavaRDD toupdateresult, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + return toupdateresult + .map( + r -> { + List contextList = new ArrayList(); + List toAddContext = r.getList(1); + for (String cId : toAddContext) { + if (communityIdList.contains(cId)) { + Context newContext = new Context(); + newContext.setId(cId); + newContext.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + class_id, + class_name))); + contextList.add(newContext); + } + } - } - - if (contextList.size() > 0) { - Result ret = new Result(); - ret.setId(r.getString(0)); - ret.setContext(contextList); - return ret; - } - return null; - }).filter(r -> r != null); + if (contextList.size() > 0) { + Result ret = new Result(); + ret.setId(r.getString(0)); + ret.setContext(contextList); + return ret; + } + return null; + }) + .filter(r -> r != null); } - private static void updateForSoftwareDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map(r -> (Software) r) - .map(s -> new ObjectMapper().writeValueAsString(s)) - .saveAsTextFile(outputPath + "/" + type); + private static void updateForSoftwareDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (Software) r) + .map(s -> new ObjectMapper().writeValueAsString(s)) + .saveAsTextFile(outputPath + "/" + type); } - private static void updateForDatasetDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map( r-> (Dataset)r) + private static void updateForDatasetDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (Dataset) r) .map(d -> new ObjectMapper().writeValueAsString(d)) .saveAsTextFile(outputPath + "/" + type); } - private static void updateForPublicationDataset(JavaRDD 
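// ---- Illustrative sketch: the Hive idiom the query above hinges on. EXPLODE
// flattens each row's context array into one row per element, and collect_set
// folds the ids back into one distinct set per target (aliases as in the patch;
// "tmp" stands in for the UNION ALL subquery):
org.apache.spark.sql.Dataset<Row> perTarget =
        spark.sql(
                "SELECT target_id, collect_set(co.id) context_id "
                        + "FROM tmp "
                        + "LATERAL VIEW EXPLODE(source_context) MyT as co "
                        + "GROUP BY target_id");
// ---- end sketch ----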
toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map(r -> (Publication)r) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); + private static void updateForPublicationDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (Publication) r) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/" + type); } - private static void updateForOtherDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map( r -> (OtherResearchProduct)r) - .map( o -> new ObjectMapper().writeValueAsString(o)) - .saveAsTextFile(outputPath + "/" + type); + private static void updateForOtherDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (OtherResearchProduct) r) + .map(o -> new ObjectMapper().writeValueAsString(o)) + .saveAsTextFile(outputPath + "/" + type); } + private static JavaRDD getUpdateForResultDataset( + JavaRDD toupdateresult, + JavaPairRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + return result.leftOuterJoin( + toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) + .map( + c -> { + if (!c._2()._2().isPresent()) { + return c._2()._1(); + } - - private static JavaRDD getUpdateForResultDataset(JavaRDD toupdateresult, JavaPairRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) - .map(c -> { - if(! 
c._2()._2().isPresent()){ - return c._2()._1(); - } - - List toAddContext = c._2()._2().get(); - Set context_set = new HashSet<>(); - for(Object cId: toAddContext){ - String id = (String)cId; - if (communityIdList.contains(id)){ - context_set.add(id); - } - } - for (Context context: c._2()._1().getContext()){ - if(context_set.contains(context)){ - context_set.remove(context); - } - } - - List contextList = context_set.stream().map(co -> { - Context newContext = new Context(); - newContext.setId(co); - newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - return newContext; - - }).collect(Collectors.toList()); - - if(contextList.size() > 0 ){ - Result r = new Result(); - r.setId(c._1()); - r.setContext(contextList); - return r; - } - return null; - }).filter(r -> r != null); - - -// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) -// .join(result) -// .map(c -> { -// List toAddContext = c._2()._1(); -// Set context_set = new HashSet<>(); -// for(Object cId: toAddContext){ -// String id = (String)cId; -// if (communityIdList.contains(id)){ -// context_set.add(id); -// } -// } -// for (Context context: c._2()._2().getContext()){ -// if(context_set.contains(context)){ -// context_set.remove(context); -// } -// } -// -// List contextList = context_set.stream().map(co -> { -// Context newContext = new Context(); -// newContext.setId(co); -// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); -// return newContext; -// -// }).collect(Collectors.toList()); -// -// if(contextList.size() > 0 ){ -// Result r = new Result(); -// r.setId(c._1()); -// r.setContext(contextList); -// return r; -// } -// return null; -// }) -// .filter(r -> r != null); - } - - private static JavaRDD createUpdateForSoftwareDataset(JavaRDD toupdateresult, List communityList, - JavaRDD result, String class_id, String class_name) { - return result - .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) - .map(c -> { - Software oaf = c._2()._1(); - if (c._2()._2().isPresent()) { - - HashSet contexts = new HashSet<>(c._2()._2().get()); - - for (Context context : oaf.getContext()) { - if (contexts.contains(context.getId())){ - if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ - context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)); - //community id already in the context of the result. 
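// ---- Illustrative sketch: the merge shape used by getUpdateForResultDataset
// above. leftOuterJoin keeps every result; where the right side is absent the
// record passes through untouched, otherwise the proposed community ids are
// reconciled with the contexts already on the result (results is a
// JavaPairRDD<String, Result>; mergeContexts is hypothetical):
JavaRDD<Result> merged =
        results.leftOuterJoin(
                        toupdateresult.mapToPair(
                                r -> new Tuple2<>(r.getString(0), r.getList(1))))
                .map(
                        c ->
                                c._2()._2().isPresent()
                                        ? mergeContexts(c._2()._1(), c._2()._2().get())
                                        : c._2()._1());
// ---- end sketch ----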
Remove it from the set that has to be added - contexts.remove(context.getId()); + List toAddContext = c._2()._2().get(); + Set context_set = new HashSet<>(); + for (Object cId : toAddContext) { + String id = (String) cId; + if (communityIdList.contains(id)) { + context_set.add(id); + } + } + for (Context context : c._2()._1().getContext()) { + if (context_set.contains(context)) { + context_set.remove(context); } } - } - List cc = oaf.getContext(); - for(String cId: contexts){ - Context context = new Context(); - context.setId(cId); - context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - cc.add(context); - } - oaf.setContext(cc); + List contextList = + context_set.stream() + .map( + co -> { + Context newContext = new Context(); + newContext.setId(co); + newContext.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + class_id, + class_name))); + return newContext; + }) + .collect(Collectors.toList()); + + if (contextList.size() > 0) { + Result r = new Result(); + r.setId(c._1()); + r.setContext(contextList); + return r; + } + return null; + }) + .filter(r -> r != null); + + // return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) + // .join(result) + // .map(c -> { + // List toAddContext = c._2()._1(); + // Set context_set = new HashSet<>(); + // for(Object cId: toAddContext){ + // String id = (String)cId; + // if (communityIdList.contains(id)){ + // context_set.add(id); + // } + // } + // for (Context context: c._2()._2().getContext()){ + // if(context_set.contains(context)){ + // context_set.remove(context); + // } + // } + // + // List contextList = context_set.stream().map(co -> { + // Context newContext = new Context(); + // newContext.setId(co); + // + // newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, + // class_name))); + // return newContext; + // + // }).collect(Collectors.toList()); + // + // if(contextList.size() > 0 ){ + // Result r = new Result(); + // r.setId(c._1()); + // r.setContext(contextList); + // return r; + // } + // return null; + // }) + // .filter(r -> r != null); + } + + private static JavaRDD createUpdateForSoftwareDataset( + JavaRDD toupdateresult, + List communityList, + JavaRDD result, + String class_id, + String class_name) { + return result.mapToPair(s -> new Tuple2<>(s.getId(), s)) + .leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) + .map( + c -> { + Software oaf = c._2()._1(); + if (c._2()._2().isPresent()) { + + HashSet contexts = new HashSet<>(c._2()._2().get()); + + for (Context context : oaf.getContext()) { + if (contexts.contains(context.getId())) { + if (!context.getDataInfo().stream() + .map(di -> di.getInferenceprovenance()) + .collect(Collectors.toSet()) + .contains(PROPAGATION_DATA_INFO_TYPE)) { + context.getDataInfo() + .add( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + class_id, + class_name)); + // community id already in the context of the result. 
+ // Remove it from the set that has to be added + contexts.remove(context.getId()); + } + } + } + List cc = oaf.getContext(); + for (String cId : contexts) { + Context context = new Context(); + context.setId(cId); + context.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + class_id, + class_name))); + cc.add(context); + } + oaf.setContext(cc); + } + return oaf; + }); + } + + private static JavaPairRDD> getStringResultJavaPairRDD( + JavaRDD toupdateresult, List communityList) { + return toupdateresult.mapToPair( + c -> { + List contextList = new ArrayList<>(); + List contexts = c.getList(1); + for (String context : contexts) { + if (communityList.contains(context)) { + contextList.add(context); + } } - return oaf; + + return new Tuple2<>(c.getString(0), contextList); }); } - private static JavaPairRDD> getStringResultJavaPairRDD(JavaRDD toupdateresult, List communityList) { - return toupdateresult.mapToPair(c -> { - - List contextList = new ArrayList<>(); - List contexts = c.getList(1); - for (String context : contexts) { - if (communityList.contains(context)) { - contextList.add(context); - } - } - - return new Tuple2<>(c.getString(0) ,contextList); - }); - } - - - private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table){ - String query = "SELECT relation.source, " + table +".context , relation.target " + - "FROM " + table + - " JOIN relation " + - "ON id = source" ; + private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table) { + String query = + "SELECT relation.source, " + + table + + ".context , relation.target " + + "FROM " + + table + + " JOIN relation " + + "ON id = source"; return spark.sql(query); } private static Boolean relatedToCommunities(Result r, List communityIdList) { - Set result_communities = r.getContext() - .stream() - .map(c -> c.getId()) - .collect(Collectors.toSet()); + Set result_communities = + r.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet()); for (String communityId : result_communities) { if (communityIdList.contains(communityId)) { return true; @@ -441,44 +619,61 @@ public class SparkResultToCommunityThroughSemRelJob { return false; } - private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { + private static void updateResult( + JavaPairRDD results, + JavaPairRDD toupdateresult, + String outputPath, + String type) { results.leftOuterJoin(toupdateresult) - .map(p -> { - Result r = p._2()._1(); - if (p._2()._2().isPresent()){ - Set communityList = p._2()._2().get().getAccumulator(); - for(Context c: r.getContext()){ - if (communityList.contains(c.getId())){ - //verify if the datainfo for this context contains propagation - if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ - c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); - //community id already in the context of the result. 
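// ---- Illustrative sketch: the guard used above before appending provenance.
// A propagation DataInfo is added to a context only if none of its existing
// DataInfo entries was already produced by this inference step (variable names
// mirror the surrounding loop):
boolean alreadyPropagated =
        context.getDataInfo().stream()
                .map(di -> di.getInferenceprovenance())
                .collect(Collectors.toSet())
                .contains(PROPAGATION_DATA_INFO_TYPE);
if (!alreadyPropagated) {
    context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
}
// ---- end sketch ----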
Remove it from the set that has to be added - communityList.remove(c.getId()); + .map( + p -> { + Result r = p._2()._1(); + if (p._2()._2().isPresent()) { + Set communityList = p._2()._2().get().getAccumulator(); + for (Context c : r.getContext()) { + if (communityList.contains(c.getId())) { + // verify if the datainfo for this context contains + // propagation + if (!c.getDataInfo().stream() + .map(di -> di.getInferenceprovenance()) + .collect(Collectors.toSet()) + .contains(PROPAGATION_DATA_INFO_TYPE)) { + c.getDataInfo() + .add( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); + // community id already in the context of the result. + // Remove it from the set that has to be added + communityList.remove(c.getId()); + } + } } + List cc = r.getContext(); + for (String cId : communityList) { + Context context = new Context(); + context.setId(cId); + context.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); + cc.add(context); + } + r.setContext(cc); } - } - List cc = r.getContext(); - for(String cId: communityList){ - Context context = new Context(); - context.setId(cId); - context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); - cc.add(context); - } - r.setContext(cc); - } - return r; - }) + return r; + }) .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/"+type); + .saveAsTextFile(outputPath + "/" + type); } - - - private static TypedRow getTypedRow(List communityIdList, List context, String id, String type) { - Set result_communities = context - .stream() - .map(c -> c.getId()) - .collect(Collectors.toSet()); + private static TypedRow getTypedRow( + List communityIdList, List context, String id, String type) { + Set result_communities = + context.stream().map(c -> c.getId()).collect(Collectors.toSet()); TypedRow tp = new TypedRow(); tp.setSourceId(id); tp.setType(type); diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java index 345bd7905..3a0904404 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java @@ -1,10 +1,14 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; +import static eu.dnetlib.dhp.PropagationConstant.*; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.QueryInformationSystem; import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; +import java.util.*; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.SparkConf; @@ -16,447 +20,589 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple2; -import java.util.*; -import java.util.stream.Collectors; - -import static eu.dnetlib.dhp.PropagationConstant.*; - public class 
SparkResultToCommunityThroughSemRelJob2 { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils - .toString(SparkResultToCommunityThroughSemRelJob2.class - .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); + final ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + SparkResultToCommunityThroughSemRelJob2.class.getResourceAsStream( + "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"))); parser.parseArgument(args); - SparkConf conf = new SparkConf(); + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = SparkSession - .builder() - .appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); + final SparkSession spark = + SparkSession.builder() + .appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String inputPath = parser.get("sourcePath"); final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - //final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); - final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); - //final List communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); + // final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); + final List communityIdList = + QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); + // final List communityIdList = + // QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - - JavaRDD publication_rdd = sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)); + JavaRDD publication_rdd = + sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)); System.out.println(publication_rdd.count()); -// JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset") -// .map(item -> new ObjectMapper().readValue(item, Dataset.class)); -// -// JavaRDD orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") -// .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); -// -// JavaRDD software_rdd = sc.textFile(inputPath + "/software") -// .map(item -> new ObjectMapper().readValue(item, Software.class)); + // JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset") + // .map(item -> new ObjectMapper().readValue(item, Dataset.class)); + // + // JavaRDD orp_rdd = sc.textFile(inputPath + + // "/otherresearchproduct") + // .map(item -> new ObjectMapper().readValue(item, + // OtherResearchProduct.class)); + // + // JavaRDD software_rdd = sc.textFile(inputPath + "/software") + // .map(item -> new ObjectMapper().readValue(item, Software.class)); - JavaRDD relation_rdd = sc.textFile(inputPath + "/relation") - .map(item -> new 
ObjectMapper().readValue(item, Relation.class)); + JavaRDD relation_rdd = + sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)); System.out.println(relation_rdd.count()); -// .filter(r -> !r.getDataInfo().getDeletedbyinference()) -// .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); + // .filter(r -> !r.getDataInfo().getDeletedbyinference()) + // .filter(r -> allowedsemrel.contains(r.getRelClass()) && + // RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); + org.apache.spark.sql.Dataset publication = + spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class)); - org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(), - Encoders.bean(Publication.class)); + org.apache.spark.sql.Dataset relation = + spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); - org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), - Encoders.bean(Relation.class)); - -// org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), -// Encoders.bean(Dataset.class)); -// -// org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), -// Encoders.bean(OtherResearchProduct.class)); -// -// org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), -// Encoders.bean(Software.class)); -// -// org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), -// Encoders.bean(Relation.class)); + // org.apache.spark.sql.Dataset dataset = + // spark.createDataset(dataset_rdd.rdd(), + // Encoders.bean(Dataset.class)); + // + // org.apache.spark.sql.Dataset other = + // spark.createDataset(orp_rdd.rdd(), + // Encoders.bean(OtherResearchProduct.class)); + // + // org.apache.spark.sql.Dataset software = + // spark.createDataset(software_rdd.rdd(), + // Encoders.bean(Software.class)); + // + // org.apache.spark.sql.Dataset relation = + // spark.createDataset(relation_rdd.rdd(), + // Encoders.bean(Relation.class)); publication.createOrReplaceTempView("publication"); relation.createOrReplaceTempView("relation"); -// relation.createOrReplaceTempView("relation"); -// dataset.createOrReplaceTempView("dataset"); -// software.createOrReplaceTempView("software"); -// other.createOrReplaceTempView("other"); + // relation.createOrReplaceTempView("relation"); + // dataset.createOrReplaceTempView("dataset"); + // software.createOrReplaceTempView("software"); + // other.createOrReplaceTempView("other"); String communitylist = getConstraintList(" co.id = '", communityIdList); - String semrellist = getConstraintList(" relClass = '", allowedsemrel ); + String semrellist = getConstraintList(" relClass = '", allowedsemrel); + String query = + "Select source, community_context, target " + + "from (select id, collect_set(co.id) community_context " + + "from publication " + + "lateral view explode (context) c as co " + + "where datainfo.deletedbyinference = false " + + communitylist + + " group by id) p " + + "JOIN " + + "(select * " + + "from relation " + + "where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r " + + "ON p.id = r.source"; - String query = "Select source, community_context, target " + - "from (select id, collect_set(co.id) community_context " + - "from publication " + - "lateral view explode (context) c as co " + - "where datainfo.deletedbyinference = false "+ communitylist + - " group by 
id) p " + - "JOIN " + - "(select * " + - "from relation " + - "where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r " + - "ON p.id = r.source"; - - - org.apache.spark.sql.Dataset publication_context = spark.sql( query); + org.apache.spark.sql.Dataset publication_context = spark.sql(query); publication_context.createOrReplaceTempView("publication_context"); - //( source, (mes, dh-ch-, ni), target ) - query = "select target , collect_set(co) " + - "from (select target, community_context " + - "from publication_context pc join publication p on " + - "p.id = pc.source) tmp " + - "lateral view explode (community_context) c as co " + - "group by target"; - - + // ( source, (mes, dh-ch-, ni), target ) + query = + "select target , collect_set(co) " + + "from (select target, community_context " + + "from publication_context pc join publication p on " + + "p.id = pc.source) tmp " + + "lateral view explode (community_context) c as co " + + "group by target"; org.apache.spark.sql.Dataset toupdatepublicationreresult = spark.sql(query); System.out.println(toupdatepublicationreresult.count()); - toupdatepublicationreresult.toJavaRDD() - .map(r -> { - TypedRow tp = new TypedRow(); - tp.setSourceId(r.getString(0)); - r.getList(1).stream().forEach(c -> tp.add((String)c)); - return tp; - }) + toupdatepublicationreresult + .toJavaRDD() + .map( + r -> { + TypedRow tp = new TypedRow(); + tp.setSourceId(r.getString(0)); + r.getList(1).stream().forEach(c -> tp.add((String) c)); + return tp; + }) .map(tr -> new ObjectMapper().writeValueAsString(tr)) .saveAsTextFile(outputPath + "/community2semrelonpublication"); -// toupdatepublicationreresult.toJavaRDD().flatMap(c -> { -// -// String source = c.getString(0); -// List relation_list = new ArrayList<>(); -// c.getList(1).stream() -// .forEach(res -> { -// Relation r = new Relation(); -// r.setSource(source); -// r.setTarget((String)res); -// r.setRelClass("produces"); -// relation_list.add(r); -// r = new Relation(); -// r.setSource((String)res); -// r.setTarget(source); -// r.setRelClass("isProducedBy"); -// relation_list.add(r); -// }); -// return relation_list.iterator(); -// }).map(tr -> new ObjectMapper().writeValueAsString(tr)) -// .saveAsTextFile(outputPath + "/community2semrel"); -// + // toupdatepublicationreresult.toJavaRDD().flatMap(c -> { + // + // String source = c.getString(0); + // List relation_list = new ArrayList<>(); + // c.getList(1).stream() + // .forEach(res -> { + // Relation r = new Relation(); + // r.setSource(source); + // r.setTarget((String)res); + // r.setRelClass("produces"); + // relation_list.add(r); + // r = new Relation(); + // r.setSource((String)res); + // r.setTarget(source); + // r.setRelClass("isProducedBy"); + // relation_list.add(r); + // }); + // return relation_list.iterator(); + // }).map(tr -> new ObjectMapper().writeValueAsString(tr)) + // .saveAsTextFile(outputPath + "/community2semrel"); + // -// org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); -// org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); -// org.apache.spark.sql.Dataset toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication"); -// org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); + // org.apache.spark.sql.Dataset toupdatesoftwareresult = + // getUpdateCommunitiesForTable(spark, "software"); + // org.apache.spark.sql.Dataset 
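// ---- Illustrative sketch: the two queries above act as a pipeline. The first
// emits (source, community_context, target) for publications whose contexts
// intersect the allowed communities and that sit on an allowed semantic
// relation; the second, quoted here from the patch, regroups the exploded
// community ids per relation target, yielding the update set that gets saved:
org.apache.spark.sql.Dataset<Row> communitiesPerTarget =
        spark.sql(
                "select target , collect_set(co) "
                        + "from (select target, community_context "
                        + "from publication_context pc join publication p on "
                        + "p.id = pc.source) tmp "
                        + "lateral view explode (community_context) c as co "
                        + "group by target");
// ---- end sketch ----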
toupdatedatasetresult = + // getUpdateCommunitiesForTable(spark, "dataset"); + // org.apache.spark.sql.Dataset toupdatepublicationreresult = + // getUpdateCommunitiesForTable(spark, "publication"); + // org.apache.spark.sql.Dataset toupdateotherresult = + // getUpdateCommunitiesForTable(spark, "other"); -// createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + // createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, + // "software_update", + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + // + // createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, + // "dataset_update", + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + // createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), + // outputPath, "publication_update", + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// -// updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// + // createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, + // "other_update", + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + // + // + // updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), + // outputPath, "dataset", + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + // + // updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), + // outputPath, "otherresearchproduct", + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, 
communityIdList); + // + // updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), + // outputPath, "software", + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + // + // updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), + // publication.toJavaRDD(), outputPath, "publication", + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + // -/* - JavaPairRDD resultLinkedToCommunities = publication - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - .union(datasets - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) + /* + JavaPairRDD resultLinkedToCommunities = publication + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) .filter(p -> !(p == null)) .mapToPair(toPair()) - ) - .union(software - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(other - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ); + .union(datasets + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(software + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(other + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ); - JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) - .mapToPair(toPair()); + JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) + .mapToPair(toPair()); - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join 
perche' li voglio tutti anche quelli che non ho aggiornato]
- //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. Se non ci sono aggiungerli, altrimenti nulla
-*/
+ updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join because I want all of them, even the ones I did not update]
+ //for the ones that match, check in the result whether the contexts to add are already present. If they are not there, add them, otherwise do nothing
+ */
}

- private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){
- String query = "SELECT target_id, collect_set(co.id) context_id " +
- " FROM (SELECT t.id target_id, s.context source_context " +
- " FROM context_software s " +
- " JOIN " + table + " t " +
- " ON s.target = t.id " +
- " UNION ALL " +
- " SELECT t.id target_id, d.context source_context " +
- " FROM dataset_context d " +
- " JOIN " + table + " t" +
- " ON s.target = t.id " +
- " UNION ALL " +
- " SELECT t.id target_id, p.context source_context " +
- " FROM publication_context p" +
- " JOIN " + table +" t " +
- " on p.target = t.id " +
- " UNION ALL " +
- " SELECT t.id target_id, o.context source_context " +
- " FROM other_context o " +
- " JOIN " + table + " t " +
- " ON o.target = t.id) TMP " +
- " LATERAL VIEW EXPLODE(source_context) MyT as co " +
- " GROUP BY target_id" ;
+ private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(
+ SparkSession spark, String table) {
+ String query =
+ "SELECT target_id, collect_set(co.id) context_id "
+ + " FROM (SELECT t.id target_id, s.context source_context "
+ + " FROM context_software s "
+ + " JOIN "
+ + table
+ + " t "
+ + " ON s.target = t.id "
+ + " UNION ALL "
+ + " SELECT t.id target_id, d.context source_context "
+ + " FROM dataset_context d "
+ + " JOIN "
+ + table
+ + " t"
+ + " ON d.target = t.id "
+ + " UNION ALL "
+ + " SELECT t.id target_id, p.context source_context "
+ + " FROM publication_context p"
+ + " JOIN "
+ + table
+ + " t "
+ + " ON p.target = t.id "
+ + " UNION ALL "
+ + " SELECT t.id target_id, o.context source_context "
+ + " FROM other_context o "
+ + " JOIN "
+ + table
+ + " t "
+ + " ON o.target = t.id) TMP "
+ + " LATERAL VIEW EXPLODE(source_context) MyT as co "
+ + " GROUP BY target_id";
return spark.sql(query);
}

- private static JavaRDD createUpdateForResultDatasetWrite(JavaRDD toupdateresult, String outputPath, String type, String class_id, String class_name, List communityIdList){
- return toupdateresult.map(r -> {
- List contextList = new ArrayList();
- List toAddContext = r.getList(1);
- for (String cId : toAddContext) {
- if (communityIdList.contains(cId)) {
- Context newContext = new Context();
- newContext.setId(cId);
-
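// For orientation, a sketch (not code from this patch) of the HiveQL that
// getUpdateCommunitiesForTable above is meant to render when table = "dataset";
// the view names are assumed from the temp views registered in main, and the
// whitespace is normalized:
//
//   SELECT target_id, collect_set(co.id) context_id
//   FROM (SELECT t.id target_id, s.context source_context
//           FROM context_software s JOIN dataset t ON s.target = t.id
//         UNION ALL
//         SELECT t.id target_id, d.context source_context
//           FROM dataset_context d JOIN dataset t ON d.target = t.id
//         UNION ALL
//         SELECT t.id target_id, p.context source_context
//           FROM publication_context p JOIN dataset t ON p.target = t.id
//         UNION ALL
//         SELECT t.id target_id, o.context source_context
//           FROM other_context o JOIN dataset t ON o.target = t.id) TMP
//   LATERAL VIEW EXPLODE(source_context) MyT AS co
//   GROUP BY target_id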
newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - contextList.add(newContext); - } + private static JavaRDD createUpdateForResultDatasetWrite( + JavaRDD toupdateresult, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + return toupdateresult + .map( + r -> { + List contextList = new ArrayList(); + List toAddContext = r.getList(1); + for (String cId : toAddContext) { + if (communityIdList.contains(cId)) { + Context newContext = new Context(); + newContext.setId(cId); + newContext.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + class_id, + class_name))); + contextList.add(newContext); + } + } - } - - if (contextList.size() > 0) { - Result ret = new Result(); - ret.setId(r.getString(0)); - ret.setContext(contextList); - return ret; - } - return null; - }).filter(r -> r != null); + if (contextList.size() > 0) { + Result ret = new Result(); + ret.setId(r.getString(0)); + ret.setContext(contextList); + return ret; + } + return null; + }) + .filter(r -> r != null); } - private static void updateForSoftwareDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map(r -> (Software) r) - .map(s -> new ObjectMapper().writeValueAsString(s)) - .saveAsTextFile(outputPath + "/" + type); + private static void updateForSoftwareDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (Software) r) + .map(s -> new ObjectMapper().writeValueAsString(s)) + .saveAsTextFile(outputPath + "/" + type); } - private static void updateForDatasetDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map( r-> (Dataset)r) + private static void updateForDatasetDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (Dataset) r) .map(d -> new ObjectMapper().writeValueAsString(d)) .saveAsTextFile(outputPath + "/" + type); } - private static void updateForPublicationDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map(r -> (Publication)r) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); + private static void updateForPublicationDataset( + JavaRDD toupdateresult, + JavaRDD result, 
+ String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (Publication) r) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/" + type); } - private static void updateForOtherDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map( r -> (OtherResearchProduct)r) - .map( o -> new ObjectMapper().writeValueAsString(o)) - .saveAsTextFile(outputPath + "/" + type); + private static void updateForOtherDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (OtherResearchProduct) r) + .map(o -> new ObjectMapper().writeValueAsString(o)) + .saveAsTextFile(outputPath + "/" + type); } + private static JavaRDD getUpdateForResultDataset( + JavaRDD toupdateresult, + JavaPairRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + return result.leftOuterJoin( + toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) + .map( + c -> { + if (!c._2()._2().isPresent()) { + return c._2()._1(); + } - - private static JavaRDD getUpdateForResultDataset(JavaRDD toupdateresult, JavaPairRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) - .map(c -> { - if(! 
c._2()._2().isPresent()){ - return c._2()._1(); - } - - List toAddContext = c._2()._2().get(); - Set context_set = new HashSet<>(); - for(Object cId: toAddContext){ - String id = (String)cId; - if (communityIdList.contains(id)){ - context_set.add(id); - } - } - for (Context context: c._2()._1().getContext()){ - if(context_set.contains(context)){ - context_set.remove(context); - } - } - - List contextList = context_set.stream().map(co -> { - Context newContext = new Context(); - newContext.setId(co); - newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - return newContext; - - }).collect(Collectors.toList()); - - if(contextList.size() > 0 ){ - Result r = new Result(); - r.setId(c._1()); - r.setContext(contextList); - return r; - } - return null; - }).filter(r -> r != null); - - -// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) -// .join(result) -// .map(c -> { -// List toAddContext = c._2()._1(); -// Set context_set = new HashSet<>(); -// for(Object cId: toAddContext){ -// String id = (String)cId; -// if (communityIdList.contains(id)){ -// context_set.add(id); -// } -// } -// for (Context context: c._2()._2().getContext()){ -// if(context_set.contains(context)){ -// context_set.remove(context); -// } -// } -// -// List contextList = context_set.stream().map(co -> { -// Context newContext = new Context(); -// newContext.setId(co); -// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); -// return newContext; -// -// }).collect(Collectors.toList()); -// -// if(contextList.size() > 0 ){ -// Result r = new Result(); -// r.setId(c._1()); -// r.setContext(contextList); -// return r; -// } -// return null; -// }) -// .filter(r -> r != null); - } - - private static JavaRDD createUpdateForSoftwareDataset(JavaRDD toupdateresult, List communityList, - JavaRDD result, String class_id, String class_name) { - return result - .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) - .map(c -> { - Software oaf = c._2()._1(); - if (c._2()._2().isPresent()) { - - HashSet contexts = new HashSet<>(c._2()._2().get()); - - for (Context context : oaf.getContext()) { - if (contexts.contains(context.getId())){ - if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ - context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)); - //community id already in the context of the result. 
Remove it from the set that has to be added
- contexts.remove(context.getId());
+ List toAddContext = c._2()._2().get();
+ Set context_set = new HashSet<>();
+ for (Object cId : toAddContext) {
+ String id = (String) cId;
+ if (communityIdList.contains(id)) {
+ context_set.add(id);
+ }
+ }
+ for (Context context : c._2()._1().getContext()) {
+ if (context_set.contains(context.getId())) {
+ context_set.remove(context.getId());
}
}
- }
- List cc = oaf.getContext();
- for(String cId: contexts){
- Context context = new Context();
- context.setId(cId);
- context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
- cc.add(context);
- }
- oaf.setContext(cc);
+ List contextList =
+ context_set.stream()
+ .map(
+ co -> {
+ Context newContext = new Context();
+ newContext.setId(co);
+ newContext.setDataInfo(
+ Arrays.asList(
+ getDataInfo(
+ PROPAGATION_DATA_INFO_TYPE,
+ class_id,
+ class_name)));
+ return newContext;
+ })
+ .collect(Collectors.toList());
+
+ if (contextList.size() > 0) {
+ Result r = new Result();
+ r.setId(c._1());
+ r.setContext(contextList);
+ return r;
+ }
+ return null;
+ })
+ .filter(r -> r != null);
+
+ // return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
+ // .join(result)
+ // .map(c -> {
+ // List toAddContext = c._2()._1();
+ // Set context_set = new HashSet<>();
+ // for(Object cId: toAddContext){
+ // String id = (String)cId;
+ // if (communityIdList.contains(id)){
+ // context_set.add(id);
+ // }
+ // }
+ // for (Context context: c._2()._2().getContext()){
+ // if(context_set.contains(context)){
+ // context_set.remove(context);
+ // }
+ // }
+ //
+ // List contextList = context_set.stream().map(co -> {
+ // Context newContext = new Context();
+ // newContext.setId(co);
+ //
+ // newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id,
+ // class_name)));
+ // return newContext;
+ //
+ // }).collect(Collectors.toList());
+ //
+ // if(contextList.size() > 0 ){
+ // Result r = new Result();
+ // r.setId(c._1());
+ // r.setContext(contextList);
+ // return r;
+ // }
+ // return null;
+ // })
+ // .filter(r -> r != null);
+ }
+
+ private static JavaRDD createUpdateForSoftwareDataset(
+ JavaRDD toupdateresult,
+ List communityList,
+ JavaRDD result,
+ String class_id,
+ String class_name) {
+ return result.mapToPair(s -> new Tuple2<>(s.getId(), s))
+ .leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList))
+ .map(
+ c -> {
+ Software oaf = c._2()._1();
+ if (c._2()._2().isPresent()) {
+
+ HashSet contexts = new HashSet<>(c._2()._2().get());
+
+ for (Context context : oaf.getContext()) {
+ if (contexts.contains(context.getId())) {
+ if (!context.getDataInfo().stream()
+ .map(di -> di.getInferenceprovenance())
+ .collect(Collectors.toSet())
+ .contains(PROPAGATION_DATA_INFO_TYPE)) {
+ context.getDataInfo()
+ .add(
+ getDataInfo(
+ PROPAGATION_DATA_INFO_TYPE,
+ class_id,
+ class_name));
+ // community id already in the context of the result.
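// A minimal, self-contained sketch of the merge rule applied around this point
// (and in getUpdateForResultDataset above): propose a set of community ids,
// drop the ones the result already carries, and turn the survivors into new
// Context entries flagged with the propagation DataInfo. The ids below are
// illustrative only, mirroring the (mes, dh-ch, ni) example used elsewhere
// in this patch; Context and getDataInfo come from the OAF model and
// PropagationConstant already imported here:
//
//   Set<String> toAdd = new HashSet<>(Arrays.asList("mes", "dh-ch", "ni"));
//   for (Context existing : oaf.getContext()) {
//       toAdd.remove(existing.getId()); // already present: nothing to do
//   }
//   for (String cId : toAdd) {
//       Context c = new Context();
//       c.setId(cId);
//       c.setDataInfo(Arrays.asList(
//               getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
//       oaf.getContext().add(c); // newly propagated community
//   }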
+ // Remove it from the set that has to be added + contexts.remove(context.getId()); + } + } + } + List cc = oaf.getContext(); + for (String cId : contexts) { + Context context = new Context(); + context.setId(cId); + context.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + class_id, + class_name))); + cc.add(context); + } + oaf.setContext(cc); + } + return oaf; + }); + } + + private static JavaPairRDD> getStringResultJavaPairRDD( + JavaRDD toupdateresult, List communityList) { + return toupdateresult.mapToPair( + c -> { + List contextList = new ArrayList<>(); + List contexts = c.getList(1); + for (String context : contexts) { + if (communityList.contains(context)) { + contextList.add(context); + } } - return oaf; + + return new Tuple2<>(c.getString(0), contextList); }); } - private static JavaPairRDD> getStringResultJavaPairRDD(JavaRDD toupdateresult, List communityList) { - return toupdateresult.mapToPair(c -> { - - List contextList = new ArrayList<>(); - List contexts = c.getList(1); - for (String context : contexts) { - if (communityList.contains(context)) { - contextList.add(context); - } - } - - return new Tuple2<>(c.getString(0) ,contextList); - }); - } - - - private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table){ - String query = "SELECT relation.source, " + table +".context , relation.target " + - "FROM " + table + - " JOIN relation " + - "ON id = source" ; + private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table) { + String query = + "SELECT relation.source, " + + table + + ".context , relation.target " + + "FROM " + + table + + " JOIN relation " + + "ON id = source"; return spark.sql(query); } private static Boolean relatedToCommunities(Result r, List communityIdList) { - Set result_communities = r.getContext() - .stream() - .map(c -> c.getId()) - .collect(Collectors.toSet()); + Set result_communities = + r.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet()); for (String communityId : result_communities) { if (communityIdList.contains(communityId)) { return true; @@ -465,44 +611,61 @@ public class SparkResultToCommunityThroughSemRelJob2 { return false; } - private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { + private static void updateResult( + JavaPairRDD results, + JavaPairRDD toupdateresult, + String outputPath, + String type) { results.leftOuterJoin(toupdateresult) - .map(p -> { - Result r = p._2()._1(); - if (p._2()._2().isPresent()){ - Set communityList = p._2()._2().get().getAccumulator(); - for(Context c: r.getContext()){ - if (communityList.contains(c.getId())){ - //verify if the datainfo for this context contains propagation - if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ - c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); - //community id already in the context of the result. 
Remove it from the set that has to be added - communityList.remove(c.getId()); + .map( + p -> { + Result r = p._2()._1(); + if (p._2()._2().isPresent()) { + Set communityList = p._2()._2().get().getAccumulator(); + for (Context c : r.getContext()) { + if (communityList.contains(c.getId())) { + // verify if the datainfo for this context contains + // propagation + if (!c.getDataInfo().stream() + .map(di -> di.getInferenceprovenance()) + .collect(Collectors.toSet()) + .contains(PROPAGATION_DATA_INFO_TYPE)) { + c.getDataInfo() + .add( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); + // community id already in the context of the result. + // Remove it from the set that has to be added + communityList.remove(c.getId()); + } + } } + List cc = r.getContext(); + for (String cId : communityList) { + Context context = new Context(); + context.setId(cId); + context.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); + cc.add(context); + } + r.setContext(cc); } - } - List cc = r.getContext(); - for(String cId: communityList){ - Context context = new Context(); - context.setId(cId); - context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); - cc.add(context); - } - r.setContext(cc); - } - return r; - }) + return r; + }) .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/"+type); + .saveAsTextFile(outputPath + "/" + type); } - - - private static TypedRow getTypedRow(List communityIdList, List context, String id, String type) { - Set result_communities = context - .stream() - .map(c -> c.getId()) - .collect(Collectors.toSet()); + private static TypedRow getTypedRow( + List communityIdList, List context, String id, String type) { + Set result_communities = + context.stream().map(c -> c.getId()).collect(Collectors.toSet()); TypedRow tp = new TypedRow(); tp.setSourceId(id); tp.setType(type); diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java index 0e39090dd..9c5e6c3b3 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java @@ -1,10 +1,14 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; +import static eu.dnetlib.dhp.PropagationConstant.*; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.QueryInformationSystem; import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; +import java.util.*; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.SparkConf; @@ -16,28 +20,25 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple2; -import java.util.*; -import java.util.stream.Collectors; - -import static eu.dnetlib.dhp.PropagationConstant.*; - public class 
SparkResultToCommunityThroughSemRelJob3 { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils - .toString(SparkResultToCommunityThroughSemRelJob3.class - .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); + final ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + SparkResultToCommunityThroughSemRelJob3.class.getResourceAsStream( + "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"))); parser.parseArgument(args); - SparkConf conf = new SparkConf(); + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = SparkSession - .builder() - .appName(SparkResultToCommunityThroughSemRelJob3.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); + final SparkSession spark = + SparkSession.builder() + .appName(SparkResultToCommunityThroughSemRelJob3.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String inputPath = parser.get("sourcePath"); @@ -45,42 +46,48 @@ public class SparkResultToCommunityThroughSemRelJob3 { final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); + final List communityIdList = + QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + JavaRDD publication_rdd = + sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)); - JavaRDD publication_rdd = sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)); + JavaRDD dataset_rdd = + sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, Dataset.class)); - JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, Dataset.class)); + JavaRDD orp_rdd = + sc.textFile(inputPath + "/otherresearchproduct") + .map( + item -> + new ObjectMapper() + .readValue(item, OtherResearchProduct.class)); - JavaRDD orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") - .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); + JavaRDD software_rdd = + sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, Software.class)); - JavaRDD software_rdd = sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)); + JavaRDD relation_rdd = + sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)); - JavaRDD relation_rdd = sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)); + org.apache.spark.sql.Dataset publication = + spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class)); + org.apache.spark.sql.Dataset relation = + spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); - org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(), - Encoders.bean(Publication.class)); + 
org.apache.spark.sql.Dataset dataset = + spark.createDataset(dataset_rdd.rdd(), Encoders.bean(Dataset.class)); - org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), - Encoders.bean(Relation.class)); - - org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), - Encoders.bean(Dataset.class)); - - org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), - Encoders.bean(OtherResearchProduct.class)); - - org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), - Encoders.bean(Software.class)); + org.apache.spark.sql.Dataset other = + spark.createDataset(orp_rdd.rdd(), Encoders.bean(OtherResearchProduct.class)); + org.apache.spark.sql.Dataset software = + spark.createDataset(software_rdd.rdd(), Encoders.bean(Software.class)); publication.createOrReplaceTempView("publication"); relation.createOrReplaceTempView("relation"); @@ -90,327 +97,490 @@ public class SparkResultToCommunityThroughSemRelJob3 { String communitylist = getConstraintList(" co.id = '", communityIdList); - String semrellist = getConstraintList(" relClass = '", allowedsemrel ); + String semrellist = getConstraintList(" relClass = '", allowedsemrel); + String query = + "Select source, community_context, target " + + "from (select id, collect_set(co.id) community_context " + + "from publication " + + "lateral view explode (context) c as co " + + "where datainfo.deletedbyinference = false " + + communitylist + + " group by id) p " + + "JOIN " + + "(select * " + + "from relation " + + "where datainfo.deletedbyinference = false " + + semrellist + + ") r " + + "ON p.id = r.source"; - String query = "Select source, community_context, target " + - "from (select id, collect_set(co.id) community_context " + - "from publication " + - "lateral view explode (context) c as co " + - "where datainfo.deletedbyinference = false "+ communitylist + - " group by id) p " + - "JOIN " + - "(select * " + - "from relation " + - "where datainfo.deletedbyinference = false " + semrellist + ") r " + - "ON p.id = r.source"; - - - org.apache.spark.sql.Dataset publication_context = spark.sql( query); + org.apache.spark.sql.Dataset publication_context = spark.sql(query); publication_context.createOrReplaceTempView("publication_context"); - //( source, (mes, dh-ch-, ni), target ) - query = "select target , collect_set(co) " + - "from (select target, community_context " + - "from publication_context pc join publication p on " + - "p.id = pc.source) tmp " + - "lateral view explode (community_context) c as co " + - "group by target"; - - + // ( source, (mes, dh-ch-, ni), target ) + query = + "select target , collect_set(co) " + + "from (select target, community_context " + + "from publication_context pc join publication p on " + + "p.id = pc.source) tmp " + + "lateral view explode (community_context) c as co " + + "group by target"; org.apache.spark.sql.Dataset toupdatepublicationreresult = spark.sql(query); - org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); - org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); - org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); + org.apache.spark.sql.Dataset toupdatesoftwareresult = + getUpdateCommunitiesForTable(spark, "software"); + org.apache.spark.sql.Dataset toupdatedatasetresult = + getUpdateCommunitiesForTable(spark, "dataset"); + org.apache.spark.sql.Dataset toupdateotherresult = + 
getUpdateCommunitiesForTable(spark, "other"); - createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + createUpdateForResultDatasetWrite( + toupdatesoftwareresult.toJavaRDD(), + outputPath, + "software_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + createUpdateForResultDatasetWrite( + toupdatedatasetresult.toJavaRDD(), + outputPath, + "dataset_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + createUpdateForResultDatasetWrite( + toupdatepublicationreresult.toJavaRDD(), + outputPath, + "publication_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + createUpdateForResultDatasetWrite( + toupdateotherresult.toJavaRDD(), + outputPath, + "other_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + updateForDatasetDataset( + toupdatedatasetresult.toJavaRDD(), + dataset.toJavaRDD(), + outputPath, + "dataset", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + updateForOtherDataset( + toupdateotherresult.toJavaRDD(), + other.toJavaRDD(), + outputPath, + "otherresearchproduct", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + updateForSoftwareDataset( + toupdatesoftwareresult.toJavaRDD(), + software.toJavaRDD(), + outputPath, + "software", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); - updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// + updateForPublicationDataset( + toupdatepublicationreresult.toJavaRDD(), + publication.toJavaRDD(), + 
outputPath, + "publication", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + communityIdList); + // -/* - JavaPairRDD resultLinkedToCommunities = publication - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - .union(datasets - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) + /* + JavaPairRDD resultLinkedToCommunities = publication + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) .filter(p -> !(p == null)) .mapToPair(toPair()) - ) - .union(software - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(other - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ); + .union(datasets + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(software + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(other + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ); - JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) - .mapToPair(toPair()); + JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) + .mapToPair(toPair()); - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato] - //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. 
Se non ci sono aggiungerli, altrimenti nulla
-*/
+ updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join because I want all of them, even the ones I did not update]
+ //for the ones that match, check in the result whether the contexts to add are already present. If they are not there, add them, otherwise do nothing
+ */
}

- private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){
- String query = "SELECT target_id, collect_set(co.id) context_id " +
- " FROM (SELECT t.id target_id, s.context source_context " +
- " FROM context_software s " +
- " JOIN " + table + " t " +
- " ON s.target = t.id " +
- " UNION ALL " +
- " SELECT t.id target_id, d.context source_context " +
- " FROM dataset_context d " +
- " JOIN " + table + " t" +
- " ON s.target = t.id " +
- " UNION ALL " +
- " SELECT t.id target_id, p.context source_context " +
- " FROM publication_context p" +
- " JOIN " + table +" t " +
- " on p.target = t.id " +
- " UNION ALL " +
- " SELECT t.id target_id, o.context source_context " +
- " FROM other_context o " +
- " JOIN " + table + " t " +
- " ON o.target = t.id) TMP " +
- " LATERAL VIEW EXPLODE(source_context) MyT as co " +
- " GROUP BY target_id" ;
+ private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(
+ SparkSession spark, String table) {
+ String query =
+ "SELECT target_id, collect_set(co.id) context_id "
+ + " FROM (SELECT t.id target_id, s.context source_context "
+ + " FROM context_software s "
+ + " JOIN "
+ + table
+ + " t "
+ + " ON s.target = t.id "
+ + " UNION ALL "
+ + " SELECT t.id target_id, d.context source_context "
+ + " FROM dataset_context d "
+ + " JOIN "
+ + table
+ + " t"
+ + " ON d.target = t.id "
+ + " UNION ALL "
+ + " SELECT t.id target_id, p.context source_context "
+ + " FROM publication_context p"
+ + " JOIN "
+ + table
+ + " t "
+ + " ON p.target = t.id "
+ + " UNION ALL "
+ + " SELECT t.id target_id, o.context source_context "
+ + " FROM other_context o "
+ + " JOIN "
+ + table
+ + " t "
+ + " ON o.target = t.id) TMP "
+ + " LATERAL VIEW EXPLODE(source_context) MyT as co "
+ + " GROUP BY target_id";
return spark.sql(query);
}

- private static JavaRDD createUpdateForResultDatasetWrite(JavaRDD toupdateresult, String outputPath, String type, String class_id, String class_name, List communityIdList){
- return toupdateresult.map(r -> {
- List contextList = new ArrayList();
- List toAddContext = r.getList(1);
- for (String cId : toAddContext) {
- if (communityIdList.contains(cId)) {
- Context newContext = new Context();
- newContext.setId(cId);
- newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
- contextList.add(newContext);
- }
+ private static JavaRDD createUpdateForResultDatasetWrite(
+
JavaRDD toupdateresult, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + return toupdateresult + .map( + r -> { + List contextList = new ArrayList(); + List toAddContext = r.getList(1); + for (String cId : toAddContext) { + if (communityIdList.contains(cId)) { + Context newContext = new Context(); + newContext.setId(cId); + newContext.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + class_id, + class_name))); + contextList.add(newContext); + } + } - } - - if (contextList.size() > 0) { - Result ret = new Result(); - ret.setId(r.getString(0)); - ret.setContext(contextList); - return ret; - } - return null; - }).filter(r -> r != null); + if (contextList.size() > 0) { + Result ret = new Result(); + ret.setId(r.getString(0)); + ret.setContext(contextList); + return ret; + } + return null; + }) + .filter(r -> r != null); } - private static void updateForSoftwareDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map(r -> (Software) r) - .map(s -> new ObjectMapper().writeValueAsString(s)) - .saveAsTextFile(outputPath + "/" + type); + private static void updateForSoftwareDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (Software) r) + .map(s -> new ObjectMapper().writeValueAsString(s)) + .saveAsTextFile(outputPath + "/" + type); } - private static void updateForDatasetDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map( r-> (Dataset)r) + private static void updateForDatasetDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (Dataset) r) .map(d -> new ObjectMapper().writeValueAsString(d)) .saveAsTextFile(outputPath + "/" + type); } - private static void updateForPublicationDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map(r -> (Publication)r) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); + private static void updateForPublicationDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + 
getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (Publication) r) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/" + type); } - private static void updateForOtherDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map( r -> (OtherResearchProduct)r) - .map( o -> new ObjectMapper().writeValueAsString(o)) - .saveAsTextFile(outputPath + "/" + type); + private static void updateForOtherDataset( + JavaRDD toupdateresult, + JavaRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset( + toupdateresult, + tmp, + outputPath, + type, + class_id, + class_name, + communityIdList) + .map(r -> (OtherResearchProduct) r) + .map(o -> new ObjectMapper().writeValueAsString(o)) + .saveAsTextFile(outputPath + "/" + type); } + private static JavaRDD getUpdateForResultDataset( + JavaRDD toupdateresult, + JavaPairRDD result, + String outputPath, + String type, + String class_id, + String class_name, + List communityIdList) { + return result.leftOuterJoin( + toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) + .map( + c -> { + if (!c._2()._2().isPresent()) { + return c._2()._1(); + } - - private static JavaRDD getUpdateForResultDataset(JavaRDD toupdateresult, JavaPairRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) - .map(c -> { - if(! 
c._2()._2().isPresent()){ - return c._2()._1(); - } - - List toAddContext = c._2()._2().get(); - Set context_set = new HashSet<>(); - for(Object cId: toAddContext){ - String id = (String)cId; - if (communityIdList.contains(id)){ - context_set.add(id); - } - } - for (Context context: c._2()._1().getContext()){ - if(context_set.contains(context)){ - context_set.remove(context); - } - } - - List contextList = context_set.stream().map(co -> { - Context newContext = new Context(); - newContext.setId(co); - newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - return newContext; - - }).collect(Collectors.toList()); - - if(contextList.size() > 0 ){ - Result r = new Result(); - r.setId(c._1()); - r.setContext(contextList); - return r; - } - return null; - }).filter(r -> r != null); - - -// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) -// .join(result) -// .map(c -> { -// List toAddContext = c._2()._1(); -// Set context_set = new HashSet<>(); -// for(Object cId: toAddContext){ -// String id = (String)cId; -// if (communityIdList.contains(id)){ -// context_set.add(id); -// } -// } -// for (Context context: c._2()._2().getContext()){ -// if(context_set.contains(context)){ -// context_set.remove(context); -// } -// } -// -// List contextList = context_set.stream().map(co -> { -// Context newContext = new Context(); -// newContext.setId(co); -// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); -// return newContext; -// -// }).collect(Collectors.toList()); -// -// if(contextList.size() > 0 ){ -// Result r = new Result(); -// r.setId(c._1()); -// r.setContext(contextList); -// return r; -// } -// return null; -// }) -// .filter(r -> r != null); - } - - private static JavaRDD createUpdateForSoftwareDataset(JavaRDD toupdateresult, List communityList, - JavaRDD result, String class_id, String class_name) { - return result - .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) - .map(c -> { - Software oaf = c._2()._1(); - if (c._2()._2().isPresent()) { - - HashSet contexts = new HashSet<>(c._2()._2().get()); - - for (Context context : oaf.getContext()) { - if (contexts.contains(context.getId())){ - if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ - context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)); - //community id already in the context of the result. 
Remove it from the set that has to be added
- contexts.remove(context.getId());
+ List toAddContext = c._2()._2().get();
+ Set context_set = new HashSet<>();
+ for (Object cId : toAddContext) {
+ String id = (String) cId;
+ if (communityIdList.contains(id)) {
+ context_set.add(id);
+ }
+ }
+ for (Context context : c._2()._1().getContext()) {
+ if (context_set.contains(context.getId())) {
+ context_set.remove(context.getId());
}
}
- List cc = oaf.getContext();
- for(String cId: contexts){
- Context context = new Context();
- context.setId(cId);
- context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
- cc.add(context);
- }
- oaf.setContext(cc);
+ List contextList =
+ context_set.stream()
+ .map(
+ co -> {
+ Context newContext = new Context();
+ newContext.setId(co);
+ newContext.setDataInfo(
+ Arrays.asList(
+ getDataInfo(
+ PROPAGATION_DATA_INFO_TYPE,
+ class_id,
+ class_name)));
+ return newContext;
+ })
+ .collect(Collectors.toList());
+
+ if (contextList.size() > 0) {
+ Result r = new Result();
+ r.setId(c._1());
+ r.setContext(contextList);
+ return r;
+ }
+ return null;
+ })
+ .filter(r -> r != null);
+
+ // return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
+ // .join(result)
+ // .map(c -> {
+ // List toAddContext = c._2()._1();
+ // Set context_set = new HashSet<>();
+ // for(Object cId: toAddContext){
+ // String id = (String)cId;
+ // if (communityIdList.contains(id)){
+ // context_set.add(id);
+ // }
+ // }
+ // for (Context context: c._2()._2().getContext()){
+ // if(context_set.contains(context)){
+ // context_set.remove(context);
+ // }
+ // }
+ //
+ // List contextList = context_set.stream().map(co -> {
+ // Context newContext = new Context();
+ // newContext.setId(co);
+ //
+ // newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id,
+ // class_name)));
+ // return newContext;
+ //
+ // }).collect(Collectors.toList());
+ //
+ // if(contextList.size() > 0 ){
+ // Result r = new Result();
+ // r.setId(c._1());
+ // r.setContext(contextList);
+ // return r;
+ // }
+ // return null;
+ // })
+ // .filter(r -> r != null);
+ }
+
+ private static JavaRDD createUpdateForSoftwareDataset(
+ JavaRDD toupdateresult,
+ List communityList,
+ JavaRDD result,
+ String class_id,
+ String class_name) {
+ return result.mapToPair(s -> new Tuple2<>(s.getId(), s))
+ .leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList))
+ .map(
+ c -> {
+ Software oaf = c._2()._1();
+ if (c._2()._2().isPresent()) {
+
+ HashSet contexts = new HashSet<>(c._2()._2().get());
+
+ for (Context context : oaf.getContext()) {
+ if (contexts.contains(context.getId())) {
+ if (!context.getDataInfo().stream()
+ .map(di -> di.getInferenceprovenance())
+ .collect(Collectors.toSet())
+ .contains(PROPAGATION_DATA_INFO_TYPE)) {
+ context.getDataInfo()
+ .add(
+ getDataInfo(
+ PROPAGATION_DATA_INFO_TYPE,
+ class_id,
+ class_name));
+ // community id already in the context of the result.
+ // Remove it from the set that has to be added + contexts.remove(context.getId()); + } + } + } + List cc = oaf.getContext(); + for (String cId : contexts) { + Context context = new Context(); + context.setId(cId); + context.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + class_id, + class_name))); + cc.add(context); + } + oaf.setContext(cc); + } + return oaf; + }); + } + + private static JavaPairRDD> getStringResultJavaPairRDD( + JavaRDD toupdateresult, List communityList) { + return toupdateresult.mapToPair( + c -> { + List contextList = new ArrayList<>(); + List contexts = c.getList(1); + for (String context : contexts) { + if (communityList.contains(context)) { + contextList.add(context); + } } - return oaf; + + return new Tuple2<>(c.getString(0), contextList); }); } - private static JavaPairRDD> getStringResultJavaPairRDD(JavaRDD toupdateresult, List communityList) { - return toupdateresult.mapToPair(c -> { - - List contextList = new ArrayList<>(); - List contexts = c.getList(1); - for (String context : contexts) { - if (communityList.contains(context)) { - contextList.add(context); - } - } - - return new Tuple2<>(c.getString(0) ,contextList); - }); - } - - - private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table){ - String query = "SELECT relation.source, " + table +".context , relation.target " + - "FROM " + table + - " JOIN relation " + - "ON id = source" ; + private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table) { + String query = + "SELECT relation.source, " + + table + + ".context , relation.target " + + "FROM " + + table + + " JOIN relation " + + "ON id = source"; return spark.sql(query); } private static Boolean relatedToCommunities(Result r, List communityIdList) { - Set result_communities = r.getContext() - .stream() - .map(c -> c.getId()) - .collect(Collectors.toSet()); + Set result_communities = + r.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet()); for (String communityId : result_communities) { if (communityIdList.contains(communityId)) { return true; @@ -419,44 +589,61 @@ public class SparkResultToCommunityThroughSemRelJob3 { return false; } - private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { + private static void updateResult( + JavaPairRDD results, + JavaPairRDD toupdateresult, + String outputPath, + String type) { results.leftOuterJoin(toupdateresult) - .map(p -> { - Result r = p._2()._1(); - if (p._2()._2().isPresent()){ - Set communityList = p._2()._2().get().getAccumulator(); - for(Context c: r.getContext()){ - if (communityList.contains(c.getId())){ - //verify if the datainfo for this context contains propagation - if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ - c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); - //community id already in the context of the result. 
Remove it from the set that has to be added - communityList.remove(c.getId()); + .map( + p -> { + Result r = p._2()._1(); + if (p._2()._2().isPresent()) { + Set communityList = p._2()._2().get().getAccumulator(); + for (Context c : r.getContext()) { + if (communityList.contains(c.getId())) { + // verify if the datainfo for this context contains + // propagation + if (!c.getDataInfo().stream() + .map(di -> di.getInferenceprovenance()) + .collect(Collectors.toSet()) + .contains(PROPAGATION_DATA_INFO_TYPE)) { + c.getDataInfo() + .add( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); + // community id already in the context of the result. + // Remove it from the set that has to be added + communityList.remove(c.getId()); + } + } } + List cc = r.getContext(); + for (String cId : communityList) { + Context context = new Context(); + context.setId(cId); + context.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); + cc.add(context); + } + r.setContext(cc); } - } - List cc = r.getContext(); - for(String cId: communityList){ - Context context = new Context(); - context.setId(cId); - context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); - cc.add(context); - } - r.setContext(cc); - } - return r; - }) + return r; + }) .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/"+type); + .saveAsTextFile(outputPath + "/" + type); } - - - private static TypedRow getTypedRow(List communityIdList, List context, String id, String type) { - Set result_communities = context - .stream() - .map(c -> c.getId()) - .collect(Collectors.toSet()); + private static TypedRow getTypedRow( + List communityIdList, List context, String id, String type) { + Set result_communities = + context.stream().map(c -> c.getId()).collect(Collectors.toSet()); TypedRow tp = new TypedRow(); tp.setSourceId(id); tp.setType(type);