From 19a80e46384300ba75eade1b88ba5365e8e022e7 Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Fri, 24 Jan 2020 09:58:55 +0100 Subject: [PATCH] implemented workfow for aggregation and generation of infospace graph --- dhp-common/pom.xml | 4 + .../dhp/parser/utility/VtdException.java | 12 + .../dhp/parser/utility/VtdUtilityParser.java | 107 +++++++ dhp-schemas/pom.xml | 6 + .../eu/dnetlib/dhp/schema/oaf/KeyValue.java | 7 +- .../dhp/schema/scholexplorer/DLIDataset.java | 70 +++++ .../schema/scholexplorer/DLIPublication.java | 66 +++++ .../dhp/schema/scholexplorer/DLIUnknown.java | 108 +++++++ .../schema/scholexplorer/ProvenaceInfo.java | 46 +++ .../dhp/schema/scholexplorer/DLItest.java | 81 ++++++ dhp-workflows/dhp-dedup/pom.xml | 4 - dhp-workflows/dhp-graph-mapper/pom.xml | 4 + .../SparkExtractEntitiesJob.java | 101 +++++++ .../SparkScholexplorerGraphImporter.java | 49 ++++ .../SparkScholexplorerMergeEntitiesJob.java | 138 +++++++++ .../parser/AbstractScholexplorerParser.java | 112 ++++++++ .../parser/DatasetScholexplorerParser.java | 263 ++++++++++++++++++ .../PublicationScholexplorerParser.java | 233 ++++++++++++++++ .../input_extract_entities_parameters.json | 7 + .../graph/input_graph_scholix_parameters.json | 6 + .../merge_entities_scholix_parameters.json | 6 + .../oozie_app/config-default.xml | 10 + .../mergeentities/oozie_app/workflow.xml | 64 +++++ .../oozie_app/config-default.xml | 10 + .../extractentities/oozie_app/workflow.xml | 68 +++++ .../oozie_app/config-default.xml | 10 + .../scholexplorer/oozie_app/workflow.xml | 63 +++++ .../SparkScholexplorerGraphImporterTest.java | 19 ++ pom.xml | 6 + 29 files changed, 1675 insertions(+), 5 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdException.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIDataset.java create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIPublication.java create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIUnknown.java create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/ProvenaceInfo.java create mode 100644 dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/scholexplorer/DLItest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkExtractEntitiesJob.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporter.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_extract_entities_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_scholix_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/merge_entities_scholix_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporterTest.java diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index 43c2a3834..59b7d35d2 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -42,6 +42,10 @@ com.rabbitmq amqp-client + + com.ximpleware + vtd-xml + diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdException.java b/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdException.java new file mode 100644 index 000000000..77b28f207 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdException.java @@ -0,0 +1,12 @@ +package eu.dnetlib.dhp.parser.utility; + +public class VtdException extends Exception { + + public VtdException(final Exception e) { + super(e); + } + + public VtdException(final Throwable e) { + super(e); + } +} \ No newline at end of file diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java b/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java new file mode 100644 index 000000000..5d92e1c5f --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java @@ -0,0 +1,107 @@ +package eu.dnetlib.dhp.parser.utility; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +import com.ximpleware.AutoPilot; +import com.ximpleware.VTDNav; + +/** + * Created by sandro on 9/29/16. + */ +public class VtdUtilityParser { + + public static List getTextValuesWithAttributes(final AutoPilot ap, final VTDNav vn, final String xpath, final List attributes) + throws VtdException { + final List results = new ArrayList<>(); + try { + ap.selectXPath(xpath); + + while (ap.evalXPath() != -1) { + final Node currentNode = new Node(); + int t = vn.getText(); + if (t >= 0) { + currentNode.setTextValue(vn.toNormalizedString(t)); + } + currentNode.setAttributes(getAttributes(vn, attributes)); + results.add(currentNode); + } + return results; + } catch (Exception e) { + throw new VtdException(e); + } + } + + private static Map getAttributes(final VTDNav vn, final List attributes) { + final Map currentAttributes = new HashMap<>(); + if (attributes != null) { + + attributes.forEach(attributeKey -> { + try { + int attr = vn.getAttrVal(attributeKey); + if (attr > -1) { + currentAttributes.put(attributeKey, vn.toNormalizedString(attr)); + } + } catch (Throwable e) { + throw new RuntimeException(e); + } + }); + } + return currentAttributes; + } + + public static List getTextValue(final AutoPilot ap, final VTDNav vn, final String xpath) throws VtdException { + List results = new ArrayList<>(); + try { + ap.selectXPath(xpath); + while (ap.evalXPath() != -1) { + int t = vn.getText(); + if (t > -1) results.add(vn.toNormalizedString(t)); + } + return results; + } catch (Exception e) { + throw new VtdException(e); + } + } + + public static String getSingleValue(final AutoPilot ap, final VTDNav nav, final String xpath) throws VtdException { + try { + ap.selectXPath(xpath); + while (ap.evalXPath() != -1) { + int it = nav.getText(); + if (it > -1) + return nav.toNormalizedString(it); + } + return null; + } catch (Exception e) { + throw new VtdException(e); + } + } + + public static class Node { + + private String textValue; + + private Map attributes; + + public String getTextValue() { + return textValue; + } + + public void setTextValue(final String textValue) { + this.textValue = textValue; + } + + public Map getAttributes() { + return attributes; + } + + public void setAttributes(final Map attributes) { + this.attributes = attributes; + } + } + +} diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml index 20896a61d..8bc30a8b0 100644 --- a/dhp-schemas/pom.xml +++ b/dhp-schemas/pom.xml @@ -32,6 +32,12 @@ ${project.version} + + com.fasterxml.jackson.core + jackson-databind + test + + diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/KeyValue.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/KeyValue.java index 74d9f77bd..59cefa40e 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/KeyValue.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/KeyValue.java @@ -1,9 +1,12 @@ package eu.dnetlib.dhp.schema.oaf; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.commons.lang3.StringUtils; import java.io.Serializable; - +@JsonIgnoreProperties({"blank"}) public class KeyValue implements Serializable { private String key; @@ -36,10 +39,12 @@ public class KeyValue implements Serializable { this.dataInfo = dataInfo; } + @JsonIgnore public String toComparableString() { return isBlank()?"":String.format("%s::%s", key != null ? key.toLowerCase() : "", value != null ? value.toLowerCase() : ""); } + @JsonIgnore public boolean isBlank() { return StringUtils.isBlank(key) && StringUtils.isBlank(value); } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIDataset.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIDataset.java new file mode 100644 index 000000000..df124395f --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIDataset.java @@ -0,0 +1,70 @@ +package eu.dnetlib.dhp.schema.scholexplorer; + +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DLIDataset extends Dataset { + + private List dlicollectedfrom; + + private String completionStatus; + + public String getCompletionStatus() { + return completionStatus; + } + + public void setCompletionStatus(String completionStatus) { + this.completionStatus = completionStatus; + } + + public List getDlicollectedfrom() { + return dlicollectedfrom; + } + + public void setDlicollectedfrom(List dlicollectedfrom) { + this.dlicollectedfrom = dlicollectedfrom; + } + + @Override + public void mergeFrom(OafEntity e) { + super.mergeFrom(e); + DLIDataset p = (DLIDataset) e; + if (StringUtils.isBlank(completionStatus) && StringUtils.isNotBlank(p.completionStatus)) + completionStatus = p.completionStatus; + if ("complete".equalsIgnoreCase(p.completionStatus)) + completionStatus = "complete"; + dlicollectedfrom = mergeProvenance(dlicollectedfrom, p.getDlicollectedfrom()); + } + + private List mergeProvenance(final List a, final List b) { + Map result = new HashMap<>(); + if (a != null) + a.forEach(p -> { + if (p != null && StringUtils.isNotBlank(p.getId()) && result.containsKey(p.getId())) { + if ("incomplete".equalsIgnoreCase(result.get(p.getId()).getCompletionStatus()) && StringUtils.isNotBlank(p.getCompletionStatus())) { + result.put(p.getId(), p); + } + + } else if (p != null && p.getId() != null && !result.containsKey(p.getId())) + result.put(p.getId(), p); + }); + if (b != null) + b.forEach(p -> { + if (p != null && StringUtils.isNotBlank(p.getId()) && result.containsKey(p.getId())) { + if ("incomplete".equalsIgnoreCase(result.get(p.getId()).getCompletionStatus()) && StringUtils.isNotBlank(p.getCompletionStatus())) { + result.put(p.getId(), p); + } + + } else if (p != null && p.getId() != null && !result.containsKey(p.getId())) + result.put(p.getId(), p); + }); + + return new ArrayList<>(result.values()); + } +} diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIPublication.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIPublication.java new file mode 100644 index 000000000..f0b5d0bd6 --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIPublication.java @@ -0,0 +1,66 @@ +package eu.dnetlib.dhp.schema.scholexplorer; + +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Publication; +import org.apache.commons.lang3.StringUtils; +import java.io.Serializable; +import java.util.*; + +public class DLIPublication extends Publication implements Serializable { + private List dlicollectedfrom; + + private String completionStatus; + + public String getCompletionStatus() { + return completionStatus; + } + + public void setCompletionStatus(String completionStatus) { + this.completionStatus = completionStatus; + } + + public List getDlicollectedfrom() { + return dlicollectedfrom; + } + + public void setDlicollectedfrom(List dlicollectedfrom) { + this.dlicollectedfrom = dlicollectedfrom; + } + + @Override + public void mergeFrom(OafEntity e) { + super.mergeFrom(e); + DLIPublication p = (DLIPublication) e; + if (StringUtils.isBlank(completionStatus) && StringUtils.isNotBlank(p.completionStatus)) + completionStatus = p.completionStatus; + if ("complete".equalsIgnoreCase(p.completionStatus)) + completionStatus = "complete"; + dlicollectedfrom = mergeProvenance(dlicollectedfrom, p.getDlicollectedfrom()); + } + + private List mergeProvenance(final List a, final List b) { + Map result = new HashMap<>(); + if (a != null) + a.forEach(p -> { + if (p != null && StringUtils.isNotBlank(p.getId()) && result.containsKey(p.getId())) { + if ("incomplete".equalsIgnoreCase(result.get(p.getId()).getCompletionStatus()) && StringUtils.isNotBlank(p.getCompletionStatus())) { + result.put(p.getId(), p); + } + + } else if (p != null && p.getId() != null && !result.containsKey(p.getId())) + result.put(p.getId(), p); + }); + if (b != null) + b.forEach(p -> { + if (p != null && StringUtils.isNotBlank(p.getId()) && result.containsKey(p.getId())) { + if ("incomplete".equalsIgnoreCase(result.get(p.getId()).getCompletionStatus()) && StringUtils.isNotBlank(p.getCompletionStatus())) { + result.put(p.getId(), p); + } + + } else if (p != null && p.getId() != null && !result.containsKey(p.getId())) + result.put(p.getId(), p); + }); + + return new ArrayList<>(result.values()); + } +} diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIUnknown.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIUnknown.java new file mode 100644 index 000000000..c7e6dda27 --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIUnknown.java @@ -0,0 +1,108 @@ +package eu.dnetlib.dhp.schema.scholexplorer; + +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DLIUnknown extends Oaf implements Serializable { + + private String id; + + private List pid; + + private String dateofcollection; + + private String dateoftransformation; + + private List dlicollectedfrom; + + private String completionStatus = "incomplete"; + + public String getCompletionStatus() { + return completionStatus; + } + + public void setCompletionStatus(String completionStatus) { + this.completionStatus = completionStatus; + } + + public List getDlicollectedfrom() { + return dlicollectedfrom; + } + + public void setDlicollectedfrom(List dlicollectedfrom) { + this.dlicollectedfrom = dlicollectedfrom; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + + public List getPid() { + return pid; + } + + public void setPid(List pid) { + this.pid = pid; + } + + public String getDateofcollection() { + return dateofcollection; + } + + public void setDateofcollection(String dateofcollection) { + this.dateofcollection = dateofcollection; + } + + public String getDateoftransformation() { + return dateoftransformation; + } + + public void setDateoftransformation(String dateoftransformation) { + this.dateoftransformation = dateoftransformation; + } + + public void mergeFrom(DLIUnknown p) { + if ("complete".equalsIgnoreCase(p.completionStatus)) + completionStatus = "complete"; + dlicollectedfrom = mergeProvenance(dlicollectedfrom, p.getDlicollectedfrom()); + } + + private List mergeProvenance(final List a, final List b) { + Map result = new HashMap<>(); + if (a != null) + a.forEach(p -> { + if (p != null && StringUtils.isNotBlank(p.getId()) && result.containsKey(p.getId())) { + if ("incomplete".equalsIgnoreCase(result.get(p.getId()).getCompletionStatus()) && StringUtils.isNotBlank(p.getCompletionStatus())) { + result.put(p.getId(), p); + } + + } else if (p != null && p.getId() != null && !result.containsKey(p.getId())) + result.put(p.getId(), p); + }); + if (b != null) + b.forEach(p -> { + if (p != null && StringUtils.isNotBlank(p.getId()) && result.containsKey(p.getId())) { + if ("incomplete".equalsIgnoreCase(result.get(p.getId()).getCompletionStatus()) && StringUtils.isNotBlank(p.getCompletionStatus())) { + result.put(p.getId(), p); + } + + } else if (p != null && p.getId() != null && !result.containsKey(p.getId())) + result.put(p.getId(), p); + }); + + return new ArrayList<>(result.values()); + } +} diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/ProvenaceInfo.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/ProvenaceInfo.java new file mode 100644 index 000000000..3fe069b03 --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/ProvenaceInfo.java @@ -0,0 +1,46 @@ +package eu.dnetlib.dhp.schema.scholexplorer; + +import java.io.Serializable; + +public class ProvenaceInfo implements Serializable { + + private String id; + + private String name; + + private String completionStatus; + + private String collectionMode ="collected"; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getCompletionStatus() { + return completionStatus; + } + + public void setCompletionStatus(String completionStatus) { + this.completionStatus = completionStatus; + } + + public String getCollectionMode() { + return collectionMode; + } + + public void setCollectionMode(String collectionMode) { + this.collectionMode = collectionMode; + } +} diff --git a/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/scholexplorer/DLItest.java b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/scholexplorer/DLItest.java new file mode 100644 index 000000000..54f5f5f06 --- /dev/null +++ b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/scholexplorer/DLItest.java @@ -0,0 +1,81 @@ +package eu.dnetlib.dhp.schema.scholexplorer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +public class DLItest { + + + @Test + public void testMergePublication() throws JsonProcessingException { + DLIPublication a1 = new DLIPublication(); + a1.setPid(Arrays.asList( createSP("123456","pdb","dnet:pid_types"))); + a1.setTitle(Collections.singletonList(createSP("Un Titolo", "title", "dnetTitle"))); + a1.setDlicollectedfrom(Arrays.asList(createCollectedFrom("znd","Zenodo","complete"))); + a1.setCompletionStatus("complete"); + + DLIPublication a = new DLIPublication(); + a.setPid(Arrays.asList(createSP("10.11","doi","dnet:pid_types"), createSP("123456","pdb","dnet:pid_types"))); + a.setTitle(Collections.singletonList(createSP("A Title", "title", "dnetTitle"))); + a.setDlicollectedfrom(Arrays.asList(createCollectedFrom("dct","datacite","complete"),createCollectedFrom("dct","datacite","incomplete"))); + a.setCompletionStatus("incomplete"); + + a.mergeFrom(a1); + + ObjectMapper mapper = new ObjectMapper(); + System.out.println(mapper.writeValueAsString(a)); + + + + + + + + } + + + + @Test + public void testDeserialization() throws IOException { + + final String json ="{\"dataInfo\":{\"invisible\":false,\"inferred\":null,\"deletedbyinference\":false,\"trust\":\"0.9\",\"inferenceprovenance\":null,\"provenanceaction\":null},\"lastupdatetimestamp\":null,\"id\":\"60|bd9352547098929a394655ad1a44a479\",\"originalId\":[\"bd9352547098929a394655ad1a44a479\"],\"collectedfrom\":[{\"key\":\"dli_________::datacite\",\"value\":\"Datasets in Datacite\",\"dataInfo\":null,\"blank\":false}],\"pid\":[{\"value\":\"10.7925/DRS1.DUCHAS_5078760\",\"qualifier\":{\"classid\":\"doi\",\"classname\":\"doi\",\"schemeid\":\"dnet:pid_types\",\"schemename\":\"dnet:pid_types\",\"blank\":false},\"dataInfo\":null}],\"dateofcollection\":\"2020-01-09T08:29:31.885Z\",\"dateoftransformation\":null,\"extraInfo\":null,\"oaiprovenance\":null,\"author\":[{\"fullname\":\"Cathail, S. Ó\",\"name\":null,\"surname\":null,\"rank\":null,\"pid\":null,\"affiliation\":null},{\"fullname\":\"Donnell, Breda Mc\",\"name\":null,\"surname\":null,\"rank\":null,\"pid\":null,\"affiliation\":null},{\"fullname\":\"Ireland. Department of Arts, Culture, and the Gaeltacht\",\"name\":null,\"surname\":null,\"rank\":null,\"pid\":null,\"affiliation\":null},{\"fullname\":\"University College Dublin\",\"name\":null,\"surname\":null,\"rank\":null,\"pid\":null,\"affiliation\":null},{\"fullname\":\"National Folklore Foundation\",\"name\":null,\"surname\":null,\"rank\":null,\"pid\":null,\"affiliation\":null},{\"fullname\":\"Cathail, S. Ó\",\"name\":null,\"surname\":null,\"rank\":null,\"pid\":null,\"affiliation\":null},{\"fullname\":\"Donnell, Breda Mc\",\"name\":null,\"surname\":null,\"rank\":null,\"pid\":null,\"affiliation\":null}],\"resulttype\":null,\"language\":null,\"country\":null,\"subject\":[{\"value\":\"Recreation\",\"qualifier\":{\"classid\":\"dnet:subject\",\"classname\":\"dnet:subject\",\"schemeid\":\"unknown\",\"schemename\":\"unknown\",\"blank\":false},\"dataInfo\":null},{\"value\":\"Entertainments and recreational activities\",\"qualifier\":{\"classid\":\"dnet:subject\",\"classname\":\"dnet:subject\",\"schemeid\":\"unknown\",\"schemename\":\"unknown\",\"blank\":false},\"dataInfo\":null},{\"value\":\"Siamsaíocht agus caitheamh aimsire\",\"qualifier\":{\"classid\":\"dnet:subject\",\"classname\":\"dnet:subject\",\"schemeid\":\"unknown\",\"schemename\":\"unknown\",\"blank\":false},\"dataInfo\":null}],\"title\":[{\"value\":\"Games We Play\",\"qualifier\":null,\"dataInfo\":null}],\"relevantdate\":[{\"value\":\"1938-09-28\",\"qualifier\":{\"classid\":\"date\",\"classname\":\"date\",\"schemeid\":\"dnet::date\",\"schemename\":\"dnet::date\",\"blank\":false},\"dataInfo\":null}],\"description\":[{\"value\":\"Story collected by Breda Mc Donnell, a student at Tenure school (Tinure, Co. Louth) (no informant identified).\",\"dataInfo\":null}],\"dateofacceptance\":null,\"publisher\":{\"value\":\"University College Dublin\",\"dataInfo\":null},\"embargoenddate\":null,\"source\":null,\"fulltext\":null,\"format\":null,\"contributor\":null,\"resourcetype\":null,\"coverage\":null,\"refereed\":null,\"context\":null,\"processingchargeamount\":null,\"processingchargecurrency\":null,\"externalReference\":null,\"instance\":[],\"storagedate\":null,\"device\":null,\"size\":null,\"version\":null,\"lastmetadataupdate\":null,\"metadataversionnumber\":null,\"geolocation\":null,\"dlicollectedfrom\":[{\"id\":\"dli_________::datacite\",\"name\":\"Datasets in Datacite\",\"completionStatus\":\"complete\",\"collectionMode\":\"resolved\"}],\"completionStatus\":\"complete\"}"; + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + DLIDataset dliDataset = mapper.readValue(json, DLIDataset.class); + mapper.enable(SerializationFeature.INDENT_OUTPUT); + System.out.println(mapper.writeValueAsString(dliDataset)); + } + + private ProvenaceInfo createCollectedFrom(final String id, final String name, final String completionStatus) { + ProvenaceInfo p = new ProvenaceInfo(); + p.setId(id); + p.setName(name); + p.setCompletionStatus(completionStatus); + return p; + } + + + private StructuredProperty createSP(final String value, final String className, final String schemeName) { + StructuredProperty p = new StructuredProperty(); + p.setValue(value); + Qualifier schema = new Qualifier(); + schema.setClassname(className); + schema.setClassid(className); + schema.setSchemename(schemeName); + schema.setSchemeid(schemeName); + p.setQualifier(schema); + return p; + } + + +} diff --git a/dhp-workflows/dhp-dedup/pom.xml b/dhp-workflows/dhp-dedup/pom.xml index 28ef6a453..67bcc27c1 100644 --- a/dhp-workflows/dhp-dedup/pom.xml +++ b/dhp-workflows/dhp-dedup/pom.xml @@ -31,10 +31,6 @@ dhp-schemas ${project.version} - - com.arakelian - java-jq - eu.dnetlib diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index 9186fa829..ff7450663 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -30,6 +30,10 @@ dhp-schemas ${project.version} + + com.jayway.jsonpath + json-path + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkExtractEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkExtractEntitiesJob.java new file mode 100644 index 000000000..686337c7a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkExtractEntitiesJob.java @@ -0,0 +1,101 @@ +package eu.dnetlib.dhp.graph.scholexplorer; + +import com.jayway.jsonpath.JsonPath; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.graph.SparkGraphImporterJob; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import net.minidev.json.JSONArray; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + + +public class SparkExtractEntitiesJob { + final static String IDJSONPATH = "$.id"; + final static String SOURCEJSONPATH = "$.source"; + final static String TARGETJSONPATH = "$.target"; + + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkExtractEntitiesJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_extract_entities_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkGraphImporterJob.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String targetPath = parser.get("targetPath"); + final String tdir =parser.get("targetDir"); + final JavaRDD inputRDD = sc.textFile(inputPath); + + List entities = Arrays.stream(parser.get("entities").split(",")).map(String::trim).collect(Collectors.toList()); + if (entities.stream().anyMatch("dataset"::equalsIgnoreCase)) { + //Extract Dataset + inputRDD.filter(SparkExtractEntitiesJob::isDataset).saveAsTextFile(targetPath + "/dataset/"+tdir, GzipCodec.class); + } + if (entities.stream().anyMatch("unknown"::equalsIgnoreCase)) { + //Extract Unknown + inputRDD.filter(SparkExtractEntitiesJob::isUnknown).saveAsTextFile(targetPath + "/unknown/"+tdir, GzipCodec.class); + } + + if (entities.stream().anyMatch("relation"::equalsIgnoreCase)) { + //Extract Relation + inputRDD.filter(SparkExtractEntitiesJob::isRelation).saveAsTextFile(targetPath + "/relation/"+tdir, GzipCodec.class); + } + if (entities.stream().anyMatch("publication"::equalsIgnoreCase)) { + //Extract Relation + inputRDD.filter(SparkExtractEntitiesJob::isPublication).saveAsTextFile(targetPath + "/publication/"+tdir, GzipCodec.class); + } + } + + + public static boolean isDataset(final String json) { + final String id = getJPathString(IDJSONPATH, json); + if (StringUtils.isBlank(id)) return false; + return id.startsWith("60|"); + } + + + public static boolean isPublication(final String json) { + final String id = getJPathString(IDJSONPATH, json); + if (StringUtils.isBlank(id)) return false; + return id.startsWith("50|"); + } + + public static boolean isUnknown(final String json) { + final String id = getJPathString(IDJSONPATH, json); + if (StringUtils.isBlank(id)) return false; + return id.startsWith("70|"); + } + + public static boolean isRelation(final String json) { + final String source = getJPathString(SOURCEJSONPATH, json); + final String target = getJPathString(TARGETJSONPATH, json); + return StringUtils.isNotBlank(source) && StringUtils.isNotBlank(target); + } + + + public static String getJPathString(final String jsonPath, final String json) { + try { + Object o = JsonPath.read(json, jsonPath); + if (o instanceof String) + return (String) o; + if (o instanceof JSONArray && ((JSONArray) o).size() > 0) + return (String) ((JSONArray) o).get(0); + return ""; + } catch (Exception e) { + return ""; + } + } + + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporter.java new file mode 100644 index 000000000..33c269622 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporter.java @@ -0,0 +1,49 @@ +package eu.dnetlib.dhp.graph.scholexplorer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.graph.SparkGraphImporterJob; +import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser; +import eu.dnetlib.dhp.graph.scholexplorer.parser.PublicationScholexplorerParser; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +public class SparkScholexplorerGraphImporter { + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkScholexplorerGraphImporter.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_scholix_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkGraphImporterJob.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + + sc.sequenceFile(inputPath, IntWritable.class, Text.class).map(Tuple2::_2).map(Text::toString).repartition(500) + .flatMap((FlatMapFunction) record -> { + switch (parser.get("entity")) { + case "dataset": + final DatasetScholexplorerParser d = new DatasetScholexplorerParser(); + return d.parseObject(record).iterator(); + case "publication": + final PublicationScholexplorerParser p = new PublicationScholexplorerParser(); + return p.parseObject(record).iterator(); + default: + throw new IllegalArgumentException("wrong values of entities"); + } + }).map(k -> { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(k); + }).saveAsTextFile(parser.get("targetPath"), GzipCodec.class); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java new file mode 100644 index 000000000..b320fd51c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java @@ -0,0 +1,138 @@ +package eu.dnetlib.dhp.graph.scholexplorer; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.JsonPath; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.graph.SparkGraphImporterJob; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; +import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; +import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; +import eu.dnetlib.dhp.utils.DHPUtils; +import net.minidev.json.JSONArray; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class SparkScholexplorerMergeEntitiesJob { + + final static String IDJSONPATH = "$.id"; + final static String SOURCEJSONPATH = "$.source"; + final static String TARGETJSONPATH = "$.target"; + final static String RELJSONPATH = "$.relType"; + + public static void main(String[] args) throws Exception { + + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkScholexplorerMergeEntitiesJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/merge_entities_scholix_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkGraphImporterJob.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String targetPath = parser.get("targetPath"); + final String entity = parser.get("entity"); + + + FileSystem fs = FileSystem.get(sc.sc().hadoopConfiguration()); + List subFolder = Arrays.stream(fs.listStatus(new Path(inputPath))).filter(FileStatus::isDirectory).map(FileStatus::getPath).collect(Collectors.toList()); + List> inputRdd = new ArrayList<>(); + subFolder.forEach(p -> inputRdd.add(sc.textFile(p.toUri().getRawPath()))); + JavaRDD union = sc.emptyRDD(); + for (JavaRDD item : inputRdd) { + union = union.union(item); + } + switch (entity) { + case "dataset": + union.mapToPair((PairFunction) f -> { + final String id = getJPathString(IDJSONPATH, f); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return new Tuple2<>(id, mapper.readValue(f, DLIDataset.class)); + }).reduceByKey((a, b) -> { + a.mergeFrom(b); + return a; + }).map(item -> { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(item._2()); + }).saveAsTextFile(targetPath, GzipCodec.class); + break; + case "publication": + union.mapToPair((PairFunction) f -> { + final String id = getJPathString(IDJSONPATH, f); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return new Tuple2<>(id, mapper.readValue(f, DLIPublication.class)); + }).reduceByKey((a, b) -> { + a.mergeFrom(b); + return a; + }).map(item -> { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(item._2()); + }).saveAsTextFile(targetPath, GzipCodec.class); + break; + case "unknown": + union.mapToPair((PairFunction) f -> { + final String id = getJPathString(IDJSONPATH, f); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return new Tuple2<>(id, mapper.readValue(f, DLIUnknown.class)); + }).reduceByKey((a, b) -> { + a.mergeFrom(b); + return a; + }).map(item -> { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(item._2()); + }).saveAsTextFile(targetPath, GzipCodec.class); + break; + case "relation": + union.mapToPair((PairFunction) f -> { + final String source = getJPathString(SOURCEJSONPATH, f); + final String target = getJPathString(TARGETJSONPATH, f); + final String reltype = getJPathString(RELJSONPATH, f); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source, reltype, target)), mapper.readValue(f, Relation.class)); + }).reduceByKey((a, b) -> { + a.mergeOAFDataInfo(b); + return a; + }).map(item -> { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(item._2()); + }).saveAsTextFile(targetPath, GzipCodec.class); + break; + } + } + + public static String getJPathString(final String jsonPath, final String json) { + try { + Object o = JsonPath.read(json, jsonPath); + if (o instanceof String) + return (String) o; + if (o instanceof JSONArray && ((JSONArray) o).size() > 0) + return (String) ((JSONArray) o).get(0); + return ""; + } catch (Exception e) { + return ""; + } + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java new file mode 100644 index 000000000..0ba7b25ee --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java @@ -0,0 +1,112 @@ +package eu.dnetlib.dhp.graph.scholexplorer.parser; + + +import eu.dnetlib.dhp.parser.utility.VtdUtilityParser; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.xml.stream.XMLStreamReader; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public abstract class AbstractScholexplorerParser { + + protected static final Log log = LogFactory.getLog(AbstractScholexplorerParser.class); + final static Pattern pattern = Pattern.compile("10\\.\\d{4,9}/[-._;()/:A-Z0-9]+$", Pattern.CASE_INSENSITIVE); + private List datasetSubTypes = Arrays.asList("dataset", "software", "film", "sound", "physicalobject", "audiovisual", "collection", "other", "study", "metadata"); + + public abstract List parseObject(final String record); + + protected Map getAttributes(final XMLStreamReader parser) { + final Map attributesMap = new HashMap<>(); + for (int i = 0; i < parser.getAttributeCount(); i++) { + attributesMap.put(parser.getAttributeLocalName(i), parser.getAttributeValue(i)); + } + return attributesMap; + } + + + protected List extractSubject(List subjects) { + final List subjectResult = new ArrayList<>(); + if (subjects != null && subjects.size() > 0) { + subjects.forEach(subjectMap -> { + final StructuredProperty subject = new StructuredProperty(); + subject.setValue(subjectMap.getTextValue()); + final Qualifier schema = new Qualifier(); + schema.setClassid("dnet:subject"); + schema.setClassname("dnet:subject"); + schema.setSchemeid(subjectMap.getAttributes().get("subjectScheme")); + schema.setSchemename(subjectMap.getAttributes().get("subjectScheme")); + subject.setQualifier(schema); + subjectResult.add(subject); + }); + } + return subjectResult; + } + + + protected StructuredProperty extractIdentifier(List identifierType, final String fieldName) { + final StructuredProperty pid = new StructuredProperty(); + if (identifierType != null && identifierType.size() > 0) { + final VtdUtilityParser.Node result = identifierType.get(0); + pid.setValue(result.getTextValue()); + final Qualifier pidType = new Qualifier(); + pidType.setClassname(result.getAttributes().get(fieldName)); + pidType.setClassid(result.getAttributes().get(fieldName)); + pidType.setSchemename("dnet:pid_types"); + pidType.setSchemeid("dnet:pid_types"); + pid.setQualifier(pidType); + return pid; + } + return null; + } + + protected void inferPid(final StructuredProperty input) { + final Matcher matcher = pattern.matcher(input.getValue()); + if (matcher.find()) { + input.setValue(matcher.group()); + if (input.getQualifier() == null) { + input.setQualifier(new Qualifier()); + input.getQualifier().setSchemename("dnet:pid_types"); + input.getQualifier().setSchemeid("dnet:pid_types"); + } + input.getQualifier().setClassid("doi"); + input.getQualifier().setClassname("doi"); + } + } + + protected String generateId(final String pid, final String pidType, final String entityType) { + String type = "50|"; + switch (entityType){ + case "publication": + type = "50|"; + break; + case "dataset": + type = "60|"; + break; + case "unknown": + type = "70|"; + break; + default: + throw new IllegalArgumentException("unexpected value "+entityType); + + } + if ("dnet".equalsIgnoreCase(pidType)) + return type+StringUtils.substringAfter(pid, "::"); + + return type+ DHPUtils.md5(String.format("%s::%s", pid, pidType)); + } + + + + +} + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java new file mode 100644 index 000000000..578b18085 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java @@ -0,0 +1,263 @@ +package eu.dnetlib.dhp.graph.scholexplorer.parser; + +import com.ximpleware.AutoPilot; +import com.ximpleware.VTDGen; +import com.ximpleware.VTDNav; +import eu.dnetlib.dhp.parser.utility.VtdUtilityParser; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; +import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; +import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo; + +import eu.dnetlib.dhp.parser.utility.VtdUtilityParser.Node; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class DatasetScholexplorerParser extends AbstractScholexplorerParser { + @Override + public List parseObject(String record) { + try { + final DLIDataset parsedObject = new DLIDataset(); + final VTDGen vg = new VTDGen(); + vg.setDoc(record.getBytes()); + final List result = new ArrayList<>(); + vg.parse(true); + + final VTDNav vn = vg.getNav(); + final AutoPilot ap = new AutoPilot(vn); + + DataInfo di = new DataInfo(); + di.setTrust("0.9"); + di.setDeletedbyinference(false); + di.setInvisible(false); + parsedObject.setDataInfo(di); + + + final String objIdentifier = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']"); + parsedObject.setId("60|" + StringUtils.substringAfter(objIdentifier, "::")); + + parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']"))); + + + parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']")); + + final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']"); + + if (StringUtils.isNotBlank(resolvedDate)) { + StructuredProperty currentDate = new StructuredProperty(); + currentDate.setValue(resolvedDate); + final Qualifier dateQualifier = new Qualifier(); + dateQualifier.setClassname("resolvedDate"); + dateQualifier.setClassid("resolvedDate"); + dateQualifier.setSchemename("dnet::date"); + dateQualifier.setSchemeid("dnet::date"); + currentDate.setQualifier(dateQualifier); + parsedObject.setRelevantdate(Collections.singletonList(currentDate)); + } + + final String completionStatus = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='completionStatus']"); + final String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']"); + + final String publisher = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resource']/*[local-name()='publisher']"); + + List collectedFromNodes = + VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='collectedFrom']", Arrays.asList("name", "id", "mode", "completionStatus")); + + List resolvededFromNodes = + VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resolvedFrom']", Arrays.asList("name", "id", "mode", "completionStatus")); + + Field pf = new Field<>(); + pf.setValue(publisher); + + parsedObject.setPublisher(pf); + final List provenances = new ArrayList<>(); + if (collectedFromNodes != null && collectedFromNodes.size() > 0) { + collectedFromNodes.forEach(it -> { + final ProvenaceInfo provenance = new ProvenaceInfo(); + provenance.setId(it.getAttributes().get("id")); + provenance.setName(it.getAttributes().get("name")); + provenance.setCollectionMode(provisionMode); + provenance.setCompletionStatus(it.getAttributes().get("completionStatus")); + provenances.add(provenance); + }); + } + + if (resolvededFromNodes != null && resolvededFromNodes.size() > 0) { + resolvededFromNodes.forEach(it -> { + final ProvenaceInfo provenance = new ProvenaceInfo(); + provenance.setId(it.getAttributes().get("id")); + provenance.setName(it.getAttributes().get("name")); + provenance.setCollectionMode("resolved"); + provenance.setCompletionStatus(it.getAttributes().get("completionStatus")); + provenances.add(provenance); + }); + } + + parsedObject.setDlicollectedfrom(provenances); + parsedObject.setCollectedfrom(parsedObject.getDlicollectedfrom().stream().map( + p-> { + final KeyValue cf = new KeyValue(); + cf.setKey(p.getId()); + cf.setValue(p.getName()); + return cf; + } + ).collect(Collectors.toList())); + parsedObject.setCompletionStatus(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='completionStatus']")); + + final List identifierType = + VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']/*[local-name()='identifier']", Collections.singletonList("identifierType")); + + StructuredProperty currentPid = extractIdentifier(identifierType, "type"); + if (currentPid == null) return null; + inferPid(currentPid); + parsedObject.setPid(Collections.singletonList(currentPid)); + + + List descs = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='description']"); + if (descs != null && descs.size() > 0) + parsedObject.setDescription(descs.stream() + .map(it -> it.length() < 512 ? it : it.substring(0, 512)) + .map(it -> { + final Field d = new Field<>(); + d.setValue(it); + return d; + }) + .collect(Collectors.toList())); + + + final List relatedIdentifiers = + VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='relatedIdentifier']", + Arrays.asList("relatedIdentifierType", "relationType", "entityType", "inverseRelationType")); + + + if(relatedIdentifiers!= null) { + result.addAll(relatedIdentifiers.stream() + .flatMap(n -> { + final List rels = new ArrayList<>(); + Relation r = new Relation(); + r.setSource(parsedObject.getId()); + final String relatedPid = n.getTextValue(); + final String relatedPidType = n.getAttributes().get("relatedIdentifierType"); + final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown"); + final String relationSemantic = n.getAttributes().get("relationType"); + final String inverseRelation = n.getAttributes().get("inverseRelationType"); + final String targetId = generateId(relatedPid, relatedPidType, relatedType); + r.setTarget(targetId); + r.setRelType(relationSemantic); + r.setCollectedFrom(parsedObject.getCollectedfrom()); + rels.add(r); + r = new Relation(); + r.setSource(targetId); + r.setTarget(parsedObject.getId()); + r.setRelType(inverseRelation); + r.setCollectedFrom(parsedObject.getCollectedfrom()); + rels.add(r); + result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di)); + return rels.stream(); + }).collect(Collectors.toList())); + } + + + final List hostedBy = + VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='hostedBy']", Arrays.asList("id", "name")); + + + if (hostedBy != null) { + parsedObject.setInstance(hostedBy.stream().map(it -> + { + final Instance i = new Instance(); + i.setUrl(Collections.singletonList(currentPid.getValue())); + KeyValue h = new KeyValue(); + i.setHostedby(h); + h.setKey(it.getAttributes().get("id")); + h.setValue(it.getAttributes().get("name")); + return i; + }).collect(Collectors.toList())); + } + + + List subjects = extractSubject(VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']//*[local-name()='subject']", Arrays.asList("subjectScheme"))); + + parsedObject.setSubject(subjects); + + parsedObject.setCompletionStatus(completionStatus); + + final List creators = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='resource']//*[local-name()='creator']/*[local-name()='creatorName']"); + if (creators != null && creators.size() > 0) { + parsedObject.setAuthor(creators + .stream() + .map(a -> { + final Author author = new Author(); + author.setFullname(a); + return author; + }).collect(Collectors.toList()) + ); + } + final List titles = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='resource']//*[local-name()='title']"); + if (titles != null && titles.size() > 0) { + parsedObject.setTitle(titles.stream() + .map(t -> { + final StructuredProperty st = new StructuredProperty(); + st.setValue(t); + return st; + } + ).collect(Collectors.toList()) + ); + } + + final List dates = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='resource']/*[local-name()='dates']/*[local-name()='date']"); + + + if (dates != null && dates.size() > 0) { + parsedObject.setRelevantdate(dates.stream().map( + cd -> { + StructuredProperty date = new StructuredProperty(); + date.setValue(cd); + final Qualifier dq = new Qualifier(); + dq.setClassname("date"); + dq.setClassid("date"); + dq.setSchemename("dnet::date"); + dq.setSchemeid("dnet::date"); + date.setQualifier(dq); + return date; + } + ).collect(Collectors.toList())); + } + + + + result.add(parsedObject); + return result; + } catch (Throwable e) { + log.error("Error on parsing record " + record, e); + return null; + } + } + + + private DLIUnknown createUnknownObject(final String pid, final String pidType, final KeyValue cf, final DataInfo di) { + final DLIUnknown uk = new DLIUnknown(); + uk.setId(generateId(pid, pidType, "unknown")); + ProvenaceInfo pi = new ProvenaceInfo(); + pi.setId(cf.getKey()); + pi.setName(cf.getValue()); + pi.setCompletionStatus("incomplete"); + uk.setDataInfo(di); + uk.setDlicollectedfrom(Collections.singletonList(pi)); + final StructuredProperty sourcePid = new StructuredProperty(); + sourcePid.setValue(pid); + final Qualifier pt = new Qualifier(); + pt.setClassname(pidType); + pt.setClassid(pidType); + pt.setSchemename("dnet:pid_types"); + pt.setSchemeid("dnet:pid_types"); + sourcePid.setQualifier(pt); + uk.setPid(Collections.singletonList(sourcePid)); + return uk; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java new file mode 100644 index 000000000..6e3221da5 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java @@ -0,0 +1,233 @@ +package eu.dnetlib.dhp.graph.scholexplorer.parser; + +import com.ximpleware.AutoPilot; +import com.ximpleware.VTDGen; +import com.ximpleware.VTDNav; +import eu.dnetlib.dhp.parser.utility.VtdUtilityParser; +import eu.dnetlib.dhp.parser.utility.VtdUtilityParser.Node; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; +import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class PublicationScholexplorerParser extends AbstractScholexplorerParser { + + @Override + public List parseObject(final String record) { + try { + final List result = new ArrayList<>(); + final DLIPublication parsedObject = new DLIPublication(); + final VTDGen vg = new VTDGen(); + vg.setDoc(record.getBytes()); + vg.parse(true); + + + final VTDNav vn = vg.getNav(); + final AutoPilot ap = new AutoPilot(vn); + + final DataInfo di = new DataInfo(); + di.setTrust("0.9"); + di.setDeletedbyinference(false); + di.setInvisible(false); + + final String objIdentifier = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']"); + parsedObject.setId("50|" + StringUtils.substringAfter(objIdentifier, "::")); + + parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']")); + + final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']"); + parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']"))); + + if (StringUtils.isNotBlank(resolvedDate)) { + StructuredProperty currentDate = new StructuredProperty(); + currentDate.setValue(resolvedDate); + final Qualifier dateQualifier = new Qualifier(); + dateQualifier.setClassname("resolvedDate"); + dateQualifier.setClassid("resolvedDate"); + dateQualifier.setSchemename("dnet::date"); + dateQualifier.setSchemeid("dnet::date"); + currentDate.setQualifier(dateQualifier); + parsedObject.setRelevantdate(Collections.singletonList(currentDate)); + } + + + final List pid = VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='pid']", Arrays.asList("type")); + + StructuredProperty currentPid = extractIdentifier(pid, "type"); + if (currentPid == null) return null; + inferPid(currentPid); + parsedObject.setPid(Collections.singletonList(currentPid)); + + String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']"); + + List collectedFromNodes = + VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='collectedFrom']", Arrays.asList("name", "id", "mode", "completionStatus")); + + List resolvededFromNodes = + VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resolvedFrom']", Arrays.asList("name", "id", "mode", "completionStatus")); + + final String publisher = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='publisher']"); + Field pf = new Field<>(); + pf.setValue(publisher); + + parsedObject.setPublisher(pf); + final List provenances = new ArrayList<>(); + if (collectedFromNodes != null && collectedFromNodes.size() > 0) { + collectedFromNodes.forEach(it -> { + final ProvenaceInfo provenance = new ProvenaceInfo(); + provenance.setId(it.getAttributes().get("id")); + provenance.setName(it.getAttributes().get("name")); + provenance.setCollectionMode(provisionMode); + provenance.setCompletionStatus(it.getAttributes().get("completionStatus")); + provenances.add(provenance); + }); + } + + if (resolvededFromNodes != null && resolvededFromNodes.size() > 0) { + resolvededFromNodes.forEach(it -> { + final ProvenaceInfo provenance = new ProvenaceInfo(); + provenance.setId(it.getAttributes().get("id")); + provenance.setName(it.getAttributes().get("name")); + provenance.setCollectionMode("resolved"); + provenance.setCompletionStatus(it.getAttributes().get("completionStatus")); + provenances.add(provenance); + }); + } + + parsedObject.setDlicollectedfrom(provenances); + parsedObject.setCompletionStatus(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='completionStatus']")); + + parsedObject.setCollectedfrom(parsedObject.getDlicollectedfrom().stream().map( + p -> { + final KeyValue cf = new KeyValue(); + cf.setKey(p.getId()); + cf.setValue(p.getName()); + return cf; + } + ).collect(Collectors.toList())); + + final List relatedIdentifiers = + VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='relatedIdentifier']", + Arrays.asList("relatedIdentifierType", "relationType", "entityType", "inverseRelationType")); + + + if (relatedIdentifiers != null) { + result.addAll(relatedIdentifiers.stream() + .flatMap(n -> { + final List rels = new ArrayList<>(); + Relation r = new Relation(); + r.setSource(parsedObject.getId()); + final String relatedPid = n.getTextValue(); + final String relatedPidType = n.getAttributes().get("relatedIdentifierType"); + final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown"); + final String relationSemantic = n.getAttributes().get("relationType"); + final String inverseRelation = n.getAttributes().get("inverseRelationType"); + final String targetId = generateId(relatedPid, relatedPidType, relatedType); + r.setTarget(targetId); + r.setRelType(relationSemantic); + r.setCollectedFrom(parsedObject.getCollectedfrom()); + r.setRelClass("datacite"); + r.setDataInfo(di); + rels.add(r); + r = new Relation(); + r.setSource(targetId); + r.setTarget(parsedObject.getId()); + r.setRelType(inverseRelation); + r.setCollectedFrom(parsedObject.getCollectedfrom()); + r.setDataInfo(di); + r.setRelClass("datacite"); + rels.add(r); + + return rels.stream(); + }).collect(Collectors.toList())); + } + + final List hostedBy = + VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='hostedBy']", Arrays.asList("id", "name")); + + + if (hostedBy != null) { + parsedObject.setInstance(hostedBy.stream().map(it -> + { + final Instance i = new Instance(); + i.setUrl(Collections.singletonList(currentPid.getValue())); + KeyValue h = new KeyValue(); + i.setHostedby(h); + h.setKey(it.getAttributes().get("id")); + h.setValue(it.getAttributes().get("name")); + return i; + }).collect(Collectors.toList())); + } + + final List authorsNode = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='creator']"); + if (authorsNode != null) + parsedObject.setAuthor(authorsNode + .stream() + .map(a -> { + final Author author = new Author(); + author.setFullname(a); + return author; + }).collect(Collectors.toList()) + ); + + final List titles = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='title']"); + if (titles != null) { + parsedObject.setTitle(titles.stream() + .map(t -> { + final StructuredProperty st = new StructuredProperty(); + st.setValue(t); + return st; + } + ).collect(Collectors.toList()) + ); + } + + + Field description = new Field<>(); + + description.setValue(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='description']")); + + if (StringUtils.isNotBlank(description.getValue()) && description.getValue().length() > 512) { + description.setValue(description.getValue().substring(0, 512)); + } + + parsedObject.setDescription(Collections.singletonList(description)); + + + final String cd = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='date']"); + + StructuredProperty date = new StructuredProperty(); + date.setValue(cd); + final Qualifier dq = new Qualifier(); + dq.setClassname("date"); + dq.setClassid("date"); + dq.setSchemename("dnet::date"); + dq.setSchemeid("dnet::date"); + date.setQualifier(dq); + parsedObject.setRelevantdate(Collections.singletonList(date)); + + List subjects = extractSubject(VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='subject']", Collections.singletonList("scheme"))); + parsedObject.setSubject(subjects); + + parsedObject.setDataInfo(di); + + + result.add(parsedObject); + return result; + + } catch (Throwable e) { + log.error("Input record: " + record); + log.error("Error on parsing record ", e); + return null; + } + + } + + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_extract_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_extract_entities_parameters.json new file mode 100644 index 000000000..1c02109d0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_extract_entities_parameters.json @@ -0,0 +1,7 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true}, + {"paramName":"td", "paramLongName":"targetDir", "paramDescription": "the name of the result data", "paramRequired": true}, + {"paramName":"e", "paramLongName":"entities", "paramDescription": "the entity type to be filtered", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_scholix_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_scholix_parameters.json new file mode 100644 index 000000000..c02aa0226 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_scholix_parameters.json @@ -0,0 +1,6 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true}, + {"paramName":"e", "paramLongName":"entity", "paramDescription": "the entity type", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/merge_entities_scholix_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/merge_entities_scholix_parameters.json new file mode 100644 index 000000000..1ce482e67 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/merge_entities_scholix_parameters.json @@ -0,0 +1,6 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"e", "paramLongName":"entity", "paramDescription": "the entity type", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/config-default.xml new file mode 100644 index 000000000..6fb2a1253 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/config-default.xml @@ -0,0 +1,10 @@ + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml new file mode 100644 index 000000000..102587ab0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml @@ -0,0 +1,64 @@ + + + + sourcePath + the source path + + + targetPath + the source path + + + targetDir + the name of the path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + entity + the entity to be merged + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Merge ${entity} + eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerMergeEntitiesJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --num-executors 100 + --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + + -mt yarn-cluster + --sourcePath${sourcePath} + --targetPath${targetPath} + --entity${entity} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/config-default.xml new file mode 100644 index 000000000..6fb2a1253 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/config-default.xml @@ -0,0 +1,10 @@ + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml new file mode 100644 index 000000000..ef968b0cd --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml @@ -0,0 +1,68 @@ + + + + sourcePath + the source path + + + targetPath + the source path + + + targetDir + the name of the path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + entities + the entities to be extracted + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Extract ${entities} + eu.dnetlib.dhp.graph.scholexplorer.SparkExtractEntitiesJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --num-executors 100 + + + + --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + + -mt yarn-cluster + --sourcePath${sourcePath} + --targetPath${targetPath} + --targetDir${targetDir} + --entities${entities} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/config-default.xml new file mode 100644 index 000000000..6fb2a1253 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/config-default.xml @@ -0,0 +1,10 @@ + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml new file mode 100644 index 000000000..3efb90ae4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml @@ -0,0 +1,63 @@ + + + + sourcePath + the source path + + + targetPath + the source path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + entity + the entity type + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Import ${entity} and related entities + eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerGraphImporter + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --num-executors 100 + + + + --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + + -mt yarn-cluster + --sourcePath${sourcePath} + --targetPath${targetPath} + --entity${entity} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporterTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporterTest.java new file mode 100644 index 000000000..c6e4bac1d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporterTest.java @@ -0,0 +1,19 @@ +package eu.dnetlib.dhp.graph.scholexplorer; + +import org.junit.Test; + +public class SparkScholexplorerGraphImporterTest { + + @Test + + public void testImport() throws Exception { + SparkScholexplorerGraphImporter.main(new String[]{ + "-mt", "local[*]", + "-e", "publication", + "-s", "file:///data/scholexplorer_dump/pmf.dli.seq", + "-t", "file:///data/scholexplorer_dump/pmf_dli_with_rel"} + ); + + + } +} diff --git a/pom.xml b/pom.xml index aedf5ebff..5323276aa 100644 --- a/pom.xml +++ b/pom.xml @@ -231,6 +231,11 @@ secondstring 1.0.0 + + com.ximpleware + vtd-xml + ${vtd.version} + org.apache.oozie @@ -421,6 +426,7 @@ 2.9.6 3.5 2.11.12 + [2.12,3.0)