diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/enrichment/Constants.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/enrichment/Constants.java
new file mode 100644
index 000000000..df433dddb
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/enrichment/Constants.java
@@ -0,0 +1,5 @@
+package eu.dnetlib.dhp.common.enrichment;
+
+public class Constants {
+	public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
+}
diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/common/author/SparkEnrichWithOrcidAuthors.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/common/author/SparkEnrichWithOrcidAuthors.scala
index 7e62e2012..47d563649 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/common/author/SparkEnrichWithOrcidAuthors.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/common/author/SparkEnrichWithOrcidAuthors.scala
@@ -1,11 +1,12 @@
 package eu.dnetlib.dhp.common.author
 
 import eu.dnetlib.dhp.application.AbstractScalaApplication
-import eu.dnetlib.dhp.schema.common.ModelSupport
+import eu.dnetlib.dhp.schema.common.{ModelConstants, ModelSupport}
 import eu.dnetlib.dhp.utils.{MatchData, ORCIDAuthorEnricher, ORCIDAuthorEnricherResult}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.slf4j.{Logger, LoggerFactory}
+import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE
 
 import scala.collection.JavaConverters._
 
@@ -24,13 +25,18 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
     log.info(s"targetPath is '$targetPath'")
     val workingDir = parser.get("workingDir")
     log.info(s"targetPath is '$workingDir'")
+    val classid = Option(parser.get("matchingSource")).map(_ => ModelConstants.ORCID_PENDING).getOrElse(ModelConstants.ORCID)
+    log.info(s"classid is '$classid'")
+    val provenance = Option(parser.get("matchingSource")).map(_ => PROPAGATION_DATA_INFO_TYPE).getOrElse("ORCID_ENRICHMENT")
+    log.info(s"provenance is '$provenance'")
 
-    createTemporaryData(graphPath, orcidPath, workingDir)
-    analisys(workingDir)
-    generateGraph(graphPath, workingDir, targetPath)
+    createTemporaryData(spark, graphPath, orcidPath, workingDir)
+    analisys(workingDir, classid, provenance)
+    generateGraph(spark, graphPath, workingDir, targetPath)
   }
 
-  private def generateGraph(graphPath: String, workingDir: String, targetPath: String): Unit = {
+  private def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = {
 
     ModelSupport.entityTypes.asScala
       .filter(e => ModelSupport.isResult(e._1))
@@ -64,7 +70,7 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
 
   def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit
 
-  private def analisys(targetPath: String): Unit = {
+  private def analisys(targetPath: String, classid: String, provenance: String): Unit = {
     ModelSupport.entityTypes.asScala
       .filter(e => ModelSupport.isResult(e._1))
       .foreach(e => {
@@ -75,7 +81,7 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
           .where("size(graph_authors) > 0")
           .as[MatchData](Encoders.bean(classOf[MatchData]))
           .map(md => {
-            ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors)
+            ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors, classid, provenance)
          })(Encoders.bean(classOf[ORCIDAuthorEnricherResult]))
           .write
           .option("compression", "gzip")
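
Note for reviewers: the two independent `Option(parser.get("matchingSource"))` lookups above decide the same thing twice. A minimal Scala sketch of an equivalent single selection, assuming `parser` and the patch's constants are in scope (the tuple form is a suggestion, not part of the patch):

```scala
// "matchingSource" present => propagation run: pids are recorded as
// ORCID_PENDING with provenance "propagation" (PROPAGATION_DATA_INFO_TYPE);
// absent => plain ORCID enrichment. One tuple keeps classid and provenance
// from drifting apart.
val (classid, provenance) =
  if (Option(parser.get("matchingSource")).isDefined)
    (ModelConstants.ORCID_PENDING, PROPAGATION_DATA_INFO_TYPE)
  else
    (ModelConstants.ORCID, "ORCID_ENRICHMENT")
```
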
diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/utils/ORCIDAuthorEnricher.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/utils/ORCIDAuthorEnricher.scala
index 6a087b1de..ff9715c07 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/utils/ORCIDAuthorEnricher.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/utils/ORCIDAuthorEnricher.scala
@@ -46,7 +46,9 @@ object ORCIDAuthorEnricher extends Serializable {
   def enrichOrcid(
     id: String,
     graph_authors: java.util.List[Author],
-    orcid_authors: java.util.List[OrcidAuthor]
+    orcid_authors: java.util.List[OrcidAuthor],
+    classid: String,
+    provenance: String
   ): ORCIDAuthorEnricherResult = {
     // Author enriching strategy:
     // 1) create a copy of graph author list in unmatched_authors
@@ -64,7 +66,9 @@
         orcid_authors,
         (author, orcid) =>
           AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
-        "fullName"
+        "fullName",
+        classid,
+        provenance
       ) ++
         // Look after exact reversed fullname match, reconstruct ORCID fullname as familyName + givenName
         extractAndEnrichMatches(
@@ -72,7 +76,9 @@
           orcid_authors,
           (author, orcid) =>
             AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
-          "reversedFullName"
+          "reversedFullName",
+          classid,
+          provenance
         ) ++
         // split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
         extractAndEnrichMatches(
@@ -81,14 +87,18 @@
           (author, orcid) =>
             AuthorMatchers
               .matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
-          "orderedTokens"
+          "orderedTokens",
+          classid,
+          provenance
         ) ++
         // look after exact matches of ORCID creditName
         extractAndEnrichMatches(
           unmatched_authors,
           orcid_authors,
           (author, orcid) => AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
-          "creditName"
+          "creditName",
+          classid,
+          provenance
         ) ++
         // look after exact matches in ORCID otherNames
         extractAndEnrichMatches(
@@ -96,7 +106,9 @@
           orcid_authors,
           (author, orcid) =>
             orcid.otherNames != null && AuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
-          "otherNames"
+          "otherNames",
+          classid,
+          provenance
         )
   }
@@ -107,7 +119,9 @@
     graph_authors: java.util.List[Author],
     orcid_authors: java.util.List[OrcidAuthor],
     matchingFunc: (Author, OrcidAuthor) => Boolean,
-    matchName: String
+    matchName: String,
+    classid: String,
+    provenance: String
   ) = {
     val matched = scala.collection.mutable.ArrayBuffer.empty[MatchedAuthors]
 
@@ -131,10 +145,12 @@
           author.setPid(new util.ArrayList[StructuredProperty]())
         }
 
-        val orcidPID = OafUtils.createSP(orcid.orcid, ModelConstants.ORCID, ModelConstants.ORCID)
+        val orcidPID = OafUtils.createSP(orcid.orcid, classid, classid)
+        //val orcidPID = OafUtils.createSP(orcid.orcid, ModelConstants.ORCID, ModelConstants.ORCID)
         orcidPID.setDataInfo(OafUtils.generateDataInfo())
         orcidPID.getDataInfo.setProvenanceaction(
-          OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT")
+          //OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT")
+          OafUtils.createQualifier(provenance, provenance)
         )
 
         author.getPid.add(orcidPID)
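
Threading `classid` and `provenance` through every `extractAndEnrichMatches` call lets one matcher pipeline serve both workflows. A hedged usage sketch of the new signature (`id`, `graphAuthors`, `orcidAuthors` are placeholder values, not from the patch):

```scala
// Plain ORCID enrichment: same qualifiers as before this patch.
ORCIDAuthorEnricher.enrichOrcid(id, graphAuthors, orcidAuthors,
  ModelConstants.ORCID, "ORCID_ENRICHMENT")

// Propagation via semantic relations: matches are recorded as ORCID_PENDING
// with provenance "propagation" (PROPAGATION_DATA_INFO_TYPE).
ORCIDAuthorEnricher.enrichOrcid(id, graphAuthors, orcidAuthors,
  ModelConstants.ORCID_PENDING, PROPAGATION_DATA_INFO_TYPE)
```
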
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
index 7c4186663..8dd6e38ba 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
@@ -21,6 +21,8 @@
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 
+import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
+
 public class PropagationConstant {
 
 	private PropagationConstant() {
@@ -46,7 +48,7 @@ public class PropagationConstant {
 
 	public static final String INSTITUTIONAL_REPO_TYPE = "institutional";
 
-	public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
+	//public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
 
 	public static final String TRUE = "true";
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkPropagateOrcidAuthor.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkPropagateOrcidAuthor.java
index 7e17b6a7b..9f7b2f29e 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkPropagateOrcidAuthor.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkPropagateOrcidAuthor.java
@@ -8,8 +8,6 @@
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.utils.OrcidAuthor;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
@@ -19,6 +17,8 @@
 import scala.Tuple2;
 
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -39,20 +39,25 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
 
	private static OrcidAuthors getOrcidAuthorsList(List<Author> authors) {
		OrcidAuthors oas = new OrcidAuthors();
-		List<OrcidAuthor> tmp = authors.stream().map(SparkPropagateOrcidAuthor::getOrcidAuthor).collect(Collectors.toList());
+		List<OrcidAuthor> tmp = authors.stream().map(SparkPropagateOrcidAuthor::getOrcidAuthor)
+			.filter(Objects::nonNull).collect(Collectors.toList());
		oas.setOrcidAuthorList(tmp);
		return oas;
	}
 
	private static OrcidAuthor getOrcidAuthor(Author a) {
-		return new OrcidAuthor(getOrcid(a), a.getSurname(), a.getName(), a.getFullname(), null);
+		return Optional.ofNullable(getOrcid(a))
+			.map(orcid -> new OrcidAuthor(orcid, a.getSurname(), a.getName(), a.getFullname(), null))
+			.orElse(null);
	}
 
	private static String getOrcid(Author a) {
		if (a.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID)))
			return a.getPid().stream().filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID)).findFirst().get().getValue();
-		return a.getPid().stream().filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING)).findFirst().get().getValue();
+		if (a.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING)))
+			return a.getPid().stream().filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING)).findFirst().get().getValue();
+		return null;
	}
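
The `getOrcid` rewrite closes a crash window: the old fallback called `findFirst().get()` on the ORCID_PENDING stream even when the author had neither pid, throwing `NoSuchElementException`. A compact Scala restatement of the fixed precedence, as a sketch (`orcidOf` is a hypothetical helper, not in the patch):

```scala
import scala.collection.JavaConverters._

// Prefer a confirmed ORCID pid, fall back to ORCID_PENDING, else None.
// Callers drop the None case, mirroring the new Objects::nonNull filter
// in getOrcidAuthorsList.
def orcidOf(a: Author): Option[String] = {
  def pidOf(classid: String): Option[String] =
    a.getPid.asScala
      .find(_.getQualifier.getClassid.equalsIgnoreCase(classid))
      .map(_.getValue)

  pidOf(ModelConstants.ORCID).orElse(pidOf(ModelConstants.ORCID_PENDING))
}
```
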