From fbf5c27c279fe27dcbf03ec7fe2a5196f7998469 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Sat, 18 Apr 2020 14:09:03 +0200 Subject: [PATCH] Added preparation classes before actual propagation --- .../PrepareResultCommunitySetStep1.java | 152 ++++++++++++ .../PrepareResultCommunitySetStep2.java | 105 ++++++++ ...parkResultToCommunityThroughSemRelJob.java | 25 +- ...arkResultToCommunityThroughSemRelJob2.java | 232 ++++++++++-------- ...arkResultToCommunityThroughSemRelJob3.java | 105 ++++---- ...t_preparecommunitytoresult_parameters.json | 33 +++ 6 files changed, 481 insertions(+), 171 deletions(-) create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/input_preparecommunitytoresult_parameters.json diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java new file mode 100644 index 000000000..d334e518d --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java @@ -0,0 +1,152 @@ +package eu.dnetlib.dhp.resulttocommunityfromsemrel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import eu.dnetlib.dhp.QueryInformationSystem; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +public class PrepareResultCommunitySetStep1 { + private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep1.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils.toString(PrepareResultCommunitySetStep1.class + .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_preparecommunitytoresult_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", 
resultClassName); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel)); + + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + + final List communityIdList = QueryInformationSystem.getCommunityList(isLookupUrl); + log.info("communityIdList: {}", new Gson().toJson(communityIdList)); + + final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + log.info("resultType: {}", resultType); + + + Class resultClazz = (Class) Class.forName(resultClassName); + + + runWithSparkHiveSession(conf, isSparkSessionManaged, + spark -> { + if (isTest(parser)) { + removeOutputDir(spark, outputPath); + } + prepareInfo(spark, inputPath, outputPath, allowedsemrel, resultClazz, resultType, + communityIdList); + }); + } + + private static void prepareInfo(SparkSession spark, String inputPath, String outputPath, + List allowedsemrel, Class resultClazz, String resultType, + List communityIdList) { + //read the relation table and the table for the result type currently being processed + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); + relation.createOrReplaceTempView("relation"); + + log.info("Reading Graph table from: {}", inputPath + "/" + resultType); + Dataset result = readPathEntity(spark, inputPath + "/" + resultType, resultClazz); + + result.createOrReplaceTempView("result"); + + getPossibleResultcommunityAssociation(spark, allowedsemrel, outputPath + "/" + resultType, communityIdList); + + } + + private static void getPossibleResultcommunityAssociation(SparkSession spark, List allowedsemrel, String outputPath, + List communityIdList) { + + String communitylist = getConstraintList(" co.id = '", communityIdList); + String semrellist = getConstraintList(" relClass = '", allowedsemrel ); + + + /* + associates to each result the set of community contexts it is associated to + select id, collect_set(co.id) community_context " + + " from result " + + " lateral view explode (context) c as co " + + " where datainfo.deletedbyinference = false "+ communitylist + + " group by id + + associates to each target of a relation with allowed semantics the set of community contexts it could possibly + inherit from the source of the relation (a toy sketch of this query follows the patch) + */ + String query = "Select target resultId, community_context " + + "from (select id, collect_set(co.id) community_context " + + " from result " + + " lateral view explode (context) c as co " + + " where datainfo.deletedbyinference = false "+ communitylist + + " group by id) p " + + "JOIN " + + "(select source, target " + + "from relation " + + "where datainfo.deletedbyinference = false " + semrellist + ") r " + + "ON p.id = r.source"; + + org.apache.spark.sql.Dataset result_context = spark.sql( query); + result_context.createOrReplaceTempView("result_context"); + + //( target, (mes, dh-ch-, ni)) + /* + a dataset, for example, could be linked to more than one publication.
For each publication linked to that dataset, + the previous query will produce a row: targetId and the set of community contexts the target could possibly inherit. + With the following query there will be a single row for each result linked to more than one result of the result type + currently being used + */ + query = "select resultId , collect_set(co) communityList " + + "from result_context " + + "lateral view explode (community_context) c as co " + + "where length(co) > 0 " + + "group by resultId"; + + spark.sql(query) + .as(Encoders.bean(ResultCommunityList.class)) + .toJavaRDD() + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(outputPath, GzipCodec.class); + } +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java new file mode 100644 index 000000000..6efb2205b --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java @@ -0,0 +1,105 @@ +package eu.dnetlib.dhp.resulttocommunityfromsemrel; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.orcidtoresultfromsemrel.ResultOrcidList; +import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import scala.Tuple2; + +import java.util.HashSet; +import java.util.Set; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + + +public class PrepareResultCommunitySetStep2 { + private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep2.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils.toString(PrepareResultCommunitySetStep2.class + .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_prepareresulttocommunity2_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + if (isTest(parser)) { + removeOutputDir(spark, outputPath); + } + mergeInfo(spark, inputPath, outputPath); + }); + + } + + private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) { + + Dataset resultOrcidAssocCommunityList = readResultCommunityList(spark, inputPath + "/publication") + .union(readResultCommunityList(spark, inputPath + "/dataset")) + .union(readResultCommunityList(spark, inputPath + "/otherresearchproduct")) + .union(readResultCommunityList(spark, inputPath + "/software")); + +
resultOrcidAssocCommunityList + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getResultId(), r)) + .reduceByKey((a, b) -> { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + Set community_set = new HashSet<>(); + + a.getCommunityList().stream().forEach(aa -> community_set.add(aa)); + + b.getCommunityList().stream().forEach(aa -> { + if (!community_set.contains(aa)) { + a.getCommunityList().add(aa); + community_set.add(aa); + } + }); + return a; + }) + .map(c -> c._2()) + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(outputPath, GzipCodec.class); + } + + private static Dataset readResultCommunityList(SparkSession spark, String relationPath) { + return spark + .read() + .textFile(relationPath) + .map(value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class), Encoders.bean(ResultCommunityList.class)); + } + + + + + +} + diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index 7739ff99d..f5c859280 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.communitytoresultthroughsemrel; +package eu.dnetlib.dhp.resulttocommunityfromsemrel; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.QueryInformationSystem; @@ -33,7 +33,8 @@ public class SparkResultToCommunityThroughSemRelJob { System.out.println(key + " = " + parser.get(key)); } - /* SparkConf conf = new SparkConf(); + + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); final SparkSession spark = SparkSession .builder() @@ -45,7 +46,7 @@ public class SparkResultToCommunityThroughSemRelJob { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/communitytoresultthroughsemrel"; + final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; //final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); @@ -104,8 +105,14 @@ public class SparkResultToCommunityThroughSemRelJob { software.createOrReplaceTempView("software"); other.createOrReplaceTempView("other"); - org.apache.spark.sql.Dataset publication_context = getContext(spark, "publication"); - publication_context.createOrReplaceTempView("publication_context"); +// org.apache.spark.sql.Dataset publication_context = getContext(spark, "publication"); +// publication_context.createOrReplaceTempView("publication_context"); + + org.apache.spark.sql.Dataset publication_context = spark.sql( "SELECT relation.source, " + + "publication.context , relation.target " + + "FROM publication " + + " JOIN relation " + + "ON id = source"); org.apache.spark.sql.Dataset dataset_context = getContext(spark, "dataset"); dataset_context.createOrReplaceTempView("dataset_context"); @@ -202,7 +209,7 @@ public class SparkResultToCommunityThroughSemRelJob { */ } - /* private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String 
table){ + private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){ String query = "SELECT target_id, collect_set(co.id) context_id " + " FROM (SELECT t.id target_id, s.context source_context " + " FROM context_software s " + @@ -223,7 +230,7 @@ public class SparkResultToCommunityThroughSemRelJob { " FROM other_context o " + " JOIN " + table + " t " + " ON o.target = t.id) TMP " + - " LATERAL VIEW EXPLORE(source_context) MyT as co " + + " LATERAL VIEW EXPLODE(source_context) MyT as co " + " GROUP BY target_id" ; return spark.sql(query); @@ -413,7 +420,7 @@ public class SparkResultToCommunityThroughSemRelJob { private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table){ - String query = "SELECT source, context , target " + + String query = "SELECT relation.source, " + table +".context , relation.target " + "FROM " + table + " JOIN relation " + "ON id = source" ; @@ -484,5 +491,5 @@ public class SparkResultToCommunityThroughSemRelJob { return tp; } return null; - }*/ + } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java index 2da8d648f..345bd7905 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java @@ -1,6 +1,6 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; -import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.QueryInformationSystem; import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -21,24 +21,19 @@ import java.util.stream.Collectors; import static eu.dnetlib.dhp.PropagationConstant.*; -public class SparkResultToCommunityThroughSemRelJob { +public class SparkResultToCommunityThroughSemRelJob2 { public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils - .toString(SparkResultToCommunityThroughSemRelJob.class + .toString(SparkResultToCommunityThroughSemRelJob2.class .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); parser.parseArgument(args); - for(String key : parser.getObjectMap().keySet()){ - System.out.println(key + " = " + parser.get(key)); - } - - SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); final SparkSession spark = SparkSession .builder() - .appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName()) + .appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName()) .master(parser.get("master")) .config(conf) .enableHiveSupport() @@ -48,128 +43,157 @@ public class SparkResultToCommunityThroughSemRelJob { final String inputPath = parser.get("sourcePath"); final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; - //final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); - //final List communityIdList = 
QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); - final List communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + //final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); + final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); + //final List communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - JavaRDD all_publication_rdd = sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); - JavaRDD publication_rdd = all_publication_rdd - .filter(p -> relatedToCommunities(p, communityIdList)).cache(); + JavaRDD publication_rdd = sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)); - JavaRDD all_dataset_rdd = sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, Dataset.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); - JavaRDD dataset_rdd = all_dataset_rdd - .filter(p -> relatedToCommunities(p, communityIdList)).cache(); - - JavaRDD all_orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") - .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); - JavaRDD orp_rdd = all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); - - JavaRDD all_software_rdd = sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); - JavaRDD software_rdd = all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); + System.out.println(publication_rdd.count()); +// JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset") +// .map(item -> new ObjectMapper().readValue(item, Dataset.class)); +// +// JavaRDD orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") +// .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); +// +// JavaRDD software_rdd = sc.textFile(inputPath + "/software") +// .map(item -> new ObjectMapper().readValue(item, Software.class)); JavaRDD relation_rdd = sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)) - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); + .map(item -> new ObjectMapper().readValue(item, Relation.class)); + + System.out.println(relation_rdd.count()); + +// .filter(r -> !r.getDataInfo().getDeletedbyinference()) +// .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class)); - org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), - Encoders.bean(Dataset.class)); - - org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), - Encoders.bean(OtherResearchProduct.class)); - - org.apache.spark.sql.Dataset 
software = spark.createDataset(software_rdd.rdd(), - Encoders.bean(Software.class)); - org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); +// org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), +// Encoders.bean(Dataset.class)); +// +// org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), +// Encoders.bean(OtherResearchProduct.class)); +// +// org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), +// Encoders.bean(Software.class)); +// +// org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), +// Encoders.bean(Relation.class)); + publication.createOrReplaceTempView("publication"); relation.createOrReplaceTempView("relation"); - dataset.createOrReplaceTempView("dataset"); - software.createOrReplaceTempView("software"); - other.createOrReplaceTempView("other"); +// relation.createOrReplaceTempView("relation"); +// dataset.createOrReplaceTempView("dataset"); +// software.createOrReplaceTempView("software"); +// other.createOrReplaceTempView("other"); -// org.apache.spark.sql.Dataset publication_context = getContext(spark, "publication"); -// publication_context.createOrReplaceTempView("publication_context"); + String communitylist = getConstraintList(" co.id = '", communityIdList); - org.apache.spark.sql.Dataset publication_context = spark.sql( "SELECT relation.source, " + - "publication.context , relation.target " + - "FROM publication " + - " JOIN relation " + - "ON id = source"); - - org.apache.spark.sql.Dataset dataset_context = getContext(spark, "dataset"); - dataset_context.createOrReplaceTempView("dataset_context"); - - org.apache.spark.sql.Dataset software_context = getContext(spark, "software"); - software_context.createOrReplaceTempView("software_context"); - - org.apache.spark.sql.Dataset other_context = getContext(spark, "other"); - other_context.createOrReplaceTempView("other_context"); - - publication = spark.createDataset(all_publication_rdd.rdd(), - Encoders.bean(Publication.class)); - publication.createOrReplaceTempView("publication"); - - dataset = spark.createDataset(all_dataset_rdd.rdd(), - Encoders.bean(Dataset.class)); - dataset.createOrReplaceTempView("dataset"); - - other = spark.createDataset(all_orp_rdd.rdd(), - Encoders.bean(OtherResearchProduct.class)); - other.createOrReplaceTempView("other"); - - software = spark.createDataset(all_software_rdd.rdd(), - Encoders.bean(Software.class)); - software.createOrReplaceTempView("software"); + String semrellist = getConstraintList(" relClass = '", allowedsemrel ); - org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); - org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); - org.apache.spark.sql.Dataset toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication"); - org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); - - createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - 
createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + String query = "Select source, community_context, target " + + "from (select id, collect_set(co.id) community_context " + + "from publication " + + "lateral view explode (context) c as co " + + "where datainfo.deletedbyinference = false "+ communitylist + + " group by id) p " + + "JOIN " + + "(select * " + + "from relation " + + "where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r " + + "ON p.id = r.source"; - updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + org.apache.spark.sql.Dataset publication_context = spark.sql( query); + publication_context.createOrReplaceTempView("publication_context"); - updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + //( source, (mes, dh-ch-, ni), target ) + query = "select target , collect_set(co) " + + "from (select target, community_context " + + "from publication_context pc join publication p on " + + "p.id = pc.source) tmp " + + "lateral view explode (community_context) c as co " + + "group by target"; - updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + org.apache.spark.sql.Dataset toupdatepublicationreresult = spark.sql(query); + + System.out.println(toupdatepublicationreresult.count()); + + toupdatepublicationreresult.toJavaRDD() + .map(r -> { + TypedRow tp = new TypedRow(); + tp.setSourceId(r.getString(0)); + r.getList(1).stream().forEach(c -> tp.add((String)c)); + return tp; + }) + .map(tr -> new ObjectMapper().writeValueAsString(tr)) + .saveAsTextFile(outputPath + "/community2semrelonpublication"); +// toupdatepublicationreresult.toJavaRDD().flatMap(c -> { +// +// String source = c.getString(0); +// List relation_list = new ArrayList<>(); +// c.getList(1).stream() +// .forEach(res -> { +// Relation r = new Relation(); +// r.setSource(source); +// r.setTarget((String)res); +// r.setRelClass("produces"); +// relation_list.add(r); +// r = new Relation(); +// r.setSource((String)res); +// r.setTarget(source); +// r.setRelClass("isProducedBy"); +// relation_list.add(r); +// }); +// return relation_list.iterator(); +// }).map(tr -> new ObjectMapper().writeValueAsString(tr)) +// .saveAsTextFile(outputPath + "/community2semrel"); +// + +// org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); +// org.apache.spark.sql.Dataset toupdatedatasetresult = 
getUpdateCommunitiesForTable(spark, "dataset"); +// org.apache.spark.sql.Dataset toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication"); +// org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); + +// createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + +// createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + +// createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// +// updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// /* JavaPairRDD resultLinkedToCommunities = publication diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java index c55c0e8ea..0e39090dd 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java @@ -1,6 +1,6 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; -import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.QueryInformationSystem; import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -21,11 +21,11 @@ import java.util.stream.Collectors; import static eu.dnetlib.dhp.PropagationConstant.*; -public class SparkResultToCommunityThroughSemRelJob2 { +public class SparkResultToCommunityThroughSemRelJob3 { public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils - .toString(SparkResultToCommunityThroughSemRelJob2.class + .toString(SparkResultToCommunityThroughSemRelJob3.class 
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); parser.parseArgument(args); @@ -33,7 +33,7 @@ public class SparkResultToCommunityThroughSemRelJob2 { conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); final SparkSession spark = SparkSession .builder() - .appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName()) + .appName(SparkResultToCommunityThroughSemRelJob3.class.getSimpleName()) .master(parser.get("master")) .config(conf) .enableHiveSupport() @@ -44,9 +44,8 @@ public class SparkResultToCommunityThroughSemRelJob2 { final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - //final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); + final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); - //final List communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); @@ -54,21 +53,18 @@ public class SparkResultToCommunityThroughSemRelJob2 { JavaRDD publication_rdd = sc.textFile(inputPath + "/publication") .map(item -> new ObjectMapper().readValue(item, Publication.class)); -// JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset") -// .map(item -> new ObjectMapper().readValue(item, Dataset.class)); -// -// JavaRDD orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") -// .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); -// -// JavaRDD software_rdd = sc.textFile(inputPath + "/software") -// .map(item -> new ObjectMapper().readValue(item, Software.class)); + JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, Dataset.class)); + + JavaRDD orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") + .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); + + JavaRDD software_rdd = sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, Software.class)); JavaRDD relation_rdd = sc.textFile(inputPath + "/relation") .map(item -> new ObjectMapper().readValue(item, Relation.class)); -// .filter(r -> !r.getDataInfo().getDeletedbyinference()) -// .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); - org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class)); @@ -76,24 +72,21 @@ public class SparkResultToCommunityThroughSemRelJob2 { org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); -// org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), -// Encoders.bean(Dataset.class)); -// -// org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), -// Encoders.bean(OtherResearchProduct.class)); -// -// org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), -// Encoders.bean(Software.class)); -// -// org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), -// Encoders.bean(Relation.class)); + org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), + Encoders.bean(Dataset.class)); + + org.apache.spark.sql.Dataset other = 
spark.createDataset(orp_rdd.rdd(), + Encoders.bean(OtherResearchProduct.class)); + + org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), + Encoders.bean(Software.class)); + publication.createOrReplaceTempView("publication"); relation.createOrReplaceTempView("relation"); -// relation.createOrReplaceTempView("relation"); -// dataset.createOrReplaceTempView("dataset"); -// software.createOrReplaceTempView("software"); -// other.createOrReplaceTempView("other"); + dataset.createOrReplaceTempView("dataset"); + software.createOrReplaceTempView("software"); + other.createOrReplaceTempView("other"); String communitylist = getConstraintList(" co.id = '", communityIdList); @@ -109,7 +102,7 @@ public class SparkResultToCommunityThroughSemRelJob2 { "JOIN " + "(select * " + "from relation " + - "where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r " + + "where datainfo.deletedbyinference = false " + semrellist + ") r " + "ON p.id = r.source"; @@ -127,37 +120,33 @@ public class SparkResultToCommunityThroughSemRelJob2 { org.apache.spark.sql.Dataset toupdatepublicationreresult = spark.sql(query); + org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); + org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); + org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); + createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); -// org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); -// org.apache.spark.sql.Dataset toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication"); -// org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); - -// createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// -// updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), 
outputPath, "otherresearchproduct", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); -// -// updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", -// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); // /* diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/input_preparecommunitytoresult_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/input_preparecommunitytoresult_parameters.json new file mode 100644 index 000000000..e03b3c6a4 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/input_preparecommunitytoresult_parameters.json @@ -0,0 +1,33 @@ +[ + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequential file to read", + "paramRequired": true + }, + { + "paramName":"ocm", + "paramLongName":"organizationtoresultcommunitymap", + "paramDescription": "the map associating organizations with communities", + "paramRequired": true + }, + { + "paramName":"h", + "paramLongName":"hive_metastore_uris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName":"wu", + "paramLongName":"writeUpdate", + "paramDescription": "true if the update must be written; no double check whether the information is already present", + "paramRequired": true + }, + { + "paramName":"sg", + "paramLongName":"saveGraph", + "paramDescription": "true if the new version of the graph must be saved", + "paramRequired": true + } + +] \ No newline at end of file
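
For reference, a minimal self-contained sketch of the two-step preparation query in PrepareResultCommunitySetStep1, runnable on toy data with a local SparkSession. The table and column names (result, relation, context, source, target, relClass) follow the patch; the toy beans, ids and community names (p1, p2, d1, mes, ni, dh-ch) are invented for illustration, and context is simplified to an array of plain community ids where the real graph stores structs and the query reads co.id.

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PrepareStep1Sketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("PrepareStep1Sketch").master("local[*]").getOrCreate();

        // result(id, context): each result carries the community ids of its contexts
        spark.createDataFrame(Arrays.asList(
                new ResultRow("p1", new String[]{"mes", "ni"}),
                new ResultRow("p2", new String[]{"dh-ch"})), ResultRow.class)
                .createOrReplaceTempView("result");

        // relation(source, target, relClass): allowed semantic links between results
        spark.createDataFrame(Arrays.asList(
                new RelRow("p1", "d1", "isSupplementedBy"),
                new RelRow("p2", "d1", "isSupplementTo")), RelRow.class)
                .createOrReplaceTempView("relation");

        // step 1: community contexts each relation target could inherit from its source
        Dataset<Row> resultContext = spark.sql(
                "select target resultId, community_context " +
                "from (select id, collect_set(co) community_context " +
                "      from result lateral view explode (context) c as co " +
                "      group by id) p " +
                "join (select source, target from relation " +
                "      where relClass = 'isSupplementedBy' or relClass = 'isSupplementTo') r " +
                "on p.id = r.source");
        resultContext.createOrReplaceTempView("result_context");

        // step 2: one row per target, with the union of all inheritable communities
        spark.sql("select resultId, collect_set(co) communityList " +
                  "from result_context lateral view explode (community_context) c as co " +
                  "group by resultId")
             .show(false); // d1 -> [mes, ni, dh-ch]

        spark.stop();
    }

    // toy beans standing in for the Result and Relation OAF entities
    public static class ResultRow implements java.io.Serializable {
        private String id; private String[] context;
        public ResultRow() {}
        public ResultRow(String id, String[] context) { this.id = id; this.context = context; }
        public String getId() { return id; } public void setId(String id) { this.id = id; }
        public String[] getContext() { return context; } public void setContext(String[] c) { context = c; }
    }
    public static class RelRow implements java.io.Serializable {
        private String source; private String target; private String relClass;
        public RelRow() {}
        public RelRow(String s, String t, String r) { source = s; target = t; relClass = r; }
        public String getSource() { return source; } public void setSource(String s) { source = s; }
        public String getTarget() { return target; } public void setTarget(String t) { target = t; }
        public String getRelClass() { return relClass; } public void setRelClass(String r) { relClass = r; }
    }
}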
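
The reduceByKey merge in PrepareResultCommunitySetStep2.mergeInfo boils down to a duplicate-free union of two community lists keyed by resultId: a result that appears under several result types (because it is the target of relations from a publication and from a dataset, say) ends up with a single merged list. A toy restatement of that lambda in plain Java, with invented community ids:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MergeSketch {
    public static void main(String[] args) {
        List<String> fromPublications = new ArrayList<>(Arrays.asList("mes", "ni"));
        List<String> fromDatasets = new ArrayList<>(Arrays.asList("ni", "dh-ch"));

        // same logic as the lambda passed to reduceByKey: keep the first list,
        // add the communities of the second that it does not already contain
        Set<String> seen = new HashSet<>(fromPublications);
        for (String community : fromDatasets) {
            if (seen.add(community)) {
                fromPublications.add(community);
            }
        }
        System.out.println(fromPublications); // [mes, ni, dh-ch]
    }
}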
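
getConstraintList comes from PropagationConstant, which is not part of this patch. Judging only from how its output is spliced in after "where datainfo.deletedbyinference = false ", it plausibly expands a prefix such as " relClass = '" into an and-wrapped chain of OR-ed equality tests. A hypothetical re-implementation, inferred from usage and not taken from the actual class:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ConstraintListSketch {
    // hypothetical: the prefix carries the column name and the opening quote,
    // and the result is meant to be appended to an existing where clause
    static String getConstraintList(String prefix, List<String> values) {
        return " and (" + values.stream()
                .map(v -> prefix + v + "'")
                .collect(Collectors.joining(" OR ")) + ")";
    }

    public static void main(String[] args) {
        System.out.println(getConstraintList(" relClass = '",
                Arrays.asList("isSupplementedBy", "isSupplementTo")));
        // prints:  and ( relClass = 'isSupplementedBy' OR  relClass = 'isSupplementTo')
    }
}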