From cc5bbafa5dcc1fee8626b3381892cf3f1e1f2aa3 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Wed, 17 Mar 2021 12:12:56 +0100
Subject: [PATCH] some fixes to make workflows run

---
 .../dhp/schema/common/ModelConstants.java    |   2 +-
 .../doiboost/crossref/Crossref2Oaf.scala     |   2 +-
 .../dnetlib/doiboost/mag/MagDataModel.scala  |   2 +-
 .../doiboost/mag/SparkProcessMAG.scala       |   2 +-
 .../dnetlib/doiboost/uw/UnpayWallToOAF.scala |  16 ++-
 .../dhp/doiboost/oozie_app/workflow.xml      | 123 +++++++++++-------
 .../doiboost/uw/UnpayWallMappingTest.scala   |   2 +-
 7 files changed, 92 insertions(+), 57 deletions(-)

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index 1a0117fb9..6c4de6342 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -18,7 +18,7 @@ public class ModelConstants {
 	public static final String PUBMED_CENTRAL_ID = "10|opendoar____::eda80a3d5b344bc40f3bc04f65b7a357";
 	public static final String ARXIV_ID = "10|opendoar____::6f4922f45568161a8cdf4ad2299f6d23";
 
-	//VOCABULARY VALUE
+	// VOCABULARY VALUE
 	public static final String ACCESS_RIGHT_OPEN = "OPEN";
 
 	public static final String DNET_SUBJECT_TYPOLOGIES = "dnet:subject_classification_typologies";
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
index 79194b283..2e30add3a 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
@@ -180,7 +180,7 @@ case object Crossref2Oaf {
 
 
     // Ticket #6281 added pid to Instance
-    instance.setPid(result.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava)
+    instance.setPid(result.getPid)
 
     val has_review = (json \ "relation" \"has-review" \ "id")
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
index 987a81fba..c4b28505f 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
@@ -172,7 +172,7 @@ case object ConversionUtil {
 
     i.setUrl(List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}").asJava)
 
     // Ticket #6281 added pid to Instance
-    i.setPid(pub.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava)
+    i.setPid(pub.getPid)
 
     i.setCollectedfrom(createMAGCollectedFrom())
     pub.setInstance(List(i).asJava)
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
index bc1982e77..173e33360 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
@@ -67,7 +67,7 @@ object SparkProcessMAG {
           MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName, mpa.sequenceNumber)
         } else
          mpa
-      }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors"))
+      }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation", $"sequenceNumber")).as("authors"))
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_1_paper_authors")
 
     logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors")
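Note on the SparkProcessMAG hunk above: adding $"sequenceNumber" to the collected struct keeps each author's position available after the groupBy, so later phases can restore the original author order. A minimal, self-contained sketch of the idea (PaperAuthor and the session setup are illustrative stand-ins, not the project's actual classes):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{collect_list, struct}

    object AuthorOrderSketch {
      // Illustrative stand-in for MagPaperAuthorDenormalized
      case class PaperAuthor(PaperId: Long, author: String, affiliation: String, sequenceNumber: Int)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("author-order").getOrCreate()
        import spark.implicits._

        val rows = Seq(
          PaperAuthor(1L, "B. Second", "Uni B", 2),
          PaperAuthor(1L, "A. First", "Uni A", 1)
        ).toDS()

        // Without sequenceNumber inside the struct, collect_list gives no
        // reliable way to sort the authors back into their original order.
        rows.groupBy("PaperId")
          .agg(collect_list(struct($"author", $"affiliation", $"sequenceNumber")).as("authors"))
          .show(truncate = false)

        spark.stop()
      }
    }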
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala
index 0972d55de..6e4b2400f 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala
@@ -65,11 +65,7 @@ object UnpayWallToOAF {
     val colour = get_color(is_oa, oaLocation, journal_is_oa)
 
     pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava)
-    //IMPORTANT
-    //The old method pub.setId(IdentifierFactory.createIdentifier(pub))
-    //will be replaced using IdentifierFactory
-    //pub.setId(generateIdentifier(pub, doi.toLowerCase))
-    pub.setId(IdentifierFactory.createIdentifier(pub))
+
 
     pub.setCollectedfrom(List(createUnpayWallCollectedFrom()).asJava)
 
@@ -87,7 +83,7 @@ object UnpayWallToOAF {
     i.setUrl(List(oaLocation.url.get).asJava)
 
     // Ticket #6281 added pid to Instance
-    i.setPid(pub.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava)
+    i.setPid(pub.getPid)
 
     if (oaLocation.license.isDefined)
       i.setLicense(asField(oaLocation.license.get))
@@ -104,6 +100,14 @@ object UnpayWallToOAF {
       i.setAccessright(a)
     }
     pub.setInstance(List(i).asJava)
+
+    // IMPORTANT:
+    // the old call pub.setId(generateIdentifier(pub, doi.toLowerCase))
+    // is replaced by IdentifierFactory, which derives the identifier
+    // from the pids attached to the record
+    val id = IdentifierFactory.createIdentifier(pub)
+    logger.info(id)
+    pub.setId(id)
 
     pub
   }
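Note on the UnpayWallToOAF hunk above: the id is now assigned as the last step, after pids and instance are attached, because IdentifierFactory computes it from the pids already present on the record. A toy sketch of that ordering dependency (Pid, ToyRecord and the id scheme are invented for illustration; this is not the real eu.dnetlib IdentifierFactory):

    object IdentifierSketch {
      final case class Pid(scheme: String, value: String)
      final case class ToyRecord(var pids: List[Pid] = Nil, var id: String = "")

      // Toy stand-in: the id is a function of the pids, so it can only be
      // computed once every pid has been attached to the record.
      def createIdentifier(r: ToyRecord): String =
        r.pids.find(_.scheme == "doi")
          .map(p => s"50|doi::${p.value.toLowerCase}")
          .getOrElse("50|unknown::" + r.hashCode)

      def main(args: Array[String]): Unit = {
        val rec = ToyRecord()
        rec.pids = List(Pid("doi", "10.1000/XYZ123")) // pids first
        rec.id = createIdentifier(rec)                // id derived last
        println(rec.id)                               // 50|doi::10.1000/xyz123
      }
    }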
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
index 3f5805b62..cc260d7c0 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
@@ -54,6 +54,11 @@
         </property>
 
+        <property>
+            <name>MAGDumpPath</name>
+            <description>the MAG dump working path</description>
+        </property>
+
         <property>
             <name>inputPathMAG</name>
             <description>the MAG working path</description>
@@ -132,7 +137,10 @@
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
                 --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--workingPath</arg><arg>${inputPathCrossref}</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
@@ -147,6 +155,43 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Convert Mag to Dataset</name>
+            <class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${MAGDumpPath}</arg>
+            <arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
+            <arg>--master</arg><arg>yarn-cluster</arg>
@@ -164,46 +209,15 @@
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
                 --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--sourcePath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
             <arg>--targetPath</arg><arg>${workingPath}</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Convert Mag to Dataset</name>
-            <class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                ${sparkExtraOPT}
-            </spark-opts>
-            <arg>--sourcePath</arg><arg>${inputPathMAG}/input</arg>
-            <arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
@@ -216,11 +230,14 @@
             <class>eu.dnetlib.doiboost.mag.SparkProcessMAG</class>
             <jar>dhp-doiboost-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
+                --executor-memory=${sparkExecutorIntersectionMemory}
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
                 --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg>
             <arg>--workingPath</arg><arg>${inputPathMAG}/process</arg>
@@ -245,10 +262,13 @@
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
                 --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--sourcePath</arg><arg>${inputPathUnpayWall}/uw_extracted</arg>
-            <arg>--targetPath</arg><arg>${workingPath}</arg>
+            <arg>--targetPath</arg><arg>${workingPath}/uwPublication</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
@@ -268,10 +289,13 @@
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
                 --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
-            <arg>--targetPath</arg><arg>${workingPath}</arg>
+            <arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
@@ -291,11 +315,14 @@
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
                 --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
-            <arg>--affiliationPath</arg><arg>${inputPathMAG}/process/Affiliations</arg>
-            <arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/process/PaperAuthorAffiliations</arg>
+            <arg>--affiliationPath</arg><arg>${inputPathMAG}/dataset/Affiliations</arg>
+            <arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/dataset/PaperAuthorAffiliations</arg>
             <arg>--workingPath</arg><arg>${workingPath}</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
@@ -316,7 +344,10 @@
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
                 --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--dbPublicationPath</arg><arg>${workingPath}/doiBoostPublicationFiltered</arg>
             <arg>--dbDatasetPath</arg><arg>${workingPath}/crossrefDataset</arg>
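Note on the workflow.xml hunks above: every Spark action replaces the opaque ${sparkExtraOPT} placeholder with the four explicit Spark 2 listener and history-server settings. A rough programmatic equivalent (the literal values are placeholders for the ${spark2...} workflow parameters, not real endpoints):

    import org.apache.spark.sql.SparkSession

    object SparkConfSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("doiboost-conf-sketch")
          .master("local[*]")
          .config("spark.sql.shuffle.partitions", "3840")
          // Inert while event logging is disabled; the workflow passes
          // ${nameNode}${spark2EventLogDir} here.
          .config("spark.eventLog.dir", "file:///tmp/spark-events")
          // ${spark2YarnHistoryServerAddress} in the workflow.
          .config("spark.yarn.historyServer.address", "http://historyserver:18081")
          // spark.extraListeners and spark.sql.queryExecutionListeners are
          // omitted here: they require real listener classes on the classpath.
          .getOrCreate()

        println(spark.conf.get("spark.sql.shuffle.partitions")) // prints 3840
        spark.stop()
      }
    }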
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala
index 3e4acdffb..94682d142 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala
@@ -28,7 +28,7 @@ class UnpayWallMappingTest {
 
       if(p!= null) {
         assertTrue(p.getPid.size()==1)
-        logger.info(p.getId)
+        logger.info("ID : {}", p.getId)
       }
       assertNotNull(line)
       assertTrue(line.nonEmpty)
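Note on the test hunk above: SLF4J only interpolates an argument where the format string contains a {} placeholder; a bare comma-separated argument compiles but is silently dropped from the message. A minimal sketch of the difference (LoggingSketch and the id value are illustrative):

    import org.slf4j.LoggerFactory

    object LoggingSketch {
      private val logger = LoggerFactory.getLogger(getClass)

      def main(args: Array[String]): Unit = {
        val id = "50|doiboost____::abc123"
        logger.info("ID :", id)    // no placeholder: logs just "ID :", id is ignored
        logger.info("ID : {}", id) // placeholder: logs "ID : 50|doiboost____::abc123"
      }
    }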