From c36239e6931564f79fb14f229bd83c65b24788c9 Mon Sep 17 00:00:00 2001 From: sandro Date: Tue, 14 Apr 2020 17:47:36 +0200 Subject: [PATCH] fixed incremental indexing --- .../dhp/schema/scholexplorer/DLIRelation.java | 15 +++ .../sx/graph/SparkSXGeneratePidSimlarity.java | 7 +- .../SparkScholexplorerCreateRawGraphJob.java | 7 +- .../parser/AbstractScholexplorerParser.java | 78 ++++++++++++++- .../parser/DatasetScholexplorerParser.java | 72 ++------------ .../PublicationScholexplorerParser.java | 50 +--------- .../dhp-graph-provision-scholexplorer/pom.xml | 5 + .../provision/scholix/ScholixResource.java | 10 +- .../provision/update/Datacite2Scholix.java | 2 +- .../update/SparkResolveScholixTarget.java | 57 ++++++++++- .../dhp/sx/index/oozie_app/config-default.xml | 10 ++ .../dhp/sx/index/oozie_app/workflow.xml | 68 +++++++++++++ .../dhp/sx/synch/oozie_app/config-default.xml | 10 ++ .../dhp/sx/synch/oozie_app/workflow.xml | 97 +++++++++++++++++++ .../dhp/provision/DataciteClientTest.java | 64 +----------- 15 files changed, 364 insertions(+), 188 deletions(-) create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/workflow.xml diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java new file mode 100644 index 000000000..1a45b64ff --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java @@ -0,0 +1,15 @@ +package eu.dnetlib.dhp.schema.scholexplorer; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class DLIRelation extends Relation { + private String dateOfCollection; + + public String getDateOfCollection() { + return dateOfCollection; + } + + public void setDateOfCollection(String dateOfCollection) { + this.dateOfCollection = dateOfCollection; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java index 806140160..aa2f2cc58 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.sx.graph; import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation; import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.api.java.JavaPairRDD; @@ -49,15 +50,15 @@ public class SparkSXGeneratePidSimlarity { .equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::"))) .distinct(); - JavaRDD simRel = datasetSimRel.union(publicationSimRel).map(s -> { - final Relation r = new Relation(); + JavaRDD simRel = datasetSimRel.union(publicationSimRel).map(s -> { + final DLIRelation r = new DLIRelation(); r.setSource(s._1()); r.setTarget(s._2()); r.setRelType("similar"); return r; } ); - spark.createDataset(simRel.rdd(), Encoders.bean(Relation.class)).distinct().write() + spark.createDataset(simRel.rdd(), Encoders.bean(DLIRelation.class)).distinct().write() .mode(SaveMode.Overwrite).save(targetPath+"/pid_simRel"); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java index 36d3cf540..36c94f595 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java @@ -7,6 +7,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; +import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation; import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; import eu.dnetlib.dhp.utils.DHPUtils; import net.minidev.json.JSONArray; @@ -135,19 +136,19 @@ public class SparkScholexplorerCreateRawGraphJob { SparkSXGeneratePidSimlarity.generateDataFrame(spark, sc, inputPath.replace("/relation",""),targetPath.replace("/relation","") ); - RDD rdd = union.mapToPair((PairFunction) f -> { + RDD rdd = union.mapToPair((PairFunction) f -> { final String source = getJPathString(SOURCEJSONPATH, f); final String target = getJPathString(TARGETJSONPATH, f); final String reltype = getJPathString(RELJSONPATH, f); ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source.toLowerCase(), reltype.toLowerCase(), target.toLowerCase())), mapper.readValue(f, Relation.class)); + return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source.toLowerCase(), reltype.toLowerCase(), target.toLowerCase())), mapper.readValue(f, DLIRelation.class)); }).reduceByKey((a, b) -> { a.mergeFrom(b); return a; }).map(Tuple2::_2).rdd(); - spark.createDataset(rdd, Encoders.bean(Relation.class)).write().mode(SaveMode.Overwrite).save(targetPath); + spark.createDataset(rdd, Encoders.bean(DLIRelation.class)).write().mode(SaveMode.Overwrite).save(targetPath); Dataset rel_ds =spark.read().load(targetPath).as(Encoders.bean(Relation.class)); System.out.println("LOADING PATH :"+targetPath.replace("/relation","")+"/pid_simRel"); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java index ca20c0aba..9eeff9613 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java @@ -2,10 +2,13 @@ package eu.dnetlib.dhp.sx.graph.parser; import eu.dnetlib.dhp.parser.utility.VtdUtilityParser; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; +import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation; +import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; +import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo; import eu.dnetlib.dhp.utils.DHPUtils; +import eu.dnetlib.scholexplorer.relation.RelInfo; import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -15,6 +18,7 @@ import javax.xml.stream.XMLStreamReader; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; public abstract class AbstractScholexplorerParser { @@ -104,6 +108,74 @@ public abstract class AbstractScholexplorerParser { return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim())); } + protected DLIUnknown createUnknownObject(final String pid, final String pidType, final KeyValue cf, final DataInfo di, final String dateOfCollection) { + final DLIUnknown uk = new DLIUnknown(); + uk.setId(generateId(pid, pidType, "unknown")); + ProvenaceInfo pi = new ProvenaceInfo(); + pi.setId(cf.getKey()); + pi.setName(cf.getValue()); + pi.setCompletionStatus("incomplete"); + uk.setDataInfo(di); + uk.setDlicollectedfrom(Collections.singletonList(pi)); + final StructuredProperty sourcePid = new StructuredProperty(); + sourcePid.setValue(pid); + final Qualifier pt = new Qualifier(); + pt.setClassname(pidType); + pt.setClassid(pidType); + pt.setSchemename("dnet:pid_types"); + pt.setSchemeid("dnet:pid_types"); + sourcePid.setQualifier(pt); + uk.setPid(Collections.singletonList(sourcePid)); + uk.setDateofcollection(dateOfCollection); + return uk; + } + + protected void generateRelations(RelationMapper relationMapper, Result parsedObject, List result, DataInfo di, String dateOfCollection, List relatedIdentifiers) { + if(relatedIdentifiers!= null) { + result.addAll(relatedIdentifiers.stream() + .flatMap(n -> { + final List rels = new ArrayList<>(); + DLIRelation r = new DLIRelation(); + r.setSource(parsedObject.getId()); + final String relatedPid = n.getTextValue(); + final String relatedPidType = n.getAttributes().get("relatedIdentifierType"); + final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown"); + String relationSemantic = n.getAttributes().get("relationType"); + String inverseRelation; + final String targetId = generateId(relatedPid, relatedPidType, relatedType); + r.setDateOfCollection(dateOfCollection); + if (relationMapper.containsKey(relationSemantic.toLowerCase())) + { + RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase()); + relationSemantic = relInfo.getOriginal(); + inverseRelation = relInfo.getInverse(); + } + else { + relationSemantic = "Unknown"; + inverseRelation = "Unknown"; + } + r.setTarget(targetId); + r.setRelType(relationSemantic); + r.setRelClass("datacite"); + r.setCollectedFrom(parsedObject.getCollectedfrom()); + r.setDataInfo(di); + rels.add(r); + r = new DLIRelation(); + r.setDataInfo(di); + r.setSource(targetId); + r.setTarget(parsedObject.getId()); + r.setRelType(inverseRelation); + r.setRelClass("datacite"); + r.setCollectedFrom(parsedObject.getCollectedfrom()); + r.setDateOfCollection(dateOfCollection); + rels.add(r); + if("unknown".equalsIgnoreCase(relatedType)) + result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di, dateOfCollection)); + return rels.stream(); + }).collect(Collectors.toList())); + } + } + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java index 2ba2bd519..f1915c5cf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java @@ -42,7 +42,8 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']"))); parsedObject.setOriginalObjIdentifier(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']")); - parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']")); + String dateOfCollection = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']"); + parsedObject.setDateofcollection(dateOfCollection); final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']"); @@ -123,7 +124,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { List descs = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='description']"); if (descs != null && descs.size() > 0) parsedObject.setDescription(descs.stream() - .map(it -> it.length() < 512 ? it : it.substring(0, 512)) + .map(it -> it.length() < 10000 ? it : it.substring(0, 10000)) .map(it -> { final Field d = new Field<>(); d.setValue(it); @@ -137,48 +138,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { Arrays.asList("relatedIdentifierType", "relationType", "entityType", "inverseRelationType")); - if(relatedIdentifiers!= null) { - result.addAll(relatedIdentifiers.stream() - .flatMap(n -> { - final List rels = new ArrayList<>(); - Relation r = new Relation(); - r.setSource(parsedObject.getId()); - final String relatedPid = n.getTextValue(); - final String relatedPidType = n.getAttributes().get("relatedIdentifierType"); - final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown"); - String relationSemantic = n.getAttributes().get("relationType"); - String inverseRelation = n.getAttributes().get("inverseRelationType"); - final String targetId = generateId(relatedPid, relatedPidType, relatedType); - - if (relationMapper.containsKey(relationSemantic.toLowerCase())) - { - RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase()); - relationSemantic = relInfo.getOriginal(); - inverseRelation = relInfo.getInverse(); - } - else { - relationSemantic = "Unknown"; - inverseRelation = "Unknown"; - } - r.setTarget(targetId); - r.setRelType(relationSemantic); - r.setRelClass("datacite"); - r.setCollectedFrom(parsedObject.getCollectedfrom()); - r.setDataInfo(di); - rels.add(r); - r = new Relation(); - r.setDataInfo(di); - r.setSource(targetId); - r.setTarget(parsedObject.getId()); - r.setRelType(inverseRelation); - r.setRelClass("datacite"); - r.setCollectedFrom(parsedObject.getCollectedfrom()); - rels.add(r); - if("unknown".equalsIgnoreCase(relatedType)) - result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di)); - return rels.stream(); - }).collect(Collectors.toList())); - } + generateRelations(relationMapper, parsedObject, result, di, dateOfCollection, relatedIdentifiers); final List hostedBy = @@ -199,7 +159,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { } - List subjects = extractSubject(VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']//*[local-name()='subject']", Arrays.asList("subjectScheme"))); + List subjects = extractSubject(VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']//*[local-name()='subject']", Collections.singletonList("subjectScheme"))); parsedObject.setSubject(subjects); @@ -265,24 +225,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { } - private DLIUnknown createUnknownObject(final String pid, final String pidType, final KeyValue cf, final DataInfo di) { - final DLIUnknown uk = new DLIUnknown(); - uk.setId(generateId(pid, pidType, "unknown")); - ProvenaceInfo pi = new ProvenaceInfo(); - pi.setId(cf.getKey()); - pi.setName(cf.getValue()); - pi.setCompletionStatus("incomplete"); - uk.setDataInfo(di); - uk.setDlicollectedfrom(Collections.singletonList(pi)); - final StructuredProperty sourcePid = new StructuredProperty(); - sourcePid.setValue(pid); - final Qualifier pt = new Qualifier(); - pt.setClassname(pidType); - pt.setClassid(pidType); - pt.setSchemename("dnet:pid_types"); - pt.setSchemeid("dnet:pid_types"); - sourcePid.setQualifier(pt); - uk.setPid(Collections.singletonList(sourcePid)); - return uk; - } + + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java index b8b38515b..aa2f86076 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java @@ -38,7 +38,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser di.setDeletedbyinference(false); di.setInvisible(false); - parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']")); + String dateOfCollection = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']"); + parsedObject.setDateofcollection(dateOfCollection); final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']"); parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']"))); @@ -118,48 +119,7 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser final List relatedIdentifiers = VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='relatedIdentifier']", Arrays.asList("relatedIdentifierType", "relationType", "entityType", "inverseRelationType")); - - - if (relatedIdentifiers != null) { - result.addAll(relatedIdentifiers.stream() - .flatMap(n -> { - final List rels = new ArrayList<>(); - Relation r = new Relation(); - r.setSource(parsedObject.getId()); - final String relatedPid = n.getTextValue(); - final String relatedPidType = n.getAttributes().get("relatedIdentifierType"); - final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown"); - String relationSemantic = n.getAttributes().get("relationType"); - String inverseRelation = "Unknown"; - final String targetId = generateId(relatedPid, relatedPidType, relatedType); - - if (relationMapper.containsKey(relationSemantic.toLowerCase())) - { - RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase()); - relationSemantic = relInfo.getOriginal(); - inverseRelation = relInfo.getInverse(); - } - else { - relationSemantic = "Unknown"; - } - r.setTarget(targetId); - r.setRelType(relationSemantic); - r.setCollectedFrom(parsedObject.getCollectedfrom()); - r.setRelClass("datacite"); - r.setDataInfo(di); - rels.add(r); - r = new Relation(); - r.setDataInfo(di); - r.setSource(targetId); - r.setTarget(parsedObject.getId()); - r.setRelType(inverseRelation); - r.setRelClass("datacite"); - r.setCollectedFrom(parsedObject.getCollectedfrom()); - rels.add(r); - - return rels.stream(); - }).collect(Collectors.toList())); - } + generateRelations(relationMapper, parsedObject, result, di, dateOfCollection, relatedIdentifiers); final List hostedBy = VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='hostedBy']", Arrays.asList("id", "name")); @@ -206,8 +166,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser description.setValue(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='description']")); - if (StringUtils.isNotBlank(description.getValue()) && description.getValue().length() > 512) { - description.setValue(description.getValue().substring(0, 512)); + if (StringUtils.isNotBlank(description.getValue()) && description.getValue().length() > 10000) { + description.setValue(description.getValue().substring(0, 10000)); } parsedObject.setDescription(Collections.singletonList(description)); diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml index de38a01b3..03604f431 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml @@ -69,6 +69,11 @@ + + org.apache.httpcomponents + httpclient + + diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java index 49b891e65..f29722eb8 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.provision.scholix; import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; import java.io.Serializable; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -20,10 +19,6 @@ public class ScholixResource implements Serializable { private List publisher; private List collectedFrom; - - - - public static ScholixResource fromSummary(ScholixSummary summary) { final ScholixResource resource = new ScholixResource(); @@ -38,7 +33,7 @@ public class ScholixResource implements Serializable { resource.setObjectType(summary.getTypology().toString()); - if (summary.getTitle() != null && summary.getTitle().size()>0) + if (summary.getTitle() != null && summary.getTitle().size() > 0) resource.setTitle(summary.getTitle().get(0)); if (summary.getAuthor() != null) @@ -47,7 +42,7 @@ public class ScholixResource implements Serializable { .collect(Collectors.toList()) ); - if (summary.getDate() != null && summary.getDate().size()>0) + if (summary.getDate() != null && summary.getDate().size() > 0) resource.setPublicationDate(summary.getDate().get(0)); if (summary.getPublisher() != null) resource.setPublisher(summary.getPublisher().stream() @@ -65,6 +60,7 @@ public class ScholixResource implements Serializable { } + public List getIdentifier() { return identifier; } diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java index c6617a823..ac05a8350 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java @@ -165,7 +165,7 @@ public class Datacite2Scholix { return res; } - protected String generateId(final String pid, final String pidType, final String entityType) { + public static String generateId(final String pid, final String pidType, final String entityType) { String type; switch (entityType){ case "publication": diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java index 4628c4684..35020ecdf 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java @@ -4,18 +4,25 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.provision.scholix.Scholix; import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier; +import eu.dnetlib.dhp.provision.scholix.ScholixRelationship; import eu.dnetlib.dhp.provision.scholix.ScholixResource; import eu.dnetlib.dhp.utils.DHPUtils; +import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import scala.Tuple2; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; public class SparkResolveScholixTarget { @@ -29,8 +36,6 @@ public class SparkResolveScholixTarget { final String sourcePath = parser.get("sourcePath"); final String workingDirPath= parser.get("workingDirPath"); final String indexHost= parser.get("indexHost"); - - try (SparkSession spark = getSession(conf, master)){ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -65,7 +70,55 @@ public class SparkResolveScholixTarget { }, Encoders.bean(ScholixResource.class)).write().mode(SaveMode.Overwrite).save(workingDirPath+"/stepB"); + Dataset s2 = spark.read().load(workingDirPath+"/stepB").as(Encoders.bean(ScholixResource.class)); + + s1.joinWith(s2, s1.col("target.identifier.identifier").equalTo(s2.col("identifier.identifier")), "left") + + .flatMap((FlatMapFunction, Scholix>) f -> + { + + final List res = new ArrayList<>(); + final Scholix s = f._1(); + final ScholixResource target = f._2(); + if (StringUtils.isNotBlank(s.getIdentifier())) + res.add(s); + else if (target == null) { + ScholixResource currentTarget = s.getTarget(); + currentTarget.setObjectType("unknown"); + currentTarget.setDnetIdentifier(Datacite2Scholix.generateId(currentTarget.getIdentifier().get(0).getIdentifier(),currentTarget.getIdentifier().get(0).getSchema(), currentTarget.getObjectType())); + + s.generateIdentifier(); + res.add(s); + final Scholix inverse = new Scholix(); + inverse.setTarget(s.getSource()); + inverse.setSource(s.getTarget()); + inverse.setLinkprovider(s.getLinkprovider()); + inverse.setPublicationDate(s.getPublicationDate()); + inverse.setPublisher(s.getPublisher()); + inverse.setRelationship(new ScholixRelationship(s.getRelationship().getInverse(), s.getRelationship().getSchema(), s.getRelationship().getName())); + inverse.generateIdentifier(); + res.add(inverse); + + } else + { + target.setIdentifier(target.getIdentifier().stream().map(d -> new ScholixIdentifier(d.getIdentifier().toLowerCase(), d.getSchema().toLowerCase())).collect(Collectors.toList())); + s.setTarget(target); + s.generateIdentifier(); + res.add(s); + final Scholix inverse = new Scholix(); + inverse.setTarget(s.getSource()); + inverse.setSource(s.getTarget()); + inverse.setLinkprovider(s.getLinkprovider()); + inverse.setPublicationDate(s.getPublicationDate()); + inverse.setPublisher(s.getPublisher()); + inverse.setRelationship(new ScholixRelationship(s.getRelationship().getInverse(), s.getRelationship().getSchema(), s.getRelationship().getName())); + inverse.generateIdentifier(); + res.add(inverse); + } + + return res.iterator(); + }, Encoders.bean(Scholix.class)).javaRDD().map(s -> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(workingDirPath+"/resolved_json"); } } diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/config-default.xml new file mode 100644 index 000000000..6fb2a1253 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/config-default.xml @@ -0,0 +1,10 @@ + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/workflow.xml new file mode 100644 index 000000000..9fc86e014 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/workflow.xml @@ -0,0 +1,68 @@ + + + + workingDirPath + the source path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + index + index name + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + index Summary + eu.dnetlib.dhp.provision.SparkIndexCollectionOnES + dhp-graph-provision-scholexplorer-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" + -mt yarn-cluster + --sourcePath${workingDirPath}/summary + --index${index}_object + --idPathid + --typesummary + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + index scholix + eu.dnetlib.dhp.provision.SparkIndexCollectionOnES + dhp-graph-provision-scholexplorer-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" + -mt yarn-cluster + --sourcePath${workingDirPath}/scholix_json + --index${index}_scholix + --idPathidentifier + --typescholix + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/config-default.xml new file mode 100644 index 000000000..6fb2a1253 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/config-default.xml @@ -0,0 +1,10 @@ + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/workflow.xml new file mode 100644 index 000000000..5e108f94f --- /dev/null +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/workflow.xml @@ -0,0 +1,97 @@ + + + + workingDirPath + the source path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + index + index name + + + timestamp + timestamp from incremental harvesting + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.provision.update.RetrieveUpdateFromDatacite + -t${workingDirPath}/synch/input_json + -n${nameNode} + -ts${timestamp} + -ihip-90-147-167-25.ct1.garrservices.it + -indatacite + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Resolve and generate Scholix + eu.dnetlib.dhp.provision.update.SparkResolveScholixTarget + dhp-graph-provision-scholexplorer-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" + -m yarn-cluster + -s${workingDirPath}/synch/input_json + -w${workingDirPath}/synch + -hip-90-147-167-25.ct1.garrservices.it + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + index scholix + eu.dnetlib.dhp.provision.SparkIndexCollectionOnES + dhp-graph-provision-scholexplorer-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" + -mt yarn-cluster + --sourcePath${workingDirPath}/synch/resolved_json + --index${index}_scholix + --idPathidentifier + --typescholix + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java index e008d72be..782784be4 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java @@ -3,24 +3,18 @@ package eu.dnetlib.dhp.provision; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.provision.scholix.Scholix; import eu.dnetlib.dhp.provision.scholix.ScholixResource; -import eu.dnetlib.dhp.provision.update.*; +import eu.dnetlib.dhp.provision.update.CrossrefClient; +import eu.dnetlib.dhp.provision.update.Datacite2Scholix; +import eu.dnetlib.dhp.provision.update.DataciteClient; import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.io.IOUtils; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.List; public class DataciteClientTest { - - @Test public void dataciteSCholixTest() throws Exception { final String json = IOUtils.toString(getClass().getResourceAsStream("datacite.json")); @@ -32,66 +26,18 @@ public class DataciteClientTest { } - - public void testClient() throws Exception { - RetrieveUpdateFromDatacite.main(new String[]{ - "-n", "file:///data/new_s2.txt", - "-t", "/data/new_s2.txt", - "-ts", "1585760736", - "-ih", "ip-90-147-167-25.ct1.garrservices.it", - "-in", "datacite", - }); - - - SparkResolveScholixTarget.main(new String[]{ - "-s", "file:///data/new_s.txt", - "-m", "local[*]", - "-w", "/data/scholix/provision", - "-h", "ip-90-147-167-25.ct1.garrservices.it", - - }); - } - - public void testResolveDataset() throws Exception { - DataciteClient dc = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it"); + DataciteClient dc = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it"); ScholixResource datasetByDOI = dc.getDatasetByDOI("10.17182/hepdata.15392.v1/t5"); Assertions.assertNotNull(datasetByDOI); System.out.println(new ObjectMapper().writeValueAsString(datasetByDOI)); CrossrefClient cr = new CrossrefClient("ip-90-147-167-25.ct1.garrservices.it"); - ScholixResource crossrefByDOI = cr.getResourceByDOI("10.26850/1678-4618eqj.v35.1.2010.p41-46"); + ScholixResource crossrefByDOI = cr.getResourceByDOI("10.26850/1678-4618eqj.v35.1.2010.p41-46"); Assertions.assertNotNull(crossrefByDOI); System.out.println(new ObjectMapper().writeValueAsString(crossrefByDOI)); - - } - - private String getResponse(final String url,final String json ) { - CloseableHttpClient client = HttpClients.createDefault(); - try { - - HttpPost httpPost = new HttpPost(url); - if (json!= null) { - StringEntity entity = new StringEntity(json); - httpPost.setEntity(entity); - httpPost.setHeader("Accept", "application/json"); - httpPost.setHeader("Content-type", "application/json"); - } - CloseableHttpResponse response = client.execute(httpPost); - - return IOUtils.toString(response.getEntity().getContent()); - } catch (Throwable e) { - throw new RuntimeException("Error on executing request ",e); - } finally { - try { - client.close(); - } catch (IOException e) { - throw new RuntimeException("Unable to close client ",e); - } - } - } }