From c892c7dfa7ef27cc5668fe0acde21cb0c63bc763 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 3 Aug 2020 17:56:31 +0200 Subject: [PATCH] changed to query for community map just once and save the result for remaining executions --- .../dump/community/CommunitySplitS3.java | 87 +++++++++++++ .../dump/community/SaveCommunityMap.java | 122 ++++++++++++++++++ .../community/SparkDumpCommunityProducts.java | 12 +- .../community/SparkSplitForCommunityS3.java | 64 +++++++++ .../oa/graph/dump/input_cm_parameters.json | 36 ++++++ .../dump/communityMapPath/communitymap.json | 0 6 files changed, 320 insertions(+), 1 deletion(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplitS3.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SaveCommunityMap.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunityS3.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_cm_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplitS3.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplitS3.java new file mode 100644 index 000000000..440e08c33 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplitS3.java @@ -0,0 +1,87 @@ + +package eu.dnetlib.dhp.oa.graph.dump.community; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; + +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CommunitySplitS3 implements Serializable { + private static final Logger log = LoggerFactory.getLogger(CommunitySplitS3.class); + public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, String communityMapPath) { + // public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap + // communityMap) { + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + SparkContext sc = spark.sparkContext(); + sc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"); + sc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "AK0MM6C2BYA0K1PNJYYX"); + sc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "fpeiqUUpKAUOtO6JWMWLTxxlSxJ+yGYwHozm3jHK"); + execSplit(spark, inputPath, outputPath, communityMapPath); // communityMap.keySet());// , + // inputClazz); + // execSplit(spark, inputPath, outputPath, communityMap.keySet()); + }); + } + + private static void execSplit(SparkSession spark, String inputPath, String outputPath, + String communityMapPath) { + // Set communities) { + + Set communities = Utils.getCommunityMap(spark, communityMapPath).keySet(); + + Dataset result = Utils + .readPath(spark, inputPath + "/publication", CommunityResult.class) + .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); + + communities + .stream() + .forEach(c -> printResult(c, result, outputPath)); + + } + + private static void printResult(String c, Dataset result, String outputPath) { + Dataset community_products = result + .filter(r -> containsCommunity(r, c)); + + if (community_products.count() > 0) { + log.info("Writing dump for community: {} ", c); + community_products + .repartition(1) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath + "/" + c); + } + + } + + private static boolean containsCommunity(CommunityResult r, String c) { + if (Optional.ofNullable(r.getContext()).isPresent()) { + return r + .getContext() + .stream() + .filter(con -> con.getCode().equals(c)) + .collect(Collectors.toList()) + .size() > 0; + } + return false; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SaveCommunityMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SaveCommunityMap.java new file mode 100644 index 000000000..87f7dfe61 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SaveCommunityMap.java @@ -0,0 +1,122 @@ + +package eu.dnetlib.dhp.oa.graph.dump.community; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.oa.graph.dump.graph.CreateContextEntities; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; + +public class SaveCommunityMap implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SaveCommunityMap.class); + private final QueryInformationSystem queryInformationSystem; + + private final Configuration conf; + private final BufferedWriter writer; + + public SaveCommunityMap(String hdfsPath, String hdfsNameNode, String isLookUpUrl) throws IOException { + conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + FileSystem fileSystem = FileSystem.get(conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fsDataOutputStream = fileSystem.append(hdfsWritePath); + } else { + fsDataOutputStream = fileSystem.create(hdfsWritePath); + } + + queryInformationSystem = new QueryInformationSystem(); + queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl)); + + writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); + + } + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SaveCommunityMap.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/input_cm_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + final String nameNode = parser.get("nameNode"); + log.info("nameNode: {}", nameNode); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookUpUrl: {}", isLookUpUrl); + +// Boolean isSparkSessionManaged = Optional +// .ofNullable(parser.get("isSparkSessionManaged")) +// .map(Boolean::valueOf) +// .orElse(Boolean.TRUE); +// log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final SaveCommunityMap scm = new SaveCommunityMap(outputPath, nameNode, isLookUpUrl); + + scm.saveCommunityMap(); + + // CommunityMap communityMap = queryInformationSystem.getCommunityMap(); + +// SparkConf conf = new SparkConf(); +// +// runWithSparkSession( +// conf, +// isSparkSessionManaged, +// spark -> { +// Utils.removeOutputDir(spark, outputPath); +// +//// execDump(spark, inputPath, outputPath, communityMap, inputClazz, outputClazz, graph);// , +// // dumpClazz); +// }); + +// Configuration conf = new Configuration(); +// conf.set("fs.defaultFS", nameNode); +// FileSystem fileSystem = FileSystem.get(conf); +// Path hdfsWritePath = new Path(outputPath); +// FSDataOutputStream fsDataOutputStream = null; +// if (fileSystem.exists(hdfsWritePath)) { +// fsDataOutputStream = fileSystem.append(hdfsWritePath); +// } else { +// fsDataOutputStream = fileSystem.create(hdfsWritePath); +// } +// +// BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); +// +// writer.write(OBJECT_MAPPER.writeValueAsString(communityMap)); +// writer.close(); + } + + private void saveCommunityMap() throws ISLookUpException, IOException { + writer.write(Utils.OBJECT_MAPPER.writeValueAsString(queryInformationSystem.getCommunityMap())); + writer.close(); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java index 71d3e6bf4..419d078d7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java @@ -45,6 +45,8 @@ public class SparkDumpCommunityProducts implements Serializable { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); + String communityMapPath = parser.get("communityMapPath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); log.info("isLookUpUrl: {}", isLookUpUrl); @@ -56,7 +58,15 @@ public class SparkDumpCommunityProducts implements Serializable { DumpProducts dump = new DumpProducts(); - dump.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, CommunityResult.class, false); + dump + .run( + isSparkSessionManaged, inputPath, outputPath, communityMapPath, inputClazz, CommunityResult.class, + false); + +// dump +// .run( +// isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, CommunityResult.class, +// false); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunityS3.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunityS3.java new file mode 100644 index 000000000..68c832d63 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunityS3.java @@ -0,0 +1,64 @@ + +package eu.dnetlib.dhp.oa.graph.dump.community; + +import java.io.Serializable; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; + +public class SparkSplitForCommunityS3 implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkSplitForCommunityS3.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkSplitForCommunityS3.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/split_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String communityMapPath = parser.get("communityMapPath"); + + final String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookUpUrl: {}", isLookUpUrl); + + CommunitySplitS3 split = new CommunitySplitS3(); + + // CommunityMap communityMap; + +// QueryInformationSystem queryInformationSystem = new QueryInformationSystem(); +// queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl)); +// communityMap = queryInformationSystem.getCommunityMap(); + + split.run(isSparkSessionManaged, inputPath, outputPath, communityMapPath); + // split.run(isSparkSessionManaged, inputPath, outputPath, communityMap); + + } + + public static ISLookUpService getIsLookUpService(String isLookUpUrl) { + return ISLookupClientFactory.getLookUpService(isLookUpUrl); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_cm_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_cm_parameters.json new file mode 100644 index 000000000..b19be2f52 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_cm_parameters.json @@ -0,0 +1,36 @@ + +[ + { + "paramName":"cmp", + "paramLongName":"communityMapPath", + "paramDescription": "the path to the serialization of the community map", + "paramRequired": false + }, + { + "paramName":"is", + "paramLongName":"isLookUpUrl", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true + }, + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + } +] + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json new file mode 100644 index 000000000..e69de29bb