From f8db4948dc7708489059dae3e0deb3839f259a42 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Tue, 8 Oct 2024 10:30:54 +0200 Subject: [PATCH] Import main branch code --- .gitignore | 1 + README.md | 18 + pom.xml | 109 ++++ .../java/org/apache/hadoop/tar/HadoopTar.java | 466 ++++++++++++++++++ 4 files changed, 594 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pom.xml create mode 100644 src/main/java/org/apache/hadoop/tar/HadoopTar.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b83d222 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..f4a8278 --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +# Info + +Tar archiver for Hadoop HDFS. + +# Build + + mvn package + sudo mv target/hadoop-tar-*.jar /usr/local/share/hadoop-tar.jar + +# Usage + +Help: + + hadoop jar /usr/local/share/hadoop-tar.jar --help + +Streaming example with s3cmd: + + hadoop jar /usr/local/share/hadoop-tar.jar -v -c hdfs:///user/hawking | pbzip2 | s3cmd --multipart-chunk-size-mb=100 put - s3://hador/user-hawking.tar.bz2 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..fa1cc38 --- /dev/null +++ b/pom.xml @@ -0,0 +1,109 @@ + + + + 4.0.0 + + hadoop-tar + org.apache.hadoop.tar + 2.0.1-SNAPSHOT + jar + + Hadoop Tar + Tar archiver for Hadoop + + + scm:git:https://gitlab.cesnet.cz/702/HADOOP/tar + scm:git:git@gitlab.cesnet.cz:702/HADOOP/tar.git + https://gitlab.cesnet.cz/702/HADOOP/tar + HEAD + + + + org.apache.hadoop.tar.HadoopTar + UTF-8 + 1.8 + 1.8 + + + + + zcu-releases + University of West Bohemia Releases + https://maven.civ.zcu.cz/repository/maven-releases/ + + + zcu-snapshots + University of West Bohemia Snapshots + https://maven.civ.zcu.cz/repository/maven-snapshots/ + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + -Xlint:all + ${project.source.version} + ${project.target.version} + + + + org.apache.maven.plugins + maven-deploy-plugin + 3.0.0-M1 + + + org.apache.maven.plugins + maven-jar-plugin + 3.2.0 + + + + ${exec.mainClass} + + + + + + org.apache.maven.plugins + maven-release-plugin + 3.0.0-M4 + + false + package install + true + [Release] + false + @{version} + + + + + + + + commons-cli + commons-cli + 1.2 + + + commons-io + commons-io + 2.6 + + + org.apache.commons + commons-compress + 1.4.1 + + + org.apache.hadoop + hadoop-common + 3.0.0 + + + + diff --git a/src/main/java/org/apache/hadoop/tar/HadoopTar.java b/src/main/java/org/apache/hadoop/tar/HadoopTar.java new file mode 100644 index 0000000..d8a92b5 --- /dev/null +++ b/src/main/java/org/apache/hadoop/tar/HadoopTar.java @@ -0,0 +1,466 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tar; + +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.io.IOException; +import java.util.List; +import java.util.Arrays; +import java.util.LinkedList; + +import org.apache.commons.cli.PosixParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; + +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; + + +public class HadoopTar extends Configured implements Tool { + private Options allOptions; + private CommandLineParser parser ; + private FsPermission defaultPermission; + + public static final Log LOG = LogFactory.getLog(HadoopTar.class); + + @SuppressWarnings("static-access") + private Option createBoolOption(String name, String longopt, String desc){ + if( name.isEmpty() ){ + return OptionBuilder.withDescription(desc) + .withLongOpt(longopt) + .create(); + } else { + return OptionBuilder.withDescription(desc) + .withLongOpt(longopt) + .create(name); + } + } + + @SuppressWarnings("static-access") + private Option createOption(String name, String longopt, + String desc, + String argName, int max, + boolean required){ + if( name.isEmpty() ){ + return OptionBuilder + .withArgName(argName) + .hasArgs(max) + .withDescription(desc) + .isRequired(required) + .withLongOpt(longopt) + .create(); + } else { + return OptionBuilder + .withArgName(argName) + .hasArgs(max) + .withDescription(desc) + .isRequired(required) + .withLongOpt(longopt) + .create(name); + } + } + + private void setupOptions() { + Option verbose = createBoolOption("v", "verbose", "print verbose output"); + Option create = createBoolOption("c", "create", "create a new archive" ); + Option help = createBoolOption("", "help", "show help message" ); + Option extract = createBoolOption("x", "extract", + "extract files from an archive"); + Option list = createBoolOption("t", "list", "list files from an archive"); + Option overwrite = createBoolOption("", "overwrite", + "overwrite existing directory"); + Option sameowner = createBoolOption("", "same-owner", + "create extracted files with the same ownership"); + Option samegroup = createBoolOption("", "same-group", + "create extracted files with the same group id"); + Option absolutepath = createBoolOption("P", "absolute-names", + "don't strip leading / from file name"); + Option compress = createBoolOption("z", "compress", + "filter the archive through compress/uncompress gzip"); + Option nosamepermission = createBoolOption("p", "preserve-permissions", + "apply recorded permissions instead of applying user's umask when extracting files"); + Option directory = createOption("C", "directory", + "Set the working directory to DIR", + "DIR", 1, false); + Option file = createOption("f", "file", + "Use archive file (default '-' for stdin/stdout)", + "FILE", 1, false); + allOptions = new Options(). + addOption(verbose). + addOption(create). + addOption(extract). + addOption(overwrite). + addOption(directory). + addOption(nosamepermission). + addOption(list). + addOption(sameowner). + addOption(samegroup). + addOption(absolutepath). + addOption(help). + addOption(compress). + addOption(file); + parser = new PosixParser(); + } + + private void usage(Options options) { + HelpFormatter help = new HelpFormatter(); + help.printHelp("hadoop jar hadoop-tar.jar [options] ", options); + } + + private void fail(String msg) { + LOG.error(msg); + LOG.error(""); + usage(allOptions); + System.exit(1); + } + + //Almost a copy of IOUtils.java:copyBytes + //except it explicitly set how many bytes to copy + private void copyBytes(InputStream in, OutputStream out, + int buffSize, long size ) throws IOException { + if( size == 0 ) { + return; + } + PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; + byte buf[] = new byte[buffSize]; + long bytesToBeCopied = size; + int len; + int bytesRead; + do { + len = bytesToBeCopied < buffSize ? (int)bytesToBeCopied : buffSize ; + bytesRead = in.read(buf, 0, len); + out.write(buf, 0, bytesRead); + if ((ps != null) && ps.checkError()) { + throw new IOException("Unable to write to output stream."); + } + bytesToBeCopied -= bytesRead; + } while (bytesToBeCopied > 0 ) ; + } + + private void archive(TarArchiveOutputStream os, Path baseDir, Path p, + boolean keepAbsolutePath, boolean optionVerbose ) + throws IOException { + if( optionVerbose ) { + System.err.println(p.toString() ); + } + + TarArchiveEntry entry = new TarArchiveEntry(p.getName()); + + Path absolutePath = p.isAbsolute() ? p : new Path(baseDir, p); + FileSystem fs = absolutePath.getFileSystem(getConf()); + FileStatus fileStatus = fs.getFileStatus(absolutePath); + + entry.setNames(fileStatus.getOwner(), fileStatus.getGroup()); + entry.setMode(fileStatus.getPermission().toShort()); + entry.setModTime(fileStatus.getModificationTime()); + + String name = p.toUri().getPath(); + if( !keepAbsolutePath && + name.charAt(0) == '/' ) { + name = name.substring(1); + } + if( fileStatus.isDirectory() ) { + entry.setName(name + "/"); + entry.setSize(0); + os.putArchiveEntry(entry); + for( FileStatus child : fs.listStatus(absolutePath) ) { + archive(os, baseDir, new Path(p, child.getPath().getName()), + keepAbsolutePath, optionVerbose ); + } + } else { + entry.setName(name); + entry.setSize(fileStatus.getLen()); + os.putArchiveEntry(entry); + InputStream in = fs.open(absolutePath); + try { + copyBytes(in, os, getConf().getInt("io.file.buffer.size", 4096), + entry.getSize() ); + } finally { + if( in != null) { + in.close(); + } + } + os.closeArchiveEntry(); + } + } + + private Path makeRelativePath(Path baseDir, Path path) throws IOException { + Path p = new Path(baseDir.toUri().relativize(path.toUri()).getPath()); + if( p.isAbsolute() ) { + throw new IOException("makeRelativePath failed to get relative path" + + baseDir + ":" + path ); + } + return p; + } + + private Path [] getTopSrcPaths(Path curDirPath, List args, + boolean keepAbsolutePath ) throws IOException { + List listOfPath = new LinkedList(); + for (String arg : args ) { + Path p = new Path(arg); + if( p.isAbsolute() && !keepAbsolutePath ) { + System.err.println("Removing leading '/' from member names"); + } + Path absolutePath = p.isAbsolute() ? p : new Path(curDirPath, p); + FileSystem fs = absolutePath.getFileSystem(getConf()); + FileStatus [] fsarr = fs.globStatus(absolutePath); + if( fsarr == null || fsarr.length == 0 ) { + throw new IOException("Cannot find " + p ); + } + for( FileStatus status : fsarr ) { + Path fileStatusPath = status.getPath(); + if( !p.isAbsolute() ) { + fileStatusPath = makeRelativePath(curDirPath, fileStatusPath); + } + listOfPath.add(fileStatusPath); + } + } + return listOfPath.toArray(new Path[0] ); + } + + private void create(OutputStream os, Path curDirPath, List args, + boolean keepAbsolutePath, boolean optionVerbose ) + throws IOException { + + TarArchiveOutputStream tos = new TarArchiveOutputStream(os); + // GNU tar extensions are used to store long file names in the archive. + try { + tos.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX); + tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU); + for (Path path : getTopSrcPaths(curDirPath, args, keepAbsolutePath ) ) { + archive(tos, curDirPath, path, keepAbsolutePath, optionVerbose ); + } + } finally { + if( tos != null ) { + tos.close(); + } + } + } + + + private void extract(InputStream in, Path curDirPath, + boolean overwrite, boolean keeppermission, + boolean keepowner, boolean keepgroup, + boolean keepAbsolutePath, + boolean optionVerbose, + boolean dryrun ) + throws IOException { + boolean warningPrinted = false; + TarArchiveInputStream tarin; + if( keeppermission ) { + FsPermission.setUMask(getConf(), new FsPermission((short)0)); + } + + FileSystem ofs = curDirPath.getFileSystem(getConf()); + + if( ! dryrun ) { + if( !ofs.getFileStatus(curDirPath).isDirectory() ) { + fail("No such directory [" + ofs.makeQualified(curDirPath) + "]"); + } + } + + tarin = new TarArchiveInputStream(in); + + + try { + TarArchiveEntry entry; + String name; + while ((entry = tarin.getNextTarEntry()) != null) { + name = entry.getName(); + + if( optionVerbose ) { + System.err.println(name); + } + if( dryrun ) { + continue; + } + + if( name.startsWith("/") && !keepAbsolutePath ) { + if( !warningPrinted ) { + warningPrinted = true; + System.err.println("Removing leading '/' from member names"); + } + name = name.substring(1); + } + Path p = new Path(name); + + if( ! p.isAbsolute() ) { + p = new Path( curDirPath, p ); + } + + FsPermission permission = null; + if( keeppermission ) { + permission = new FsPermission((short)entry.getMode()); + } else { + permission = defaultPermission; + } + if( entry.isDirectory() ) { + ofs.mkdirs(p, permission); + } else { + OutputStream out = ofs.create(p, + permission, + overwrite, + getConf().getInt("io.file.buffer.size", 4096), + (short)getConf().getInt("dfs.replication", 3), + getConf().getLong("dfs.block.size", 134217728), + null ); + try { + copyBytes(tarin, out, + getConf().getInt("io.file.buffer.size", 4096), + entry.getSize() ); + } finally { + if( out != null ) { + out.close(); + } + } + } + if( keepowner || keepgroup ) { + String username = keepowner ? entry.getUserName() : null; + String groupname = keepgroup ? entry.getGroupName() : null; + ofs.setOwner(p, username, groupname); + } + } + } finally { + if( tarin != null ) { + tarin.close(); + } + } + } + + public int run(String[] args) throws Exception { + defaultPermission = + FsPermission.getDefault().applyUMask( + FsPermission.getUMask(getConf())); + setupOptions(); + CommandLine cmdLine = null; + try{ + cmdLine = parser.parse(allOptions, args, false); + } catch(Exception e){ + fail(e.getMessage()); + } + if( cmdLine.hasOption("help") ){ + usage(allOptions); + return 0; + } + + if( ((cmdLine.hasOption("create") ? 1:0 ) + + (cmdLine.hasOption("extract") ? 1:0) + + (cmdLine.hasOption("list") ? 1:0 ) ) > 1 ) { + fail("You may not specify create/extract/list at once"); + } + if( !cmdLine.hasOption("create") + && !cmdLine.hasOption("extract") + && !cmdLine.hasOption("list") ) { + fail("You must specify one of create/extract/list options"); + } + + String file = (String)cmdLine.getOptionValue("file", "-"); + String curDir = (String)cmdLine.getOptionValue("directory", "."); + + Path curDirPath = new Path(curDir); + curDirPath = curDirPath.getFileSystem(getConf()).makeQualified(curDirPath); + + List list = Arrays.asList(cmdLine.getArgs()); + + CompressionCodec codec = null; + if( cmdLine.hasOption("compress")) { + codec = (CompressionCodec) ReflectionUtils.newInstance( + GzipCodec.class, getConf()); + } + + if( cmdLine.hasOption("create") ) { + if( list.size() == 0 ) { + fail("Refusing to create an empty archive" ); + } + OutputStream os; + if( file == null ) { + throw new IOException ("outputfile null"); + } + + if( file.equals("-") ) { + os = System.out; + } else { + Path tarFile = new Path(file); + os = tarFile.getFileSystem(getConf()).create(new Path(file)); + } + if( codec != null ) { + os = codec.createOutputStream(os); + } + + create(os, curDirPath, list, + cmdLine.hasOption("absolute-names"), + cmdLine.hasOption("verbose") ); + + } else if ( cmdLine.hasOption("list") || cmdLine.hasOption("extract") ) { + if( list.size() != 0 ) { + fail("Unknown arguments" + list.toString() ); + } + InputStream in; + if( file.equals("-") ) { + in = new BufferedInputStream(System.in); + } else { + Path tarfile = new Path(file); + FileSystem ifs = tarfile.getFileSystem(getConf()); + in = new BufferedInputStream(ifs.open(tarfile)); + } + if( codec != null ) { + in = codec.createInputStream(in); + } + extract(in, curDirPath, + cmdLine.hasOption("overwrite"), + cmdLine.hasOption("preserve-permissions"), + cmdLine.hasOption("same-owner"), + cmdLine.hasOption("same-group"), + cmdLine.hasOption("absolute-names"), + cmdLine.hasOption("verbose") || cmdLine.hasOption("list"), + cmdLine.hasOption("list") ); + } + return 0; + } + + + + + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(null, new HadoopTar(), args)); + } +} \ No newline at end of file