added option to split dumps bigger than 40GB into different files

Miriam Baglioni 2020-10-30 14:09:04 +01:00
parent 78fdb11c3f
commit 14bf2e7238
4 changed files with 85 additions and 1 deletion

MakeTar.java

@@ -59,7 +59,79 @@ public class MakeTar implements Serializable {
 			String p_string = p.toString();
 			String entity = p_string.substring(p_string.lastIndexOf("/") + 1);
-			write(fileSystem, p_string, outputPath + "/" + entity + ".tar", entity);
+			writeMaxSize(fileSystem, p_string, outputPath + "/" + entity, entity, 40);
 		}
 	}
 
+	private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
+		Path hdfsWritePath = new Path(outputPath);
+		// remove a stale archive, if any, before opening a fresh stream on it
+		if (fileSystem.exists(hdfsWritePath)) {
+			fileSystem.delete(hdfsWritePath, true);
+		}
+		FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath);
+		return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
+	}
+
+	private static void writeMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
+		int gBperSplit) throws IOException {
+		final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
+
+		long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
+		final long numberOfSplits = sourceSize / bytesPerSplit;
+
+		if (numberOfSplits <= 1) {
+			// the dump fits into a single archive
+			write(fileSystem, inputPath, outputPath + ".tar", dir_name);
+		} else {
+			int partNum = 0;
+			long remainingBytes = sourceSize % bytesPerSplit;
+
+			RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+				.listFiles(new Path(inputPath), true);
+			while (sourceSize > 0) {
+				// each part is named <outputPath>_<partNum>.tar
+				TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + partNum + ".tar");
+
+				long current_size = 0;
+				// keep adding files to the current part until the split size is reached
+				while (fileStatusListIterator.hasNext() && current_size < bytesPerSplit) {
+					LocatedFileStatus fileStatus = fileStatusListIterator.next();
+
+					Path p = fileStatus.getPath();
+					String p_string = p.toString();
+					if (!p_string.endsWith("_SUCCESS")) {
+						String name = p_string.substring(p_string.lastIndexOf("/") + 1);
+						// give the communities_infrastructures file an explicit .json extension
+						if (name.trim().equalsIgnoreCase("communities_infrastructures")) {
+							name = "communities_infrastructures.json";
+						}
+						TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
+						entry.setSize(fileStatus.getLen());
+						current_size += fileStatus.getLen();
+						ar.putArchiveEntry(entry);
+
+						// stream the file content into the archive entry
+						InputStream is = fileSystem.open(fileStatus.getPath());
+						BufferedInputStream bis = new BufferedInputStream(is);
+
+						int count;
+						byte[] data = new byte[1024];
+						while ((count = bis.read(data, 0, data.length)) != -1) {
+							ar.write(data, 0, count);
+						}
+						bis.close();
+						ar.closeArchiveEntry();
+					}
+				}
+				sourceSize = sourceSize - current_size;
+				partNum += 1;
+
+				ar.close();
+			}
+		}
+	}
+
 }
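
With this change, a dump whose size exceeds gBperSplit (40 GB in the call above) is written as a numbered series of archives instead of a single tar. The sketch below illustrates the resulting names; it is not part of the commit: the class name and the input size are made up, and it assumes every part is filled to exactly bytesPerSplit, whereas writeMaxSize subtracts the bytes actually written to each part.

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the archive names produced by writeMaxSize.
public class SplitNamingSketch {

	static List<String> tarNames(String outputPath, long sourceSize, int gBperSplit) {
		final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
		List<String> names = new ArrayList<>();
		// integer division: a dump smaller than 2 * bytesPerSplit still fits one archive
		if (sourceSize / bytesPerSplit <= 1) {
			names.add(outputPath + ".tar");
		} else {
			int partNum = 0;
			// assumes each part holds exactly bytesPerSplit; the real loop
			// subtracts the size of the files actually written to the part
			for (long remaining = sourceSize; remaining > 0; remaining -= bytesPerSplit) {
				names.add(outputPath + "_" + partNum++ + ".tar");
			}
		}
		return names;
	}

	public static void main(String[] args) {
		// a ~100 GiB dump with the 40 GB threshold used in MakeTar -> 3 parts
		System.out.println(tarNames("/tmp/publication", 100L * 1024 * 1024 * 1024, 40));
		// -> [/tmp/publication_0.tar, /tmp/publication_1.tar, /tmp/publication_2.tar]
	}
}

One thing worth noting: sourceSize comes from getContentSummary(...).getSpaceConsumed(), which on a replicated HDFS counts all replicas, so the computed number of splits can be higher than the logical size of the dump alone would suggest (getLength() would return the un-replicated size).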

OrganizationInfo.java

@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.oa.graph.dump.ttl;
+
+public class OrganizationInfo {
+}

Pids.java

@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.oa.graph.dump.ttl;
+
+public class Pids {
+}

SparkPrepareOrganizationInfo.java

@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.oa.graph.dump.ttl;
+
+public class SparkPrepareOrganizationInfo {
+}