diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java index 0b602b7746..7a8e55a6ed 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java @@ -67,6 +67,7 @@ public class AuthorMerger { a -> a .getPid() .stream() + .filter(Objects::nonNull) .map(p -> new Tuple2<>(pidToComparableString(p), a))) .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1)); @@ -78,6 +79,7 @@ public class AuthorMerger { a -> a .getPid() .stream() + .filter(Objects::nonNull) .filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p))) .map(p -> new Tuple2<>(p, a))) .collect(Collectors.toList()); @@ -150,7 +152,7 @@ public class AuthorMerger { } private static boolean hasPid(Author a) { - if (a == null || a.getPid() == null || a.getPid().size() == 0) + if (a == null || a.getPid() == null || a.getPid().isEmpty()) return false; return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue())); } @@ -159,7 +161,10 @@ public class AuthorMerger { if (StringUtils.isNotBlank(author.getSurname())) { return new Person(author.getSurname() + ", " + author.getName(), false); } else { - return new Person(author.getFullname(), false); + if (StringUtils.isNotBlank(author.getFullname())) + return new Person(author.getFullname(), false); + else + return new Person("", false); } } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index a75cc52e6a..e5181b1119 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -98,7 +98,7 @@ public class GraphCleaningFunctions extends CleaningFunctions { Result r = (Result) 
value; - if (Objects.nonNull(r.getTitle()) && r.getTitle().isEmpty()) { + if (Objects.isNull(r.getTitle()) || r.getTitle().isEmpty()) { return false; } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala index 0cdf0accb1..045927bed1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala @@ -532,11 +532,11 @@ object DataciteToOAFTransformation { JField("awardUri", JString(awardUri)) <- fundingReferences } yield awardUri + result.setId(IdentifierFactory.createIdentifier(result)) var relations: List[Relation] = awardUris.flatMap(a => get_projectRelation(a, result.getId)).filter(r => r != null) - fix_figshare(result) - result.setId(IdentifierFactory.createIdentifier(result)) + if (result.getId == null) return List() diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json index 3ae10be738..f5aa659404 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json @@ -1 +1 @@ -{"id":"10.7282/t3-sjyd-1r46","type":"dois","attributes":{"doi":"10.7282/t3-sjyd-1r46","identifiers":[],"creators":[{"name":"Huang, Xu","nameType":"Personal","givenName":"Xu","familyName":"Huang","affiliation":[],"nameIdentifiers":[]},{"name":"Guo, 
Zhixiong","nameType":"Personal","givenName":"Zhixiong","familyName":"Guo","nameIdentifiers":[{"schemeUri":"https://orcid.org","nameIdentifier":"https://orcid.org/0000-0003-0481-2738","nameIdentifierScheme":"ORCID"}],"affiliation":[]}],"titles":[{"title":"High thermal conductance across c-BN/diamond interface"}],"publisher":"Rutgers University","container":{},"publicationYear":2099,"subjects":[{"subject":"Diamond"},{"subject":"Cubic boron nitride"},{"subject":"Thermal conductivity"},{"subject":"Interface"},{"subject":"Phonon"},{"subject":"Thermal conductance"}],"contributors":[],"dates":[{"date":"2099-12-31","dateType":"Accepted"},{"date":"2099","dateType":"Issued"}],"language":"en","types":{"ris":"RPRT","bibtex":"article","citeproc":"article-journal","schemaOrg":"ScholarlyArticle","resourceType":"Accepted manuscript","resourceTypeGeneral":"Text"},"relatedIdentifiers":[],"sizes":[],"formats":["application/pdf"],"version":null,"rightsList":[{"rights":"Embargo"}],"descriptions":[{"description":"High thermal conductivity electronic components with low interfacial thermal resistance are of technological importance and fundamental interest of research. Diamond, a superhard material with ultrahigh thermal conductivity at room temperature, is desirable for microelectronics thermal management. Cubic polymorph of boron nitride (c-BN) is a promising material due to wide bandgap and diamond like structure and properties. To understand the nature in thermal transport of diamond, c-BN and the most commonly used silicon (Si) semiconductor, ab initio phonon Boltzmann transport equations are employed to investigate lattice vibrational properties of these three materials. At 300 K, the predicted thermal conductivity of Si, diamond and c-BN reached 142, 2112, and 736 W/(m��K), respectively. What's more, heat transport phenomena across the interfaces of Si/diamond, c-BN/diamond and Si/c-BN are unfolded. 
In comparison, the interfacial thermal conductance of c-BN/diamond is ten-fold of Si/diamond; besides, the thermal conductance across Si/c-BN interface is 20.2% larger than that of Si/diamond at 300 K and 18.9% larger at 340 K. These findings provide us new vision and potential solution to heat dissipation of high-local-power density devices, shedding light on future thermal management of c-BN and diamond related electronics.","descriptionType":"Abstract"}],"geoLocations":[],"fundingReferences":[],"url":"https://scholarship.libraries.rutgers.edu/discovery/fulldisplay/alma991031549917804646/01RUT_INST:ResearchRepository","contentUrl":null,"metadataVersion":1,"schemaVersion":"http://datacite.org/schema/kernel-4","source":"mds","isActive":true,"state":"findable","reason":null,"viewCount":0,"downloadCount":0,"referenceCount":0,"citationCount":0,"partCount":0,"partOfCount":0,"versionCount":0,"versionOfCount":0,"created":"2020-06-30T21:12:19Z","registered":"2020-07-02T16:45:07Z","published":null,"updated":"2021-01-14T18:24:19Z"},"relationships":{"client":{"data":{"id":"rutgers.lib","type":"clients"}}}} \ No newline at end of file +{"id":"10.5517/ccdc.csd.cc25rpzm","type":"dois","attributes":{"doi":"10.5517/ccdc.csd.cc25rpzm","prefix":"10.5517","suffix":"ccdc.csd.cc25rpzm","identifiers":[{"identifier":"2018781","identifierType":"CCDC"}],"alternateIdentifiers":[{"alternateIdentifierType":"CCDC","alternateIdentifier":"2018781"}],"creators":[{"name":"Ling, Irene","affiliation":[],"nameIdentifiers":[]},{"name":"Sobolev, Alexandre N.","affiliation":[],"nameIdentifiers":[]},{"name":"Raston, Colin L.","affiliation":[],"nameIdentifiers":[]}],"titles":[{"title":"CCDC 2018781: Experimental Crystal Structure Determination"}],"publisher":"Cambridge Crystallographic Data Centre","container":{},"publicationYear":2021,"subjects":[{"subject":"Crystal Structure"},{"subject":"Experimental 3D Coordinates"},{"subject":"Crystal System"},{"subject":"Space Group"},{"subject":"Cell 
Parameters"},{"subject":"Crystallography"},{"subject":"bis[penta-aqua-copper(ii)] bis(mu-5,11,17,23-tetra-sulfonato-25,26,27,28-tetrahydroxycalix(4)arene)-dodeca-aqua-tri-copper(ii) bis(nitrate) heptahydrate"}],"contributors":[],"dates":[],"language":"en","types":{"ris":"DATA","bibtex":"misc","citeproc":"dataset","schemaOrg":"Dataset","resourceTypeGeneral":"Dataset"},"relatedIdentifiers":[{"relationType":"IsSupplementTo","relatedIdentifier":"10.1080/00958972.2020.1849642","relatedIdentifierType":"DOI"}],"sizes":[],"formats":["CIF"],"version":null,"rightsList":[],"descriptions":[{"description":"Related Article: Irene Ling, Alexandre N. Sobolev, Colin L. Raston|2021|J.Coord.Chem.|74|40|doi:10.1080/00958972.2020.1849642","descriptionType":"Other"}],"geoLocations":[],"fundingReferences":[],"xml":"PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPHJlc291cmNlIHhtbG5zOnhzaT0iaHR0cDovL3d3dy53My5vcmcvMjAwMS9YTUxTY2hlbWEtaW5zdGFuY2UiIHhtbG5zPSJodHRwOi8vZGF0YWNpdGUub3JnL3NjaGVtYS9rZXJuZWwtNCIgeHNpOnNjaGVtYUxvY2F0aW9uPSJodHRwOi8vZGF0YWNpdGUub3JnL3NjaGVtYS9rZXJuZWwtNCBodHRwOi8vc2NoZW1hLmRhdGFjaXRlLm9yZy9tZXRhL2tlcm5lbC00L21ldGFkYXRhLnhzZCI+CiAgPGlkZW50aWZpZXIgaWRlbnRpZmllclR5cGU9IkRPSSI+MTAuNTUxNy9DQ0RDLkNTRC5DQzI1UlBaTTwvaWRlbnRpZmllcj4KICA8Y3JlYXRvcnM+CiAgICA8Y3JlYXRvcj4KICAgICAgPGNyZWF0b3JOYW1lPkxpbmcsIElyZW5lPC9jcmVhdG9yTmFtZT4KICAgIDwvY3JlYXRvcj4KICAgIDxjcmVhdG9yPgogICAgICA8Y3JlYXRvck5hbWU+U29ib2xldiwgQWxleGFuZHJlIE4uPC9jcmVhdG9yTmFtZT4KICAgIDwvY3JlYXRvcj4KICAgIDxjcmVhdG9yPgogICAgICA8Y3JlYXRvck5hbWU+UmFzdG9uLCBDb2xpbiBMLjwvY3JlYXRvck5hbWU+CiAgICA8L2NyZWF0b3I+CiAgPC9jcmVhdG9ycz4KICA8dGl0bGVzPgogICAgPHRpdGxlPkNDREMgMjAxODc4MTogRXhwZXJpbWVudGFsIENyeXN0YWwgU3RydWN0dXJlIERldGVybWluYXRpb248L3RpdGxlPgogIDwvdGl0bGVzPgogIDxwdWJsaXNoZXI+Q2FtYnJpZGdlIENyeXN0YWxsb2dyYXBoaWMgRGF0YSBDZW50cmU8L3B1Ymxpc2hlcj4KICA8cHVibGljYXRpb25ZZWFyPjIwMjE8L3B1YmxpY2F0aW9uWWVhcj4KICA8cmVzb3VyY2VUeXBlIHJlc291cmNlVHlwZUdlbmVyYWw9IkRhdGFzZXQiLz4KICA8c3ViamVjdHM+CiAgICA8c3ViamVjdD5DcnlzdGFsIFN0cnVjdHVyZTwv
c3ViamVjdD4KICAgIDxzdWJqZWN0PkV4cGVyaW1lbnRhbCAzRCBDb29yZGluYXRlczwvc3ViamVjdD4KICAgIDxzdWJqZWN0PkNyeXN0YWwgU3lzdGVtPC9zdWJqZWN0PgogICAgPHN1YmplY3Q+U3BhY2UgR3JvdXA8L3N1YmplY3Q+CiAgICA8c3ViamVjdD5DZWxsIFBhcmFtZXRlcnM8L3N1YmplY3Q+CiAgICA8c3ViamVjdD5DcnlzdGFsbG9ncmFwaHk8L3N1YmplY3Q+CiAgICA8c3ViamVjdD5iaXNbcGVudGEtYXF1YS1jb3BwZXIoaWkpXSBiaXMobXUtNSwxMSwxNywyMy10ZXRyYS1zdWxmb25hdG8tMjUsMjYsMjcsMjgtdGV0cmFoeWRyb3h5Y2FsaXgoNClhcmVuZSktZG9kZWNhLWFxdWEtdHJpLWNvcHBlcihpaSkgYmlzKG5pdHJhdGUpIGhlcHRhaHlkcmF0ZTwvc3ViamVjdD4KICA8L3N1YmplY3RzPgogIDxsYW5ndWFnZT5lbmc8L2xhbmd1YWdlPgogIDxhbHRlcm5hdGVJZGVudGlmaWVycz4KICAgIDxhbHRlcm5hdGVJZGVudGlmaWVyIGFsdGVybmF0ZUlkZW50aWZpZXJUeXBlPSJDQ0RDIj4yMDE4NzgxPC9hbHRlcm5hdGVJZGVudGlmaWVyPgogIDwvYWx0ZXJuYXRlSWRlbnRpZmllcnM+CiAgPHJlbGF0ZWRJZGVudGlmaWVycz4KICAgIDxyZWxhdGVkSWRlbnRpZmllciByZWxhdGVkSWRlbnRpZmllclR5cGU9IkRPSSIgcmVsYXRpb25UeXBlPSJJc1N1cHBsZW1lbnRUbyI+MTAuMTA4MC8wMDk1ODk3Mi4yMDIwLjE4NDk2NDI8L3JlbGF0ZWRJZGVudGlmaWVyPgogIDwvcmVsYXRlZElkZW50aWZpZXJzPgogIDxzaXplcy8+CiAgPGZvcm1hdHM+CiAgICA8Zm9ybWF0PkNJRjwvZm9ybWF0PgogIDwvZm9ybWF0cz4KICA8dmVyc2lvbi8+CiAgPGRlc2NyaXB0aW9ucz4KICAgIDxkZXNjcmlwdGlvbiBkZXNjcmlwdGlvblR5cGU9Ik90aGVyIj5SZWxhdGVkIEFydGljbGU6IElyZW5lIExpbmcsICBBbGV4YW5kcmUgTi4gU29ib2xldiwgIENvbGluIEwuIFJhc3RvbnwyMDIxfEouQ29vcmQuQ2hlbS58NzR8NDB8ZG9pOjEwLjEwODAvMDA5NTg5NzIuMjAyMC4xODQ5NjQyPC9kZXNjcmlwdGlvbj4KICA8L2Rlc2NyaXB0aW9ucz4KPC9yZXNvdXJjZT4K","url":"http://www.ccdc.cam.ac.uk/services/structure_request?id=doi:10.5517/ccdc.csd.cc25rpzm&sid=DataCite","contentUrl":null,"metadataVersion":3,"schemaVersion":"http://datacite.org/schema/kernel-4","source":"api","isActive":true,"state":"findable","reason":null,"viewCount":0,"viewsOverTime":[],"downloadCount":0,"downloadsOverTime":[],"referenceCount":0,"citationCount":0,"citationsOverTime":[],"partCount":0,"partOfCount":0,"versionCount":0,"versionOfCount":0,"created":"2021-03-09T13:25:35.000Z","registered":"2021-03-09T13:25:36.000Z","published":"2021","updated":"2021-03-31T21:49:56.000Z"},"rela
tionships":{"client":{"data":{"id":"ccdc.csd","type":"clients"}},"provider":{"data":{"id":"ccdc","type":"providers"}},"media":{"data":{"id":"10.5517/ccdc.csd.cc25rpzm","type":"media"}},"references":{"data":[]},"citations":{"data":[]},"parts":{"data":[]},"partOf":{"data":[]},"versions":{"data":[]},"versionOf":{"data":[]}}} \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java index 58009bfcfc..3f27b94422 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java @@ -38,7 +38,8 @@ import scala.Tuple2; /** * Groups the graph content by entity identifier to ensure ID uniqueness */ -public class GroupEntitiesSparkJob { +public class +GroupEntitiesSparkJob { private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java index 03709c8fec..fdef7f77d4 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; @@ -77,48 +78,54 @@ public class SparkUpdateEntity extends AbstractSparkAction { (type, clazz) -> { final String outputPath = dedupGraphPath + "/" + type; 
removeOutputDir(spark, outputPath); + final String ip = DedupUtility.createEntityPath(graphBasePath, type.toString()); + if (HdfsSupport.exists(ip, sc.hadoopConfiguration())) { + JavaRDD sourceEntity = sc + .textFile(DedupUtility.createEntityPath(graphBasePath, type.toString())); - JavaRDD sourceEntity = sc - .textFile(DedupUtility.createEntityPath(graphBasePath, type.toString())); + if (mergeRelExists(workingPath, type.toString())) { - if (mergeRelExists(workingPath, type.toString())) { + final String mergeRelPath = DedupUtility + .createMergeRelPath(workingPath, "*", type.toString()); + final String dedupRecordPath = DedupUtility + .createDedupRecordPath(workingPath, "*", type.toString()); - final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, "*", type.toString()); - final String dedupRecordPath = DedupUtility - .createDedupRecordPath(workingPath, "*", type.toString()); + final Dataset rel = spark + .read() + .load(mergeRelPath) + .as(Encoders.bean(Relation.class)); - final Dataset rel = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class)); + final JavaPairRDD mergedIds = rel + .where("relClass == 'merges'") + .where("source != target") + .select(rel.col("target")) + .distinct() + .toJavaRDD() + .mapToPair( + (PairFunction) r -> new Tuple2<>(r.getString(0), "d")); - final JavaPairRDD mergedIds = rel - .where("relClass == 'merges'") - .where("source != target") - .select(rel.col("target")) - .distinct() - .toJavaRDD() - .mapToPair( - (PairFunction) r -> new Tuple2<>(r.getString(0), "d")); + JavaPairRDD entitiesWithId = sourceEntity + .mapToPair( + (PairFunction) s -> new Tuple2<>( + MapDocumentUtil.getJPathString(IDJSONPATH, s), s)); + if (type == EntityType.organization) // exclude root records from organizations + entitiesWithId = excludeRootOrgs(entitiesWithId, rel); - JavaPairRDD entitiesWithId = sourceEntity - .mapToPair( - (PairFunction) s -> new Tuple2<>( - MapDocumentUtil.getJPathString(IDJSONPATH, s), s)); - if (type == 
EntityType.organization) // exclude root records from organizations - entitiesWithId = excludeRootOrgs(entitiesWithId, rel); + JavaRDD map = entitiesWithId + .leftOuterJoin(mergedIds) + .map(k -> { + if (k._2()._2().isPresent()) { + return updateDeletedByInference(k._2()._1(), clazz); + } + return k._2()._1(); + }); - JavaRDD map = entitiesWithId - .leftOuterJoin(mergedIds) - .map(k -> { - if (k._2()._2().isPresent()) { - return updateDeletedByInference(k._2()._1(), clazz); - } - return k._2()._1(); - }); + sourceEntity = map.union(sc.textFile(dedupRecordPath)); - sourceEntity = map.union(sc.textFile(dedupRecordPath)); + } + sourceEntity.saveAsTextFile(outputPath, GzipCodec.class); } - - sourceEntity.saveAsTextFile(outputPath, GzipCodec.class); }); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala new file mode 100644 index 0000000000..3ee0c7dd6a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala @@ -0,0 +1,42 @@ +package eu.dnetlib.dhp.sx.graph + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset} +import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +object SparkConvertDatasetToJsonRDD { + + + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json"))) 
+ parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sourcePath = parser.get("sourcePath") + log.info(s"sourcePath -> $sourcePath") + val targetPath = parser.get("targetPath") + log.info(s"targetPath -> $targetPath") + + val resultObject = List("publication","dataset","software", "otherResearchProduct") + val mapper = new ObjectMapper() + implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + + + resultObject.foreach{item => + spark.read.load(s"$sourcePath/$item").as[Result].map(r=> mapper.writeValueAsString(r))(Encoders.STRING).rdd.saveAsTextFile(s"$targetPath/${item.toLowerCase}", classOf[GzipCodec]) + } + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala new file mode 100644 index 0000000000..cb41d6134c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -0,0 +1,67 @@ +package eu.dnetlib.dhp.sx.graph + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset} +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} +object SparkConvertRDDtoDataset { + + def main(args: Array[String]): Unit = { + + + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json"))) + parser.parseArgument(args) 
+ val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sourcePath = parser.get("sourcePath") + log.info(s"sourcePath -> $sourcePath") + val t = parser.get("targetPath") + log.info(s"targetPath -> $t") + + val entityPath = s"$t/entities" + val relPath = s"$t/relation" + val mapper = new ObjectMapper() + implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) + implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) + implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct]) + implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software]) + + + log.info("Converting dataset") + val rddDataset =spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset])) + spark.createDataset(rddDataset).as[OafDataset].write.mode(SaveMode.Overwrite).save(s"$entityPath/dataset") + + + log.info("Converting publication") + val rddPublication =spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication])) + spark.createDataset(rddPublication).as[Publication].write.mode(SaveMode.Overwrite).save(s"$entityPath/publication") + + log.info("Converting software") + val rddSoftware =spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software])) + spark.createDataset(rddSoftware).as[Software].write.mode(SaveMode.Overwrite).save(s"$entityPath/software") + + log.info("Converting otherresearchproduct") + val rddOtherResearchProduct =spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct])) + 
spark.createDataset(rddOtherResearchProduct).as[OtherResearchProduct].write.mode(SaveMode.Overwrite).save(s"$entityPath/otherresearchproduct") + + + log.info("Converting Relation") + + + val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation])) + spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") + + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala index a37dd21328..350b00c5ea 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala @@ -70,7 +70,7 @@ object SparkCreateInputGraph { resultObject.foreach { r => log.info(s"Make ${r._1} unique") - makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/dedup/${r._1}",spark, r._2) + makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/preprocess/${r._1}",spark, r._2) } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala index ba483bfb26..0a7fc18fb6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala @@ -42,6 +42,7 @@ object SparkCreateScholix { val relationDS: Dataset[(String, Relation)] = spark.read.load(relationPath).as[Relation] + .filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) .map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder)) val summaryDS: Dataset[(String, 
ScholixSummary)] = spark.read.load(summaryPath).as[ScholixSummary] diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala index a66da3e6d8..0970375f5c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.sx.graph import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.Result +import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils import org.apache.commons.io.IOUtils @@ -29,11 +29,12 @@ object SparkCreateSummaryObject { log.info(s"targetPath -> $targetPath") implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] - val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result] + val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result].filter(r=>r.getDataInfo== null || r.getDataInfo.getDeletedbyinference== false) ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index 6ee575e2a8..b2fddec203 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -1,10 +1,17 @@ 
package eu.dnetlib.dhp.sx.graph +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Relation, Result} import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST.{JField, JObject, JString} +import org.json4s.jackson.JsonMethods.parse import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ @@ -25,60 +32,109 @@ object SparkResolveRelation { val relationPath = parser.get("relationPath") log.info(s"sourcePath -> $relationPath") val entityPath = parser.get("entityPath") - log.info(s"targetPath -> $entityPath") + log.info(s"entityPath -> $entityPath") val workingPath = parser.get("workingPath") log.info(s"workingPath -> $workingPath") - - implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) import spark.implicits._ - val entities:Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result] - entities.flatMap(e => e.getPid.asScala - .map(p => - convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid)) - .filter(s => s!= null) - .map(s => (s,e.getId)) - ).groupByKey(_._1) - .reduceGroups((x,y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) - .map(s =>s._2) - .write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedPid") - val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)] + extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath) + + val mappper = new ObjectMapper() + + val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)] val relationDs:Dataset[(String,Relation)] = 
spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_1")), "left").map{ + relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{ m => val sourceResolved = m._2 val currentRelation = m._1._2 - if (sourceResolved!=null && sourceResolved._2.nonEmpty) - currentRelation.setSource(sourceResolved._2) + if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty) + currentRelation.setSource(sourceResolved._1) currentRelation }.write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedSource") + .save(s"$workingPath/relationResolvedSource") - val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_1")), "left").map{ + val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) + relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{ m => val targetResolved = m._2 val currentRelation = m._1._2 - if (targetResolved!=null && targetResolved._2.nonEmpty) - currentRelation.setTarget(targetResolved._2) + if (targetResolved!=null && targetResolved._1.nonEmpty) + currentRelation.setTarget(targetResolved._1) currentRelation }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50")) .write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedRelation") + .save(s"$workingPath/relation_resolved") + + spark.read.load(s"$workingPath/relation_resolved").as[Relation] + .map(r => mappper.writeValueAsString(r)) + .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec]) 
+ } + private def extractPidsFromRecord(input:String):(String,List[(String,String)]) = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + val id:String = (json \ "id").extract[String] + val result: List[(String,String)] = for { + JObject(pids) <- json \ "pid" + JField("value", JString(pidValue)) <- pids + JField("qualifier", JObject(qualifier)) <- pids + JField("classname", JString(pidType)) <- qualifier + } yield (pidValue, pidType) + (id,result) + } + private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = { + import spark.implicits._ + + val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*") + .map(i => extractPidsFromRecord(i)) + .filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty) + .flatMap{ p => + p._2.map(pid => + (p._1, convertPidToDNETIdentifier(pid._1, pid._2)) + ) + }.filter(r =>r._1 != null || r._2 != null) + + spark.createDataset(d) + .groupByKey(_._2) + .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y) + .map(s => s._2) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/relationResolvedPid") + } + + + /* + This method should be used once we finally convert everythings in Kryo dataset + instead of using rdd of json + */ + private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = { + import spark.implicits._ + implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result] + entities.flatMap(e => e.getPid.asScala + .map(p => + convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid)) + .filter(s => s != null) + .map(s => (s, e.getId)) + ).groupByKey(_._1) + .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) + .map(s => s._2) + .write + 
.mode(SaveMode.Overwrite) + .save(s"$workingPath/relationResolvedPid") + } def convertPidToDNETIdentifier(pid:String, pidType: String):String = { if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json new file mode 100644 index 0000000000..8bfdde5b0d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml new file mode 100644 index 0000000000..685976ce69 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml @@ -0,0 +1,85 @@ + + + + sourcePath + 
the working dir base path + + + targetPath + the graph Raw base path + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Extract entities in raw graph + eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=2000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${sourcePath} + --targetPath${targetPath} + + + + + + + + + + + + + + + + + yarn + cluster + Generate Input Graph for deduplication + eu.dnetlib.dhp.sx.graph.SparkConvertDatasetToJsonRDD + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=3000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${targetPath}/preprocess + --targetPath${targetPath}/dedup + + + + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/config-default.xml rename to 
dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml similarity index 78% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml index 3ea4e9d30d..d8eb1fc80b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + sourcePath @@ -6,48 +6,22 @@ targetPath - the graph Raw base path + the final graph path - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - Extract entities in raw graph - eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=2000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${sourcePath} - --targetPath${targetPath} - - - - - - - - - yarn - cluster - Resolve Relations in raw graph - eu.dnetlib.dhp.sx.graph.SparkResolveRelation + Import JSONRDD to Dataset kryo + eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset dhp-graph-mapper-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -60,9 +34,8 @@ --conf 
spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn - --relationPath${targetPath}/extracted/relation - --workingPath${targetPath}/resolved/ - --entityPath${targetPath}/dedup + --sourcePath${sourcePath} + --targetPath${targetPath} @@ -87,7 +60,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn - --sourcePath${targetPath}/dedup + --sourcePath${targetPath}/entities --targetPath${targetPath}/provision/summaries @@ -114,7 +87,7 @@ --masteryarn --summaryPath${targetPath}/provision/summaries --targetPath${targetPath}/provision/scholix - --relationPath${targetPath}/resolved/resolvedRelation + --relationPath${targetPath}/relation @@ -182,9 +155,5 @@ - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml new file mode 100644 index 0000000000..7683ff94cd --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml @@ -0,0 +1,62 @@ + + + + entityPath + the path of the deduplicated entities + + + relationPath + the path of the unresolved relations + + + targetPath + the working path where the resolved relations are written + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + + + yarn + cluster + Resolve Relations in raw 
graph + eu.dnetlib.dhp.sx.graph.SparkResolveRelation + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=3000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --relationPath${relationPath} + --workingPath${targetPath} + --entityPath${entityPath} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml deleted file mode 100644 index 9d06c42d62..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml +++ /dev/null @@ -1,120 +0,0 @@ - - - - workingPath - the working path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Publication) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -epublication - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Dataset) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory 
${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -edataset - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Unknown) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -eunknown - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Relation) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -erelation - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml deleted file mode 100644 index 4d54b2afbc..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml +++ /dev/null @@ -1,61 +0,0 @@ - - - - sourcePath - the source path - - - targetPath - the source path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - entity - the entity to be merged - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Merge ${entity} - 
eu.dnetlib.dhp.sx.graph.SparkScholexplorerCreateRawGraphJob - dhp-graph-mapper-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} - -mt yarn-cluster - --sourcePath${sourcePath}/${entity} - --targetPath${targetPath}/${entity} - --entity${entity} - - - - - - - \ No newline at end of file