From 14bf2e7238c416983aa5b39e3823ffb9f52e23cc Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 30 Oct 2020 14:09:04 +0100 Subject: [PATCH] added option to split dumps bigger that 40Gb on different files --- .../eu/dnetlib/dhp/oa/graph/dump/MakeTar.java | 74 ++++++++++++++++++- .../oa/graph/dump/ttl/OrganizationInfo.java | 4 + .../dnetlib/dhp/oa/graph/dump/ttl/Pids.java | 4 + .../ttl/SparkPrepareOrganizationInfo.java | 4 + 4 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/OrganizationInfo.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/Pids.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/SparkPrepareOrganizationInfo.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java index 8d10efe60..89c65fb33 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java @@ -59,7 +59,79 @@ public class MakeTar implements Serializable { String p_string = p.toString(); String entity = p_string.substring(p_string.lastIndexOf("/") + 1); - write(fileSystem, p_string, outputPath + "/" + entity + ".tar", entity); + writeMaxSize(fileSystem, p_string, outputPath + "/" + entity, entity, 40); + } + + } + + private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException { + Path hdfsWritePath = new Path(outputPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fileSystem.delete(hdfsWritePath, true); + + } + fsDataOutputStream = fileSystem.create(hdfsWritePath); + + return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream()); + } + + private static void writeMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name, + int gBperSplit) throws IOException { + final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit; + + long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed(); + + final long numberOfSplits = sourceSize / bytesPerSplit; + + if (numberOfSplits <= 1) { + write(fileSystem, inputPath, outputPath + ".tar", dir_name); + } else { + int partNum = 0; + long remainingBytes = sourceSize % bytesPerSplit; + + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(inputPath), true); + while (sourceSize > 0) { + TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + partNum + ".tar"); + + long current_size = 0; + while (fileStatusListIterator.hasNext() && current_size < bytesPerSplit) { + LocatedFileStatus fileStatus = fileStatusListIterator.next(); + + Path p = fileStatus.getPath(); + String p_string = p.toString(); + if (!p_string.endsWith("_SUCCESS")) { + String name = p_string.substring(p_string.lastIndexOf("/") + 1); + if (name.trim().equalsIgnoreCase("communities_infrastructures")) { + name = "communities_infrastructures.json"; + } + TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name); + entry.setSize(fileStatus.getLen()); + current_size += fileStatus.getLen(); + ar.putArchiveEntry(entry); + + InputStream is = fileSystem.open(fileStatus.getPath()); + + BufferedInputStream bis = new BufferedInputStream(is); + + int count; + byte data[] = new byte[1024]; + while ((count = bis.read(data, 0, data.length)) != -1) { + ar.write(data, 0, count); + } + bis.close(); + ar.closeArchiveEntry(); + + } + + } + sourceSize = sourceSize - current_size; + partNum += 1; + ar.close(); + } + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/OrganizationInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/OrganizationInfo.java new file mode 100644 index 000000000..31aa2b0a4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/OrganizationInfo.java @@ -0,0 +1,4 @@ +package eu.dnetlib.dhp.oa.graph.dump.ttl; + +public class OrganizationInfo { +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/Pids.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/Pids.java new file mode 100644 index 000000000..f540e161e --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/Pids.java @@ -0,0 +1,4 @@ +package eu.dnetlib.dhp.oa.graph.dump.ttl; + +public class Pids { +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/SparkPrepareOrganizationInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/SparkPrepareOrganizationInfo.java new file mode 100644 index 000000000..293d91671 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/SparkPrepareOrganizationInfo.java @@ -0,0 +1,4 @@ +package eu.dnetlib.dhp.oa.graph.dump.ttl; + +public class SparkPrepareOrganizationInfo { +}