From df0e15231841438f12c7ed165a37847f425aaa7f Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 4 Aug 2023 16:43:55 +0200
Subject: [PATCH] moved maketar into common under the dump module

---
 .../eu/dnetlib/dhp/common/MakeTarArchive.java      | 190 ++++++++++++++++++
 .../eosc_input_maketar_parameters.json             |   0
 .../dump/{ => eosc}/oozie_app/workflow.xml         |   0
 3 files changed, 190 insertions(+)
 create mode 100644 dump/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java
 rename dump/src/main/resources/eu/dnetlib/dhp/{oa/graph/dump => common}/eosc_input_maketar_parameters.json (100%)
 rename dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/{ => eosc}/oozie_app/workflow.xml (100%)

diff --git a/dump/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java b/dump/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java
new file mode 100644
index 0000000..43e61e5
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java
@@ -0,0 +1,190 @@
+
+package eu.dnetlib.dhp.common;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class MakeTarArchive implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(MakeTarArchive.class);
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				MakeTarArchive.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/common/input_maketar_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		final String outputPath = parser.get("hdfsPath");
+		log.info("hdfsPath: {}", outputPath);
+
+		final String hdfsNameNode = parser.get("nameNode");
+		log.info("nameNode: {}", hdfsNameNode);
+
+		final String inputPath = parser.get("sourcePath");
+		log.info("sourcePath: {}", inputPath);
+
+		final int gBperSplit = Optional
+			.ofNullable(parser.get("splitSize"))
+			.map(Integer::valueOf)
+			.orElse(10);
+
+		final boolean rename = Optional
+			.ofNullable(parser.get("rename"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.FALSE);
+
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsNameNode);
+
+		FileSystem fileSystem = FileSystem.get(conf);
+
+		makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit, rename);
+	}
+
+	public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit)
+		throws IOException {
+		makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit, false);
+	}
+
+	public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit,
+		boolean rename)
+		throws IOException {
+
+		// each first-level entry under inputPath becomes its own (possibly split) tar archive
+		RemoteIterator<LocatedFileStatus> dirIterator = fileSystem.listLocatedStatus(new Path(inputPath));
+
+		while (dirIterator.hasNext()) {
+			LocatedFileStatus fileStatus = dirIterator.next();
+
+			Path p = fileStatus.getPath();
+			String pathString = p.toString();
+			String entity = pathString.substring(pathString.lastIndexOf("/") + 1);
+
+			MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit, rename);
+		}
+	}
+
+	private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
+		Path hdfsWritePath = new Path(outputPath);
+		if (fileSystem.exists(hdfsWritePath)) {
+			fileSystem.delete(hdfsWritePath, true);
+		}
+		return new TarArchiveOutputStream(fileSystem.create(hdfsWritePath).getWrappedStream());
+	}
+
+	private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dirName,
+		boolean rename)
+		throws IOException {
+
+		Path hdfsWritePath = new Path(outputPath);
+		if (fileSystem.exists(hdfsWritePath)) {
+			fileSystem.delete(hdfsWritePath, true);
+		}
+		try (TarArchiveOutputStream ar = new TarArchiveOutputStream(
+			fileSystem.create(hdfsWritePath).getWrappedStream())) {
+
+			RemoteIterator<LocatedFileStatus> iterator = fileSystem
+				.listFiles(
+					new Path(inputPath), true);
+
+			while (iterator.hasNext()) {
+				writeCurrentFile(fileSystem, dirName, iterator, ar, 0, rename);
+			}
+
+		}
+	}
+
+	public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dirName,
+		int gBperSplit, boolean rename) throws IOException {
+		final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
+
+		long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
+
+		if (sourceSize < bytesPerSplit) {
+			// everything fits into a single archive
+			write(fileSystem, inputPath, outputPath + ".tar", dirName, rename);
+		} else {
+			// spread the content over numbered archives of at most bytesPerSplit bytes each
+			int partNum = 0;
+
+			RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+				.listFiles(
+					new Path(inputPath), true);
+			boolean next = fileStatusListIterator.hasNext();
+			while (next) {
+				try (TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar")) {
+
+					long currentSize = 0;
+					while (next && currentSize < bytesPerSplit) {
+						currentSize = writeCurrentFile(
+							fileSystem, dirName, fileStatusListIterator, ar, currentSize, rename);
+						next = fileStatusListIterator.hasNext();
+					}
+
+					partNum += 1;
+				}
+			}
+		}
+	}
+
+	private static long writeCurrentFile(FileSystem fileSystem, String dirName,
+		RemoteIterator<LocatedFileStatus> fileStatusListIterator,
+		TarArchiveOutputStream ar, long currentSize, boolean rename) throws IOException {
+		LocatedFileStatus fileStatus = fileStatusListIterator.next();
+
+		Path p = fileStatus.getPath();
+		String pString = p.toString();
+		if (!pString.endsWith("_SUCCESS")) {
+			String name = pString.substring(pString.lastIndexOf("/") + 1);
+			// normalise Spark part file names to "part-NNNNN", keeping the extension
+			if (name.startsWith("part-") && name.length() > 10) {
+				String tmp = name.substring(0, 10);
+				if (name.contains(".")) {
+					tmp += name.substring(name.indexOf("."));
+				}
+				name = tmp;
+			}
+			if (rename && name.endsWith(".txt.gz")) {
+				name = name.replace(".txt.gz", ".json.gz");
+			}
+
+			TarArchiveEntry entry = new TarArchiveEntry(dirName + "/" + name);
+			entry.setSize(fileStatus.getLen());
+			currentSize += fileStatus.getLen();
+			ar.putArchiveEntry(entry);
+
+			try (InputStream is = fileSystem.open(fileStatus.getPath());
+				BufferedInputStream bis = new BufferedInputStream(is)) {
+				int count;
+				byte[] data = new byte[1024];
+				while ((count = bis.read(data, 0, data.length)) != -1) {
+					ar.write(data, 0, count);
+				}
+			}
+			ar.closeArchiveEntry();
+		}
+		return currentSize;
+	}
+
+}
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_input_maketar_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/common/eosc_input_maketar_parameters.json
similarity index 100%
rename from dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_input_maketar_parameters.json
rename to dump/src/main/resources/eu/dnetlib/dhp/common/eosc_input_maketar_parameters.json
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml
similarity index 100%
rename from dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml
rename to dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml