From 31d2d6d41ea7dc590e028a5567165bf2612981bc Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 21 Jul 2021 18:09:14 +0200 Subject: [PATCH 01/14] Scholexplorer: introduction of dedup openaire --- .../eu/dnetlib/dhp/oa/merge/AuthorMerger.java | 1 + .../graph/SparkConvertDatasetToJsonRDD.scala | 41 ++++++ .../dhp/sx/graph/SparkCreateInputGraph.scala | 2 +- .../sx/graph/convert_dataset_json_params.json | 5 + .../oozie_app/config-default.xml | 0 .../extractEntities/oozie_app/workflow.xml | 85 +++++++++++++ .../dhp/sx/graph/step2/oozie_app/workflow.xml | 120 ------------------ 7 files changed, 133 insertions(+), 121 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/{step2 => extractEntities}/oozie_app/config-default.xml (100%) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java index 0b602b7746..fe9f94960b 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java @@ -78,6 +78,7 @@ public class AuthorMerger { a -> a .getPid() .stream() + .filter(Objects::nonNull) .filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p))) .map(p -> new Tuple2<>(p, a))) .collect(Collectors.toList()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala new file mode 100644 index 0000000000..0886a1fcaf --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala @@ -0,0 +1,41 @@ +package eu.dnetlib.dhp.sx.graph + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset} +import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +object SparkConvertDatasetToJsonRDD { + + + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sourcePath = parser.get("sourcePath") + log.info(s"sourcePath -> $sourcePath") + val targetPath = parser.get("targetPath") + log.info(s"targetPath -> $targetPath") + + val resultObject = List("publication","dataset","software", "otherResearchProduct") + val mapper = new ObjectMapper() + implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + + resultObject.foreach{item => + spark.read.load(s"$sourcePath/$item").as[Result].map(r=> mapper.writeValueAsString(r))(Encoders.STRING).rdd.saveAsTextFile(s"$targetPath/${item.toLowerCase}", classOf[GzipCodec]) + } + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala index a37dd21328..350b00c5ea 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala @@ -70,7 +70,7 @@ object SparkCreateInputGraph { resultObject.foreach { r => log.info(s"Make ${r._1} unique") - makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/dedup/${r._1}",spark, r._2) + makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/preprocess/${r._1}",spark, r._2) } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json new file mode 100644 index 0000000000..8bfdde5b0d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml new file mode 100644 index 0000000000..685976ce69 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml @@ -0,0 +1,85 @@ + + + + sourcePath + the working dir base path + + + targetPath + the graph Raw base path + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Extract entities in raw graph + eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=2000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${sourcePath} + --targetPath${targetPath} + + + + + + + + + + + + + + + + + yarn + cluster + Generate Input Graph for deduplication + eu.dnetlib.dhp.sx.graph.SparkConvertDatasetToJsonRDD + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=3000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${targetPath}/preprocess + --targetPath${targetPath}/dedup + + + + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml deleted file mode 100644 index 9d06c42d62..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml +++ /dev/null @@ -1,120 +0,0 @@ - - - - workingPath - the working path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Publication) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -epublication - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Dataset) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -edataset - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Unknown) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -eunknown - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Relation) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -erelation - - - - - - - \ No newline at end of file From d94565862a39e72316d5a0338c2a756b8563fb9f Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 21 Jul 2021 21:23:11 +0200 Subject: [PATCH 02/14] fixed NPE --- .../src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java index fe9f94960b..f900a276dd 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java @@ -67,6 +67,7 @@ public class AuthorMerger { a -> a .getPid() .stream() + .filter(Objects::nonNull) .map(p -> new Tuple2<>(pidToComparableString(p), a))) .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1)); From 62ae36a3d25d9110437e42391dab227485c056bb Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 22 Jul 2021 15:41:38 +0200 Subject: [PATCH 03/14] fixed NPE --- .../main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java | 7 +++++-- .../eu/dnetlib/dhp/actionmanager/datacite/record.json | 2 +- .../dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java index f900a276dd..7a8e55a6ed 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java @@ -152,7 +152,7 @@ public class AuthorMerger { } private static boolean hasPid(Author a) { - if (a == null || a.getPid() == null || a.getPid().size() == 0) + if (a == null || a.getPid() == null || a.getPid().isEmpty()) return false; return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue())); } @@ -161,7 +161,10 @@ public class AuthorMerger { if (StringUtils.isNotBlank(author.getSurname())) { return new Person(author.getSurname() + ", " + author.getName(), false); } else { - return new Person(author.getFullname(), false); + if (StringUtils.isNotBlank(author.getFullname())) + return new Person(author.getFullname(), false); + else + return new Person("", false); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json index 3ae10be738..f5aa659404 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json @@ -1 +1 @@ -{"id":"10.7282/t3-sjyd-1r46","type":"dois","attributes":{"doi":"10.7282/t3-sjyd-1r46","identifiers":[],"creators":[{"name":"Huang, Xu","nameType":"Personal","givenName":"Xu","familyName":"Huang","affiliation":[],"nameIdentifiers":[]},{"name":"Guo, Zhixiong","nameType":"Personal","givenName":"Zhixiong","familyName":"Guo","nameIdentifiers":[{"schemeUri":"https://orcid.org","nameIdentifier":"https://orcid.org/0000-0003-0481-2738","nameIdentifierScheme":"ORCID"}],"affiliation":[]}],"titles":[{"title":"High thermal conductance across c-BN/diamond interface"}],"publisher":"Rutgers University","container":{},"publicationYear":2099,"subjects":[{"subject":"Diamond"},{"subject":"Cubic boron nitride"},{"subject":"Thermal conductivity"},{"subject":"Interface"},{"subject":"Phonon"},{"subject":"Thermal conductance"}],"contributors":[],"dates":[{"date":"2099-12-31","dateType":"Accepted"},{"date":"2099","dateType":"Issued"}],"language":"en","types":{"ris":"RPRT","bibtex":"article","citeproc":"article-journal","schemaOrg":"ScholarlyArticle","resourceType":"Accepted manuscript","resourceTypeGeneral":"Text"},"relatedIdentifiers":[],"sizes":[],"formats":["application/pdf"],"version":null,"rightsList":[{"rights":"Embargo"}],"descriptions":[{"description":"High thermal conductivity electronic components with low interfacial thermal resistance are of technological importance and fundamental interest of research. Diamond, a superhard material with ultrahigh thermal conductivity at room temperature, is desirable for microelectronics thermal management. Cubic polymorph of boron nitride (c-BN) is a promising material due to wide bandgap and diamond like structure and properties. To understand the nature in thermal transport of diamond, c-BN and the most commonly used silicon (Si) semiconductor, ab initio phonon Boltzmann transport equations are employed to investigate lattice vibrational properties of these three materials. At 300 K, the predicted thermal conductivity of Si, diamond and c-BN reached 142, 2112, and 736 W/(m��K), respectively. What's more, heat transport phenomena across the interfaces of Si/diamond, c-BN/diamond and Si/c-BN are unfolded. In comparison, the interfacial thermal conductance of c-BN/diamond is ten-fold of Si/diamond; besides, the thermal conductance across Si/c-BN interface is 20.2% larger than that of Si/diamond at 300 K and 18.9% larger at 340 K. These findings provide us new vision and potential solution to heat dissipation of high-local-power density devices, shedding light on future thermal management of c-BN and diamond related electronics.","descriptionType":"Abstract"}],"geoLocations":[],"fundingReferences":[],"url":"https://scholarship.libraries.rutgers.edu/discovery/fulldisplay/alma991031549917804646/01RUT_INST:ResearchRepository","contentUrl":null,"metadataVersion":1,"schemaVersion":"http://datacite.org/schema/kernel-4","source":"mds","isActive":true,"state":"findable","reason":null,"viewCount":0,"downloadCount":0,"referenceCount":0,"citationCount":0,"partCount":0,"partOfCount":0,"versionCount":0,"versionOfCount":0,"created":"2020-06-30T21:12:19Z","registered":"2020-07-02T16:45:07Z","published":null,"updated":"2021-01-14T18:24:19Z"},"relationships":{"client":{"data":{"id":"rutgers.lib","type":"clients"}}}} \ No newline at end of file +{"id":"10.5517/ccdc.csd.cc25rpzm","type":"dois","attributes":{"doi":"10.5517/ccdc.csd.cc25rpzm","prefix":"10.5517","suffix":"ccdc.csd.cc25rpzm","identifiers":[{"identifier":"2018781","identifierType":"CCDC"}],"alternateIdentifiers":[{"alternateIdentifierType":"CCDC","alternateIdentifier":"2018781"}],"creators":[{"name":"Ling, Irene","affiliation":[],"nameIdentifiers":[]},{"name":"Sobolev, Alexandre N.","affiliation":[],"nameIdentifiers":[]},{"name":"Raston, Colin L.","affiliation":[],"nameIdentifiers":[]}],"titles":[{"title":"CCDC 2018781: Experimental Crystal Structure Determination"}],"publisher":"Cambridge Crystallographic Data Centre","container":{},"publicationYear":2021,"subjects":[{"subject":"Crystal Structure"},{"subject":"Experimental 3D Coordinates"},{"subject":"Crystal System"},{"subject":"Space Group"},{"subject":"Cell Parameters"},{"subject":"Crystallography"},{"subject":"bis[penta-aqua-copper(ii)] bis(mu-5,11,17,23-tetra-sulfonato-25,26,27,28-tetrahydroxycalix(4)arene)-dodeca-aqua-tri-copper(ii) bis(nitrate) heptahydrate"}],"contributors":[],"dates":[],"language":"en","types":{"ris":"DATA","bibtex":"misc","citeproc":"dataset","schemaOrg":"Dataset","resourceTypeGeneral":"Dataset"},"relatedIdentifiers":[{"relationType":"IsSupplementTo","relatedIdentifier":"10.1080/00958972.2020.1849642","relatedIdentifierType":"DOI"}],"sizes":[],"formats":["CIF"],"version":null,"rightsList":[],"descriptions":[{"description":"Related Article: Irene Ling, Alexandre N. Sobolev, Colin L. Raston|2021|J.Coord.Chem.|74|40|doi:10.1080/00958972.2020.1849642","descriptionType":"Other"}],"geoLocations":[],"fundingReferences":[],"xml":"PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPHJlc291cmNlIHhtbG5zOnhzaT0iaHR0cDovL3d3dy53My5vcmcvMjAwMS9YTUxTY2hlbWEtaW5zdGFuY2UiIHhtbG5zPSJodHRwOi8vZGF0YWNpdGUub3JnL3NjaGVtYS9rZXJuZWwtNCIgeHNpOnNjaGVtYUxvY2F0aW9uPSJodHRwOi8vZGF0YWNpdGUub3JnL3NjaGVtYS9rZXJuZWwtNCBodHRwOi8vc2NoZW1hLmRhdGFjaXRlLm9yZy9tZXRhL2tlcm5lbC00L21ldGFkYXRhLnhzZCI+CiAgPGlkZW50aWZpZXIgaWRlbnRpZmllclR5cGU9IkRPSSI+MTAuNTUxNy9DQ0RDLkNTRC5DQzI1UlBaTTwvaWRlbnRpZmllcj4KICA8Y3JlYXRvcnM+CiAgICA8Y3JlYXRvcj4KICAgICAgPGNyZWF0b3JOYW1lPkxpbmcsIElyZW5lPC9jcmVhdG9yTmFtZT4KICAgIDwvY3JlYXRvcj4KICAgIDxjcmVhdG9yPgogICAgICA8Y3JlYXRvck5hbWU+U29ib2xldiwgQWxleGFuZHJlIE4uPC9jcmVhdG9yTmFtZT4KICAgIDwvY3JlYXRvcj4KICAgIDxjcmVhdG9yPgogICAgICA8Y3JlYXRvck5hbWU+UmFzdG9uLCBDb2xpbiBMLjwvY3JlYXRvck5hbWU+CiAgICA8L2NyZWF0b3I+CiAgPC9jcmVhdG9ycz4KICA8dGl0bGVzPgogICAgPHRpdGxlPkNDREMgMjAxODc4MTogRXhwZXJpbWVudGFsIENyeXN0YWwgU3RydWN0dXJlIERldGVybWluYXRpb248L3RpdGxlPgogIDwvdGl0bGVzPgogIDxwdWJsaXNoZXI+Q2FtYnJpZGdlIENyeXN0YWxsb2dyYXBoaWMgRGF0YSBDZW50cmU8L3B1Ymxpc2hlcj4KICA8cHVibGljYXRpb25ZZWFyPjIwMjE8L3B1YmxpY2F0aW9uWWVhcj4KICA8cmVzb3VyY2VUeXBlIHJlc291cmNlVHlwZUdlbmVyYWw9IkRhdGFzZXQiLz4KICA8c3ViamVjdHM+CiAgICA8c3ViamVjdD5DcnlzdGFsIFN0cnVjdHVyZTwvc3ViamVjdD4KICAgIDxzdWJqZWN0PkV4cGVyaW1lbnRhbCAzRCBDb29yZGluYXRlczwvc3ViamVjdD4KICAgIDxzdWJqZWN0PkNyeXN0YWwgU3lzdGVtPC9zdWJqZWN0PgogICAgPHN1YmplY3Q+U3BhY2UgR3JvdXA8L3N1YmplY3Q+CiAgICA8c3ViamVjdD5DZWxsIFBhcmFtZXRlcnM8L3N1YmplY3Q+CiAgICA8c3ViamVjdD5DcnlzdGFsbG9ncmFwaHk8L3N1YmplY3Q+CiAgICA8c3ViamVjdD5iaXNbcGVudGEtYXF1YS1jb3BwZXIoaWkpXSBiaXMobXUtNSwxMSwxNywyMy10ZXRyYS1zdWxmb25hdG8tMjUsMjYsMjcsMjgtdGV0cmFoeWRyb3h5Y2FsaXgoNClhcmVuZSktZG9kZWNhLWFxdWEtdHJpLWNvcHBlcihpaSkgYmlzKG5pdHJhdGUpIGhlcHRhaHlkcmF0ZTwvc3ViamVjdD4KICA8L3N1YmplY3RzPgogIDxsYW5ndWFnZT5lbmc8L2xhbmd1YWdlPgogIDxhbHRlcm5hdGVJZGVudGlmaWVycz4KICAgIDxhbHRlcm5hdGVJZGVudGlmaWVyIGFsdGVybmF0ZUlkZW50aWZpZXJUeXBlPSJDQ0RDIj4yMDE4NzgxPC9hbHRlcm5hdGVJZGVudGlmaWVyPgogIDwvYWx0ZXJuYXRlSWRlbnRpZmllcnM+CiAgPHJlbGF0ZWRJZGVudGlmaWVycz4KICAgIDxyZWxhdGVkSWRlbnRpZmllciByZWxhdGVkSWRlbnRpZmllclR5cGU9IkRPSSIgcmVsYXRpb25UeXBlPSJJc1N1cHBsZW1lbnRUbyI+MTAuMTA4MC8wMDk1ODk3Mi4yMDIwLjE4NDk2NDI8L3JlbGF0ZWRJZGVudGlmaWVyPgogIDwvcmVsYXRlZElkZW50aWZpZXJzPgogIDxzaXplcy8+CiAgPGZvcm1hdHM+CiAgICA8Zm9ybWF0PkNJRjwvZm9ybWF0PgogIDwvZm9ybWF0cz4KICA8dmVyc2lvbi8+CiAgPGRlc2NyaXB0aW9ucz4KICAgIDxkZXNjcmlwdGlvbiBkZXNjcmlwdGlvblR5cGU9Ik90aGVyIj5SZWxhdGVkIEFydGljbGU6IElyZW5lIExpbmcsICBBbGV4YW5kcmUgTi4gU29ib2xldiwgIENvbGluIEwuIFJhc3RvbnwyMDIxfEouQ29vcmQuQ2hlbS58NzR8NDB8ZG9pOjEwLjEwODAvMDA5NTg5NzIuMjAyMC4xODQ5NjQyPC9kZXNjcmlwdGlvbj4KICA8L2Rlc2NyaXB0aW9ucz4KPC9yZXNvdXJjZT4K","url":"http://www.ccdc.cam.ac.uk/services/structure_request?id=doi:10.5517/ccdc.csd.cc25rpzm&sid=DataCite","contentUrl":null,"metadataVersion":3,"schemaVersion":"http://datacite.org/schema/kernel-4","source":"api","isActive":true,"state":"findable","reason":null,"viewCount":0,"viewsOverTime":[],"downloadCount":0,"downloadsOverTime":[],"referenceCount":0,"citationCount":0,"citationsOverTime":[],"partCount":0,"partOfCount":0,"versionCount":0,"versionOfCount":0,"created":"2021-03-09T13:25:35.000Z","registered":"2021-03-09T13:25:36.000Z","published":"2021","updated":"2021-03-31T21:49:56.000Z"},"relationships":{"client":{"data":{"id":"ccdc.csd","type":"clients"}},"provider":{"data":{"id":"ccdc","type":"providers"}},"media":{"data":{"id":"10.5517/ccdc.csd.cc25rpzm","type":"media"}},"references":{"data":[]},"citations":{"data":[]},"parts":{"data":[]},"partOf":{"data":[]},"versions":{"data":[]},"versionOf":{"data":[]}}} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala index 0886a1fcaf..3ee0c7dd6a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala @@ -33,6 +33,7 @@ object SparkConvertDatasetToJsonRDD { val mapper = new ObjectMapper() implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + resultObject.foreach{item => spark.read.load(s"$sourcePath/$item").as[Result].map(r=> mapper.writeValueAsString(r))(Encoders.STRING).rdd.saveAsTextFile(s"$targetPath/${item.toLowerCase}", classOf[GzipCodec]) } From 058b636d4d0949e0b76773b280fd3f65987afefb Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 22 Jul 2021 16:08:54 +0200 Subject: [PATCH 04/14] added control to check if the entity exists --- .../dhp/oa/dedup/SparkUpdateEntity.java | 71 ++++++++++--------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java index 03709c8fec..fdef7f77d4 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; @@ -77,48 +78,54 @@ public class SparkUpdateEntity extends AbstractSparkAction { (type, clazz) -> { final String outputPath = dedupGraphPath + "/" + type; removeOutputDir(spark, outputPath); + final String ip = DedupUtility.createEntityPath(graphBasePath, type.toString()); + if (HdfsSupport.exists(ip, sc.hadoopConfiguration())) { + JavaRDD sourceEntity = sc + .textFile(DedupUtility.createEntityPath(graphBasePath, type.toString())); - JavaRDD sourceEntity = sc - .textFile(DedupUtility.createEntityPath(graphBasePath, type.toString())); + if (mergeRelExists(workingPath, type.toString())) { - if (mergeRelExists(workingPath, type.toString())) { + final String mergeRelPath = DedupUtility + .createMergeRelPath(workingPath, "*", type.toString()); + final String dedupRecordPath = DedupUtility + .createDedupRecordPath(workingPath, "*", type.toString()); - final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, "*", type.toString()); - final String dedupRecordPath = DedupUtility - .createDedupRecordPath(workingPath, "*", type.toString()); + final Dataset rel = spark + .read() + .load(mergeRelPath) + .as(Encoders.bean(Relation.class)); - final Dataset rel = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class)); + final JavaPairRDD mergedIds = rel + .where("relClass == 'merges'") + .where("source != target") + .select(rel.col("target")) + .distinct() + .toJavaRDD() + .mapToPair( + (PairFunction) r -> new Tuple2<>(r.getString(0), "d")); - final JavaPairRDD mergedIds = rel - .where("relClass == 'merges'") - .where("source != target") - .select(rel.col("target")) - .distinct() - .toJavaRDD() - .mapToPair( - (PairFunction) r -> new Tuple2<>(r.getString(0), "d")); + JavaPairRDD entitiesWithId = sourceEntity + .mapToPair( + (PairFunction) s -> new Tuple2<>( + MapDocumentUtil.getJPathString(IDJSONPATH, s), s)); + if (type == EntityType.organization) // exclude root records from organizations + entitiesWithId = excludeRootOrgs(entitiesWithId, rel); - JavaPairRDD entitiesWithId = sourceEntity - .mapToPair( - (PairFunction) s -> new Tuple2<>( - MapDocumentUtil.getJPathString(IDJSONPATH, s), s)); - if (type == EntityType.organization) // exclude root records from organizations - entitiesWithId = excludeRootOrgs(entitiesWithId, rel); + JavaRDD map = entitiesWithId + .leftOuterJoin(mergedIds) + .map(k -> { + if (k._2()._2().isPresent()) { + return updateDeletedByInference(k._2()._1(), clazz); + } + return k._2()._1(); + }); - JavaRDD map = entitiesWithId - .leftOuterJoin(mergedIds) - .map(k -> { - if (k._2()._2().isPresent()) { - return updateDeletedByInference(k._2()._1(), clazz); - } - return k._2()._1(); - }); + sourceEntity = map.union(sc.textFile(dedupRecordPath)); - sourceEntity = map.union(sc.textFile(dedupRecordPath)); + } + sourceEntity.saveAsTextFile(outputPath, GzipCodec.class); } - - sourceEntity.saveAsTextFile(outputPath, GzipCodec.class); }); } From 43e9380cd351bbc4697548c79c1a48e1bad7ac3b Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 23 Jul 2021 11:25:18 +0200 Subject: [PATCH 05/14] update resolve relation to use the same format of openaire graph --- .../dhp/sx/graph/SparkResolveRelation.scala | 77 +++++++++++++++---- 1 file changed, 62 insertions(+), 15 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index 6ee575e2a8..11d026f025 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -4,7 +4,12 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Relation, Result} import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST.{JField, JObject, JString} +import org.json4s.jackson.JsonMethods.parse import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ @@ -29,23 +34,11 @@ object SparkResolveRelation { val workingPath = parser.get("workingPath") log.info(s"workingPath -> $workingPath") - - implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) import spark.implicits._ - val entities:Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result] - entities.flatMap(e => e.getPid.asScala - .map(p => - convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid)) - .filter(s => s!= null) - .map(s => (s,e.getId)) - ).groupByKey(_._1) - .reduceGroups((x,y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) - .map(s =>s._2) - .write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedPid") + + extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath) val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)] @@ -74,11 +67,65 @@ object SparkResolveRelation { }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50")) .write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedRelation") + .save(s"$workingPath/relation") } + private def extractPidsFromRecord(input:String):(String,List[(String,String)]) = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + val id:String = (json \ "id").extract[String] + val result: List[(String,String)] = for { + JObject(pids) <- json \ "pid" + JField("value", JString(pidValue)) <- pids + JField("qualifier", JObject(qualifier)) <- pids + JField("classname", JString(pidType)) <- qualifier + } yield (pidValue, pidType) + (id,result) + } + private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = { + import spark.implicits._ + + val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*") + .map(i => extractPidsFromRecord(i)) + .filter(s => s != null && s._2!=null && s._2.nonEmpty) + .flatMap{ p => + p._2.map(pid => + (p._1,convertPidToDNETIdentifier(pid._1, pid._2)) + ) + } + + spark.createDataset(d) + .groupByKey(_._1) + .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) + .map(s => s._2) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/resolvedPid") + } + + + /* + This method should be used once we finally convert everythings in Kryo dataset + instead of using rdd of json + */ + private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = { + import spark.implicits._ + implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result] + entities.flatMap(e => e.getPid.asScala + .map(p => + convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid)) + .filter(s => s != null) + .map(s => (s, e.getId)) + ).groupByKey(_._1) + .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) + .map(s => s._2) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/resolvedPid") + } def convertPidToDNETIdentifier(pid:String, pidType: String):String = { if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty) From ca74e8dd02617b5c2911f1cc4be53a47f08f2f13 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 23 Jul 2021 11:40:06 +0200 Subject: [PATCH 06/14] create a separate wf for resolving relation --- .../oozie_app/config-default.xml | 0 .../resolverelation/oozie_app/workflow.xml | 52 ++++++++++++++++ .../dhp/sx/graph/step3/oozie_app/workflow.xml | 61 ------------------- 3 files changed, 52 insertions(+), 61 deletions(-) rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/{step3 => resolverelation}/oozie_app/config-default.xml (100%) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml new file mode 100644 index 0000000000..e73b61a749 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml @@ -0,0 +1,52 @@ + + + + entityPath + the path of deduplicate Entities + + + relationPath + the path of relation unresolved + + + targetPath + the path of relation unresolved + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + yarn + cluster + Resolve Relations in raw graph + eu.dnetlib.dhp.sx.graph.SparkResolveRelation + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=3000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --relationPath${relationPath} + --workingPath${targetPath} + --entityPath${entityPath} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml deleted file mode 100644 index 4d54b2afbc..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml +++ /dev/null @@ -1,61 +0,0 @@ - - - - sourcePath - the source path - - - targetPath - the source path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - entity - the entity to be merged - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Merge ${entity} - eu.dnetlib.dhp.sx.graph.SparkScholexplorerCreateRawGraphJob - dhp-graph-mapper-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} - -mt yarn-cluster - --sourcePath${sourcePath}/${entity} - --targetPath${targetPath}/${entity} - --entity${entity} - - - - - - - \ No newline at end of file From bc835d2024df229fa0fe132b7d216ce94dee456e Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 23 Jul 2021 11:55:55 +0200 Subject: [PATCH 07/14] [cleaning] fixed filtering function for missing titles --- .../eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index a75cc52e6a..e5181b1119 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -98,7 +98,7 @@ public class GraphCleaningFunctions extends CleaningFunctions { Result r = (Result) value; - if (Objects.nonNull(r.getTitle()) && r.getTitle().isEmpty()) { + if (Objects.isNull(r.getTitle()) || r.getTitle().isEmpty()) { return false; } From 4a439c3863de04e1c3c051ed2e2fc18f4731d1a9 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 23 Jul 2021 13:23:25 +0200 Subject: [PATCH 08/14] NPE fixed --- .../java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index 11d026f025..9e7963b2ac 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -48,7 +48,7 @@ object SparkResolveRelation { m => val sourceResolved = m._2 val currentRelation = m._1._2 - if (sourceResolved!=null && sourceResolved._2.nonEmpty) + if (sourceResolved!=null && sourceResolved._2!=null && sourceResolved._2.nonEmpty) currentRelation.setSource(sourceResolved._2) currentRelation }.write From cfde63a7c3741111b2c4b95573c2e8a166732935 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 23 Jul 2021 14:17:17 +0200 Subject: [PATCH 09/14] fixed resolve relation join --- .../java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index 9e7963b2ac..aa09311e3d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -44,7 +44,7 @@ object SparkResolveRelation { val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_1")), "left").map{ + relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{ m => val sourceResolved = m._2 val currentRelation = m._1._2 @@ -57,7 +57,7 @@ object SparkResolveRelation { val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_1")), "left").map{ + relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{ m => val targetResolved = m._2 val currentRelation = m._1._2 From d9e3b8993739c649b622aaa6c6c401710f8b2a31 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 23 Jul 2021 16:38:32 +0200 Subject: [PATCH 10/14] implemented last part of workflows to generate scholixGraph --- .../sx/graph/SparkConvertRDDtoDataset.scala | 43 ++++++++++++++++ .../dhp/sx/graph/SparkResolveRelation.scala | 22 ++++---- .../oozie_app/config-default.xml | 0 .../oozie_app/workflow.xml | 51 ++++--------------- .../resolverelation/oozie_app/workflow.xml | 12 ++++- 5 files changed, 75 insertions(+), 53 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/{step1 => finalGraph}/oozie_app/config-default.xml (100%) rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/{step1 => finalGraph}/oozie_app/workflow.xml (77%) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala new file mode 100644 index 0000000000..2cd176deed --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -0,0 +1,43 @@ +package eu.dnetlib.dhp.sx.graph +import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset} +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} +object SparkConvertRDDtoDataset { + + def main(args: Array[String]): Unit = { + val entities = List( + ("dataset", classOf[OafDataset]), + ("otherresearchproduct", classOf[OtherResearchProduct]), + ("publication", classOf[Publication]), + ("software", classOf[Software]) + ) + + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sourcePath = parser.get("sourcePath") + log.info(s"sourcePath -> $sourcePath") + val targetPath = parser.get("targetPath") + log.info(s"targetPath -> $targetPath") + val mapper = new ObjectMapper() + implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + + entities.foreach{ + e => + val rdd =spark.sparkContext.textFile(s"$sourcePath/${e._1}").map(s => mapper.readValue(s, e._2)) + spark.createDataset(rdd).as[Result].write.mode(SaveMode.Overwrite).save(s"$targetPath/${e._1}") + } + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index aa09311e3d..0d0dc4159b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -30,7 +30,7 @@ object SparkResolveRelation { val relationPath = parser.get("relationPath") log.info(s"sourcePath -> $relationPath") val entityPath = parser.get("entityPath") - log.info(s"targetPath -> $entityPath") + log.info(s"entityPath -> $entityPath") val workingPath = parser.get("workingPath") log.info(s"workingPath -> $workingPath") @@ -48,8 +48,8 @@ object SparkResolveRelation { m => val sourceResolved = m._2 val currentRelation = m._1._2 - if (sourceResolved!=null && sourceResolved._2!=null && sourceResolved._2.nonEmpty) - currentRelation.setSource(sourceResolved._2) + if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty) + currentRelation.setSource(sourceResolved._1) currentRelation }.write .mode(SaveMode.Overwrite) @@ -61,13 +61,13 @@ object SparkResolveRelation { m => val targetResolved = m._2 val currentRelation = m._1._2 - if (targetResolved!=null && targetResolved._2.nonEmpty) - currentRelation.setTarget(targetResolved._2) + if (targetResolved!=null && targetResolved._1.nonEmpty) + currentRelation.setTarget(targetResolved._1) currentRelation }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50")) .write .mode(SaveMode.Overwrite) - .save(s"$workingPath/relation") + .save(s"$workingPath/relation_resolved") } @@ -89,16 +89,16 @@ object SparkResolveRelation { val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*") .map(i => extractPidsFromRecord(i)) - .filter(s => s != null && s._2!=null && s._2.nonEmpty) + .filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty) .flatMap{ p => p._2.map(pid => - (p._1,convertPidToDNETIdentifier(pid._1, pid._2)) + (p._1, convertPidToDNETIdentifier(pid._1, pid._2)) ) - } + }.filter(r =>r._1 != null || r._2 != null) spark.createDataset(d) - .groupByKey(_._1) - .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) + .groupByKey(_._2) + .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y) .map(s => s._2) .write .mode(SaveMode.Overwrite) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml similarity index 77% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml index 3ea4e9d30d..4e601bff33 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + sourcePath @@ -6,48 +6,22 @@ targetPath - the graph Raw base path + the final graph path - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - Extract entities in raw graph - eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=2000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${sourcePath} - --targetPath${targetPath} - - - - - - - - - yarn - cluster - Resolve Relations in raw graph - eu.dnetlib.dhp.sx.graph.SparkResolveRelation + Import JSONRDD to Dataset kryo + eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset dhp-graph-mapper-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -60,9 +34,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn - --relationPath${targetPath}/extracted/relation - --workingPath${targetPath}/resolved/ - --entityPath${targetPath}/dedup + --sourcePath${sourcePath} + --targetPath${targetPath}/entities @@ -87,7 +60,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn - --sourcePath${targetPath}/dedup + --sourcePath${targetPath}/entities --targetPath${targetPath}/provision/summaries @@ -114,7 +87,7 @@ --masteryarn --summaryPath${targetPath}/provision/summaries --targetPath${targetPath}/provision/scholix - --relationPath${targetPath}/resolved/resolvedRelation + --relationPath${sourcePath}/relation_resolved @@ -182,9 +155,5 @@ - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml index e73b61a749..7683ff94cd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml @@ -15,14 +15,24 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + yarn From a0393607a7cadb821ebf25fb76e72d226127440a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 23 Jul 2021 18:14:37 +0200 Subject: [PATCH 11/14] mapping funding relations from Datacite should be done according to the actual result identifier --- .../actionmanager/datacite/DataciteToOAFTransformation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala index 0cdf0accb1..045927bed1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala @@ -532,11 +532,11 @@ object DataciteToOAFTransformation { JField("awardUri", JString(awardUri)) <- fundingReferences } yield awardUri + result.setId(IdentifierFactory.createIdentifier(result)) var relations: List[Relation] = awardUris.flatMap(a => get_projectRelation(a, result.getId)).filter(r => r != null) - fix_figshare(result) - result.setId(IdentifierFactory.createIdentifier(result)) + if (result.getId == null) return List() From 3920c69bc8e050ada6284f8d40e687e5dd9b1761 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Sun, 25 Jul 2021 09:51:24 +0200 Subject: [PATCH 12/14] change implementation of resolve Relation to generate jsonRdd in output --- .../dhp/oa/dedup/GroupEntitiesSparkJob.java | 3 ++- .../dhp/sx/graph/SparkResolveRelation.scala | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java index 58009bfcfc..3f27b94422 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java @@ -38,7 +38,8 @@ import scala.Tuple2; /** * Groups the graph content by entity identifier to ensure ID uniqueness */ -public class GroupEntitiesSparkJob { +public class +GroupEntitiesSparkJob { private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index 0d0dc4159b..82bf3c50e2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -1,8 +1,10 @@ package eu.dnetlib.dhp.sx.graph +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Relation, Result} import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -40,7 +42,9 @@ object SparkResolveRelation { extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath) - val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)] + val mappper = new ObjectMapper() + + val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)] val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) @@ -68,6 +72,11 @@ object SparkResolveRelation { .write .mode(SaveMode.Overwrite) .save(s"$workingPath/relation_resolved") + + spark.read.load(s"$workingPath/relation_resolved").as[Relation] + .map(r => mappper.writeValueAsString(r)) + .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec]) + } @@ -102,7 +111,7 @@ object SparkResolveRelation { .map(s => s._2) .write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedPid") + .save(s"$workingPath/relationResolvedPid") } @@ -124,7 +133,7 @@ object SparkResolveRelation { .map(s => s._2) .write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedPid") + .save(s"$workingPath/relationResolvedPid") } def convertPidToDNETIdentifier(pid:String, pidType: String):String = { From 8fac10c91e109fd3ea74d4534b68df468618bf1a Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Sun, 25 Jul 2021 11:15:37 +0200 Subject: [PATCH 13/14] fixed defintion wf of creation final infospace of scholexplorer --- .../sx/graph/SparkConvertRDDtoDataset.scala | 58 +++++++++++++------ .../sx/graph/SparkCreateSummaryObject.scala | 5 +- .../dhp/sx/graph/SparkResolveRelation.scala | 4 +- .../graph/finalGraph/oozie_app/workflow.xml | 4 +- 4 files changed, 48 insertions(+), 23 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala index 2cd176deed..cb41d6134c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -1,7 +1,8 @@ package eu.dnetlib.dhp.sx.graph -import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper + +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset} import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -9,12 +10,7 @@ import org.slf4j.{Logger, LoggerFactory} object SparkConvertRDDtoDataset { def main(args: Array[String]): Unit = { - val entities = List( - ("dataset", classOf[OafDataset]), - ("otherresearchproduct", classOf[OtherResearchProduct]), - ("publication", classOf[Publication]), - ("software", classOf[Software]) - ) + val log: Logger = LoggerFactory.getLogger(getClass) val conf: SparkConf = new SparkConf() @@ -29,15 +25,43 @@ object SparkConvertRDDtoDataset { val sourcePath = parser.get("sourcePath") log.info(s"sourcePath -> $sourcePath") - val targetPath = parser.get("targetPath") - log.info(s"targetPath -> $targetPath") - val mapper = new ObjectMapper() - implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + val t = parser.get("targetPath") + log.info(s"targetPath -> $t") + + val entityPath = s"$t/entities" + val relPath = s"$t/relation" + val mapper = new ObjectMapper() + implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) + implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) + implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct]) + implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software]) + + + log.info("Converting dataset") + val rddDataset =spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset])) + spark.createDataset(rddDataset).as[OafDataset].write.mode(SaveMode.Overwrite).save(s"$entityPath/dataset") + + + log.info("Converting publication") + val rddPublication =spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication])) + spark.createDataset(rddPublication).as[Publication].write.mode(SaveMode.Overwrite).save(s"$entityPath/publication") + + log.info("Converting software") + val rddSoftware =spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software])) + spark.createDataset(rddSoftware).as[Software].write.mode(SaveMode.Overwrite).save(s"$entityPath/software") + + log.info("Converting otherresearchproduct") + val rddOtherResearchProduct =spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct])) + spark.createDataset(rddOtherResearchProduct).as[OtherResearchProduct].write.mode(SaveMode.Overwrite).save(s"$entityPath/otherresearchproduct") + + + log.info("Converting Relation") + + + val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation])) + spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") + - entities.foreach{ - e => - val rdd =spark.sparkContext.textFile(s"$sourcePath/${e._1}").map(s => mapper.readValue(s, e._2)) - spark.createDataset(rdd).as[Result].write.mode(SaveMode.Overwrite).save(s"$targetPath/${e._1}") - } } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala index a66da3e6d8..ac189b6ba4 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.sx.graph import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.Result +import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils import org.apache.commons.io.IOUtils @@ -29,11 +29,12 @@ object SparkCreateSummaryObject { log.info(s"targetPath -> $targetPath") implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] - val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result] + val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Oaf].filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index 82bf3c50e2..b2fddec203 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -57,10 +57,10 @@ object SparkResolveRelation { currentRelation }.write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedSource") + .save(s"$workingPath/relationResolvedSource") - val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) + val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{ m => val targetResolved = m._2 diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml index 4e601bff33..d8eb1fc80b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml @@ -35,7 +35,7 @@ --masteryarn --sourcePath${sourcePath} - --targetPath${targetPath}/entities + --targetPath${targetPath} @@ -87,7 +87,7 @@ --masteryarn --summaryPath${targetPath}/provision/summaries --targetPath${targetPath}/provision/scholix - --relationPath${sourcePath}/relation_resolved + --relationPath${targetPath}/relation From 848aabbb6cef47e941f739f475521011dfee6be9 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Sun, 25 Jul 2021 12:06:41 +0200 Subject: [PATCH 14/14] minor fix --- .../main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala | 1 + .../java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala index ba483bfb26..0a7fc18fb6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala @@ -42,6 +42,7 @@ object SparkCreateScholix { val relationDS: Dataset[(String, Relation)] = spark.read.load(relationPath).as[Relation] + .filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) .map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder)) val summaryDS: Dataset[(String, ScholixSummary)] = spark.read.load(summaryPath).as[ScholixSummary] diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala index ac189b6ba4..0970375f5c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala @@ -34,7 +34,7 @@ object SparkCreateSummaryObject { implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] - val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Oaf].filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) + val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result].filter(r=>r.getDataInfo== null || r.getDataInfo.getDeletedbyinference== false) ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath)