diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java index 7ac1f2de3..fd83f7072 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -41,18 +41,14 @@ public class ReadCOCI implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - final String hdfsNameNode = parser.get("nameNode"); - log.info("nameNode: {}", hdfsNameNode); - - final String inputPath = parser.get("sourcePath"); - log.info("input path : {}", inputPath); + final String[] inputFile = parser.get("inputFile").split(";"); + log.info("inputFile {}", inputFile.toString()); Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", hdfsNameNode); + final String workingPath = parser.get("workingPath"); + log.info("workingPath {}", workingPath); - FileSystem fileSystem = FileSystem.get(conf); SparkConf sconf = new SparkConf(); final String delimiter = Optional @@ -65,25 +61,20 @@ public class ReadCOCI implements Serializable { spark -> { doRead( spark, - fileSystem, - inputPath, + workingPath, + inputFile, outputPath, delimiter); }); } - public static void doRead(SparkSession spark, FileSystem fileSystem, String inputPath, String outputPath, + private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, + String outputPath, String delimiter) throws IOException { - RemoteIterator iterator = fileSystem - .listFiles( - new Path(inputPath), true); + for(String inputFile : inputFiles){ + String p_string = workingPath + "/" + inputFile ; - while (iterator.hasNext()) { - LocatedFileStatus fileStatus = iterator.next(); - - Path p = fileStatus.getPath(); - String p_string = p.toString(); Dataset cociData = spark .read() .format("csv") @@ -91,7 +82,8 @@ public class ReadCOCI implements Serializable { .option("inferSchema", "true") .option("header", "true") .option("quotes", "\"") - .load(p_string); + .load(p_string) + .repartition(100); cociData.map((MapFunction) row -> { COCI coci = new COCI(); @@ -103,7 +95,7 @@ public class ReadCOCI implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "/" + p_string.substring(p_string.lastIndexOf("/") + 1)); + .json(outputPath + inputFile); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json index 14c20f762..b57cb5d9a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json @@ -1,17 +1,12 @@ [ { - "paramName": "sp", - "paramLongName": "sourcePath", + "paramName": "wp", + "paramLongName": "workingPath", "paramDescription": "the zipped opencitations file", "paramRequired": true }, - { - "paramName": "nn", - "paramLongName": "nameNode", - "paramDescription": "the hdfs name node", - "paramRequired": true - }, + { "paramName": "issm", "paramLongName": "isSparkSessionManaged", @@ -28,7 +23,13 @@ "paramName": "op", "paramLongName": "outputPath", "paramDescription": "the hdfs name node", - "paramRequired": false + "paramRequired": true + }, + { + "paramName": "if", + "paramLongName": "inputFile", + "paramDescription": "the hdfs name node", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index 7276d2d3e..aee2559ee 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -82,10 +82,10 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --sourcePath${workingPath}/COCI - --outputPath${workingDir}/COCI - --nameNode${nameNode} + --workingPath${workingPath}/COCI + --outputPath${workingPath}/COCI_JSON --delimiter${delimiter} + --inputFile${inputFileCoci} @@ -108,7 +108,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --inputPath${workingPath}/COCI + --inputPath${workingPath}/COCI_JSON --outputPath${outputPath} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java index e1b9c4d23..27627f9f6 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java @@ -10,6 +10,7 @@ import java.nio.file.Path; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -73,15 +74,53 @@ public class ReadCOCITest { "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") .getPath(); + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir + "/COCI/input1")); + + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir + "/COCI/input2")); + + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir + "/COCI/input3")); + + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input4") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir + "/COCI/input4")); + ReadCOCI - .doRead( - spark, FileSystem.getLocal(new Configuration()), inputPath, - workingDir.toString() + "/COCI", DEFAULT_DELIMITER); + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-workingPath", + workingDir.toString() + "/COCI", + "-outputPath", + workingDir.toString() + "/COCI_json/", + "-inputFile", "input1;input2;input3;input4" + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/COCI/*/") + .textFile(workingDir.toString() + "/COCI_json/*/") .map(item -> OBJECT_MAPPER.readValue(item, COCI.class)); Assertions.assertEquals(23, tmp.count());