This commit is contained in:
Miriam Baglioni 2020-06-22 10:50:41 +02:00
parent edeb862476
commit e8f914f8b3
3 changed files with 210 additions and 192 deletions

View File

@ -80,7 +80,7 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="send_zenodo"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -366,146 +366,19 @@
<error to="Kill"/>
</action>
<!-- <join name="join_extend" to="fork_splitForCommunities"/>-->
<!-- <fork name="fork_splitForCommunities">-->
<!-- <path start="split_publication"/>-->
<!-- <path start="split_dataset"/>-->
<!-- <path start="split_orp"/>-->
<!-- <path start="split_software"/>-->
<!-- </fork>-->
<!-- <action name="split_publication">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Split dumped result for community</name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/ext/publication</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${outputPath}</arg>-->
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Publication</arg>-->
<!-- <arg>&#45;&#45;isLookUpUrl</arg><arg>${isLookUpUrl}</arg>-->
<!-- </spark>-->
<!-- <ok to="join_split"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <action name="split_dataset">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Split dumped result for community</name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/ext/dataset</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${outputPath}</arg>-->
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Dataset</arg>-->
<!-- <arg>&#45;&#45;isLookUpUrl</arg><arg>${isLookUpUrl}</arg>-->
<!-- </spark>-->
<!-- <ok to="join_split"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <action name="split_orp">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Split dumped result for community</name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/ext/orp</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${outputPath}</arg>-->
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.OtherResearchProduct</arg>-->
<!-- <arg>&#45;&#45;isLookUpUrl</arg><arg>${isLookUpUrl}</arg>-->
<!-- </spark>-->
<!-- <ok to="join_split"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <action name="split_software">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Split dumped result for community</name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/ext/software</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${outputPath}</arg>-->
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Software</arg>-->
<!-- <arg>&#45;&#45;isLookUpUrl</arg><arg>${isLookUpUrl}</arg>-->
<!-- </spark>-->
<!-- <ok to="join_split"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <join name="join_split" to="loadInZenodo"/>-->
<!-- <join name="join_split" to="End"/>-->
<!-- <action name="loadInZenodo">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Import table software</name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;inputPath</arg><arg>${workingDir}/ext/publication</arg>-->
<!-- <arg>&#45;&#45;hiveDbName</arg><arg>${hiveDbName}</arg>-->
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Publication</arg>-->
<!-- -->
<!-- </spark>-->
<!-- <ok to="End"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodo</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>

View File

@ -0,0 +1,39 @@
[
{
"paramName":"is",
"paramLongName":"isLookUpUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName":"hdfsp",
"paramLongName":"hdfsPath",
"paramDescription": "the path of the folder tofind files to send to Zenodo",
"paramRequired": true
},
{
"paramName": "hdfsnn",
"paramLongName": "hdfsNameNode",
"paramDescription": "the name node",
"paramRequired": true
},
{
"paramName": "at",
"paramLongName": "accessToken",
"paramDescription": "the access token for the deposition",
"paramRequired": false
},
{
"paramName":"cu",
"paramLongName":"connectionUrl",
"paramDescription": "the url to connect to deposit",
"paramRequired": false
},
{
"paramName":"m",
"paramLongName":"metadata",
"paramDescription": "metadata associated to the deposition",
"paramRequired": false
}
]

View File

