From 7dfb3116612d70ca939efd9ff3f423238277e8c8 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Wed, 30 Oct 2024 11:22:04 +0100 Subject: [PATCH] New CopyEntitiesSparkJob as replacement to distcp for copying intermediate graph data --- .../dhp/oa/merge/CopyEntitiesSparkJob.java | 108 ++++++++++++++++++ .../merge/copy_graph_entities_parameters.json | 32 ++++++ 2 files changed, 140 insertions(+) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/CopyEntitiesSparkJob.java create mode 100644 dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/copy_graph_entities_parameters.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/CopyEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/CopyEntitiesSparkJob.java new file mode 100644 index 000000000..ba378c7ea --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/CopyEntitiesSparkJob.java @@ -0,0 +1,108 @@ + +package eu.dnetlib.dhp.oa.merge; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Arrays; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; + +/** + * Copy specified entities from a graph snapshot to another + */ +public class CopyEntitiesSparkJob { + private static final Logger log = LoggerFactory.getLogger(CopyEntitiesSparkJob.class); + + private ArgumentApplicationParser parser; + + public CopyEntitiesSparkJob(ArgumentApplicationParser parser) { + this.parser = parser; + } + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + CopyEntitiesSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/merge/copy_graph_entities_parameters.json")); + final 
ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + new CopyEntitiesSparkJob(parser).run(isSparkSessionManaged); + } + + public void run(Boolean isSparkSessionManaged) + throws ISLookUpException { + + String graphInputPath = parser.get("graphInputPath"); + log.info("graphInputPath: {}", graphInputPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String entities = parser.get("entities"); + log.info("entities: {}", entities); + + String format = parser.get("format"); + log.info("format: {}", format); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Arrays + .stream(entities.split(",")) + .map(x -> x.trim().toLowerCase()) + .filter(ModelSupport.oafTypes::containsKey) + .forEachOrdered( + entity -> { + switch (format.toLowerCase()) { + case "text": + spark + .read() + .text(graphInputPath + "/" + entity) + .write() + .option("compression", "gzip") + .mode("overwrite") + .text(outputPath + "/" + entity); + break; + case "json": + spark + .read() + .json(graphInputPath + "/" + entity) + .write() + .option("compression", "gzip") + .mode("overwrite") + .json(outputPath + "/" + entity); + break; + case "parquet": + spark + .read() + .parquet(graphInputPath + "/" + entity) + .write() + .option("compression", "gzip") + .mode("overwrite") + .parquet(outputPath + "/" + entity); + break; + } + }); + }); + } +} diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/copy_graph_entities_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/copy_graph_entities_parameters.json new file mode 100644 index 000000000..0617228d1 --- /dev/null +++ 
b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/copy_graph_entities_parameters.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "gin", + "paramLongName": "graphInputPath", + "paramDescription": "the input graph root path", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the output graph root path", + "paramRequired": true + }, + { + "paramName": "ent", + "paramLongName": "entities", + "paramDescription": "the comma-separated list of entity types to copy", + "paramRequired": true + }, + { + "paramName": "fmt", + "paramLongName": "format", + "paramDescription": "the storage format of the entity files: text, json or parquet", + "paramRequired": true + } +] \ No newline at end of file