added DownloadCSV2 as alternative implementation of the same download procedure

This commit is contained in:
Claudio Atzori 2021-08-13 15:52:15 +02:00
parent 5f0903d50d
commit f74adc4752
5 changed files with 214 additions and 113 deletions

View File

@ -7,16 +7,15 @@ import java.nio.charset.StandardCharsets;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import eu.dnetlib.dhp.common.collection.CollectorException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.GetCSV; import eu.dnetlib.dhp.common.collection.GetCSV;
import eu.dnetlib.dhp.common.collection.HttpConnector2; import eu.dnetlib.dhp.common.collection.HttpConnector2;
@ -68,7 +67,8 @@ public class DownloadCSV {
} }
protected void doDownload(String fileURL, String workingPath, String outputFile, String classForName, char delimiter, FileSystem fs) protected void doDownload(String fileURL, String workingPath, String outputFile, String classForName,
char delimiter, FileSystem fs)
throws IOException, ClassNotFoundException, CollectorException { throws IOException, ClassNotFoundException, CollectorException {
final HttpConnector2 connector2 = new HttpConnector2(); final HttpConnector2 connector2 = new HttpConnector2();
@ -78,11 +78,11 @@ public class DownloadCSV {
try (BufferedReader in = new BufferedReader( try (BufferedReader in = new BufferedReader(
new InputStreamReader(connector2.getInputSourceAsStream(fileURL)))) { new InputStreamReader(connector2.getInputSourceAsStream(fileURL)))) {
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fs.create(path, true), Charset.defaultCharset()))) { try (PrintWriter writer = new PrintWriter(
new OutputStreamWriter(fs.create(path, true), StandardCharsets.UTF_8))) {
String line; String line;
while ((line = in.readLine()) != null) { while ((line = in.readLine()) != null) {
writer.write(line.replace("\\\"", "\"")); writer.println(line.replace("\\\"", "\""));
writer.newLine();
} }
} }
} }

View File

@ -0,0 +1,84 @@
package eu.dnetlib.dhp.oa.graph.hostedbymap;
import java.io.*;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.GetCSV;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
public class DownloadCSV2 {
private static final Logger log = LoggerFactory.getLogger(DownloadCSV2.class);
public static final char DEFAULT_DELIMITER = ';';
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
DownloadCSV2.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json"))));
parser.parseArgument(args);
final String fileURL = parser.get("fileURL");
log.info("fileURL {}", fileURL);
final String tmpFile = parser.get("tmpFile");
log.info("tmpFile {}", tmpFile);
final String outputFile = parser.get("outputFile");
log.info("outputFile {}", outputFile);
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode);
final String classForName = parser.get("classForName");
log.info("classForName {}", classForName);
final char delimiter = Optional
.ofNullable(parser.get("delimiter"))
.map(s -> s.charAt(0))
.orElse(DEFAULT_DELIMITER);
log.info("delimiter {}", delimiter);
HttpConnector2 connector2 = new HttpConnector2();
try (BufferedReader in = new BufferedReader(
new InputStreamReader(connector2.getInputSourceAsStream(fileURL)))) {
try (PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(tmpFile)))) {
String line;
while ((line = in.readLine()) != null) {
writer.println(line.replace("\\\"", "\""));
}
}
}
try (BufferedReader in = new BufferedReader(new FileReader(tmpFile))) {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
GetCSV.getCsv(fileSystem, in, outputFile, classForName, delimiter);
} finally {
FileUtils.deleteQuietly(new File(tmpFile));
}
}
}

View File

