From b32a9d1994f331c1e00f0d703dc8538df85be5c6 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 23 Feb 2024 10:04:28 +0100
Subject: [PATCH] Implemented workflow for updating table, added step to check
 if the new generated table is valid

---
 .../collection/orcid/OrcidGetUpdatesFile.java | 26 +++++++++-
 .../download_orcid_update_parameter.json      | 12 +++++
 .../orcid/update/oozie_app/workflow.xml       | 34 ++++++++-----
 .../collection/orcid/SparkApplyUpdate.scala   | 49 ++++++++++++++++++-
 4 files changed, 106 insertions(+), 15 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java
index 2ba131e0f0..c964231472 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +51,9 @@ public class OrcidGetUpdatesFile {
         final String namenode = parser.get("namenode");
         log.info("got variable namenode: {}", namenode);
 
+        final String master = parser.get("master");
+        log.info("got variable master: {}", master);
+
         final String targetPath = parser.get("targetPath");
         log.info("got variable targetPath: {}", targetPath);
 
@@ -59,11 +63,27 @@ public class OrcidGetUpdatesFile {
         final String accessToken = parser.get("accessToken");
         log.info("got variable accessToken: {}", accessToken);
 
-        System.out.println("namenode = " + namenode);
+        final String graphPath = parser.get("graphPath");
+        log.info("got variable graphPath: {}", graphPath);
+
+        final SparkSession spark = SparkSession
+            .builder()
+            .appName(OrcidGetUpdatesFile.class.getName())
+            .master(master)
+            .getOrCreate();
+
+        final String latestDate = spark
+            .read()
+            .load(graphPath + "/Authors")
+            .selectExpr("max(lastModifiedDate)")
+            .first()
+            .getString(0);
+
+        log.info("latest date is {}", latestDate);
 
         final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(namenode));
 
-        new OrcidGetUpdatesFile().readTar(fileSystem, accessToken, apiURL, targetPath, "2023-09-30");
+        new OrcidGetUpdatesFile().readTar(fileSystem, accessToken, apiURL, targetPath, latestDate);
 
     }
 
@@ -102,6 +122,8 @@ public class OrcidGetUpdatesFile {
         Path hdfsWritePath = new Path("/tmp/orcid_updates.tar.gz");
         final FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true);
         IOUtils.copy(input, fsDataOutputStream);
