diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCSV.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCSV.java
index be35d31f85..dff761c34e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCSV.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCSV.java
@@ -7,16 +7,15 @@ import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
-import eu.dnetlib.dhp.common.collection.CollectorException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.GetCSV;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
@@ -68,8 +67,9 @@ public class DownloadCSV {
}
- protected void doDownload(String fileURL, String workingPath, String outputFile, String classForName, char delimiter, FileSystem fs)
- throws IOException, ClassNotFoundException, CollectorException {
+ protected void doDownload(String fileURL, String workingPath, String outputFile, String classForName,
+ char delimiter, FileSystem fs)
+ throws IOException, ClassNotFoundException, CollectorException {
final HttpConnector2 connector2 = new HttpConnector2();
@@ -78,11 +78,11 @@ public class DownloadCSV {
try (BufferedReader in = new BufferedReader(
new InputStreamReader(connector2.getInputSourceAsStream(fileURL)))) {
- try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fs.create(path, true), Charset.defaultCharset()))) {
+ try (PrintWriter writer = new PrintWriter(
+ new OutputStreamWriter(fs.create(path, true), StandardCharsets.UTF_8))) {
String line;
while ((line = in.readLine()) != null) {
- writer.write(line.replace("\\\"", "\""));
- writer.newLine();
+ writer.println(line.replace("\\\"", "\""));
}
}
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCSV2.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCSV2.java
new file mode 100644
index 0000000000..d82d008629
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCSV2.java
@@ -0,0 +1,109 @@
+
+package eu.dnetlib.dhp.oa.graph.hostedbymap;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.collection.GetCSV;
+import eu.dnetlib.dhp.common.collection.HttpConnector2;
+
+/**
+ * Downloads a CSV file over HTTP, stores a cleaned copy in a local temporary file and hands that copy to
+ * {@link GetCSV#getCsv} together with the target file system, output path, model class name and delimiter.
+ * <p>
+ * "Cleaning" means replacing every backslash-escaped double quote ({@code \"}) with a plain double quote.
+ * All text is read and written as UTF-8 rather than the platform default charset.
+ */
+public class DownloadCSV2 {
+
+	private static final Logger log = LoggerFactory.getLogger(DownloadCSV2.class);
+
+	/** Field delimiter used when the {@code delimiter} argument is missing or empty. */
+	public static final char DEFAULT_DELIMITER = ';';
+
+	/**
+	 * Entry point. Arguments are described by {@code download_csv_parameters.json}; this method reads
+	 * {@code fileURL}, {@code tmpFile}, {@code outputFile}, {@code hdfsNameNode}, {@code classForName} and the
+	 * optional {@code delimiter}.
+	 *
+	 * @param args command line arguments
+	 * @throws Exception if argument parsing, the download, local file I/O or the CSV conversion fails
+	 */
+	public static void main(final String[] args) throws Exception {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					Objects
+						.requireNonNull(
+							DownloadCSV2.class
+								.getResourceAsStream(
+									"/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json"))));
+
+		parser.parseArgument(args);
+
+		final String fileURL = parser.get("fileURL");
+		log.info("fileURL {}", fileURL);
+
+		final String tmpFile = parser.get("tmpFile");
+		log.info("tmpFile {}", tmpFile);
+
+		final String outputFile = parser.get("outputFile");
+		log.info("outputFile {}", outputFile);
+
+		final String hdfsNameNode = parser.get("hdfsNameNode");
+		log.info("hdfsNameNode {}", hdfsNameNode);
+
+		final String classForName = parser.get("classForName");
+		log.info("classForName {}", classForName);
+
+		// An empty delimiter argument falls back to the default instead of failing in charAt(0).
+		final char delimiter = Optional
+			.ofNullable(parser.get("delimiter"))
+			.filter(s -> !s.isEmpty())
+			.map(s -> s.charAt(0))
+			.orElse(DEFAULT_DELIMITER);
+		log.info("delimiter {}", delimiter);
+
+		final HttpConnector2 connector2 = new HttpConnector2();
+
+		// One try/finally around both phases so the temporary file is removed even when the download fails.
+		try {
+			// NOTE(review): assumes the remote CSV is UTF-8 encoded - confirm for each configured source.
+			try (BufferedReader in = new BufferedReader(
+				new InputStreamReader(connector2.getInputSourceAsStream(fileURL), StandardCharsets.UTF_8));
+				// BufferedWriter, unlike PrintWriter, propagates write failures as IOException.
+				BufferedWriter writer = new BufferedWriter(
+					new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8))) {
+				String line;
+				while ((line = in.readLine()) != null) {
+					writer.write(line.replace("\\\"", "\""));
+					writer.newLine();
+				}
+			}
+
+			try (BufferedReader in = new BufferedReader(
+				new InputStreamReader(new FileInputStream(tmpFile), StandardCharsets.UTF_8))) {
+				final Configuration conf = new Configuration();
+				conf.set("fs.defaultFS", hdfsNameNode);
+
+				final FileSystem fileSystem = FileSystem.get(conf);
+
+				GetCSV.getCsv(fileSystem, in, outputFile, classForName, delimiter);
+			}
+		} finally {
+			FileUtils.deleteQuietly(new File(tmpFile));
+		}
+
+	}
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json
index 22c65eb359..50fbb00f0f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json
@@ -6,9 +6,9 @@
"paramRequired": true
},
{
- "paramName":"wp",
- "paramLongName":"workingPath",
- "paramDescription": "the path where to find the pre-processed data for unibi gold list and doj artciles",
+ "paramName":"tf",
+ "paramLongName":"tmpFile",
+    "paramDescription": "the temporary local file storing the cleaned CSV contents for unibi gold list and DOAJ articles",
"paramRequired": true
},
{
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml
index a03f4e36ae..84035fe4ef 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/oozie_app/workflow.xml
@@ -73,7 +73,8 @@
- ${wf:conf('resumeFrom') eq 'ProduceHBM'}
+ ${wf:conf('resumeFrom') eq 'ProduceHBM'}
+ ${wf:conf('resumeFrom') eq 'download_csv'}
@@ -98,10 +99,10 @@
- eu.dnetlib.dhp.oa.graph.hostedbymap.DownloadCSV
+ eu.dnetlib.dhp.oa.graph.hostedbymap.DownloadCSV2
--hdfsNameNode${nameNode}
--fileURL${unibiFileURL}
- --workingPath${workingDir}/unibi_gold
+ --tmpFile/tmp/unibi_gold_replaced.csv
--outputFile${workingDir}/unibi_gold.json
--classForNameeu.dnetlib.dhp.oa.graph.hostedbymap.model.UnibiGoldModel
@@ -111,10 +112,10 @@
- eu.dnetlib.dhp.oa.graph.hostedbymap.DownloadCSV
+ eu.dnetlib.dhp.oa.graph.hostedbymap.DownloadCSV2
--hdfsNameNode${nameNode}
--fileURL${doajFileURL}
- --workingPath${workingDir}/doaj
+ --tmpFile/tmp/doaj_replaced.csv
--outputFile${workingDir}/doaj.json
--classForNameeu.dnetlib.dhp.oa.graph.hostedbymap.model.DOAJModel
@@ -141,7 +142,7 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--datasourcePath${sourcePath}/datasource
- --workingPath${workingDir}
+ --workingPath/user/${wf:user()}/data
--outputPath${hostedByMapPath}
--masteryarn-cluster
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java
index 7b02025fb2..edf74fc6a6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java
@@ -1,133 +1,149 @@
+
package eu.dnetlib.dhp.oa.graph.hostedbymap;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.common.collection.CollectorException;
-import eu.dnetlib.dhp.common.collection.GetCSV;
-import eu.dnetlib.dhp.common.collection.HttpConnector2;
-import eu.dnetlib.dhp.oa.graph.hostedbymap.model.DOAJModel;
-import eu.dnetlib.dhp.oa.graph.hostedbymap.model.UnibiGoldModel;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.nio.file.Files;
+import com.fasterxml.jackson.databind.ObjectMapper;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+import eu.dnetlib.dhp.oa.graph.hostedbymap.model.DOAJModel;
+import eu.dnetlib.dhp.oa.graph.hostedbymap.model.UnibiGoldModel;
public class DownloadCsvTest {
- private static String workingDir;
+ private static final Logger log = LoggerFactory.getLogger(DownloadCsvTest.class);
- private static LocalFileSystem fs;
+ private static String workingDir;
- @BeforeAll
- public static void beforeAll() throws IOException {
- workingDir = Files
- .createTempDirectory(DownloadCsvTest.class.getSimpleName())
- .toString();
+ private static LocalFileSystem fs;
- fs = FileSystem.getLocal(new Configuration());
- }
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ workingDir = Files
+ .createTempDirectory(DownloadCsvTest.class.getSimpleName())
+ .toString();
- @Disabled
- @Test
- void getUnibiFileTest() throws CollectorException, IOException, ClassNotFoundException {
+ fs = FileSystem.getLocal(new Configuration());
+ }
- String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_4.csv";
+ @Disabled
+ @Test
+ void getUnibiFileTest() throws CollectorException, IOException, ClassNotFoundException {
- final String outputFile = workingDir + "/unibi_gold.json";
- new DownloadCSV().doDownload(
- fileURL,
- workingDir + "/unibi_gold",
- outputFile,
- UnibiGoldModel.class.getName(),
- ',',
- fs);
+ String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_4.csv";
- BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(outputFile))));
+ final String outputFile = workingDir + "/unibi_gold.json";
+ new DownloadCSV()
+ .doDownload(
+ fileURL,
+ workingDir + "/unibi_gold",
+ outputFile,
+ UnibiGoldModel.class.getName(),
+ ',',
+ fs);
- String line;
- int count = 0;
- while ((line = in.readLine()) != null) {
- UnibiGoldModel unibi = new ObjectMapper().readValue(line, UnibiGoldModel.class);
- if (count == 0) {
- assertTrue(unibi.getIssn().equals("0001-625X"));
- assertTrue(unibi.getIssnL().equals("0001-625X"));
- assertTrue(unibi.getTitle().equals("Acta Mycologica"));
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(outputFile))));
- }
- if (count == 43158) {
- assertTrue(unibi.getIssn().equals("2088-6330"));
- assertTrue(unibi.getIssnL().equals("2088-6330"));
- assertTrue(unibi.getTitle().equals("Religió: Jurnal Studi Agama-agama"));
+ String line;
+ int count = 0;
+ while ((line = in.readLine()) != null) {
+ UnibiGoldModel unibi = new ObjectMapper().readValue(line, UnibiGoldModel.class);
+ if (count == 0) {
+ assertTrue(unibi.getIssn().equals("0001-625X"));
+ assertTrue(unibi.getIssnL().equals("0001-625X"));
+ assertTrue(unibi.getTitle().equals("Acta Mycologica"));
- }
- if (count == 67027) {
- assertTrue(unibi.getIssn().equals("2658-7068"));
- assertTrue(unibi.getIssnL().equals("2308-2488"));
- assertTrue(unibi.getTitle().equals("Istoriko-èkonomičeskie issledovaniâ."));
- }
+ }
+ if (count == 43158) {
+ assertTrue(unibi.getIssn().equals("2088-6330"));
+ assertTrue(unibi.getIssnL().equals("2088-6330"));
+ assertTrue(unibi.getTitle().equals("Religió: Jurnal Studi Agama-agama"));
- count += 1;
- }
+ }
+ if (count == 67027) {
+ assertTrue(unibi.getIssn().equals("2658-7068"));
+ assertTrue(unibi.getIssnL().equals("2308-2488"));
+ assertTrue(unibi.getTitle().equals("Istoriko-èkonomičeskie issledovaniâ."));
+ }
- Assertions.assertEquals(67028, count);
- }
+ count += 1;
+ }
- @Disabled
- @Test
- void getDoajFileTest() throws CollectorException, IOException, ClassNotFoundException {
+ assertEquals(67028, count);
+ }
- String fileURL = "https://doaj.org/csv";
+ @Disabled
+ @Test
+ void getDoajFileTest() throws CollectorException, IOException, ClassNotFoundException {
- final String outputFile = workingDir + "/doaj.json";
- new DownloadCSV().doDownload(
- fileURL,
- workingDir + "/doaj",
- outputFile,
- DOAJModel.class.getName(),
- ',',
- fs);
+ String fileURL = "https://doaj.org/csv";
- BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(outputFile))));
+ final String outputFile = workingDir + "/doaj.json";
+ new DownloadCSV()
+ .doDownload(
+ fileURL,
+ workingDir + "/doaj",
+ outputFile,
+ DOAJModel.class.getName(),
+ ',',
+ fs);
- String line;
- int count = 0;
- while ((line = in.readLine()) != null) {
- DOAJModel doaj = new ObjectMapper().readValue(line, DOAJModel.class);
- if (count == 0) {
- Assertions.assertEquals("0001-3765", doaj.getIssn());
- Assertions.assertEquals("1678-2690", doaj.getEissn());
- Assertions.assertEquals("Anais da Academia Brasileira de Ciências", doaj.getJournalTitle());
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(outputFile))));
- }
- if (count == 7904) {
- System.out.println(new ObjectMapper().writeValueAsString(doaj));
- Assertions.assertEquals("", doaj.getIssn());
- Assertions.assertEquals("2055-7159", doaj.getEissn());
- Assertions.assertEquals("BJR|case reports", doaj.getJournalTitle());
- }
- if (count == 16707) {
+ String line;
+ int count = 0;
+ while ((line = in.readLine()) != null) {
+ DOAJModel doaj = new ObjectMapper().readValue(line, DOAJModel.class);
+ if (count == 0) {
+ assertEquals("0001-3765", doaj.getIssn());
+ assertEquals("1678-2690", doaj.getEissn());
+ assertEquals("Anais da Academia Brasileira de Ciências", doaj.getJournalTitle());
+ }
+ if (count == 22) {
+ log.info(new ObjectMapper().writeValueAsString(doaj));
+ System.out.println(new ObjectMapper().writeValueAsString(doaj));
+ }
+ if (count == 7904) {
+ // log.info(new ObjectMapper().writeValueAsString(doaj));
+ assertEquals("", doaj.getIssn());
+ assertEquals("2055-7159", doaj.getEissn());
+ assertEquals("BJR|case reports", doaj.getJournalTitle());
+ }
+ if (count == 16707) {
- Assertions.assertEquals("", doaj.getIssn());
- Assertions.assertEquals("2788-6298", doaj.getEissn());
- Assertions
- .assertEquals("Teacher Education through Flexible Learning in Africa", doaj.getJournalTitle());
- }
+ assertEquals("2783-1043", doaj.getIssn());
+ assertEquals("2783-1051", doaj.getEissn());
+ assertEquals("فیزیک کاربردی ایران", doaj.getJournalTitle());
+ }
- count += 1;
- }
+ count += 1;
+ }
- Assertions.assertEquals(16713, count);
- }
+ assertEquals(16715, count);
+ }
- @AfterAll
- public static void cleanup() {
- FileUtils.deleteQuietly(new File(workingDir));
- }
+ @AfterAll
+ public static void cleanup() {
+ FileUtils.deleteQuietly(new File(workingDir));
+ }
}