// ---------------------------------------------------------------------------
// File: dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/InputStreamRequestBody.java
// ---------------------------------------------------------------------------
package eu.dnetlib.dhp.oa.graph.dump;

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

import okhttp3.MediaType;
import okhttp3.RequestBody;
import okio.BufferedSink;
import okio.Okio;
import okio.Source;

/**
 * An OkHttp {@link RequestBody} that streams its payload directly from an
 * {@link InputStream}, so large HDFS dump files can be uploaded to Zenodo
 * without being buffered in memory or copied to the local filesystem first.
 * <p>
 * NOTE(review): like any stream-backed body, this can be written at most once —
 * {@link #writeTo(BufferedSink)} consumes the underlying stream and does not
 * reset it, so OkHttp retries/redirects that re-send the body will fail.
 */
public class InputStreamRequestBody extends RequestBody {

	private final InputStream inputStream;
	private final MediaType mediaType;

	/**
	 * Static factory mirroring OkHttp's {@code RequestBody.create(...)} naming.
	 *
	 * @param mediaType   content type of the payload (may be null, per OkHttp's contract)
	 * @param inputStream stream supplying the payload; must not be null
	 * @return a request body that streams from {@code inputStream}
	 */
	public static RequestBody create(final MediaType mediaType, final InputStream inputStream) {
		return new InputStreamRequestBody(Objects.requireNonNull(inputStream, "inputStream"), mediaType);
	}

	private InputStreamRequestBody(InputStream inputStream, MediaType mediaType) {
		this.inputStream = inputStream;
		this.mediaType = mediaType;
	}

	@Override
	public MediaType contentType() {
		return mediaType;
	}

	/**
	 * Returns -1 ("length unknown"), which makes OkHttp use chunked transfer
	 * encoding.
	 * <p>
	 * The original implementation returned {@code inputStream.available()},
	 * which is only the number of bytes readable without blocking — not the
	 * total stream length. Advertising it as Content-Length can silently
	 * truncate large uploads, and being an {@code int} it would overflow for
	 * files over 2 GiB anyway.
	 */
	@Override
	public long contentLength() {
		return -1;
	}

	@Override
	public void writeTo(BufferedSink sink) throws IOException {
		// try-with-resources replaces okhttp3.internal.Util.closeQuietly:
		// okhttp3.internal is not public API and may break between releases.
		try (Source source = Okio.source(inputStream)) {
			sink.writeAll(source);
		}
	}
}

// ---------------------------------------------------------------------------
// File: dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodoHDFS.java
// ---------------------------------------------------------------------------
package eu.dnetlib.dhp.oa.graph.dump;

import java.io.Serializable;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;

/**
 * Driver that recursively walks a dump directory on HDFS and uploads every
 * produced community dump file to a Zenodo deposition via {@code APIClient},
 * then attaches the deposition metadata and publishes it.
 * <p>
 * Expected layout (inferred from the path handling below — confirm against the
 * producing workflow): {@code <hdfsPath>/<communityId>/<part-file>}, where
 * {@code communityId} is resolved to a display name through the
 * {@link CommunityMap} obtained from the Information Service.
 */
public class SendToZenodoHDFS implements Serializable {

	private static final Log log = LogFactory.getLog(SendToZenodoHDFS.class);

	public static void main(final String[] args) throws Exception {
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					SendToZenodoHDFS.class
						.getResourceAsStream(
							"/eu/dnetlib/dhp/oa/graph/dump/upload_zenodo.json")));

		parser.parseArgument(args);

		final String hdfsPath = parser.get("hdfsPath");
		final String hdfsNameNode = parser.get("hdfsNameNode");
		final String accessToken = parser.get("accessToken");
		final String connectionUrl = parser.get("connectionUrl");
		final String metadata = parser.get("metadata");
		final String isLookUpUrl = parser.get("isLookUpUrl");

		// Resolve community id -> human-readable label via the Information Service.
		QueryInformationSystem qis = new QueryInformationSystem();
		qis.setIsLookUp(ISLookupClientFactory.getLookUpService(isLookUpUrl));
		CommunityMap communityMap = qis.getCommunityMap();

		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", hdfsNameNode);

		FileSystem fileSystem = FileSystem.get(conf);

		// Parameterized iterator instead of the raw RemoteIterator used before:
		// avoids the implicit unchecked cast on next().
		RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
			.listFiles(new Path(hdfsPath), true);

		APIClient apiClient = new APIClient(connectionUrl, accessToken);
		apiClient.connect();

		while (fileStatusListIterator.hasNext()) {
			Path p = fileStatusListIterator.next().getPath();
			String pathString = p.toString();

			// Skip Hadoop job-completion markers; only real part files are uploaded.
			if (pathString.endsWith("_SUCCESS")) {
				continue;
			}

			// The community id is the name of the directory containing the file.
			String parentDir = pathString.substring(0, pathString.lastIndexOf("/"));
			String community = parentDir.substring(parentDir.lastIndexOf("/") + 1);
			log.info("Sending information for community: " + community);

			String communityLabel = communityMap.get(community);
			if (communityLabel == null) {
				// Guard against directories that do not correspond to a known
				// community: the original code would have thrown an NPE here.
				log.warn("No community map entry for '" + community + "'; skipping " + pathString);
				continue;
			}
			String communityName = communityLabel.replace(" ", "_") + ".json.gz";

			// try-with-resources: the original leaked the FSDataInputStream, and
			// its empty catch block silently dropped every upload failure.
			try (FSDataInputStream inputStream = fileSystem.open(p)) {
				apiClient.uploadIS(inputStream, communityName);
			} catch (Exception e) {
				log.error("Upload to Zenodo failed for " + pathString, e);
			}
		}

		// NOTE(review): "sendMretadata" is the (typo'd) method name exposed by
		// APIClient; it cannot be renamed here without changing that class.
		apiClient.sendMretadata(metadata);
		apiClient.publish();
	}
}