From cbbb1bdc5499b5176b91236a6638b69ab8e8fa99 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 3 Nov 2020 16:55:50 +0100
Subject: [PATCH] moved business logic to new class in common for handling the
 zip of the archives

---
 .../eu/dnetlib/dhp/oa/graph/dump/MakeTar.java | 256 +++++++++---------
 1 file changed, 131 insertions(+), 125 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java
index 5366b1f15..7d8013c53 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java
@@ -2,7 +2,9 @@
 package eu.dnetlib.dhp.oa.graph.dump;
 
 import java.io.*;
+import java.util.Optional;
 
+import eu.dnetlib.dhp.common.MakeTarArchive;
 import org.apache.commons.compress.archivers.ar.ArArchiveEntry;
 import org.apache.commons.compress.archivers.ar.ArArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -39,16 +41,20 @@ public class MakeTar implements Serializable {
 		final String inputPath = parser.get("sourcePath");
 		log.info("input path : {}", inputPath);
 
+		final int gBperSplit = Optional.ofNullable(parser.get("splitSize"))
+			.map(Integer::valueOf)
+			.orElse(10);
+
 		Configuration conf = new Configuration();
 		conf.set("fs.defaultFS", hdfsNameNode);
 
 		FileSystem fileSystem = FileSystem.get(conf);
 
-		makeTArArchive(fileSystem, inputPath, outputPath);
+		makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit);
 
 	}
 
-	public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath) throws IOException {
+	public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) throws IOException {
 
 		RemoteIterator<LocatedFileStatus> dir_iterator = fileSystem.listLocatedStatus(new Path(inputPath));
 
@@ -59,132 +65,132 @@ public class MakeTar implements Serializable {
 			String p_string = p.toString();
 			String entity = p_string.substring(p_string.lastIndexOf("/") + 1);
 
-			writeMaxSize(fileSystem, p_string, outputPath + "/" + entity, entity, 10);
+			MakeTarArchive.tarMaxSize(fileSystem, p_string, outputPath + "/" + entity, entity, gBperSplit);
 		}
 
 	}
 
-	private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
-		Path hdfsWritePath = new Path(outputPath);
-		FSDataOutputStream fsDataOutputStream = null;
-		if (fileSystem.exists(hdfsWritePath)) {
-			fileSystem.delete(hdfsWritePath, true);
-
-		}
-		fsDataOutputStream = fileSystem.create(hdfsWritePath);
-
-		return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
-	}
-
-	private static void writeMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
-		int gBperSplit) throws IOException {
-		final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
-
-		long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
-
-		final long numberOfSplits = sourceSize / bytesPerSplit;
-
-		if (numberOfSplits < 2) {
-			write(fileSystem, inputPath, outputPath + ".tar", dir_name);
-		} else {
-			int partNum = 0;
-
-			RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
-				.listFiles(
-					new Path(inputPath), true);
-			boolean next = fileStatusListIterator.hasNext();
-			while (sourceSize > 0 && next) {
-				TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar");
-
-				long current_size = 0;
-				while (next && current_size < bytesPerSplit) {
-					LocatedFileStatus fileStatus = fileStatusListIterator.next();
-
-					Path p = fileStatus.getPath();
-					String p_string = p.toString();
-					if (!p_string.endsWith("_SUCCESS")) {
-						String name = p_string.substring(p_string.lastIndexOf("/") + 1);
-						if (name.trim().equalsIgnoreCase("communities_infrastructures")) {
-							name = "communities_infrastructures.json";
-						}
-						TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
-						entry.setSize(fileStatus.getLen());
-						current_size += fileStatus.getLen();
-						ar.putArchiveEntry(entry);
-
-						InputStream is = fileSystem.open(fileStatus.getPath());
-
-						BufferedInputStream bis = new BufferedInputStream(is);
-
-						int count;
-						byte data[] = new byte[1024];
-						while ((count = bis.read(data, 0, data.length)) != -1) {
-							ar.write(data, 0, count);
-						}
-						bis.close();
-						ar.closeArchiveEntry();
-
-					}
-					next = fileStatusListIterator.hasNext();
-
-				}
-				sourceSize = sourceSize - current_size;
-				partNum += 1;
-				ar.close();
-			}
-
-		}
-
-	}
-
-	private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
-		throws IOException {
-
-		Path hdfsWritePath = new Path(outputPath);
-		FSDataOutputStream fsDataOutputStream = null;
-		if (fileSystem.exists(hdfsWritePath)) {
-			fileSystem.delete(hdfsWritePath, true);
-
-		}
-		fsDataOutputStream = fileSystem.create(hdfsWritePath);
-
-		TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
-
-		RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
-			.listFiles(
-				new Path(inputPath), true);
-
-		while (fileStatusListIterator.hasNext()) {
-			LocatedFileStatus fileStatus = fileStatusListIterator.next();
-
-			Path p = fileStatus.getPath();
-			String p_string = p.toString();
-			if (!p_string.endsWith("_SUCCESS")) {
-				String name = p_string.substring(p_string.lastIndexOf("/") + 1);
-				if (name.trim().equalsIgnoreCase("communities_infrastructures")) {
-					name = "communities_infrastructures.json";
-				}
-				TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
-				entry.setSize(fileStatus.getLen());
-				ar.putArchiveEntry(entry);
-
-				InputStream is = fileSystem.open(fileStatus.getPath());
-
-				BufferedInputStream bis = new BufferedInputStream(is);
-
-				int count;
-				byte data[] = new byte[1024];
-				while ((count = bis.read(data, 0, data.length)) != -1) {
-					ar.write(data, 0, count);
-				}
-				bis.close();
-				ar.closeArchiveEntry();
-
-			}
-
-		}
-
-		ar.close();
-	}
+//	private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
+//		Path hdfsWritePath = new Path(outputPath);
+//		FSDataOutputStream fsDataOutputStream = null;
+//		if (fileSystem.exists(hdfsWritePath)) {
+//			fileSystem.delete(hdfsWritePath, true);
+//
+//		}
+//		fsDataOutputStream = fileSystem.create(hdfsWritePath);
+//
+//		return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
+//	}
+//
+//	private static void writeMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
+//		int gBperSplit) throws IOException {
+//		final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
+//
+//		long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
+//
+//		final long numberOfSplits = sourceSize / bytesPerSplit;
+//
+//		if (numberOfSplits < 2) {
+//			write(fileSystem, inputPath, outputPath + ".tar", dir_name);
+//		} else {
+//			int partNum = 0;
+//
+//			RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+//				.listFiles(
+//					new Path(inputPath), true);
+//			boolean next = fileStatusListIterator.hasNext();
+//			while (sourceSize > 0 && next) {
+//				TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar");
+//
+//				long current_size = 0;
+//				while (next && current_size < bytesPerSplit) {
+//					LocatedFileStatus fileStatus = fileStatusListIterator.next();
+//
+//					Path p = fileStatus.getPath();
+//					String p_string = p.toString();
+//					if (!p_string.endsWith("_SUCCESS")) {
+//						String name = p_string.substring(p_string.lastIndexOf("/") + 1);
+//						if (name.trim().equalsIgnoreCase("communities_infrastructures")) {
+//							name = "communities_infrastructures.json";
+//						}
+//						TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
+//						entry.setSize(fileStatus.getLen());
+//						current_size += fileStatus.getLen();
+//						ar.putArchiveEntry(entry);
+//
+//						InputStream is = fileSystem.open(fileStatus.getPath());
+//
+//						BufferedInputStream bis = new BufferedInputStream(is);
+//
+//						int count;
+//						byte data[] = new byte[1024];
+//						while ((count = bis.read(data, 0, data.length)) != -1) {
+//							ar.write(data, 0, count);
+//						}
+//						bis.close();
+//						ar.closeArchiveEntry();
+//
+//					}
+//					next = fileStatusListIterator.hasNext();
+//
+//				}
+//				sourceSize = sourceSize - current_size;
+//				partNum += 1;
+//				ar.close();
+//			}
+//
+//		}
+//
+//	}
+//
+//	private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
+//		throws IOException {
+//
+//		Path hdfsWritePath = new Path(outputPath);
+//		FSDataOutputStream fsDataOutputStream = null;
+//		if (fileSystem.exists(hdfsWritePath)) {
+//			fileSystem.delete(hdfsWritePath, true);
+//
+//		}
+//		fsDataOutputStream = fileSystem.create(hdfsWritePath);
+//
+//		TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
+//
+//		RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+//			.listFiles(
+//				new Path(inputPath), true);
+//
+//		while (fileStatusListIterator.hasNext()) {
+//			LocatedFileStatus fileStatus = fileStatusListIterator.next();
+//
+//			Path p = fileStatus.getPath();
+//			String p_string = p.toString();
+//			if (!p_string.endsWith("_SUCCESS")) {
+//				String name = p_string.substring(p_string.lastIndexOf("/") + 1);
+//				if (name.trim().equalsIgnoreCase("communities_infrastructures")) {
+//					name = "communities_infrastructures.json";
+//				}
+//				TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
+//				entry.setSize(fileStatus.getLen());
+//				ar.putArchiveEntry(entry);
+//
+//				InputStream is = fileSystem.open(fileStatus.getPath());
+//
+//				BufferedInputStream bis = new BufferedInputStream(is);
+//
+//				int count;
+//				byte data[] = new byte[1024];
+//				while ((count = bis.read(data, 0, data.length)) != -1) {
+//					ar.write(data, 0, count);
+//				}
+//				bis.close();
+//				ar.closeArchiveEntry();
+//
+//			}
+//
+//		}
+//
+//		ar.close();
+//	}
 }
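
Reviewer note: the business logic deleted above now lives in eu.dnetlib.dhp.common.MakeTarArchive in the dhp-common module. This patch only shows the call site, MakeTarArchive.tarMaxSize(fileSystem, input, output, dirName, gBperSplit), so the following is a minimal sketch of what the relocated class plausibly contains, reconstructed from the removed getTar/writeMaxSize/write methods. The addEntry helper and the try-with-resources cleanup are editorial restructuring for this sketch, not confirmed contents of the common class.

    package eu.dnetlib.dhp.common;

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.Serializable;

    import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
    import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class MakeTarArchive implements Serializable {

    	// Opens a fresh tar stream on HDFS, replacing any existing file at outputPath.
    	private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
    		Path hdfsWritePath = new Path(outputPath);
    		if (fileSystem.exists(hdfsWritePath)) {
    			fileSystem.delete(hdfsWritePath, true);
    		}
    		FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath);
    		return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
    	}

    	// Copies one HDFS file into the open archive as dir_name/<file name>.
    	// (Helper introduced for this sketch; the original inlined this logic.)
    	private static void addEntry(TarArchiveOutputStream ar, FileSystem fileSystem,
    		LocatedFileStatus fileStatus, String dir_name) throws IOException {
    		String p_string = fileStatus.getPath().toString();
    		String name = p_string.substring(p_string.lastIndexOf("/") + 1);
    		if (name.trim().equalsIgnoreCase("communities_infrastructures")) {
    			name = "communities_infrastructures.json";
    		}
    		TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
    		entry.setSize(fileStatus.getLen());
    		ar.putArchiveEntry(entry);
    		try (InputStream is = new BufferedInputStream(fileSystem.open(fileStatus.getPath()))) {
    			int count;
    			byte[] data = new byte[1024];
    			while ((count = is.read(data, 0, data.length)) != -1) {
    				ar.write(data, 0, count);
    			}
    		}
    		ar.closeArchiveEntry();
    	}

    	// Tars everything under inputPath; if the source exceeds gBperSplit GB,
    	// the archive is split into parts named <outputPath>_<n>.tar.
    	public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath,
    		String dir_name, int gBperSplit) throws IOException {
    		final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
    		long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();

    		RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(new Path(inputPath), true);
    		if (sourceSize / bytesPerSplit < 2) {
    			// Small enough: a single <outputPath>.tar holds everything.
    			try (TarArchiveOutputStream ar = getTar(fileSystem, outputPath + ".tar")) {
    				while (it.hasNext()) {
    					LocatedFileStatus fileStatus = it.next();
    					if (!fileStatus.getPath().toString().endsWith("_SUCCESS")) {
    						addEntry(ar, fileSystem, fileStatus, dir_name);
    					}
    				}
    			}
    		} else {
    			// Otherwise emit numbered parts, each capped at roughly bytesPerSplit.
    			int partNum = 0;
    			boolean next = it.hasNext();
    			while (next) {
    				try (TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar")) {
    					long current_size = 0;
    					while (next && current_size < bytesPerSplit) {
    						LocatedFileStatus fileStatus = it.next();
    						if (!fileStatus.getPath().toString().endsWith("_SUCCESS")) {
    							current_size += fileStatus.getLen();
    							addEntry(ar, fileSystem, fileStatus, dir_name);
    						}
    						next = it.hasNext();
    					}
    				}
    				partNum += 1;
    			}
    		}
    	}
    }

One property of the splitting strategy worth noting: the size check runs before the next file is added, not after, so a part can overshoot bytesPerSplit by up to the size of one file. The split size itself comes from the new optional splitSize argument, defaulting to 10 GB when the parameter is absent.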