From e000754c92cb679518bb086b189b2306559881d0 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 27 Apr 2020 10:34:03 +0200 Subject: [PATCH] refactoring --- .../PrepareResultCommunitySetStep1.java | 118 ++-- .../PrepareResultCommunitySetStep2.java | 93 ++- ...arkResultToCommunityThroughSemRelJob4.java | 555 ++++-------------- 3 files changed, 226 insertions(+), 540 deletions(-) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java index d334e518d..cbb9b580e 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java @@ -1,5 +1,8 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import eu.dnetlib.dhp.QueryInformationSystem; @@ -7,6 +10,8 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; +import java.util.Arrays; +import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; @@ -18,23 +23,18 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.List; - -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; - public class PrepareResultCommunitySetStep1 { private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep1.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(PrepareResultCommunitySetStep1.class - .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_preparecommunitytoresult_parameters.json")); + String jsonConfiguration = + IOUtils.toString( + PrepareResultCommunitySetStep1.class.getResourceAsStream( + "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_preparecommunitytoresult_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -62,30 +62,47 @@ public class PrepareResultCommunitySetStep1 { final List communityIdList = QueryInformationSystem.getCommunityList(isLookupUrl); log.info("communityIdList: {}", new Gson().toJson(communityIdList)); - final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + final String resultType = + resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); log.info("resultType: {}", resultType); + Class resultClazz = + (Class) Class.forName(resultClassName); - Class resultClazz = (Class) Class.forName(resultClassName); - - - runWithSparkHiveSession(conf, isSparkSessionManaged, + runWithSparkHiveSession( + conf, + isSparkSessionManaged, spark -> { if (isTest(parser)) { removeOutputDir(spark, outputPath); } - prepareInfo(spark, inputPath, outputPath, allowedsemrel, resultClazz, resultType, + prepareInfo( + spark, + inputPath, + outputPath, + allowedsemrel, + resultClazz, + resultType, communityIdList); }); } - private static void prepareInfo(SparkSession spark, String inputPath, String outputPath, - List allowedsemrel, Class resultClazz, String resultType, - List communityIdList) { - //read the relation table and the table related to the result it is using + private static void prepareInfo( + SparkSession spark, + String inputPath, + String outputPath, + List allowedsemrel, + Class resultClazz, + String resultType, + List communityIdList) { + // read the relation table and the table related to the result it is using final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); + org.apache.spark.sql.Dataset relation = + spark.createDataset( + sc.textFile(inputPath + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)) + .rdd(), + Encoders.bean(Relation.class)); relation.createOrReplaceTempView("relation"); log.info("Reading Graph table from: {}", inputPath + "/" + resultType); @@ -93,16 +110,18 @@ public class PrepareResultCommunitySetStep1 { result.createOrReplaceTempView("result"); - getPossibleResultcommunityAssociation(spark, allowedsemrel, outputPath + "/" + resultType, communityIdList); - + getPossibleResultcommunityAssociation( + spark, allowedsemrel, outputPath + "/" + resultType, communityIdList); } - private static void getPossibleResultcommunityAssociation(SparkSession spark, List allowedsemrel, String outputPath, - List communityIdList) { + private static void getPossibleResultcommunityAssociation( + SparkSession spark, + List allowedsemrel, + String outputPath, + List communityIdList) { String communitylist = getConstraintList(" co.id = '", communityIdList); - String semrellist = getConstraintList(" relClass = '", allowedsemrel ); - + String semrellist = getConstraintList(" relClass = '", allowedsemrel); /* associates to each result the set of community contexts they are associated to @@ -115,33 +134,38 @@ public class PrepareResultCommunitySetStep1 { associates to each target of a relation with allowed semantics the set of community context it could possibly inherit from the source of the relation */ - String query = "Select target resultId, community_context " + - "from (select id, collect_set(co.id) community_context " + - " from result " + - " lateral view explode (context) c as co " + - " where datainfo.deletedbyinference = false "+ communitylist + - " group by id) p " + - "JOIN " + - "(select source, target " + - "from relation " + - "where datainfo.deletedbyinference = false " + semrellist + ") r " + - "ON p.id = r.source"; + String query = + "Select target resultId, community_context " + + "from (select id, collect_set(co.id) community_context " + + " from result " + + " lateral view explode (context) c as co " + + " where datainfo.deletedbyinference = false " + + communitylist + + " group by id) p " + + "JOIN " + + "(select source, target " + + "from relation " + + "where datainfo.deletedbyinference = false " + + semrellist + + ") r " + + "ON p.id = r.source"; - org.apache.spark.sql.Dataset result_context = spark.sql( query); + org.apache.spark.sql.Dataset result_context = spark.sql(query); result_context.createOrReplaceTempView("result_context"); - //( target, (mes, dh-ch-, ni)) + // ( target, (mes, dh-ch-, ni)) /* a dataset for example could be linked to more than one publication. For each publication linked to that dataset - the previous query will produce a row: targetId set of community context the te=arget could possibly inherit + the previous query will produce a row: targetId set of community context the target could possibly inherit with the following query there will be a single row for each result linked to more than one result of the result type currently being used */ - query = "select resultId , collect_set(co) communityList " + - "from result_context " + - "lateral view explode (community_context) c as co " + - "where length(co) > 0 " + - "group by resultId"; + query = + "select resultId , collect_set(co) communityList " + + "from result_context " + + "lateral view explode (community_context) c as co " + + "where length(co) > 0 " + + "group by resultId"; spark.sql(query) .as(Encoders.bean(ResultCommunityList.class)) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java index 6efb2205b..3579db9e6 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java @@ -1,8 +1,13 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.orcidtoresultfromsemrel.ResultOrcidList; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; @@ -11,27 +16,20 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; import scala.Tuple2; -import java.util.HashSet; -import java.util.Set; - -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - - public class PrepareResultCommunitySetStep2 { private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep2.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(PrepareResultCommunitySetStep2.class - .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_prepareresulttocommunity2_parameters.json")); + String jsonConfiguration = + IOUtils.toString( + PrepareResultCommunitySetStep2.class.getResourceAsStream( + "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_preparecommunitytoresult2_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -46,60 +44,61 @@ public class PrepareResultCommunitySetStep2 { SparkConf conf = new SparkConf(); - runWithSparkSession(conf, isSparkSessionManaged, + runWithSparkSession( + conf, + isSparkSessionManaged, spark -> { if (isTest(parser)) { removeOutputDir(spark, outputPath); } - mergeInfo(spark, inputPath, outputPath); + mergeInfo(spark, inputPath, outputPath); }); - } private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) { - Dataset resultOrcidAssocCommunityList = readResultCommunityList(spark, inputPath + "/publication") - .union(readResultCommunityList(spark, inputPath + "/dataset")) - .union(readResultCommunityList(spark, inputPath + "/otherresearchproduct")) - .union(readResultCommunityList(spark, inputPath + "/software")); + Dataset resultOrcidAssocCommunityList = + readResultCommunityList(spark, inputPath + "/publication") + .union(readResultCommunityList(spark, inputPath + "/dataset")) + .union(readResultCommunityList(spark, inputPath + "/otherresearchproduct")) + .union(readResultCommunityList(spark, inputPath + "/software")); resultOrcidAssocCommunityList .toJavaRDD() .mapToPair(r -> new Tuple2<>(r.getResultId(), r)) - .reduceByKey((a, b) -> { - if (a == null) { - return b; - } - if (b == null) { - return a; - } - Set community_set = new HashSet<>(); + .reduceByKey( + (a, b) -> { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + Set community_set = new HashSet<>(); - a.getCommunityList().stream().forEach(aa -> community_set.add(aa)); + a.getCommunityList().stream().forEach(aa -> community_set.add(aa)); - b.getCommunityList().stream().forEach(aa -> { - if (!community_set.contains(aa)) { - a.getCommunityList().add(aa); - community_set.add(aa); - } - }); - return a; - }) + b.getCommunityList().stream() + .forEach( + aa -> { + if (!community_set.contains(aa)) { + a.getCommunityList().add(aa); + community_set.add(aa); + } + }); + return a; + }) .map(c -> c._2()) .map(r -> OBJECT_MAPPER.writeValueAsString(r)) .saveAsTextFile(outputPath, GzipCodec.class); } - private static Dataset readResultCommunityList(SparkSession spark, String relationPath) { - return spark - .read() + private static Dataset readResultCommunityList( + SparkSession spark, String relationPath) { + return spark.read() .textFile(relationPath) - .map(value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class), Encoders.bean(ResultCommunityList.class)); + .map( + value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class), + Encoders.bean(ResultCommunityList.class)); } - - - - - } - diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java index 0e39090dd..4e72fac27 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java @@ -1,473 +1,136 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.QueryInformationSystem; -import eu.dnetlib.dhp.TypedRow; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.*; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import scala.Tuple2; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ximpleware.extended.xpath.parser; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; +import eu.dnetlib.dhp.schema.oaf.*; import java.util.*; import java.util.stream.Collectors; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static eu.dnetlib.dhp.PropagationConstant.*; +public class SparkResultToCommunityThroughSemRelJob4 { + + private static final Logger log = + LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob4.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); -public class SparkResultToCommunityThroughSemRelJob3 { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils - .toString(SparkResultToCommunityThroughSemRelJob3.class - .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); + String jsonConfiguration = + IOUtils.toString( + SparkResultToCommunityThroughSemRelJob4.class.getResourceAsStream( + "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - SparkConf conf = new SparkConf(); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String preparedInfoPath = parser.get("preparedInfoPath"); + log.info("preparedInfoPath: {}", preparedInfoPath); + + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = SparkSession - .builder() - .appName(SparkResultToCommunityThroughSemRelJob3.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); - final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + final Boolean saveGraph = + Optional.ofNullable(parser.get("saveGraph")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("saveGraph: {}", saveGraph); - final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); + Class resultClazz = + (Class) Class.forName(resultClassName); - createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - - - JavaRDD publication_rdd = sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)); - - JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, Dataset.class)); - - JavaRDD orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") - .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); - - JavaRDD software_rdd = sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)); - - JavaRDD relation_rdd = sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)); - - - org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(), - Encoders.bean(Publication.class)); - - org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), - Encoders.bean(Relation.class)); - - org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), - Encoders.bean(Dataset.class)); - - org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), - Encoders.bean(OtherResearchProduct.class)); - - org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), - Encoders.bean(Software.class)); - - - publication.createOrReplaceTempView("publication"); - relation.createOrReplaceTempView("relation"); - dataset.createOrReplaceTempView("dataset"); - software.createOrReplaceTempView("software"); - other.createOrReplaceTempView("other"); - - String communitylist = getConstraintList(" co.id = '", communityIdList); - - String semrellist = getConstraintList(" relClass = '", allowedsemrel ); - - - String query = "Select source, community_context, target " + - "from (select id, collect_set(co.id) community_context " + - "from publication " + - "lateral view explode (context) c as co " + - "where datainfo.deletedbyinference = false "+ communitylist + - " group by id) p " + - "JOIN " + - "(select * " + - "from relation " + - "where datainfo.deletedbyinference = false " + semrellist + ") r " + - "ON p.id = r.source"; - - - org.apache.spark.sql.Dataset publication_context = spark.sql( query); - publication_context.createOrReplaceTempView("publication_context"); - - //( source, (mes, dh-ch-, ni), target ) - query = "select target , collect_set(co) " + - "from (select target, community_context " + - "from publication_context pc join publication p on " + - "p.id = pc.source) tmp " + - "lateral view explode (community_context) c as co " + - "group by target"; - - - - org.apache.spark.sql.Dataset toupdatepublicationreresult = spark.sql(query); - org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); - org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); - org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); - - createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// - -/* - JavaPairRDD resultLinkedToCommunities = publication - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - .union(datasets - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(software - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(other - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ); - - JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) - .mapToPair(toPair()); - - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - - updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato] - //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. Se non ci sono aggiungerli, altrimenti nulla -*/ - } - - private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){ - String query = "SELECT target_id, collect_set(co.id) context_id " + - " FROM (SELECT t.id target_id, s.context source_context " + - " FROM context_software s " + - " JOIN " + table + " t " + - " ON s.target = t.id " + - " UNION ALL " + - " SELECT t.id target_id, d.context source_context " + - " FROM dataset_context d " + - " JOIN " + table + " t" + - " ON s.target = t.id " + - " UNION ALL " + - " SELECT t.id target_id, p.context source_context " + - " FROM publication_context p" + - " JOIN " + table +" t " + - " on p.target = t.id " + - " UNION ALL " + - " SELECT t.id target_id, o.context source_context " + - " FROM other_context o " + - " JOIN " + table + " t " + - " ON o.target = t.id) TMP " + - " LATERAL VIEW EXPLODE(source_context) MyT as co " + - " GROUP BY target_id" ; - - return spark.sql(query); - } - - private static JavaRDD createUpdateForResultDatasetWrite(JavaRDD toupdateresult, String outputPath, String type, String class_id, String class_name, List communityIdList){ - return toupdateresult.map(r -> { - List contextList = new ArrayList(); - List toAddContext = r.getList(1); - for (String cId : toAddContext) { - if (communityIdList.contains(cId)) { - Context newContext = new Context(); - newContext.setId(cId); - newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - contextList.add(newContext); - } - - } - - if (contextList.size() > 0) { - Result ret = new Result(); - ret.setId(r.getString(0)); - ret.setContext(contextList); - return ret; - } - return null; - }).filter(r -> r != null); - } - - private static void updateForSoftwareDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map(r -> (Software) r) - .map(s -> new ObjectMapper().writeValueAsString(s)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForDatasetDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map( r-> (Dataset)r) - .map(d -> new ObjectMapper().writeValueAsString(d)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForPublicationDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map(r -> (Publication)r) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForOtherDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) - .map( r -> (OtherResearchProduct)r) - .map( o -> new ObjectMapper().writeValueAsString(o)) - .saveAsTextFile(outputPath + "/" + type); - } - - - - private static JavaRDD getUpdateForResultDataset(JavaRDD toupdateresult, JavaPairRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ - return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) - .map(c -> { - if(! c._2()._2().isPresent()){ - return c._2()._1(); + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + if (isTest(parser)) { + removeOutputDir(spark, outputPath); } - - List toAddContext = c._2()._2().get(); - Set context_set = new HashSet<>(); - for(Object cId: toAddContext){ - String id = (String)cId; - if (communityIdList.contains(id)){ - context_set.add(id); - } + if (saveGraph) { + execPropagation( + spark, inputPath, outputPath, preparedInfoPath, resultClazz); } - for (Context context: c._2()._1().getContext()){ - if(context_set.contains(context)){ - context_set.remove(context); - } - } - - List contextList = context_set.stream().map(co -> { - Context newContext = new Context(); - newContext.setId(co); - newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - return newContext; - - }).collect(Collectors.toList()); - - if(contextList.size() > 0 ){ - Result r = new Result(); - r.setId(c._1()); - r.setContext(contextList); - return r; - } - return null; - }).filter(r -> r != null); - - -// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) -// .join(result) -// .map(c -> { -// List toAddContext = c._2()._1(); -// Set context_set = new HashSet<>(); -// for(Object cId: toAddContext){ -// String id = (String)cId; -// if (communityIdList.contains(id)){ -// context_set.add(id); -// } -// } -// for (Context context: c._2()._2().getContext()){ -// if(context_set.contains(context)){ -// context_set.remove(context); -// } -// } -// -// List contextList = context_set.stream().map(co -> { -// Context newContext = new Context(); -// newContext.setId(co); -// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); -// return newContext; -// -// }).collect(Collectors.toList()); -// -// if(contextList.size() > 0 ){ -// Result r = new Result(); -// r.setId(c._1()); -// r.setContext(contextList); -// return r; -// } -// return null; -// }) -// .filter(r -> r != null); - } - - private static JavaRDD createUpdateForSoftwareDataset(JavaRDD toupdateresult, List communityList, - JavaRDD result, String class_id, String class_name) { - return result - .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) - .map(c -> { - Software oaf = c._2()._1(); - if (c._2()._2().isPresent()) { - - HashSet contexts = new HashSet<>(c._2()._2().get()); - - for (Context context : oaf.getContext()) { - if (contexts.contains(context.getId())){ - if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ - context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)); - //community id already in the context of the result. Remove it from the set that has to be added - contexts.remove(context.getId()); - } - } - } - List cc = oaf.getContext(); - for(String cId: contexts){ - Context context = new Context(); - context.setId(cId); - context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - cc.add(context); - } - oaf.setContext(cc); - - } - return oaf; }); } - private static JavaPairRDD> getStringResultJavaPairRDD(JavaRDD toupdateresult, List communityList) { - return toupdateresult.mapToPair(c -> { + private static void execPropagation( + SparkSession spark, + String inputPath, + String outputPath, + String preparedInfoPath, + Class resultClazz) { - List contextList = new ArrayList<>(); - List contexts = c.getList(1); - for (String context : contexts) { - if (communityList.contains(context)) { - contextList.add(context); - } - } + org.apache.spark.sql.Dataset possibleUpdates = + readResultCommunityList(spark, preparedInfoPath); + org.apache.spark.sql.Dataset result = readPathEntity(spark, inputPath, resultClazz); - return new Tuple2<>(c.getString(0) ,contextList); - }); - } - - - private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table){ - String query = "SELECT relation.source, " + table +".context , relation.target " + - "FROM " + table + - " JOIN relation " + - "ON id = source" ; - - return spark.sql(query); - } - - private static Boolean relatedToCommunities(Result r, List communityIdList) { - Set result_communities = r.getContext() - .stream() - .map(c -> c.getId()) - .collect(Collectors.toSet()); - for (String communityId : result_communities) { - if (communityIdList.contains(communityId)) { - return true; - } - } - return false; - } - - private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { - results.leftOuterJoin(toupdateresult) - .map(p -> { - Result r = p._2()._1(); - if (p._2()._2().isPresent()){ - Set communityList = p._2()._2().get().getAccumulator(); - for(Context c: r.getContext()){ - if (communityList.contains(c.getId())){ - //verify if the datainfo for this context contains propagation - if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ - c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); - //community id already in the context of the result. Remove it from the set that has to be added - communityList.remove(c.getId()); - } + result.joinWith( + possibleUpdates, + result.col("id").equalTo(possibleUpdates.col("resultId")), + "left_outer") + .map( + value -> { + R ret = value._1(); + Optional rcl = Optional.ofNullable(value._2()); + if (rcl.isPresent()) { + Set context_set = new HashSet<>(); + ret.getContext().stream().forEach(c -> context_set.add(c.getId())); + List contextList = + rcl.get().getCommunityList().stream() + .map( + c -> { + if (!context_set.contains(c)) { + Context newContext = new Context(); + newContext.setId(c); + newContext.setDataInfo( + Arrays.asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); + return newContext; + } + return null; + }) + .filter(c -> c != null) + .collect(Collectors.toList()); + Result r = new Result(); + r.setId(ret.getId()); + r.setContext(contextList); + ret.mergeFrom(r); } - } - List cc = r.getContext(); - for(String cId: communityList){ - Context context = new Context(); - context.setId(cId); - context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); - cc.add(context); - } - r.setContext(cc); - } - return r; - }) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/"+type); - } - - - private static TypedRow getTypedRow(List communityIdList, List context, String id, String type) { - Set result_communities = context - .stream() - .map(c -> c.getId()) - .collect(Collectors.toSet()); - TypedRow tp = new TypedRow(); - tp.setSourceId(id); - tp.setType(type); - for (String communityId : result_communities) { - if (communityIdList.contains(communityId)) { - tp.add(communityId); - } - } - if (tp.getAccumulator() != null) { - return tp; - } - return null; + return ret; + }, + Encoders.bean(resultClazz)) + .toJSON() + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .text(outputPath); } }