diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java index be653aed21..6a779f6c4b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java @@ -46,6 +46,9 @@ public class GetOpenCitationsRefs implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath {}", outputPath); + final String backupPath = parser.get("backupPath"); + log.info("backupPath {}", backupPath); + Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); @@ -53,11 +56,11 @@ public class GetOpenCitationsRefs implements Serializable { GetOpenCitationsRefs ocr = new GetOpenCitationsRefs(); - ocr.doExtract(inputPath, outputPath, fileSystem); + ocr.doExtract(inputPath, outputPath, backupPath, fileSystem); } - private void doExtract(String inputPath, String outputPath, FileSystem fileSystem) + private void doExtract(String inputPath, String outputPath, String backupPath, FileSystem fileSystem) throws IOException { RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem @@ -89,6 +92,7 @@ public class GetOpenCitationsRefs implements Serializable { } } + fileSystem.rename(fileStatus.getPath(), new Path(backupPath)); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java index 4b0bbf145b..de45d50b20 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -49,9 +49,6 @@ public class ReadCOCI implements Serializable { final String workingPath = parser.get("inputPath"); log.info("workingPath {}", workingPath); - final String backupPath = parser.get("backupPath"); - log.info("backupPath {}", backupPath); - SparkConf sconf = new SparkConf(); Configuration conf = new Configuration(); @@ -71,14 +68,12 @@ public class ReadCOCI implements Serializable { workingPath, fileSystem, outputPath, - backupPath, delimiter); }); } private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem, String outputPath, - String backupPath, String delimiter) throws IOException { RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem .listFiles( @@ -113,7 +108,7 @@ public class ReadCOCI implements Serializable { .option("compression", "gzip") .json(outputPath); - fileSystem.rename(fileStatus.getPath(), new Path(backupPath)); + fileSystem.delete(fileStatus.getPath()); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json index f4b6e2d685..10225f367b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json @@ -16,5 +16,11 @@ "paramLongName": "hdfsNameNode", "paramDescription": "the hdfs name node", "paramRequired": true + }, + { + "paramName": "bp", + "paramLongName": "backupPath", + "paramDescription": "the hdfs path to move the OC data after the extraction", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json index d1f495d678..f3d72e0632 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json @@ -30,12 +30,6 @@ "paramLongName": "hdfsNameNode", "paramDescription": "the hdfs name node", "paramRequired": true - }, - { - "paramName": "bp", - "paramLongName": "backupPath", - "paramDescription": "the hdfs path to move the OC data after the extraction", - "paramRequired": true } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index f170af96fe..bb6a0eb21f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -94,17 +94,7 @@ <arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <arg>--inputPath</arg><arg>${inputPath}/Original</arg> <arg>--outputPath</arg><arg>${inputPath}/Extracted</arg> - </java> - <ok to="read"/> - <error to="Kill"/> - </action> - - <action name="extract_correspondence"> - <java> - <main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class> - <arg>--hdfsNameNode</arg><arg>${nameNode}</arg> - <arg>--inputPath</arg><arg>${inputPath}/correspondence</arg> - <arg>--outputPath</arg><arg>${inputPath}/correspondence_extracted</arg> + <arg>--backupPath</arg><arg>${inputPath}/backup</arg> </java> <ok to="read"/> <error to="Kill"/> @@ -129,7 +119,6 @@ </spark-opts> <arg>--inputPath</arg><arg>${inputPath}/Extracted</arg> <arg>--outputPath</arg><arg>${inputPath}/JSON</arg> - <arg>--backupPath</arg><arg>${inputPath}/backup</arg> <arg>--delimiter</arg><arg>${delimiter}</arg> <arg>--hdfsNameNode</arg><arg>${nameNode}</arg> </spark>