From cb7c07c54e59675e8dffe42b7f2a13f16c956068 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 11 Aug 2022 11:25:24 +0200 Subject: [PATCH] [scholix] added step to create tar archive --- .../eu/dnetlib/dhp/common/MakeTarArchive.java | 96 +++++++++++++++---- .../dhp/common/input_maketar_parameters.json | 30 ++++++ .../graph/dumpScholix/oozie_app/workflow.xml | 14 ++- 3 files changed, 118 insertions(+), 22 deletions(-) create mode 100644 dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java index abb9dc148f..eca433e9e7 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java @@ -5,13 +5,71 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.util.Optional; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; public class MakeTarArchive implements Serializable { + private static final Logger log = LoggerFactory.getLogger(MakeTarArchive.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + MakeTarArchive.class + .getResourceAsStream( + "/eu/dnetlib/dhp/common/input_maketar_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + final String outputPath = parser.get("hdfsPath"); + log.info("hdfsPath: {}", outputPath); + + final String hdfsNameNode = parser.get("nameNode"); + log.info("nameNode: {}", hdfsNameNode); + + final String inputPath = parser.get("sourcePath"); + log.info("input path : {}", inputPath); + + final int gBperSplit = Optional + .ofNullable(parser.get("splitSize")) + .map(Integer::valueOf) + .orElse(10); + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fileSystem = FileSystem.get(conf); + + makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit); + + } + + public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) + throws IOException { + + RemoteIterator dirIterator = fileSystem.listLocatedStatus(new Path(inputPath)); + + while (dirIterator.hasNext()) { + LocatedFileStatus fileStatus = dirIterator.next(); + + Path p = fileStatus.getPath(); + String pathString = p.toString(); + String entity = pathString.substring(pathString.lastIndexOf("/") + 1); + + MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit); + } + } + private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException { Path hdfsWritePath = new Path(outputPath); if (fileSystem.exists(hdfsWritePath)) { @@ -21,7 +79,7 @@ public class MakeTarArchive implements Serializable { return new TarArchiveOutputStream(fileSystem.create(hdfsWritePath).getWrappedStream()); } - private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name) + private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dirName) throws IOException { Path hdfsWritePath = new Path(outputPath); @@ -37,7 +95,7 @@ public class MakeTarArchive implements Serializable { new Path(inputPath), true); while (iterator.hasNext()) { - writeCurrentFile(fileSystem, dir_name, iterator, ar, 0); + writeCurrentFile(fileSystem, dirName, iterator, ar, 0); } } @@ -59,32 +117,30 @@ public class MakeTarArchive implements Serializable { new Path(inputPath), true); boolean next = fileStatusListIterator.hasNext(); while (next) { - TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar"); + try (TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar")) { - long current_size = 0; - while (next && current_size < bytesPerSplit) { - current_size = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, current_size); - next = fileStatusListIterator.hasNext(); + long currentSize = 0; + while (next && currentSize < bytesPerSplit) { + currentSize = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, currentSize); + next = fileStatusListIterator.hasNext(); + } + + partNum += 1; } - - partNum += 1; - ar.close(); } - } - } - private static long writeCurrentFile(FileSystem fileSystem, String dir_name, + private static long writeCurrentFile(FileSystem fileSystem, String dirName, RemoteIterator fileStatusListIterator, - TarArchiveOutputStream ar, long current_size) throws IOException { + TarArchiveOutputStream ar, long currentSize) throws IOException { LocatedFileStatus fileStatus = fileStatusListIterator.next(); Path p = fileStatus.getPath(); - String p_string = p.toString(); - if (!p_string.endsWith("_SUCCESS")) { - String name = p_string.substring(p_string.lastIndexOf("/") + 1); + String pString = p.toString(); + if (!pString.endsWith("_SUCCESS")) { + String name = pString.substring(pString.lastIndexOf("/") + 1); if (name.startsWith("part-") & name.length() > 10) { String tmp = name.substring(0, 10); if (name.contains(".")) { @@ -92,9 +148,9 @@ public class MakeTarArchive implements Serializable { } name = tmp; } - TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name); + TarArchiveEntry entry = new TarArchiveEntry(dirName + "/" + name); entry.setSize(fileStatus.getLen()); - current_size += fileStatus.getLen(); + currentSize += fileStatus.getLen(); ar.putArchiveEntry(entry); InputStream is = fileSystem.open(fileStatus.getPath()); @@ -110,7 +166,7 @@ public class MakeTarArchive implements Serializable { ar.closeArchiveEntry(); } - return current_size; + return currentSize; } } diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json new file mode 100644 index 0000000000..a153188651 --- /dev/null +++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json @@ -0,0 +1,30 @@ +[ + + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName": "hdp", + "paramLongName": "hdfsPath", + "paramDescription": "the path used to store the output archive", + "paramRequired": true + }, + { + "paramName":"nn", + "paramLongName":"nameNode", + "paramDescription": "the name node", + "paramRequired": true + }, + { + "paramName":"ss", + "paramLongName":"splitSize", + "paramDescription": "the maximum size of the archive", + "paramRequired": false + } +] + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml index 8135c98156..d47ebb0bec 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml @@ -140,11 +140,21 @@ --objectTypescholix --maxPidNumberFiltermaxNumberOfPid + + + + + + + eu.dnetlib.dhp.common.MakeTarArchive + --nameNode${nameNode} + --hdfsPath${targetPath}/tar + --sourcePath${targetPath}/json + - - + \ No newline at end of file