commit 939c84ede6
parent 07a51c7361

    [orcidenrichment] refactoring
@@ -0,0 +1,5 @@
+package eu.dnetlib.dhp.common.enrichment;
+
+public class Constants {
+	public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
+}
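Note: with PROPAGATION_DATA_INFO_TYPE defined once in a module shared by both sides (presumably dhp-common, given the package name), the Java propagation code and the Scala enrichment code can import the same value. A minimal Scala sketch of consuming it; the import is the one added further below, while the object wrapper is illustrative only:

import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE

object ConstantsUsageSketch {
  def main(args: Array[String]): Unit =
    println(PROPAGATION_DATA_INFO_TYPE) // prints "propagation"
}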
@@ -1,11 +1,12 @@
 package eu.dnetlib.dhp.common.author
 
 import eu.dnetlib.dhp.application.AbstractScalaApplication
-import eu.dnetlib.dhp.schema.common.ModelSupport
+import eu.dnetlib.dhp.schema.common.{ModelConstants, ModelSupport}
 import eu.dnetlib.dhp.utils.{MatchData, ORCIDAuthorEnricher, ORCIDAuthorEnricherResult}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.slf4j.{Logger, LoggerFactory}
+import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE
 
 import scala.collection.JavaConverters._
@@ -24,13 +25,18 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
     log.info(s"targetPath is '$targetPath'")
     val workingDir = parser.get("workingDir")
     log.info(s"workingDir is '$workingDir'")
+    val classid = Option(parser.get("matchingSource")).map(_ => ModelConstants.ORCID_PENDING).getOrElse(ModelConstants.ORCID)
 
-    createTemporaryData(graphPath, orcidPath, workingDir)
-    analisys(workingDir)
-    generateGraph(graphPath, workingDir, targetPath)
+    log.info(s"classid is '$classid'")
+    val provenance = Option(parser.get("matchingSource")).map(_ => PROPAGATION_DATA_INFO_TYPE).getOrElse("ORCID_ENRICHMENT")
+    log.info(s"provenance is '$provenance'")
+
+    createTemporaryData(spark, graphPath, orcidPath, workingDir)
+    analisys(workingDir, classid, provenance)
+    generateGraph(spark, graphPath, workingDir, targetPath)
   }
 
-  private def generateGraph(graphPath: String, workingDir: String, targetPath: String): Unit = {
+  private def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = {
 
     ModelSupport.entityTypes.asScala
       .filter(e => ModelSupport.isResult(e._1))
@@ -64,7 +70,7 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
 
   def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit
 
-  private def analisys(targetPath: String): Unit = {
+  private def analisys(targetPath: String, classid: String, provenance: String): Unit = {
     ModelSupport.entityTypes.asScala
       .filter(e => ModelSupport.isResult(e._1))
       .foreach(e => {
@@ -75,7 +81,7 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
         .where("size(graph_authors) > 0")
         .as[MatchData](Encoders.bean(classOf[MatchData]))
         .map(md => {
-          ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors)
+          ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors, classid, provenance)
         })(Encoders.bean(classOf[ORCIDAuthorEnricherResult]))
         .write
         .option("compression", "gzip")
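Note: taken together, the driver changes above switch the job between two modes from the single "matchingSource" argument. When it is present the job runs in propagation mode (PIDs tagged ORCID_PENDING with provenance "propagation"); when absent it runs in plain enrichment mode (ORCID with provenance "ORCID_ENRICHMENT"). A self-contained sketch of that selection logic, with plain strings standing in for the ModelConstants values (the literals are assumptions, the Option pattern is the one in the diff):

object ModeSelectionSketch {
  val ORCID = "orcid"                  // stand-in for ModelConstants.ORCID
  val ORCID_PENDING = "orcid_pending"  // stand-in for ModelConstants.ORCID_PENDING
  val PROPAGATION_DATA_INFO_TYPE = "propagation"

  // Mirrors the Option-based selection added to SparkEnrichWithOrcidAuthors
  def select(matchingSource: String): (String, String) = {
    val classid = Option(matchingSource).map(_ => ORCID_PENDING).getOrElse(ORCID)
    val provenance = Option(matchingSource).map(_ => PROPAGATION_DATA_INFO_TYPE).getOrElse("ORCID_ENRICHMENT")
    (classid, provenance)
  }

  def main(args: Array[String]): Unit = {
    println(select(null))          // (orcid,ORCID_ENRICHMENT): plain enrichment mode
    println(select("publication")) // (orcid_pending,propagation): propagation mode
  }
}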
@@ -46,7 +46,9 @@ object ORCIDAuthorEnricher extends Serializable {
   def enrichOrcid(
     id: String,
     graph_authors: java.util.List[Author],
-    orcid_authors: java.util.List[OrcidAuthor]
+    orcid_authors: java.util.List[OrcidAuthor],
+    classid: String,
+    provenance: String
   ): ORCIDAuthorEnricherResult = {
     // Author enriching strategy:
     // 1) create a copy of graph author list in unmatched_authors
@@ -64,7 +66,9 @@ object ORCIDAuthorEnricher extends Serializable {
       orcid_authors,
       (author, orcid) =>
         AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
-      "fullName"
+      "fullName",
+      classid,
+      provenance
     ) ++
       // Look for an exact reversed fullname match, reconstructing the ORCID fullname as familyName + givenName
       extractAndEnrichMatches(
@@ -72,7 +76,9 @@ object ORCIDAuthorEnricher extends Serializable {
       orcid_authors,
       (author, orcid) =>
         AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
-      "reversedFullName"
+      "reversedFullName",
+      classid,
+      provenance
     ) ++
       // split author names into tokens, order the tokens, then check for matches of full tokens or abbreviations
      extractAndEnrichMatches(
@@ -81,14 +87,18 @@ object ORCIDAuthorEnricher extends Serializable {
       (author, orcid) =>
         AuthorMatchers
           .matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
-      "orderedTokens"
+      "orderedTokens",
+      classid,
+      provenance
     ) ++
       // look for exact matches of the ORCID creditName
       extractAndEnrichMatches(
        unmatched_authors,
        orcid_authors,
        (author, orcid) => AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
-      "creditName"
+      "creditName",
+      classid,
+      provenance
     ) ++
       // look for exact matches in the ORCID otherNames
       extractAndEnrichMatches(
@@ -96,7 +106,9 @@ object ORCIDAuthorEnricher extends Serializable {
       orcid_authors,
       (author, orcid) =>
         orcid.otherNames != null && AuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
-      "otherNames"
+      "otherNames",
+      classid,
+      provenance
     )
   }
 
@@ -107,7 +119,9 @@ object ORCIDAuthorEnricher extends Serializable {
     graph_authors: java.util.List[Author],
     orcid_authors: java.util.List[OrcidAuthor],
     matchingFunc: (Author, OrcidAuthor) => Boolean,
-    matchName: String
+    matchName: String,
+    classid: String,
+    provenance: String
   ) = {
     val matched = scala.collection.mutable.ArrayBuffer.empty[MatchedAuthors]
 
@@ -131,10 +145,10 @@ object ORCIDAuthorEnricher extends Serializable {
           author.setPid(new util.ArrayList[StructuredProperty]())
         }
 
-        val orcidPID = OafUtils.createSP(orcid.orcid, ModelConstants.ORCID, ModelConstants.ORCID)
+        val orcidPID = OafUtils.createSP(orcid.orcid, classid, classid)
         orcidPID.setDataInfo(OafUtils.generateDataInfo())
         orcidPID.getDataInfo.setProvenanceaction(
-          OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT")
+          OafUtils.createQualifier(provenance, provenance)
         )
 
         author.getPid.add(orcidPID)
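Note: the cascade in enrichOrcid tries progressively looser matchers (fullName, reversedFullName, orderedTokens, creditName, otherNames), and each pass removes the authors it matched so later passes only see what is still unmatched. A simplified, self-contained sketch of that consume-as-you-match pattern; the case classes and the equalsIgnoreCase matchers below are illustrative stand-ins, not the project's schema classes or AuthorMatchers:

import scala.collection.mutable

object MatchCascadeSketch {
  case class Author(fullname: String)
  case class OrcidAuthor(givenName: String, familyName: String)
  case class Matched(matchName: String, author: Author, orcid: OrcidAuthor)

  // Each strategy consumes the graph authors it matches, echoing
  // extractAndEnrichMatches in ORCIDAuthorEnricher.
  def extractMatches(
    unmatched: mutable.Buffer[Author],
    orcidAuthors: Seq[OrcidAuthor],
    matches: (Author, OrcidAuthor) => Boolean,
    matchName: String
  ): Seq[Matched] =
    orcidAuthors.flatMap { o =>
      unmatched.find(a => matches(a, o)).map { a =>
        unmatched -= a // consumed: later, looser strategies never see this author
        Matched(matchName, a, o)
      }
    }

  def main(args: Array[String]): Unit = {
    val unmatched = mutable.Buffer(Author("John Smith"), Author("Doe Jane"))
    val orcid = Seq(OrcidAuthor("John", "Smith"), OrcidAuthor("Jane", "Doe"))
    val byFullName =
      extractMatches(unmatched, orcid, (a, o) => a.fullname.equalsIgnoreCase(s"${o.givenName} ${o.familyName}"), "fullName")
    val byReversed =
      extractMatches(unmatched, orcid, (a, o) => a.fullname.equalsIgnoreCase(s"${o.familyName} ${o.givenName}"), "reversedFullName")
    println(byFullName ++ byReversed) // John Smith via fullName; Doe Jane via reversedFullName
  }
}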
@@ -21,6 +21,8 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 
+import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
+
 public class PropagationConstant {
 
 	private PropagationConstant() {
@@ -46,7 +48,5 @@ public class PropagationConstant {
 
 	public static final String INSTITUTIONAL_REPO_TYPE = "institutional";
 
-	public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
-
 	public static final String TRUE = "true";
 
@@ -8,8 +8,6 @@ import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.utils.OrcidAuthor;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
@@ -19,6 +17,8 @@ import scala.Tuple2;
 
 
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 
@@ -39,20 +39,25 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
 
 	private static OrcidAuthors getOrcidAuthorsList(List<Author> authors) {
 		OrcidAuthors oas = new OrcidAuthors();
-		List<OrcidAuthor> tmp = authors.stream().map(SparkPropagateOrcidAuthor::getOrcidAuthor).collect(Collectors.toList());
+		List<OrcidAuthor> tmp = authors.stream().map(SparkPropagateOrcidAuthor::getOrcidAuthor)
+			.filter(Objects::nonNull).collect(Collectors.toList());
 		oas.setOrcidAuthorList(tmp);
 		return oas;
 	}
 
 	private static OrcidAuthor getOrcidAuthor(Author a) {
-		return new OrcidAuthor(getOrcid(a), a.getSurname(), a.getName(), a.getFullname(), null);
+		return Optional.ofNullable(getOrcid(a))
+			.map(orcid -> new OrcidAuthor(orcid, a.getSurname(), a.getName(), a.getFullname(), null))
+			.orElse(null);
 	}
 
 	private static String getOrcid(Author a) {
 		if (a.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID)))
 			return a.getPid().stream().filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID)).findFirst().get().getValue();
-		return a.getPid().stream().filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING)).findFirst().get().getValue();
+		if (a.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING)))
+			return a.getPid().stream().filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING)).findFirst().get().getValue();
+		return null;
 	}
 
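Note: the getOrcid rewrite above replaces a chain that could throw (findFirst().get() on an author carrying neither PID) with an explicit null return, which getOrcidAuthor wraps in Optional and getOrcidAuthorsList drops via filter(Objects::nonNull). The same lookup, sketched in Scala with simplified stand-in types (the Pid/Author case classes and lowercase classid literals are assumptions; equalsIgnoreCase makes the casing irrelevant):

object OrcidPidSketch {
  case class Pid(classid: String, value: String)
  case class Author(pid: Seq[Pid])

  // Prefer a confirmed "orcid" PID, fall back to "orcid_pending",
  // and return None instead of throwing when neither is present.
  def getOrcid(a: Author): Option[String] =
    a.pid
      .find(_.classid.equalsIgnoreCase("orcid"))
      .orElse(a.pid.find(_.classid.equalsIgnoreCase("orcid_pending")))
      .map(_.value)

  def main(args: Array[String]): Unit = {
    println(getOrcid(Author(Seq(Pid("orcid_pending", "0000-0001-2345-6789"))))) // Some(0000-0001-2345-6789)
    println(getOrcid(Author(Seq(Pid("doi", "10.1000/example")))))               // None: the old code threw here
  }
}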