From d4382b54df454f6550efe4c724dddfcadbf99b51 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 3 Nov 2020 16:54:50 +0100
Subject: [PATCH] moved the tar archive with max size to the common module

---
 .../eu/dnetlib/dhp/common/MakeTarArchive.java | 104 ++++++++++++++++++
 .../community_infrastructure_schema.json      |  38 ++++++
 .../dump_whole/schema/context_schema.json     |  38 ------
 3 files changed, 142 insertions(+), 38 deletions(-)
 create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/schema/community_infrastructure_schema.json
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/schema/context_schema.json

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java
new file mode 100644
index 0000000000..5dff1d0a8f
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java
@@ -0,0 +1,104 @@
+package eu.dnetlib.dhp.common;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.hadoop.fs.*;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+public class MakeTarArchive implements Serializable {
+
+	// Opens a tar stream on HDFS, replacing any file already present at outputPath.
+	private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
+		Path hdfsWritePath = new Path(outputPath);
+		if (fileSystem.exists(hdfsWritePath)) {
+			fileSystem.delete(hdfsWritePath, true);
+		}
+		FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath);
+		return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
+	}
+
+	// Archives every file found under inputPath into a single tar at outputPath.
+	private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
+		throws IOException {
+
+		TarArchiveOutputStream ar = getTar(fileSystem, outputPath);
+
+		RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+			.listFiles(new Path(inputPath), true);
+
+		while (fileStatusListIterator.hasNext()) {
+			writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, 0);
+		}
+
+		ar.close();
+	}
+
+	// Writes inputPath as a single tar when its content fits within gBperSplit GB;
+	// otherwise produces numbered tar parts, each closed once its running size
+	// reaches the split threshold.
+	public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
+		int gBperSplit) throws IOException {
+		final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit;
+
+		long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed();
+
+		if (sourceSize < bytesPerSplit) {
+			write(fileSystem, inputPath, outputPath + ".tar", dir_name);
+		} else {
+			int partNum = 0;
+
+			RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+				.listFiles(new Path(inputPath), true);
+			boolean next = fileStatusListIterator.hasNext();
+			while (next) {
+				TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar");
+
+				long current_size = 0;
+				while (next && current_size < bytesPerSplit) {
+					current_size = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, current_size);
+					next = fileStatusListIterator.hasNext();
+				}
+
+				partNum += 1;
+				ar.close();
+			}
+		}
+	}
+
+	// Appends the next file from the iterator to the archive, skipping Spark
+	// _SUCCESS markers, and returns the updated running size of the current part.
+	private static long writeCurrentFile(FileSystem fileSystem, String dir_name,
+		RemoteIterator<LocatedFileStatus> fileStatusListIterator,
+		TarArchiveOutputStream ar, long current_size) throws IOException {
+		LocatedFileStatus fileStatus = fileStatusListIterator.next();
+
+		Path p = fileStatus.getPath();
+		String p_string = p.toString();
+		if (!p_string.endsWith("_SUCCESS")) {
+			String name = p_string.substring(p_string.lastIndexOf("/") + 1);
+			if (name.trim().equalsIgnoreCase("communities_infrastructures")) {
+				name = "communities_infrastructures.json";
+			}
+			TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
+			entry.setSize(fileStatus.getLen());
+			current_size += fileStatus.getLen();
+			ar.putArchiveEntry(entry);
+
+			InputStream is = fileSystem.open(fileStatus.getPath());
+			BufferedInputStream bis = new BufferedInputStream(is);
+
+			int count;
+			byte[] data = new byte[1024];
+			while ((count = bis.read(data, 0, data.length)) != -1) {
+				ar.write(data, 0, count);
+			}
+			bis.close();
+			ar.closeArchiveEntry();
+		}
+		return current_size;
+	}
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/schema/community_infrastructure_schema.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/schema/community_infrastructure_schema.json
new file mode 100644
index 0000000000..9fc338224b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/schema/community_infrastructure_schema.json
@@ -0,0 +1,38 @@
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "type": "object",
+  "properties": {
+    "description": {
+      "type": "string",
+      "description": "Description of the research community/ research infrastructure"
+    },
+    "id": {
+      "type": "string",
+      "description": "OpenAIRE id of the research community/ research infrastructure"
+    },
+    "name": {
+      "type": "string",
+      "description": "The long name of the community"
+    },
+    "originalId": {
+      "type": "string",
+      "description": "The acronym of the community"
+    },
+    "subject": {
+      "description": "Description of subject",
+      "type": "array",
+      "items": {
+        "type": "string",
+        "description": "Only for research communities: the list of the subjects associated to the research community"
+      }
+    },
+    "type": {
+      "type": "string",
+      "description": "The type of the record (research community/research infrastructure)"
+    },
+    "zenodo_community": {
+      "type": "string",
+      "description": "The id of the Zenodo communities associated to the research community/Research infrastructure"
+    }
+  }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/schema/context_schema.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/schema/context_schema.json
deleted file mode 100644
index ba6609a50e..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/schema/context_schema.json
+++ /dev/null
@@ -1,38 +0,0 @@
-{
-  "$schema": "http://json-schema.org/draft-07/schema#",
-  "type": "object",
-  "properties": {
-    "description": {
-      "type": "string",
-      "description": "Description of description"
-    },
-    "id": {
-      "type": "string",
-      "description": "Description of id"
-    },
-    "name": {
-      "type": "string",
-      "description": "Description of name"
-    },
-    "originalId": {
-      "type": "string",
"Description of originalId" - }, - "subject": { - "description": "Description of subject", - "type": "array", - "items": { - "type": "string", - "description": "Description of subject" - } - }, - "type": { - "type": "string", - "description": "Description of type" - }, - "zenodo_community": { - "type": "string", - "description": "Description of zenodo_community" - } - } -} \ No newline at end of file