diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/MakeTar.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/MakeTar.java
new file mode 100644
index 000000000..95bea74a2
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/MakeTar.java
@@ -0,0 +1,111 @@
+
+package eu.dnetlib.dhp.export.zenodo;
+
+import java.io.*;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class MakeTar implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(MakeTar.class);
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				MakeTar.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/export/input_maketar_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		final String outputPath = parser.get("targetPath");
+		log.info("hdfsPath: {}", outputPath);
+
+		final String hdfsNameNode = parser.get("nameNode");
+		log.info("nameNode: {}", hdfsNameNode);
+
+		final String inputPath = parser.get("sourcePath");
+		log.info("input path : {}", inputPath);
+
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsNameNode);
+
+		FileSystem fileSystem = FileSystem.get(conf);
+
+		makeTArArchive(fileSystem, inputPath, outputPath);
+
+	}
+
+	public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath) throws IOException {
+
+		RemoteIterator<LocatedFileStatus> dir_iterator = fileSystem.listLocatedStatus(new Path(inputPath));
+
+		while (dir_iterator.hasNext()) {
+			LocatedFileStatus fileStatus = dir_iterator.next();
+
+			Path p = fileStatus.getPath();
+			String p_string = p.toString();
+			String entity = p_string.substring(p_string.lastIndexOf("/") + 1);
+
+			write(fileSystem, p_string, outputPath + "/" + entity + ".tar", entity);
+		}
+
+	}
+
+	private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
+		throws IOException {
+
+		Path hdfsWritePath = new Path(outputPath);
+		FSDataOutputStream fsDataOutputStream = null;
+		if (fileSystem.exists(hdfsWritePath)) {
+			fileSystem.delete(hdfsWritePath, true);
+
+		}
+		fsDataOutputStream = fileSystem.create(hdfsWritePath);
+
+		TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
+
+		RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+			.listFiles(
+				new Path(inputPath), true);
+
+		while (fileStatusListIterator.hasNext()) {
+			LocatedFileStatus fileStatus = fileStatusListIterator.next();
+
+			Path p = fileStatus.getPath();
+			String p_string = p.toString();
+			if (!p_string.endsWith("_SUCCESS")) {
+				String name = p_string.substring(p_string.lastIndexOf("/") + 1);
+				TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name + ".json.gz");
+				entry.setSize(fileStatus.getLen());
+				ar.putArchiveEntry(entry);
+
+				InputStream is = fileSystem.open(fileStatus.getPath());
+
+				BufferedInputStream bis = new BufferedInputStream(is);
+
+				int count;
+				byte[] data = new byte[1024];
+				while ((count = bis.read(data, 0, data.length)) != -1) {
+					ar.write(data, 0, count);
+				}
+				bis.close();
+				ar.closeArchiveEntry();
+
+			}
+
+		}
+
+		ar.close();
+	}
+
+}
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/export/input_maketar_parameters.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/export/input_maketar_parameters.json
new file mode 100644
index 000000000..6d90ced2c
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/export/input_maketar_parameters.json
@@ -0,0 +1,20 @@
+[
+  {
+    "paramName": "n",
+    "paramLongName": "nameNode",
+    "paramDescription": "the Name Node",
+    "paramRequired": true
+  },
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the source path",
+    "paramRequired": true
+  },
+  {
+    "paramName": "t",
+    "paramLongName": "targetPath",
+    "paramDescription": "the target path",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/config-default.xml
new file mode 100644
index 000000000..59e5c059f
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/config-default.xml
@@ -0,0 +1,42 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>oozie.wf.rerun.failnodes</name>
+        <value>false</value>
+    </property>
+    <property>
+        <name>hive_metastore_uris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>spark2YarnHistoryServerAddress</name>
+        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
+    </property>
+    <property>
+        <name>spark2EventLogDir</name>
+        <value>/user/spark/spark2ApplicationHistory</value>
+    </property>
+    <property>
+        <name>spark2ExtraListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
+    </property>
+    <property>
+        <name>spark2SqlQueryExecutionListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/workflow.xml
new file mode 100644
index 000000000..ec536de1f
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/workflow.xml
@@ -0,0 +1,33 @@
+<workflow-app name="Send Dump to Zenodo" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sourcePath</name>
+            <description>the source path</description>
+        </property>
+        <property>
+            <name>targetPath</name>
+            <description>the target path</description>
+        </property>
+    </parameters>
+
+    <start to="MakeTar"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="MakeTar">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.dhp.export.zenodo.MakeTar</main-class>
+            <arg>-t</arg><arg>${targetPath}</arg>
+            <arg>-n</arg><arg>${nameNode}</arg>
+            <arg>-s</arg><arg>${sourcePath}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
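
For context, a minimal sketch of how the new MakeTar entry point could be driven outside of the Oozie workflow above, e.g. from a throwaway main class; the wrapper class name, the name-node URI and both HDFS paths are illustrative placeholders, not values taken from this change set:

package eu.dnetlib.dhp.export.zenodo;

// Illustrative sketch only: passes the same three options the Oozie <java> action supplies
// (-n nameNode, -s sourcePath, -t targetPath). URI and paths below are placeholders.
public class MakeTarLocalRun {

	public static void main(String[] args) throws Exception {
		MakeTar
			.main(
				new String[] {
					"-n", "hdfs://nameservice1",          // nameNode used as fs.defaultFS
					"-s", "/user/example/scholix/dump",   // sourcePath: its first-level entries are archived one by one (placeholder)
					"-t", "/user/example/scholix/tar"     // targetPath: receives one <entry>.tar per first-level entry (placeholder)
				});
	}
}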