From b1af90a45f63a807379e485ed3d21f64ac3eed8e Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 30 Mar 2020 10:50:03 +0200 Subject: [PATCH] to make it visible to Claudio --- ...parkResultToCommunityThroughSemRelJob.java | 379 ++++++++++++++++-- 1 file changed, 356 insertions(+), 23 deletions(-) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java index ad3cb95b4..acc411fd0 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -4,32 +4,37 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.QueryInformationSystem; import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple2; -import java.io.File; import java.util.*; import java.util.stream.Collectors; import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME; public class SparkResultToCommunityThroughSemRelJob { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToCommunityThroughSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils + .toString(SparkResultToCommunityThroughSemRelJob.class + .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); parser.parseArgument(args); + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); final SparkSession spark = SparkSession .builder() .appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName()) .master(parser.get("master")) + .config(conf) .enableHiveSupport() .getOrCreate(); @@ -37,29 +42,125 @@ public class SparkResultToCommunityThroughSemRelJob { final String inputPath = parser.get("sourcePath"); final String outputPath = "/tmp/provision/propagation/communitytoresultthroughsemrel"; - final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); + //final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); + //final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); + final List communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); - File directory = new File(outputPath); + createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - if (!directory.exists()) { - directory.mkdirs(); - } - JavaPairRDD result_result = getResultResultSemRel(allowedsemrel, - sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))); + JavaRDD all_publication_rdd = sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); + JavaRDD publication_rdd = all_publication_rdd + .filter(p -> relatedToCommunities(p, communityIdList)).cache(); - JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)).cache(); - JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)).cache(); - JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)).cache(); - JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)).cache(); + JavaRDD all_dataset_rdd = sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, Dataset.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); + JavaRDD dataset_rdd = all_dataset_rdd + .filter(p -> relatedToCommunities(p, communityIdList)).cache(); - JavaPairRDD resultLinkedToCommunities = publications + JavaRDD all_orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") + .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); + JavaRDD orp_rdd = all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); + + JavaRDD all_software_rdd = sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, Software.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); + JavaRDD software_rdd = all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); + + JavaRDD relation_rdd = sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)) + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); + + + org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(), + Encoders.bean(Publication.class)); + + org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), + Encoders.bean(Dataset.class)); + + org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), + Encoders.bean(OtherResearchProduct.class)); + + org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), + Encoders.bean(Software.class)); + + org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), + Encoders.bean(Relation.class)); + + publication.createOrReplaceTempView("publication"); + relation.createOrReplaceTempView("relation"); + dataset.createOrReplaceTempView("dataset"); + software.createOrReplaceTempView("software"); + other.createOrReplaceTempView("other"); + + org.apache.spark.sql.Dataset publication_context = getContext(spark, "publication"); + publication_context.createOrReplaceTempView("publication_context"); + + org.apache.spark.sql.Dataset dataset_context = getContext(spark, "dataset"); + dataset_context.createOrReplaceTempView("dataset_context"); + + org.apache.spark.sql.Dataset software_context = getContext(spark, "software"); + software_context.createOrReplaceTempView("software_context"); + + org.apache.spark.sql.Dataset other_context = getContext(spark, "other"); + other_context.createOrReplaceTempView("other_context"); + + publication = spark.createDataset(all_publication_rdd.rdd(), + Encoders.bean(Publication.class)); + publication.createOrReplaceTempView("publication"); + + dataset = spark.createDataset(all_dataset_rdd.rdd(), + Encoders.bean(Dataset.class)); + dataset.createOrReplaceTempView("dataset"); + + other = spark.createDataset(all_orp_rdd.rdd(), + Encoders.bean(OtherResearchProduct.class)); + other.createOrReplaceTempView("other"); + + software = spark.createDataset(all_software_rdd.rdd(), + Encoders.bean(Software.class)); + software.createOrReplaceTempView("software"); + + + org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); + org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); + org.apache.spark.sql.Dataset toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication"); + org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); + + createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + + updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForDatasetDataset(toupdatesoftwareresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "software", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForOtherDataset(toupdatepublicationreresult.toJavaRDD(), other.toJavaRDD(), outputPath, "publication", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + +/* + JavaPairRDD resultLinkedToCommunities = publication .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) .filter(p -> !(p == null)) .mapToPair(toPair()) @@ -93,7 +194,239 @@ public class SparkResultToCommunityThroughSemRelJob { updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato] //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. Se non ci sono aggiungerli, altrimenti nulla +*/ + } + private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){ + String query = "SELECT target_id, collect_set(co.id) context_id " + + " FROM (SELECT t.id target_id, s.context source_context " + + " FROM context_software s " + + " JOIN " + table + " t " + + " ON s.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, d.context source_context " + + " FROM dataset_context d " + + " JOIN " + table + " t" + + " ON s.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, p.context source_context " + + " FROM publication_context p" + + " JOIN " + table +" t " + + " on p.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, o.context source_context " + + " FROM other_context o " + + " JOIN " + table + " t " + + " ON o.target = t.id) TMP " + + " LATERAL VIEW EXPLORE(source_context) MyT as co " + + " GROUP BY target_id" ; + + return spark.sql(query); + } + + private static JavaRDD createUpdateForResultDatasetWrite(JavaRDD toupdateresult, String outputPath, String type, String class_id, String class_name, List communityIdList){ + return toupdateresult.map(r -> { + List contextList = new ArrayList(); + List toAddContext = r.getList(1); + for (String cId : toAddContext) { + if (communityIdList.contains(cId)) { + Context newContext = new Context(); + newContext.setId(cId); + newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); + contextList.add(newContext); + } + + } + + if (contextList.size() > 0) { + Result ret = new Result(); + ret.setId(r.getString(0)); + ret.setContext(contextList); + return ret; + } + return null; + }).filter(r -> r != null); + } + + private static void updateForSoftwareDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map(r -> (Software) r) + .map(s -> new ObjectMapper().writeValueAsString(s)) + .saveAsTextFile(outputPath + "/" + type); + } + + private static void updateForDatasetDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map( r-> (Dataset)r) + .map(d -> new ObjectMapper().writeValueAsString(d)) + .saveAsTextFile(outputPath + "/" + type); + } + + private static void updateForPublicationDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map(r -> (Publication)r) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/" + type); + } + + private static void updateForOtherDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map( r -> (OtherResearchProduct)r) + .map( o -> new ObjectMapper().writeValueAsString(o)) + .saveAsTextFile(outputPath + "/" + type); + } + + + + private static JavaRDD getUpdateForResultDataset(JavaRDD toupdateresult, JavaPairRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) + .map(c -> { + if(! c._2()._2().isPresent()){ + return c._2()._1(); + } + + List toAddContext = c._2()._2().get(); + Set context_set = new HashSet<>(); + for(Object cId: toAddContext){ + String id = (String)cId; + if (communityIdList.contains(id)){ + context_set.add(id); + } + } + for (Context context: c._2()._1().getContext()){ + if(context_set.contains(context)){ + context_set.remove(context); + } + } + + List contextList = context_set.stream().map(co -> { + Context newContext = new Context(); + newContext.setId(co); + newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); + return newContext; + + }).collect(Collectors.toList()); + + if(contextList.size() > 0 ){ + Result r = new Result(); + r.setId(c._1()); + r.setContext(contextList); + return r; + } + return null; + }).filter(r -> r != null); + + +// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) +// .join(result) +// .map(c -> { +// List toAddContext = c._2()._1(); +// Set context_set = new HashSet<>(); +// for(Object cId: toAddContext){ +// String id = (String)cId; +// if (communityIdList.contains(id)){ +// context_set.add(id); +// } +// } +// for (Context context: c._2()._2().getContext()){ +// if(context_set.contains(context)){ +// context_set.remove(context); +// } +// } +// +// List contextList = context_set.stream().map(co -> { +// Context newContext = new Context(); +// newContext.setId(co); +// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); +// return newContext; +// +// }).collect(Collectors.toList()); +// +// if(contextList.size() > 0 ){ +// Result r = new Result(); +// r.setId(c._1()); +// r.setContext(contextList); +// return r; +// } +// return null; +// }) +// .filter(r -> r != null); + } + + private static JavaRDD createUpdateForSoftwareDataset(JavaRDD toupdateresult, List communityList, + JavaRDD result, String class_id, String class_name) { + return result + .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) + .map(c -> { + Software oaf = c._2()._1(); + if (c._2()._2().isPresent()) { + + HashSet contexts = new HashSet<>(c._2()._2().get()); + + for (Context context : oaf.getContext()) { + if (contexts.contains(context.getId())){ + if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance()) + .collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ + context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)); + //community id already in the context of the result. Remove it from the set that has to be added + contexts.remove(context.getId()); + } + } + } + List cc = oaf.getContext(); + for(String cId: contexts){ + Context context = new Context(); + context.setId(cId); + context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); + cc.add(context); + } + oaf.setContext(cc); + + } + return oaf; + }); + } + + private static JavaPairRDD> getStringResultJavaPairRDD(JavaRDD toupdateresult, List communityList) { + return toupdateresult.mapToPair(c -> { + + List contextList = new ArrayList<>(); + List contexts = c.getList(1); + for (String context : contexts) { + if (communityList.contains(context)) { + contextList.add(context); + } + } + + return new Tuple2<>(c.getString(0) ,contextList); + }); + } + + + private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table){ + String query = "SELECT source, context , target " + + "FROM " + table + + " JOIN relation " + + "ON id = source" ; + + return spark.sql(query); + } + + private static Boolean relatedToCommunities(Result r, List communityIdList) { + Set result_communities = r.getContext() + .stream() + .map(c -> c.getId()) + .collect(Collectors.toSet()); + for (String communityId : result_communities) { + if (communityIdList.contains(communityId)) { + return true; + } + } + return false; } private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) {