WIP: [OrcidPropagation] Make ORCID enrichment/propagation code more generic and reusable #501
|
@ -0,0 +1,9 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.enrichment;
|
||||||
|
|
||||||
|
/**
 * Constants shared by the enrichment and propagation code.
 *
 * This class only holds static values: it is final and cannot be instantiated.
 */
public final class Constants {

	/** Value stored as inference provenance on information produced by propagation. */
	public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";

	/** Class id of the provenance action used when an ORCID is propagated to a result. */
	public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";

	/** Class name of the provenance action used when an ORCID is propagated to a result. */
	public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";

	private Constants() {
		// constants holder, not meant to be instantiated
	}

}
|
|
@ -621,7 +621,7 @@ public class MergeUtils {
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
|
public static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
|
||||||
List<List<Author>> authors = new ArrayList<>();
|
List<List<Author>> authors = new ArrayList<>();
|
||||||
if (author != null) {
|
if (author != null) {
|
||||||
authors.add(author);
|
authors.add(author);
|
||||||
|
|
|
@ -0,0 +1,134 @@
|
||||||
|
package eu.dnetlib.dhp.common.author
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||||
|
import eu.dnetlib.dhp.schema.common.{EntityType, ModelConstants, ModelSupport}
|
||||||
|
import eu.dnetlib.dhp.utils.{MatchData, ORCIDAuthorEnricher, ORCIDAuthorEnricherResult}
|
||||||
|
import org.apache.spark.sql._
|
||||||
|
import org.apache.spark.sql.functions._
|
||||||
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.{OafEntity, Result}
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
/** Base Spark application that enriches the authors of the graph results with ORCID identifiers.
  *
  * The job runs three steps in sequence (see `run`):
  *  1. `createTemporaryData`, provided by the concrete subclass;
  *  2. `analisys`, which matches graph authors against ORCID authors;
  *  3. `generateGraph`, which writes the results with the enriched author lists.
  *
  * @param propertyPath forwarded to [[AbstractScalaApplication]]
  * @param args         forwarded to [[AbstractScalaApplication]]
  * @param log          logger used to trace the job parameters
  */
abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
    extends AbstractScalaApplication(propertyPath, args, log: Logger) {

  /** Here all the spark applications runs this method
    * where the whole logic of the spark node is defined
    */
  override def run(): Unit = {
    val graphPath = parser.get("graphPath")
    log.info(s"graphPath is '$graphPath'")
    val orcidPath = parser.get("orcidPath")
    log.info(s"orcidPath is '$orcidPath'")
    val targetPath = parser.get("targetPath")
    log.info(s"targetPath is '$targetPath'")
    val workingDir = parser.get("workingDir")
    log.info(s"workingDir is '$workingDir'")

    // When "matchingSource" is provided the ORCIDs are marked as pending and attributed to
    // propagation, otherwise they are plain ORCIDs attributed to "ORCID_ENRICHMENT".
    val matchingSource = Option(parser.get("matchingSource"))
    val classid = matchingSource.map(_ => ModelConstants.ORCID_PENDING).getOrElse(ModelConstants.ORCID)
    log.info(s"classid is '$classid'")
    val provenance = matchingSource.map(_ => PROPAGATION_DATA_INFO_TYPE).getOrElse("ORCID_ENRICHMENT")
    log.info(s"provenance is '$provenance'")

    createTemporaryData(spark, graphPath, orcidPath, workingDir)
    analisys(workingDir, classid, provenance)
    generateGraph(spark, graphPath, workingDir, targetPath)
  }

  /** Reads the results stored as JSON in `inputPath`, groups them by id and writes one record per
    * id to `outputPath` as gzip-compressed JSON. The written record is the first one of the group,
    * carrying the author list obtained by merging the authors of all the records of the group.
    *
    * @param spark      the Spark session
    * @param inputPath  path of the JSON results to read
    * @param outputPath path where the merged results are written (overwritten if present)
    * @param clazz      bean class used to derive the read schema
    * @param encoder    encoder used to type the dataset
    */
  private def processAndMerge(
    spark: SparkSession,
    inputPath: String,
    outputPath: String,
    clazz: Class[Result],
    encoder: Encoder[Result]
  ): Unit = {
    spark.read
      .schema(Encoders.bean(clazz).schema)
      .json(inputPath)
      .as(encoder)
      .groupByKey(r => r.getId)(Encoders.STRING)
      .mapGroups((k, it) => {
        val first: Result = it.next

        // Store the merged authors on the returned record, as generateGraph does; the result of
        // the fold must not be discarded.
        first.setAuthor(
          it.foldLeft(first.getAuthor)((merged, r) => MergeUtils.mergeAuthors(merged, r.getAuthor, 0))
        )
        first
      })(encoder)
      .write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .json(outputPath)
  }

  /** Writes, for every result entity type, the graph records with the enriched authors.
    *
    * For each type the matched authors are read from `workingDir/<type>_matched` (parquet) and
    * left-joined by id with the results read from `graphPath/<type>` (JSON). The `author` column is
    * replaced by `enriched_author` when the latter is not empty. Records are then grouped by id,
    * their author lists merged, and the outcome is written to `targetPath/<type>` as
    * gzip-compressed JSON.
    */
  private def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = {
    ModelSupport.entityTypes
      .keySet()
      .asScala
      .filter(ModelSupport.isResult)
      .foreach((e: EntityType) => {
        val resultClazz: Class[Result] = ModelSupport.entityTypes.get(e).asInstanceOf[Class[Result]]
        val resultEncoder: Encoder[Result] = Encoders.bean(resultClazz)

        val matched: Dataset[Row] = spark.read
          .schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
          .parquet(workingDir + "/" + e.name + "_matched")
          .selectExpr("id", "enriched_author")

        val result: Dataset[Row] = spark.read
          .schema(resultEncoder.schema)
          .json(graphPath + "/" + e.name)

        result
          .join(matched, Seq("id"), "left")
          .withColumn(
            "author",
            when(size(col("enriched_author")).gt(0), col("enriched_author"))
              .otherwise(col("author"))
          )
          .drop("enriched_author")
          .as(resultEncoder)
          .groupByKey(r => r.getId)(Encoders.STRING)
          .mapGroups((k, it) => {
            val first: Result = it.next

            first.setAuthor(
              it.foldLeft(first.getAuthor)((merged, r) => MergeUtils.mergeAuthors(merged, r.getAuthor, 0))
            )
            first
          })(resultEncoder)
          .write
          .mode(SaveMode.Overwrite)
          .option("compression", "gzip")
          .json(targetPath + "/" + e.name)
      })
  }

  /** Preparation step implemented by the concrete application.
    *
    * `run` passes the working directory as `targetPath`; `analisys` later reads
    * `<workingDir>/<resultType>_unmatched` from there, so implementations are expected to
    * produce that dataset.
    */
  def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit

  /** Runs the author matching for every result entity type.
    *
    * Reads `targetPath/<resultType>_unmatched` (parquet), keeps the rows having at least one graph
    * author, applies [[ORCIDAuthorEnricher.enrichOrcid]] to each of them and writes the outcome to
    * `targetPath/<resultType>_matched` as gzip-compressed parquet, overwriting existing data.
    *
    * @param targetPath directory holding the `_unmatched` input and receiving the `_matched` output
    * @param classid    class id given to the ORCID pids added to the authors
    * @param provenance provenance given to the ORCID pids added to the authors
    */
  private def analisys(targetPath: String, classid: String, provenance: String): Unit = {
    ModelSupport.entityTypes.asScala
      .filter(e => ModelSupport.isResult(e._1))
      .foreach(e => {
        val resultType = e._1.name()

        spark.read
          .parquet(s"$targetPath/${resultType}_unmatched")
          .where("size(graph_authors) > 0")
          .as[MatchData](Encoders.bean(classOf[MatchData]))
          .map(md => ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors, classid, provenance))(
            Encoders.bean(classOf[ORCIDAuthorEnricherResult])
          )
          .write
          .option("compression", "gzip")
          .mode("overwrite")
          .parquet(s"$targetPath/${resultType}_matched")
      })
  }
}
|
|
@ -1,11 +1,12 @@
|
||||||
package eu.dnetlib.pace.util
|
package eu.dnetlib.dhp.utils
|
||||||
|
|
||||||
|
import java.text.Normalizer
|
||||||
import java.util.Locale
|
import java.util.Locale
|
||||||
import java.util.regex.Pattern
|
import java.util.regex.Pattern
|
||||||
import scala.util.control.Breaks.{break, breakable}
|
import scala.util.control.Breaks.{break, breakable}
|
||||||
|
|
||||||
object AuthorMatchers {
|
object AuthorMatchers {
|
||||||
val SPLIT_REGEX = Pattern.compile("[\\s,\\.]+")
|
val SPLIT_REGEX = Pattern.compile("[\\s\\p{Punct}\\p{Pd}]+")
|
||||||
|
|
||||||
val WORD_DIFF = 2
|
val WORD_DIFF = 2
|
||||||
|
|
||||||
|
@ -24,9 +25,16 @@ object AuthorMatchers {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Splits a name into its normalized, sorted tokens.
  *
  * The input is normalized to Unicode NFC, lower-cased with `Locale.ROOT`, split on
  * `SPLIT_REGEX`, stripped of empty tokens and sorted.
  *
  * @param s the name to tokenize; may be null
  * @return the sorted tokens, or an empty array when `s` is null
  */
def normalize(s: String): Array[String] = {
  // A missing name yields no tokens instead of a NullPointerException
  if (s == null) {
    Array.empty[String]
  } else {
    val lowered = Normalizer.normalize(s, Normalizer.Form.NFC).toLowerCase(Locale.ROOT)
    SPLIT_REGEX
      .split(lowered)
      .filter(_.nonEmpty)
      .sorted
  }
}
|
||||||
|
|
||||||
def matchOrderedTokenAndAbbreviations(a1: String, a2: String): Boolean = {
|
def matchOrderedTokenAndAbbreviations(a1: String, a2: String): Boolean = {
|
||||||
val p1: Array[String] = SPLIT_REGEX.split(a1.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted
|
val p1: Array[String] = normalize(a1)
|
||||||
val p2: Array[String] = SPLIT_REGEX.split(a2.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted
|
val p2: Array[String] = normalize(a2)
|
||||||
|
|
||||||
if (p1.length < 2 || p2.length < 2) return false
|
if (p1.length < 2 || p2.length < 2) return false
|
||||||
if (Math.abs(p1.length - p2.length) > WORD_DIFF) return false // use alternative comparison algo
|
if (Math.abs(p1.length - p2.length) > WORD_DIFF) return false // use alternative comparison algo
|
||||||
|
@ -73,7 +81,6 @@ object AuthorMatchers {
|
||||||
removeMatches(graph_authors, orcid_authors, (a, b) => matchingFunc(a, b))
|
removeMatches(graph_authors, orcid_authors, (a, b) => matchingFunc(a, b))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def removeMatches(
|
def removeMatches(
|
||||||
graph_authors: java.util.List[String],
|
graph_authors: java.util.List[String],
|
||||||
orcid_authors: java.util.List[String],
|
orcid_authors: java.util.List[String],
|
|
@ -0,0 +1,197 @@
|
||||||
|
package eu.dnetlib.dhp.utils
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
|
||||||
|
import eu.dnetlib.dhp.schema.sx.OafUtils
|
||||||
|
|
||||||
|
import java.util
|
||||||
|
import scala.beans.BeanProperty
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
import scala.util.control.Breaks.{break, breakable}
|
||||||
|
import eu.dnetlib.dhp.common.enrichment.Constants._
|
||||||
|
|
||||||
|
/** Bean describing an author as registered in ORCID.
  *
  * Every field is a `@BeanProperty var`, so JavaBean-style getters and setters are generated.
  *
  * @param orcid      the ORCID identifier
  * @param familyName the family name
  * @param givenName  the given name
  * @param creditName the credit name
  * @param otherNames the other names
  */
case class OrcidAuthor(
  @BeanProperty var orcid: String,
  @BeanProperty var familyName: String,
  @BeanProperty var givenName: String,
  @BeanProperty var creditName: String,
  @BeanProperty var otherNames: java.util.List[String]
) {

  /** No-arg constructor: the string fields are set to the literal "null", `otherNames` to null. */
  def this() =
    this(
      "null", // orcid
      "null", // familyName
      "null", // givenName
      "null", // creditName
      null // otherNames
    )
}
|
||||||
|
|
||||||
|
/** Bean pairing a graph author with the ORCID author it was matched to.
  *
  * @param author the matched graph author
  * @param orcid  the matched ORCID author
  * @param type   the name of the matching strategy that produced the pair
  */
case class MatchedAuthors(
  @BeanProperty var author: Author,
  @BeanProperty var orcid: OrcidAuthor,
  @BeanProperty var `type`: String
) {

  /** No-arg constructor, so that the class can be instantiated as a bean like
    * `OrcidAuthor` and `MatchData`; all fields start as null.
    */
  def this() = this(null, null, null)
}
|
||||||
|
|
||||||
|
/** Bean carrying the input of the author matching for one record.
  *
  * @param id            the record identifier
  * @param graph_authors the authors found in the graph for the record
  * @param orcid_authors the ORCID authors that are candidates for the match
  */
case class MatchData(
  @BeanProperty var id: String,
  @BeanProperty var graph_authors: java.util.List[Author],
  @BeanProperty var orcid_authors: java.util.List[OrcidAuthor]
) {

  /** No-arg constructor: `id` is set to the literal "null", both lists to null. */
  def this() =
    this(
      "null", // id
      null, // graph_authors
      null // orcid_authors
    )
}
|
||||||
|
|
||||||
|
/** Bean carrying the outcome of the author matching for one record.
  *
  * @param id               the record identifier
  * @param enriched_author  the author list of the record after the enrichment
  * @param author_matched   the graph author / ORCID author pairs that were matched
  * @param author_unmatched the graph authors left without a match
  * @param orcid_unmatched  the ORCID authors left without a match
  */
case class ORCIDAuthorEnricherResult(
  @BeanProperty var id: String,
  @BeanProperty var enriched_author: java.util.List[Author],
  @BeanProperty var author_matched: java.util.List[MatchedAuthors],
  @BeanProperty var author_unmatched: java.util.List[Author],
  @BeanProperty var orcid_unmatched: java.util.List[OrcidAuthor]
) {

  /** No-arg constructor, so that the class can be instantiated as a bean like
    * `OrcidAuthor` and `MatchData`; all fields start as null.
    */
  def this() = this(null, null, null, null, null)
}
|
||||||
|
|
||||||
|
/** Matches the authors of a graph record against ORCID authors and adds the ORCID identifier as
  * a pid to every matched graph author.
  */
object ORCIDAuthorEnricher extends Serializable {

  /** Enriches the given graph authors with the ORCID identifiers of the matching ORCID authors.
    *
    * Matched ORCID authors are removed from `orcid_authors`, and matched graph authors are
    * modified in place (a pid is added), so both input lists must be mutable.
    *
    * @param id            identifier of the record, copied into the result
    * @param graph_authors authors of the record; a null list is treated as empty
    * @param orcid_authors candidate ORCID authors; a null list produces no match
    * @param classid       class id (and class name) of the pid added to matched authors
    * @param provenance    provenance of the added pid
    * @return the result holding `graph_authors`, the matched pairs, the graph authors left
    *         unmatched and `orcid_authors` (from which the matched entries have been removed)
    */
  def enrichOrcid(
    id: String,
    graph_authors: java.util.List[Author],
    orcid_authors: java.util.List[OrcidAuthor],
    classid: String,
    provenance: String
  ): ORCIDAuthorEnricherResult = {
    // Author enriching strategy:
    // 1) create a copy of graph author list in unmatched_authors
    // 2) find best match in unmatched_authors, remove it from unmatched_authors and enrich it so
    //     that the enrichment is reflected in graph_authors (they share author instances).
    //     Do not match in case of ambiguity: two authors match and at least one of them has affiliation string
    // 3) repeat (2) till the end of the list and then with different matching algorithms that have decreasing
    //    trust in their output
    // At the end unmatched_authors will contain authors not matched with any of the matching algos
    val unmatched_authors =
      if (graph_authors == null) new util.ArrayList[Author]()
      else new util.ArrayList[Author](graph_authors)

    val matches = {
      // Look for an exact fullname match, reconstructing the ORCID fullname as givenName + familyName
      extractAndEnrichMatches(
        unmatched_authors,
        orcid_authors,
        (author, orcid) =>
          AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
        "fullName",
        classid,
        provenance
      ) ++
      // Look for an exact reversed fullname match, reconstructing the ORCID fullname as familyName + givenName
      extractAndEnrichMatches(
        unmatched_authors,
        orcid_authors,
        (author, orcid) =>
          AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
        "reversedFullName",
        classid,
        provenance
      ) ++
      // split author names in tokens, order the tokens, then check for matches of full tokens or
      // abbreviations; in this first pass every ambiguous match is skipped
      extractAndEnrichMatches(
        unmatched_authors,
        orcid_authors,
        (author, orcid) =>
          AuthorMatchers
            .matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
        "orderedTokens-1",
        classid,
        provenance,
        skipAmbiguities = true
      ) ++
      // same token based comparison, this time with the default handling of ambiguous matches
      extractAndEnrichMatches(
        unmatched_authors,
        orcid_authors,
        (author, orcid) =>
          AuthorMatchers
            .matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
        "orderedTokens-2",
        classid,
        provenance
      ) ++
      // look for exact matches of the ORCID creditName
      extractAndEnrichMatches(
        unmatched_authors,
        orcid_authors,
        (author, orcid) => AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
        "creditName",
        classid,
        provenance
      ) ++
      // look for exact matches in the ORCID otherNames
      extractAndEnrichMatches(
        unmatched_authors,
        orcid_authors,
        (author, orcid) =>
          orcid.otherNames != null && AuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
        "otherNames",
        classid,
        provenance
      )
    }

    ORCIDAuthorEnricherResult(id, graph_authors, matches.asJava, unmatched_authors, orcid_authors)
  }

  /** Applies one matching strategy: for every ORCID author, collects the still unmatched graph
    * authors accepted by `matchingFunc` and, when the match is accepted, removes both from their
    * lists and adds the ORCID pid to the graph author.
    *
    * A match is accepted when there is exactly one candidate, or when there are several
    * candidates, `skipAmbiguities` is false and none of them has a raw affiliation string; in the
    * latter case the first candidate is taken.
    *
    * @param unmatched_authors graph authors not matched yet; matched ones are removed from it
    * @param orcid_authors     ORCID authors not matched yet; matched ones are removed from it
    * @param matchingFunc      predicate telling whether a graph author matches an ORCID author
    * @param matchName         name of the strategy, recorded in the returned pairs
    * @param classid           class id (and class name) of the pid added to matched authors
    * @param provenance        provenance of the added pid
    * @param skipAmbiguities   when true, an ORCID author with more than one candidate is not matched
    * @return the pairs matched by this strategy; empty when either list is null or there are no
    *         unmatched authors
    */
  private def extractAndEnrichMatches(
    unmatched_authors: java.util.List[Author],
    orcid_authors: java.util.List[OrcidAuthor],
    matchingFunc: (Author, OrcidAuthor) => Boolean,
    matchName: String,
    classid: String,
    provenance: String,
    skipAmbiguities: Boolean = false
  ): ArrayBuffer[MatchedAuthors] = {
    val matched = ArrayBuffer.empty[MatchedAuthors]

    val nothingToMatch =
      unmatched_authors == null || unmatched_authors.isEmpty || orcid_authors == null

    if (!nothingToMatch) {
      val oit = orcid_authors.iterator
      while (oit.hasNext) {
        val orcid = oit.next()
        // strict copy of the candidates: unmatched_authors is modified below
        val candidates = unmatched_authors.asScala.filter(author => matchingFunc(author, orcid))

        val singleCandidate = candidates.size == 1
        val tolerableAmbiguity = candidates.size > 1 && !skipAmbiguities &&
          !candidates.exists(a => a.getRawAffiliationString != null && !a.getRawAffiliationString.isEmpty)

        if (singleCandidate || tolerableAmbiguity) {
          val author = candidates.head
          unmatched_authors.remove(author)
          oit.remove()
          matched += MatchedAuthors(author, orcid, matchName)

          if (author.getPid == null) {
            author.setPid(new util.ArrayList[StructuredProperty]())
          }
          author.getPid.add(createOrcidPid(orcid, classid, provenance))
        }
      }
    }

    matched
  }

  /** Builds the pid carrying the ORCID identifier, with the data info describing its provenance. */
  private def createOrcidPid(orcid: OrcidAuthor, classid: String, provenance: String): StructuredProperty = {
    val orcidPID = OafUtils.createSP(orcid.orcid, classid, classid)
    orcidPID.setDataInfo(OafUtils.generateDataInfo())

    if (provenance.equalsIgnoreCase(PROPAGATION_DATA_INFO_TYPE)) {
      // propagated ORCIDs are flagged as inferred and carry the dedicated provenance action
      orcidPID.getDataInfo.setInferenceprovenance(PROPAGATION_DATA_INFO_TYPE)
      orcidPID.getDataInfo.setInferred(true)
      orcidPID.getDataInfo.setProvenanceaction(
        OafUtils.createQualifier(
          PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID,
          PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME
        )
      )
    } else {
      orcidPID.getDataInfo.setProvenanceaction(OafUtils.createQualifier(provenance, provenance))
    }

    orcidPID
  }
}
|
|
@ -1,10 +1,10 @@
|
||||||
package eu.dnetlib.dhp.enrich.orcid
|
package eu.dnetlib.dhp.utils
|
||||||
|
|
||||||
import eu.dnetlib.pace.util.AuthorMatchers.matchOrderedTokenAndAbbreviations
|
import eu.dnetlib.dhp.utils.AuthorMatchers.matchOrderedTokenAndAbbreviations
|
||||||
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
|
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
class ORCIDAuthorMatchersTest {
|
class AuthorMatchersTest {
|
||||||
|
|
||||||
@Test def testShortNames(): Unit = {
|
@Test def testShortNames(): Unit = {
|
||||||
assertTrue(matchOrderedTokenAndAbbreviations("Lasagni Mariozzi Federico", "Lasagni F. Mariozzi"))
|
assertTrue(matchOrderedTokenAndAbbreviations("Lasagni Mariozzi Federico", "Lasagni F. Mariozzi"))
|
|
@ -9,11 +9,11 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.wcohen.ss.AbstractStringDistance;
|
import com.wcohen.ss.AbstractStringDistance;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.utils.AuthorMatchers;
|
||||||
import eu.dnetlib.pace.config.Config;
|
import eu.dnetlib.pace.config.Config;
|
||||||
import eu.dnetlib.pace.model.Person;
|
import eu.dnetlib.pace.model.Person;
|
||||||
import eu.dnetlib.pace.tree.support.AbstractListComparator;
|
import eu.dnetlib.pace.tree.support.AbstractListComparator;
|
||||||
import eu.dnetlib.pace.tree.support.ComparatorClass;
|
import eu.dnetlib.pace.tree.support.ComparatorClass;
|
||||||
import eu.dnetlib.pace.util.AuthorMatchers;
|
|
||||||
|
|
||||||
@ComparatorClass("authorsMatch")
|
@ComparatorClass("authorsMatch")
|
||||||
public class AuthorsMatch extends AbstractListComparator {
|
public class AuthorsMatch extends AbstractListComparator {
|
||||||
|
|
|
@ -21,9 +21,13 @@ class DecisionTreeTest {
|
||||||
void testJPath() throws IOException {
|
void testJPath() throws IOException {
|
||||||
|
|
||||||
DedupConfig conf = DedupConfig
|
DedupConfig conf = DedupConfig
|
||||||
.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
|
.load(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
|
||||||
|
|
||||||
final String org = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
|
final String org = IOUtils
|
||||||
|
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
|
||||||
|
|
||||||
Row row = SparkModel.apply(conf).rowFromJson(org);
|
Row row = SparkModel.apply(conf).rowFromJson(org);
|
||||||
|
|
||||||
|
@ -42,7 +46,8 @@ class DecisionTreeTest {
|
||||||
.getResourceAsStream(
|
.getResourceAsStream(
|
||||||
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
||||||
|
|
||||||
final String org = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization_example1.json"));
|
final String org = IOUtils
|
||||||
|
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization_example1.json"));
|
||||||
|
|
||||||
Row row = SparkModel.apply(conf).rowFromJson(org);
|
Row row = SparkModel.apply(conf).rowFromJson(org);
|
||||||
// to check that the same parsing returns the same row
|
// to check that the same parsing returns the same row
|
||||||
|
|
|
@ -440,7 +440,8 @@ public class SparkDedupTest implements Serializable {
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
final List<Relation> merges = pubs
|
final List<Relation> merges = pubs
|
||||||
.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")// and relClass = '"+ModelConstants.MERGES+"'")
|
.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")// and relClass =
|
||||||
|
// '"+ModelConstants.MERGES+"'")
|
||||||
.collectAsList();
|
.collectAsList();
|
||||||
assertEquals(4, merges.size());
|
assertEquals(4, merges.size());
|
||||||
Set<String> dups = Sets
|
Set<String> dups = Sets
|
||||||
|
|
|
@ -19,9 +19,13 @@ class JsonPathTest {
|
||||||
void testJPath() throws IOException {
|
void testJPath() throws IOException {
|
||||||
|
|
||||||
DedupConfig conf = DedupConfig
|
DedupConfig conf = DedupConfig
|
||||||
.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
|
.load(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
|
||||||
|
|
||||||
final String org = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
|
final String org = IOUtils
|
||||||
|
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
|
||||||
|
|
||||||
Row row = SparkModel.apply(conf).rowFromJson(org);
|
Row row = SparkModel.apply(conf).rowFromJson(org);
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp;
|
package eu.dnetlib.dhp;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -46,7 +48,7 @@ public class PropagationConstant {
|
||||||
|
|
||||||
public static final String INSTITUTIONAL_REPO_TYPE = "institutional";
|
public static final String INSTITUTIONAL_REPO_TYPE = "institutional";
|
||||||
|
|
||||||
public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
|
// public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
|
||||||
|
|
||||||
public static final String TRUE = "true";
|
public static final String TRUE = "true";
|
||||||
|
|
||||||
|
@ -74,9 +76,6 @@ public class PropagationConstant {
|
||||||
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID = "result:community:project";
|
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID = "result:community:project";
|
||||||
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME = " Propagation of result belonging to community through project";
|
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME = " Propagation of result belonging to community through project";
|
||||||
|
|
||||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
|
|
||||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";
|
|
||||||
|
|
||||||
public static final String ITERATION_ONE = "ExitAtFirstIteration";
|
public static final String ITERATION_ONE = "ExitAtFirstIteration";
|
||||||
public static final String ITERATION_TWO = "ExitAtSecondIteration";
|
public static final String ITERATION_TWO = "ExitAtSecondIteration";
|
||||||
public static final String ITERATION_THREE = "ExitAtThirdIteration";
|
public static final String ITERATION_THREE = "ExitAtThirdIteration";
|
||||||
|
|
|
@ -1,43 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
|
||||||
|
|
||||||
public class AutoritativeAuthor {
|
|
||||||
|
|
||||||
private String name;
|
|
||||||
private String surname;
|
|
||||||
private String fullname;
|
|
||||||
private String orcid;
|
|
||||||
|
|
||||||
public String getName() {
|
|
||||||
return name;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setName(String name) {
|
|
||||||
this.name = name;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getSurname() {
|
|
||||||
return surname;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setSurname(String surname) {
|
|
||||||
this.surname = surname;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getFullname() {
|
|
||||||
return fullname;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setFullname(String fullname) {
|
|
||||||
this.fullname = fullname;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getOrcid() {
|
|
||||||
return orcid;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setOrcid(String orcid) {
|
|
||||||
this.orcid = orcid;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.utils.OrcidAuthor;
|
||||||
|
|
||||||
|
public class OrcidAuthors implements Serializable {
|
||||||
|
List<OrcidAuthor> orcidAuthorList;
|
||||||
|
|
||||||
|
public List<OrcidAuthor> getOrcidAuthorList() {
|
||||||
|
return orcidAuthorList;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOrcidAuthorList(List<OrcidAuthor> orcidAuthorList) {
|
||||||
|
this.orcidAuthorList = orcidAuthorList;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,124 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.sql.Dataset;
|
|
||||||
import org.apache.spark.sql.Encoders;
|
|
||||||
import org.apache.spark.sql.SaveMode;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.google.gson.Gson;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
|
||||||
|
|
||||||
public class PrepareResultOrcidAssociationStep1 {
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class);
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
String jsonConf = IOUtils
|
|
||||||
.toString(
|
|
||||||
PrepareResultOrcidAssociationStep1.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json"));
|
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConf);
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
|
||||||
|
|
||||||
String inputPath = parser.get("sourcePath");
|
|
||||||
log.info("inputPath: {}", inputPath);
|
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
|
||||||
log.info("outputPath: {}", outputPath);
|
|
||||||
|
|
||||||
final String resultClassName = parser.get("resultTableName");
|
|
||||||
log.info("resultTableName: {}", resultClassName);
|
|
||||||
|
|
||||||
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
|
||||||
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
|
|
||||||
|
|
||||||
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
|
|
||||||
log.info("resultType: {}", resultType);
|
|
||||||
|
|
||||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
|
||||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
|
||||||
|
|
||||||
String inputRelationPath = inputPath + "/relation";
|
|
||||||
log.info("inputRelationPath: {}", inputRelationPath);
|
|
||||||
|
|
||||||
String inputResultPath = inputPath + "/" + resultType;
|
|
||||||
log.info("inputResultPath: {}", inputResultPath);
|
|
||||||
|
|
||||||
String outputResultPath = outputPath + "/" + resultType;
|
|
||||||
log.info("outputResultPath: {}", outputResultPath);
|
|
||||||
|
|
||||||
runWithSparkHiveSession(
|
|
||||||
conf,
|
|
||||||
isSparkSessionManaged,
|
|
||||||
spark -> {
|
|
||||||
removeOutputDir(spark, outputPath);
|
|
||||||
prepareInfo(
|
|
||||||
spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private static <R extends Result> void prepareInfo(
|
|
||||||
SparkSession spark,
|
|
||||||
String inputRelationPath,
|
|
||||||
String inputResultPath,
|
|
||||||
String outputResultPath,
|
|
||||||
Class<R> resultClazz,
|
|
||||||
List<String> allowedsemrel) {
|
|
||||||
|
|
||||||
Dataset<Relation> relation = readPath(spark, inputRelationPath, Relation.class);
|
|
||||||
relation.createOrReplaceTempView("relation");
|
|
||||||
|
|
||||||
log.info("Reading Graph table from: {}", inputResultPath);
|
|
||||||
Dataset<R> result = readPath(spark, inputResultPath, resultClazz);
|
|
||||||
result.createOrReplaceTempView("result");
|
|
||||||
|
|
||||||
String query = "SELECT target resultId, author authorList"
|
|
||||||
+ " FROM (SELECT id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author "
|
|
||||||
+ " FROM ( "
|
|
||||||
+ " SELECT DISTINCT id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid "
|
|
||||||
+ " FROM result "
|
|
||||||
+ " LATERAL VIEW EXPLODE (author) a AS MyT "
|
|
||||||
+ " LATERAL VIEW EXPLODE (MyT.pid) p AS MyP "
|
|
||||||
+ " WHERE lower(MyP.qualifier.classid) = '" + ModelConstants.ORCID + "' or "
|
|
||||||
+ " lower(MyP.qualifier.classid) = '" + ModelConstants.ORCID_PENDING + "') tmp "
|
|
||||||
+ " GROUP BY id) r_t "
|
|
||||||
+ " JOIN ("
|
|
||||||
+ " SELECT source, target "
|
|
||||||
+ " FROM relation "
|
|
||||||
+ " WHERE datainfo.deletedbyinference = false "
|
|
||||||
+ getConstraintList(" lower(relclass) = '", allowedsemrel)
|
|
||||||
+ " ) rel_rel "
|
|
||||||
+ " ON source = id";
|
|
||||||
|
|
||||||
log.info("executedQuery: {}", query);
|
|
||||||
spark
|
|
||||||
.sql(query)
|
|
||||||
.as(Encoders.bean(ResultOrcidList.class))
|
|
||||||
.write()
|
|
||||||
.option("compression", "gzip")
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.json(outputResultPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,95 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
|
||||||
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.sql.*;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
public class PrepareResultOrcidAssociationStep2 {
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep2.class);
|
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
String jsonConfiguration = IOUtils
|
|
||||||
.toString(
|
|
||||||
PrepareResultOrcidAssociationStep2.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json"));
|
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
|
||||||
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
|
||||||
|
|
||||||
String inputPath = parser.get("sourcePath");
|
|
||||||
log.info("inputPath: {}", inputPath);
|
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
|
||||||
log.info("outputPath: {}", outputPath);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
|
||||||
|
|
||||||
runWithSparkSession(
|
|
||||||
conf,
|
|
||||||
isSparkSessionManaged,
|
|
||||||
spark -> {
|
|
||||||
removeOutputDir(spark, outputPath);
|
|
||||||
mergeInfo(spark, inputPath, outputPath);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) {
|
|
||||||
|
|
||||||
Dataset<ResultOrcidList> resultOrcidAssoc = readPath(spark, inputPath + "/publication", ResultOrcidList.class)
|
|
||||||
.union(readPath(spark, inputPath + "/dataset", ResultOrcidList.class))
|
|
||||||
.union(readPath(spark, inputPath + "/otherresearchproduct", ResultOrcidList.class))
|
|
||||||
.union(readPath(spark, inputPath + "/software", ResultOrcidList.class));
|
|
||||||
|
|
||||||
resultOrcidAssoc
|
|
||||||
.toJavaRDD()
|
|
||||||
.mapToPair(r -> new Tuple2<>(r.getResultId(), r))
|
|
||||||
.reduceByKey(
|
|
||||||
(a, b) -> {
|
|
||||||
if (a == null) {
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
if (b == null) {
|
|
||||||
return a;
|
|
||||||
}
|
|
||||||
Set<String> orcid_set = new HashSet<>();
|
|
||||||
a.getAuthorList().stream().forEach(aa -> orcid_set.add(aa.getOrcid()));
|
|
||||||
b
|
|
||||||
.getAuthorList()
|
|
||||||
.stream()
|
|
||||||
.forEach(
|
|
||||||
aa -> {
|
|
||||||
if (!orcid_set.contains(aa.getOrcid())) {
|
|
||||||
a.getAuthorList().add(aa);
|
|
||||||
orcid_set.add(aa.getOrcid());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return a;
|
|
||||||
})
|
|
||||||
.map(Tuple2::_2)
|
|
||||||
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
|
|
||||||
.saveAsTextFile(outputPath, GzipCodec.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,27 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public class ResultOrcidList implements Serializable {
|
|
||||||
String resultId;
|
|
||||||
List<AutoritativeAuthor> authorList = new ArrayList<>();
|
|
||||||
|
|
||||||
public String getResultId() {
|
|
||||||
return resultId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setResultId(String resultId) {
|
|
||||||
this.resultId = resultId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<AutoritativeAuthor> getAuthorList() {
|
|
||||||
return authorList;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setAuthorList(List<AutoritativeAuthor> authorList) {
|
|
||||||
this.authorList = authorList;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,211 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
|
||||||
import org.apache.spark.sql.Dataset;
|
|
||||||
import org.apache.spark.sql.Encoders;
|
|
||||||
import org.apache.spark.sql.SaveMode;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.common.PacePerson;
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
public class SparkOrcidToResultFromSemRelJob {
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob.class);
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
String jsonConfiguration = IOUtils
|
|
||||||
.toString(
|
|
||||||
SparkOrcidToResultFromSemRelJob.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json"));
|
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
|
||||||
|
|
||||||
final String inputPath = parser.get("sourcePath");
|
|
||||||
log.info("inputPath: {}", inputPath);
|
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
|
||||||
log.info("outputPath: {}", outputPath);
|
|
||||||
|
|
||||||
final String possibleUpdates = parser.get("possibleUpdatesPath");
|
|
||||||
log.info("possibleUpdatesPath: {}", possibleUpdates);
|
|
||||||
|
|
||||||
final String resultClassName = parser.get("resultTableName");
|
|
||||||
log.info("resultTableName: {}", resultClassName);
|
|
||||||
|
|
||||||
final Boolean saveGraph = Optional
|
|
||||||
.ofNullable(parser.get("saveGraph"))
|
|
||||||
.map(Boolean::valueOf)
|
|
||||||
.orElse(Boolean.TRUE);
|
|
||||||
log.info("saveGraph: {}", saveGraph);
|
|
||||||
|
|
||||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
|
||||||
|
|
||||||
runWithSparkSession(
|
|
||||||
conf,
|
|
||||||
isSparkSessionManaged,
|
|
||||||
spark -> {
|
|
||||||
removeOutputDir(spark, outputPath);
|
|
||||||
if (saveGraph) {
|
|
||||||
execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private static <R extends Result> void execPropagation(
|
|
||||||
SparkSession spark,
|
|
||||||
String possibleUpdatesPath,
|
|
||||||
String inputPath,
|
|
||||||
String outputPath,
|
|
||||||
Class<R> resultClazz) {
|
|
||||||
|
|
||||||
// read possible updates (resultId and list of possible orcid to add
|
|
||||||
Dataset<ResultOrcidList> possible_updates = readPath(spark, possibleUpdatesPath, ResultOrcidList.class);
|
|
||||||
// read the result we have been considering
|
|
||||||
Dataset<R> result = readPath(spark, inputPath, resultClazz);
|
|
||||||
// make join result left_outer with possible updates
|
|
||||||
|
|
||||||
result
|
|
||||||
.joinWith(
|
|
||||||
possible_updates,
|
|
||||||
result.col("id").equalTo(possible_updates.col("resultId")),
|
|
||||||
"left_outer")
|
|
||||||
.map(authorEnrichFn(), Encoders.bean(resultClazz))
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.option("compression", "gzip")
|
|
||||||
.json(outputPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static <R extends Result> MapFunction<Tuple2<R, ResultOrcidList>, R> authorEnrichFn() {
|
|
||||||
return value -> {
|
|
||||||
R ret = value._1();
|
|
||||||
Optional<ResultOrcidList> rol = Optional.ofNullable(value._2());
|
|
||||||
if (rol.isPresent() && Optional.ofNullable(ret.getAuthor()).isPresent()) {
|
|
||||||
List<Author> toenrich_author = ret.getAuthor();
|
|
||||||
List<AutoritativeAuthor> autoritativeAuthors = rol.get().getAuthorList();
|
|
||||||
for (Author author : toenrich_author) {
|
|
||||||
if (!containsAllowedPid(author)) {
|
|
||||||
enrichAuthor(author, autoritativeAuthors);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void enrichAuthor(Author a, List<AutoritativeAuthor> au) {
|
|
||||||
PacePerson pp = new PacePerson(a.getFullname(), false);
|
|
||||||
for (AutoritativeAuthor aa : au) {
|
|
||||||
if (enrichAuthor(aa, a, pp.getNormalisedFirstName(), pp.getNormalisedSurname())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, Author author,
|
|
||||||
String author_name,
|
|
||||||
String author_surname) {
|
|
||||||
boolean toaddpid = false;
|
|
||||||
|
|
||||||
if (StringUtils.isNotEmpty(autoritative_author.getSurname())) {
|
|
||||||
if (StringUtils.isNotEmpty(author.getSurname())) {
|
|
||||||
author_surname = author.getSurname();
|
|
||||||
}
|
|
||||||
if (StringUtils.isNotEmpty(author_surname)) {
|
|
||||||
// have the same surname. Check the name
|
|
||||||
if (autoritative_author
|
|
||||||
.getSurname()
|
|
||||||
.trim()
|
|
||||||
.equalsIgnoreCase(author_surname.trim()) && StringUtils.isNotEmpty(autoritative_author.getName())) {
|
|
||||||
if (StringUtils.isNotEmpty(author.getName())) {
|
|
||||||
author_name = author.getName();
|
|
||||||
}
|
|
||||||
if (StringUtils.isNotEmpty(author_name)) {
|
|
||||||
if (autoritative_author
|
|
||||||
.getName()
|
|
||||||
.trim()
|
|
||||||
.equalsIgnoreCase(author_name.trim())) {
|
|
||||||
toaddpid = true;
|
|
||||||
}
|
|
||||||
// they could be differently written (i.e. only the initials of the name
|
|
||||||
// in one of the two
|
|
||||||
else {
|
|
||||||
if (autoritative_author
|
|
||||||
.getName()
|
|
||||||
.trim()
|
|
||||||
.substring(0, 0)
|
|
||||||
.equalsIgnoreCase(author_name.trim().substring(0, 0))) {
|
|
||||||
toaddpid = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (toaddpid) {
|
|
||||||
StructuredProperty p = new StructuredProperty();
|
|
||||||
p.setValue(autoritative_author.getOrcid());
|
|
||||||
p
|
|
||||||
.setQualifier(
|
|
||||||
getQualifier(
|
|
||||||
ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
|
|
||||||
p
|
|
||||||
.setDataInfo(
|
|
||||||
getDataInfo(
|
|
||||||
PROPAGATION_DATA_INFO_TYPE,
|
|
||||||
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID,
|
|
||||||
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME,
|
|
||||||
ModelConstants.DNET_PROVENANCE_ACTIONS));
|
|
||||||
|
|
||||||
Optional<List<StructuredProperty>> authorPid = Optional.ofNullable(author.getPid());
|
|
||||||
if (authorPid.isPresent()) {
|
|
||||||
authorPid.get().add(p);
|
|
||||||
} else {
|
|
||||||
author.setPid(Lists.newArrayList(p));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
return toaddpid;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean containsAllowedPid(Author a) {
|
|
||||||
Optional<List<StructuredProperty>> pids = Optional.ofNullable(a.getPid());
|
|
||||||
if (!pids.isPresent()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
for (StructuredProperty pid : pids.get()) {
|
|
||||||
if (ModelConstants.ORCID_PENDING.equalsIgnoreCase(pid.getQualifier().getClassid()) ||
|
|
||||||
ModelConstants.ORCID.equalsIgnoreCase(pid.getQualifier().getClassid())) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,158 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
import eu.dnetlib.dhp.utils.OrcidAuthor;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SparkPropagateOrcidAuthor.class);
|
||||||
|
|
||||||
|
public SparkPropagateOrcidAuthor(String propertyPath, String[] args, Logger log) {
|
||||||
|
super(propertyPath, args, log);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
// Create instance and run the Spark application
|
||||||
|
SparkPropagateOrcidAuthor app = new SparkPropagateOrcidAuthor(
|
||||||
|
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json", args, log);
|
||||||
|
app.initialize().run();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static OrcidAuthors getOrcidAuthorsList(List<Author> authors) {
|
||||||
|
OrcidAuthors oas = new OrcidAuthors();
|
||||||
|
List<OrcidAuthor> tmp = authors
|
||||||
|
.stream()
|
||||||
|
.map(SparkPropagateOrcidAuthor::getOrcidAuthor)
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
oas.setOrcidAuthorList(tmp);
|
||||||
|
return oas;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static OrcidAuthor getOrcidAuthor(Author a) {
|
||||||
|
return Optional
|
||||||
|
.ofNullable(getOrcid(a))
|
||||||
|
.map(orcid -> {
|
||||||
|
OrcidAuthor orcidAuthor = new OrcidAuthor(orcid, a.getSurname(), a.getName(), a.getFullname(), null);
|
||||||
|
return orcidAuthor;
|
||||||
|
})
|
||||||
|
.orElse(null);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getOrcid(Author a) {
|
||||||
|
String orcid = null;
|
||||||
|
if (a.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID)))
|
||||||
|
orcid = a
|
||||||
|
.getPid()
|
||||||
|
.stream()
|
||||||
|
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID))
|
||||||
|
.findFirst()
|
||||||
|
.get()
|
||||||
|
.getValue();
|
||||||
|
else if (a
|
||||||
|
.getPid()
|
||||||
|
.stream()
|
||||||
|
.anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING)))
|
||||||
|
orcid = a
|
||||||
|
.getPid()
|
||||||
|
.stream()
|
||||||
|
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING))
|
||||||
|
.findFirst()
|
||||||
|
.get()
|
||||||
|
.getValue();
|
||||||
|
return orcid;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void createTemporaryData(SparkSession spark, String graphPath, String orcidPath, String targetPath) {
|
||||||
|
Dataset<Row> supplements = spark
|
||||||
|
.read()
|
||||||
|
.schema(Encoders.bean(Relation.class).schema())
|
||||||
|
.json(graphPath + "/" + "relation")
|
||||||
|
.where(
|
||||||
|
"relclass IN('" + ModelConstants.IS_SUPPLEMENT_TO + "', '" +
|
||||||
|
ModelConstants.IS_SUPPLEMENTED_BY + "')")
|
||||||
|
.selectExpr("source as id", "target");
|
||||||
|
|
||||||
|
Dataset<Row> result = spark
|
||||||
|
.read()
|
||||||
|
.schema(Encoders.bean(Result.class).schema())
|
||||||
|
.json(
|
||||||
|
graphPath + "/dataset", graphPath + "/publication", graphPath + "/software",
|
||||||
|
graphPath + "/otherresearchproduct")
|
||||||
|
.as(Encoders.bean(Result.class))
|
||||||
|
.selectExpr("id", "author as graph_authors");
|
||||||
|
|
||||||
|
ModelSupport.entityTypes
|
||||||
|
.keySet()
|
||||||
|
.stream()
|
||||||
|
.filter(ModelSupport::isResult)
|
||||||
|
.forEach(e -> {
|
||||||
|
Dataset<Row> orcidDnet = spark
|
||||||
|
.read()
|
||||||
|
.schema(Encoders.bean(Result.class).schema())
|
||||||
|
.json(graphPath + "/" + e.name())
|
||||||
|
.as(Encoders.bean(Result.class))
|
||||||
|
.filter(
|
||||||
|
(FilterFunction<Result>) SparkPropagateOrcidAuthor::hasOrcidAuthor)
|
||||||
|
.map(
|
||||||
|
(MapFunction<Result, Tuple2<String, OrcidAuthors>>) r -> new Tuple2<>(r.getId(),
|
||||||
|
getOrcidAuthorsList(r.getAuthor())),
|
||||||
|
Encoders.tuple(Encoders.STRING(), Encoders.bean(OrcidAuthors.class)))
|
||||||
|
.selectExpr("_1 as target", "_2.orcidAuthorList as orcid_authors");
|
||||||
|
|
||||||
|
result
|
||||||
|
.join(supplements, "id")
|
||||||
|
.join(orcidDnet, "target")
|
||||||
|
.drop("target")
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.parquet(targetPath + "/" + e.name() + "_unmatched");
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean hasOrcidAuthor(Result r) {
|
||||||
|
|
||||||
|
return r.getAuthor() != null &&
|
||||||
|
isaBoolean(r);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isaBoolean(Result r) {
|
||||||
|
boolean tmp = r
|
||||||
|
.getAuthor()
|
||||||
|
.stream()
|
||||||
|
.anyMatch(
|
||||||
|
a -> a.getPid() != null && a
|
||||||
|
.getPid()
|
||||||
|
.stream()
|
||||||
|
.anyMatch(
|
||||||
|
p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID) ||
|
||||||
|
p
|
||||||
|
.getQualifier()
|
||||||
|
.getClassid()
|
||||||
|
.equalsIgnoreCase(ModelConstants.ORCID_PENDING)));
|
||||||
|
return tmp;
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.projecttoresult;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.resulttocommunityfromorganization;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
|
@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME;
|
import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME;
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||||
|
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
|
@ -1,44 +1,32 @@
|
||||||
[
|
[
|
||||||
{
|
{
|
||||||
"paramName":"s",
|
"paramName":"s",
|
||||||
"paramLongName":"sourcePath",
|
"paramLongName":"graphPath",
|
||||||
"paramDescription": "the path of the sequential file to read",
|
"paramDescription": "the path of the sequential file to read",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"paramName":"sg",
|
|
||||||
"paramLongName":"saveGraph",
|
|
||||||
"paramDescription": "true if the new version of the graph must be saved",
|
|
||||||
"paramRequired": false
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"paramName": "out",
|
"paramName": "out",
|
||||||
"paramLongName": "outputPath",
|
"paramLongName": "targetPath",
|
||||||
|
"paramDescription": "the path used to store temporary output files",
|
||||||
|
"paramRequired": true
|
||||||
|
}, {
|
||||||
|
"paramName": "o",
|
||||||
|
"paramLongName": "orcidPath",
|
||||||
|
"paramDescription": "the path used to store temporary output files",
|
||||||
|
"paramRequired": true
|
||||||
|
}, {
|
||||||
|
"paramName": "w",
|
||||||
|
"paramLongName": "workingDir",
|
||||||
"paramDescription": "the path used to store temporary output files",
|
"paramDescription": "the path used to store temporary output files",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "ssm",
|
"paramName": "m",
|
||||||
"paramLongName": "isSparkSessionManaged",
|
"paramLongName": "matchingSource",
|
||||||
"paramDescription": "true if the spark session is managed, false otherwise",
|
"paramDescription": "the path used to store temporary output files",
|
||||||
"paramRequired": false
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName":"tn",
|
|
||||||
"paramLongName":"resultTableName",
|
|
||||||
"paramDescription": "the name of the result table we are currently working on",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName":"pu",
|
|
||||||
"paramLongName":"possibleUpdatesPath",
|
|
||||||
"paramDescription": "the path the the association resultId orcid author list can be found",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName":"test",
|
|
||||||
"paramLongName":"isTest",
|
|
||||||
"paramDescription": "true if it is executing a test",
|
|
||||||
"paramRequired": false
|
"paramRequired": false
|
||||||
}
|
}
|
||||||
|
|
||||||
]
|
]
|
|
@ -92,21 +92,14 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<join name="copy_wait" to="fork_prepare_assoc_step1"/>
|
<join name="copy_wait" to="exec_propagation"/>
|
||||||
|
|
||||||
<fork name="fork_prepare_assoc_step1">
|
<action name="exec_propagation">
|
||||||
<path start="join_prepare_publication"/>
|
|
||||||
<path start="join_prepare_dataset"/>
|
|
||||||
<path start="join_prepare_otherresearchproduct"/>
|
|
||||||
<path start="join_prepare_software"/>
|
|
||||||
</fork>
|
|
||||||
|
|
||||||
<action name="join_prepare_publication">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>ORCIDPropagation-PreparePhase1-Publications</name>
|
<name>ORCIDPropagation-PreparePhase1-Publications</name>
|
||||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
|
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor</class>
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
@ -119,239 +112,17 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=8000
|
--conf spark.sql.shuffle.partitions=8000
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
<arg>--graphPath</arg><arg>${sourcePath}</arg>
|
||||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
<arg>--orcidPath</arg><arg>${sourcePath}</arg>
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
<arg>--workingDir</arg><arg>${workingDir}</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
<arg>--targetPath</arg><arg>${outputPath}</arg>
|
||||||
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
|
<arg>--matchingSource</arg><arg>graph</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="wait"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="join_prepare_dataset">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>ORCIDPropagation-PreparePhase1-Dataset</name>
|
|
||||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
|
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
|
||||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
|
||||||
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="join_prepare_otherresearchproduct">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>ORCIDPropagation-PreparePhase1-ORP</name>
|
|
||||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
|
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
|
||||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
|
||||||
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="join_prepare_software">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>ORCIDPropagation-PreparePhase1-Software</name>
|
|
||||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
|
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
|
||||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
|
||||||
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<join name="wait" to="prepare_assoc_step2"/>
|
|
||||||
|
|
||||||
<action name="prepare_assoc_step2">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>ORCIDPropagation-PreparePhase2</name>
|
|
||||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep2</class>
|
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--sourcePath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="fork-join-exec-propagation"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<fork name="fork-join-exec-propagation">
|
|
||||||
<path start="join_propagate_publication"/>
|
|
||||||
<path start="join_propagate_dataset"/>
|
|
||||||
<path start="join_propagate_otherresearchproduct"/>
|
|
||||||
<path start="join_propagate_software"/>
|
|
||||||
</fork>
|
|
||||||
|
|
||||||
<action name="join_propagate_publication">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>ORCIDPropagation-Publication</name>
|
|
||||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
|
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=15000
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait2"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="join_propagate_dataset">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>ORCIDPropagation-Dataset</name>
|
|
||||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
|
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=8000
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait2"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="join_propagate_otherresearchproduct">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>ORCIDPropagation-ORP</name>
|
|
||||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
|
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=8000
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait2"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="join_propagate_software">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>ORCIDPropagation-Software</name>
|
|
||||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
|
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=4000
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait2"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<join name="wait2" to="End"/>
|
|
||||||
|
|
||||||
<end name="End"/>
|
<end name="End"/>
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Country;
|
import eu.dnetlib.dhp.schema.oaf.Country;
|
||||||
|
@ -33,7 +34,8 @@ public class CountryPropagationJobTest {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(CountryPropagationJobTest.class);
|
private static final Logger log = LoggerFactory.getLogger(CountryPropagationJobTest.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
|
|
@ -15,11 +15,13 @@ import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
public class DatasourceCountryPreparationTest {
|
public class DatasourceCountryPreparationTest {
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.KeyValueSet;
|
import eu.dnetlib.dhp.KeyValueSet;
|
||||||
import eu.dnetlib.dhp.PropagationConstant;
|
import eu.dnetlib.dhp.PropagationConstant;
|
||||||
|
import eu.dnetlib.dhp.common.enrichment.Constants;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
@ -145,7 +146,7 @@ public class SparkJobTest {
|
||||||
.foreach(
|
.foreach(
|
||||||
r -> Assertions
|
r -> Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
PropagationConstant.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
|
Constants.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
|
||||||
result
|
result
|
||||||
.foreach(
|
.foreach(
|
||||||
r -> Assertions
|
r -> Assertions
|
||||||
|
@ -428,7 +429,7 @@ public class SparkJobTest {
|
||||||
.foreach(
|
.foreach(
|
||||||
r -> Assertions
|
r -> Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
PropagationConstant.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
|
Constants.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
|
||||||
project
|
project
|
||||||
.foreach(
|
.foreach(
|
||||||
r -> Assertions
|
r -> Assertions
|
||||||
|
|
|
@ -4,11 +4,16 @@ package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.logging.Filter;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
|
import org.apache.spark.api.java.function.ForeachFunction;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
@ -21,8 +26,11 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.common.enrichment.Constants;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
|
|
||||||
public class OrcidPropagationJobTest {
|
public class OrcidPropagationJobTest {
|
||||||
|
|
||||||
|
@ -71,23 +79,24 @@ public class OrcidPropagationJobTest {
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
|
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
|
||||||
.getPath();
|
.getPath();
|
||||||
SparkOrcidToResultFromSemRelJob
|
SparkPropagateOrcidAuthor
|
||||||
.main(
|
.main(
|
||||||
new String[] {
|
new String[] {
|
||||||
"-isTest", Boolean.TRUE.toString(),
|
"-graphPath",
|
||||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
getClass()
|
||||||
"-sourcePath", sourcePath,
|
.getResource(
|
||||||
"-hive_metastore_uris", "",
|
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/noupdate")
|
||||||
"-saveGraph", "true",
|
.getPath(),
|
||||||
"-resultTableName", Dataset.class.getCanonicalName(),
|
"-targetPath",
|
||||||
"-outputPath", workingDir.toString() + "/dataset",
|
workingDir.toString() + "/graph",
|
||||||
"-possibleUpdatesPath", possibleUpdatesPath
|
"-orcidPath", "",
|
||||||
|
"-workingDir", workingDir.toString()
|
||||||
});
|
});
|
||||||
|
|
||||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
JavaRDD<Dataset> tmp = sc
|
JavaRDD<Dataset> tmp = sc
|
||||||
.textFile(workingDir.toString() + "/dataset")
|
.textFile(workingDir.toString() + "/graph/dataset")
|
||||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||||
|
|
||||||
// tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
|
// tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
|
||||||
|
@ -110,40 +119,27 @@ public class OrcidPropagationJobTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void oneUpdateTest() throws Exception {
|
void oneUpdateTest() throws Exception {
|
||||||
SparkOrcidToResultFromSemRelJob
|
SparkPropagateOrcidAuthor
|
||||||
.main(
|
.main(
|
||||||
new String[] {
|
new String[] {
|
||||||
"-isTest",
|
"-graphPath",
|
||||||
Boolean.TRUE.toString(),
|
|
||||||
"-isSparkSessionManaged",
|
|
||||||
Boolean.FALSE.toString(),
|
|
||||||
"-sourcePath",
|
|
||||||
getClass()
|
|
||||||
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/oneupdate")
|
|
||||||
.getPath(),
|
|
||||||
"-hive_metastore_uris",
|
|
||||||
"",
|
|
||||||
"-saveGraph",
|
|
||||||
"true",
|
|
||||||
"-resultTableName",
|
|
||||||
"eu.dnetlib.dhp.schema.oaf.Dataset",
|
|
||||||
"-outputPath",
|
|
||||||
workingDir.toString() + "/dataset",
|
|
||||||
"-possibleUpdatesPath",
|
|
||||||
getClass()
|
getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
|
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/oneupdate")
|
||||||
.getPath()
|
.getPath(),
|
||||||
|
"-targetPath",
|
||||||
|
workingDir.toString() + "/graph",
|
||||||
|
"-orcidPath", "",
|
||||||
|
"-workingDir", workingDir.toString(),
|
||||||
|
"-matchingSource", "propagation"
|
||||||
});
|
});
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
JavaRDD<Dataset> tmp = sc
|
JavaRDD<Dataset> tmp = sc
|
||||||
.textFile(workingDir.toString() + "/dataset")
|
.textFile(workingDir.toString() + "/graph/dataset")
|
||||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||||
|
|
||||||
// tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
|
|
||||||
|
|
||||||
Assertions.assertEquals(10, tmp.count());
|
Assertions.assertEquals(10, tmp.count());
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
|
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
|
||||||
|
@ -158,6 +154,7 @@ public class OrcidPropagationJobTest {
|
||||||
+ "where MyP.datainfo.inferenceprovenance = 'propagation'";
|
+ "where MyP.datainfo.inferenceprovenance = 'propagation'";
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Row> propagatedAuthors = spark.sql(query);
|
org.apache.spark.sql.Dataset<Row> propagatedAuthors = spark.sql(query);
|
||||||
|
propagatedAuthors.show(false);
|
||||||
|
|
||||||
Assertions.assertEquals(1, propagatedAuthors.count());
|
Assertions.assertEquals(1, propagatedAuthors.count());
|
||||||
|
|
||||||
|
@ -167,47 +164,35 @@ public class OrcidPropagationJobTest {
|
||||||
propagatedAuthors
|
propagatedAuthors
|
||||||
.filter(
|
.filter(
|
||||||
"id = '50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e' "
|
"id = '50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e' "
|
||||||
+ "and name = 'Vajinder' and surname = 'Kumar' and pidType = '" +
|
+ "and name = 'Nicole' and surname = 'Jung' and pidType = '" +
|
||||||
|
|
||||||
ModelConstants.ORCID_PENDING + "'")
|
ModelConstants.ORCID_PENDING + "'")
|
||||||
.count());
|
.count());
|
||||||
|
|
||||||
Assertions.assertEquals(1, propagatedAuthors.filter("pid = '0000-0002-8825-3517'").count());
|
Assertions.assertEquals(1, propagatedAuthors.filter("pid = '0000-0001-9513-2468'").count());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void twoUpdatesTest() throws Exception {
|
void twoUpdatesTest() throws Exception {
|
||||||
SparkOrcidToResultFromSemRelJob
|
SparkPropagateOrcidAuthor
|
||||||
.main(
|
.main(
|
||||||
new String[] {
|
new String[] {
|
||||||
"-isTest",
|
"-graphPath",
|
||||||
Boolean.TRUE.toString(),
|
|
||||||
"-isSparkSessionManaged",
|
|
||||||
Boolean.FALSE.toString(),
|
|
||||||
"-sourcePath",
|
|
||||||
getClass()
|
getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/twoupdates")
|
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/twoupdates")
|
||||||
.getPath(),
|
.getPath(),
|
||||||
"-hive_metastore_uris",
|
"-targetPath",
|
||||||
"",
|
workingDir.toString() + "/graph",
|
||||||
"-saveGraph",
|
"-orcidPath", "",
|
||||||
"true",
|
"-workingDir", workingDir.toString(),
|
||||||
"-resultTableName",
|
"-matchingSource", "propagation"
|
||||||
"eu.dnetlib.dhp.schema.oaf.Dataset",
|
|
||||||
"-outputPath",
|
|
||||||
workingDir.toString() + "/dataset",
|
|
||||||
"-possibleUpdatesPath",
|
|
||||||
getClass()
|
|
||||||
.getResource(
|
|
||||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
|
|
||||||
.getPath()
|
|
||||||
});
|
});
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
JavaRDD<Dataset> tmp = sc
|
JavaRDD<Dataset> tmp = sc
|
||||||
.textFile(workingDir.toString() + "/dataset")
|
.textFile(workingDir.toString() + "/graph/dataset")
|
||||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||||
|
|
||||||
Assertions.assertEquals(10, tmp.count());
|
Assertions.assertEquals(10, tmp.count());
|
||||||
|
@ -229,10 +214,10 @@ public class OrcidPropagationJobTest {
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
1, propagatedAuthors.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
|
1, propagatedAuthors.filter("name = 'Nicole' and surname = 'Jung'").count());
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
1, propagatedAuthors.filter("name = 'Ruediger' and surname = 'Beckhaus'").count());
|
1, propagatedAuthors.filter("name = 'Camilo José' and surname = 'Cela'").count());
|
||||||
|
|
||||||
query = "select id, MyT.name name, MyT.surname surname, MyP.value pid ,MyP.qualifier.classid pidType "
|
query = "select id, MyT.name name, MyT.surname surname, MyP.value pid ,MyP.qualifier.classid pidType "
|
||||||
+ "from dataset "
|
+ "from dataset "
|
||||||
|
@ -243,13 +228,27 @@ public class OrcidPropagationJobTest {
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
2, authorsExplodedPids.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
|
3, authorsExplodedPids.filter("name = 'Camilo José' and surname = 'Cela'").count());
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
1,
|
1,
|
||||||
authorsExplodedPids
|
authorsExplodedPids
|
||||||
.filter(
|
.filter(
|
||||||
"name = 'Marc' and surname = 'Schmidtmann' and pidType = 'MAG Identifier'")
|
"name = 'Camilo José' and surname = 'Cela' and pidType = 'MAG Identifier'")
|
||||||
|
.count());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1,
|
||||||
|
authorsExplodedPids
|
||||||
|
.filter(
|
||||||
|
"name = 'Camilo José' and surname = 'Cela' and pidType = 'orcid'")
|
||||||
|
.count());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1,
|
||||||
|
authorsExplodedPids
|
||||||
|
.filter(
|
||||||
|
"name = 'Camilo José' and surname = 'Cela' and pidType = 'orcid_pending'")
|
||||||
.count());
|
.count());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest;
|
import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest;
|
||||||
|
@ -30,7 +31,8 @@ public class ResultToCommunityJobTest {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
|
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||||
|
@ -34,7 +35,8 @@ public class ResultToCommunityJobTest {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
|
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
|
Binary file not shown.
File diff suppressed because one or more lines are too long
Binary file not shown.
|
@ -0,0 +1,2 @@
|
||||||
|
{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
|
||||||
|
{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
|
File diff suppressed because one or more lines are too long
Binary file not shown.
|
@ -0,0 +1,9 @@
|
||||||
|
{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
|
||||||
|
{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
|
||||||
|
{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|od______3989::0f89464c4ac4c398fe0c71433b175a62", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "collectedfrom": [], "validated": false, "properties": []}
|
||||||
|
{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|od______3989::0f89464c4ac4c398fe0c71433b175a62", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "collectedfrom": [], "validated": false, "properties": []}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,129 +0,0 @@
|
||||||
package eu.dnetlib.dhp.enrich.orcid
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
|
|
||||||
import eu.dnetlib.dhp.schema.sx.OafUtils
|
|
||||||
import eu.dnetlib.pace.util.AuthorMatchers
|
|
||||||
|
|
||||||
import java.util
|
|
||||||
import scala.beans.BeanProperty
|
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
import scala.util.control.Breaks.{break, breakable}
|
|
||||||
|
|
||||||
case class ORCIDAuthorEnricherResult(
|
|
||||||
@BeanProperty var id: String,
|
|
||||||
@BeanProperty var enriched_author: java.util.List[Author],
|
|
||||||
@BeanProperty var author_matched: java.util.List[MatchedAuthors],
|
|
||||||
@BeanProperty var author_unmatched: java.util.List[Author],
|
|
||||||
@BeanProperty var orcid_unmatched: java.util.List[OrcidAutor]
|
|
||||||
)
|
|
||||||
|
|
||||||
object ORCIDAuthorEnricher extends Serializable {
|
|
||||||
|
|
||||||
def enrichOrcid(
|
|
||||||
id: String,
|
|
||||||
graph_authors: java.util.List[Author],
|
|
||||||
orcid_authors: java.util.List[OrcidAutor]
|
|
||||||
): ORCIDAuthorEnricherResult = {
|
|
||||||
// Author enriching strategy:
|
|
||||||
// 1) create a copy of graph author list in unmatched_authors
|
|
||||||
// 2) find best match in unmatched_authors, remove it from unmatched_authors and enrich it so
|
|
||||||
// that the enrichment is reflected in graph_authors (they share author instances)
|
|
||||||
// 3) repeat (2) till the end of the list and then with different matching algorithms that have decreasing
|
|
||||||
// trust in their output
|
|
||||||
// At the end unmatched_authors will contain authors not matched with any of the matching algos
|
|
||||||
val unmatched_authors = new util.ArrayList[Author](graph_authors)
|
|
||||||
|
|
||||||
val matches = {
|
|
||||||
// Look after exact fullname match, reconstruct ORCID fullname as givenName + familyName
|
|
||||||
extractAndEnrichMatches(
|
|
||||||
unmatched_authors,
|
|
||||||
orcid_authors,
|
|
||||||
(author, orcid) =>
|
|
||||||
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
|
||||||
"fullName"
|
|
||||||
) ++
|
|
||||||
// Look after exact reversed fullname match, reconstruct ORCID fullname as familyName + givenName
|
|
||||||
extractAndEnrichMatches(
|
|
||||||
unmatched_authors,
|
|
||||||
orcid_authors,
|
|
||||||
(author, orcid) =>
|
|
||||||
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
|
|
||||||
"reversedFullName"
|
|
||||||
) ++
|
|
||||||
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
|
|
||||||
extractAndEnrichMatches(
|
|
||||||
unmatched_authors,
|
|
||||||
orcid_authors,
|
|
||||||
(author, orcid) =>
|
|
||||||
AuthorMatchers
|
|
||||||
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
|
||||||
"orderedTokens"
|
|
||||||
) ++
|
|
||||||
// look after exact matches of ORCID creditName
|
|
||||||
extractAndEnrichMatches(
|
|
||||||
unmatched_authors,
|
|
||||||
orcid_authors,
|
|
||||||
(author, orcid) => AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
|
|
||||||
"creditName"
|
|
||||||
) ++
|
|
||||||
// look after exact matches in ORCID otherNames
|
|
||||||
extractAndEnrichMatches(
|
|
||||||
unmatched_authors,
|
|
||||||
orcid_authors,
|
|
||||||
(author, orcid) =>
|
|
||||||
orcid.otherNames != null && AuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
|
|
||||||
"otherNames"
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
ORCIDAuthorEnricherResult(id, graph_authors, matches.asJava, unmatched_authors, orcid_authors)
|
|
||||||
}
|
|
||||||
|
|
||||||
private def extractAndEnrichMatches(
|
|
||||||
graph_authors: java.util.List[Author],
|
|
||||||
orcid_authors: java.util.List[OrcidAutor],
|
|
||||||
matchingFunc: (Author, OrcidAutor) => Boolean,
|
|
||||||
matchName: String
|
|
||||||
) = {
|
|
||||||
val matched = scala.collection.mutable.ArrayBuffer.empty[MatchedAuthors]
|
|
||||||
|
|
||||||
if (graph_authors != null && !graph_authors.isEmpty) {
|
|
||||||
val ait = graph_authors.iterator
|
|
||||||
|
|
||||||
while (ait.hasNext) {
|
|
||||||
val author = ait.next()
|
|
||||||
val oit = orcid_authors.iterator
|
|
||||||
|
|
||||||
breakable {
|
|
||||||
while (oit.hasNext) {
|
|
||||||
val orcid = oit.next()
|
|
||||||
|
|
||||||
if (matchingFunc(author, orcid)) {
|
|
||||||
ait.remove()
|
|
||||||
oit.remove()
|
|
||||||
matched += MatchedAuthors(author, orcid, matchName)
|
|
||||||
|
|
||||||
if (author.getPid == null) {
|
|
||||||
author.setPid(new util.ArrayList[StructuredProperty]())
|
|
||||||
}
|
|
||||||
|
|
||||||
val orcidPID = OafUtils.createSP(orcid.orcid, ModelConstants.ORCID, ModelConstants.ORCID)
|
|
||||||
orcidPID.setDataInfo(OafUtils.generateDataInfo())
|
|
||||||
orcidPID.getDataInfo.setProvenanceaction(
|
|
||||||
OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT")
|
|
||||||
)
|
|
||||||
|
|
||||||
author.getPid.add(orcidPID)
|
|
||||||
|
|
||||||
break()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
matched
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,93 +1,23 @@
|
||||||
package eu.dnetlib.dhp.enrich.orcid
|
package eu.dnetlib.dhp.enrich.orcid
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport
|
import eu.dnetlib.dhp.schema.common.ModelSupport
|
||||||
import eu.dnetlib.dhp.schema.oaf._
|
import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.apache.spark.sql.functions._
|
import org.apache.spark.sql.functions._
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
import scala.beans.BeanProperty
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
/** Bean-style view of an ORCID author record.
  *
  * Every field is a mutable `@BeanProperty`, so getters and setters are generated alongside the
  * case-class accessors.
  *
  * @param orcid      the ORCID identifier
  * @param familyName family name as registered in ORCID
  * @param givenName  given name as registered in ORCID
  * @param creditName credit name as registered in ORCID
  * @param otherNames alternative names; may be null
  */
case class OrcidAutor(
  @BeanProperty var orcid: String,
  @BeanProperty var familyName: String,
  @BeanProperty var givenName: String,
  @BeanProperty var creditName: String,
  @BeanProperty var otherNames: java.util.List[String]
) {

  /** No-argument constructor: string fields are set to the literal text "null", `otherNames` to a null reference. */
  def this() =
    this(
      orcid = "null",
      familyName = "null",
      givenName = "null",
      creditName = "null",
      otherNames = null
    )
}
|
|
||||||
|
|
||||||
/** Input of the author matching step: one record id with its graph authors and its candidate ORCID authors.
  *
  * Every field is a mutable `@BeanProperty`, so getters and setters are generated alongside the
  * case-class accessors.
  *
  * @param id            identifier of the record the authors belong to
  * @param graph_authors authors taken from the graph; may be null
  * @param orcid_authors candidate authors taken from ORCID; may be null
  */
case class MatchData(
  @BeanProperty var id: String,
  @BeanProperty var graph_authors: java.util.List[Author],
  @BeanProperty var orcid_authors: java.util.List[OrcidAutor]
) {

  /** No-argument constructor: `id` is set to the literal text "null", both lists to null references. */
  def this() =
    this(
      id = "null",
      graph_authors = null,
      orcid_authors = null
    )
}
|
|
||||||
|
|
||||||
/** A successful pairing between a graph author and an ORCID author.
  *
  * @param author the graph author that was matched
  * @param orcid  the ORCID author it was matched with
  * @param type   label of the matching strategy that produced the pair
  */
case class MatchedAuthors(
  @BeanProperty var author: Author,
  @BeanProperty var orcid: OrcidAutor,
  @BeanProperty var `type`: String
)
|
|
||||||
|
|
||||||
class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
|
class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
|
||||||
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
|
extends SparkEnrichWithOrcidAuthors(propertyPath, args, log: Logger) {
|
||||||
|
|
||||||
/** Here all the spark applications runs this method
|
override def createTemporaryData(
|
||||||
* where the whole logic of the spark node is defined
|
spark: SparkSession,
|
||||||
*/
|
graphPath: String,
|
||||||
override def run(): Unit = {
|
orcidPath: String,
|
||||||
val graphPath = parser.get("graphPath")
|
targetPath: String
|
||||||
log.info(s"graphPath is '$graphPath'")
|
): Unit = {
|
||||||
val orcidPath = parser.get("orcidPath")
|
|
||||||
log.info(s"orcidPath is '$orcidPath'")
|
|
||||||
val targetPath = parser.get("targetPath")
|
|
||||||
log.info(s"targetPath is '$targetPath'")
|
|
||||||
val workingDir = parser.get("workingDir")
|
|
||||||
log.info(s"targetPath is '$workingDir'")
|
|
||||||
|
|
||||||
createTemporaryData(graphPath, orcidPath, workingDir)
|
|
||||||
analisys(workingDir)
|
|
||||||
generateGraph(graphPath, workingDir, targetPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Rewrites every result entity type of the graph, swapping in the enriched author list where one exists.
  *
  * For each result type, the `<resultType>_matched` parquet data under `workingDir` is left-joined on `id`
  * with the JSON entities under `graphPath`. When `enriched_author` is non-empty it replaces `author`;
  * otherwise the original `author` column is kept. The output is written as gzip-compressed JSON under
  * `targetPath`, overwriting existing data.
  *
  * @param graphPath  root folder of the input graph, one sub-folder per entity type
  * @param workingDir folder holding the `<resultType>_matched` parquet data
  * @param targetPath root folder of the output graph
  */
private def generateGraph(graphPath: String, workingDir: String, targetPath: String): Unit = {
  val resultEntities = ModelSupport.entityTypes.asScala.filter(entry => ModelSupport.isResult(entry._1))

  for (entry <- resultEntities) {
    val resultType = entry._1.name()
    val enc = Encoders.bean(entry._2)

    // Only the id and the enriched author list are needed from the matching output.
    val enrichedAuthors = spark.read
      .schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
      .parquet(s"${workingDir}/${resultType}_matched")
      .selectExpr("id", "enriched_author")

    val entities = spark.read
      .schema(enc.schema)
      .json(s"$graphPath/$resultType")

    // Left join: entities without a matching row keep their original authors.
    val hasEnrichedAuthors = size(col("enriched_author")).gt(0)
    val authorColumn = when(hasEnrichedAuthors, col("enriched_author")).otherwise(col("author"))

    val updated = entities
      .join(enrichedAuthors, Seq("id"), "left")
      .withColumn("author", authorColumn)
      .drop("enriched_author")

    updated.write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .json(s"${targetPath}/${resultType}")
  }
}
|
|
||||||
|
|
||||||
private def createTemporaryData(graphPath: String, orcidPath: String, targetPath: String): Unit = {
|
|
||||||
val orcidAuthors =
|
val orcidAuthors =
|
||||||
spark.read.load(s"$orcidPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
|
spark.read.load(s"$orcidPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
|
||||||
|
|
||||||
|
@ -157,25 +87,6 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
|
||||||
orcidWorksWithAuthors.unpersist()
|
orcidWorksWithAuthors.unpersist()
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Runs the ORCID author matching for every result entity type.
  *
  * For each result type, reads `<resultType>_unmatched` parquet data under `targetPath`, keeps the rows
  * having at least one graph author, applies `ORCIDAuthorEnricher.enrichOrcid` to each row and writes the
  * outcome as gzip-compressed parquet to `<resultType>_matched` in the same folder, overwriting existing data.
  *
  * @param targetPath folder holding the `_unmatched` inputs and receiving the `_matched` outputs
  */
private def analisys(targetPath: String): Unit = {
  val matchDataEncoder = Encoders.bean(classOf[MatchData])
  val enricherResultEncoder = Encoders.bean(classOf[ORCIDAuthorEnricherResult])

  val resultEntities = ModelSupport.entityTypes.asScala.filter(entry => ModelSupport.isResult(entry._1))

  for (entry <- resultEntities) {
    val resultType = entry._1.name()

    // Rows without graph authors have nothing to enrich.
    val candidates = spark.read
      .parquet(s"$targetPath/${resultType}_unmatched")
      .where("size(graph_authors) > 0")
      .as[MatchData](matchDataEncoder)

    val enriched = candidates.map { md =>
      ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors)
    }(enricherResultEncoder)

    enriched.write
      .option("compression", "gzip")
      .mode("overwrite")
      .parquet(s"$targetPath/${resultType}_matched")
  }
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object SparkEnrichGraphWithOrcidAuthors {
|
object SparkEnrichGraphWithOrcidAuthors {
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package eu.dnetlib.dhp.incremental
|
package eu.dnetlib.dhp.incremental
|
||||||
|
|
||||||
import eu.dnetlib.dhp.PropagationConstant
|
import eu.dnetlib.dhp.common.enrichment.Constants
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.bulktag.community.TaggingConstants
|
import eu.dnetlib.dhp.bulktag.community.TaggingConstants
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport
|
import eu.dnetlib.dhp.schema.common.ModelSupport
|
||||||
|
@ -63,7 +63,7 @@ object SparkAppendContextCleanedGraph {
|
||||||
c.getDataInfo.asScala
|
c.getDataInfo.asScala
|
||||||
.filter(
|
.filter(
|
||||||
di =>
|
di =>
|
||||||
!di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
|
!di.getInferenceprovenance.equals(Constants.PROPAGATION_DATA_INFO_TYPE)
|
||||||
&& !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)
|
&& !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)
|
||||||
)
|
)
|
||||||
.toList
|
.toList
|
||||||
|
|
Loading…
Reference in New Issue