changed to query for the community map just once and save the result for the remaining executions

Miriam Baglioni 2020-08-03 17:56:31 +02:00
parent 872d7783fc
commit c892c7dfa7
6 changed files with 320 additions and 1 deletion
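In practice the flow becomes two-phased: SaveCommunityMap queries the Information Service once and serializes the community map to HDFS, and every subsequent dump/split step reloads it from that path instead of re-querying the IS. A minimal sketch of the intended usage, with illustrative paths and assuming the usual --longName argument style consumed by ArgumentApplicationParser:

	// Step 1, run once: query the IS and persist the community map (paths are hypothetical).
	SaveCommunityMap.main(new String[] {
		"--nameNode", "hdfs://nameservice1",
		"--outputPath", "/tmp/dump/community_map",
		"--isLookUpUrl", "http://services.example.org/isLookUp"
	});

	// Step 2, in every later Spark step: reload the saved map instead of hitting the IS again.
	Set<String> communities = Utils.getCommunityMap(spark, "/tmp/dump/community_map").keySet();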

eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplitS3.java (new file)

@ -0,0 +1,87 @@
package eu.dnetlib.dhp.oa.graph.dump.community;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.Optional;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;

public class CommunitySplitS3 implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(CommunitySplitS3.class);

	public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, String communityMapPath) {
		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				SparkContext sc = spark.sparkContext();
				sc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
				// Access keys redacted; these should be supplied via configuration rather than hardcoded.
				sc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "<AWS_ACCESS_KEY_ID>");
				sc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "<AWS_SECRET_ACCESS_KEY>");
				execSplit(spark, inputPath, outputPath, communityMapPath);
			});
	}

	private static void execSplit(SparkSession spark, String inputPath, String outputPath,
		String communityMapPath) {
		// Read the community map once from its serialized form instead of querying the IS.
		Set<String> communities = Utils.getCommunityMap(spark, communityMapPath).keySet();

		Dataset<CommunityResult> result = Utils
			.readPath(spark, inputPath + "/publication", CommunityResult.class)
			.union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class))
			.union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class))
			.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));

		communities.forEach(c -> printResult(c, result, outputPath));
	}

	private static void printResult(String c, Dataset<CommunityResult> result, String outputPath) {
		// The cast disambiguates between the Java FilterFunction and Scala Function1 overloads of filter.
		Dataset<CommunityResult> communityProducts = result
			.filter((FilterFunction<CommunityResult>) r -> containsCommunity(r, c));

		if (communityProducts.count() > 0) {
			log.info("Writing dump for community: {}", c);
			communityProducts
				.repartition(1)
				.write()
				.option("compression", "gzip")
				.mode(SaveMode.Overwrite)
				.json(outputPath + "/" + c);
		}
	}

	private static boolean containsCommunity(CommunityResult r, String c) {
		return Optional
			.ofNullable(r.getContext())
			.map(context -> context.stream().anyMatch(con -> con.getCode().equals(c)))
			.orElse(false);
	}
}
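The split produces one directory per community under outputPath, each holding a single gzip-compressed JSON part file: repartition(1) forces one output file, and the count() call triggers a full Spark job over the filtered dataset before anything is written. A sketch of the resulting layout, with hypothetical community identifiers:

	outputPath/
		egi/part-00000-<uuid>.json.gz
		dh-ch/part-00000-<uuid>.json.gz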

eu/dnetlib/dhp/oa/graph/dump/community/SaveCommunityMap.java (new file)

@ -0,0 +1,122 @@
package eu.dnetlib.dhp.oa.graph.dump.community;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;

public class SaveCommunityMap implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(SaveCommunityMap.class);
	private final QueryInformationSystem queryInformationSystem;
	private final Configuration conf;
	private final BufferedWriter writer;

	public SaveCommunityMap(String hdfsPath, String hdfsNameNode, String isLookUpUrl) throws IOException {
		conf = new Configuration();
		conf.set("fs.defaultFS", hdfsNameNode);
		FileSystem fileSystem = FileSystem.get(conf);
		Path hdfsWritePath = new Path(hdfsPath);

		// Append to the output file if it already exists, otherwise create it.
		FSDataOutputStream fsDataOutputStream;
		if (fileSystem.exists(hdfsWritePath)) {
			fsDataOutputStream = fileSystem.append(hdfsWritePath);
		} else {
			fsDataOutputStream = fileSystem.create(hdfsWritePath);
		}

		queryInformationSystem = new QueryInformationSystem();
		queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));

		writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
	}

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SaveCommunityMap.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/input_cm_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		final String nameNode = parser.get("nameNode");
		log.info("nameNode: {}", nameNode);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String isLookUpUrl = parser.get("isLookUpUrl");
		log.info("isLookUpUrl: {}", isLookUpUrl);

		final SaveCommunityMap scm = new SaveCommunityMap(outputPath, nameNode, isLookUpUrl);
		scm.saveCommunityMap();
	}

	private void saveCommunityMap() throws ISLookUpException, IOException {
		// Query the IS once and persist the serialized community map for the remaining workflow steps.
		writer.write(Utils.OBJECT_MAPPER.writeValueAsString(queryInformationSystem.getCommunityMap()));
		writer.close();
	}
}
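The counterpart read is not part of this commit. As a minimal sketch, assuming CommunityMap is Jackson-deserializable from the single JSON object written above (Utils.getCommunityMap, used by the split, presumably wraps equivalent logic; imports as in the class above, plus org.apache.hadoop.fs.FSDataInputStream):

	// Hypothetical helper, not in the commit: reload the map written by saveCommunityMap().
	private static CommunityMap readCommunityMap(FileSystem fileSystem, String hdfsPath) throws IOException {
		try (FSDataInputStream in = fileSystem.open(new Path(hdfsPath))) {
			return Utils.OBJECT_MAPPER.readValue(in, CommunityMap.class);
		}
	}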

SparkDumpCommunityProducts.java (modified)

@ -45,6 +45,8 @@ public class SparkDumpCommunityProducts implements Serializable {
 	final String resultClassName = parser.get("resultTableName");
 	log.info("resultTableName: {}", resultClassName);
 
+	String communityMapPath = parser.get("communityMapPath");
+
 	final String isLookUpUrl = parser.get("isLookUpUrl");
 	log.info("isLookUpUrl: {}", isLookUpUrl);

@ -56,7 +58,15 @@ public class SparkDumpCommunityProducts implements Serializable {
 	DumpProducts dump = new DumpProducts();
 
-	dump.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, CommunityResult.class, false);
+	dump
+		.run(
+			isSparkSessionManaged, inputPath, outputPath, communityMapPath, inputClazz, CommunityResult.class,
+			false);
+	// dump
+	// 	.run(
+	// 		isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, CommunityResult.class,
+	// 		false);
 }
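The net effect of this hunk is that DumpProducts.run now receives the path to the serialized community map rather than the CommunityMap object itself. The new signature is not shown in this diff; inferred from the call site, it is presumably along these lines (parameter names and type bounds after the path are assumptions):

	// Hypothetical signature, inferred from the call above; not shown in the commit.
	public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, String communityMapPath,
		Class<?> inputClazz, Class<?> outputClazz, boolean graph)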

eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunityS3.java (new file)

@ -0,0 +1,64 @@
package eu.dnetlib.dhp.oa.graph.dump.community;

import java.io.Serializable;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

public class SparkSplitForCommunityS3 implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(SparkSplitForCommunityS3.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkSplitForCommunityS3.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/split_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String communityMapPath = parser.get("communityMapPath");

		final String isLookUpUrl = parser.get("isLookUpUrl");
		log.info("isLookUpUrl: {}", isLookUpUrl);

		CommunitySplitS3 split = new CommunitySplitS3();

		// The community map is read from communityMapPath instead of being queried from the IS here.
		split.run(isSparkSessionManaged, inputPath, outputPath, communityMapPath);
	}

	public static ISLookUpService getIsLookUpService(String isLookUpUrl) {
		return ISLookupClientFactory.getLookUpService(isLookUpUrl);
	}
}

eu/dnetlib/dhp/oa/graph/dump/split_parameters.json (new file; name inferred from the resource path referenced above)

@ -0,0 +1,36 @@
[
{
"paramName":"cmp",
"paramLongName":"communityMapPath",
"paramDescription": "the path to the serialization of the community map",
"paramRequired": false
},
{
"paramName":"is",
"paramLongName":"isLookUpUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]
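Read together with SparkSplitForCommunityS3 above, each paramLongName maps onto a command-line argument. A hypothetical invocation (values illustrative, same --longName assumption as in the sketch at the top):

	SparkSplitForCommunityS3.main(new String[] {
		"--sourcePath", "/tmp/dump/community_result",
		"--outputPath", "s3://example-bucket/dumps",
		"--communityMapPath", "/tmp/dump/community_map",
		"--isLookUpUrl", "http://services.example.org/isLookUp"
	});

Note that communityMapPath is optional here (paramRequired: false) while isLookUpUrl remains required, even though after this change the split reads the map from communityMapPath and no longer queries the IS.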