diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java index 1ae775bec..c967d4cae 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java @@ -1,36 +1,21 @@ package eu.dnetlib.dhp.actionmanager.project.utils; -import java.io.BufferedWriter; -import java.io.Closeable; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.charset.StandardCharsets; +import java.io.*; import java.util.Optional; import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.collection.HttpConnector2; +import eu.dnetlib.dhp.common.collection.GetCSV; +import eu.dnetlib.dhp.common.collection.HttpConnector2; /** * Applies the parsing of a csv file and writes the Serialization of it in hdfs */ -public class ReadCSV implements Closeable { - private static final Log log = LogFactory.getLog(ReadCSV.class); - - private final BufferedWriter writer; - private final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private final String csvFile; - private final char delimiter; +public class ReadCSV { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -50,57 +35,18 @@ public class ReadCSV implements Closeable { char del = ';'; if (delimiter.isPresent()) del = delimiter.get().charAt(0); - try (final ReadCSV readCSV = new ReadCSV(hdfsPath, hdfsNameNode, fileURL, del)) { - log.info("Getting CSV file..."); - readCSV.execute(classForName); - } - - } - - public void execute(final String classForName) - throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException { - CSVParser csvParser = new CSVParser(); - csvParser - .parse(csvFile, classForName, delimiter) - .stream() - .forEach(this::write); - } - - @Override - public void close() throws IOException { - writer.close(); - } - - public ReadCSV( - final String hdfsPath, - final String hdfsNameNode, - final String fileURL, - char delimiter) - throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); - HttpConnector2 httpConnector = new HttpConnector2(); + FileSystem fileSystem = FileSystem.get(conf); - Path hdfsWritePath = new Path(hdfsPath); + BufferedReader reader = new BufferedReader( + new InputStreamReader(new HttpConnector2().getInputSourceAsStream(fileURL))); - if (fileSystem.exists(hdfsWritePath)) { - fileSystem.delete(hdfsWritePath, false); - } - final FSDataOutputStream fos = fileSystem.create(hdfsWritePath); + GetCSV.getCsv(fileSystem, reader, hdfsPath, classForName, del); - this.writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8)); - this.csvFile = httpConnector.getInputSource(fileURL); - this.delimiter = delimiter; - } + reader.close(); - protected void write(final Object p) { - try { - writer.write(OBJECT_MAPPER.writeValueAsString(p)); - writer.newLine(); - } catch (final Exception e) { - throw new RuntimeException(e); - } } }