diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java
index 2ba131e0f..c96423147 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,9 @@ public class OrcidGetUpdatesFile {
final String namenode = parser.get("namenode");
log.info("got variable namenode: {}", namenode);
+ final String master = parser.get("master");
+ log.info("got variable master: {}", master);
+
final String targetPath = parser.get("targetPath");
log.info("got variable targetPath: {}", targetPath);
@@ -59,11 +63,27 @@ public class OrcidGetUpdatesFile {
final String accessToken = parser.get("accessToken");
log.info("got variable accessToken: {}", accessToken);
- System.out.println("namenode = " + namenode);
+ final String graphPath = parser.get("graphPath");
+ log.info("got variable graphPath: {}", graphPath);
+
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName(OrcidGetUpdatesFile.class.getName())
+ .master(master)
+ .getOrCreate();
+
+ final String latestDate = spark
+ .read()
+ .load(graphPath + "/Authors")
+ .selectExpr("max(lastModifiedDate)")
+ .first()
+ .getString(0);
+
+ log.info("latest date is {}", latestDate);
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(namenode));
- new OrcidGetUpdatesFile().readTar(fileSystem, accessToken, apiURL, targetPath, "2023-09-30");
+ new OrcidGetUpdatesFile().readTar(fileSystem, accessToken, apiURL, targetPath, latestDate);
}
@@ -102,6 +122,8 @@ public class OrcidGetUpdatesFile {
Path hdfsWritePath = new Path("/tmp/orcid_updates.tar.gz");
final FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true);
IOUtils.copy(input, fsDataOutputStream);
+ fsDataOutputStream.flush();
+ fsDataOutputStream.close();
FSDataInputStream updateFile = fileSystem.open(hdfsWritePath);
TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream(
new BufferedInputStream(
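
The change above replaces the hard-coded `2023-09-30` cut-off: the job now derives the date of the last successful update from the graph itself, by taking the maximum `lastModifiedDate` in the `Authors` table. One edge case the patch leaves open is an empty `Authors` table, where `max(lastModifiedDate)` evaluates to null and a null date would reach `readTar`. A minimal Scala sketch of the same lookup with a guard; the fallback constant and the `Option` wrapper are additions for illustration, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

object LatestDateSketch {
  // Derive the ORCID update cut-off from the graph, mirroring the Java hunk above.
  def latestModifiedDate(spark: SparkSession, graphPath: String): String = {
    val maxDate = spark.read
      .load(s"$graphPath/Authors")          // Authors table written by a previous run
      .selectExpr("max(lastModifiedDate)")  // single-row aggregate
      .first()
      .getString(0)                         // null when the table is empty
    // Guard added for illustration: fall back to the previously hard-coded date.
    Option(maxDate).getOrElse("2023-09-30")
  }
}
```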
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json
index dbf63a31a..48b37f85d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json
@@ -4,6 +4,12 @@
"paramDescription": "the Name Node URI",
"paramRequired": true
},
+ {
+ "paramName": "m",
+ "paramLongName": "master",
+ "paramDescription": "the master name",
+ "paramRequired": true
+ },
{
"paramName": "t",
"paramLongName": "targetPath",
@@ -16,6 +22,12 @@
"paramDescription": "the URL to download the tar file",
"paramRequired": true
},
+ {
+ "paramName": "g",
+ "paramLongName": "graphPath",
+ "paramDescription": "the path of the input graph",
+ "paramRequired": true
+ },
{
"paramName": "at",
"paramLongName": "accessToken",
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/update/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/update/oozie_app/workflow.xml
index 434d21a0c..e46e303a9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/update/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/update/oozie_app/workflow.xml
@@ -28,23 +28,35 @@
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
     <action name="startUpdate">
-        <java>
-            <configuration>
-                <property>
-                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
-                    <value>true</value>
-                </property>
-            </configuration>
-            <main-class>eu.dnetlib.dhp.collection.orcid.OrcidGetUpdatesFile</main-class>
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Check Latest Orcid and Download updates</name>
+            <class>eu.dnetlib.dhp.collection.orcid.OrcidGetUpdatesFile</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=2g
+                --conf spark.sql.shuffle.partitions=3000
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn</arg>
             <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--graphPath</arg><arg>${graphPath}</arg>
             <arg>--targetPath</arg><arg>${targetPath}</arg>
             <arg>--apiURL</arg><arg>${apiURL}</arg>
             <arg>--accessToken</arg><arg>${accessToken}</arg>
-        </java>
-        <ok to="End"/>
+        </spark>
+        <ok to="applyUpdate"/>
+        <error to="Kill"/>
     </action>
 
     <action name="applyUpdate">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
@@ -90,7 +102,7 @@
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>--graphPath</arg><arg>graphPath</arg>
+            <arg>--graphPath</arg><arg>${graphPath}</arg>
             <arg>--updatePath</arg><arg>${targetPath}/updateTable</arg>
             <arg>--targetPath</arg><arg>${targetPath}/newTable</arg>
             <arg>--master</arg><arg>yarn</arg>
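
The one-line fix in this last hunk deserves a note: the old `<arg>--graphPath</arg><arg>graphPath</arg>` passed the literal string `graphPath` rather than the interpolated `${graphPath}` property, so the job would have tried to read a relative path literally named `graphPath`. A defensive check for this failure mode could look like the following hypothetical helper, which is not part of the patch:

```scala
object ArgGuard {
  // Fail fast when a CLI value looks like an uninterpolated Oozie property,
  // as in the bug fixed above: a value equal to the parameter name, or one
  // still containing the ${...} placeholder syntax.
  def requireInterpolated(name: String, value: String): String = {
    require(
      value != name && !value.contains("${"),
      s"parameter --$name looks uninterpolated: '$value'")
    value
  }
}

// usage (hypothetical): val graphPath = ArgGuard.requireInterpolated("graphPath", parser.get("graphPath"))
```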
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/orcid/SparkApplyUpdate.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/orcid/SparkApplyUpdate.scala
index 4494a2d3d..57cb0e2e4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/orcid/SparkApplyUpdate.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/orcid/SparkApplyUpdate.scala
@@ -19,10 +19,34 @@ class SparkApplyUpdate(propertyPath: String, args: Array[String], log: Logger)
val targetPath: String = parser.get("targetPath")
log.info("found parameters targetPath: {}", targetPath)
applyTableUpdate(spark, graphPath, updatePath, targetPath)
+ checkUpdate(spark, graphPath, targetPath)
+ moveTable(spark, graphPath, targetPath)
}
- def updateDataset(
+ private def moveTable(spark: SparkSession, graphPath: String, updatePath: String): Unit = {
+ spark.read
+ .load(s"$updatePath/Authors")
+ .repartition(1000)
+ .write
+ .mode(SaveMode.Overwrite)
+ .save(s"$graphPath/Authors")
+ spark.read
+ .load(s"$updatePath/Works")
+ .repartition(1000)
+ .write
+ .mode(SaveMode.Overwrite)
+ .save(s"$graphPath/Works")
+ spark.read
+ .load(s"$updatePath/Employments")
+ .repartition(1000)
+ .write
+ .mode(SaveMode.Overwrite)
+ .save(s"$graphPath/Employments")
+
+ }
+
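
`moveTable` repeats the same read, repartition, overwrite block three times. A sketch of the same step with the table names folded into a loop; behavior is unchanged, and the names and the repartition factor are taken from the diff above:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object MoveTablesSketch {
  // Copy each updated table over the corresponding graph table.
  def moveTables(spark: SparkSession, graphPath: String, updatePath: String): Unit =
    Seq("Authors", "Works", "Employments").foreach { table =>
      spark.read
        .load(s"$updatePath/$table")
        .repartition(1000)
        .write
        .mode(SaveMode.Overwrite)
        .save(s"$graphPath/$table")
    }
}
```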
+ private def updateDataset(
inputDataset: DataFrame,
idUpdate: DataFrame,
updateDataframe: DataFrame,
@@ -37,7 +61,28 @@ class SparkApplyUpdate(propertyPath: String, args: Array[String], log: Logger)
.save(targetPath)
}
- def applyTableUpdate(spark: SparkSession, graphPath: String, updatePath: String, targetPath: String) = {
+ private def checkUpdate(spark: SparkSession, graphPath: String, updatePath: String): Unit = {
+ val totalOriginalAuthors = spark.read.load(s"$graphPath/Authors").count
+ val totalOriginalWorks = spark.read.load(s"$graphPath/Works").count
+ val totalOriginalEmployments = spark.read.load(s"$graphPath/Employments").count
+ val totalUpdateAuthors = spark.read.load(s"$updatePath/Authors").count
+ val totalUpdateWorks = spark.read.load(s"$updatePath/Works").count
+ val totalUpdateEmployments = spark.read.load(s"$updatePath/Employments").count
+
+ log.info("totalOriginalAuthors: {}", totalOriginalAuthors)
+ log.info("totalOriginalWorks: {}", totalOriginalWorks)
+ log.info("totalOriginalEmployments: {}", totalOriginalEmployments)
+ log.info("totalUpdateAuthors: {}", totalUpdateAuthors)
+ log.info("totalUpdateWorks: {}", totalUpdateWorks)
+ log.info("totalUpdateEmployments: {}", totalUpdateEmployments)
+ if (
+ totalUpdateAuthors < totalOriginalAuthors || totalUpdateEmployments < totalOriginalEmployments || totalUpdateWorks < totalOriginalWorks
+ )
+ throw new RuntimeException("The updated graph contains fewer elements than the original one")
+
+ }
+
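
`checkUpdate` refuses to publish a graph that shrank, but `moveTable` still overwrites the live `$graphPath` tables in place, so a failure in the middle of one of the Overwrite writes would leave the graph partially replaced. A sketch of a more defensive swap, assuming the graph and the staged update live on the same HDFS filesystem; the backup naming is mine:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

object SwapSketch {
  // Publish a staged table by rename instead of an in-place Overwrite,
  // keeping the previous copy as <table>_backup for manual recovery.
  def atomicSwap(fs: FileSystem, graphPath: String, stagedPath: String, table: String): Unit = {
    val live = new Path(s"$graphPath/$table")
    val backup = new Path(s"$graphPath/${table}_backup")
    if (fs.exists(backup)) fs.delete(backup, true)
    if (fs.exists(live) && !fs.rename(live, backup))
      throw new IllegalStateException(s"could not back up $table")
    if (!fs.rename(new Path(s"$stagedPath/$table"), live))
      throw new IllegalStateException(s"could not publish staged $table")
  }
}
```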
+ private def applyTableUpdate(spark: SparkSession, graphPath: String, updatePath: String, targetPath: String): Unit = {
val orcidIDUpdate = spark.read.load(s"$updatePath/Authors").select("orcid")
updateDataset(
spark.read.load(s"$graphPath/Authors"),