new classes for Gcat catalogue, Mapping to the catalogue, spark code and workflow definition

This commit is contained in:
Miriam Baglioni 2020-06-22 16:23:00 +02:00
parent ec19fcace0
commit 06b03840bd
12 changed files with 1011 additions and 53 deletions

View File

@ -0,0 +1,64 @@
package eu.dnetlib.dhp.schema.dump.gcat;
import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
import java.io.Serializable;
import java.util.List;
public class CatologueEntry implements Serializable {
private String name; //openaire id withouut :: substitute with $$
private String licence_id; //default "notspecified",
private String title; // title.maintitle
private String notes; // description.value (the first description
private String url; //the url of the resource in the openaire dashboard
private List<KeyValue> extras;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLicence_id() {
return licence_id;
}
public void setLicence_id(String licence_id) {
this.licence_id = licence_id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getNotes() {
return notes;
}
public void setNotes(String notes) {
this.notes = notes;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public List<KeyValue> getExtras() {
return extras;
}
public void setExtras(List<KeyValue> extras) {
this.extras = extras;
}
}

View File

@ -10,9 +10,7 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut; import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity; import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
@ -74,7 +72,6 @@ public class APIClient implements Serializable {
return response.getStatusLine().getStatusCode(); return response.getStatusLine().getStatusCode();
} }
public void upload(String filePath, String file_name) throws IOException { public void upload(String filePath, String file_name) throws IOException {
@ -108,12 +105,11 @@ public class APIClient implements Serializable {
HttpResponse response = client.execute(post); HttpResponse response = client.execute(post);
System.out.println(response.getStatusLine().getStatusCode()); System.out.println(response.getStatusLine().getStatusCode());
} }
public void publish() throws IOException { public void publish() throws IOException {
HttpClient client = new DefaultHttpClient(); HttpClient client = new DefaultHttpClient();
HttpPost post = new HttpPost(urlString +"/"+ deposition_id +"/actions/publish") ; HttpPost post = new HttpPost(urlString + "/" + deposition_id + "/actions/publish");
post.setHeader("Authorization", "Bearer " + ACCESS_TOKEN); post.setHeader("Authorization", "Bearer " + ACCESS_TOKEN);
HttpResponse response = client.execute(post); HttpResponse response = client.execute(post);

View File

@ -7,8 +7,13 @@ import com.google.common.collect.Maps;
public class Constants { public class Constants {
public static String PUBLICATION_URL = "https://beta.risis.openaire.eu/search/publication?articleId=";
public static String DATASET_URL = "https://beta.risis.openaire.eu/search/dataset?datasetId=";
public static String SOFTWARE_URL = "https://beta.risis.openaire.eu/search/software?softwareId=";
public static String ORP_URL = "https://beta.risis.openaire.eu/search/other?orpId=";
public static final Map<String, String> accessRightsCoarMap = Maps.newHashMap(); public static final Map<String, String> accessRightsCoarMap = Maps.newHashMap();
public static final Map<String, String> coarCodeLabelMap = Maps.newHashMap(); public static final Map<String, String> coarCodeLabelMap = Maps.newHashMap();
public static final Map<String, String> gcatCatalogue = Maps.newHashMap();
public static String COAR_ACCESS_RIGHT_SCHEMA = "http://vocabularies.coar-repositories.org/documentation/access_rights/"; public static String COAR_ACCESS_RIGHT_SCHEMA = "http://vocabularies.coar-repositories.org/documentation/access_rights/";
@ -26,4 +31,14 @@ public class Constants {
coarCodeLabelMap.put("c_14cb", "CLOSED"); coarCodeLabelMap.put("c_14cb", "CLOSED");
coarCodeLabelMap.put("c_f1cf", "EMBARGO"); coarCodeLabelMap.put("c_f1cf", "EMBARGO");
} }
static {
gcatCatalogue.put("OPEN", "OPEN");
accessRightsCoarMap.put("RESTRICTED", "RESTRICTED");
accessRightsCoarMap.put("OPEN SOURCE", "OPEN");
accessRightsCoarMap.put("CLOSED", "CLOSED");
accessRightsCoarMap.put("EMBARGO", "EMBARGO");
accessRightsCoarMap.put("UNKNOWN", "UNKNOWN");
accessRightsCoarMap.put("OTHER", "UNKNOWN");
}
} }

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump.gcat; package eu.dnetlib.dhp.oa.graph.dump.gcat;
import java.io.IOException; import java.io.IOException;
@ -5,7 +6,8 @@ import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import com.google.gson.Gson; import javax.ws.rs.HttpMethod;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
@ -22,7 +24,7 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import javax.ws.rs.HttpMethod; import com.google.gson.Gson;
/** /**
* Created by Alessia Bardi on 19/06/2020. * Created by Alessia Bardi on 19/06/2020.
@ -37,17 +39,19 @@ public class GCatAPIClient {
private final String itemPath = "items"; private final String itemPath = "items";
private String applicationToken; private String applicationToken;
public GCatAPIClient(){} public GCatAPIClient() {
}
/** /**
* Publish the json as in the D4science catalogue as an item. * Publish the json as in the D4science catalogue as an item. TODO: does the POST returns the whole item or just its
* TODO: does the POST returns the whole item or just its catalogue identifier? * catalogue identifier?
*
* @param jsonMetadata * @param jsonMetadata
* @return the HTTP status code of the request * @return the HTTP status code of the request
* @throws IOException * @throws IOException
*/ */
public int publish(final String jsonMetadata) throws IOException { public int publish(final String jsonMetadata) throws IOException {
try(CloseableHttpClient client = HttpClients.createDefault()) { try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpPost post = new HttpPost(getGcatBaseURL() + itemPath); HttpPost post = new HttpPost(getGcatBaseURL() + itemPath);
post.setHeader("gcube-token", getApplicationToken()); post.setHeader("gcube-token", getApplicationToken());
post.addHeader("Content-Type", "application/json"); post.addHeader("Content-Type", "application/json");
@ -62,6 +66,7 @@ public class GCatAPIClient {
/** /**
* List items in the catalogue * List items in the catalogue
*
* @param offset offset * @param offset offset
* @param limit limit * @param limit limit
* @return list of json items * @return list of json items
@ -69,10 +74,10 @@ public class GCatAPIClient {
* @throws URISyntaxException * @throws URISyntaxException
*/ */
public List<String> list(final int offset, final int limit) throws IOException, URISyntaxException { public List<String> list(final int offset, final int limit) throws IOException, URISyntaxException {
try(CloseableHttpClient client = HttpClients.createDefault()) { try (CloseableHttpClient client = HttpClients.createDefault()) {
URIBuilder builder = new URIBuilder(getGcatBaseURL() + itemPath) URIBuilder builder = new URIBuilder(getGcatBaseURL() + itemPath)
.addParameter("offset", String.valueOf(offset)) .addParameter("offset", String.valueOf(offset))
.addParameter("limit", String.valueOf(limit)); .addParameter("limit", String.valueOf(limit));
HttpGet get = new HttpGet(builder.build()); HttpGet get = new HttpGet(builder.build());
get.setHeader("gcube-token", getApplicationToken()); get.setHeader("gcube-token", getApplicationToken());
get.addHeader("Content-Type", "application/json"); get.addHeader("Content-Type", "application/json");
@ -82,7 +87,7 @@ public class GCatAPIClient {
@Override @Override
public String handleResponse( public String handleResponse(
final HttpResponse response) throws ClientProtocolException, IOException { final HttpResponse response) throws ClientProtocolException, IOException {
int status = response.getStatusLine().getStatusCode(); int status = response.getStatusLine().getStatusCode();
if (status >= 200 && status < 300) { if (status >= 200 && status < 300) {
HttpEntity entity = response.getEntity(); HttpEntity entity = response.getEntity();

View File

@ -0,0 +1,215 @@
package eu.dnetlib.dhp.oa.graph.dump.gcat;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry;
import eu.dnetlib.dhp.schema.dump.oaf.*;
import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class Mapper implements Serializable {
public static <I extends eu.dnetlib.dhp.schema.oaf.Result> CatalogueEntry map(I input) {
final CatalogueEntry out = new CatalogueEntry();
Optional<Qualifier> ort = Optional.ofNullable(input.getResulttype());
List<KeyValue> externals = new ArrayList<>();
if (ort.isPresent()) {
switch (ort.get().getClassid()) {
case "publication":
Optional<Journal> journal = Optional
.ofNullable(((eu.dnetlib.dhp.schema.oaf.Publication) input).getJournal());
if (journal.isPresent()) {
Journal j = journal.get();
KeyValue kv = new KeyValue();
kv.setKey("journal");
kv.setValue(j.getName() + ", " + j.getVol() + ", " + j.getIss());
externals.add(kv);
}
out.setUrl(Constants.PUBLICATION_URL + input.getId().substring(3));
break;
case "dataset":
eu.dnetlib.dhp.schema.oaf.Dataset id = (eu.dnetlib.dhp.schema.oaf.Dataset) input;
Optional.ofNullable(id.getVersion()).ifPresent(v -> out.setVersion(v.getValue()));
out.setUrl(Constants.DATASET_URL + input.getId().substring(3));
break;
case "software":
eu.dnetlib.dhp.schema.oaf.Software is = (eu.dnetlib.dhp.schema.oaf.Software) input;
Optional
.ofNullable(is.getCodeRepositoryUrl())
.ifPresent(value -> externals.add(KeyValue.newInstance("url", value.getValue())));
Optional
.ofNullable(is.getDocumentationUrl())
.ifPresent(
value -> value
.stream()
.map(v -> externals.add(KeyValue.newInstance("url", v.getValue()))));
Optional
.ofNullable(is.getProgrammingLanguage())
.ifPresent(
value -> externals.add(KeyValue.newInstance("programming language", value.getClassname())));
out.setUrl(Constants.SOFTWARE_URL + input.getId().substring(3));
break;
case "other":
out.setUrl(Constants.ORP_URL + input.getId().substring(3));
break;
}
Optional
.ofNullable(input.getAuthor())
.ifPresent(
value -> value
.forEach(v -> externals.add(KeyValue.newInstance("author", v.getFullname()))));
Optional
.ofNullable(input.getBestaccessright())
.ifPresent(
value -> externals
.add(KeyValue.newInstance("access right", Constants.gcatCatalogue.get(value.getClassid()))));
Optional
.ofNullable(input.getCollectedfrom())
.ifPresent(
value -> value
.forEach(v -> externals.add(KeyValue.newInstance("collected from", v.getValue()))));
Optional
.ofNullable(input.getContributor())
.ifPresent(
value -> value
.forEach(v -> externals.add(KeyValue.newInstance("contributor", v.getValue()))));
Optional
.ofNullable(input.getCountry())
.ifPresent(
value -> value
.forEach(v -> externals.add(KeyValue.newInstance("country", v.getClassname()))));
final List<String> descriptionList = new ArrayList<>();
Optional
.ofNullable(input.getDescription())
.ifPresent(value -> {
Iterator<Field<String>> it = value.iterator();
out.setName(it.next().getValue());
it.forEachRemaining(v -> externals.add(KeyValue.newInstance("description", v.getValue())));
});
Optional
.ofNullable(input.getEmbargoenddate())
.ifPresent(oStr -> externals.add(KeyValue.newInstance("embargo end date", oStr.getValue())));
final List<String> formatList = new ArrayList<>();
Optional
.ofNullable(input.getFormat())
.ifPresent(value -> value.forEach(f -> formatList.add(f.getValue())));
out.setName(input.getId().replace(":", "$"));
Optional
.ofNullable(input.getInstance())
.ifPresent(
value -> value
.forEach(v -> {
Optional
.ofNullable(v.getHostedby())
.ifPresent(hb -> externals.add(KeyValue.newInstance("hosted by", hb.getValue())));
final HashSet<String> urlSet = new HashSet<>();
Optional
.ofNullable(v.getUrl())
.ifPresent(u -> u.forEach(url -> urlSet.add(url)));
urlSet.forEach(url -> externals.add(KeyValue.newInstance("url", url)));
}));
Optional
.ofNullable(input.getLanguage())
.ifPresent(value -> externals.add(KeyValue.newInstance("language", value.getClassname())));
List<StructuredProperty> iTitle = Optional
.ofNullable(input.getTitle())
.map(
value -> value
.stream()
.filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title"))
.collect(Collectors.toList()))
.orElse(new ArrayList<>());
if (iTitle.size() > 0) {
out.setTitle(iTitle.get(0).getValue());
}
Optional
.ofNullable(input.getPid())
.ifPresent(
value -> value
.forEach(
v -> externals
.add(KeyValue.newInstance("pid", v.getQualifier().getClassid() + ":" + v.getValue()))));
Optional
.ofNullable(input.getDateofacceptance())
.ifPresent(value -> externals.add(KeyValue.newInstance("publication date", value.getValue())));
Optional
.ofNullable(input.getPublisher())
.ifPresent(value -> externals.add(KeyValue.newInstance("publisher", value.getValue())));
List<ControlledField> subjectList = new ArrayList<>();
Optional
.ofNullable(input.getSubject())
.ifPresent(
value -> value
.stream()
.forEach(
s -> externals
.add(
KeyValue
.newInstance("subject", s.getQualifier().getClassid() + ":" + s.getValue()))));
externals.add(KeyValue.newInstance("resource type", input.getResourcetype().getClassid()));
out.setExtras(externals);
}
return out;
}
private static eu.dnetlib.dhp.schema.dump.oaf.Author getAuthor(eu.dnetlib.dhp.schema.oaf.Author oa) {
eu.dnetlib.dhp.schema.dump.oaf.Author a = new eu.dnetlib.dhp.schema.dump.oaf.Author();
Optional
.ofNullable(oa.getAffiliation())
.ifPresent(
value -> a
.setAffiliation(
value
.stream()
.map(aff -> aff.getValue())
.collect(Collectors.toList())));
a.setFullname(oa.getFullname());
a.setName(oa.getName());
a.setSurname(oa.getSurname());
a.setRank(oa.getRank());
Optional
.ofNullable(oa.getPid())
.ifPresent(
value -> a
.setPid(
value
.stream()
.map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue()))
.collect(Collectors.toList())));
return a;
}
}

View File

@ -0,0 +1,126 @@
package eu.dnetlib.dhp.oa.graph.dump.gcat;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.CommunityMap;
import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
import eu.dnetlib.dhp.oa.graph.dump.SparkDumpCommunityProducts;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class SparkDumpRISISCatalogue implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkDumpRISISCatalogue.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkDumpRISISCatalogue.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/gcat/catalogue_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String communityName = parser.get("communityName");
log.info("communityName: {}", communityName);
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execDump(spark, inputPath, outputPath, inputClazz, communityName);// , dumpClazz);
});
}
public static <I extends Result, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(SparkSession spark,
String inputPath,
String outputPath,
Class<I> inputClazz,
String communityName) {// Class<O> dumpClazz) {
// Set<String> communities = communityMap.keySet();
Dataset<I> tmp = Utils.readPath(spark, inputPath, inputClazz);
tmp
.map(
value -> execMap(value, communityName),
Encoders.bean(eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
private static <I extends Result> CatalogueEntry execMap(I value, String community) {
{
Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext());
if (!inputContext.isPresent()) {
return null;
}
List<String> toDumpFor = inputContext.get().stream().map(c -> {
String id = c.getId();
if (id.contains("::")) {
id = id.substring(0, id.indexOf("::"));
}
if (community.equals(id)) {
return id;
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
if (toDumpFor.size() == 0) {
return null;
}
return Mapper.map(value);
}
}
}

View File

@ -0,0 +1,38 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName":"cm",
"paramLongName":"communityName",
"paramDescription": "the name of the community for which to execute the dump to the catalogue",
"paramRequired": true
}
]

View File

@ -0,0 +1,26 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
</configuration>

View File

@ -0,0 +1,216 @@
<workflow-app name="dump_community_products" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>communityName</name>
<description>The name of the community for which execute the dump for the catalogue</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="fork_dump"/>
<error to="Kill"/>
</action>
<fork name="fork_dump">
<path start="dump_publication"/>
<path start="dump_dataset"/>
<path start="dump_orp"/>
<path start="dump_software"/>
</fork>
<action name="dump_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table publication for RISIS related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
<arg>--communityName</arg><arg>${communityName}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table dataset for RISIS related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
<arg>--communityName</arg><arg>${communityName}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table other for RISIS related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--communityName</arg><arg>${communityName}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table software for RISIS related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
<arg>--communityName</arg><arg>${communityName}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<join name="join_dump" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -1,35 +1,38 @@
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
import eu.dnetlib.dhp.oa.graph.dump.gcat.GCatAPIClient; import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException; import eu.dnetlib.dhp.oa.graph.dump.gcat.GCatAPIClient;
import java.net.URISyntaxException;
/** /**
* TODO: ask for a token for the dev gcat * TODO: ask for a token for the dev gcat
*/ */
public class GCatAPIClientTest { public class GCatAPIClientTest {
private static GCatAPIClient client; private static GCatAPIClient client;
//@BeforeAll // @BeforeAll
public static void setup(){ public static void setup() {
client = new GCatAPIClient(); client = new GCatAPIClient();
client.setApplicationToken(""); client.setApplicationToken("");
client.setGcatBaseURL("https://gcat.d4science.org/gcat/"); client.setGcatBaseURL("https://gcat.d4science.org/gcat/");
} }
// @Test // @Test
public void testList() throws IOException, URISyntaxException { public void testList() throws IOException, URISyntaxException {
System.out.println( client.list(0, 10)); System.out.println(client.list(0, 10));
} }
//@Test // @Test
public void testPublish() throws IOException { public void testPublish() throws IOException {
String json = IOUtils.toString(getClass().getResourceAsStream("eu/dnetlib/dhp/oa/graph/dump/gcat/gcat_pub.json")); String json = IOUtils
client.publish(json); .toString(getClass().getResourceAsStream("eu/dnetlib/dhp/oa/graph/dump/gcat/gcat_pub.json"));
} client.publish(json);
}
} }

View File

@ -4,14 +4,16 @@ package eu.dnetlib.dhp.oa.graph.dump;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import com.google.gson.Gson;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.Creator;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.Metadata;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.ZenodoModel;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import com.google.gson.Gson;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.Creator;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.Metadata;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.ZenodoModel;
public class ZenodoUploadTest { public class ZenodoUploadTest {
@Test @Test
@ -22,21 +24,33 @@ public class ZenodoUploadTest {
Assertions.assertEquals(201, s.connect()); Assertions.assertEquals(201, s.connect());
s.upload(getClass() s
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni") .upload(
.getPath(), "Neuroinformatics"); getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni")
.getPath(),
"Neuroinformatics");
s.upload(getClass() s
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/dh-ch") .upload(
.getPath(), "DigitalHumanitiesandCulturalHeritage"); getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/dh-ch")
.getPath(),
"DigitalHumanitiesandCulturalHeritage");
s.upload(getClass() s
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/egi") .upload(
.getPath(), "EGI"); getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/egi")
.getPath(),
"EGI");
s.upload(getClass() s
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/science-innovation-policy") .upload(
.getPath(), "ScienceandInnovationPolicyStudies"); getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/science-innovation-policy")
.getPath(),
"ScienceandInnovationPolicyStudies");
// //
// //
@ -67,12 +81,12 @@ public class ZenodoUploadTest {
} }
@Test @Test
public void testPublish() throws IOException { public void testPublish() throws IOException {
APIClient s = new APIClient("https://sandbox.zenodo.org/api/deposit/depositions"); APIClient s = new APIClient("https://sandbox.zenodo.org/api/deposit/depositions");
s.publish(); s.publish();
} }
@Test @Test
public void testUpload() throws IOException { public void testUpload() throws IOException {
@ -84,7 +98,5 @@ public class ZenodoUploadTest {
s.upload(sourcePath, "Neuroinformatics"); s.upload(sourcePath, "Neuroinformatics");
} }
} }

View File

@ -0,0 +1,242 @@
package eu.dnetlib.dhp.oa.graph.gcat;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.oa.graph.dump.SparkDumpCommunityProducts;
import eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue;
//@ExtendWith(MockitoExtension.class)
public class DumpJobTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(DumpJobTest.class);
private static HashMap<String, String> map = new HashMap<>();
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(DumpJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(DumpJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void testDataset() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/dataset.json")
.getPath();
SparkDumpRISISCatalogue.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/result",
"-sourcePath", sourcePath,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-communityName", "risis"
});
// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset");
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry> tmp = sc
.textFile(workingDir.toString() + "/result")
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class));
org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class));
Assertions.assertEquals(90, verificationDataset.count());
// verificationDataset.show(false);
Assertions
.assertTrue(
verificationDataset.filter("bestAccessright.code = 'c_abf2'").count() == verificationDataset
.filter("bestAccessright.code = 'c_abf2' and bestAccessright.label = 'OPEN'")
.count());
Assertions
.assertTrue(
verificationDataset.filter("bestAccessright.code = 'c_16ec'").count() == verificationDataset
.filter("bestAccessright.code = 'c_16ec' and bestAccessright.label = 'RESTRICTED'")
.count());
Assertions
.assertTrue(
verificationDataset.filter("bestAccessright.code = 'c_14cb'").count() == verificationDataset
.filter("bestAccessright.code = 'c_14cb' and bestAccessright.label = 'CLOSED'")
.count());
Assertions
.assertTrue(
verificationDataset.filter("bestAccessright.code = 'c_f1cf'").count() == verificationDataset
.filter("bestAccessright.code = 'c_f1cf' and bestAccessright.label = 'EMBARGO'")
.count());
Assertions.assertTrue(verificationDataset.filter("size(context) > 0").count() == 90);
Assertions.assertTrue(verificationDataset.filter("type = 'dataset'").count() == 90);
// verificationDataset.select("instance.type").show(false);
//TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright)
}
// @Test
// public void testPublication() throws Exception {
//
// final String sourcePath = getClass()
// .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication.json")
// .getPath();
//
// SparkDumpCommunityProducts.main(new String[] {
// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
// "-isSparkSessionManaged", Boolean.FALSE.toString(),
// "-outputPath", workingDir.toString() + "/result",
// "-sourcePath", sourcePath,
// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
// "-communityMap", new Gson().toJson(map)
// });
//
//// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset");
//
// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
//
// JavaRDD<eu.dnetlib.dhp.schema.dump.oaf.Result> tmp = sc
// .textFile(workingDir.toString() + "/result")
// .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Result.class));
//
// org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.dump.oaf.Result> verificationDataset = spark
// .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class));
//
// Assertions.assertEquals(76, verificationDataset.count());
// verificationDataset.show(false);
//
// Assertions.assertEquals(76, verificationDataset.filter("type = 'publication'").count());
//
////TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright)
//
// }
//
// @Test
// public void testSoftware() throws Exception {
//
// final String sourcePath = getClass()
// .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/software.json")
// .getPath();
//
// SparkDumpCommunityProducts.main(new String[] {
// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
// "-isSparkSessionManaged", Boolean.FALSE.toString(),
// "-outputPath", workingDir.toString() + "/result",
// "-sourcePath", sourcePath,
// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
// "-communityMap", new Gson().toJson(map)
// });
//
//// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset");
//
// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
//
// JavaRDD<eu.dnetlib.dhp.schema.dump.oaf.Result> tmp = sc
// .textFile(workingDir.toString() + "/result")
// .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Result.class));
//
// org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.dump.oaf.Result> verificationDataset = spark
// .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class));
//
// Assertions.assertEquals(6, verificationDataset.count());
//
// Assertions.assertEquals(6, verificationDataset.filter("type = 'software'").count());
// verificationDataset.show(false);
//
////TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright)
//
// }
//
// @Test
// public void testORP() throws Exception {
//
// final String sourcePath = getClass()
// .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/orp.json")
// .getPath();
//
// SparkDumpCommunityProducts.main(new String[] {
// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
// "-isSparkSessionManaged", Boolean.FALSE.toString(),
// "-outputPath", workingDir.toString() + "/result",
// "-sourcePath", sourcePath,
// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
// "-communityMap", new Gson().toJson(map)
// });
//
//// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset");
//
// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
//
// JavaRDD<eu.dnetlib.dhp.schema.dump.oaf.Result> tmp = sc
// .textFile(workingDir.toString() + "/result")
// .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Result.class));
//
// org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.dump.oaf.Result> verificationDataset = spark
// .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class));
//
// Assertions.assertEquals(3, verificationDataset.count());
//
// Assertions.assertEquals(3, verificationDataset.filter("type = 'other'").count());
// verificationDataset.show(false);
//
////TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright)
//
// }
}