From cd27df91a1acc8d8c721bbff0954321b63723a85 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 6 Nov 2020 17:12:31 +0100
Subject: [PATCH] fixed bug on missing relation in ANDS

---
 .../eu/dnetlib/dhp/doiboost/QueryTest.scala   |  9 ++-
 .../dhp/oa/graph/clean/CleaningFunctions.java | 28 ++++----
 .../eu/dnetlib/dhp/sx/graph/IdReplace.scala   |  3 +
 .../sx/graph/SparkSplitOafTODLIEntities.scala | 69 ++++++++++++++++++-
 .../dhp/sx/graph/ScholexplorerParserTest.java | 23 +++++++
 .../resources/eu/dnetlib/dhp/sx/graph/dmf.xml | 67 ++++++++----------
 .../resources/eu/dnetlib/dhp/sx/graph/pmf.xml | 25 +++++++
 .../dhp/provision/scholix/Scholix.java        | 26 ++++---
 .../dhp/sx/index/oozie_app/config-default.xml |  4 ++
 .../dhp/sx/index/oozie_app/workflow.xml       | 47 ++++++++-----
 .../dhp/sx/provision/oozie_app/workflow.xml   | 54 ---------------
 11 files changed, 216 insertions(+), 139 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/pmf.xml

diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala
index b3402ee9f..61c1f5111 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala
@@ -1,6 +1,6 @@
 package eu.dnetlib.dhp.doiboost
 
-import eu.dnetlib.dhp.schema.oaf.Publication
+import eu.dnetlib.dhp.schema.oaf.{Publication, Relation}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
 import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
@@ -21,6 +21,13 @@ class QueryTest {
 
   }
 
+
+  def has_ands(r:Relation) :Boolean = {
+
+    r.getCollectedfrom!= null && r.getCollectedfrom.asScala.count(k => k.getValue.contains("Australian")) > 0
+
+  }
+
 
   def hasInstanceWithUrl(p:Publication):Boolean = {

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
index 12543d8c7..56a4aaf5a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
@@ -109,20 +109,20 @@ public class CleaningFunctions {
 		}
 		if (Objects.nonNull(r.getPid())) {
 			r
-				.setPid(
-					r
-						.getPid()
-						.stream()
-						.filter(Objects::nonNull)
-						.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
-						.filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
-						.filter(sp -> Objects.nonNull(sp.getQualifier()))
-						.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
-						.map(sp -> {
-							sp.setValue(StringUtils.trim(sp.getValue()));
-							return sp;
-						})
-						.collect(Collectors.toList()));
+				.setPid(
+					r
+						.getPid()
+						.stream()
+						.filter(Objects::nonNull)
+						.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
+						.filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
+						.filter(sp -> Objects.nonNull(sp.getQualifier()))
+						.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
+						.map(sp -> {
+							sp.setValue(StringUtils.trim(sp.getValue()));
+							return sp;
+						})
+						.collect(Collectors.toList()));
 		}
 		if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
 			r
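The new has_ands helper above marks a relation as coming from ANDS (the Australian National Data Service) when any collectedfrom entry mentions "Australian". Note that .asScala needs scala.collection.JavaConverters in scope, which this hunk does not show being imported. A minimal spot-check built on the same predicate — the dump path, object name and local master are hypothetical — could look like this:

```scala
// Hypothetical spot-check: count ANDS-collected relations in a Kryo-encoded dump.
import eu.dnetlib.dhp.schema.oaf.Relation
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.collection.JavaConverters._

object AndsRelationCheck {

  // Same predicate as QueryTest.has_ands: a relation counts as ANDS when at
  // least one collectedfrom entry mentions "Australian".
  def hasAnds(r: Relation): Boolean =
    r.getCollectedfrom != null &&
      r.getCollectedfrom.asScala.exists(k => k.getValue != null && k.getValue.contains("Australian"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ands-check").getOrCreate()
    implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]

    // Assumed path; the real workflow writes relations under $workingPath/graph/relation.
    val rels: Dataset[Relation] = spark.read.load("/tmp/graph/relation").as[Relation]
    println(s"ANDS relations: ${rels.filter(hasAnds _).count()}")
  }
}
```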
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala
new file mode 100644
index 000000000..8d375600c
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala
@@ -0,0 +1,3 @@
+package eu.dnetlib.dhp.sx.graph
+
+case class IdReplace(newId:String, oldId:String) {}

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
index 822b16263..f359f73f9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
@@ -1,12 +1,15 @@
 package eu.dnetlib.dhp.sx.graph
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
 import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
 import eu.dnetlib.dhp.sx.ebi.EBIAggregator
 import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
+import org.apache.spark.sql.functions.col
+
 
 object SparkSplitOafTODLIEntities {
@@ -83,14 +86,42 @@ object SparkSplitOafTODLIEntities {
 
   }
 
+  def extract_ids(o:Oaf) :(String, String) = {
+
+    o match {
+      case p: DLIPublication =>
+        val prefix = StringUtils.substringBefore(p.getId, "|")
+        val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
+        (p.getId, s"$prefix|$original")
+      case p: DLIDataset =>
+        val prefix = StringUtils.substringBefore(p.getId, "|")
+        val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
+        (p.getId, s"$prefix|$original")
+      case _ =>null
+    }
+  }
+
   def extract_relations(spark:SparkSession, workingPath:String) :Unit = {
 
     implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
     implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
+    import spark.implicits._
 
     val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
     val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000)
+
+    OAFDataset
+      .filter(o => o.isInstanceOf[Result])
+      .map(extract_ids)(Encoders.tuple(Encoders.STRING, Encoders.STRING))
+      .filter(r => r != null)
+      .where("_1 != _2")
+      .select(col("_1").alias("newId"), col("_2").alias("oldId"))
+      .distinct()
+      .map(f => IdReplace(f.getString(0), f.getString(1)))
+      .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/id_replace")
+
+
     OAFDataset
       .filter(s => s != null && s.isInstanceOf[Relation])
      .map(s =>s.asInstanceOf[Relation])
@@ -100,7 +131,41 @@ object SparkSplitOafTODLIEntities {
       .agg(EBIAggregator.getRelationAggregator().toColumn)
       .map(p => p._2)
       .repartition(4000)
-      .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
+      .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation_unfixed")
+
+
+    val relations = spark.read.load(s"$workingPath/graph/relation_unfixed").as[Relation]
+    val ids = spark.read.load(s"$workingPath/graph/id_replace").as[IdReplace]
+
+    relations
+      .map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
+      .joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
+      .map(i =>{
+        val r = i._1._2
+        if (i._2 != null)
+        {
+          val id = i._2.newId
+          r.setSource(id)
+        }
+        r
+      }).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/rel_f_source")
+
+    val rel_source:Dataset[Relation] = spark.read.load(s"$workingPath/graph/rel_f_source").as[Relation]
+
+    rel_source
+      .map(r => (r.getTarget, r))(Encoders.tuple(Encoders.STRING, relEncoder))
+      .joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
+      .map(i =>{
+        val r:Relation = i._1._2
+        if (i._2 != null)
+        {
+          val id = i._2.newId
+          r.setTarget(id)
+        }
+        r
+      }).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
+
+
   }
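This is the heart of the fix: DLI entities are stored under a resolved identifier, while relations collected from ANDS can still point at the original object identifier, so those relations dangle. The patch derives (newId, oldId) pairs with extract_ids and then rewrites relation sources and targets in two left-join passes, materialising the intermediate rel_f_source output between them. A self-contained sketch of the same two-pass rewrite — using simplified Rel/IdReplace stand-ins and made-up identifiers, not the real dhp schema — could look like this:

```scala
// Minimal sketch of the two-pass relation-endpoint rewrite above, with
// simplified stand-in types instead of eu.dnetlib.dhp.schema.oaf.Relation.
import org.apache.spark.sql.{Dataset, SparkSession}

case class Rel(source: String, target: String)   // stand-in for Relation
case class IdReplace(newId: String, oldId: String)

object RelationIdFix {

  // Rewrite one endpoint: left-join on the old identifier and, where a
  // replacement exists, substitute the resolved identifier.
  def fixEndpoint(rels: Dataset[Rel], ids: Dataset[IdReplace], onSource: Boolean): Dataset[Rel] = {
    import rels.sparkSession.implicits._
    val keyed = rels.map(r => (if (onSource) r.source else r.target, r))
    keyed
      .joinWith(ids, keyed("_1") === ids("oldId"), "left")
      .map { case ((_, r), repl) =>
        if (repl == null) r // no replacement known: keep the relation as-is
        else if (onSource) r.copy(source = repl.newId)
        else r.copy(target = repl.newId)
      }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rel-id-fix").getOrCreate()
    import spark.implicits._

    // Hypothetical sample values, shaped like dhp identifiers.
    val rels = Seq(Rel("60|ands::abc", "60|ands::def")).toDS()
    val ids  = Seq(IdReplace("60|r3d100010464::0002882a", "60|ands::abc")).toDS()

    // First pass fixes sources, second pass fixes targets; the patch writes
    // the first pass to disk (rel_f_source) to keep the lineage short.
    val fixed = fixEndpoint(fixEndpoint(rels, ids, onSource = true), ids, onSource = false)
    fixed.show(false)
  }
}
```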
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/ScholexplorerParserTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/ScholexplorerParserTest.java
index d418da594..67226a031 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/ScholexplorerParserTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/ScholexplorerParserTest.java
@@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
 
 import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser;
+import eu.dnetlib.dhp.sx.graph.parser.PublicationScholexplorerParser;
 import eu.dnetlib.scholexplorer.relation.RelationMapper;
 
 public class ScholexplorerParserTest {
@@ -37,4 +38,26 @@ public class ScholexplorerParserTest {
 				}
 			});
 	}
+
+	@Test
+	public void testPublicationParser() throws Exception {
+		String xml = IOUtils.toString(this.getClass().getResourceAsStream("pmf.xml"));
+
+		PublicationScholexplorerParser p = new PublicationScholexplorerParser();
+		List<Oaf> oaves = p.parseObject(xml, RelationMapper.load());
+
+		ObjectMapper m = new ObjectMapper();
+		m.enable(SerializationFeature.INDENT_OUTPUT);
+
+		oaves
+			.forEach(
+				oaf -> {
+					try {
+						System.out.println(m.writeValueAsString(oaf));
+						System.out.println("----------------------------");
+					} catch (JsonProcessingException e) {
+
+					}
+				});
+	}
 }

diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/dmf.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/dmf.xml
index 836e0b9a0..503d44bf7 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/dmf.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/dmf.xml
@@ -1,51 +1,38 @@
-
-    aaadf8b3-01a8-4cc2-9964-63cfb19df3b4_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=
-    oai:pangaea.de:doi:10.1594/PANGAEA.432865
-    r3d100010134
-    r3d100010134::00002f60593fd1f758fb838fafb46795
-    2020-02-18T03:05:02.534Z
-
-    oai:pangaea.de:doi:10.1594/PANGAEA.432865
-    citable topicOceans
+          xmlns="http://namespace.openaire.eu/">
+
+    r3d100010464::0002882a9d38c4f4612e7666ad768ccd
+    https://research.jcu.edu.au/researchdata/published/detail/9079e05370d830eb8d416c77c0b761ce::url
+    2020-11-02T16:14:07.831Z
+    ands_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=
+    r3d100010464
-
-
-    10.1594/pangaea.432865
-
-    Daily sea level from coastal tide gauge station Woods_Hole in 1978 (Research quality database)
+
+
+    https://research.jcu.edu.au/researchdata/published/detail/9079e05370d830eb8d416c77c0b761ce
+
+    Vertebrate monitoring in the Australian Wet Tropics rainforest at CU6A1 (145.30367623, -16.57767628, 600.0m above MSL) collected by Reptile Surveys
-    PANGAEA - Data Publisher for Earth & Environmental Science
-    2006
-
-    1978-01-01T12:00:00/1978-12-31T12:00:00
+    James Cook University
+
+    2013-05-07
-
-
-    WOCE Sea Level, WSL
-
-
-
-    DATE/TIME
-    Sea level
-    Tide gauge station
-    SeaLevel
-    World Ocean Circulation Experiment (WOCE)
-
-
-
-    http://store.pangaea.de/Projects/WOCE/SeaLevel_rqds/Woods_Hole.txt
+
+    Dataset
+
+    r3d100010464::57793c5aa995172db237d9da17353f8b
-
-
+
+
-
+
     complete
     collected
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/pmf.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/pmf.xml
new file mode 100644
index 000000000..7b9abd158
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/pmf.xml
@@ -0,0 +1,25 @@
+
+
+
+    r3d100010464::57793c5aa995172db237d9da17353f8b
+    10.1111/j.1365-2486.2005.00995.x::doi
+    2020-11-02T16:14:07.831Z
+    ands_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=
+    r3d100010464
+
+
+    10.1111/j.1365-2486.2005.00995.x
+    10.1111/j.1365-2486.2005.00995.x
+    Potential decoupling of trends in distribution area and population size of species with climate change.
+    publication
+
+
+
+
+    complete
+    collected
+
+
+
\ No newline at end of file

diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java
index 6ea8ff735..ec3da5cfc 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java
@@ -97,12 +97,17 @@ public class Scholix implements Serializable {
 	}
 
 	private List<ScholixEntityId> mergeScholixEntityId(final List<ScholixEntityId> a, final List<ScholixEntityId> b) {
-		final List<ScholixEntityId> m = new ArrayList<>(a);
+		final List<ScholixEntityId> m = a != null ? new ArrayList<>(a) : new ArrayList<>();
 		if (b != null)
 			b.forEach(s -> {
-				int tt = (int) m.stream().filter(t -> t.getName().equalsIgnoreCase(s.getName())).count();
-				if (tt == 0) {
-					m.add(s);
+				if (s != null) {
+					int tt = (int) m
+						.stream()
+						.filter(t -> t != null && t.getName() != null && t.getName().equalsIgnoreCase(s.getName()))
+						.count();
+					if (tt == 0) {
+						m.add(s);
+					}
 				}
 			});
 		return m;
@@ -110,7 +115,7 @@
 
 	private List<ScholixIdentifier> mergeScholixIdnetifier(final List<ScholixIdentifier> a, final List<ScholixIdentifier> b) {
-		final List<ScholixIdentifier> m = new ArrayList<>(a);
+		final List<ScholixIdentifier> m = a != null ? new ArrayList<>(a) : new ArrayList<>();
 		if (b != null)
 			b.forEach(s -> {
 				int tt = (int) m.stream().filter(t -> t.getIdentifier().equalsIgnoreCase(s.getIdentifier())).count();
@@ -123,7 +128,7 @@
 
 	private List<ScholixCollectedFrom> mergeScholixCollectedFrom(final List<ScholixCollectedFrom> a, final List<ScholixCollectedFrom> b) {
-		final List<ScholixCollectedFrom> m = new ArrayList<>(a);
+		final List<ScholixCollectedFrom> m = a != null ? new ArrayList<>(a) : new ArrayList<>();
 		if (b != null)
 			b.forEach(s -> {
 				int tt = (int) m
@@ -139,14 +144,15 @@
 
 	private ScholixRelationship mergeRelationships(final ScholixRelationship a, final ScholixRelationship b) {
 		ScholixRelationship result = new ScholixRelationship();
-		result.setName(StringUtils.isEmpty(a.getName()) ? b.getName() : a.getName());
-		result.setInverse(StringUtils.isEmpty(a.getInverse()) ? b.getInverse() : a.getInverse());
-		result.setSchema(StringUtils.isEmpty(a.getSchema()) ? b.getSchema() : a.getSchema());
+		result.setName(a == null || StringUtils.isEmpty(a.getName()) ? b.getName() : a.getName());
+		result.setInverse(a == null || StringUtils.isEmpty(a.getInverse()) ? b.getInverse() : a.getInverse());
+		result.setSchema(a == null || StringUtils.isEmpty(a.getSchema()) ? b.getSchema() : a.getSchema());
 		return result;
 	}
 
 	private ScholixResource mergeResource(final ScholixResource a, final ScholixResource b) {
-
+		if (a == null)
+			return b;
 		final ScholixResource result = new ScholixResource();
 		result.setCollectedFrom(mergeScholixCollectedFrom(a.getCollectedFrom(), b.getCollectedFrom()));
 		result.setCreator(mergeScholixEntityId(a.getCreator(), b.getCreator()));
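The Scholix merge helpers previously dereferenced their left-hand argument unconditionally, so merging a record whose left side (or list entry) was absent threw a NullPointerException; the hunks above make every merge null-tolerant instead. The same defensive pattern, sketched in Scala with a simplified stand-in type rather than the real ScholixEntityId API, looks like this:

```scala
// Null-tolerant merge in the spirit of the Scholix.java changes above: treat a
// missing list as empty, skip null entries, deduplicate by case-insensitive name.
import scala.collection.JavaConverters._
import scala.collection.mutable

case class EntityId(name: String, id: String) // simplified stand-in

def mergeByName(a: java.util.List[EntityId], b: java.util.List[EntityId]): java.util.List[EntityId] = {
  val m = mutable.ListBuffer[EntityId]()
  if (a != null) m ++= a.asScala.filter(_ != null)
  if (b != null)
    b.asScala.filter(_ != null).foreach { s =>
      // add only when no element with the same (case-insensitive) name exists yet
      val dup = m.exists(t => t.name != null && s.name != null && t.name.equalsIgnoreCase(s.name))
      if (!dup) m += s
    }
  m.asJava
}
```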
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/config-default.xml
index 6fb2a1253..7c1a43e51 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/config-default.xml
@@ -7,4 +7,8 @@
         <name>oozie.action.sharelib.for.spark</name>
         <value>spark2</value>
     </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
 </configuration>
\ No newline at end of file

diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/workflow.xml
index 4f5c7bbf6..d98164afb 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/index/oozie_app/workflow.xml
@@ -1,9 +1,17 @@
-
+
     workingDirPath
     the source path
+
+    index
+    the index name
+
+
+    esCluster
+    the Index cluster
+
     sparkDriverMemory
     memory for driver process
@@ -12,39 +20,43 @@
     sparkExecutorMemory
     memory for individual executor
-
-    index
-    index name
-
-
-
-    indexHost
-    index host name
-
-
+    Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+        ${jobTracker}
+        ${nameNode}
+        eu.dnetlib.dhp.provision.DropAndCreateESIndex
+        -i${index}
+        -c${esCluster}
+
+
+
+
+
         ${jobTracker}
         ${nameNode}
         yarn-cluster
         cluster
-        index Summary
+        index summary
         eu.dnetlib.dhp.provision.SparkIndexCollectionOnES
         dhp-graph-provision-scholexplorer-${projectVersion}.jar
-        --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32"
+        --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8"
         -mt yarn-cluster
-        --sourcePath${workingDirPath}/summary
+        --sourcePath${workingDirPath}/summary_json
         --index${index}_object
-        --esHost${indexHost}
         --idPathid
-        --typesummary
+        --cluster${esCluster}
@@ -63,9 +75,8 @@
         -mt yarn-cluster
         --sourcePath${workingDirPath}/scholix_json
         --index${index}_scholix
-        --esHost${indexHost}
         --idPathidentifier
-        --typescholix
+        --cluster${esCluster}

diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
index c2c2a78fb..4c0d6c1da 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
@@ -112,59 +112,5 @@
-
-
-
-        ${jobTracker}
-        ${nameNode}
-        eu.dnetlib.dhp.provision.DropAndCreateESIndex
-        -i${index}
-        -c${esCluster}
-
-
-
-
-
-
-
-        ${jobTracker}
-        ${nameNode}
-        yarn-cluster
-        cluster
-        index summary
-        eu.dnetlib.dhp.provision.SparkIndexCollectionOnES
-        dhp-graph-provision-scholexplorer-${projectVersion}.jar
-        --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8"
-        -mt yarn-cluster
-        --sourcePath${workingDirPath}/summary_json
-        --index${index}_object
-        --idPathid
-        --cluster${esCluster}
-
-
-
-
-
-
-        ${jobTracker}
-        ${nameNode}
-        yarn-cluster
-        cluster
-        index scholix
-        eu.dnetlib.dhp.provision.SparkIndexCollectionOnES
-        dhp-graph-provision-scholexplorer-${projectVersion}.jar
-        --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8"
-        -mt yarn-cluster
-        --sourcePath${workingDirPath}/scholix_json
-        --index${index}_scholix
-        --idPathidentifier
-        --cluster${esCluster}
-
-
-
-
\ No newline at end of file
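With the DropAndCreateESIndex and indexing actions now living in the index workflow (and removed from the provision workflow above), a useful post-run sanity check is to confirm that no relation still points at an identifier missing from the entity dumps — the dangling-relation symptom this patch fixes for ANDS records. A hypothetical check, assuming JSON dumps at the given paths with the field names used in the patch:

```scala
// Hypothetical post-run sanity check: after the endpoint rewrite, no relation
// should reference an identifier absent from the entity dump.
import org.apache.spark.sql.{DataFrame, SparkSession}

object DanglingRelationCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dangling-check").getOrCreate()

    // Assumed layout: JSON dumps; only the id / source columns are needed here.
    val entities: DataFrame = spark.read.json("/tmp/graph/entities").select("id")
    val relations: DataFrame = spark.read.json("/tmp/graph/relation").select("source")

    // Relations whose source id has no matching entity. Before this patch,
    // ANDS relations landed here because they kept the unresolved original id.
    val dangling = relations.join(entities, relations("source") === entities("id"), "left_anti")
    println(s"dangling relation sources: ${dangling.count()}")
  }
}
```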