From 06b03840bd2f826b54558b1864bfe17bb612543e Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 22 Jun 2020 16:23:00 +0200
Subject: [PATCH] new classes for Gcat catalogue, Mapping to the catalogue, spark code and workflow definition

---
 .../dhp/schema/dump/gcat/CatalogueEntry.java  |  64 +++++
 .../dnetlib/dhp/oa/graph/dump/APIClient.java  |   6 +-
 .../dnetlib/dhp/oa/graph/dump/Constants.java  |  15 ++
 .../dhp/oa/graph/dump/gcat/GCatAPIClient.java |  25 +-
 .../dhp/oa/graph/dump/gcat/Mapper.java        | 215 ++++++++++++++++
 .../dump/gcat/SparkDumpRISISCatalogue.java    | 126 +++++++++
 .../oa/graph/gcat/catalogue_parameters.json   |  38 +++
 .../graph/gcat/oozie_app/config-default.xml   |  26 ++
 .../dhp/oa/graph/gcat/oozie_app/workflow.xml  | 216 ++++++++++++++++
 .../dhp/oa/graph/dump/GCatAPIClientTest.java  |  41 +--
 .../dhp/oa/graph/dump/ZenodoUploadTest.java   |  50 ++--
 .../dhp/oa/graph/gcat/DumpJobTest.java        | 242 ++++++++++++++++++
 12 files changed, 1011 insertions(+), 53 deletions(-)
 create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/gcat/CatalogueEntry.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/Mapper.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/SparkDumpRISISCatalogue.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/catalogue_parameters.json
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/oozie_app/workflow.xml
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/gcat/DumpJobTest.java

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/gcat/CatalogueEntry.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/gcat/CatalogueEntry.java
new file mode 100644
index 0000000000..7819ed92d8
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/gcat/CatalogueEntry.java
@@ -0,0 +1,64 @@
+package eu.dnetlib.dhp.schema.dump.gcat;
+
+import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class CatalogueEntry implements Serializable {
+
+	private String name; // openaire id with "::" substituted by "$$"
+	private String licence_id; // default "notspecified"
+	private String title; // title.maintitle
+	private String notes; // description.value (the first description)
+	private String url; // the url of the resource in the openaire dashboard
+	private List<KeyValue> extras;
+
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	public String getLicence_id() {
+		return licence_id;
+	}
+
+	public void setLicence_id(String licence_id) {
+		this.licence_id = licence_id;
+	}
+
+	public String getTitle() {
+		return title;
+	}
+
+	public void setTitle(String title) {
+		this.title = title;
+	}
+
+	public String getNotes() {
+		return notes;
+	}
+
+	public void setNotes(String notes) {
+		this.notes = notes;
+	}
+
+	public String getUrl() {
+		return url;
+	}
+
+	public void setUrl(String url) {
+		this.url = url;
+	}
+
+	public List<KeyValue> getExtras() {
+		return extras;
+	}
+
+	public void setExtras(List<KeyValue> extras) {
+		this.extras = extras;
+	}
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/APIClient.java
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/APIClient.java index 74d02aacf9..7a7755cd50 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/APIClient.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/APIClient.java @@ -10,9 +10,7 @@ import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; - import org.apache.http.entity.StringEntity; - import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.util.EntityUtils; @@ -74,7 +72,6 @@ public class APIClient implements Serializable { return response.getStatusLine().getStatusCode(); - } public void upload(String filePath, String file_name) throws IOException { @@ -108,12 +105,11 @@ public class APIClient implements Serializable { HttpResponse response = client.execute(post); System.out.println(response.getStatusLine().getStatusCode()); - } public void publish() throws IOException { HttpClient client = new DefaultHttpClient(); - HttpPost post = new HttpPost(urlString +"/"+ deposition_id +"/actions/publish") ; + HttpPost post = new HttpPost(urlString + "/" + deposition_id + "/actions/publish"); post.setHeader("Authorization", "Bearer " + ACCESS_TOKEN); HttpResponse response = client.execute(post); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java index b77cfdb6b9..5f0c4d0132 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java @@ -7,8 +7,13 @@ import com.google.common.collect.Maps; public class Constants { + public static String PUBLICATION_URL = "https://beta.risis.openaire.eu/search/publication?articleId="; + public static String DATASET_URL = "https://beta.risis.openaire.eu/search/dataset?datasetId="; + public static String SOFTWARE_URL = "https://beta.risis.openaire.eu/search/software?softwareId="; + public static String ORP_URL = "https://beta.risis.openaire.eu/search/other?orpId="; public static final Map accessRightsCoarMap = Maps.newHashMap(); public static final Map coarCodeLabelMap = Maps.newHashMap(); + public static final Map gcatCatalogue = Maps.newHashMap(); public static String COAR_ACCESS_RIGHT_SCHEMA = "http://vocabularies.coar-repositories.org/documentation/access_rights/"; @@ -26,4 +31,14 @@ public class Constants { coarCodeLabelMap.put("c_14cb", "CLOSED"); coarCodeLabelMap.put("c_f1cf", "EMBARGO"); } + + static { + gcatCatalogue.put("OPEN", "OPEN"); + accessRightsCoarMap.put("RESTRICTED", "RESTRICTED"); + accessRightsCoarMap.put("OPEN SOURCE", "OPEN"); + accessRightsCoarMap.put("CLOSED", "CLOSED"); + accessRightsCoarMap.put("EMBARGO", "EMBARGO"); + accessRightsCoarMap.put("UNKNOWN", "UNKNOWN"); + accessRightsCoarMap.put("OTHER", "UNKNOWN"); + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/GCatAPIClient.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/GCatAPIClient.java index 1927cac4ae..c845fd1f33 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/GCatAPIClient.java +++ 
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/GCatAPIClient.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.oa.graph.dump.gcat; import java.io.IOException; @@ -5,7 +6,8 @@ import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.List; -import com.google.gson.Gson; +import javax.ws.rs.HttpMethod; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.HttpEntity; @@ -22,7 +24,7 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.protocol.HTTP; import org.apache.http.util.EntityUtils; -import javax.ws.rs.HttpMethod; +import com.google.gson.Gson; /** * Created by Alessia Bardi on 19/06/2020. @@ -37,17 +39,19 @@ public class GCatAPIClient { private final String itemPath = "items"; private String applicationToken; - public GCatAPIClient(){} + public GCatAPIClient() { + } /** - * Publish the json as in the D4science catalogue as an item. - * TODO: does the POST returns the whole item or just its catalogue identifier? + * Publish the json as in the D4science catalogue as an item. TODO: does the POST returns the whole item or just its + * catalogue identifier? + * * @param jsonMetadata * @return the HTTP status code of the request * @throws IOException */ public int publish(final String jsonMetadata) throws IOException { - try(CloseableHttpClient client = HttpClients.createDefault()) { + try (CloseableHttpClient client = HttpClients.createDefault()) { HttpPost post = new HttpPost(getGcatBaseURL() + itemPath); post.setHeader("gcube-token", getApplicationToken()); post.addHeader("Content-Type", "application/json"); @@ -62,6 +66,7 @@ public class GCatAPIClient { /** * List items in the catalogue + * * @param offset offset * @param limit limit * @return list of json items @@ -69,10 +74,10 @@ public class GCatAPIClient { * @throws URISyntaxException */ public List list(final int offset, final int limit) throws IOException, URISyntaxException { - try(CloseableHttpClient client = HttpClients.createDefault()) { + try (CloseableHttpClient client = HttpClients.createDefault()) { URIBuilder builder = new URIBuilder(getGcatBaseURL() + itemPath) - .addParameter("offset", String.valueOf(offset)) - .addParameter("limit", String.valueOf(limit)); + .addParameter("offset", String.valueOf(offset)) + .addParameter("limit", String.valueOf(limit)); HttpGet get = new HttpGet(builder.build()); get.setHeader("gcube-token", getApplicationToken()); get.addHeader("Content-Type", "application/json"); @@ -82,7 +87,7 @@ public class GCatAPIClient { @Override public String handleResponse( - final HttpResponse response) throws ClientProtocolException, IOException { + final HttpResponse response) throws ClientProtocolException, IOException { int status = response.getStatusLine().getStatusCode(); if (status >= 200 && status < 300) { HttpEntity entity = response.getEntity(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/Mapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/Mapper.java new file mode 100644 index 0000000000..f61b047724 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/Mapper.java @@ -0,0 +1,215 @@ + +package eu.dnetlib.dhp.oa.graph.dump.gcat; + +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; + +import eu.dnetlib.dhp.oa.graph.dump.Constants; +import eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry; +import 
eu.dnetlib.dhp.schema.dump.oaf.*; +import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Qualifier; + +public class Mapper implements Serializable { + + public static CatalogueEntry map(I input) { + + final CatalogueEntry out = new CatalogueEntry(); + Optional ort = Optional.ofNullable(input.getResulttype()); + List externals = new ArrayList<>(); + if (ort.isPresent()) { + switch (ort.get().getClassid()) { + case "publication": + Optional journal = Optional + .ofNullable(((eu.dnetlib.dhp.schema.oaf.Publication) input).getJournal()); + if (journal.isPresent()) { + Journal j = journal.get(); + KeyValue kv = new KeyValue(); + kv.setKey("journal"); + kv.setValue(j.getName() + ", " + j.getVol() + ", " + j.getIss()); + externals.add(kv); + } + out.setUrl(Constants.PUBLICATION_URL + input.getId().substring(3)); + break; + case "dataset": + eu.dnetlib.dhp.schema.oaf.Dataset id = (eu.dnetlib.dhp.schema.oaf.Dataset) input; + Optional.ofNullable(id.getVersion()).ifPresent(v -> out.setVersion(v.getValue())); + out.setUrl(Constants.DATASET_URL + input.getId().substring(3)); + break; + case "software": + + eu.dnetlib.dhp.schema.oaf.Software is = (eu.dnetlib.dhp.schema.oaf.Software) input; + Optional + .ofNullable(is.getCodeRepositoryUrl()) + .ifPresent(value -> externals.add(KeyValue.newInstance("url", value.getValue()))); + Optional + .ofNullable(is.getDocumentationUrl()) + .ifPresent( + value -> value + .stream() + .map(v -> externals.add(KeyValue.newInstance("url", v.getValue())))); + + Optional + .ofNullable(is.getProgrammingLanguage()) + .ifPresent( + value -> externals.add(KeyValue.newInstance("programming language", value.getClassname()))); + out.setUrl(Constants.SOFTWARE_URL + input.getId().substring(3)); + break; + + case "other": + out.setUrl(Constants.ORP_URL + input.getId().substring(3)); + break; + + } + + Optional + .ofNullable(input.getAuthor()) + .ifPresent( + value -> value + .forEach(v -> externals.add(KeyValue.newInstance("author", v.getFullname())))); + + Optional + .ofNullable(input.getBestaccessright()) + .ifPresent( + value -> externals + .add(KeyValue.newInstance("access right", Constants.gcatCatalogue.get(value.getClassid())))); + + Optional + .ofNullable(input.getCollectedfrom()) + .ifPresent( + value -> value + .forEach(v -> externals.add(KeyValue.newInstance("collected from", v.getValue())))); + + Optional + .ofNullable(input.getContributor()) + .ifPresent( + value -> value + .forEach(v -> externals.add(KeyValue.newInstance("contributor", v.getValue())))); + + Optional + .ofNullable(input.getCountry()) + .ifPresent( + value -> value + .forEach(v -> externals.add(KeyValue.newInstance("country", v.getClassname())))); + + final List descriptionList = new ArrayList<>(); + Optional + .ofNullable(input.getDescription()) + .ifPresent(value -> { + Iterator> it = value.iterator(); + out.setName(it.next().getValue()); + it.forEachRemaining(v -> externals.add(KeyValue.newInstance("description", v.getValue()))); + }); + + Optional + .ofNullable(input.getEmbargoenddate()) + .ifPresent(oStr -> externals.add(KeyValue.newInstance("embargo end date", oStr.getValue()))); + + final List formatList = new ArrayList<>(); + Optional + .ofNullable(input.getFormat()) + .ifPresent(value -> value.forEach(f -> formatList.add(f.getValue()))); + + out.setName(input.getId().replace(":", "$")); + + Optional + .ofNullable(input.getInstance()) + .ifPresent( + value -> value + .forEach(v -> { + + Optional + .ofNullable(v.getHostedby()) + 
.ifPresent(hb -> externals.add(KeyValue.newInstance("hosted by", hb.getValue()))); + + final HashSet urlSet = new HashSet<>(); + Optional + .ofNullable(v.getUrl()) + .ifPresent(u -> u.forEach(url -> urlSet.add(url))); + urlSet.forEach(url -> externals.add(KeyValue.newInstance("url", url))); + + })); + + Optional + .ofNullable(input.getLanguage()) + .ifPresent(value -> externals.add(KeyValue.newInstance("language", value.getClassname()))); + + List iTitle = Optional + .ofNullable(input.getTitle()) + .map( + value -> value + .stream() + .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title")) + .collect(Collectors.toList())) + .orElse(new ArrayList<>()); + + if (iTitle.size() > 0) { + out.setTitle(iTitle.get(0).getValue()); + } + + Optional + .ofNullable(input.getPid()) + .ifPresent( + value -> value + .forEach( + v -> externals + .add(KeyValue.newInstance("pid", v.getQualifier().getClassid() + ":" + v.getValue())))); + + Optional + .ofNullable(input.getDateofacceptance()) + .ifPresent(value -> externals.add(KeyValue.newInstance("publication date", value.getValue()))); + + Optional + .ofNullable(input.getPublisher()) + .ifPresent(value -> externals.add(KeyValue.newInstance("publisher", value.getValue()))); + + List subjectList = new ArrayList<>(); + Optional + .ofNullable(input.getSubject()) + .ifPresent( + value -> value + .stream() + .forEach( + s -> externals + .add( + KeyValue + .newInstance("subject", s.getQualifier().getClassid() + ":" + s.getValue())))); + externals.add(KeyValue.newInstance("resource type", input.getResourcetype().getClassid())); + + out.setExtras(externals); + + } + + return out; + } + + private static eu.dnetlib.dhp.schema.dump.oaf.Author getAuthor(eu.dnetlib.dhp.schema.oaf.Author oa) { + eu.dnetlib.dhp.schema.dump.oaf.Author a = new eu.dnetlib.dhp.schema.dump.oaf.Author(); + Optional + .ofNullable(oa.getAffiliation()) + .ifPresent( + value -> a + .setAffiliation( + value + .stream() + .map(aff -> aff.getValue()) + .collect(Collectors.toList()))); + a.setFullname(oa.getFullname()); + a.setName(oa.getName()); + a.setSurname(oa.getSurname()); + a.setRank(oa.getRank()); + Optional + .ofNullable(oa.getPid()) + .ifPresent( + value -> a + .setPid( + value + .stream() + .map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList()))); + return a; + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/SparkDumpRISISCatalogue.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/SparkDumpRISISCatalogue.java new file mode 100644 index 0000000000..38ca1ea4a0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/gcat/SparkDumpRISISCatalogue.java @@ -0,0 +1,126 @@ + +package eu.dnetlib.dhp.oa.graph.dump.gcat; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.CommunityMap; 
+import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; +import eu.dnetlib.dhp.oa.graph.dump.SparkDumpCommunityProducts; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry; +import eu.dnetlib.dhp.schema.oaf.Context; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; + +public class SparkDumpRISISCatalogue implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkDumpRISISCatalogue.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpRISISCatalogue.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/gcat/catalogue_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + final String communityName = parser.get("communityName"); + log.info("communityName: {}", communityName); + + Class inputClazz = (Class) Class.forName(resultClassName); + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + execDump(spark, inputPath, outputPath, inputClazz, communityName);// , dumpClazz); + + }); + + } + + public static void execDump(SparkSession spark, + String inputPath, + String outputPath, + Class inputClazz, + String communityName) {// Class dumpClazz) { + + // Set communities = communityMap.keySet(); + Dataset tmp = Utils.readPath(spark, inputPath, inputClazz); + + tmp + .map( + value -> execMap(value, communityName), + Encoders.bean(eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + + } + + private static CatalogueEntry execMap(I value, String community) { + { + + Optional> inputContext = Optional.ofNullable(value.getContext()); + if (!inputContext.isPresent()) { + return null; + } + List toDumpFor = inputContext.get().stream().map(c -> { + String id = c.getId(); + if (id.contains("::")) { + id = id.substring(0, id.indexOf("::")); + } + if (community.equals(id)) { + return id; + } + return null; + }).filter(Objects::nonNull).collect(Collectors.toList()); + if (toDumpFor.size() == 0) { + return null; + } + return Mapper.map(value); + } + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/catalogue_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/catalogue_parameters.json new file mode 100644 index 0000000000..b68449458f --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/catalogue_parameters.json @@ -0,0 +1,38 @@ + + +[ + + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { 
+ "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + }, + { + "paramName":"tn", + "paramLongName":"resultTableName", + "paramDescription": "the name of the result table we are currently working on", + "paramRequired": true + }, + { + "paramName":"cm", + "paramLongName":"communityName", + "paramDescription": "the name of the community for which to execute the dump to the catalogue", + "paramRequired": true + } +] + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/oozie_app/config-default.xml new file mode 100644 index 0000000000..9608732eda --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/oozie_app/config-default.xml @@ -0,0 +1,26 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/oozie_app/workflow.xml new file mode 100644 index 0000000000..82c134f89c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/gcat/oozie_app/workflow.xml @@ -0,0 +1,216 @@ + + + + + sourcePath + the source path + + + outputPath + the output path + + + communityName + The name of the community for which execute the dump for the catalogue + + + hiveDbName + the target hive database name + + + hiveJdbcUrl + hive server jdbc url + + + hiveMetastoreUris + hive server metastore URIs + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + + + + + + + + yarn + cluster + Dump table publication for RISIS related products + eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf 
spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sourcePath}/publication + --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication + --outputPath${workingDir}/publication + --communityName${communityName} + + + + + + + + yarn + cluster + Dump table dataset for RISIS related products + eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sourcePath}/dataset + --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset + --outputPath${workingDir}/dataset + --communityName${communityName} + + + + + + + + yarn + cluster + Dump table other for RISIS related products + eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sourcePath}/otherresearchproduct + --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --outputPath${workingDir}/otherresearchproduct + --communityName${communityName} + + + + + + + + yarn + cluster + Dump table software for RISIS related products + eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sourcePath}/software + --resultTableNameeu.dnetlib.dhp.schema.oaf.Software + --outputPath${workingDir}/software + --communityName${communityName} + + + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/GCatAPIClientTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/GCatAPIClientTest.java index fc084e9b70..4272276f96 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/GCatAPIClientTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/GCatAPIClientTest.java @@ -1,35 +1,38 @@ + package eu.dnetlib.dhp.oa.graph.dump; -import eu.dnetlib.dhp.oa.graph.dump.gcat.GCatAPIClient; +import java.io.IOException; +import java.net.URISyntaxException; + import 
org.apache.commons.io.IOUtils; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import java.io.IOException; -import java.net.URISyntaxException; +import eu.dnetlib.dhp.oa.graph.dump.gcat.GCatAPIClient; /** * TODO: ask for a token for the dev gcat */ public class GCatAPIClientTest { - private static GCatAPIClient client; + private static GCatAPIClient client; - //@BeforeAll - public static void setup(){ - client = new GCatAPIClient(); - client.setApplicationToken(""); - client.setGcatBaseURL("https://gcat.d4science.org/gcat/"); - } + // @BeforeAll + public static void setup() { + client = new GCatAPIClient(); + client.setApplicationToken(""); + client.setGcatBaseURL("https://gcat.d4science.org/gcat/"); + } - // @Test - public void testList() throws IOException, URISyntaxException { - System.out.println( client.list(0, 10)); - } + // @Test + public void testList() throws IOException, URISyntaxException { + System.out.println(client.list(0, 10)); + } - //@Test - public void testPublish() throws IOException { - String json = IOUtils.toString(getClass().getResourceAsStream("eu/dnetlib/dhp/oa/graph/dump/gcat/gcat_pub.json")); - client.publish(json); - } + // @Test + public void testPublish() throws IOException { + String json = IOUtils + .toString(getClass().getResourceAsStream("eu/dnetlib/dhp/oa/graph/dump/gcat/gcat_pub.json")); + client.publish(json); + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/ZenodoUploadTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/ZenodoUploadTest.java index c6a3b0a4dc..b10fb889c6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/ZenodoUploadTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/ZenodoUploadTest.java @@ -4,14 +4,16 @@ package eu.dnetlib.dhp.oa.graph.dump; import java.io.IOException; import java.util.Arrays; -import com.google.gson.Gson; -import eu.dnetlib.dhp.oa.graph.dump.zenodo.Creator; -import eu.dnetlib.dhp.oa.graph.dump.zenodo.Metadata; -import eu.dnetlib.dhp.oa.graph.dump.zenodo.ZenodoModel; import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import com.google.gson.Gson; + +import eu.dnetlib.dhp.oa.graph.dump.zenodo.Creator; +import eu.dnetlib.dhp.oa.graph.dump.zenodo.Metadata; +import eu.dnetlib.dhp.oa.graph.dump.zenodo.ZenodoModel; + public class ZenodoUploadTest { @Test @@ -22,21 +24,33 @@ public class ZenodoUploadTest { Assertions.assertEquals(201, s.connect()); - s.upload(getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni") - .getPath(), "Neuroinformatics"); + s + .upload( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/ni") + .getPath(), + "Neuroinformatics"); - s.upload(getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/dh-ch") - .getPath(), "DigitalHumanitiesandCulturalHeritage"); + s + .upload( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/dh-ch") + .getPath(), + "DigitalHumanitiesandCulturalHeritage"); - s.upload(getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/egi") - .getPath(), "EGI"); + s + .upload( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/egi") + .getPath(), + "EGI"); - s.upload(getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/science-innovation-policy") - .getPath(), "ScienceandInnovationPolicyStudies"); + s + .upload( + getClass() + 
.getResource("/eu/dnetlib/dhp/oa/graph/dump/zenodo/science-innovation-policy") + .getPath(), + "ScienceandInnovationPolicyStudies"); // // @@ -67,12 +81,12 @@ public class ZenodoUploadTest { } - @Test public void testPublish() throws IOException { APIClient s = new APIClient("https://sandbox.zenodo.org/api/deposit/depositions"); s.publish(); } + @Test public void testUpload() throws IOException { @@ -84,7 +98,5 @@ public class ZenodoUploadTest { s.upload(sourcePath, "Neuroinformatics"); - - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/gcat/DumpJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/gcat/DumpJobTest.java new file mode 100644 index 0000000000..1678c0bdb7 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/gcat/DumpJobTest.java @@ -0,0 +1,242 @@ + +package eu.dnetlib.dhp.oa.graph.gcat; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; + +import eu.dnetlib.dhp.oa.graph.dump.SparkDumpCommunityProducts; +import eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue; + +//@ExtendWith(MockitoExtension.class) +public class DumpJobTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(DumpJobTest.class); + + private static HashMap map = new HashMap<>(); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(DumpJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(DumpJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testDataset() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/dataset.json") + .getPath(); + + SparkDumpRISISCatalogue.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/result", + "-sourcePath", sourcePath, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", + "-communityName", "risis" + }); + +// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, 
workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/result") + .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class)); + + Assertions.assertEquals(90, verificationDataset.count()); + // verificationDataset.show(false); + + Assertions + .assertTrue( + verificationDataset.filter("bestAccessright.code = 'c_abf2'").count() == verificationDataset + .filter("bestAccessright.code = 'c_abf2' and bestAccessright.label = 'OPEN'") + .count()); + + Assertions + .assertTrue( + verificationDataset.filter("bestAccessright.code = 'c_16ec'").count() == verificationDataset + .filter("bestAccessright.code = 'c_16ec' and bestAccessright.label = 'RESTRICTED'") + .count()); + + Assertions + .assertTrue( + verificationDataset.filter("bestAccessright.code = 'c_14cb'").count() == verificationDataset + .filter("bestAccessright.code = 'c_14cb' and bestAccessright.label = 'CLOSED'") + .count()); + + Assertions + .assertTrue( + verificationDataset.filter("bestAccessright.code = 'c_f1cf'").count() == verificationDataset + .filter("bestAccessright.code = 'c_f1cf' and bestAccessright.label = 'EMBARGO'") + .count()); + + Assertions.assertTrue(verificationDataset.filter("size(context) > 0").count() == 90); + + Assertions.assertTrue(verificationDataset.filter("type = 'dataset'").count() == 90); + + // verificationDataset.select("instance.type").show(false); + +//TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright) + + } + +// @Test +// public void testPublication() throws Exception { +// +// final String sourcePath = getClass() +// .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication.json") +// .getPath(); +// +// SparkDumpCommunityProducts.main(new String[] { +// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, +// "-isSparkSessionManaged", Boolean.FALSE.toString(), +// "-outputPath", workingDir.toString() + "/result", +// "-sourcePath", sourcePath, +// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", +// "-communityMap", new Gson().toJson(map) +// }); +// +//// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); +// +// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); +// +// JavaRDD tmp = sc +// .textFile(workingDir.toString() + "/result") +// .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Result.class)); +// +// org.apache.spark.sql.Dataset verificationDataset = spark +// .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class)); +// +// Assertions.assertEquals(76, verificationDataset.count()); +// verificationDataset.show(false); +// +// Assertions.assertEquals(76, verificationDataset.filter("type = 'publication'").count()); +// +////TODO verify value and name of the fields for vocab related value (i.e. 
accessright, bestaccessright) +// +// } +// +// @Test +// public void testSoftware() throws Exception { +// +// final String sourcePath = getClass() +// .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/software.json") +// .getPath(); +// +// SparkDumpCommunityProducts.main(new String[] { +// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, +// "-isSparkSessionManaged", Boolean.FALSE.toString(), +// "-outputPath", workingDir.toString() + "/result", +// "-sourcePath", sourcePath, +// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software", +// "-communityMap", new Gson().toJson(map) +// }); +// +//// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); +// +// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); +// +// JavaRDD tmp = sc +// .textFile(workingDir.toString() + "/result") +// .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Result.class)); +// +// org.apache.spark.sql.Dataset verificationDataset = spark +// .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class)); +// +// Assertions.assertEquals(6, verificationDataset.count()); +// +// Assertions.assertEquals(6, verificationDataset.filter("type = 'software'").count()); +// verificationDataset.show(false); +// +////TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright) +// +// } +// +// @Test +// public void testORP() throws Exception { +// +// final String sourcePath = getClass() +// .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/orp.json") +// .getPath(); +// +// SparkDumpCommunityProducts.main(new String[] { +// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, +// "-isSparkSessionManaged", Boolean.FALSE.toString(), +// "-outputPath", workingDir.toString() + "/result", +// "-sourcePath", sourcePath, +// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct", +// "-communityMap", new Gson().toJson(map) +// }); +// +//// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); +// +// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); +// +// JavaRDD tmp = sc +// .textFile(workingDir.toString() + "/result") +// .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Result.class)); +// +// org.apache.spark.sql.Dataset verificationDataset = spark +// .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class)); +// +// Assertions.assertEquals(3, verificationDataset.count()); +// +// Assertions.assertEquals(3, verificationDataset.filter("type = 'other'").count()); +// verificationDataset.show(false); +// +////TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright) +// +// } + +}
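
A few illustrative usage sketches for the classes introduced by this patch follow; none of this code is part of the commit. First, the flat JSON layout a CatalogueEntry serializes to: identifier, title and extras below are made up, and licence_id defaults to "notspecified" as the field comment indicates.

import java.util.Arrays;

import com.google.gson.Gson;

import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry;
import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;

public class CatalogueEntryShapeExample {
	public static void main(String[] args) {
		// Illustrative values only: shows the JSON shape written by the Spark dump, one object per line.
		CatalogueEntry entry = new CatalogueEntry();
		entry.setName("50|dedup_wf_001$$ab12cd34"); // id with ':' replaced by '$', as done in Mapper.map
		entry.setLicence_id("notspecified"); // default, per the field comment
		entry.setTitle("A sample publication title");
		entry.setNotes("First description of the product");
		entry.setUrl(Constants.PUBLICATION_URL + "dedup_wf_001::ab12cd34");
		entry.setExtras(Arrays.asList(
			KeyValue.newInstance("access right", "OPEN"),
			KeyValue.newInstance("publication date", "2020-06-22")));

		System.out.println(new Gson().toJson(entry));
	}
}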
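
Mapper.map() labels the "access right" extra by looking up bestaccessright.classid in Constants.gcatCatalogue, so the dump relies on that map carrying one entry per access-right label. A self-contained sketch of such a lookup table; the exact key set is an assumption, mirroring the labels already used in Constants.

import java.util.HashMap;
import java.util.Map;

public class AccessRightLabelExample {
	// Assumed lookup table: Mapper.map() resolves bestaccessright.classid through Constants.gcatCatalogue.
	private static final Map<String, String> gcatCatalogue = new HashMap<>();

	static {
		gcatCatalogue.put("OPEN", "OPEN");
		gcatCatalogue.put("OPEN SOURCE", "OPEN");
		gcatCatalogue.put("RESTRICTED", "RESTRICTED");
		gcatCatalogue.put("CLOSED", "CLOSED");
		gcatCatalogue.put("EMBARGO", "EMBARGO");
		gcatCatalogue.put("UNKNOWN", "UNKNOWN");
		gcatCatalogue.put("OTHER", "UNKNOWN");
	}

	public static void main(String[] args) {
		// e.g. a result whose bestaccessright classid is "OPEN SOURCE" is exposed as "OPEN"
		System.out.println(gcatCatalogue.get("OPEN SOURCE"));
	}
}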
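
SparkDumpRISISCatalogue.execMap keeps a result only when one of its context ids, truncated at the first "::", equals the requested community. A minimal standalone rendering of that check, with invented context ids.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CommunityFilterExample {
	public static void main(String[] args) {
		String community = "risis";
		// Invented context ids for illustration.
		List<String> contextIds = Arrays.asList("risis", "risis::risis-subcommunity", "dh-ch");

		// Same decision as execMap: truncate at "::" and compare with the community name.
		List<String> matching = contextIds.stream()
			.map(id -> id.contains("::") ? id.substring(0, id.indexOf("::")) : id)
			.filter(community::equals)
			.collect(Collectors.toList());

		System.out.println(matching); // [risis, risis] -> the record would be dumped
	}
}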
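
Driving the new GCatAPIClient outside of JUnit follows the same pattern as GCatAPIClientTest; a sketch, where the base URL is the one used in the test and the gcube token is a placeholder.

import eu.dnetlib.dhp.oa.graph.dump.gcat.GCatAPIClient;

public class GCatPublishExample {
	public static void main(String[] args) throws Exception {
		GCatAPIClient client = new GCatAPIClient();
		client.setGcatBaseURL("https://gcat.d4science.org/gcat/"); // same endpoint as in GCatAPIClientTest
		client.setApplicationToken("<gcube-token>"); // placeholder: a real D4Science token is needed

		// publish one catalogue entry (jsonMetadata is a serialized CatalogueEntry)
		String jsonMetadata = "{\"name\":\"50|dedup_wf_001$$ab12cd34\",\"licence_id\":\"notspecified\"}";
		int status = client.publish(jsonMetadata);
		System.out.println("publish returned HTTP " + status);

		// page through the items already in the catalogue, as the test does
		System.out.println(client.list(0, 10));
	}
}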
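
The Oozie workflow stops after writing gzipped JSON part files under ${workingDir}; pushing those records to the catalogue is not covered by this patch. A hypothetical follow-up step, assuming the part files have been copied to a local directory (path and token are placeholders).

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;

import eu.dnetlib.dhp.oa.graph.dump.gcat.GCatAPIClient;

public class CatalogueFeedExample {
	public static void main(String[] args) throws Exception {
		GCatAPIClient client = new GCatAPIClient();
		client.setGcatBaseURL("https://gcat.d4science.org/gcat/");
		client.setApplicationToken("<gcube-token>"); // placeholder

		// each line of the Spark output is one serialized CatalogueEntry
		try (DirectoryStream<Path> parts = Files.newDirectoryStream(Paths.get("/tmp/publication"), "part-*.gz")) {
			for (Path part : parts) {
				try (BufferedReader reader = new BufferedReader(
					new InputStreamReader(new GZIPInputStream(Files.newInputStream(part))))) {
					String line;
					while ((line = reader.readLine()) != null) {
						int status = client.publish(line);
						System.out.println(part.getFileName() + " -> HTTP " + status);
					}
				}
			}
		}
	}
}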