diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
index 398b842c8..31f60636a 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
@@ -10,12 +10,12 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import eu.dnetlib.dhp.schema.common.ModelSupport;
 import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Maps;
 
+import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.utils.DHPUtils;
 
@@ -60,6 +60,51 @@ public class IdentifierFactory implements Serializable {
 		return pidFromInstance(pid, collectedFrom).distinct().collect(Collectors.toList());
 	}
 
+	/**
+	 * Creates the DOIBoost identifier of the given entity from one of its DOI PIDs.
+	 * <p>
+	 * The DOI is looked up among the PIDs declared on the entity first; when none of them is usable (the list is
+	 * missing, empty, or holds no PID of type doi accepted by {@code pidFilter}), the PIDs declared on the entity
+	 * instances are used as a fallback. Only PIDs whose qualifier classid is "doi" (case insensitive) are considered.
+	 *
+	 * @param entity the result to build the identifier for, may be null
+	 * @param <T> the specific Result type
+	 * @return the identifier built by {@code idFromPid(entity, doi, true)}, or null when the entity is null or no
+	 *         usable DOI is found
+	 */
+	public static <T extends Result> String createDOIBoostIdentifier(T entity) {
+		if (entity == null)
+			return null;
+
+		StructuredProperty doi = null;
+		if (entity.getPid() != null) {
+			doi = firstValidDoi(entity.getPid().stream());
+		}
+		// fall back on the instance PIDs also when the entity-level list is present but holds no usable DOI
+		if (doi == null && entity.getInstance() != null) {
+			doi = firstValidDoi(
+				entity
+					.getInstance()
+					.stream()
+					.filter(Objects::nonNull)
+					.filter(i -> i.getPid() != null)
+					.flatMap(i -> i.getPid().stream()));
+		}
+		return doi != null ? idFromPid(entity, doi, true) : null;
+	}
+
+	/**
+	 * Picks the first non-null PID of type doi accepted by {@code pidFilter}, or null when there is none.
+	 */
+	private static StructuredProperty firstValidDoi(Stream<StructuredProperty> pids) {
+		return pids
+			.filter(Objects::nonNull)
+			.filter(p -> p.getQualifier() != null && "doi".equalsIgnoreCase(p.getQualifier().getClassid()))
+			.filter(IdentifierFactory::pidFilter)
+			.findFirst()
+			.orElse(null);
+	}
+
 	/**
 	 * Creates an identifier from the most relevant PID (if available) provided by a known PID authority in the given
 	 * entity T. Returns entity.id when none of the PIDs meet the selection criteria is available.
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala index efe8453a2..32c58595f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala @@ -196,6 +196,8 @@ object DoiBoostMappingUtil { //Case empty publication if (publication == null) return false + if (publication.getId == null || publication.getId.isEmpty) + return false //Case publication with no title if (publication.getTitle == null || publication.getTitle.size == 0) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index 79194b283..43b3f7e1c 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -180,7 +180,7 @@ case object Crossref2Oaf { // Ticket #6281 added pid to Instance - instance.setPid(result.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava) + instance.setPid(result.getPid) val has_review = (json \ "relation" \"has-review" \ "id") @@ -206,6 +206,9 @@ case object Crossref2Oaf { val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null).distinct if (links.nonEmpty) instance.setUrl(links.asJava) + result.setId(IdentifierFactory.createDOIBoostIdentifier(result)) + if (result.getId== null) + return null result } @@ -240,6 +243,8 @@ case object Crossref2Oaf { return List() val cOBJCategory = mappingCrossrefSubType.getOrElse(objectType, mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type")); mappingResult(result, json, cOBJCategory) + if 
(result == null) + return List() val funderList: List[mappingFunder] = (json \ "funder").extractOrElse[List[mappingFunder]](List()) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala index 987a81fba..6a8ae9928 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -172,7 +172,7 @@ case object ConversionUtil { i.setUrl(List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}").asJava) // Ticket #6281 added pid to Instance - i.setPid(pub.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava) + i.setPid(pub.getPid) i.setCollectedfrom(createMAGCollectedFrom()) pub.setInstance(List(i).asJava) @@ -197,8 +197,8 @@ case object ConversionUtil { //IMPORTANT //The old method result.setId(generateIdentifier(result, doi)) //will be replaced using IdentifierFactory - pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase)) - pub.setId(IdentifierFactory.createIdentifier(pub)) + + pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub)) val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title") val originalTitles = createSP(paper.OriginalTitle, "alternative title", "dnet:dataCite_title") diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala index bc1982e77..173e33360 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala @@ -67,7 +67,7 @@ object SparkProcessMAG { MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName, mpa.sequenceNumber) } else 
mpa - }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors")) + }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation", $"sequenceNumber")).as("authors")) .write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_1_paper_authors") logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors") diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala index ccf005ce1..02016b47c 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala @@ -1,6 +1,7 @@ package eu.dnetlib.doiboost.orcid import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication} import eu.dnetlib.dhp.schema.orcid.OrcidDOI import eu.dnetlib.doiboost.DoiBoostMappingUtil @@ -49,7 +50,11 @@ object ORCIDToOAF { val pub:Publication = new Publication pub.setPid(List(createSP(doi.toLowerCase, "doi", PID_TYPES)).asJava) pub.setDataInfo(generateDataInfo()) - pub.setId(generateIdentifier(pub, doi.toLowerCase)) + + pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub)) + if (pub.getId == null) + return null + try{ val l:List[Author]= input.getAuthors.asScala.map(a=> { diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala index 0972d55de..b9895dd09 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala @@ -55,7 +55,6 @@ object UnpayWallToOAF { val doi = (json \"doi").extract[String] - val is_oa = (json\ 
"is_oa").extract[Boolean] val journal_is_oa= (json\ "journal_is_oa").extract[Boolean] @@ -63,14 +62,6 @@ object UnpayWallToOAF { val oaLocation:OALocation = (json \ "best_oa_location").extractOrElse[OALocation](null) val colour = get_color(is_oa, oaLocation, journal_is_oa) - pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava) - - //IMPORTANT - //The old method pub.setId(IdentifierFactory.createIdentifier(pub)) - //will be replaced using IdentifierFactory - //pub.setId(generateIdentifier(pub, doi.toLowerCase)) - pub.setId(IdentifierFactory.createIdentifier(pub)) - pub.setCollectedfrom(List(createUnpayWallCollectedFrom()).asJava) pub.setDataInfo(generateDataInfo()) @@ -86,12 +77,9 @@ object UnpayWallToOAF { // i.setAccessright(getOpenAccessQualifier()) i.setUrl(List(oaLocation.url.get).asJava) - // Ticket #6281 added pid to Instance - i.setPid(pub.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava) - if (oaLocation.license.isDefined) i.setLicense(asField(oaLocation.license.get)) - + pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava) // Ticket #6282 Adding open Access Colour if (colour.isDefined) { @@ -102,8 +90,15 @@ object UnpayWallToOAF { a.setSchemename(ModelConstants.DNET_ACCESS_MODES) a.setOpenAccessRoute(colour.get) i.setAccessright(a) + i.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava) } pub.setInstance(List(i).asJava) + + //IMPORTANT + //The old method pub.setId(IdentifierFactory.createIdentifier(pub)) + //will be replaced using IdentifierFactory + //pub.setId(generateIdentifier(pub, doi.toLowerCase)) + pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub)) pub } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 3f5805b62..cc260d7c0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ 
b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -54,6 +54,11 @@ + + MAGDumpPath + the MAG dump working path + + inputPathMAG the MAG working path @@ -132,7 +137,10 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --workingPath${inputPathCrossref} --masteryarn-cluster @@ -147,6 +155,43 @@ + + + + + + + + + + + + + + + + + + + yarn-cluster + cluster + Convert Mag to Dataset + eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${MAGDumpPath} + --targetPath${inputPathMAG}/dataset + --masteryarn-cluster + @@ -164,46 +209,15 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${inputPathCrossref}/crossref_ds --targetPath${workingPath} --masteryarn-cluster - - - - - - - - - - - - - - - - - - - - yarn-cluster - cluster - Convert 
Mag to Dataset - eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - ${sparkExtraOPT} - - --sourcePath${inputPathMAG}/input - --targetPath${inputPathMAG}/dataset - --masteryarn-cluster - @@ -216,11 +230,14 @@ eu.dnetlib.doiboost.mag.SparkProcessMAG dhp-doiboost-${projectVersion}.jar - --executor-memory=${sparkExecutorMemory} + --executor-memory=${sparkExecutorIntersectionMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${inputPathMAG}/dataset --workingPath${inputPathMAG}/process @@ -245,10 +262,14 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${inputPathUnpayWall}/uw_extracted - --targetPath${workingPath} + --targetPath${workingPath}/uwPublication --masteryarn-cluster @@ -268,10 +289,13 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} 
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${inputPathOrcid} - --targetPath${workingPath} + --targetPath${workingPath}/orcidPublication --masteryarn-cluster @@ -291,11 +315,15 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --hostedByMapPath${hostedByMapPath} - --affiliationPath${inputPathMAG}/process/Affiliations - --paperAffiliationPath${inputPathMAG}/process/PaperAuthorAffiliations + --affiliationPath${inputPathMAG}/dataset/Affiliations + --paperAffiliationPath${inputPathMAG}/dataset/PaperAuthorAffiliations --workingPath${workingPath} --masteryarn-cluster @@ -316,7 +344,10 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --dbPublicationPath${workingPath}/doiBoostPublicationFiltered --dbDatasetPath${workingPath}/crossrefDataset diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala index 3e4acdffb..6688fc616 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala @@ -22,13 +22,11 @@ class 
UnpayWallMappingTest { for (line <-Ilist.lines) { - - val p = UnpayWallToOAF.convertToOAF(line) if(p!= null) { - assertTrue(p.getPid.size()==1) - logger.info(p.getId) + assertTrue(p.getInstance().size()==1) + logger.info(s"ID : ${p.getId}") } assertNotNull(line) assertTrue(line.nonEmpty)