@ -1,54 +1,180 @@
package eu.dnetlib.dhp.oa.graph.dump;
import java.io.IOException;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.google.gson.Gson;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.Creator;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.Metadata;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.ZenodoModel;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class ZenodoUploadTest {
private static String workingDir;
// private static FileSystem fileSystem;
//
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(eu.dnetlib.dhp.oa.graph.dump.UpdateProjectInfoTest.class.getSimpleName())
.toString();
}
//
// Configuration conf = new Configuration();
// conf.set("fs.defaultFS", "localhost");
//
// fileSystem = FileSystem.get(conf);
//
// FSDataOutputStream fsDataOutputStream = fileSystem.create(new org.apache.hadoop.fs.Path(workingDir + "/ni"));
//
// BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
//
//
// writer.write(ZenodoUploadTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni").toString());
// }
@Test
public void HDFSConnection() throws IOException {
CommunityMap communityMap = new CommunityMap();
communityMap.put("ni", "Neuroinformatics");
communityMap.put("dh-ch", "Digital Humanities and Cultural Heritage");
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
fs
.copyFromLocalFile(
false, new Path(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni")
.getPath()),
new Path(workingDir + "/zenodo/ni/ni"));
fs
.copyFromLocalFile(
false, new Path(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/dh-ch")
.getPath()),
new Path(workingDir + "/zenodo/dh-ch/dh-ch"));
System.out.println("pr");
// Configuration conf = new Configuration();
// conf.set("fs.defaultFS", "localhost");
//
// APIClient s = new APIClient(
// "https://sandbox.zenodo.org/api/deposit/depositions");
//
// s.connect();
// s.upload(workingDir +"/ni", "Neuroinformatics", fs);
APIClient client = new APIClient("https://sandbox.zenodo.org/api/deposit/depositions",
"5ImUj0VC1ICg4ifK5dc3AGzJhcfAB4osxrFlsr8WxHXxjaYgCE0hY8HZcDoe");
client.connect();
// the second boolean parameter here sets the recursion to true
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs
.listFiles(
new Path(workingDir + "/zenodo"), true);
while (fileStatusListIterator.hasNext()) {
LocatedFileStatus fileStatus = fileStatusListIterator.next();
// do stuff with the file like ...
// BufferedInputStream bis = new BufferedInputStream(fs.open( fileStatus.getPath()));
String p_string = fileStatus.getPath().toString();
int index = p_string.lastIndexOf("/");
String community = p_string.substring(0, index);
community = community.substring(community.lastIndexOf("/") + 1);
String community_name = communityMap.get(community).replace(" ", "_");
fs.copyToLocalFile(fileStatus.getPath(), new Path("/tmp/" + community_name));
System.out.println(community);
// System.out.println(client.upload(bis, community));
File f = new File("/tmp/" + community_name);
client.upload(f, community_name);
if (f.exists()) {
f.delete();
}
}
ZenodoModel zenodo = new ZenodoModel();
Metadata data = new Metadata();
data.setTitle("Dump of OpenAIRE Communities related graph");
data.setUpload_type("dataset");
data.setDescription("this is a fake uploade done for testing purposes");
Creator c = new Creator();
c.setName("Miriam Baglioni");
c.setAffiliation("CNR _ISTI");
data.setCreators(Arrays.asList(c));
zenodo.setMetadata(data);
System.out.println(client.sendMretadata(new Gson().toJson(zenodo)));
System.out.println(client.publish());
}
@Test
public void serializeMetadata() {
ZenodoModel zenodo = new ZenodoModel();
Metadata data = new Metadata();
data.setTitle("Dump of OpenAIRE Communities related graph");
data.setUpload_type("dataset");
data.setDescription("this is a fake uploade done for testing purposes");
Creator c = new Creator();
c.setName("Miriam Baglioni");
c.setAffiliation("CNR _ISTI");
data.setCreators(Arrays.asList(c));
zenodo.setMetadata(data);
System.out.println(new Gson().toJson(zenodo));
}
@Test
public void testConnection() throws IOException {
// InputStream is = getClass().getClassLoader().getResourceAsStream("eu/dnetlib/dhp/oa/graph/dump/zenodo/ni");
APIClient s = new APIClient(
// "https://sandbox.zenodo.org/api/deposit/depositions?access_token=5ImUj0VC1ICg4ifK5dc3AGzJhcfAB4osxrFlsr8WxHXxjaYgCE0hY8HZcDoe");
"https://sandbox.zenodo.org/api/deposit/depositions");
"https://sandbox.zenodo.org/api/deposit/depositions",
"5ImUj0VC1ICg4ifK5dc3AGzJhcfAB4osxrFlsr8WxHXxjaYgCE0hY8HZcDoe");
Assertions.assertEquals(201, s.connect());
s.upload(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni")
.getPath(), "Neuroinformatics");
s
.upload(
new File(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni")
.getPath()),
"Neuroinformatics");
s.upload(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/dh-ch")
.getPath(), "DigitalHumanitiesandCulturalHeritage");
// s.upload(getClass()
// .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/dh-ch")
// .getPath(), "DigitalHumanitiesandCulturalHeritage");
//
// s.upload(getClass()
// .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/egi")
// .getPath(), "EGI");
//
// s.upload(getClass()
// .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/science-innovation-policy")
// .getPath(), "ScienceandInnovationPolicyStudies");
s.upload(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/egi")
.getPath(), "EGI");
s.upload(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/science-innovation-policy")
.getPath(), "ScienceandInnovationPolicyStudies");
//
//
// String data = "{\"metadata\": {\"title\": \"My first upload\", " +
// "\"upload_type\": \"poster\", " +
// "\"description\": \"This is my first upload\", " +
// "\"creators\": [{\"name\": \"Doe, John\", " +
// "\"affiliation': 'Zenodo'}]
//... }
//... }
//
//
ZenodoModel zenodo = new ZenodoModel();
Metadata data = new Metadata();
@ -67,24 +193,4 @@ public class ZenodoUploadTest {
}
@Test
public void testPublish() throws IOException {
APIClient s = new APIClient("https://sandbox.zenodo.org/api/deposit/depositions");
s.publish();
}
@Test
public void testUpload() throws IOException {
APIClient s = new APIClient(
"https://sandbox.zenodo.org/api/deposit/depositions?access_token=5ImUj0VC1ICg4ifK5dc3AGzJhcfAB4osxrFlsr8WxHXxjaYgCE0hY8HZcDoe");
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni")
.getPath();
s.upload(sourcePath, "Neuroinformatics");
}
}