openCitations #194

Merged
miriam.baglioni merged 6 commits from openCitations into beta 2022-02-14 14:58:28 +01:00
4 changed files with 70 additions and 38 deletions
Showing only changes of commit b071f8e415 - Show all commits

View File

@ -41,18 +41,14 @@ public class ReadCOCI implements Serializable {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String hdfsNameNode = parser.get("nameNode");
log.info("nameNode: {}", hdfsNameNode);
final String inputPath = parser.get("sourcePath");
log.info("input path : {}", inputPath);
final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString());
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath);
FileSystem fileSystem = FileSystem.get(conf);
SparkConf sconf = new SparkConf();
final String delimiter = Optional
@ -65,25 +61,20 @@ public class ReadCOCI implements Serializable {
spark -> {
doRead(
spark,
fileSystem,
inputPath,
workingPath,
inputFile,
outputPath,
delimiter);
});
}
public static void doRead(SparkSession spark, FileSystem fileSystem, String inputPath, String outputPath,
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
String outputPath,
String delimiter) throws IOException {
RemoteIterator<LocatedFileStatus> iterator = fileSystem
.listFiles(
new Path(inputPath), true);
for(String inputFile : inputFiles){
String p_string = workingPath + "/" + inputFile ;
while (iterator.hasNext()) {
LocatedFileStatus fileStatus = iterator.next();
Path p = fileStatus.getPath();
String p_string = p.toString();
Dataset<Row> cociData = spark
.read()
.format("csv")
@ -91,7 +82,8 @@ public class ReadCOCI implements Serializable {
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(p_string);
.load(p_string)
.repartition(100);
cociData.map((MapFunction<Row, COCI>) row -> {
COCI coci = new COCI();
@ -103,7 +95,7 @@ public class ReadCOCI implements Serializable {
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + p_string.substring(p_string.lastIndexOf("/") + 1));
.json(outputPath + inputFile);
}
}

View File

@ -1,17 +1,12 @@
[
{
"paramName": "sp",
"paramLongName": "sourcePath",
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the zipped opencitations file",
"paramRequired": true
},
{
"paramName": "nn",
"paramLongName": "nameNode",
"paramDescription": "the hdfs name node",
"paramRequired": true
},
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
@ -28,7 +23,13 @@
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the hdfs name node",
"paramRequired": false
"paramRequired": true
},
{
"paramName": "if",
"paramLongName": "inputFile",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
]

View File

@ -82,10 +82,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingPath}/COCI</arg>
<arg>--outputPath</arg><arg>${workingDir}/COCI</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--workingPath</arg><arg>${workingPath}/COCI</arg>
<arg>--outputPath</arg><arg>${workingPath}/COCI_JSON</arg>
<arg>--delimiter</arg><arg>${delimiter}</arg>
<arg>--inputFile</arg><arg>${inputFileCoci}</arg>
</spark>
<ok to="create_actionset"/>
<error to="Kill"/>
@ -108,7 +108,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/COCI</arg>
<arg>--inputPath</arg><arg>${workingPath}/COCI_JSON</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>

View File

@ -10,6 +10,7 @@ import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@ -73,15 +74,53 @@ public class ReadCOCITest {
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
.getPath();
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input1"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input2"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input3"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input4")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input4"));
ReadCOCI
.doRead(
spark, FileSystem.getLocal(new Configuration()), inputPath,
workingDir.toString() + "/COCI", DEFAULT_DELIMITER);
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-workingPath",
workingDir.toString() + "/COCI",
"-outputPath",
workingDir.toString() + "/COCI_json/",
"-inputFile", "input1;input2;input3;input4"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<COCI> tmp = sc
.textFile(workingDir.toString() + "/COCI/*/")
.textFile(workingDir.toString() + "/COCI_json/*/")
.map(item -> OBJECT_MAPPER.readValue(item, COCI.class));
Assertions.assertEquals(23, tmp.count());