forked from D-Net/dnet-hadoop
Implemented workflow for updating table , added step to check if the new generated table is valid
This commit is contained in:
parent
0386f36385
commit
b32a9d1994
|
@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -50,6 +51,9 @@ public class OrcidGetUpdatesFile {
|
|||
final String namenode = parser.get("namenode");
|
||||
log.info("got variable namenode: {}", namenode);
|
||||
|
||||
final String master = parser.get("master");
|
||||
log.info("got variable master: {}", master);
|
||||
|
||||
final String targetPath = parser.get("targetPath");
|
||||
log.info("got variable targetPath: {}", targetPath);
|
||||
|
||||
|
@ -59,11 +63,27 @@ public class OrcidGetUpdatesFile {
|
|||
final String accessToken = parser.get("accessToken");
|
||||
log.info("got variable accessToken: {}", accessToken);
|
||||
|
||||
System.out.println("namenode = " + namenode);
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("got variable graphPath: {}", graphPath);
|
||||
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(OrcidGetUpdatesFile.class.getName())
|
||||
.master(master)
|
||||
.getOrCreate();
|
||||
|
||||
final String latestDate = spark
|
||||
.read()
|
||||
.load(graphPath + "/Authors")
|
||||
.selectExpr("max(lastModifiedDate)")
|
||||
.first()
|
||||
.getString(0);
|
||||
|
||||
log.info("latest date is {}", latestDate);
|
||||
|
||||
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(namenode));
|
||||
|
||||
new OrcidGetUpdatesFile().readTar(fileSystem, accessToken, apiURL, targetPath, "2023-09-30");
|
||||
new OrcidGetUpdatesFile().readTar(fileSystem, accessToken, apiURL, targetPath, latestDate);
|
||||
|
||||
}
|
||||
|
||||
|
@ -102,6 +122,8 @@ public class OrcidGetUpdatesFile {
|
|||
Path hdfsWritePath = new Path("/tmp/orcid_updates.tar.gz");
|
||||
final FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true);
|
||||
IOUtils.copy(input, fsDataOutputStream);
|
||||
fsDataOutputStream.flush();
|
||||
fsDataOutputStream.close();
|
||||
FSDataInputStream updateFile = fileSystem.open(hdfsWritePath);
|
||||
TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream(
|
||||
new BufferedInputStream(
|
||||
|
|
|
@ -4,6 +4,12 @@
|
|||
"paramDescription": "the Name Node URI",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "m",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "the master name",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "t",
|
||||
"paramLongName": "targetPath",
|
||||
|
@ -16,6 +22,12 @@
|
|||
"paramDescription": "the URL to download the tar file",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "g",
|
||||
"paramLongName": "graphPath",
|
||||
"paramDescription": "the path of the input graph",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "at",
|
||||
"paramLongName": "accessToken",
|
||||
|
|
|
@ -28,23 +28,35 @@
|
|||
</kill>
|
||||
|
||||
<action name="startUpdate">
|
||||
<java>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</configuration>
|
||||
<main-class>eu.dnetlib.dhp.collection.orcid.OrcidGetUpdatesFile</main-class>
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Check Latest Orcid and Download updates</name>
|
||||
<class>eu.dnetlib.dhp.collection.orcid.OrcidGetUpdatesFile</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=2g
|
||||
--conf spark.sql.shuffle.partitions=3000
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--master</arg><arg>yarn</arg>
|
||||
<arg>--namenode</arg><arg>${nameNode}</arg>
|
||||
<arg>--graphPath</arg><arg>${graphPath}</arg>
|
||||
<arg>--targetPath</arg><arg>${targetPath}</arg>
|
||||
<arg>--apiURL</arg><arg>${apiURL}</arg>
|
||||
<arg>--accessToken</arg><arg>${accessToken}</arg>
|
||||
|
||||
</java>
|
||||
</spark>
|
||||
<ok to="generateTables"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="generateTables">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
|
@ -90,7 +102,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>graphPath</arg>
|
||||
<arg>--graphPath</arg><arg>${graphPath}</arg>
|
||||
<arg>--updatePath</arg><arg>${targetPath}/updateTable</arg>
|
||||
<arg>--targetPath</arg><arg>${targetPath}/newTable</arg>
|
||||
<arg>--master</arg><arg>yarn</arg>
|
||||
|
|
|
@ -19,10 +19,34 @@ class SparkApplyUpdate(propertyPath: String, args: Array[String], log: Logger)
|
|||
val targetPath: String = parser.get("targetPath")
|
||||
log.info("found parameters targetPath: {}", targetPath)
|
||||
applyTableUpdate(spark, graphPath, updatePath, targetPath)
|
||||
checkUpdate(spark, graphPath, targetPath)
|
||||
moveTable(spark, graphPath, targetPath)
|
||||
|
||||
}
|
||||
|
||||
def updateDataset(
|
||||
private def moveTable(spark: SparkSession, graphPath: String, updatePath: String): Unit = {
|
||||
spark.read
|
||||
.load(s"$updatePath/Authors")
|
||||
.repartition(1000)
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(s"$graphPath/Authors")
|
||||
spark.read
|
||||
.load(s"$updatePath/Works")
|
||||
.repartition(1000)
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(s"$graphPath/Works")
|
||||
spark.read
|
||||
.load(s"$updatePath/Employments")
|
||||
.repartition(1000)
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(s"$graphPath/Employments")
|
||||
|
||||
}
|
||||
|
||||
private def updateDataset(
|
||||
inputDataset: DataFrame,
|
||||
idUpdate: DataFrame,
|
||||
updateDataframe: DataFrame,
|
||||
|
@ -37,7 +61,28 @@ class SparkApplyUpdate(propertyPath: String, args: Array[String], log: Logger)
|
|||
.save(targetPath)
|
||||
}
|
||||
|
||||
def applyTableUpdate(spark: SparkSession, graphPath: String, updatePath: String, targetPath: String) = {
|
||||
private def checkUpdate(spark: SparkSession, graphPath: String, updatePath: String): Unit = {
|
||||
val totalOriginalAuthors = spark.read.load(s"$graphPath/Authors").count
|
||||
val totalOriginalWorks = spark.read.load(s"$graphPath/Works").count
|
||||
val totalOriginalEmployments = spark.read.load(s"$graphPath/Employments").count
|
||||
val totalUpdateAuthors = spark.read.load(s"$updatePath/Authors").count
|
||||
val totalUpdateWorks = spark.read.load(s"$updatePath/Works").count
|
||||
val totalUpdateEmployments = spark.read.load(s"$updatePath/Employments").count
|
||||
|
||||
log.info("totalOriginalAuthors: {}", totalOriginalAuthors)
|
||||
log.info("totalOriginalWorks: {}", totalOriginalWorks)
|
||||
log.info("totalOriginalEmployments: {}", totalOriginalEmployments)
|
||||
log.info("totalUpdateAuthors: {}", totalUpdateAuthors)
|
||||
log.info("totalUpdateWorks: {}", totalUpdateWorks)
|
||||
log.info("totalUpdateEmployments: {}", totalUpdateEmployments)
|
||||
if (
|
||||
totalUpdateAuthors < totalOriginalAuthors || totalUpdateEmployments < totalOriginalEmployments || totalUpdateWorks < totalOriginalWorks
|
||||
)
|
||||
throw new RuntimeException("The updated Graph contains less elements of the original one")
|
||||
|
||||
}
|
||||
|
||||
private def applyTableUpdate(spark: SparkSession, graphPath: String, updatePath: String, targetPath: String): Unit = {
|
||||
val orcidIDUpdate = spark.read.load(s"$updatePath/Authors").select("orcid")
|
||||
updateDataset(
|
||||
spark.read.load(s"$graphPath/Authors"),
|
||||
|
|
Loading…
Reference in New Issue