Import main branch code

This commit is contained in:
Giambattista Bloisi 2024-10-08 10:30:54 +02:00
commit f8db4948dc
4 changed files with 594 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target/

18
README.md Normal file
View File

@@ -0,0 +1,18 @@
# Info
Tar archiver for Hadoop HDFS.
# Build
mvn package
sudo mv target/hadoop-tar-*.jar /usr/local/share/hadoop-tar.jar
# Usage
Help:
hadoop jar /usr/local/share/hadoop-tar.jar --help
Streaming example with s3cmd:
hadoop jar /usr/local/share/hadoop-tar.jar -v -c hdfs:///user/hawking | pbzip2 | s3cmd --multipart-chunk-size-mb=100 put - s3://hador/user-hawking.tar.bz2

109
pom.xml Normal file
View File

@@ -0,0 +1,109 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop-tar</artifactId>
<groupId>org.apache.hadoop.tar</groupId>
<version>2.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Hadoop Tar</name>
<description>Tar archiver for Hadoop</description>
<scm>
<connection>scm:git:https://gitlab.cesnet.cz/702/HADOOP/tar</connection>
<developerConnection>scm:git:git@gitlab.cesnet.cz:702/HADOOP/tar.git</developerConnection>
<url>https://gitlab.cesnet.cz/702/HADOOP/tar</url>
<tag>HEAD</tag>
</scm>
<properties>
<exec.mainClass>org.apache.hadoop.tar.HadoopTar</exec.mainClass>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.source.version>1.8</project.source.version>
<project.target.version>1.8</project.target.version>
</properties>
<distributionManagement>
<repository>
<id>zcu-releases</id>
<name>University of West Bohemia Releases</name>
<url>https://maven.civ.zcu.cz/repository/maven-releases/</url>
</repository>
<snapshotRepository>
<id>zcu-snapshots</id>
<name>University of West Bohemia Snapshots</name>
<url>https://maven.civ.zcu.cz/repository/maven-snapshots/</url>
</snapshotRepository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<compilerArgument>-Xlint:all</compilerArgument>
<source>${project.source.version}</source>
<target>${project.target.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.0.0-M1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<archive>
<manifest>
<mainClass>${exec.mainClass}</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>3.0.0-M4</version>
<configuration>
<addSchema>false</addSchema>
<goals>package install</goals>
<localCheckout>true</localCheckout>
<scmCommentPrefix>[Release] </scmCommentPrefix>
<pushChanges>false</pushChanges>
<tagNameFormat>@{version}</tagNameFormat>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,466 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tar;
import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Command-line tar archiver for Hadoop file systems.
 *
 * <p>Supports creating ({@code -c}), listing ({@code -t}) and extracting
 * ({@code -x}) tar archives whose members live on any Hadoop
 * {@link FileSystem} (HDFS, local, ...). The archive itself is read from /
 * written to a file or stdin/stdout ({@code -f -}), optionally gzip
 * (de)compressed ({@code -z}).
 */
public class HadoopTar extends Configured implements Tool {

  private Options allOptions;
  private CommandLineParser parser;
  // Permission applied to extracted files when -p/--preserve-permissions
  // is not given: the FS default with the user's umask applied (set in run()).
  private FsPermission defaultPermission;

  public static final Log LOG = LogFactory.getLog(HadoopTar.class);

  /**
   * Creates a flag (no-argument) option. An empty {@code name} yields a
   * long-only option.
   */
  @SuppressWarnings("static-access")
  private Option createBoolOption(String name, String longopt, String desc) {
    if (name.isEmpty()) {
      return OptionBuilder.withDescription(desc)
          .withLongOpt(longopt)
          .create();
    } else {
      return OptionBuilder.withDescription(desc)
          .withLongOpt(longopt)
          .create(name);
    }
  }

  /**
   * Creates an option taking up to {@code max} arguments. An empty
   * {@code name} yields a long-only option.
   */
  @SuppressWarnings("static-access")
  private Option createOption(String name, String longopt,
                              String desc,
                              String argName, int max,
                              boolean required) {
    if (name.isEmpty()) {
      return OptionBuilder
          .withArgName(argName)
          .hasArgs(max)
          .withDescription(desc)
          .isRequired(required)
          .withLongOpt(longopt)
          .create();
    } else {
      return OptionBuilder
          .withArgName(argName)
          .hasArgs(max)
          .withDescription(desc)
          .isRequired(required)
          .withLongOpt(longopt)
          .create(name);
    }
  }

  /** Builds the option table and the (POSIX-style) command-line parser. */
  private void setupOptions() {
    Option verbose = createBoolOption("v", "verbose", "print verbose output");
    Option create = createBoolOption("c", "create", "create a new archive");
    Option help = createBoolOption("", "help", "show help message");
    Option extract = createBoolOption("x", "extract",
        "extract files from an archive");
    Option list = createBoolOption("t", "list", "list files from an archive");
    Option overwrite = createBoolOption("", "overwrite",
        "overwrite existing directory");
    Option sameowner = createBoolOption("", "same-owner",
        "create extracted files with the same ownership");
    Option samegroup = createBoolOption("", "same-group",
        "create extracted files with the same group id");
    Option absolutepath = createBoolOption("P", "absolute-names",
        "don't strip leading / from file name");
    Option compress = createBoolOption("z", "compress",
        "filter the archive through compress/uncompress gzip");
    Option nosamepermission = createBoolOption("p", "preserve-permissions",
        "apply recorded permissions instead of applying user's umask when extracting files");
    Option directory = createOption("C", "directory",
        "Set the working directory to DIR",
        "DIR", 1, false);
    Option file = createOption("f", "file",
        "Use archive file (default '-' for stdin/stdout)",
        "FILE", 1, false);
    allOptions = new Options().
        addOption(verbose).
        addOption(create).
        addOption(extract).
        addOption(overwrite).
        addOption(directory).
        addOption(nosamepermission).
        addOption(list).
        addOption(sameowner).
        addOption(samegroup).
        addOption(absolutepath).
        addOption(help).
        addOption(compress).
        addOption(file);
    parser = new PosixParser();
  }

  /** Prints the usage/help screen for the given options. */
  private void usage(Options options) {
    HelpFormatter help = new HelpFormatter();
    help.printHelp("hadoop jar hadoop-tar.jar [options] ", options);
  }

  /** Logs the error, prints usage and terminates the JVM with status 1. */
  private void fail(String msg) {
    LOG.error(msg);
    LOG.error("");
    usage(allOptions);
    System.exit(1);
  }

  /**
   * Copies exactly {@code size} bytes from {@code in} to {@code out}.
   * Almost a copy of IOUtils.java:copyBytes, except it explicitly sets how
   * many bytes to copy instead of reading until end of stream.
   *
   * @param in       source stream, positioned at the data to copy
   * @param out      destination stream
   * @param buffSize copy buffer size in bytes
   * @param size     exact number of bytes to transfer; 0 (or negative) is a no-op
   * @throws IOException if the input ends before {@code size} bytes were read
   *                     or the output stream reports a write error
   */
  private void copyBytes(InputStream in, OutputStream out,
                         int buffSize, long size) throws IOException {
    if (size <= 0) {
      return;
    }
    PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
    byte[] buf = new byte[buffSize];
    long bytesToBeCopied = size;
    while (bytesToBeCopied > 0) {
      int len = bytesToBeCopied < buffSize ? (int) bytesToBeCopied : buffSize;
      int bytesRead = in.read(buf, 0, len);
      if (bytesRead < 0) {
        // BUGFIX: read() returns -1 at end of stream. The previous version
        // passed -1 straight to out.write() (IndexOutOfBoundsException) and
        // *incremented* the remaining count, so a truncated archive either
        // crashed or looped forever. Fail fast instead.
        throw new EOFException("Unexpected end of input stream, "
            + bytesToBeCopied + " bytes still expected");
      }
      out.write(buf, 0, bytesRead);
      if ((ps != null) && ps.checkError()) {
        throw new IOException("Unable to write to output stream.");
      }
      bytesToBeCopied -= bytesRead;
    }
  }

  /**
   * Recursively appends {@code p} (a file or directory) to the tar stream.
   *
   * @param os               destination tar stream
   * @param baseDir          directory that relative paths are resolved against
   * @param p                path to archive (relative paths are kept relative
   *                         in the archive)
   * @param keepAbsolutePath if false, a leading '/' is stripped from entry names
   * @param optionVerbose    if true, each member name is echoed to stderr
   */
  private void archive(TarArchiveOutputStream os, Path baseDir, Path p,
                       boolean keepAbsolutePath, boolean optionVerbose)
      throws IOException {
    if (optionVerbose) {
      System.err.println(p.toString());
    }
    TarArchiveEntry entry = new TarArchiveEntry(p.getName());
    Path absolutePath = p.isAbsolute() ? p : new Path(baseDir, p);
    FileSystem fs = absolutePath.getFileSystem(getConf());
    FileStatus fileStatus = fs.getFileStatus(absolutePath);
    entry.setNames(fileStatus.getOwner(), fileStatus.getGroup());
    entry.setMode(fileStatus.getPermission().toShort());
    entry.setModTime(fileStatus.getModificationTime());
    String name = p.toUri().getPath();
    if (!keepAbsolutePath &&
        name.charAt(0) == '/') {
      name = name.substring(1);
    }
    if (fileStatus.isDirectory()) {
      // Directory members use a trailing '/' and carry no data.
      entry.setName(name + "/");
      entry.setSize(0);
      os.putArchiveEntry(entry);
      // BUGFIX: the directory entry must be closed before recursing. Without
      // this, an empty directory as the final member leaves an unclosed entry
      // and TarArchiveOutputStream.finish()/close() throws. Closing a
      // zero-length entry writes no extra bytes, so archives are unchanged.
      os.closeArchiveEntry();
      for (FileStatus child : fs.listStatus(absolutePath)) {
        archive(os, baseDir, new Path(p, child.getPath().getName()),
            keepAbsolutePath, optionVerbose);
      }
    } else {
      entry.setName(name);
      entry.setSize(fileStatus.getLen());
      os.putArchiveEntry(entry);
      InputStream in = fs.open(absolutePath);
      try {
        copyBytes(in, os, getConf().getInt("io.file.buffer.size", 4096),
            entry.getSize());
      } finally {
        if (in != null) {
          in.close();
        }
      }
      os.closeArchiveEntry();
    }
  }

  /**
   * Returns {@code path} expressed relative to {@code baseDir}.
   *
   * @throws IOException if {@code path} is not under {@code baseDir}
   *                     (relativization yields an absolute path)
   */
  private Path makeRelativePath(Path baseDir, Path path) throws IOException {
    Path p = new Path(baseDir.toUri().relativize(path.toUri()).getPath());
    if (p.isAbsolute()) {
      throw new IOException("makeRelativePath failed to get relative path "
          + baseDir + ":" + path);
    }
    return p;
  }

  /**
   * Expands the command-line member arguments (which may contain globs) into
   * concrete paths, keeping relative arguments relative to {@code curDirPath}.
   *
   * @throws IOException if a pattern matches nothing
   */
  private Path[] getTopSrcPaths(Path curDirPath, List<String> args,
                                boolean keepAbsolutePath) throws IOException {
    List<Path> listOfPath = new LinkedList<Path>();
    for (String arg : args) {
      Path p = new Path(arg);
      if (p.isAbsolute() && !keepAbsolutePath) {
        // Mirrors GNU tar's diagnostic for absolute member names.
        System.err.println("Removing leading '/' from member names");
      }
      Path absolutePath = p.isAbsolute() ? p : new Path(curDirPath, p);
      FileSystem fs = absolutePath.getFileSystem(getConf());
      FileStatus[] fsarr = fs.globStatus(absolutePath);
      if (fsarr == null || fsarr.length == 0) {
        throw new IOException("Cannot find " + p);
      }
      for (FileStatus status : fsarr) {
        Path fileStatusPath = status.getPath();
        if (!p.isAbsolute()) {
          fileStatusPath = makeRelativePath(curDirPath, fileStatusPath);
        }
        listOfPath.add(fileStatusPath);
      }
    }
    return listOfPath.toArray(new Path[0]);
  }

  /**
   * Writes a tar archive containing {@code args} (globs allowed) to
   * {@code os}. The stream is always closed, which also finishes the archive.
   */
  private void create(OutputStream os, Path curDirPath, List<String> args,
                      boolean keepAbsolutePath, boolean optionVerbose)
      throws IOException {
    TarArchiveOutputStream tos = new TarArchiveOutputStream(os);
    // GNU tar extensions are used to store long file names in the archive;
    // POSIX big-number mode handles sizes/uids beyond the octal header limits.
    try {
      tos.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
      tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
      for (Path path : getTopSrcPaths(curDirPath, args, keepAbsolutePath)) {
        archive(tos, curDirPath, path, keepAbsolutePath, optionVerbose);
      }
    } finally {
      tos.close();
    }
  }

  /**
   * Reads a tar archive from {@code in} and extracts (or, with
   * {@code dryrun}, merely lists) its members under {@code curDirPath}.
   *
   * @param overwrite        overwrite existing files
   * @param keeppermission   apply recorded permissions instead of the umask
   * @param keepowner        restore the recorded owner (requires privileges)
   * @param keepgroup        restore the recorded group
   * @param keepAbsolutePath keep a leading '/' in member names
   * @param optionVerbose    echo each member name to stderr
   * @param dryrun           list only; nothing is written
   */
  private void extract(InputStream in, Path curDirPath,
                       boolean overwrite, boolean keeppermission,
                       boolean keepowner, boolean keepgroup,
                       boolean keepAbsolutePath,
                       boolean optionVerbose,
                       boolean dryrun)
      throws IOException {
    boolean warningPrinted = false;
    TarArchiveInputStream tarin;
    if (keeppermission) {
      // Neutralize the umask so recorded permissions are applied verbatim.
      FsPermission.setUMask(getConf(), new FsPermission((short) 0));
    }
    FileSystem ofs = curDirPath.getFileSystem(getConf());
    if (!dryrun) {
      if (!ofs.getFileStatus(curDirPath).isDirectory()) {
        fail("No such directory [" + ofs.makeQualified(curDirPath) + "]");
      }
    }
    tarin = new TarArchiveInputStream(in);
    try {
      TarArchiveEntry entry;
      String name;
      while ((entry = tarin.getNextTarEntry()) != null) {
        name = entry.getName();
        if (optionVerbose) {
          System.err.println(name);
        }
        if (dryrun) {
          continue;
        }
        if (name.startsWith("/") && !keepAbsolutePath) {
          if (!warningPrinted) {
            // Print the GNU-tar style warning only once per archive.
            warningPrinted = true;
            System.err.println("Removing leading '/' from member names");
          }
          name = name.substring(1);
        }
        Path p = new Path(name);
        if (!p.isAbsolute()) {
          p = new Path(curDirPath, p);
        }
        FsPermission permission = null;
        if (keeppermission) {
          permission = new FsPermission((short) entry.getMode());
        } else {
          permission = defaultPermission;
        }
        if (entry.isDirectory()) {
          ofs.mkdirs(p, permission);
        } else {
          OutputStream out = ofs.create(p,
              permission,
              overwrite,
              getConf().getInt("io.file.buffer.size", 4096),
              (short) getConf().getInt("dfs.replication", 3),
              getConf().getLong("dfs.block.size", 134217728),
              null);
          try {
            copyBytes(tarin, out,
                getConf().getInt("io.file.buffer.size", 4096),
                entry.getSize());
          } finally {
            if (out != null) {
              out.close();
            }
          }
        }
        if (keepowner || keepgroup) {
          // null means "leave unchanged" for FileSystem.setOwner().
          String username = keepowner ? entry.getUserName() : null;
          String groupname = keepgroup ? entry.getGroupName() : null;
          ofs.setOwner(p, username, groupname);
        }
      }
    } finally {
      tarin.close();
    }
  }

  /**
   * Tool entry point: parses options, validates the requested mode
   * (exactly one of create/extract/list) and dispatches.
   *
   * @return 0 on success (fatal errors call {@link #fail} which exits)
   */
  public int run(String[] args) throws Exception {
    defaultPermission =
        FsPermission.getDefault().applyUMask(
            FsPermission.getUMask(getConf()));
    setupOptions();
    CommandLine cmdLine = null;
    try {
      cmdLine = parser.parse(allOptions, args, false);
    } catch (Exception e) {
      fail(e.getMessage());
    }
    if (cmdLine.hasOption("help")) {
      usage(allOptions);
      return 0;
    }
    if (((cmdLine.hasOption("create") ? 1 : 0) +
        (cmdLine.hasOption("extract") ? 1 : 0) +
        (cmdLine.hasOption("list") ? 1 : 0)) > 1) {
      fail("You may not specify create/extract/list at once");
    }
    if (!cmdLine.hasOption("create")
        && !cmdLine.hasOption("extract")
        && !cmdLine.hasOption("list")) {
      fail("You must specify one of create/extract/list options");
    }
    String file = cmdLine.getOptionValue("file", "-");
    String curDir = cmdLine.getOptionValue("directory", ".");
    Path curDirPath = new Path(curDir);
    curDirPath = curDirPath.getFileSystem(getConf()).makeQualified(curDirPath);
    List<String> list = Arrays.asList(cmdLine.getArgs());
    CompressionCodec codec = null;
    if (cmdLine.hasOption("compress")) {
      codec = (CompressionCodec) ReflectionUtils.newInstance(
          GzipCodec.class, getConf());
    }
    if (cmdLine.hasOption("create")) {
      if (list.size() == 0) {
        fail("Refusing to create an empty archive");
      }
      OutputStream os;
      if (file == null) {
        throw new IOException("outputfile null");
      }
      if (file.equals("-")) {
        os = System.out;
      } else {
        Path tarFile = new Path(file);
        os = tarFile.getFileSystem(getConf()).create(new Path(file));
      }
      if (codec != null) {
        os = codec.createOutputStream(os);
      }
      create(os, curDirPath, list,
          cmdLine.hasOption("absolute-names"),
          cmdLine.hasOption("verbose"));
    } else if (cmdLine.hasOption("list") || cmdLine.hasOption("extract")) {
      if (list.size() != 0) {
        fail("Unknown arguments " + list.toString());
      }
      InputStream in;
      if (file.equals("-")) {
        in = new BufferedInputStream(System.in);
      } else {
        Path tarfile = new Path(file);
        FileSystem ifs = tarfile.getFileSystem(getConf());
        in = new BufferedInputStream(ifs.open(tarfile));
      }
      if (codec != null) {
        in = codec.createInputStream(in);
      }
      // --list is implemented as a verbose dry-run extract.
      extract(in, curDirPath,
          cmdLine.hasOption("overwrite"),
          cmdLine.hasOption("preserve-permissions"),
          cmdLine.hasOption("same-owner"),
          cmdLine.hasOption("same-group"),
          cmdLine.hasOption("absolute-names"),
          cmdLine.hasOption("verbose") || cmdLine.hasOption("list"),
          cmdLine.hasOption("list"));
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(null, new HadoopTar(), args));
  }
}