package eu.dnetlib.dhp.oa.graph.dump;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;

/**
 * Copies the materialized graph from its Hive-warehouse location (parquet) to a plain HDFS
 * location, re-encoding each OAF entity type as gzip-compressed JSON.
 *
 * @author miriam.baglioni
 * @Date 22/09/23
 */
public class SparkCopyGraph implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(SparkCopyGraph.class);

	/**
	 * Entry point. Expected arguments (see copygraph_parameters.json):
	 * --hivePath (required), --outputPath (required), --isSparkSessionManaged (optional).
	 *
	 * @param args command-line arguments parsed by {@link ArgumentApplicationParser}
	 * @throws Exception if the parameter resource cannot be read or argument parsing fails
	 */
	public static void main(String[] args) throws Exception {

		String jsonConfiguration = IOUtils
			.toString(
				SparkCopyGraph.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/copygraph_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		// Default to a managed session when the flag is not provided.
		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String hivePath = parser.get("hivePath");
		log.info("hivePath: {}", hivePath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> execCopy(spark, hivePath, outputPath));
	}

	/**
	 * Reads every OAF entity type registered in {@code ModelSupport.oafTypes} from
	 * {@code hivePath/<entityType>} (parquet, decoded with the bean schema of the entity class)
	 * and rewrites it under {@code outputPath/<entityType>} as gzip-compressed JSON,
	 * overwriting any previous content.
	 *
	 * @param spark      the active Spark session
	 * @param hivePath   base path of the parquet graph (hive warehouse dump)
	 * @param outputPath base path under which the JSON copy is written
	 */
	private static void execCopy(SparkSession spark, String hivePath, String outputPath) {

		// parallelStream(): submit one Spark job per entity type concurrently;
		// Spark schedules them on the shared session.
		ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> {
			final String entityType = entry.getKey();
			// Wildcard instead of the raw type: only the bean schema is needed here.
			final Class<?> clazz = entry.getValue();
			spark
				.read()
				.schema(Encoders.bean(clazz).schema())
				.parquet(hivePath + "/" + entityType)
				.write()
				.mode(SaveMode.Overwrite)
				.option("compression", "gzip")
				.json(outputPath + "/" + entityType);
		});
	}
}
hive database name + + + hiveJdbcUrl + hive server jdbc url + + + hiveMetastoreUris + hive server metastore URIs + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + yarn + cluster + Copy graph + eu.dnetlib.dhp.oa.graph.dump.SparkCopyGraph + dump-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --hivePath${hivePath} + --outputPath${outputPath} + + + + + + + \ No newline at end of file diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/copygraph_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/copygraph_parameters.json new file mode 100644 index 0000000..cf76eb0 --- /dev/null +++ 
b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/copygraph_parameters.json @@ -0,0 +1,21 @@ +[ + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName":"hp", + "paramLongName":"hivePath", + "paramDescription": "the base path of the parquet graph (hive warehouse dump) to copy", + "paramRequired": true + }, + { + "paramName":"issm", + "paramLongName":"isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false +} +] + diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml index 32226fc..165afad 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml @@ -175,6 +175,8 @@ + + ${wf:conf('dumpType') eq "funder"}