forked from antonis.lempesis/dnet-hadoop
moved in common. Zenodo response model and APIClient to deposit in Zenodo
This commit is contained in:
parent
b6b5498a9a
commit
545ea9f77e
|
@ -0,0 +1,53 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.api;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
import okhttp3.MediaType;
|
||||||
|
import okhttp3.RequestBody;
|
||||||
|
import okhttp3.internal.Util;
|
||||||
|
import okio.BufferedSink;
|
||||||
|
import okio.Okio;
|
||||||
|
import okio.Source;
|
||||||
|
|
||||||
|
public class InputStreamRequestBody extends RequestBody {
|
||||||
|
|
||||||
|
private InputStream inputStream;
|
||||||
|
private MediaType mediaType;
|
||||||
|
private long lenght;
|
||||||
|
|
||||||
|
public static RequestBody create(final MediaType mediaType, final InputStream inputStream, final long len) {
|
||||||
|
|
||||||
|
return new InputStreamRequestBody(inputStream, mediaType, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
private InputStreamRequestBody(InputStream inputStream, MediaType mediaType, long len) {
|
||||||
|
this.inputStream = inputStream;
|
||||||
|
this.mediaType = mediaType;
|
||||||
|
this.lenght = len;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MediaType contentType() {
|
||||||
|
return mediaType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long contentLength() {
|
||||||
|
|
||||||
|
return lenght;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(BufferedSink sink) throws IOException {
|
||||||
|
Source source = null;
|
||||||
|
try {
|
||||||
|
source = Okio.source(inputStream);
|
||||||
|
sink.writeAll(source);
|
||||||
|
} finally {
|
||||||
|
Util.closeQuietly(source);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
package eu.dnetlib.dhp.common.api;
|
||||||
|
|
||||||
|
/**
 * Signals that the requested concept record id could not be found among the depositions.
 * <p>
 * It is a checked {@link Exception} (rather than a bare {@link Throwable}) so that ordinary
 * {@code catch (Exception e)} handlers also intercept it.
 */
public class MissingConceptDoiException extends Exception {

	/**
	 * @param message a description of the missing concept record id
	 */
	public MissingConceptDoiException(String message) {
		super(message);
	}
}
|
|
@ -1,15 +1,17 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump;
|
package eu.dnetlib.dhp.common.api;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.zenodo.ZenodoModel;
|
import eu.dnetlib.dhp.common.api.zenodo.ZenodoModel;
|
||||||
|
import eu.dnetlib.dhp.common.api.zenodo.ZenodoModelList;
|
||||||
import okhttp3.*;
|
import okhttp3.*;
|
||||||
|
|
||||||
public class APIClient implements Serializable {
|
public class ZenodoAPIClient implements Serializable {
|
||||||
|
|
||||||
|
|
||||||
String urlString;
|
String urlString;
|
||||||
String bucket;
|
String bucket;
|
||||||
|
@ -17,6 +19,7 @@ public class APIClient implements Serializable {
|
||||||
String deposition_id;
|
String deposition_id;
|
||||||
String access_token;
|
String access_token;
|
||||||
|
|
||||||
|
|
||||||
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
|
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
|
||||||
|
|
||||||
private static final MediaType MEDIA_TYPE_ZIP = MediaType.parse("application/zip");
|
private static final MediaType MEDIA_TYPE_ZIP = MediaType.parse("application/zip");
|
||||||
|
@ -37,13 +40,20 @@ public class APIClient implements Serializable {
|
||||||
this.bucket = bucket;
|
this.bucket = bucket;
|
||||||
}
|
}
|
||||||
|
|
||||||
public APIClient(String urlString, String access_token) throws IOException {
|
public void setDeposition_id(String deposition_id){this.deposition_id = deposition_id;}
|
||||||
|
|
||||||
|
public ZenodoAPIClient(String urlString, String access_token) throws IOException {
|
||||||
|
|
||||||
this.urlString = urlString;
|
this.urlString = urlString;
|
||||||
this.access_token = access_token;
|
this.access_token = access_token;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int connect() throws IOException {
|
/**
|
||||||
|
* Creates a brand new deposition in Zenodo. It sets the deposition_id and the bucket in which to store the files to upload.
|
||||||
|
* @return response code
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public int newDeposition() throws IOException {
|
||||||
String json = "{}";
|
String json = "{}";
|
||||||
OkHttpClient httpClient = new OkHttpClient();
|
OkHttpClient httpClient = new OkHttpClient();
|
||||||
|
|
||||||
|
@ -74,51 +84,36 @@ public class APIClient implements Serializable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public int upload(File file, String file_name) {
|
/**
|
||||||
|
* Upload files in Zenodo.
|
||||||
|
* @param is the inputStream for the file to upload
|
||||||
|
* @param file_name the name of the file as it will appear on Zenodo
|
||||||
|
* @param len the size of the file
|
||||||
|
* @return the response code
|
||||||
|
*/
|
||||||
|
public int uploadIS(InputStream is, String file_name, long len) throws IOException {
|
||||||
OkHttpClient httpClient = new OkHttpClient();
|
OkHttpClient httpClient = new OkHttpClient();
|
||||||
|
|
||||||
Request request = new Request.Builder()
|
Request request = new Request.Builder()
|
||||||
.url(bucket + "/" + file_name)
|
.url(bucket + "/" + file_name)
|
||||||
.addHeader("Content-Type", "application/zip") // add request headers
|
.addHeader("Content-Type", "application/zip") // add request headers
|
||||||
.addHeader("Authorization", "Bearer " + access_token)
|
.addHeader("Authorization", "Bearer " + access_token)
|
||||||
.put(RequestBody.create(MEDIA_TYPE_ZIP, file))
|
.put(InputStreamRequestBody.create(MEDIA_TYPE_ZIP, is, len))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
try (Response response = httpClient.newCall(request).execute()) {
|
try (Response response = httpClient.newCall(request).execute()) {
|
||||||
if (!response.isSuccessful())
|
if (!response.isSuccessful())
|
||||||
throw new IOException("Unexpected code " + response + response.body().string());
|
throw new IOException("Unexpected code " + response + response.body().string());
|
||||||
return response.code();
|
return response.code();
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int uploadIS(InputStream is, String file_name){
|
|
||||||
OkHttpClient httpClient = new OkHttpClient();
|
|
||||||
|
|
||||||
Request request = new Request.Builder()
|
|
||||||
.url(bucket + "/" + file_name)
|
|
||||||
.addHeader("Content-Type", "application/zip") // add request headers
|
|
||||||
.addHeader("Authorization", "Bearer " + access_token)
|
|
||||||
.put(InputStreamRequestBody.create(MEDIA_TYPE_ZIP, is))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
try (Response response = httpClient.newCall(request).execute()) {
|
|
||||||
if (!response.isSuccessful())
|
|
||||||
throw new IOException("Unexpected code " + response + response.body().string());
|
|
||||||
return response.code();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Associates metadata information to the current deposition
|
||||||
|
* @param metadata the metadata
|
||||||
|
* @return response code
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
public int sendMretadata(String metadata) throws IOException {
|
public int sendMretadata(String metadata) throws IOException {
|
||||||
|
|
||||||
OkHttpClient httpClient = new OkHttpClient();
|
OkHttpClient httpClient = new OkHttpClient();
|
||||||
|
@ -143,6 +138,11 @@ public class APIClient implements Serializable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publishes the current deposition. It works both for a new deposition and for a new version of an old deposition.
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
public int publish() throws IOException {
|
public int publish() throws IOException {
|
||||||
|
|
||||||
String json = "{}";
|
String json = "{}";
|
||||||
|
@ -165,7 +165,104 @@ public class APIClient implements Serializable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// public int connect() throws IOException {
|
/**
|
||||||
|
* To create a new version of an already published deposition.
|
||||||
|
* It sets the deposition_id and the bucket to be used for the new version.
|
||||||
|
* @param concept_rec_id the concept record id of the deposition for which to create a new version. It is
|
||||||
|
* the last part of the URL of the DOI that Zenodo suggests using to cite all versions:
|
||||||
|
* DOI: 10.xxx/zenodo.656930 concept_rec_id = 656930
|
||||||
|
* @return response code
|
||||||
|
* @throws IOException
|
||||||
|
* @throws MissingConceptDoiException
|
||||||
|
*/
|
||||||
|
public int newVersion(String concept_rec_id) throws IOException, MissingConceptDoiException {
|
||||||
|
setDepositionId(concept_rec_id);
|
||||||
|
String json = "{}";
|
||||||
|
|
||||||
|
OkHttpClient httpClient = new OkHttpClient();
|
||||||
|
|
||||||
|
Request request = new Request.Builder()
|
||||||
|
.url(urlString + "/" + deposition_id + "/actions/newversion")
|
||||||
|
.addHeader("Authorization", "Bearer " + access_token)
|
||||||
|
.post(RequestBody.create(MEDIA_TYPE_JSON, json))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try (Response response = httpClient.newCall(request).execute()) {
|
||||||
|
|
||||||
|
if (!response.isSuccessful())
|
||||||
|
throw new IOException("Unexpected code " + response + response.body().string());
|
||||||
|
|
||||||
|
ZenodoModel zenodoModel = new Gson().fromJson(response.body().string(), ZenodoModel.class);
|
||||||
|
String latest_draft = zenodoModel.getLinks().getLatest_draft();
|
||||||
|
deposition_id = latest_draft.substring(latest_draft.lastIndexOf("/") + 1);
|
||||||
|
bucket = getBucket(latest_draft);
|
||||||
|
return response.code();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setDepositionId(String concept_rec_id) throws IOException, MissingConceptDoiException {
|
||||||
|
|
||||||
|
ZenodoModelList zenodoModelList = new Gson().fromJson(getPrevDepositions(), ZenodoModelList.class);
|
||||||
|
|
||||||
|
for(ZenodoModel zm : zenodoModelList){
|
||||||
|
if (zm.getConceptrecid().equals(concept_rec_id)){
|
||||||
|
deposition_id = zm.getId();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new MissingConceptDoiException("The concept record id specified was missing in the list of depositions");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getPrevDepositions() throws IOException {
|
||||||
|
OkHttpClient httpClient = new OkHttpClient();
|
||||||
|
|
||||||
|
Request request = new Request.Builder()
|
||||||
|
.url(urlString)
|
||||||
|
.addHeader("Content-Type", "application/json") // add request headers
|
||||||
|
.addHeader("Authorization", "Bearer " + access_token)
|
||||||
|
.get()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try (Response response = httpClient.newCall(request).execute()) {
|
||||||
|
|
||||||
|
if (!response.isSuccessful())
|
||||||
|
throw new IOException("Unexpected code " + response + response.body().string());
|
||||||
|
|
||||||
|
return response.body().string();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getBucket(String url) throws IOException {
|
||||||
|
OkHttpClient httpClient = new OkHttpClient();
|
||||||
|
|
||||||
|
Request request = new Request.Builder()
|
||||||
|
.url(url)
|
||||||
|
.addHeader("Content-Type", "application/json") // add request headers
|
||||||
|
.addHeader("Authorization", "Bearer " + access_token)
|
||||||
|
.get()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try (Response response = httpClient.newCall(request).execute()) {
|
||||||
|
|
||||||
|
if (!response.isSuccessful())
|
||||||
|
throw new IOException("Unexpected code " + response + response.body().string());
|
||||||
|
|
||||||
|
// Get response body
|
||||||
|
ZenodoModel zenodoModel = new Gson().fromJson(response.body().string(), ZenodoModel.class);
|
||||||
|
|
||||||
|
|
||||||
|
return zenodoModel.getLinks().getBucket();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// public int newDeposition() throws IOException {
|
||||||
//
|
//
|
||||||
// String json = "{}";
|
// String json = "{}";
|
||||||
//
|
//
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.zenodo;
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
public class Community {
|
public class Community {
|
||||||
private String identifier;
|
private String identifier;
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.zenodo;
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
public class Creator {
|
public class Creator {
|
||||||
private String affiliation;
|
private String affiliation;
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.zenodo;
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.zenodo;
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.zenodo;
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.zenodo;
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.List;
|
import java.util.List;
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.zenodo;
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.zenodo;
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.zenodo;
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.List;
|
import java.util.List;
|
|
@ -0,0 +1,6 @@
|
||||||
|
package eu.dnetlib.dhp.common.api.zenodo;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
public class ZenodoModelList extends ArrayList<ZenodoModel> {
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
package eu.dnetlib.dhp.common.api;
|
||||||
|
|
||||||
|
/**
 * Placeholder test class: no test cases are defined yet.
 */
public class ZenodoAPIClientTest {
}
|
|
@ -0,0 +1 @@
|
||||||
|
This is a test for a new version of an old deposition
|
|
@ -1,53 +0,0 @@
|
||||||
package eu.dnetlib.dhp.oa.graph.dump;
|
|
||||||
|
|
||||||
import okhttp3.MediaType;
|
|
||||||
import okhttp3.RequestBody;
|
|
||||||
import okhttp3.internal.Util;
|
|
||||||
import okio.BufferedSink;
|
|
||||||
import okio.Okio;
|
|
||||||
import okio.Source;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
|
|
||||||
public class InputStreamRequestBody extends RequestBody {
|
|
||||||
|
|
||||||
private InputStream inputStream;
|
|
||||||
private MediaType mediaType;
|
|
||||||
|
|
||||||
public static RequestBody create(final MediaType mediaType, final InputStream inputStream) {
|
|
||||||
|
|
||||||
|
|
||||||
return new InputStreamRequestBody(inputStream, mediaType);
|
|
||||||
}
|
|
||||||
|
|
||||||
private InputStreamRequestBody(InputStream inputStream, MediaType mediaType) {
|
|
||||||
this.inputStream = inputStream;
|
|
||||||
this.mediaType = mediaType;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public MediaType contentType() {
|
|
||||||
return mediaType;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long contentLength() {
|
|
||||||
try {
|
|
||||||
return inputStream.available();
|
|
||||||
} catch (IOException e) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void writeTo(BufferedSink sink) throws IOException {
|
|
||||||
Source source = null;
|
|
||||||
try {
|
|
||||||
source = Okio.source(inputStream);
|
|
||||||
sink.writeAll(source);
|
|
||||||
} finally {
|
|
||||||
Util.closeQuietly(source);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,86 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.dump;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
|
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
|
||||||
|
|
||||||
public class SendToZenodo implements Serializable {
|
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(SendToZenodo.class);
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
|
||||||
IOUtils
|
|
||||||
.toString(
|
|
||||||
SendToZenodo.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/oa/graph/dump/upload_zenodo.json")));
|
|
||||||
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
final String hdfsPath = parser.get("hdfsPath");
|
|
||||||
final String hdfsNameNode = parser.get("hdfsNameNode");
|
|
||||||
final String access_token = parser.get("accessToken");
|
|
||||||
final String connection_url = parser.get("connectionUrl");
|
|
||||||
final String metadata = parser.get("metadata");
|
|
||||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
|
||||||
|
|
||||||
QueryInformationSystem qis = new QueryInformationSystem();
|
|
||||||
qis.setIsLookUp(ISLookupClientFactory.getLookUpService(isLookUpUrl));
|
|
||||||
CommunityMap communityMap = qis.getCommunityMap();
|
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.set("fs.defaultFS", hdfsNameNode);
|
|
||||||
|
|
||||||
FileSystem fileSystem = FileSystem.get(conf);
|
|
||||||
|
|
||||||
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
|
||||||
.listFiles(
|
|
||||||
new Path(hdfsPath), true);
|
|
||||||
APIClient apiClient = new APIClient(connection_url, access_token);
|
|
||||||
apiClient.connect();
|
|
||||||
while (fileStatusListIterator.hasNext()) {
|
|
||||||
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
|
||||||
|
|
||||||
Path p = fileStatus.getPath();
|
|
||||||
String p_string = p.toString();
|
|
||||||
if (!p_string.endsWith("_SUCCESS")) {
|
|
||||||
String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
|
|
||||||
String community = tmp.substring(tmp.lastIndexOf("/") + 1);
|
|
||||||
log.info("Sending information for community: " + community);
|
|
||||||
String community_name = communityMap.get(community).replace(" ", "_") + ".json.gz";
|
|
||||||
log.info("Copying information for community: " + community);
|
|
||||||
fileSystem.copyToLocalFile(p, new Path("/tmp/" + community_name));
|
|
||||||
File f = new File("/tmp/" + community_name);
|
|
||||||
try {
|
|
||||||
apiClient.upload(f, community_name);
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
if (f.exists()) {
|
|
||||||
log.info("Deleting information for community: " + community);
|
|
||||||
f.delete();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
apiClient.sendMretadata(metadata);
|
|
||||||
apiClient.publish();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.dump.graph;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.commons.compress.archivers.ar.ArArchiveEntry;
|
||||||
|
import org.apache.commons.compress.archivers.ar.ArArchiveOutputStream;
|
||||||
|
import org.apache.commons.crypto.utils.IoUtils;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.oa.graph.dump.APIClient;
|
||||||
|
|
||||||
|
public class MakeTar implements Serializable {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MakeTar.class);
|
||||||
|
private final Configuration conf;
|
||||||
|
private final ArArchiveOutputStream ar;
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
MakeTar.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/oa/graph/dump_whole/input_maketar_parameter.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final String hdfsPath = parser.get("hdfsPath");
|
||||||
|
log.info("hdfsPath: {}", hdfsPath);
|
||||||
|
|
||||||
|
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||||
|
log.info("nameNode: {}", hdfsNameNode);
|
||||||
|
|
||||||
|
final String inputPath = parser.get("sourcePath");
|
||||||
|
log.info("input path : {}", inputPath);
|
||||||
|
|
||||||
|
MakeTar mt = new MakeTar(hdfsPath, hdfsNameNode);
|
||||||
|
mt.execute(inputPath);
|
||||||
|
mt.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void execute(String inputPath) throws IOException {
|
||||||
|
FileSystem fileSystem = FileSystem.get(conf);
|
||||||
|
|
||||||
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
||||||
|
.listFiles(
|
||||||
|
new Path(inputPath), true);
|
||||||
|
|
||||||
|
while (fileStatusListIterator.hasNext()) {
|
||||||
|
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
||||||
|
|
||||||
|
Path p = fileStatus.getPath();
|
||||||
|
String p_string = p.toString();
|
||||||
|
if (!p_string.endsWith("_SUCCESS")) {
|
||||||
|
String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
|
||||||
|
String name = tmp.substring(tmp.lastIndexOf("/") + 1);
|
||||||
|
ar.putArchiveEntry(new ArArchiveEntry(name, fileStatus.getLen()));
|
||||||
|
InputStream is = fileSystem.open(fileStatus.getPath());
|
||||||
|
ar.write(IOUtils.toByteArray(is));
|
||||||
|
ar.closeArchiveEntry();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void close() throws IOException {
|
||||||
|
ar.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public MakeTar(String hdfsPath, String hdfsNameNode) throws IOException {
|
||||||
|
this.conf = new Configuration();
|
||||||
|
this.conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
FileSystem fileSystem = FileSystem.get(this.conf);
|
||||||
|
Path hdfsWritePath = new Path(hdfsPath);
|
||||||
|
FSDataOutputStream fsDataOutputStream = null;
|
||||||
|
if (fileSystem.exists(hdfsWritePath)) {
|
||||||
|
fsDataOutputStream = fileSystem.append(hdfsWritePath);
|
||||||
|
} else {
|
||||||
|
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.ar = new ArArchiveOutputStream(fsDataOutputStream.getWrappedStream());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package eu.dnetlib.dhp.oa.graph.dump.graph;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
|
||||||
|
public class MakeTarTest {
|
||||||
|
private static String workingDir;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files
|
||||||
|
.createTempDirectory(eu.dnetlib.dhp.oa.graph.dump.graph.MakeTarTest.class.getSimpleName())
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTar() throws IOException {
|
||||||
|
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
|
||||||
|
|
||||||
|
fs
|
||||||
|
.copyFromLocalFile(
|
||||||
|
false, new Path(getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni")
|
||||||
|
.getPath()),
|
||||||
|
new Path(workingDir + "/zenodo/ni/ni"));
|
||||||
|
fs
|
||||||
|
.copyFromLocalFile(
|
||||||
|
false, new Path(getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/dh-ch")
|
||||||
|
.getPath()),
|
||||||
|
new Path(workingDir + "/zenodo/dh-ch/dh-ch"));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue