forked from D-Net/dnet-hadoop
changed the way the tar archive is build to support renaming in case we need to change .tt.gz into .json.gz
This commit is contained in:
parent
9c1df15071
commit
9a9cc6a1dd
|
@ -45,16 +45,24 @@ public class MakeTarArchive implements Serializable {
|
||||||
.map(Integer::valueOf)
|
.map(Integer::valueOf)
|
||||||
.orElse(10);
|
.orElse(10);
|
||||||
|
|
||||||
|
final boolean rename = Optional
|
||||||
|
.ofNullable(parser.get("rename"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.FALSE);
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set("fs.defaultFS", hdfsNameNode);
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
|
||||||
FileSystem fileSystem = FileSystem.get(conf);
|
FileSystem fileSystem = FileSystem.get(conf);
|
||||||
|
|
||||||
makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit);
|
makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit, rename);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) throws IOException{
|
||||||
public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit)
|
makeTArArchive(fileSystem,inputPath,outputPath,gBperSplit,false);
|
||||||
|
}
|
||||||
|
public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit,
|
||||||
|
boolean rename)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
RemoteIterator<LocatedFileStatus> dirIterator = fileSystem.listLocatedStatus(new Path(inputPath));
|
RemoteIterator<LocatedFileStatus> dirIterator = fileSystem.listLocatedStatus(new Path(inputPath));
|
||||||
|
@ -66,7 +74,7 @@ public class MakeTarArchive implements Serializable {
|
||||||
String pathString = p.toString();
|
String pathString = p.toString();
|
||||||
String entity = pathString.substring(pathString.lastIndexOf("/") + 1);
|
String entity = pathString.substring(pathString.lastIndexOf("/") + 1);
|
||||||
|
|
||||||
MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit);
|
MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit, rename);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +87,8 @@ public class MakeTarArchive implements Serializable {
|
||||||
return new TarArchiveOutputStream(fileSystem.create(hdfsWritePath).getWrappedStream());
|
return new TarArchiveOutputStream(fileSystem.create(hdfsWritePath).getWrappedStream());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dirName)
|
private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dirName,
|
||||||
|
boolean rename)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
Path hdfsWritePath = new Path(outputPath);
|
Path hdfsWritePath = new Path(outputPath);
|
||||||
|
@ -95,20 +104,20 @@ public class MakeTarArchive implements Serializable {
|
||||||
new Path(inputPath), true);
|
new Path(inputPath), true);
|
||||||
|
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
writeCurrentFile(fileSystem, dirName, iterator, ar, 0);
|
writeCurrentFile(fileSystem, dirName, iterator, ar, 0, rename);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
|
public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
|
||||||
int gBperSplit) throws IOException {
|
int gBperSplit, boolean rename) throws IOException {
|
||||||
final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
|
final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
|
||||||
|
|
||||||
long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
|
long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
|
||||||
|
|
||||||
if (sourceSize < bytesPerSplit) {
|
if (sourceSize < bytesPerSplit) {
|
||||||
write(fileSystem, inputPath, outputPath + ".tar", dir_name);
|
write(fileSystem, inputPath, outputPath + ".tar", dir_name, rename);
|
||||||
} else {
|
} else {
|
||||||
int partNum = 0;
|
int partNum = 0;
|
||||||
|
|
||||||
|
@ -121,7 +130,8 @@ public class MakeTarArchive implements Serializable {
|
||||||
|
|
||||||
long currentSize = 0;
|
long currentSize = 0;
|
||||||
while (next && currentSize < bytesPerSplit) {
|
while (next && currentSize < bytesPerSplit) {
|
||||||
currentSize = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, currentSize);
|
currentSize = writeCurrentFile(
|
||||||
|
fileSystem, dir_name, fileStatusListIterator, ar, currentSize, rename);
|
||||||
next = fileStatusListIterator.hasNext();
|
next = fileStatusListIterator.hasNext();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -134,7 +144,7 @@ public class MakeTarArchive implements Serializable {
|
||||||
|
|
||||||
private static long writeCurrentFile(FileSystem fileSystem, String dirName,
|
private static long writeCurrentFile(FileSystem fileSystem, String dirName,
|
||||||
RemoteIterator<LocatedFileStatus> fileStatusListIterator,
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator,
|
||||||
TarArchiveOutputStream ar, long currentSize) throws IOException {
|
TarArchiveOutputStream ar, long currentSize, boolean rename) throws IOException {
|
||||||
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
||||||
|
|
||||||
Path p = fileStatus.getPath();
|
Path p = fileStatus.getPath();
|
||||||
|
@ -148,6 +158,11 @@ public class MakeTarArchive implements Serializable {
|
||||||
}
|
}
|
||||||
name = tmp;
|
name = tmp;
|
||||||
}
|
}
|
||||||
|
if (rename) {
|
||||||
|
if (name.endsWith(".txt.gz"))
|
||||||
|
name = name.replace(".txt.gz", ".json.gz");
|
||||||
|
}
|
||||||
|
|
||||||
TarArchiveEntry entry = new TarArchiveEntry(dirName + "/" + name);
|
TarArchiveEntry entry = new TarArchiveEntry(dirName + "/" + name);
|
||||||
entry.setSize(fileStatus.getLen());
|
entry.setSize(fileStatus.getLen());
|
||||||
currentSize += fileStatus.getLen();
|
currentSize += fileStatus.getLen();
|
||||||
|
|
|
@ -23,6 +23,12 @@
|
||||||
"paramLongName":"splitSize",
|
"paramLongName":"splitSize",
|
||||||
"paramDescription": "the maximum size of the archive",
|
"paramDescription": "the maximum size of the archive",
|
||||||
"paramRequired": false
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName":"rn",
|
||||||
|
"paramLongName":"rename",
|
||||||
|
"paramDescription": "if the file has to be renamed",
|
||||||
|
"paramRequired": false
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue