Merge branch 'dump_zenodo_2' of https://code-repo.d4science.org/D-Net/dhp-graph-dump into dump_zenodo_2

This commit is contained in:
Miriam Baglioni 2023-07-01 10:39:37 +02:00
commit 766288d1c9
4 changed files with 190 additions and 5 deletions

View File

@ -80,7 +80,7 @@ public class SendToZenodoHDFS implements Serializable {
if (!pString.endsWith("_SUCCESS")) {
String name = pString.substring(pString.lastIndexOf("/") + 1);
FSDataInputStream inputStream = fileSystem.open(p);
zenodoApiClient.uploadIS2(inputStream, name);
zenodoApiClient.uploadIS3(inputStream, name, fileSystem.getFileStatus(p).getLen());
}
}

View File

@ -360,12 +360,10 @@ public class ZenodoAPIClient2 implements Serializable {
import com.google.gson.Gson;
>>>>>>> 6ace388cff1e9fef4f6497a1ed5b5bc6e0bbd94a
import eu.dnetlib.dhp.common.api.InputStreamRequestBody;
import eu.dnetlib.dhp.common.api.zenodo.ZenodoModel;
import eu.dnetlib.dhp.common.api.zenodo.ZenodoModelList;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.*;
public class ZenodoAPIClient2 implements Serializable {
@ -375,6 +373,10 @@ public class ZenodoAPIClient2 implements Serializable {
String deposition_id;
String access_token;
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
private static final MediaType MEDIA_TYPE_ZIP = MediaType.parse("application/zip");
public String getUrlString() {
return urlString;
}
@ -530,6 +532,27 @@ public class ZenodoAPIClient2 implements Serializable {
return body;
}
public int uploadIS3(InputStream is, String file_name, long len) throws IOException {
OkHttpClient httpClient = new OkHttpClient.Builder()
.writeTimeout(600, TimeUnit.SECONDS)
.readTimeout(600, TimeUnit.SECONDS)
.connectTimeout(600, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
.url(bucket + "/" + file_name)
.addHeader(HttpHeaders.CONTENT_TYPE, "application/zip") // add request headers
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.put(InputStreamRequestBody.create(MEDIA_TYPE_ZIP, is, len))
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful())
throw new IOException("Unexpected code " + response + response.body().string());
return response.code();
}
}
/**
* Associates metadata information to the current deposition
*

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,132 @@
<workflow-app name="dump_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>accessToken</name>
<description>the access token used for the deposition in Zenodo</description>
</property>
<property>
<name>connectionUrl</name>
<description>the connection url for Zenodo</description>
</property>
<property>
<name>metadata</name>
<value>""</value>
<description> the metadata associated to the deposition</description>
</property>
<property>
<name>depositionType</name>
<description>the type of deposition we want to perform. "new" for brand new deposition, "version" for a new version of a published deposition (in this case the concept record id must be provided), "upload" to upload content to an open deposition for which we already have the deposition id (in this case the deposition id should be provided)</description>
</property>
<property>
<name>conceptRecordId</name>
<value>none</value>
<description>for new version, the id of the record for the old deposition</description>
</property>
<property>
<name>depositionId</name>
<value>none</value>
<description>the depositionId of a deposition open that has to be added content</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="send_zenodo"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="make_archive">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}/tar</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
</java>
<ok to="send_zenodo"/>
<error to="Kill"/>
</action>
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}/tar/</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>