From 62bb82a8efcbfbf885d86dc660278ab91f8f62ff Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 13 Jan 2025 12:10:03 +0100 Subject: [PATCH] incremental/export_hive job has to clean propagation context --- .../SparkAppendContextCleanedGraph.scala | 21 ++-- .../export_hive/oozie_app/workflow.xml | 111 ++---------------- .../migrate/oozie_app/workflow.xml | 7 +- 3 files changed, 19 insertions(+), 120 deletions(-) diff --git a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala index 63c17a74a3..b5dba42694 100644 --- a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala +++ b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala @@ -21,19 +21,15 @@ object SparkAppendContextCleanedGraph { val parser = new ArgumentApplicationParser( IOUtils.toString( getClass.getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json" + "/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json" ) ) ) parser.parseArgument(args) conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris")) - val graphBasePath = parser.get("graphBasePath") - log.info(s"graphBasePath -> $graphBasePath") - val relationPath = parser.get("relationPath") - log.info(s"relationPath -> $relationPath") - val targetPath = parser.get("targetGraph") - log.info(s"targetGraph -> $targetPath") + val outputPath = parser.get("outputPath") + log.info(s"outputPath -> $outputPath") val hiveDbName = parser.get("hiveDbName") log.info(s"hiveDbName -> $hiveDbName") @@ -46,7 +42,7 @@ object SparkAppendContextCleanedGraph { .appName(getClass.getSimpleName) .getOrCreate() - for ((entity, clazz) <- ModelSupport.oafTypes.asScala) { + for ((entity, clazz) <- ModelSupport.oafTypes.asScala.filter(t => !Seq("datasource", "organization", "person", "project").contains(t._1))) { if (classOf[OafEntity].isAssignableFrom(clazz)) { val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]] @@ -63,8 +59,9 @@ object SparkAppendContextCleanedGraph { c.getDataInfo.asScala .filter( di => - !di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE) - && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE) + di == null || di.getInferenceprovenance == null || + (!di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE) + && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)) ) .toList .asJava @@ -82,14 +79,14 @@ object SparkAppendContextCleanedGraph { .write .option("compression", "gzip") .mode(SaveMode.Append) - .json(s"$targetPath/${entity}") + .json(s"$outputPath/${entity}") } else { spark .table(s"${hiveDbName}.${entity}") .write .option("compression", "gzip") .mode(SaveMode.Append) - .json(s"$targetPath/${entity}") + .json(s"$outputPath/${entity}") } } } diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml index 428fdf0005..ca64996f0e 100644 --- a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml @@ -31,127 +31,32 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - + yarn cluster - Merge table publication - eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob - dhp-graph-mapper-${projectVersion}.jar + Merge Oaf Entities from hive db + eu.dnetlib.dhp.incremental.SparkAppendContextCleanedGraph + dhp-incremental-graph-${projectVersion}.jar --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} ${sparkClusterOpts} ${sparkResourceOpts} ${sparkApplicationOpts} - --outputPath${outputPath}/publication - --modeappend - --hiveTableName${hiveDbName}.publication + --outputPath${outputPath} + --hiveDbName${hiveDbName} --hiveMetastoreUris${hiveMetastoreUris} - + - - - yarn - cluster - Merge table dataset - eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob - dhp-graph-mapper-${projectVersion}.jar - - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - ${sparkClusterOpts} - ${sparkResourceOpts} - ${sparkApplicationOpts} - - --outputPath${outputPath}/dataset - --modeappend - --hiveTableName${hiveDbName}.dataset - --hiveMetastoreUris${hiveMetastoreUris} - - - - - - - yarn - cluster - Merge table otherresearchproduct - eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob - dhp-graph-mapper-${projectVersion}.jar - - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - ${sparkClusterOpts} - ${sparkResourceOpts} - ${sparkApplicationOpts} - - --outputPath${outputPath}/otherresearchproduct - --modeappend - --hiveTableName${hiveDbName}.otherresearchproduct - --hiveMetastoreUris${hiveMetastoreUris} - - - - - - - yarn - cluster - Merge table software - eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob - dhp-graph-mapper-${projectVersion}.jar - - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - ${sparkClusterOpts} - ${sparkResourceOpts} - ${sparkApplicationOpts} - - --outputPath${outputPath}/software - --modeappend - --hiveTableName${hiveDbName}.software - --hiveMetastoreUris${hiveMetastoreUris} - - - - - - - yarn - cluster - Merge table relation - eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob - dhp-graph-mapper-${projectVersion}.jar - - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - ${sparkClusterOpts} - ${sparkResourceOpts} - ${sparkApplicationOpts} - - --outputPath${outputPath}/relation - --modeappend - --hiveTableName${hiveDbName}.relation - --hiveMetastoreUris${hiveMetastoreUris} - - - - - - diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml index 67532ec668..85e398d9b1 100644 --- a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml @@ -15,13 +15,10 @@ hiveDbName hive database containing last generated graph - - action - sparkClusterOpts - --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory + --conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory spark cluster-wide options @@ -56,7 +53,7 @@ ${sparkApplicationOpts} --hiveMetastoreUris${hiveMetastoreUris} - --sqleu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/${action}.sql + --sqleu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql --hiveDbName${hiveDbName} --outputPath${outputPath}