forked from D-Net/dnet-hadoop
Merge pull request 'Enrich authors with ORCID info using new matching algorithm' (#398) from new_orcid_enhancement into beta
Reviewed-on: D-Net/dnet-hadoop#398
This commit is contained in:
commit
3f22c101d9
|
@ -145,105 +145,6 @@ public class AuthorMerger {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method tries to figure out when two author are the same in the contest
|
||||
* of ORCID enrichment
|
||||
*
|
||||
* @param left Author in the OAF entity
|
||||
* @param right Author ORCID
|
||||
* @return based on a heuristic on the names of the authors if they are the same.
|
||||
*/
|
||||
public static boolean checkORCIDSimilarity(final Author left, final Author right) {
|
||||
final Person pl = parse(left);
|
||||
final Person pr = parse(right);
|
||||
|
||||
// If one of them didn't have a surname we verify if they have the fullName not empty
|
||||
// and verify if the normalized version is equal
|
||||
if (!(pl.getSurname() != null && pl.getSurname().stream().anyMatch(StringUtils::isNotBlank) &&
|
||||
pr.getSurname() != null && pr.getSurname().stream().anyMatch(StringUtils::isNotBlank))) {
|
||||
|
||||
if (pl.getFullname() != null && !pl.getFullname().isEmpty() && pr.getFullname() != null
|
||||
&& !pr.getFullname().isEmpty()) {
|
||||
return pl
|
||||
.getFullname()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
fl -> pr.getFullname().stream().anyMatch(fr -> normalize(fl).equalsIgnoreCase(normalize(fr))));
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// The Authors have one surname in common
|
||||
if (pl.getSurname().stream().anyMatch(sl -> pr.getSurname().stream().anyMatch(sr -> sr.equalsIgnoreCase(sl)))) {
|
||||
|
||||
// If one of them has only a surname and is the same we can say that they are the same author
|
||||
if ((pl.getName() == null || pl.getName().stream().allMatch(StringUtils::isBlank)) ||
|
||||
(pr.getName() == null || pr.getName().stream().allMatch(StringUtils::isBlank)))
|
||||
return true;
|
||||
// The authors have the same initials of Name in common
|
||||
if (pl
|
||||
.getName()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
nl -> pr
|
||||
.getName()
|
||||
.stream()
|
||||
.anyMatch(nr -> nr.equalsIgnoreCase(nl))))
|
||||
return true;
|
||||
}
|
||||
|
||||
// Sometimes we noticed that publication have author wrote in inverse order Surname, Name
|
||||
// We verify if we have an exact match between name and surname
|
||||
if (pl.getSurname().stream().anyMatch(sl -> pr.getName().stream().anyMatch(nr -> nr.equalsIgnoreCase(sl))) &&
|
||||
pl.getName().stream().anyMatch(nl -> pr.getSurname().stream().anyMatch(sr -> sr.equalsIgnoreCase(nl))))
|
||||
return true;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
//
|
||||
|
||||
/**
|
||||
* Method to enrich ORCID information in one list of authors based on another list
|
||||
*
|
||||
* @param baseAuthor the Author List in the OAF Entity
|
||||
* @param orcidAuthor The list of ORCID Author intersected
|
||||
* @return The Author List of the OAF Entity enriched with the orcid Author
|
||||
*/
|
||||
public static List<Author> enrichOrcid(List<Author> baseAuthor, List<Author> orcidAuthor) {
|
||||
|
||||
if (baseAuthor == null || baseAuthor.isEmpty())
|
||||
return orcidAuthor;
|
||||
|
||||
if (orcidAuthor == null || orcidAuthor.isEmpty())
|
||||
return baseAuthor;
|
||||
|
||||
if (baseAuthor.size() == 1 && orcidAuthor.size() > 10)
|
||||
return baseAuthor;
|
||||
|
||||
final List<Author> oAuthor = new ArrayList<>();
|
||||
oAuthor.addAll(orcidAuthor);
|
||||
|
||||
baseAuthor.forEach(ba -> {
|
||||
Optional<Author> aMatch = oAuthor.stream().filter(oa -> checkORCIDSimilarity(ba, oa)).findFirst();
|
||||
if (aMatch.isPresent()) {
|
||||
final Author sameAuthor = aMatch.get();
|
||||
addPid(ba, sameAuthor.getPid());
|
||||
oAuthor.remove(sameAuthor);
|
||||
}
|
||||
});
|
||||
return baseAuthor;
|
||||
}
|
||||
|
||||
private static void addPid(final Author a, final List<StructuredProperty> pids) {
|
||||
|
||||
if (a.getPid() == null) {
|
||||
a.setPid(new ArrayList<>());
|
||||
}
|
||||
|
||||
a.getPid().addAll(pids);
|
||||
|
||||
}
|
||||
|
||||
public static String pidToComparableString(StructuredProperty pid) {
|
||||
final String classid = pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase()
|
||||
: "";
|
||||
|
|
|
@ -12,6 +12,16 @@
|
|||
<name>targetPath</name>
|
||||
<description>the output path of the graph enriched</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="EnrichGraph"/>
|
||||
|
@ -31,8 +41,8 @@
|
|||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=2g
|
||||
--conf spark.sql.shuffle.partitions=3000
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.sql.shuffle.partitions=5000
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
|
|
|
@ -1,40 +0,0 @@
|
|||
package eu.dnetlib.dhp.enrich.orcid
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||
import eu.dnetlib.dhp.schema.oaf.{Author, Publication}
|
||||
import eu.dnetlib.dhp.schema.sx.OafUtils
|
||||
import org.apache.spark.sql.Row
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
object AuthorEnricher extends Serializable {
|
||||
|
||||
def createAuthor(givenName: String, familyName: String, orcid: String): Author = {
|
||||
val a = new Author
|
||||
a.setName(givenName)
|
||||
a.setSurname(familyName)
|
||||
a.setFullname(s"$givenName $familyName")
|
||||
val pid = OafUtils.createSP(orcid, ModelConstants.ORCID, ModelConstants.ORCID)
|
||||
pid.setDataInfo(OafUtils.generateDataInfo())
|
||||
pid.getDataInfo.setProvenanceaction(OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT"))
|
||||
a.setPid(List(pid).asJava)
|
||||
a
|
||||
}
|
||||
|
||||
def toOAFAuthor(r: Row): java.util.List[Author] = {
|
||||
r.getList[Row](1)
|
||||
.asScala
|
||||
.map(s => createAuthor(s.getAs[String]("givenName"), s.getAs[String]("familyName"), s.getAs[String]("orcid")))
|
||||
.toList
|
||||
.asJava
|
||||
}
|
||||
|
||||
// def enrichAuthor(p:Publication,r:Row): Unit = {
|
||||
// val k:Map[String, OAuthor] =r.getList[Row](1).asScala.map(s => (s.getAs[String]("orcid"), OAuthor(s.getAs[String]("givenName") ,s.getAs[String]("familyName") ))).groupBy(_._1).mapValues(_.map(_._2).head)
|
||||
// println(k)
|
||||
//
|
||||
//
|
||||
//
|
||||
// }
|
||||
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
package eu.dnetlib.dhp.enrich.orcid
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||
import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
|
||||
import eu.dnetlib.dhp.schema.sx.OafUtils
|
||||
|
||||
import java.util
|
||||
import scala.beans.BeanProperty
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.util.control.Breaks.{break, breakable}
|
||||
|
||||
case class ORCIDAuthorEnricherResult(
|
||||
@BeanProperty var id: String,
|
||||
@BeanProperty var enriched_author: java.util.List[Author],
|
||||
@BeanProperty var author_matched: java.util.List[MatchedAuthors],
|
||||
@BeanProperty var author_unmatched: java.util.List[Author],
|
||||
@BeanProperty var orcid_unmatched: java.util.List[OrcidAutor]
|
||||
)
|
||||
|
||||
object ORCIDAuthorEnricher extends Serializable {
|
||||
|
||||
def enrichOrcid(
|
||||
id: String,
|
||||
graph_authors: java.util.List[Author],
|
||||
orcid_authors: java.util.List[OrcidAutor]
|
||||
): ORCIDAuthorEnricherResult = {
|
||||
// Author enriching strategy:
|
||||
// 1) create a copy of graph author list in unmatched_authors
|
||||
// 2) find best match in unmatched_authors, remove it from unmatched_authors and enrich it so
|
||||
// that the enrichment is reflected in graph_authors (they share author instances)
|
||||
// 3) repeat (2) till the end of the list and then with different matching algorithms that have decreasing
|
||||
// trust in their output
|
||||
// At the end unmatched_authors will contain authors not matched with any of the matching algos
|
||||
val unmatched_authors = new util.ArrayList[Author](graph_authors)
|
||||
|
||||
val matches = {
|
||||
// Look after exact fullname match, reconstruct ORCID fullname as givenName + familyName
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
ORCIDAuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
||||
"fullName"
|
||||
) ++
|
||||
// Look after exact reversed fullname match, reconstruct ORCID fullname as familyName + givenName
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
ORCIDAuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
|
||||
"reversedFullName"
|
||||
) ++
|
||||
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
ORCIDAuthorMatchers
|
||||
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
||||
"orderedTokens"
|
||||
) ++
|
||||
// look after exact matches of ORCID creditName
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) => ORCIDAuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
|
||||
"creditName"
|
||||
) ++
|
||||
// look after exact matches in ORCID otherNames
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
orcid.otherNames != null && ORCIDAuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
|
||||
"otherNames"
|
||||
)
|
||||
}
|
||||
|
||||
ORCIDAuthorEnricherResult(id, graph_authors, matches.asJava, unmatched_authors, orcid_authors)
|
||||
}
|
||||
|
||||
private def extractAndEnrichMatches(
|
||||
graph_authors: java.util.List[Author],
|
||||
orcid_authors: java.util.List[OrcidAutor],
|
||||
matchingFunc: (Author, OrcidAutor) => Boolean,
|
||||
matchName: String
|
||||
) = {
|
||||
val matched = scala.collection.mutable.ArrayBuffer.empty[MatchedAuthors]
|
||||
|
||||
if (graph_authors != null && !graph_authors.isEmpty) {
|
||||
val ait = graph_authors.iterator
|
||||
|
||||
while (ait.hasNext) {
|
||||
val author = ait.next()
|
||||
val oit = orcid_authors.iterator
|
||||
|
||||
breakable {
|
||||
while (oit.hasNext) {
|
||||
val orcid = oit.next()
|
||||
|
||||
if (matchingFunc(author, orcid)) {
|
||||
ait.remove()
|
||||
oit.remove()
|
||||
matched += MatchedAuthors(author, orcid, matchName)
|
||||
|
||||
if (author.getPid == null) {
|
||||
author.setPid(new util.ArrayList[StructuredProperty]())
|
||||
}
|
||||
|
||||
val orcidPID = OafUtils.createSP(orcid.orcid, ModelConstants.ORCID, ModelConstants.ORCID)
|
||||
orcidPID.setDataInfo(OafUtils.generateDataInfo())
|
||||
orcidPID.getDataInfo.setProvenanceaction(
|
||||
OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT")
|
||||
)
|
||||
|
||||
author.getPid.add(orcidPID)
|
||||
|
||||
break()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
matched
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package eu.dnetlib.dhp.enrich.orcid
|
||||
|
||||
import java.util.Locale
|
||||
import java.util.regex.Pattern
|
||||
|
||||
object ORCIDAuthorMatchers {
|
||||
val SPLIT_REGEX = Pattern.compile("[\\s,\\.]+")
|
||||
|
||||
val WORD_DIFF = 2
|
||||
|
||||
def matchEqualsIgnoreCase(a1: String, a2: String): Boolean = {
|
||||
if (a1 == null || a2 == null)
|
||||
false
|
||||
else
|
||||
a1 == a2 || a1.toLowerCase(Locale.ROOT).equals(a2.toLowerCase(Locale.ROOT))
|
||||
}
|
||||
|
||||
def matchOtherNames(fullName: String, otherNames: Seq[String]): Boolean = {
|
||||
if (otherNames != null) {
|
||||
otherNames.exists(matchEqualsIgnoreCase(fullName, _))
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
def matchOrderedTokenAndAbbreviations(a1: String, a2: String): Boolean = {
|
||||
val p1: Array[String] = SPLIT_REGEX.split(a1.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted
|
||||
val p2: Array[String] = SPLIT_REGEX.split(a2.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted
|
||||
|
||||
if (p1.length < 2 || p2.length < 2) return false
|
||||
if (Math.abs(p1.length - p2.length) > WORD_DIFF) return false // use alternative comparison algo
|
||||
|
||||
var p1Idx: Int = 0
|
||||
var p2Idx: Int = 0
|
||||
var shortMatches: Int = 0
|
||||
var longMatches: Int = 0
|
||||
while (p1Idx < p1.length && p2Idx < p2.length) {
|
||||
val e1: String = p1(p1Idx)
|
||||
val c1: Char = e1.charAt(0)
|
||||
val e2: String = p2(p2Idx)
|
||||
val c2: Char = e2.charAt(0)
|
||||
if (c1 < c2) p1Idx += 1
|
||||
else if (c1 > c2) p2Idx += 1
|
||||
else {
|
||||
var res: Boolean = false
|
||||
if (e1.length != 1 && e2.length != 1) {
|
||||
res = e1 == e2
|
||||
longMatches += 1
|
||||
} else {
|
||||
res = true
|
||||
shortMatches += 1
|
||||
}
|
||||
if (res) {
|
||||
p1Idx += 1
|
||||
p2Idx += 1
|
||||
} else {
|
||||
val diff: Int = e1.compareTo(e2)
|
||||
if (diff < 0) p1Idx += 1
|
||||
else if (diff > 0) p2Idx += 1
|
||||
}
|
||||
}
|
||||
}
|
||||
longMatches > 0 && (shortMatches + longMatches) == Math.min(p1.length, p2.length)
|
||||
}
|
||||
}
|
|
@ -1,14 +1,39 @@
|
|||
package eu.dnetlib.dhp.enrich.orcid
|
||||
|
||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||
import eu.dnetlib.dhp.oa.merge.AuthorMerger
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport
|
||||
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software}
|
||||
import org.apache.spark.sql.functions._
|
||||
import eu.dnetlib.dhp.schema.oaf._
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.beans.BeanProperty
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
case class OrcidAutor(
|
||||
@BeanProperty var orcid: String,
|
||||
@BeanProperty var familyName: String,
|
||||
@BeanProperty var givenName: String,
|
||||
@BeanProperty var creditName: String,
|
||||
@BeanProperty var otherNames: java.util.List[String]
|
||||
) {
|
||||
def this() = this("null", "null", "null", "null", null)
|
||||
}
|
||||
|
||||
case class MatchData(
|
||||
@BeanProperty var id: String,
|
||||
@BeanProperty var graph_authors: java.util.List[Author],
|
||||
@BeanProperty var orcid_authors: java.util.List[OrcidAutor]
|
||||
) {
|
||||
def this() = this("null", null, null)
|
||||
}
|
||||
|
||||
case class MatchedAuthors(
|
||||
@BeanProperty var author: Author,
|
||||
@BeanProperty var orcid: OrcidAutor,
|
||||
@BeanProperty var `type`: String
|
||||
)
|
||||
|
||||
class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
|
||||
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
|
||||
|
||||
|
@ -22,107 +47,132 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
|
|||
log.info(s"orcidPath is '$orcidPath'")
|
||||
val targetPath = parser.get("targetPath")
|
||||
log.info(s"targetPath is '$targetPath'")
|
||||
val orcidPublication: Dataset[Row] = generateOrcidTable(spark, orcidPath)
|
||||
// ModelSupport.entityTypes.entrySet().asScala.filter(k => k.getKey.getClass isInstance(Result))
|
||||
|
||||
enrichResult(
|
||||
spark,
|
||||
s"$graphPath/publication",
|
||||
orcidPublication,
|
||||
s"$targetPath/publication",
|
||||
Encoders.bean(classOf[Publication])
|
||||
)
|
||||
enrichResult(
|
||||
spark,
|
||||
s"$graphPath/dataset",
|
||||
orcidPublication,
|
||||
s"$targetPath/dataset",
|
||||
Encoders.bean(classOf[eu.dnetlib.dhp.schema.oaf.Dataset])
|
||||
)
|
||||
enrichResult(
|
||||
spark,
|
||||
s"$graphPath/software",
|
||||
orcidPublication,
|
||||
s"$targetPath/software",
|
||||
Encoders.bean(classOf[Software])
|
||||
)
|
||||
enrichResult(
|
||||
spark,
|
||||
s"$graphPath/otherresearchproduct",
|
||||
orcidPublication,
|
||||
s"$targetPath/otherresearchproduct",
|
||||
Encoders.bean(classOf[OtherResearchProduct])
|
||||
)
|
||||
createTemporaryData(graphPath, orcidPath, targetPath)
|
||||
analisys(targetPath)
|
||||
generateGraph(graphPath, targetPath)
|
||||
}
|
||||
|
||||
private def enrichResult[T <: Result](
|
||||
spark: SparkSession,
|
||||
graphPath: String,
|
||||
orcidPublication: Dataset[Row],
|
||||
outputPath: String,
|
||||
enc: Encoder[T]
|
||||
): Unit = {
|
||||
private def generateGraph(graphPath: String, targetPath: String): Unit = {
|
||||
|
||||
val entities = spark.read
|
||||
.schema(enc.schema)
|
||||
.json(graphPath)
|
||||
.select(col("id"), col("datainfo"), col("instance"))
|
||||
.where("datainfo.deletedbyinference != true")
|
||||
.drop("datainfo")
|
||||
.withColumn("instances", explode(col("instance")))
|
||||
.withColumn("pids", explode(col("instances.pid")))
|
||||
.select(
|
||||
col("pids.qualifier.classid").alias("pid_schema"),
|
||||
col("pids.value").alias("pid_value"),
|
||||
col("id").alias("dnet_id")
|
||||
)
|
||||
ModelSupport.entityTypes.asScala
|
||||
.filter(e => ModelSupport.isResult(e._1))
|
||||
.foreach(e => {
|
||||
val resultType = e._1.name()
|
||||
val enc = Encoders.bean(e._2)
|
||||
|
||||
val orcidDnet = orcidPublication
|
||||
.join(
|
||||
entities,
|
||||
lower(col("schema")).equalTo(lower(col("pid_schema"))) &&
|
||||
lower(col("value")).equalTo(lower(col("pid_value"))),
|
||||
"inner"
|
||||
)
|
||||
.groupBy(col("dnet_id"))
|
||||
.agg(collect_set(orcidPublication("author")).alias("orcid_authors"))
|
||||
.select("dnet_id", "orcid_authors")
|
||||
.cache()
|
||||
orcidDnet.count()
|
||||
val result = spark.read.schema(enc.schema).json(graphPath).as[T](enc)
|
||||
val matched = spark.read
|
||||
.schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
|
||||
.parquet(s"${targetPath}/${resultType}_matched")
|
||||
.selectExpr("id", "enriched_author")
|
||||
|
||||
spark.read
|
||||
.schema(enc.schema)
|
||||
.json(s"$graphPath/$resultType")
|
||||
.join(matched, Seq("id"), "left")
|
||||
.withColumn(
|
||||
"author",
|
||||
when(size(col("enriched_author")).gt(0), col("enriched_author"))
|
||||
.otherwise(col("author"))
|
||||
)
|
||||
.drop("enriched_author")
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(s"${targetPath}/${resultType}")
|
||||
|
||||
})
|
||||
|
||||
result
|
||||
.joinWith(orcidDnet, result("id").equalTo(orcidDnet("dnet_id")), "left")
|
||||
.map {
|
||||
case (r: T, null) =>
|
||||
r
|
||||
case (p: T, r: Row) =>
|
||||
p.setAuthor(AuthorMerger.enrichOrcid(p.getAuthor, AuthorEnricher.toOAFAuthor(r)))
|
||||
p
|
||||
}(enc)
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath)
|
||||
}
|
||||
|
||||
private def generateOrcidTable(spark: SparkSession, inputPath: String): Dataset[Row] = {
|
||||
private def createTemporaryData(graphPath: String, orcidPath: String, targetPath: String): Unit = {
|
||||
val orcidAuthors =
|
||||
spark.read.load(s"$inputPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
|
||||
spark.read.load(s"$orcidPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
|
||||
|
||||
val orcidWorks = spark.read
|
||||
.load(s"$inputPath/Works")
|
||||
.load(s"$orcidPath/Works")
|
||||
.select(col("orcid"), explode(col("pids")).alias("identifier"))
|
||||
.where(
|
||||
"identifier.schema IN('doi','pmid','pmc','arxiv','handle')"
|
||||
"identifier.schema IN('doi','pmid','pmc','arxiv','handle')" // scopus eid ?
|
||||
)
|
||||
val orcidPublication = orcidAuthors
|
||||
.join(orcidWorks, orcidAuthors("orcid").equalTo(orcidWorks("orcid")))
|
||||
|
||||
val orcidWorksWithAuthors = orcidAuthors
|
||||
.join(orcidWorks, Seq("orcid"))
|
||||
.select(
|
||||
col("identifier.schema").alias("schema"),
|
||||
col("identifier.value").alias("value"),
|
||||
struct(orcidAuthors("orcid").alias("orcid"), col("givenName"), col("familyName")).alias("author")
|
||||
lower(col("identifier.schema")).alias("pid_schema"),
|
||||
lower(col("identifier.value")).alias("pid_value"),
|
||||
struct(
|
||||
col("orcid"),
|
||||
col("givenName"),
|
||||
col("familyName"),
|
||||
col("creditName"),
|
||||
col("otherNames")
|
||||
).alias("author")
|
||||
)
|
||||
orcidPublication.cache()
|
||||
.cache()
|
||||
|
||||
ModelSupport.entityTypes.asScala
|
||||
.filter(e => ModelSupport.isResult(e._1))
|
||||
.foreach(e => {
|
||||
val resultType = e._1.name()
|
||||
val enc = Encoders.bean(e._2)
|
||||
|
||||
val oaEntities = spark.read
|
||||
.schema(enc.schema)
|
||||
.json(s"$graphPath/$resultType")
|
||||
.select(col("id"), col("datainfo"), col("instance"))
|
||||
.where("datainfo.deletedbyinference != true")
|
||||
.drop("datainfo")
|
||||
.withColumn("instances", explode(col("instance")))
|
||||
.withColumn("pids", explode(col("instances.pid")))
|
||||
.select(
|
||||
lower(col("pids.qualifier.classid")).alias("pid_schema"),
|
||||
lower(col("pids.value")).alias("pid_value"),
|
||||
col("id")
|
||||
)
|
||||
|
||||
val orcidDnet = orcidWorksWithAuthors
|
||||
.join(
|
||||
oaEntities,
|
||||
Seq("pid_schema", "pid_value"),
|
||||
"inner"
|
||||
)
|
||||
.groupBy(col("id"))
|
||||
.agg(collect_set(col("author")).alias("orcid_authors"))
|
||||
.select("id", "orcid_authors")
|
||||
|
||||
val result =
|
||||
spark.read.schema(enc.schema).json(s"$graphPath/$resultType").selectExpr("id", "author as graph_authors")
|
||||
|
||||
result
|
||||
.join(orcidDnet, Seq("id"))
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.parquet(s"$targetPath/${resultType}_unmatched")
|
||||
})
|
||||
|
||||
orcidWorksWithAuthors.unpersist()
|
||||
}
|
||||
|
||||
private def analisys(targetPath: String): Unit = {
|
||||
ModelSupport.entityTypes.asScala
|
||||
.filter(e => ModelSupport.isResult(e._1))
|
||||
.foreach(e => {
|
||||
val resultType = e._1.name()
|
||||
|
||||
spark.read
|
||||
.parquet(s"$targetPath/${resultType}_unmatched")
|
||||
.where("size(graph_authors) > 0")
|
||||
.as[MatchData](Encoders.bean(classOf[MatchData]))
|
||||
.map(md => {
|
||||
ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors)
|
||||
})(Encoders.bean(classOf[ORCIDAuthorEnricherResult]))
|
||||
.write
|
||||
.option("compression", "gzip")
|
||||
.mode("overwrite")
|
||||
.parquet(s"$targetPath/${resultType}_matched")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
|
||||
package eu.dnetlib.oa.merge;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
package eu.dnetlib.dhp.enrich.orcid;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -14,10 +13,9 @@ import org.junit.platform.commons.util.StringUtils;
|
|||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
|
||||
public class AuthorMergerTest {
|
||||
public class ORCIDAuthorEnricherTest {
|
||||
|
||||
@Test
|
||||
public void testEnrcichAuthor() throws Exception {
|
||||
|
@ -26,12 +24,13 @@ public class AuthorMergerTest {
|
|||
BufferedReader pr = new BufferedReader(new InputStreamReader(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
AuthorMergerTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/merge/authors_publication_sample.json"))));
|
||||
ORCIDAuthorEnricherTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/enrich/orcid/authors_publication_sample.json"))));
|
||||
BufferedReader or = new BufferedReader(new InputStreamReader(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
AuthorMergerTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/merge/authors_orcid_sample.json"))));
|
||||
ORCIDAuthorEnricherTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/enrich/orcid/authors_orcid_sample.json"))));
|
||||
|
||||
TypeReference<List<Author>> aclass = new TypeReference<List<Author>>() {
|
||||
};
|
||||
|
@ -67,7 +66,8 @@ public class AuthorMergerTest {
|
|||
long start = System.currentTimeMillis();
|
||||
|
||||
// final List<Author> enrichedList = AuthorMerger.enrichOrcid(publicationAuthors, orcidAuthors);
|
||||
final List<Author> enrichedList = AuthorMerger.enrichOrcid(publicationAuthors, orcidAuthors);
|
||||
final List<Author> enrichedList = Collections.emptyList(); // SparkEnrichGraphWithOrcidAuthors.enrichOrcid(publicationAuthors,
|
||||
// orcidAuthors);
|
||||
|
||||
long enrichedAuthorWithPid = enrichedList
|
||||
.stream()
|
||||
|
@ -91,24 +91,4 @@ public class AuthorMergerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkSimilarityTest() {
|
||||
final Author left = new Author();
|
||||
left.setName("Anand");
|
||||
left.setSurname("Rachna");
|
||||
left.setFullname("Anand, Rachna");
|
||||
|
||||
System.out.println(AuthorMerger.normalizeFullName(left.getFullname()));
|
||||
|
||||
final Author right = new Author();
|
||||
right.setName("Rachna");
|
||||
right.setSurname("Anand");
|
||||
right.setFullname("Rachna, Anand");
|
||||
// System.out.println(AuthorMerger.normalize(right.getFullname()));
|
||||
boolean same = AuthorMerger.checkORCIDSimilarity(left, right);
|
||||
|
||||
assertTrue(same);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package eu.dnetlib.dhp.enrich.orcid
|
||||
|
||||
import eu.dnetlib.dhp.enrich.orcid.ORCIDAuthorMatchers.matchOrderedTokenAndAbbreviations
|
||||
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
class ORCIDAuthorMatchersTest {
|
||||
|
||||
@Test def testShortNames(): Unit = {
|
||||
assertTrue(matchOrderedTokenAndAbbreviations("Lasagni Mariozzi Federico", "Lasagni F. Mariozzi"))
|
||||
}
|
||||
|
||||
@Test def testInvertedNames(): Unit = {
|
||||
assertTrue(matchOrderedTokenAndAbbreviations("Andrea, Paolo Marcello", "Marcello Paolo, Andrea"))
|
||||
}
|
||||
|
||||
@Test def testHomonymy(): Unit = {
|
||||
assertTrue(matchOrderedTokenAndAbbreviations("Jang Myung Lee", "J Lee"))
|
||||
}
|
||||
|
||||
@Test def testAmbiguousShortNames(): Unit = {
|
||||
assertFalse(matchOrderedTokenAndAbbreviations("P. Mariozzi", "M. Paolozzi"))
|
||||
}
|
||||
|
||||
@Test def testNonMatches(): Unit = {
|
||||
assertFalse(matchOrderedTokenAndAbbreviations("Giovanni Paolozzi", "Francesco Paolozzi"))
|
||||
assertFalse(matchOrderedTokenAndAbbreviations("G. Paolozzi", "F. Paolozzi"))
|
||||
}
|
||||
|
||||
@Test def testChineseNames(): Unit = {
|
||||
assertTrue(matchOrderedTokenAndAbbreviations("孙林 Sun Lin", "Sun Lin"))
|
||||
// assertTrue(AuthorsMatchRevised.compare("孙林 Sun Lin", "孙林")); // not yet implemented
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue