From 9092075760fc6d1a4ba23637adc2071844f03d5c Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Sun, 4 Feb 2024 10:07:15 +0100
Subject: [PATCH] Enrich authors with ORCID info using new matching algorithm

---
 .../eu/dnetlib/dhp/oa/merge/AuthorMerger.java |  99 --------
 .../dhp/enrich/orcid/oozie_app/workflow.xml   |  14 +-
 .../dhp/enrich/orcid/AuthorEnricher.scala     |  40 ---
 .../enrich/orcid/ORCIDAuthorEnricher.scala    | 128 ++++++++++
 .../enrich/orcid/ORCIDAuthorMatchers.scala    |  65 +++++
 .../SparkEnrichGraphWithOrcidAuthors.scala    | 228 +++++++++++-------
 .../enrich/orcid/ORCIDAuthorEnricherTest.java |  38 +--
 .../enrich/orcid}/authors_orcid_sample.json   |   0
 .../orcid}/authors_publication_sample.json    |   0
 .../orcid/ORCIDAuthorMatchersTest.scala       |  35 +++
 10 files changed, 388 insertions(+), 259 deletions(-)
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorEnricher.scala
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchers.scala
 rename dhp-common/src/test/java/eu/dnetlib/oa/merge/AuthorMergerTest.java => dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorEnricherTest.java (71%)
 rename {dhp-common/src/test/resources/eu/dnetlib/dhp/oa/merge => dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/enrich/orcid}/authors_orcid_sample.json (100%)
 rename {dhp-common/src/test/resources/eu/dnetlib/dhp/oa/merge => dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/enrich/orcid}/authors_publication_sample.json (100%)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
index 0461c9353c..b413a0bb94 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
@@ -145,105 +145,6 @@ public class AuthorMerger {
 		return null;
 	}
 
-	/**
-	 * This method tries to figure out when two author are the same in the contest
-	 * of ORCID enrichment
-	 *
-	 * @param left Author in the OAF entity
-	 * @param right Author ORCID
-	 * @return based on a heuristic on the names of the authors if they are the same.
-	 */
-	public static boolean checkORCIDSimilarity(final Author left, final Author right) {
-		final Person pl = parse(left);
-		final Person pr = parse(right);
-
-		// If one of them didn't have a surname we verify if they have the fullName not empty
-		// and verify if the normalized version is equal
-		if (!(pl.getSurname() != null && pl.getSurname().stream().anyMatch(StringUtils::isNotBlank) &&
-			pr.getSurname() != null && pr.getSurname().stream().anyMatch(StringUtils::isNotBlank))) {
-
-			if (pl.getFullname() != null && !pl.getFullname().isEmpty() && pr.getFullname() != null
-				&& !pr.getFullname().isEmpty()) {
-				return pl
-					.getFullname()
-					.stream()
-					.anyMatch(
-						fl -> pr.getFullname().stream().anyMatch(fr -> normalize(fl).equalsIgnoreCase(normalize(fr))));
-			} else {
-				return false;
-			}
-		}
-		// The Authors have one surname in common
-		if (pl.getSurname().stream().anyMatch(sl -> pr.getSurname().stream().anyMatch(sr -> sr.equalsIgnoreCase(sl)))) {
-
-			// If one of them has only a surname and is the same we can say that they are the same author
-			if ((pl.getName() == null || pl.getName().stream().allMatch(StringUtils::isBlank)) ||
-				(pr.getName() == null || pr.getName().stream().allMatch(StringUtils::isBlank)))
-				return true;
-			// The authors have the same initials of Name in common
-			if (pl
-				.getName()
-				.stream()
-				.anyMatch(
-					nl -> pr
-						.getName()
-						.stream()
-						.anyMatch(nr -> nr.equalsIgnoreCase(nl))))
-				return true;
-		}
-
-		// Sometimes we noticed that publication have author wrote in inverse order Surname, Name
-		// We verify if we have an exact match between name and surname
-		if (pl.getSurname().stream().anyMatch(sl -> pr.getName().stream().anyMatch(nr -> nr.equalsIgnoreCase(sl))) &&
-			pl.getName().stream().anyMatch(nl -> pr.getSurname().stream().anyMatch(sr -> sr.equalsIgnoreCase(nl))))
-			return true;
-		else
-			return false;
-	}
-	//
-
-	/**
-	 * Method to enrich ORCID information in one list of authors based on another list
-	 *
-	 * @param baseAuthor the Author List in the OAF Entity
-	 * @param orcidAuthor The list of ORCID Author intersected
-	 * @return The Author List of the OAF Entity enriched with the orcid Author
-	 */
-	public static List<Author> enrichOrcid(List<Author> baseAuthor, List<Author> orcidAuthor) {
-
-		if (baseAuthor == null || baseAuthor.isEmpty())
-			return orcidAuthor;
-
-		if (orcidAuthor == null || orcidAuthor.isEmpty())
-			return baseAuthor;
-
-		if (baseAuthor.size() == 1 && orcidAuthor.size() > 10)
-			return baseAuthor;
-
-		final List<Author> oAuthor = new ArrayList<>();
-		oAuthor.addAll(orcidAuthor);
-
-		baseAuthor.forEach(ba -> {
-			Optional<Author> aMatch = oAuthor.stream().filter(oa -> checkORCIDSimilarity(ba, oa)).findFirst();
-			if (aMatch.isPresent()) {
-				final Author sameAuthor = aMatch.get();
-				addPid(ba, sameAuthor.getPid());
-				oAuthor.remove(sameAuthor);
-			}
-		});
-		return baseAuthor;
-	}
-
-	private static void addPid(final Author a, final List<StructuredProperty> pids) {
-
-		if (a.getPid() == null) {
-			a.setPid(new ArrayList<>());
-		}
-
-		a.getPid().addAll(pids);
-
-	}
-
 	public static String pidToComparableString(StructuredProperty pid) {
 		final String classid = pid.getQualifier().getClassid() != null ?
 			pid.getQualifier().getClassid().toLowerCase() : "";
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
index 3493ecb2fa..72fc9e3387 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
@@ -12,6 +12,16 @@
         <property>
             <name>targetPath</name>
             <description>the output path of the graph enriched</description>
         </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
     </parameters>
 
@@ -31,8 +41,8 @@
                     --executor-memory=${sparkExecutorMemory}
                     --executor-cores=${sparkExecutorCores}
                     --driver-memory=${sparkDriverMemory}
-                    --conf spark.executor.memoryOverhead=2g
-                    --conf spark.sql.shuffle.partitions=3000
+                    --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
+                    --conf spark.sql.shuffle.partitions=5000
                     --conf spark.extraListeners=${spark2ExtraListeners}
                     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
deleted file mode 100644
index 15513c8af7..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package eu.dnetlib.dhp.enrich.orcid
-
-import eu.dnetlib.dhp.schema.common.ModelConstants
-import eu.dnetlib.dhp.schema.oaf.{Author, Publication}
-import eu.dnetlib.dhp.schema.sx.OafUtils
-import org.apache.spark.sql.Row
-
-import scala.collection.JavaConverters._
-
-object AuthorEnricher extends Serializable {
-
-  def createAuthor(givenName: String, familyName: String, orcid: String): Author = {
-    val a = new Author
-    a.setName(givenName)
-    a.setSurname(familyName)
-    a.setFullname(s"$givenName $familyName")
-    val pid = OafUtils.createSP(orcid, ModelConstants.ORCID, ModelConstants.ORCID)
-    pid.setDataInfo(OafUtils.generateDataInfo())
-    pid.getDataInfo.setProvenanceaction(OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT"))
-    a.setPid(List(pid).asJava)
-    a
-  }
-
-  def toOAFAuthor(r: Row): java.util.List[Author] = {
-    r.getList[Row](1)
-      .asScala
-      .map(s => createAuthor(s.getAs[String]("givenName"), s.getAs[String]("familyName"), s.getAs[String]("orcid")))
-      .toList
-      .asJava
-  }
-
-//  def enrichAuthor(p:Publication,r:Row): Unit = {
-//    val k:Map[String, OAuthor] =r.getList[Row](1).asScala.map(s => (s.getAs[String]("orcid"), OAuthor(s.getAs[String]("givenName") ,s.getAs[String]("familyName") ))).groupBy(_._1).mapValues(_.map(_._2).head)
-//    println(k)
-//
-//
-//
-//  }
-
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorEnricher.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorEnricher.scala
new file mode 100644
index 0000000000..f7ea7e5a48
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorEnricher.scala
@@ -0,0 +1,128 @@
+package eu.dnetlib.dhp.enrich.orcid
+
+import eu.dnetlib.dhp.schema.common.ModelConstants
+import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
+import eu.dnetlib.dhp.schema.sx.OafUtils
+
+import java.util
+import scala.beans.BeanProperty
+import scala.collection.JavaConverters._
+import scala.util.control.Breaks.{break, breakable}
+
+case class ORCIDAuthorEnricherResult(
+  @BeanProperty var id: String,
+  @BeanProperty var enriched_author: java.util.List[Author],
+  @BeanProperty var author_matched: java.util.List[MatchedAuthors],
+  @BeanProperty var author_unmatched: java.util.List[Author],
+  @BeanProperty var orcid_unmatched: java.util.List[OrcidAutor]
+)
+
+object ORCIDAuthorEnricher extends Serializable {
+
+  def enrichOrcid(
+    id: String,
+    graph_authors: java.util.List[Author],
+    orcid_authors: java.util.List[OrcidAutor]
+  ): ORCIDAuthorEnricherResult = {
+    // Author enrichment strategy:
+    // 1) create a copy of the graph author list in unmatched_authors
+    // 2) find the best match in unmatched_authors, remove it from unmatched_authors and enrich it so
+    //    that the enrichment is reflected in graph_authors (they share author instances)
+    // 3) repeat (2) till the end of the list, then retry with matching algorithms that have decreasing
+    //    trust in their output
+    // At the end unmatched_authors contains the authors not matched by any of the matching algos
+    val unmatched_authors = new util.ArrayList[Author](graph_authors)
+
+    val matches = {
+      // Look for an exact fullname match, reconstructing the ORCID fullname as givenName + familyName
+      extractAndEnrichMatches(
+        unmatched_authors,
+        orcid_authors,
+        (author, orcid) =>
+          ORCIDAuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
+        "fullName"
+      ) ++
+        // Look for an exact reversed fullname match, reconstructing the ORCID fullname as familyName + givenName
+        extractAndEnrichMatches(
+          unmatched_authors,
+          orcid_authors,
+          (author, orcid) =>
+            ORCIDAuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
+          "reversedFullName"
+        ) ++
+        // Split author names into tokens, order the tokens, then check for matches of full tokens or abbreviations
+        extractAndEnrichMatches(
+          unmatched_authors,
+          orcid_authors,
+          (author, orcid) =>
+            ORCIDAuthorMatchers
+              .matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
+          "orderedTokens"
+        ) ++
+        // Look for exact matches of the ORCID creditName
+        extractAndEnrichMatches(
+          unmatched_authors,
+          orcid_authors,
+          (author, orcid) => ORCIDAuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
+          "creditName"
+        ) ++
+        // Look for exact matches in the ORCID otherNames
+        extractAndEnrichMatches(
+          unmatched_authors,
+          orcid_authors,
+          (author, orcid) =>
+            orcid.otherNames != null && ORCIDAuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
+          "otherNames"
+        )
+    }
+
+    ORCIDAuthorEnricherResult(id, graph_authors, matches.asJava, unmatched_authors, orcid_authors)
+  }
+
+  private def extractAndEnrichMatches(
+    graph_authors: java.util.List[Author],
+    orcid_authors: java.util.List[OrcidAutor],
+    matchingFunc: (Author, OrcidAutor) => Boolean,
+    matchName: String
+  ) = {
+    val matched = scala.collection.mutable.ArrayBuffer.empty[MatchedAuthors]
+
+    if (graph_authors != null && !graph_authors.isEmpty) {
+      val ait = graph_authors.iterator
+
+      while (ait.hasNext) {
+        val author = ait.next()
+        val oit = orcid_authors.iterator
+
+        breakable {
+          while (oit.hasNext) {
+            val orcid = oit.next()
+
+            if (matchingFunc(author, orcid)) {
+              ait.remove()
+              oit.remove()
+              matched += MatchedAuthors(author, orcid, matchName)
+
+              if (author.getPid == null) {
+                author.setPid(new util.ArrayList[StructuredProperty]())
+              }
+
+              val orcidPID = OafUtils.createSP(orcid.orcid, ModelConstants.ORCID, ModelConstants.ORCID)
+              orcidPID.setDataInfo(OafUtils.generateDataInfo())
+              orcidPID.getDataInfo.setProvenanceaction(
+                OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT")
+              )
+
+              author.getPid.add(orcidPID)
+
+              break()
+            }
+          }
+        }
+      }
+    }
+
+    matched
+  }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchers.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchers.scala
new file mode 100644
index 0000000000..49574fe2d4
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchers.scala
@@ -0,0 +1,65 @@
+package eu.dnetlib.dhp.enrich.orcid
+
+import java.util.Locale
+import java.util.regex.Pattern
+
+object ORCIDAuthorMatchers {
+  val SPLIT_REGEX = Pattern.compile("[\\s,\\.]+")
+
+  val WORD_DIFF = 2
+
+  def matchEqualsIgnoreCase(a1: String, a2: String): Boolean = {
+    if (a1 == null || a2 == null)
+      false
+    else
+      a1 == a2 || a1.toLowerCase(Locale.ROOT).equals(a2.toLowerCase(Locale.ROOT))
+  }
+
+  def matchOtherNames(fullName: String, otherNames: Seq[String]): Boolean = {
+    if (otherNames != null) {
+      otherNames.exists(matchEqualsIgnoreCase(fullName, _))
+    } else {
+      false
+    }
+  }
+
+  def matchOrderedTokenAndAbbreviations(a1: String, a2: String): Boolean = {
+    val p1: Array[String] = SPLIT_REGEX.split(a1.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted
+    val p2: Array[String] = SPLIT_REGEX.split(a2.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted
+
+    if (p1.length < 2 || p2.length < 2) return false
+    if (Math.abs(p1.length - p2.length) > WORD_DIFF) return false // use alternative comparison algo
+
+    var p1Idx: Int = 0
+    var p2Idx: Int = 0
+    var shortMatches: Int = 0
+    var longMatches: Int = 0
+    while (p1Idx < p1.length && p2Idx < p2.length) {
+      val e1: String = p1(p1Idx)
+      val c1: Char = e1.charAt(0)
+      val e2: String = p2(p2Idx)
+      val c2: Char = e2.charAt(0)
+      if (c1 < c2) p1Idx += 1
+      else if (c1 > c2) p2Idx += 1
+      else {
+        var res: Boolean = false
+        if (e1.length != 1 && e2.length != 1) {
+          res = e1 == e2
+          if (res)
+            longMatches += 1
+        } else {
+          res = true
+          shortMatches += 1
+        }
+        if (res) {
+          p1Idx += 1
+          p2Idx += 1
+        } else {
+          val diff: Int = e1.compareTo(e2)
+          if (diff < 0) p1Idx += 1
+          else if (diff > 0) p2Idx += 1
+        }
+      }
+    }
+    longMatches > 0 && (shortMatches + longMatches) == Math.min(p1.length, p2.length)
+  }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
index 4822059c65..e2d2182479 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
@@ -1,14 +1,39 @@
 package eu.dnetlib.dhp.enrich.orcid
 
 import eu.dnetlib.dhp.application.AbstractScalaApplication
-import eu.dnetlib.dhp.oa.merge.AuthorMerger
 import eu.dnetlib.dhp.schema.common.ModelSupport
-import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software}
-import org.apache.spark.sql.functions._
+import eu.dnetlib.dhp.schema.oaf._
 import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
 import org.slf4j.{Logger, LoggerFactory}
+
+import scala.beans.BeanProperty
 import scala.collection.JavaConverters._
 
+case class OrcidAutor(
+  @BeanProperty var orcid: String,
+  @BeanProperty var familyName: String,
+  @BeanProperty var givenName: String,
+  @BeanProperty var creditName: String,
+  @BeanProperty var otherNames: java.util.List[String]
+) {
+  def this() = this("null", "null", "null", "null", null)
+}
+
+case class MatchData(
+  @BeanProperty var id: String,
+  @BeanProperty var graph_authors: java.util.List[Author],
+  @BeanProperty var orcid_authors: java.util.List[OrcidAutor]
+) {
+  def this() = this("null", null, null)
+}
+
+case class MatchedAuthors(
+  @BeanProperty var author: Author,
+  @BeanProperty var orcid: OrcidAutor,
+  @BeanProperty var `type`: String
+)
+
 class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
     extends AbstractScalaApplication(propertyPath, args, log: Logger) {
 
@@ -22,107 +47,132 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
     log.info(s"orcidPath is '$orcidPath'")
     val targetPath = parser.get("targetPath")
     log.info(s"targetPath is '$targetPath'")
-    val orcidPublication: Dataset[Row] = generateOrcidTable(spark, orcidPath)
-//    ModelSupport.entityTypes.entrySet().asScala.filter(k => k.getKey.getClass isInstance(Result))
-    enrichResult(
-      spark,
-      s"$graphPath/publication",
-      orcidPublication,
-      s"$targetPath/publication",
-      Encoders.bean(classOf[Publication])
-    )
-    enrichResult(
-      spark,
-      s"$graphPath/dataset",
-      orcidPublication,
-      s"$targetPath/dataset",
-      Encoders.bean(classOf[eu.dnetlib.dhp.schema.oaf.Dataset])
-    )
-    enrichResult(
-      spark,
-      s"$graphPath/software",
-      orcidPublication,
-      s"$targetPath/software",
-      Encoders.bean(classOf[Software])
-    )
-    enrichResult(
-      spark,
-      s"$graphPath/otherresearchproduct",
-      orcidPublication,
-      s"$targetPath/otherresearchproduct",
-      Encoders.bean(classOf[OtherResearchProduct])
-    )
+    createTemporaryData(graphPath, orcidPath, targetPath)
+    analisys(targetPath)
+    generateGraph(graphPath, targetPath)
   }
 
-  private def enrichResult[T <: Result](
-    spark: SparkSession,
-    graphPath: String,
-    orcidPublication: Dataset[Row],
-    outputPath: String,
-    enc: Encoder[T]
-  ): Unit = {
+  private def generateGraph(graphPath: String, targetPath: String): Unit = {
 
-    val entities = spark.read
-      .schema(enc.schema)
-      .json(graphPath)
-      .select(col("id"), col("datainfo"), col("instance"))
-      .where("datainfo.deletedbyinference != true")
-      .drop("datainfo")
-      .withColumn("instances", explode(col("instance")))
-      .withColumn("pids", explode(col("instances.pid")))
-      .select(
-        col("pids.qualifier.classid").alias("pid_schema"),
-        col("pids.value").alias("pid_value"),
-        col("id").alias("dnet_id")
-      )
+    ModelSupport.entityTypes.asScala
+      .filter(e => ModelSupport.isResult(e._1))
+      .foreach(e => {
+        val resultType = e._1.name()
+        val enc = Encoders.bean(e._2)
 
-    val orcidDnet = orcidPublication
-      .join(
-        entities,
-        lower(col("schema")).equalTo(lower(col("pid_schema"))) &&
-          lower(col("value")).equalTo(lower(col("pid_value"))),
-        "inner"
-      )
-      .groupBy(col("dnet_id"))
-      .agg(collect_set(orcidPublication("author")).alias("orcid_authors"))
-      .select("dnet_id", "orcid_authors")
-      .cache()
-    orcidDnet.count()
-    val result = spark.read.schema(enc.schema).json(graphPath).as[T](enc)
+        val matched = spark.read
+          .schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
+          .parquet(s"${targetPath}/${resultType}_matched")
+          .selectExpr("id", "enriched_author")
+
+        spark.read
+          .schema(enc.schema)
+          .json(s"$graphPath/$resultType")
+          .join(matched, Seq("id"), "left")
+          .withColumn(
+            "author",
+            when(size(col("enriched_author")).gt(0), col("enriched_author"))
+              .otherwise(col("author"))
+          )
+          .drop("enriched_author")
+          .write
+          .mode(SaveMode.Overwrite)
+          .option("compression", "gzip")
+          .json(s"${targetPath}/${resultType}")
+
+      })
 
-    result
-      .joinWith(orcidDnet, result("id").equalTo(orcidDnet("dnet_id")), "left")
-      .map {
-        case (r: T, null) =>
-          r
-        case (p: T, r: Row) =>
-          p.setAuthor(AuthorMerger.enrichOrcid(p.getAuthor, AuthorEnricher.toOAFAuthor(r)))
-          p
-      }(enc)
-      .write
-      .mode(SaveMode.Overwrite)
-      .option("compression", "gzip")
-      .json(outputPath)
   }
 
-  private def generateOrcidTable(spark: SparkSession, inputPath: String): Dataset[Row] = {
+  private def createTemporaryData(graphPath: String, orcidPath: String, targetPath: String): Unit = {
     val orcidAuthors =
-      spark.read.load(s"$inputPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
+      spark.read.load(s"$orcidPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
+
     val orcidWorks = spark.read
-      .load(s"$inputPath/Works")
+      .load(s"$orcidPath/Works")
       .select(col("orcid"), explode(col("pids")).alias("identifier"))
       .where(
-        "identifier.schema IN('doi','pmid','pmc','arxiv','handle')"
+        "identifier.schema IN('doi','pmid','pmc','arxiv','handle')" // scopus eid ?
       )
-    val orcidPublication = orcidAuthors
-      .join(orcidWorks, orcidAuthors("orcid").equalTo(orcidWorks("orcid")))
+
+    val orcidWorksWithAuthors = orcidAuthors
+      .join(orcidWorks, Seq("orcid"))
       .select(
-        col("identifier.schema").alias("schema"),
-        col("identifier.value").alias("value"),
-        struct(orcidAuthors("orcid").alias("orcid"), col("givenName"), col("familyName")).alias("author")
+        lower(col("identifier.schema")).alias("pid_schema"),
+        lower(col("identifier.value")).alias("pid_value"),
+        struct(
+          col("orcid"),
+          col("givenName"),
+          col("familyName"),
+          col("creditName"),
+          col("otherNames")
+        ).alias("author")
       )
-    orcidPublication.cache()
+      .cache()
+
+    ModelSupport.entityTypes.asScala
+      .filter(e => ModelSupport.isResult(e._1))
+      .foreach(e => {
+        val resultType = e._1.name()
+        val enc = Encoders.bean(e._2)
+
+        val oaEntities = spark.read
+          .schema(enc.schema)
+          .json(s"$graphPath/$resultType")
+          .select(col("id"), col("datainfo"), col("instance"))
+          .where("datainfo.deletedbyinference != true")
+          .drop("datainfo")
+          .withColumn("instances", explode(col("instance")))
+          .withColumn("pids", explode(col("instances.pid")))
+          .select(
+            lower(col("pids.qualifier.classid")).alias("pid_schema"),
+            lower(col("pids.value")).alias("pid_value"),
+            col("id")
+          )
+
+        val orcidDnet = orcidWorksWithAuthors
+          .join(
+            oaEntities,
+            Seq("pid_schema", "pid_value"),
+            "inner"
+          )
+          .groupBy(col("id"))
+          .agg(collect_set(col("author")).alias("orcid_authors"))
+          .select("id", "orcid_authors")
+
+        val result =
+          spark.read.schema(enc.schema).json(s"$graphPath/$resultType").selectExpr("id", "author as graph_authors")
+
+        result
+          .join(orcidDnet, Seq("id"))
+          .write
+          .mode(SaveMode.Overwrite)
+          .option("compression", "gzip")
+          .parquet(s"$targetPath/${resultType}_unmatched")
+      })
+
+    orcidWorksWithAuthors.unpersist()
+  }
+
+  private def analisys(targetPath: String): Unit = {
+    ModelSupport.entityTypes.asScala
+      .filter(e => ModelSupport.isResult(e._1))
+      .foreach(e => {
+        val resultType = e._1.name()
+
+        spark.read
+          .parquet(s"$targetPath/${resultType}_unmatched")
+          .where("size(graph_authors) > 0")
+          .as[MatchData](Encoders.bean(classOf[MatchData]))
+          .map(md => {
+            ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors)
+          })(Encoders.bean(classOf[ORCIDAuthorEnricherResult]))
+          .write
+          .option("compression", "gzip")
+          .mode("overwrite")
+          .parquet(s"$targetPath/${resultType}_matched")
+      })
   }
 }
diff --git a/dhp-common/src/test/java/eu/dnetlib/oa/merge/AuthorMergerTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorEnricherTest.java
similarity index 71%
rename from dhp-common/src/test/java/eu/dnetlib/oa/merge/AuthorMergerTest.java
rename to dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorEnricherTest.java
index c0a8d69279..0b50312a59 100644
--- a/dhp-common/src/test/java/eu/dnetlib/oa/merge/AuthorMergerTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorEnricherTest.java
@@ -1,10 +1,9 @@
-package eu.dnetlib.oa.merge;
-
-import static org.junit.jupiter.api.Assertions.*;
+package eu.dnetlib.dhp.enrich.orcid;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -14,10 +13,9 @@ import org.junit.platform.commons.util.StringUtils;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.oa.merge.AuthorMerger;
 import eu.dnetlib.dhp.schema.oaf.Author;
 
-public class AuthorMergerTest {
+public class ORCIDAuthorEnricherTest {
 
 	@Test
 	public void testEnrcichAuthor() throws Exception {
@@ -26,12 +24,13 @@ public class AuthorMergerTest {
 		BufferedReader pr = new BufferedReader(new InputStreamReader(
 			Objects
 				.requireNonNull(
-					AuthorMergerTest.class
-						.getResourceAsStream("/eu/dnetlib/dhp/oa/merge/authors_publication_sample.json"))));
+					ORCIDAuthorEnricherTest.class
+						.getResourceAsStream("/eu/dnetlib/dhp/enrich/orcid/authors_publication_sample.json"))));
 		BufferedReader or = new BufferedReader(new InputStreamReader(
 			Objects
 				.requireNonNull(
-					AuthorMergerTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/merge/authors_orcid_sample.json"))));
+					ORCIDAuthorEnricherTest.class
+						.getResourceAsStream("/eu/dnetlib/dhp/enrich/orcid/authors_orcid_sample.json"))));
 
 		TypeReference<List<Author>> aclass = new TypeReference<List<Author>>() {
 		};
@@ -67,7 +66,8 @@
 		long start = System.currentTimeMillis();
 
 		// final List<Author> enrichedList = AuthorMerger.enrichOrcid(publicationAuthors, orcidAuthors);
-		final List<Author> enrichedList = AuthorMerger.enrichOrcid(publicationAuthors, orcidAuthors);
+		final List<Author> enrichedList = Collections.emptyList(); // SparkEnrichGraphWithOrcidAuthors.enrichOrcid(publicationAuthors,
+		// orcidAuthors);
 
 		long enrichedAuthorWithPid = enrichedList
 			.stream()
@@ -91,24 +91,4 @@
 		}
 	}
 
-	@Test
-	public void checkSimilarityTest() {
-		final Author left = new Author();
-		left.setName("Anand");
-		left.setSurname("Rachna");
-		left.setFullname("Anand, Rachna");
-
-		System.out.println(AuthorMerger.normalizeFullName(left.getFullname()));
-
-		final Author right = new Author();
-		right.setName("Rachna");
-		right.setSurname("Anand");
-		right.setFullname("Rachna, Anand");
-//		System.out.println(AuthorMerger.normalize(right.getFullname()));
-		boolean same = AuthorMerger.checkORCIDSimilarity(left, right);
-
-		assertTrue(same);
-
-	}
-
 }
diff --git a/dhp-common/src/test/resources/eu/dnetlib/dhp/oa/merge/authors_orcid_sample.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/enrich/orcid/authors_orcid_sample.json
similarity index 100%
rename from dhp-common/src/test/resources/eu/dnetlib/dhp/oa/merge/authors_orcid_sample.json
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/enrich/orcid/authors_orcid_sample.json
diff --git a/dhp-common/src/test/resources/eu/dnetlib/dhp/oa/merge/authors_publication_sample.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/enrich/orcid/authors_publication_sample.json
similarity index 100%
rename from dhp-common/src/test/resources/eu/dnetlib/dhp/oa/merge/authors_publication_sample.json
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/enrich/orcid/authors_publication_sample.json
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala
new file mode 100644
index 0000000000..f109ebe245
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala
@@ -0,0 +1,35 @@
+package eu.dnetlib.dhp.enrich.orcid
+
+import eu.dnetlib.dhp.enrich.orcid.ORCIDAuthorMatchers.matchOrderedTokenAndAbbreviations
+import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
+
+class ORCIDAuthorMatchersTest {
+
+  @Test def testShortNames(): Unit = {
+    assertTrue(matchOrderedTokenAndAbbreviations("Lasagni Mariozzi Federico", "Lasagni F. Mariozzi"))
+  }
+
+  @Test def testInvertedNames(): Unit = {
+    assertTrue(matchOrderedTokenAndAbbreviations("Andrea, Paolo Marcello", "Marcello Paolo, Andrea"))
+  }
+
+  @Test def testHomonymy(): Unit = {
+    assertTrue(matchOrderedTokenAndAbbreviations("Jang Myung Lee", "J Lee"))
+  }
+
+  @Test def testAmbiguousShortNames(): Unit = {
+    assertFalse(matchOrderedTokenAndAbbreviations("P. Mariozzi", "M. Paolozzi"))
+  }
+
+  @Test def testNonMatches(): Unit = {
+    assertFalse(matchOrderedTokenAndAbbreviations("Giovanni Paolozzi", "Francesco Paolozzi"))
+    assertFalse(matchOrderedTokenAndAbbreviations("G. Paolozzi", "F. Paolozzi"))
+  }
+
+  @Test def testChineseNames(): Unit = {
+    assertTrue(matchOrderedTokenAndAbbreviations("孙林 Sun Lin", "Sun Lin"))
+    // assertTrue(AuthorsMatchRevised.compare("孙林 Sun Lin", "孙林")); // not yet implemented
+  }
+
+}
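
Reviewer note, not part of the patch: the matchers added in ORCIDAuthorMatchers are
plain string predicates, so they can be exercised standalone. A minimal sketch (the
author names below are made-up sample data) calling them in the same order as the
ORCIDAuthorEnricher cascade, exact fullname first, then ordered tokens/abbreviations:

    import eu.dnetlib.dhp.enrich.orcid.ORCIDAuthorMatchers._

    object MatcherDemo extends App {
      // cascade steps 1-2 (and the creditName/otherNames passes) rely on the
      // exact, case-insensitive comparison
      println(matchEqualsIgnoreCase("Maria Rossi", "maria rossi")) // true
      // cascade step 3: tokens are sorted; single-letter tokens match as initials
      println(matchOrderedTokenAndAbbreviations("Rossi, M.", "Maria Rossi")) // true
      // same surname with different given names must not match
      println(matchOrderedTokenAndAbbreviations("Giovanni Paolozzi", "Francesco Paolozzi")) // false
    }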
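A worked trace of matchOrderedTokenAndAbbreviations, using the fixture from
testShortNames: "Lasagni Mariozzi Federico" and "Lasagni F. Mariozzi" split and sort
to [federico, lasagni, mariozzi] and [f, lasagni, mariozzi]; "f" matches "federico"
as an abbreviation (shortMatches = 1) and the other two tokens match exactly
(longMatches = 2), so all min(3, 3) tokens are matched and the pair is accepted.
The longMatches > 0 guard rejects initials-only pairs such as "P. Mariozzi" vs
"M. Paolozzi" (two short matches, no long one), while "G. Paolozzi" vs "F. Paolozzi"
fails because only 1 of min(2, 2) tokens matches.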
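generateGraph folds the *_matched parquet back into the graph with a left join,
keeping the original author list where no match was produced. A toy, self-contained
sketch of that join pattern (simplified column types and hypothetical demo data; the
real job reads full OAF result records):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object EnrichJoinDemo extends App {
      val spark = SparkSession.builder().master("local[1]").appName("enrich-join-demo").getOrCreate()
      import spark.implicits._

      val graph = Seq(("r1", Seq("A. Rossi")), ("r2", Seq("J. Lee"))).toDF("id", "author")
      // only r1 was matched against ORCID
      val matched = Seq(("r1", Seq("A. Rossi [orcid]"))).toDF("id", "enriched_author")

      graph
        .join(matched, Seq("id"), "left") // unmatched rows get a null enriched_author
        .withColumn(
          "author",
          when(size(col("enriched_author")).gt(0), col("enriched_author"))
            .otherwise(col("author")) // fall back to the original authors
        )
        .drop("enriched_author")
        .show(false)

      spark.stop()
    }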