+        fsDataOutputStream.flush();
+        fsDataOutputStream.close();
         FSDataInputStream updateFile = fileSystem.open(hdfsWritePath);
         TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream(
             new BufferedInputStream(
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json
index dbf63a31a4..48b37f85df 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json
@@ -4,6 +4,12 @@
     "paramDescription": "the Name Node URI",
     "paramRequired": true
   },
+  {
+    "paramName": "m",
+    "paramLongName": "master",
+    "paramDescription": "the master name",
+    "paramRequired": true
+  },
   {
     "paramName": "t",
     "paramLongName": "targetPath",
@@ -16,6 +22,12 @@
     "paramDescription": "the URL to download the tar file",
     "paramRequired": true
   },
+  {
+    "paramName": "g",
+    "paramLongName": "graphPath",
+    "paramDescription": "the path of the input graph",
+    "paramRequired": true
+  },
   {
     "paramName": "at",
     "paramLongName": "accessToken",
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/update/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/update/oozie_app/workflow.xml
index 434d21a0c2..e46e303a95 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/update/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/update/oozie_app/workflow.xml
@@ -28,23 +28,35 @@
-        <java>
-            <configuration>
-                <property>
-                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
-                    <value>true</value>
-                </property>
-            </configuration>
-            <main-class>eu.dnetlib.dhp.collection.orcid.OrcidGetUpdatesFile</main-class>
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Check Latest Orcid and Download updates</name>
+            <class>eu.dnetlib.dhp.collection.orcid.OrcidGetUpdatesFile</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=2g
+                --conf spark.sql.shuffle.partitions=3000
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn</arg>
             <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--graphPath</arg><arg>${graphPath}</arg>
             <arg>--targetPath</arg><arg>${targetPath}</arg>
             <arg>--apiURL</arg><arg>${apiURL}</arg>
             <arg>--accessToken</arg><arg>${accessToken}</arg>
-        </java>
+        </spark>
@@ -90,7 +102,7 @@
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>--graphPath</arg><arg>graphPath</arg>
+            <arg>--graphPath</arg><arg>${graphPath}</arg>
             <arg>--updatePath</arg><arg>${targetPath}/updateTable</arg>
             <arg>--targetPath</arg><arg>${targetPath}/newTable</arg>
             <arg>--master</arg><arg>yarn</arg>
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/orcid/SparkApplyUpdate.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/orcid/SparkApplyUpdate.scala
index 4494a2d3db..57cb0e2e45 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/orcid/SparkApplyUpdate.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/orcid/SparkApplyUpdate.scala
@@ -19,10 +19,34 @@ class SparkApplyUpdate(propertyPath: String, args: Array[String], log: Logger)
     val targetPath: String = parser.get("targetPath")
     log.info("found parameters targetPath: {}", targetPath)
     applyTableUpdate(spark, graphPath, updatePath, targetPath)
+    checkUpdate(spark, graphPath, targetPath)
+    moveTable(spark, graphPath, targetPath)
   }
 
-  def updateDataset(
+  private def moveTable(spark: SparkSession, graphPath: String, updatePath: String): Unit = {
+    spark.read
+      .load(s"$updatePath/Authors")
+      .repartition(1000)
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$graphPath/Authors")
+    spark.read
+      .load(s"$updatePath/Works")
+      .repartition(1000)
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$graphPath/Works")
+    spark.read
+      .load(s"$updatePath/Employments")
+      .repartition(1000)
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$graphPath/Employments")
+
+  }
+
+  private def updateDataset(
     inputDataset: DataFrame,
     idUpdate: DataFrame,
     updateDataframe: DataFrame,
@@ -37,7 +61,28 @@ class SparkApplyUpdate(propertyPath: String, args: Array[String], log: Logger)
       .save(targetPath)
   }
 
-  def applyTableUpdate(spark: SparkSession, graphPath: String, updatePath: String, targetPath: String) = {
+  private def checkUpdate(spark: SparkSession, graphPath: String, updatePath: String): Unit = {
+    val totalOriginalAuthors = spark.read.load(s"$graphPath/Authors").count
+    val totalOriginalWorks = spark.read.load(s"$graphPath/Works").count
+    val totalOriginalEmployments = spark.read.load(s"$graphPath/Employments").count
+    val totalUpdateAuthors = spark.read.load(s"$updatePath/Authors").count
+    val totalUpdateWorks = spark.read.load(s"$updatePath/Works").count
+    val totalUpdateEmployments = spark.read.load(s"$updatePath/Employments").count
+
+    log.info("totalOriginalAuthors: {}", totalOriginalAuthors)
+    log.info("totalOriginalWorks: {}", totalOriginalWorks)
+    log.info("totalOriginalEmployments: {}", totalOriginalEmployments)
+    log.info("totalUpdateAuthors: {}", totalUpdateAuthors)
+    log.info("totalUpdateWorks: {}", totalUpdateWorks)
+    log.info("totalUpdateEmployments: {}", totalUpdateEmployments)
+    if (
+      totalUpdateAuthors < totalOriginalAuthors || totalUpdateEmployments < totalOriginalEmployments || totalUpdateWorks < totalOriginalWorks
+    )
+      throw new RuntimeException("The updated Graph contains less elements of the original one")
+
+  }
+
+  private def applyTableUpdate(spark: SparkSession, graphPath: String, updatePath: String, targetPath: String): Unit = {
     val orcidIDUpdate = spark.read.load(s"$updatePath/Authors").select("orcid")
     updateDataset(
       spark.read.load(s"$graphPath/Authors"),