package eu.dnetlib.dhp.collection.datacite;

import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

/**
 * Unpacks a compressed tar archive stored on a Hadoop file system into a
 * {@link SequenceFile}: every line of every regular file in the archive becomes
 * one record whose key is the tar entry name and whose value is the line.
 */
public class DumpExtractor {

    private static final Logger log = LoggerFactory.getLogger(DumpExtractor.class);

    /** Number of appended records between two progress log messages. */
    private static final long LOG_INTERVAL = 100_000L;

    /**
     * Opens {@code sourcePath} and wraps it in the decompressing stream of the
     * codec that {@link CompressionCodecFactory} associates with that path.
     *
     * @param fileSystem the file system holding the archive
     * @param sourcePath the compressed file to open
     * @return a stream yielding the decompressed bytes of {@code sourcePath}
     * @throws IOException if no codec is registered for the path, or if the file
     *                     cannot be opened or wrapped
     */
    public static InputStream createInputStream(FileSystem fileSystem, Path sourcePath) throws IOException {
        final CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
        final CompressionCodec codec = factory.getCodec(sourcePath);
        if (codec == null) {
            // Fail with an exception instead of killing the JVM from a helper method:
            // the caller decides how to terminate, and the cause stays in the stack trace.
            throw new IOException("No codec found for " + sourcePath.getName());
        }

        final InputStream raw = fileSystem.open(sourcePath);
        try {
            return codec.createInputStream(raw);
        } catch (IOException | RuntimeException e) {
            // The codec could not wrap the stream: release the file handle we just opened.
            try {
                raw.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Reads {@code gzipInputStream} as a tar archive and appends each line of each
     * regular-file entry to {@code sfile}, keyed by the entry name.
     *
     * <p>Both arguments are closed when this method returns, whether it completes
     * normally or throws. When {@code sfile} is {@code null} the archive is still
     * traversed but nothing is written.
     *
     * @param sfile           the sequence file writer receiving the records; may be {@code null}
     * @param gzipInputStream the already-decompressed tar stream
     * @throws IOException if reading the archive or writing a record fails
     */
    public static void iterateTar(SequenceFile.Writer sfile, InputStream gzipInputStream) throws IOException {

        long extractedItem = 0;
        try (final TarArchiveInputStream tais = new TarArchiveInputStream(gzipInputStream)) {

            TarArchiveEntry entry;
            while ((entry = tais.getNextTarEntry()) != null) {
                if (!entry.isFile() || sfile == null) {
                    continue;
                }

                final Text key = new Text(entry.getName());
                // Deliberately not closed: closing the reader would close the whole archive
                // stream. The tar stream reports end-of-stream at the end of the current
                // entry, so the reader's buffering cannot run into the next entry.
                // Text stores UTF-8, so decode explicitly rather than with the platform default.
                final BufferedReader br = new BufferedReader(
                    new InputStreamReader(tais, StandardCharsets.UTF_8));

                // readLine() == null is the only reliable end-of-entry signal; ready() merely
                // tells whether a read would block and must not be used to detect EOF.
                String line;
                while ((line = br.readLine()) != null) {
                    sfile.append(key, new Text(line));
                    extractedItem++;
                    if (extractedItem % LOG_INTERVAL == 0) {
                        log.info("Extracted {} items", extractedItem);
                    }
                }
            }
            log.info("Extracted {} items", extractedItem);
        } finally {
            if (sfile != null) {
                // close() must run even when the flush fails.
                try {
                    sfile.hflush();
                } finally {
                    sfile.close();
                }
            }
        }
    }

    /**
     * Command-line entry point. Expects the {@code namenode}, {@code sourcePath} and
     * {@code targetPath} arguments described in {@code extract_datacite_parameter.json},
     * then extracts the archive at {@code sourcePath} into a sequence file at
     * {@code targetPath}.
     *
     * @param args the command-line arguments
     * @throws Exception if argument parsing or the extraction fails
     */
    public static void main(String[] args) throws Exception {

        final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
            IOUtils
                .toString(
                    Objects
                        .requireNonNull(
                            DumpExtractor.class
                                .getResourceAsStream(
                                    "/eu/dnetlib/dhp/datacite/extract_datacite_parameter.json"))));
        argumentParser.parseArgument(args);

        final String hdfsuri = argumentParser.get("namenode");
        log.info("hdfsURI is {}", hdfsuri);

        final String sourcePath = argumentParser.get("sourcePath");
        log.info("sourcePath is {}", sourcePath);

        final String targetPath = argumentParser.get("targetPath");
        log.info("targetPath is {}", targetPath);

        final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri));

        final Path sPath = new Path(sourcePath);

        // try-with-resources: the input stream is released even if the writer cannot be created.
        try (final InputStream gzipInputStream = createInputStream(fileSystem, sPath)) {

            final SequenceFile.Writer outputFile = SequenceFile
                .createWriter(
                    fileSystem.getConf(),
                    SequenceFile.Writer.file(new Path(targetPath)),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(Text.class));

            // iterateTar closes the writer (and the archive stream) in all cases.
            iterateTar(outputFile, gzipInputStream);
        }
    }

}
mode 100644 index 000000000..dd3c32c62 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/dump/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/dump/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/dump/oozie_app/workflow.xml new file mode 100644 index 000000000..1b21f7984 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/dump/oozie_app/workflow.xml @@ -0,0 +1,52 @@ + + + + mainPath + the working path of Datacite + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.collection.datacite.DumpExtractor + --namenode${nameNode} + --targetPath${mainPath}/metadata.seq + --sourcePath${mainPath}/datacite.gz + + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.collection.datacite.DumpExtractor + --namenode${nameNode} + --targetPath${mainPath}/links.seq + --sourcePath${mainPath}/pidlinks.gz + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/extract_datacite_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/extract_datacite_parameter.json new file mode 100644 index 000000000..4af371875 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/extract_datacite_parameter.json @@ -0,0 +1,21 @@ +[ + { + "paramName": "n", + "paramLongName": "namenode", + "paramDescription": "the Name Node URI", + "paramRequired": true + }, + 
{ + "paramName": "t", + "paramLongName": "targetPath", + "paramDescription": "the target PATH to extract files", + "paramRequired": true + }, + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the PATH where the tar.gz files were downloaded", + "paramRequired": true + } + +] \ No newline at end of file