added workflow for extracting datacite DUMP
This commit is contained in:
parent
a02f3f0d2b
commit
7d1faea135
|
@ -0,0 +1,109 @@
|
|||
|
||||
package eu.dnetlib.dhp.collection.datacite;
|
||||
|
||||
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
|
||||
public class DumpExtractor {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DumpExtractor.class);
|
||||
|
||||
public static InputStream createInputStream(FileSystem fileSystem, Path sourcePath) throws IOException {
|
||||
CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
|
||||
CompressionCodec codec = factory.getCodec(sourcePath);
|
||||
if (codec == null) {
|
||||
System.err.println("No codec found for " + sourcePath.getName());
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
return codec.createInputStream(fileSystem.open(sourcePath));
|
||||
}
|
||||
|
||||
public static void iterateTar(SequenceFile.Writer sfile, InputStream gzipInputStream) throws IOException {
|
||||
|
||||
int extractedItem = 0;
|
||||
try (final TarArchiveInputStream tais = new TarArchiveInputStream(gzipInputStream)) {
|
||||
|
||||
TarArchiveEntry entry;
|
||||
while ((entry = tais.getNextTarEntry()) != null) {
|
||||
if (entry.isFile()) {
|
||||
if (sfile != null) {
|
||||
final Text key = new Text(entry.getName());
|
||||
final BufferedReader br = new BufferedReader(new InputStreamReader(tais));
|
||||
while (br.ready()) {
|
||||
sfile.append(key, new Text(br.readLine()));
|
||||
extractedItem++;
|
||||
}
|
||||
if (extractedItem % 100000 == 0) {
|
||||
log.info("Extracted {} items", extractedItem);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (sfile != null) {
|
||||
sfile.hflush();
|
||||
sfile.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
DumpExtractor.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/datacite/extract_datacite_parameter.json"))));
|
||||
argumentParser.parseArgument(args);
|
||||
|
||||
final String hdfsuri = argumentParser.get("namenode");
|
||||
log.info("hdfsURI is {}", hdfsuri);
|
||||
|
||||
final String sourcePath = argumentParser.get("sourcePath");
|
||||
log.info("sourcePath is {}", sourcePath);
|
||||
|
||||
final String targetPath = argumentParser.get("targetPath");
|
||||
log.info("targetPath is {}", targetPath);
|
||||
|
||||
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri));
|
||||
|
||||
final Path sPath = new Path(sourcePath);
|
||||
|
||||
final InputStream gzipInputStream = createInputStream(fileSystem, sPath);
|
||||
|
||||
final SequenceFile.Writer outputFile = SequenceFile
|
||||
.createWriter(
|
||||
fileSystem.getConf(),
|
||||
SequenceFile.Writer.file(new Path(targetPath)),
|
||||
SequenceFile.Writer.keyClass(Text.class),
|
||||
SequenceFile.Writer.valueClass(Text.class));
|
||||
|
||||
iterateTar(outputFile, gzipInputStream);
|
||||
gzipInputStream.close();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
<configuration>
    <property>
        <name>jobTracker</name>
        <value>yarnRM</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://nameservice1</value>
    </property>
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
    </property>
    <property>
        <name>oozie.action.sharelib.for.spark</name>
        <value>spark2</value>
    </property>
    <property>
        <name>oozie.launcher.mapreduce.user.classpath.first</name>
        <value>true</value>
    </property>
</configuration>
|
|
@ -0,0 +1,52 @@
|
|||
<workflow-app name="collect_dump" xmlns="uri:oozie:workflow:0.5">
    <parameters>
        <property>
            <name>mainPath</name>
            <description>the working path of Datacite</description>
        </property>
    </parameters>

    <start to="extractDump"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <!-- Extracts the metadata dump (datacite.gz) into metadata.seq -->
    <action name="extractDump">
        <java>
            <configuration>
                <property>
                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
                    <value>true</value>
                </property>
            </configuration>
            <main-class>eu.dnetlib.dhp.collection.datacite.DumpExtractor</main-class>
            <arg>--namenode</arg><arg>${nameNode}</arg>
            <arg>--targetPath</arg><arg>${mainPath}/metadata.seq</arg>
            <arg>--sourcePath</arg><arg>${mainPath}/datacite.gz</arg>
        </java>
        <ok to="extractRelation"/>
        <error to="Kill"/>
    </action>

    <!-- Extracts the PID-link dump (pidlinks.gz) into links.seq -->
    <action name="extractRelation">
        <java>
            <configuration>
                <property>
                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
                    <value>true</value>
                </property>
            </configuration>
            <main-class>eu.dnetlib.dhp.collection.datacite.DumpExtractor</main-class>
            <arg>--namenode</arg><arg>${nameNode}</arg>
            <arg>--targetPath</arg><arg>${mainPath}/links.seq</arg>
            <arg>--sourcePath</arg><arg>${mainPath}/pidlinks.gz</arg>
        </java>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>
|
|
@ -0,0 +1,21 @@
|
|||
[
  {
    "paramName": "n",
    "paramLongName": "namenode",
    "paramDescription": "the Name Node URI",
    "paramRequired": true
  },
  {
    "paramName": "t",
    "paramLongName": "targetPath",
    "paramDescription": "the target PATH to extract files",
    "paramRequired": true
  },
  {
    "paramName": "s",
    "paramLongName": "sourcePath",
    "paramDescription": "the PATH where the tar.gz files were downloaded",
    "paramRequired": true
  }
]
|
Loading…
Reference in New Issue