diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableExporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableExporterJob.java
new file mode 100644
index 000000000..85db99855
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableExporterJob.java
@@ -0,0 +1,80 @@
+
+package eu.dnetlib.dhp.oa.graph.hive;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+
+public class GraphHiveTableExporterJob {
+
+    private static final Logger log = LoggerFactory.getLogger(GraphHiveTableExporterJob.class);
+
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    GraphHiveTableExporterJob.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/oa/graph/hive_db_exporter_parameters.json")));
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        int numPartitions = Optional
+            .ofNullable(parser.get("numPartitions"))
+            .map(Integer::valueOf)
+            .orElse(-1);
+        log.info("numPartitions: {}", numPartitions);
+
+        String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        String hiveTableName = parser.get("hiveTableName");
+        log.info("hiveTableName: {}", hiveTableName);
+
+        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
+
+        String mode = parser.get("mode");
+        log.info("mode: {}", mode);
+
+        SparkConf conf = new SparkConf();
+        conf.set("hive.metastore.uris", hiveMetastoreUris);
+
+        runWithSparkHiveSession(
+            conf, isSparkSessionManaged,
+            spark -> saveGraphTable(spark, outputPath, hiveTableName, mode, numPartitions));
+    }
+
+    // protected for testing
+    protected static void saveGraphTable(SparkSession spark, String outputPath, String hiveTableName,
+        String mode, int numPartitions) {
+
+        Dataset<Row> dataset = spark.table(hiveTableName);
+
+        if (numPartitions > 0) {
+            log.info("repartitioning to {} partitions", numPartitions);
+            dataset = dataset.repartition(numPartitions);
+        }
+
+        dataset
+            .write()
+            .mode(mode)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+}
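Not part of the change itself: a minimal sketch of how the new exporter job could be invoked programmatically. The long-form argument names mirror the hive_db_exporter_parameters.json introduced below; the table name, paths and metastore URI are illustrative placeholders.

```java
// Hypothetical driver; assumes dhp-graph-mapper and its Spark/Hive dependencies on the classpath.
public class GraphHiveTableExporterJobExample {

    public static void main(String[] args) throws Exception {
        eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob.main(new String[] {
            "--isSparkSessionManaged", "true",
            "--hiveTableName", "openaire.publication", // source hive table, placeholder
            "--outputPath", "/tmp/export/publication", // target HDFS directory, placeholder
            "--hiveMetastoreUris", "thrift://example.host:9083", // placeholder metastore URI
            "--mode", "overwrite", // or "append"
            "--numPartitions", "1024" // optional; values <= 0 keep the existing partitioning
        });
    }
}
```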
"mode", + "paramDescription": "mode (append|overwrite)", + "paramRequired": true + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "db", + "paramLongName": "hiveTableName", + "paramDescription": "the input hive table identifier", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala index 63c17a74a..a7a77ec32 100644 --- a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala +++ b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala @@ -21,19 +21,15 @@ object SparkAppendContextCleanedGraph { val parser = new ArgumentApplicationParser( IOUtils.toString( getClass.getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json" + "/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json" ) ) ) parser.parseArgument(args) conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris")) - val graphBasePath = parser.get("graphBasePath") - log.info(s"graphBasePath -> $graphBasePath") - val relationPath = parser.get("relationPath") - log.info(s"relationPath -> $relationPath") - val targetPath = parser.get("targetGraph") - log.info(s"targetGraph -> $targetPath") + val outputPath = parser.get("outputPath") + log.info(s"outputPath -> $outputPath") val hiveDbName = parser.get("hiveDbName") log.info(s"hiveDbName -> $hiveDbName") @@ -46,7 +42,7 @@ object SparkAppendContextCleanedGraph { .appName(getClass.getSimpleName) .getOrCreate() - for ((entity, clazz) <- ModelSupport.oafTypes.asScala) { + for ((entity, clazz) <- ModelSupport.oafTypes.asScala.filter(t => !Seq("datasource", "organization", "person", "project").contains(t._1))) { if (classOf[OafEntity].isAssignableFrom(clazz)) { val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]] @@ -63,8 +59,9 @@ object SparkAppendContextCleanedGraph { c.getDataInfo.asScala .filter( di => - !di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE) - && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE) + di == null || di.getInferenceprovenance == null || + (!di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE) + && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)) ) .toList .asJava @@ -82,14 +79,14 @@ object SparkAppendContextCleanedGraph { .write .option("compression", "gzip") .mode(SaveMode.Append) - .json(s"$targetPath/${entity}") + .json(s"$outputPath/${entity}") } else { spark .table(s"${hiveDbName}.${entity}") .write .option("compression", "gzip") .mode(SaveMode.Append) - .json(s"$targetPath/${entity}") + .json(s"$outputPath/${entity}") } } } diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json new file mode 100644 index 000000000..1ea451b41 --- /dev/null +++ 
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json
new file mode 100644
index 000000000..1ea451b41
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json
@@ -0,0 +1,20 @@
+[
+  {
+    "paramName": "out",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path where the graph data will be written",
+    "paramRequired": true
+  },
+  {
+    "paramName": "hmu",
+    "paramLongName": "hiveMetastoreUris",
+    "paramDescription": "the hive metastore uris",
+    "paramRequired": true
+  },
+  {
+    "paramName": "db",
+    "paramLongName": "hiveDbName",
+    "paramDescription": "the input hive database identifier",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/config-default.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/config-default.xml
new file mode 100644
index 000000000..9608732ed
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/config-default.xml
@@ -0,0 +1,26 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hiveMetastoreUris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>hiveJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
+    </property>
+    <property>
+        <name>hiveDbName</name>
+        <value>openaire</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml
new file mode 100644
index 000000000..ca64996f0
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml
@@ -0,0 +1,63 @@
+<workflow-app name="export_hive_graph" xmlns="uri:oozie:workflow:0.5">
+
+    <parameters>
+        <property>
+            <name>outputPath</name>
+            <description>the path where the graph will be written</description>
+        </property>
+        <property>
+            <name>hiveDbName</name>
+            <description>the source hive database name</description>
+        </property>
+        <property>
+            <name>hiveMetastoreUris</name>
+            <description>hive server metastore URIs</description>
+        </property>
+
+        <property>
+            <name>sparkClusterOpts</name>
+            <value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
+            <description>spark cluster-wide options</description>
+        </property>
+        <property>
+            <name>sparkResourceOpts</name>
+            <value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
+            <description>spark resource options</description>
+        </property>
+        <property>
+            <name>sparkApplicationOpts</name>
+            <value>--conf spark.sql.shuffle.partitions=1024</value>
+            <description>spark application options</description>
+        </property>
+    </parameters>
+
+    <start to="append_context_cleaned_graph"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="append_context_cleaned_graph">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Merge Oaf Entities from hive db</name>
+            <class>eu.dnetlib.dhp.incremental.SparkAppendContextCleanedGraph</class>
+            <jar>dhp-incremental-graph-${projectVersion}.jar</jar>
+            <spark-opts>
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+                ${sparkClusterOpts}
+                ${sparkResourceOpts}
+                ${sparkApplicationOpts}
+            </spark-opts>
+            <arg>--outputPath</arg><arg>${outputPath}</arg>
+            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
+            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+
+</workflow-app>
\ No newline at end of file
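Both write branches in SparkAppendContextCleanedGraph use SaveMode.Append, so re-running the workflow accumulates records under ${outputPath}/&lt;entity&gt;. A quick read-back sketch for spot-checking an export, using the standard Spark Java API; the path is a placeholder.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ExportReadBackSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("export-read-back")
            .master("local[*]") // local spot check only
            .getOrCreate();

        // The jobs write gzip-compressed JSON text files, one directory per entity type.
        Dataset<Row> publications = spark.read().json("/tmp/export/publication"); // placeholder path
        System.out.println("exported records: " + publications.count());
        publications.printSchema();

        spark.stop();
    }
}
```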
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml
index 15ffc2342..e22d17666 100644
--- a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml
@@ -98,10 +98,10 @@
             <arg>${nameNode}/${graphBasePath}/otherresearchproduct</arg>
             <arg>${nameNode}/${targetGraph}/otherresearchproduct</arg>
         </distcp>
-        <ok to="…"/>
+        <ok to="…"/>
         <error to="Kill"/>
     </action>
 
-    <action name="…">
+    <action name="…">
         <distcp xmlns="uri:oozie:distcp-action:0.2">
             <arg>${nameNode}/${graphBasePath}/project</arg>
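For completeness, the entity-type filter added to the Scala loop above, restated as a standalone Java sketch: datasource, organization, person and project are excluded from the hive append. ModelSupport.oafTypes is assumed here to be a map from entity-type name to its model class.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class EntityTypeFilterSketch {

    // Entity types excluded from the append, as in the Scala filter above.
    private static final Set<String> EXCLUDED = Set.of("datasource", "organization", "person", "project");

    /** Mirrors ModelSupport.oafTypes.asScala.filter(t => !EXCLUDED.contains(t._1)). */
    static Map<String, Class<?>> includedTypes(Map<String, Class<?>> oafTypes) {
        return oafTypes
            .entrySet()
            .stream()
            .filter(e -> !EXCLUDED.contains(e.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```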