diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganization.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganization.java index 1f634a6934..de14946ccf 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganization.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganization.java @@ -4,17 +4,18 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple2; -import java.io.File; import java.util.*; import java.util.stream.Collectors; @@ -26,10 +27,13 @@ public class SparkResultToCommunityFromOrganization { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToCommunityFromOrganization.class.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromorganization/input_communitytoresult_parameters.json"))); parser.parseArgument(args); + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); final SparkSession spark = SparkSession .builder() .appName(SparkResultToCommunityFromOrganization.class.getSimpleName()) .master(parser.get("master")) + .config(conf) .enableHiveSupport() .getOrCreate(); @@ -37,83 +41,226 @@ public class SparkResultToCommunityFromOrganization { final String inputPath = parser.get("sourcePath"); final String outputPath = "/tmp/provision/propagation/communitytoresult"; final OrganizationMap organizationMap = new Gson().fromJson(parser.get("organizationtoresultcommunitymap"), OrganizationMap.class); + boolean writeUpdates = TRUE.equals(parser.get("writeUpdate")); + boolean saveGraph = TRUE.equals(parser.get("saveGraph")); + + System.out.println(new Gson().toJson(organizationMap)); + + createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + + JavaRDD relations_rdd_all = sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)); + JavaRDD publications_rdd_all = sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)); + JavaRDD dataset_rdd_all = sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, Dataset.class)); + JavaRDD orp_rdd_all = sc.textFile(inputPath + "/otheresearchproduct") + .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); + JavaRDD software_rdd_all = sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, Software.class)); + + org.apache.spark.sql.Dataset relation = spark.createDataset(relations_rdd_all.rdd(), Encoders.bean(Relation.class)); + + relation.createOrReplaceTempView("relation"); +// String query = "SELECT source, target" + +// " FROM relation " + +// " WHERE datainfo.deletedbyinference = false " + +// " AND relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS + "'"; +// +// org.apache.spark.sql.Dataset result_organization = spark.sql(query); +// result_organization.createOrReplaceTempView("result_organization"); +// +// query = "SELECT source, target " + +// "FROM relation " + +// "WHERE datainfo.deletedbyinference = false " + +// "AND relClass = '" + RELATION_REPRESENTATIVERESULT_RESULT_CLASS + "'" ; + + String query = "SELECT result_organization.source, result_organization.target, org_set " + + "FROM (SELECT source, target " + + " FROM relation " + + " WHERE datainfo.deletedbyinference = false " + + " AND relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS + "') result_organization " + + "LEFT JOIN (SELECT source, collect_set(target) org_set " + + " FROM relation " + + " WHERE datainfo.deletedbyinference = false " + + " AND relClass = '" + RELATION_REPRESENTATIVERESULT_RESULT_CLASS + "' " + + " GROUP BY source) organization_organization " + + "ON result_organization.target = organization_organization.source "; + + org.apache.spark.sql.Dataset result_organizationset = spark.sql(query); + + JavaPairRDD result_communitySet = result_organizationset.toJavaRDD().map(r -> { + String rId = r.getString(0); + List orgs = r.getList(2); + String oTarget = r.getString(1); + TypedRow tp = new TypedRow(); + if (organizationMap.containsKey(oTarget)) { + tp.addAll(organizationMap.get(oTarget)); + } + try{ + for (String oId : orgs) { + if (organizationMap.containsKey(oId)) { + tp.addAll(organizationMap.get(oId)); + } + } + } + catch(Exception e){ + //System.out.println("organizationTargetID: " + oTarget); + } + + if(tp.getAccumulator() == null ){ + return null; + } + tp.setSourceId(rId); - File directory = new File(outputPath); + return tp; + }) + .filter(tr -> !(tr==null)) + .mapToPair(toPair()).cache(); - if (!directory.exists()) { - directory.mkdirs(); + if(writeUpdates){ + result_communitySet.map(c->new ObjectMapper().writeValueAsString(c)).saveAsTextFile(outputPath +"/result_communityset"); } - JavaRDD relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)).cache(); + if(saveGraph){ + updatePublicationResult(publications_rdd_all, result_communitySet) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/publication"); + + updateDatasetResult(dataset_rdd_all, result_communitySet) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/dataset"); + + updateORPResult(orp_rdd_all, result_communitySet) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/otherresearchproduct"); + + updateSoftwareResult(software_rdd_all, result_communitySet) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/software"); + } //relations between organziations and results - JavaPairRDD organization_result = relations - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> RELATION_RESULT_ORGANIZATION_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTORGANIZATION_REL_TYPE.equals(r.getRelType())) - .map(r -> new TypedRow().setSourceId(r.getTarget()).setTargetId(r.getSource() )) - .mapToPair(toPair()); +// JavaPairRDD organization_result = relations +// .filter(r -> !r.getDataInfo().getDeletedbyinference()) +// .filter(r -> RELATION_RESULT_ORGANIZATION_REL_CLASS.equals(r.getRelClass()) +// && RELATION_RESULTORGANIZATION_REL_TYPE.equals(r.getRelType())) +// .map(r -> { +// TypedRow tp = new TypedRow(); +// tp.setSourceId(r.getTarget()); +// tp.setTargetId(r.getSource() ); +// return tp; +// }) +// .mapToPair(toPair()); //relations between representative organization and merged Id. One relation per merged organization - JavaPairRDD organization_organization = relations - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter( r -> RELATION_ORGANIZATIONORGANIZATION_REL_TYPE.equals(r.getRelType()) && RELATION_REPRESENTATIVERESULT_RESULT_CLASS.equals(r.getRelClass())) - .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) - .mapToPair(toPair()); +// JavaPairRDD organization_organization = relations +// .filter(r -> !r.getDataInfo().getDeletedbyinference()) +// .filter( r -> RELATION_ORGANIZATIONORGANIZATION_REL_TYPE.equals(r.getRelType()) +// && RELATION_REPRESENTATIVERESULT_RESULT_CLASS.equals(r.getRelClass())) +// .map(r -> { +// TypedRow tp = new TypedRow(); +// tp.setSourceId(r.getSource()); +// tp.setTargetId(r.getTarget()); +// return tp; +// }) +// .mapToPair(toPair()); //get the original id of the organizations to be checked against the id associated to the communities - JavaPairRDD organizationoriginal_result = organization_result.leftOuterJoin(organization_organization) - .map(c -> { - if (!c._2()._2().isPresent()) - return c._2()._1(); - return c._2()._1().setSourceId(c._2()._2().get().getTargetId()); - }) - .mapToPair(toPair()); +// JavaPairRDD organizationoriginal_result = organization_result.leftOuterJoin(organization_organization) +// .map(c -> { +// if (!c._2()._2().isPresent()) +// return c._2()._1(); +// return c._2()._1().setSourceId(c._2()._2().get().getTargetId()); +// }) +// .mapToPair(toPair()); //associates to each result connected to an organization the list of communities related to that organization - JavaPairRDD to_add_result_communities = organizationoriginal_result.map(o -> { - List communityList = organizationMap.get(o._1()); - if (communityList.size() == 0) - return null; - TypedRow tp = o._2(); - tp.setAccumulator(new HashSet<>(communityList)).setSourceId(tp.getTargetId()); - return tp; - }) - .filter(r -> !(r == null)) - .mapToPair(toPair()) - .reduceByKey((a, b) -> { - if (a == null) - return b; - if (b == null) - return a; - a.addAll(b.getAccumulator()); - return a; - }); +// JavaPairRDD to_add_result_communities = organizationoriginal_result.map(o -> { +// List communityList = organizationMap.get(o._1()); +// if (communityList.size() == 0) +// return null; +// TypedRow tp = o._2(); +// tp.setAccumulator(new HashSet<>(communityList)); +// tp.setSourceId(tp.getTargetId()); +// return tp; +// }) +// .filter(r -> !(r == null)) +// .mapToPair(toPair()) +// .reduceByKey((a, b) -> { +// if (a == null) +// return b; +// if (b == null) +// return a; +// a.addAll(b.getAccumulator()); +// return a; +// }); - JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)).cache(); - JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)).cache(); - JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)).cache(); - JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)).cache(); - - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - - updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); - updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); - updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); - updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); +// JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)).cache(); +// JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)).cache(); +// JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)).cache(); +// JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)).cache(); +// +// JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); +// JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); +// JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); +// JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); +// +// updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); +// updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); +// updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); +// updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); } + private static JavaRDD updatePublicationResult(JavaRDD result, JavaPairRDD result_communitySet){ + JavaRDD tmp = result.map(r -> (Result)r); + return updateResultDataset(tmp, result_communitySet).map(r -> (Publication)r); + + } + private static JavaRDD updateDatasetResult(JavaRDD result, JavaPairRDD result_communitySet){ + JavaRDD tmp = result.map(r -> (Result)r); + return updateResultDataset(tmp, result_communitySet).map(r -> (Dataset) r); + + } + private static JavaRDD updateORPResult(JavaRDD result, JavaPairRDD result_communitySet){ + JavaRDD tmp = result.map(r -> (Result)r); + return updateResultDataset(tmp, result_communitySet).map(r -> (OtherResearchProduct)r); + + } + private static JavaRDD updateSoftwareResult(JavaRDD result, JavaPairRDD result_communitySet){ + JavaRDD tmp = result.map(r -> (Result)r); + return updateResultDataset(tmp, result_communitySet).map(r -> (Software) r); + + } + private static JavaRDD updateResultDataset(JavaRDD result, JavaPairRDD result_communitySet){ + return result.mapToPair(p -> new Tuple2<>(p.getId(),p)).leftOuterJoin(result_communitySet) + .map(c -> { + Result r = c._2()._1(); + if(c._2()._2().isPresent()){ + Set communitySet = c._2()._2().get().getAccumulator(); + List contextList = r.getContext().stream().map(con -> con.getId()).collect(Collectors.toList()); + for(String cId:communitySet){ + if(!contextList.contains(cId)){ + Context newContext = new Context(); + newContext.setId(cId); + newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME))); + r.getContext().add(newContext); + } + } + } + return r; + }); + + } // private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { // results.leftOuterJoin(toupdateresult) // .map(p -> {