@ -6,9 +6,9 @@
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName":"wp", "paramName":"tf",
"paramLongName":"workingPath", "paramLongName":"tmpFile",
"paramDescription": "the path where to find the pre-processed data for unibi gold list and doj artciles", "paramDescription": "the temporary local file storing the cleaned CSV contents for unibi gold list and doj artciles",
"paramRequired": true "paramRequired": true
}, },
{ {

View File

@ -73,7 +73,8 @@
<decision name="resume_from"> <decision name="resume_from">
<switch> <switch>
<case to="remove_hbmpath">${wf:conf('resumeFrom') eq 'ProduceHBM'}</case> <case to="produceHBM">${wf:conf('resumeFrom') eq 'ProduceHBM'}</case>
<case to="remove_hbmpath">${wf:conf('resumeFrom') eq 'download_csv'}</case>
<default to="prepareInfo"/> <default to="prepareInfo"/>
</switch> </switch>
</decision> </decision>
@ -98,10 +99,10 @@
<action name="download_gold"> <action name="download_gold">
<java> <java>
<main-class>eu.dnetlib.dhp.oa.graph.hostedbymap.DownloadCSV</main-class> <main-class>eu.dnetlib.dhp.oa.graph.hostedbymap.DownloadCSV2</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${unibiFileURL}</arg> <arg>--fileURL</arg><arg>${unibiFileURL}</arg>
<arg>--workingPath</arg><arg>${workingDir}/unibi_gold</arg> <arg>--tmpFile</arg><arg>/tmp/unibi_gold_replaced.csv</arg>
<arg>--outputFile</arg><arg>${workingDir}/unibi_gold.json</arg> <arg>--outputFile</arg><arg>${workingDir}/unibi_gold.json</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.oa.graph.hostedbymap.model.UnibiGoldModel</arg> <arg>--classForName</arg><arg>eu.dnetlib.dhp.oa.graph.hostedbymap.model.UnibiGoldModel</arg>
</java> </java>
@ -111,10 +112,10 @@
<action name="download_doaj"> <action name="download_doaj">
<java> <java>
<main-class>eu.dnetlib.dhp.oa.graph.hostedbymap.DownloadCSV</main-class> <main-class>eu.dnetlib.dhp.oa.graph.hostedbymap.DownloadCSV2</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${doajFileURL}</arg> <arg>--fileURL</arg><arg>${doajFileURL}</arg>
<arg>--workingPath</arg><arg>${workingDir}/doaj</arg> <arg>--tmpFile</arg><arg>/tmp/doaj_replaced.csv</arg>
<arg>--outputFile</arg><arg>${workingDir}/doaj.json</arg> <arg>--outputFile</arg><arg>${workingDir}/doaj.json</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.oa.graph.hostedbymap.model.DOAJModel</arg> <arg>--classForName</arg><arg>eu.dnetlib.dhp.oa.graph.hostedbymap.model.DOAJModel</arg>
</java> </java>
@ -141,7 +142,7 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts> </spark-opts>
<arg>--datasourcePath</arg><arg>${sourcePath}/datasource</arg> <arg>--datasourcePath</arg><arg>${sourcePath}/datasource</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg> <arg>--workingPath</arg><arg>/user/${wf:user()}/data</arg>
<arg>--outputPath</arg><arg>${hostedByMapPath}</arg> <arg>--outputPath</arg><arg>${hostedByMapPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg> <arg>--master</arg><arg>yarn-cluster</arg>
</spark> </spark>

View File

@ -1,25 +1,37 @@
package eu.dnetlib.dhp.oa.graph.hostedbymap; package eu.dnetlib.dhp.oa.graph.hostedbymap;
import com.fasterxml.jackson.databind.ObjectMapper; import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.common.collection.CollectorException; import static org.junit.jupiter.api.Assertions.assertTrue;
import eu.dnetlib.dhp.common.collection.GetCSV;
import eu.dnetlib.dhp.common.collection.HttpConnector2; import java.io.BufferedReader;
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.DOAJModel; import java.io.File;
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.UnibiGoldModel; import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*; import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Files;
import static org.junit.jupiter.api.Assertions.assertTrue; import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.DOAJModel;
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.UnibiGoldModel;
public class DownloadCsvTest { public class DownloadCsvTest {
private static final Logger log = LoggerFactory.getLogger(DownloadCsvTest.class);
private static String workingDir; private static String workingDir;
private static LocalFileSystem fs; private static LocalFileSystem fs;
@ -40,7 +52,8 @@ public class DownloadCsvTest {
String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_4.csv"; String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_4.csv";
final String outputFile = workingDir + "/unibi_gold.json"; final String outputFile = workingDir + "/unibi_gold.json";
new DownloadCSV().doDownload( new DownloadCSV()
.doDownload(
fileURL, fileURL,
workingDir + "/unibi_gold", workingDir + "/unibi_gold",
outputFile, outputFile,
@ -75,7 +88,7 @@ public class DownloadCsvTest {
count += 1; count += 1;
} }
Assertions.assertEquals(67028, count); assertEquals(67028, count);
} }
@Disabled @Disabled
@ -85,7 +98,8 @@ public class DownloadCsvTest {
String fileURL = "https://doaj.org/csv"; String fileURL = "https://doaj.org/csv";
final String outputFile = workingDir + "/doaj.json"; final String outputFile = workingDir + "/doaj.json";
new DownloadCSV().doDownload( new DownloadCSV()
.doDownload(
fileURL, fileURL,
workingDir + "/doaj", workingDir + "/doaj",
outputFile, outputFile,
@ -100,29 +114,31 @@ public class DownloadCsvTest {
while ((line = in.readLine()) != null) { while ((line = in.readLine()) != null) {
DOAJModel doaj = new ObjectMapper().readValue(line, DOAJModel.class); DOAJModel doaj = new ObjectMapper().readValue(line, DOAJModel.class);
if (count == 0) { if (count == 0) {
Assertions.assertEquals("0001-3765", doaj.getIssn()); assertEquals("0001-3765", doaj.getIssn());
Assertions.assertEquals("1678-2690", doaj.getEissn()); assertEquals("1678-2690", doaj.getEissn());
Assertions.assertEquals("Anais da Academia Brasileira de Ciências", doaj.getJournalTitle()); assertEquals("Anais da Academia Brasileira de Ciências", doaj.getJournalTitle());
}
if (count == 22) {
log.info(new ObjectMapper().writeValueAsString(doaj));
System.out.println(new ObjectMapper().writeValueAsString(doaj));
} }
if (count == 7904) { if (count == 7904) {
System.out.println(new ObjectMapper().writeValueAsString(doaj)); // log.info(new ObjectMapper().writeValueAsString(doaj));
Assertions.assertEquals("", doaj.getIssn()); assertEquals("", doaj.getIssn());
Assertions.assertEquals("2055-7159", doaj.getEissn()); assertEquals("2055-7159", doaj.getEissn());
Assertions.assertEquals("BJR|case reports", doaj.getJournalTitle()); assertEquals("BJR|case reports", doaj.getJournalTitle());
} }
if (count == 16707) { if (count == 16707) {
Assertions.assertEquals("", doaj.getIssn()); assertEquals("2783-1043", doaj.getIssn());
Assertions.assertEquals("2788-6298", doaj.getEissn()); assertEquals("2783-1051", doaj.getEissn());
Assertions assertEquals("فیزیک کاربردی ایران", doaj.getJournalTitle());
.assertEquals("Teacher Education through Flexible Learning in Africa", doaj.getJournalTitle());
} }
count += 1; count += 1;
} }
Assertions.assertEquals(16713, count); assertEquals(16715, count);
} }
@AfterAll @AfterAll