diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/ExtractAndMapDoajJson.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/ExtractAndMapDoajJson.java new file mode 100644 index 000000000..17a78760e --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/ExtractAndMapDoajJson.java @@ -0,0 +1,117 @@ + +package eu.dnetlib.dhp.oa.graph.hostedbymap; + +import static eu.dnetlib.dhp.common.collection.DecompressTarGz.doExtract; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; +import java.util.Objects; + +import eu.dnetlib.dhp.oa.graph.hostedbymap.model.DOAJModel; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.hostedbymap.model.doaj.DOAJEntry; + +public class ExtractAndMapDoajJson { + + private static final Logger log = LoggerFactory.getLogger(ExtractAndMapDoajJson.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + ExtractAndMapDoajJson.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/hostedbymap/download_json_parameters.json")))); + + parser.parseArgument(args); + + final String compressedInput = parser.get("compressedFile"); + log.info("compressedInput {}", compressedInput); + + final String hdfsNameNode = parser.get("hdfsNameNode"); + log.info("hdfsNameNode {}", hdfsNameNode); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath {}", workingPath); + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fs = FileSystem.get(conf); + CompressionCodecFactory factory = new CompressionCodecFactory(conf); + CompressionCodec codec = factory.getCodecByClassName("org.apache.hadoop.io.compress.GzipCodec"); + doExtract(fs, workingPath, compressedInput); + doMap(fs, workingPath, outputPath, codec); + + } + + private static void doMap(FileSystem fs, String workingPath, String outputPath, CompressionCodec codec) + throws IOException { + RemoteIterator fileStatusListIterator = fs + .listFiles( + new Path(workingPath), true); + + Path hdfsWritePath = new Path(outputPath); + if (fs.exists(hdfsWritePath)) { + fs.delete(hdfsWritePath, true); + + } + try ( + + FSDataOutputStream out = fs + .create(hdfsWritePath); + PrintWriter writer = new PrintWriter(new BufferedOutputStream(out))) { + + while (fileStatusListIterator.hasNext()) { + Path path = fileStatusListIterator.next().getPath(); + if (!fs.isDirectory(path)) { + FSDataInputStream is = fs.open(path); + CompressionInputStream compressionInputStream = codec.createInputStream(is); + DOAJEntry[] doajEntries = new ObjectMapper().readValue(compressionInputStream, DOAJEntry[].class); + Arrays.stream(doajEntries).forEach(doaj -> { + try { + writer.println(new ObjectMapper().writeValueAsString(getDoajModel(doaj))); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + }); + } + + } + + } + + } + + @NotNull + public static DOAJModel getDoajModel(DOAJEntry doaj) { + DOAJModel doajModel = new DOAJModel(); + doajModel.setOaStart(doaj.getBibjson().getOa_start()); + doajModel.setEissn(doaj.getBibjson().getEissn()); + doajModel.setIssn(doaj.getBibjson().getPissn()); + doajModel.setJournalTitle(doaj.getBibjson().getTitle()); + doajModel.setReviewProcess(doaj.getBibjson().getEditorial().getReview_process()); + return doajModel; + } + +}