[OpenCitations] move the extracted contents under a backup path to avoid needing to re-download it in case of errors

This commit is contained in:
Claudio Atzori 2024-09-25 15:27:02 +02:00
parent c1a309df75
commit 2ba67f08d3
3 changed files with 21 additions and 7 deletions

View File

@ -49,6 +49,9 @@ public class ReadCOCI implements Serializable {
final String workingPath = parser.get("inputPath"); final String workingPath = parser.get("inputPath");
log.info("workingPath {}", workingPath); log.info("workingPath {}", workingPath);
final String backupPath = parser.get("backupPath");
log.info("backupPath {}", backupPath);
SparkConf sconf = new SparkConf(); SparkConf sconf = new SparkConf();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -68,12 +71,14 @@ public class ReadCOCI implements Serializable {
workingPath, workingPath,
fileSystem, fileSystem,
outputPath, outputPath,
backupPath,
delimiter); delimiter);
}); });
} }
private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem, private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem,
String outputPath, String outputPath,
String backupPath,
String delimiter) throws IOException { String delimiter) throws IOException {
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
.listFiles( .listFiles(
@ -107,7 +112,8 @@ public class ReadCOCI implements Serializable {
.mode(SaveMode.Append) .mode(SaveMode.Append)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath); .json(outputPath);
fileSystem.rename(fileStatus.getPath(), new Path("/tmp/miriam/OC/DONE"));
fileSystem.rename(fileStatus.getPath(), new Path(backupPath));
} }
} }

View File

@ -24,12 +24,19 @@
"paramLongName": "outputPath", "paramLongName": "outputPath",
"paramDescription": "the hdfs name node", "paramDescription": "the hdfs name node",
"paramRequired": true "paramRequired": true
}, { },
"paramName": "nn", {
"paramLongName": "hdfsNameNode", "paramName": "nn",
"paramDescription": "the hdfs name node", "paramLongName": "hdfsNameNode",
"paramRequired": true "paramDescription": "the hdfs name node",
} "paramRequired": true
},
{
"paramName": "bp",
"paramLongName": "backupPath",
"paramDescription": "the hdfs path to move the OC data after the extraction",
"paramRequired": true
}
] ]

View File

@ -129,6 +129,7 @@
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/Extracted</arg> <arg>--inputPath</arg><arg>${inputPath}/Extracted</arg>
<arg>--outputPath</arg><arg>${inputPath}/JSON</arg> <arg>--outputPath</arg><arg>${inputPath}/JSON</arg>
<arg>--backupPath</arg><arg>${inputPath}/backup</arg>
<arg>--delimiter</arg><arg>${delimiter}</arg> <arg>--delimiter</arg><arg>${delimiter}</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
</spark> </spark>