WIP: [OrcidPropagation] Make ORCID enrichment/propagation code more generic and reusable #501
|
@ -0,0 +1,9 @@
|
|||
|
||||
package eu.dnetlib.dhp.common.enrichment;
|
||||
|
||||
public class Constants {
|
||||
public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
|
||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
|
||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";
|
||||
|
||||
}
|
|
@ -621,7 +621,7 @@ public class MergeUtils {
|
|||
return m;
|
||||
}
|
||||
|
||||
private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
|
||||
public static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
|
||||
List<List<Author>> authors = new ArrayList<>();
|
||||
if (author != null) {
|
||||
authors.add(author);
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
package eu.dnetlib.dhp.common.author
|
||||
|
||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||
import eu.dnetlib.dhp.schema.common.{EntityType, ModelConstants, ModelSupport}
|
||||
import eu.dnetlib.dhp.utils.{MatchData, ORCIDAuthorEnricher, ORCIDAuthorEnricherResult}
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE
|
||||
import eu.dnetlib.dhp.schema.oaf.{OafEntity, Result}
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
|
||||
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
|
||||
|
||||
/** Here all the spark applications runs this method
|
||||
* where the whole logic of the spark node is defined
|
||||
*/
|
||||
override def run(): Unit = {
|
||||
val graphPath = parser.get("graphPath")
|
||||
log.info(s"graphPath is '$graphPath'")
|
||||
val orcidPath = parser.get("orcidPath")
|
||||
log.info(s"orcidPath is '$orcidPath'")
|
||||
val targetPath = parser.get("targetPath")
|
||||
log.info(s"targetPath is '$targetPath'")
|
||||
val workingDir = parser.get("workingDir")
|
||||
log.info(s"targetPath is '$workingDir'")
|
||||
val classid =
|
||||
Option(parser.get("matchingSource")).map(_ => ModelConstants.ORCID_PENDING).getOrElse(ModelConstants.ORCID)
|
||||
|
||||
log.info(s"classid is '$classid'")
|
||||
val provenance =
|
||||
Option(parser.get("matchingSource")).map(_ => PROPAGATION_DATA_INFO_TYPE).getOrElse("ORCID_ENRICHMENT")
|
||||
log.info(s"targetPath is '$workingDir'")
|
||||
|
||||
createTemporaryData(spark, graphPath, orcidPath, workingDir)
|
||||
analisys(workingDir, classid, provenance)
|
||||
generateGraph(spark, graphPath, workingDir, targetPath)
|
||||
}
|
||||
|
||||
private def processAndMerge(
|
||||
spark: SparkSession,
|
||||
inputPath: String,
|
||||
outputPath: String,
|
||||
clazz: Class[Result],
|
||||
encoder: Encoder[Result]
|
||||
): Unit = {
|
||||
var tmp = spark.read
|
||||
.schema(Encoders.bean(clazz).schema)
|
||||
.json(inputPath)
|
||||
.as(encoder)
|
||||
tmp
|
||||
.groupByKey(r => r.getId)(Encoders.STRING)
|
||||
.mapGroups((k, it) => {
|
||||
val p: Result = it.next
|
||||
|
||||
it.foldLeft(p.getAuthor)((x, r) => MergeUtils.mergeAuthors(x, r.getAuthor, 0))
|
||||
p
|
||||
|
||||
})(encoder)
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath)
|
||||
}
|
||||
|
||||
private def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = {
|
||||
ModelSupport.entityTypes
|
||||
.keySet()
|
||||
.asScala
|
||||
.filter(ModelSupport.isResult)
|
||||
.foreach((e: EntityType) => {
|
||||
val resultClazz: Class[Result] = ModelSupport.entityTypes.get(e).asInstanceOf[Class[Result]]
|
||||
val matched: Dataset[Row] = spark.read
|
||||
.schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
|
||||
.parquet(workingDir + "/" + e.name + "_matched")
|
||||
.selectExpr("id", "enriched_author")
|
||||
|
||||
val result: Dataset[Row] = spark.read
|
||||
.schema(Encoders.bean(resultClazz).schema)
|
||||
.json(graphPath + "/" + e.name)
|
||||
|
||||
result
|
||||
.join(matched, Seq("id"), "left")
|
||||
.withColumn(
|
||||
"author",
|
||||
when(size(col("enriched_author")).gt(0), col("enriched_author"))
|
||||
.otherwise(col("author"))
|
||||
)
|
||||
.drop("enriched_author")
|
||||
.as(Encoders.bean(resultClazz))
|
||||
.groupByKey(r => r.getId)(Encoders.STRING)
|
||||
.mapGroups((k, it) => {
|
||||
val p: Result = it.next
|
||||
|
||||
p.setAuthor(it.foldLeft(p.getAuthor)((x, r) => MergeUtils.mergeAuthors(x, r.getAuthor, 0)))
|
||||
|
||||
p
|
||||
|
||||
})(Encoders.bean(resultClazz))
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(targetPath + "/" + e.name)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit
|
||||
|
||||
private def analisys(targetPath: String, classid: String, provenance: String): Unit = {
|
||||
ModelSupport.entityTypes.asScala
|
||||
.filter(e => ModelSupport.isResult(e._1))
|
||||
.foreach(e => {
|
||||
val resultType = e._1.name()
|
||||
val c = classid
|
||||
val p = provenance
|
||||
|
||||
spark.read
|
||||
.parquet(s"$targetPath/${resultType}_unmatched")
|
||||
.where("size(graph_authors) > 0")
|
||||
.as[MatchData](Encoders.bean(classOf[MatchData]))
|
||||
.map(md => {
|
||||
ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors, c, p)
|
||||
})(Encoders.bean(classOf[ORCIDAuthorEnricherResult]))
|
||||
.write
|
||||
.option("compression", "gzip")
|
||||
.mode("overwrite")
|
||||
.parquet(s"$targetPath/${resultType}_matched")
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,11 +1,12 @@
|
|||
package eu.dnetlib.pace.util
|
||||
package eu.dnetlib.dhp.utils
|
||||
|
||||
import java.text.Normalizer
|
||||
import java.util.Locale
|
||||
import java.util.regex.Pattern
|
||||
import scala.util.control.Breaks.{break, breakable}
|
||||
|
||||
object AuthorMatchers {
|
||||
val SPLIT_REGEX = Pattern.compile("[\\s,\\.]+")
|
||||
val SPLIT_REGEX = Pattern.compile("[\\s\\p{Punct}\\p{Pd}]+")
|
||||
|
||||
val WORD_DIFF = 2
|
||||
|
||||
|
@ -24,9 +25,16 @@ object AuthorMatchers {
|
|||
}
|
||||
}
|
||||
|
||||
def normalize(s: String): Array[String] = {
|
||||
SPLIT_REGEX
|
||||
.split(Normalizer.normalize(s, Normalizer.Form.NFC).toLowerCase(Locale.ROOT))
|
||||
.filter(_.nonEmpty)
|
||||
.sorted
|
||||
}
|
||||
|
||||
def matchOrderedTokenAndAbbreviations(a1: String, a2: String): Boolean = {
|
||||
val p1: Array[String] = SPLIT_REGEX.split(a1.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted
|
||||
val p2: Array[String] = SPLIT_REGEX.split(a2.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted
|
||||
val p1: Array[String] = normalize(a1)
|
||||
val p2: Array[String] = normalize(a2)
|
||||
|
||||
if (p1.length < 2 || p2.length < 2) return false
|
||||
if (Math.abs(p1.length - p2.length) > WORD_DIFF) return false // use alternative comparison algo
|
||||
|
@ -66,19 +74,18 @@ object AuthorMatchers {
|
|||
}
|
||||
|
||||
def removeMatches(
|
||||
graph_authors: java.util.List[String],
|
||||
orcid_authors: java.util.List[String],
|
||||
matchingFunc: java.util.function.BiFunction[String,String,Boolean]
|
||||
) : java.util.List[String] = {
|
||||
removeMatches(graph_authors, orcid_authors, (a, b) => matchingFunc(a,b))
|
||||
graph_authors: java.util.List[String],
|
||||
orcid_authors: java.util.List[String],
|
||||
matchingFunc: java.util.function.BiFunction[String, String, Boolean]
|
||||
): java.util.List[String] = {
|
||||
removeMatches(graph_authors, orcid_authors, (a, b) => matchingFunc(a, b))
|
||||
}
|
||||
|
||||
|
||||
def removeMatches(
|
||||
graph_authors: java.util.List[String],
|
||||
orcid_authors: java.util.List[String],
|
||||
matchingFunc: (String, String) => Boolean
|
||||
) : java.util.List[String] = {
|
||||
graph_authors: java.util.List[String],
|
||||
orcid_authors: java.util.List[String],
|
||||
matchingFunc: (String, String) => Boolean
|
||||
): java.util.List[String] = {
|
||||
val matched = new java.util.ArrayList[String]()
|
||||
|
||||
if (graph_authors != null && !graph_authors.isEmpty) {
|
|
@ -0,0 +1,197 @@
|
|||
package eu.dnetlib.dhp.utils
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||
import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
|
||||
import eu.dnetlib.dhp.schema.sx.OafUtils
|
||||
|
||||
import java.util
|
||||
import scala.beans.BeanProperty
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.control.Breaks.{break, breakable}
|
||||
import eu.dnetlib.dhp.common.enrichment.Constants._
|
||||
|
||||
case class OrcidAuthor(
|
||||
@BeanProperty var orcid: String,
|
||||
@BeanProperty var familyName: String,
|
||||
@BeanProperty var givenName: String,
|
||||
@BeanProperty var creditName: String,
|
||||
@BeanProperty var otherNames: java.util.List[String]
|
||||
) {
|
||||
def this() = this("null", "null", "null", "null", null)
|
||||
}
|
||||
|
||||
case class MatchedAuthors(
|
||||
@BeanProperty var author: Author,
|
||||
@BeanProperty var orcid: OrcidAuthor,
|
||||
@BeanProperty var `type`: String
|
||||
)
|
||||
|
||||
case class MatchData(
|
||||
@BeanProperty var id: String,
|
||||
@BeanProperty var graph_authors: java.util.List[Author],
|
||||
@BeanProperty var orcid_authors: java.util.List[OrcidAuthor]
|
||||
) {
|
||||
def this() = this("null", null, null)
|
||||
}
|
||||
|
||||
case class ORCIDAuthorEnricherResult(
|
||||
@BeanProperty var id: String,
|
||||
@BeanProperty var enriched_author: java.util.List[Author],
|
||||
@BeanProperty var author_matched: java.util.List[MatchedAuthors],
|
||||
@BeanProperty var author_unmatched: java.util.List[Author],
|
||||
@BeanProperty var orcid_unmatched: java.util.List[OrcidAuthor]
|
||||
)
|
||||
|
||||
object ORCIDAuthorEnricher extends Serializable {
|
||||
|
||||
def enrichOrcid(
|
||||
id: String,
|
||||
graph_authors: java.util.List[Author],
|
||||
orcid_authors: java.util.List[OrcidAuthor],
|
||||
classid: String,
|
||||
provenance: String
|
||||
): ORCIDAuthorEnricherResult = {
|
||||
// Author enriching strategy:
|
||||
// 1) create a copy of graph author list in unmatched_authors
|
||||
// 2) find best match in unmatched_authors, remove it from unmatched_authors and enrich it so
|
||||
// that the enrichment is reflected in graph_authors (they share author instances).
|
||||
// Do not match in case of ambiguity: two authors match and at least one of them has affiliation string
|
||||
// 3) repeat (2) till the end of the list and then with different matching algorithms that have decreasing
|
||||
// trust in their output
|
||||
// At the end unmatched_authors will contain authors not matched with any of the matching algos
|
||||
val unmatched_authors = new util.ArrayList[Author](graph_authors)
|
||||
|
||||
val matches = {
|
||||
// Look after exact fullname match, reconstruct ORCID fullname as givenName + familyName
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
||||
"fullName",
|
||||
classid,
|
||||
provenance
|
||||
) ++
|
||||
// Look after exact reversed fullname match, reconstruct ORCID fullname as familyName + givenName
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
|
||||
"reversedFullName",
|
||||
classid,
|
||||
provenance
|
||||
) ++
|
||||
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
AuthorMatchers
|
||||
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
||||
"orderedTokens-1",
|
||||
classid,
|
||||
provenance,
|
||||
skipAmbiguities = true
|
||||
) ++
|
||||
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
AuthorMatchers
|
||||
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
||||
"orderedTokens-2",
|
||||
classid,
|
||||
provenance
|
||||
) ++
|
||||
// look after exact matches of ORCID creditName
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) => AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
|
||||
"creditName",
|
||||
classid,
|
||||
provenance
|
||||
) ++
|
||||
// look after exact matches in ORCID otherNames
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
orcid.otherNames != null && AuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
|
||||
"otherNames",
|
||||
classid,
|
||||
provenance
|
||||
)
|
||||
}
|
||||
|
||||
ORCIDAuthorEnricherResult(id, graph_authors, matches.asJava, unmatched_authors, orcid_authors)
|
||||
}
|
||||
|
||||
private def extractAndEnrichMatches(
|
||||
unmatched_authors: java.util.List[Author],
|
||||
orcid_authors: java.util.List[OrcidAuthor],
|
||||
matchingFunc: (Author, OrcidAuthor) => Boolean,
|
||||
matchName: String,
|
||||
classid: String,
|
||||
provenance: String,
|
||||
skipAmbiguities: Boolean = false
|
||||
): ArrayBuffer[MatchedAuthors] = {
|
||||
val matched = ArrayBuffer.empty[MatchedAuthors]
|
||||
|
||||
if (unmatched_authors == null || unmatched_authors.isEmpty) {
|
||||
return matched
|
||||
}
|
||||
|
||||
val oit = orcid_authors.iterator
|
||||
while (oit.hasNext) {
|
||||
val orcid = oit.next()
|
||||
val candidates = unmatched_authors.asScala.foldLeft(ArrayBuffer[Author]())((res, author) => {
|
||||
if (matchingFunc(author, orcid)) {
|
||||
res += author
|
||||
}
|
||||
|
||||
res
|
||||
})
|
||||
|
||||
if (
|
||||
candidates.size == 1 ||
|
||||
(candidates.size > 1 && !skipAmbiguities && !candidates
|
||||
.exists(a => a.getRawAffiliationString != null && !a.getRawAffiliationString.isEmpty))
|
||||
) {
|
||||
val author = candidates(0)
|
||||
unmatched_authors.remove(author)
|
||||
oit.remove()
|
||||
matched += MatchedAuthors(author, orcid, matchName)
|
||||
|
||||
if (author.getPid == null) {
|
||||
author.setPid(new util.ArrayList[StructuredProperty]())
|
||||
}
|
||||
|
||||
val orcidPID = OafUtils.createSP(orcid.orcid, classid, classid)
|
||||
orcidPID.setDataInfo(OafUtils.generateDataInfo())
|
||||
if (provenance.equalsIgnoreCase(PROPAGATION_DATA_INFO_TYPE)) {
|
||||
orcidPID.getDataInfo.setInferenceprovenance(PROPAGATION_DATA_INFO_TYPE);
|
||||
orcidPID.getDataInfo.setInferred(true);
|
||||
orcidPID.getDataInfo.setProvenanceaction(
|
||||
OafUtils.createQualifier(
|
||||
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID,
|
||||
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME
|
||||
)
|
||||
)
|
||||
} else
|
||||
orcidPID.getDataInfo.setProvenanceaction(
|
||||
OafUtils.createQualifier(provenance, provenance)
|
||||
)
|
||||
|
||||
author.getPid.add(orcidPID)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
matched
|
||||
}
|
||||
|
||||
}
|
|
@ -1,10 +1,10 @@
|
|||
package eu.dnetlib.dhp.enrich.orcid
|
||||
package eu.dnetlib.dhp.utils
|
||||
|
||||
import eu.dnetlib.pace.util.AuthorMatchers.matchOrderedTokenAndAbbreviations
|
||||
import eu.dnetlib.dhp.utils.AuthorMatchers.matchOrderedTokenAndAbbreviations
|
||||
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
class ORCIDAuthorMatchersTest {
|
||||
class AuthorMatchersTest {
|
||||
|
||||
@Test def testShortNames(): Unit = {
|
||||
assertTrue(matchOrderedTokenAndAbbreviations("Lasagni Mariozzi Federico", "Lasagni F. Mariozzi"))
|
|
@ -9,11 +9,11 @@ import java.util.stream.Collectors;
|
|||
|
||||
import com.wcohen.ss.AbstractStringDistance;
|
||||
|
||||
import eu.dnetlib.dhp.utils.AuthorMatchers;
|
||||
import eu.dnetlib.pace.config.Config;
|
||||
import eu.dnetlib.pace.model.Person;
|
||||
import eu.dnetlib.pace.tree.support.AbstractListComparator;
|
||||
import eu.dnetlib.pace.tree.support.ComparatorClass;
|
||||
import eu.dnetlib.pace.util.AuthorMatchers;
|
||||
|
||||
@ComparatorClass("authorsMatch")
|
||||
public class AuthorsMatch extends AbstractListComparator {
|
||||
|
|
|
@ -21,9 +21,13 @@ class DecisionTreeTest {
|
|||
void testJPath() throws IOException {
|
||||
|
||||
DedupConfig conf = DedupConfig
|
||||
.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
|
||||
.load(
|
||||
IOUtils
|
||||
.toString(
|
||||
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
|
||||
|
||||
final String org = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
|
||||
final String org = IOUtils
|
||||
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
|
||||
|
||||
Row row = SparkModel.apply(conf).rowFromJson(org);
|
||||
|
||||
|
@ -42,7 +46,8 @@ class DecisionTreeTest {
|
|||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
||||
|
||||
final String org = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization_example1.json"));
|
||||
final String org = IOUtils
|
||||
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization_example1.json"));
|
||||
|
||||
Row row = SparkModel.apply(conf).rowFromJson(org);
|
||||
// to check that the same parsing returns the same row
|
||||
|
|
|
@ -440,7 +440,8 @@ public class SparkDedupTest implements Serializable {
|
|||
.count();
|
||||
|
||||
final List<Relation> merges = pubs
|
||||
.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")// and relClass = '"+ModelConstants.MERGES+"'")
|
||||
.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")// and relClass =
|
||||
// '"+ModelConstants.MERGES+"'")
|
||||
.collectAsList();
|
||||
assertEquals(4, merges.size());
|
||||
Set<String> dups = Sets
|
||||
|
|
|
@ -19,9 +19,13 @@ class JsonPathTest {
|
|||
void testJPath() throws IOException {
|
||||
|
||||
DedupConfig conf = DedupConfig
|
||||
.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
|
||||
.load(
|
||||
IOUtils
|
||||
.toString(
|
||||
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
|
||||
|
||||
final String org = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
|
||||
final String org = IOUtils
|
||||
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
|
||||
|
||||
Row row = SparkModel.apply(conf).rowFromJson(org);
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
|
||||
package eu.dnetlib.dhp;
|
||||
|
||||
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
@ -46,7 +48,7 @@ public class PropagationConstant {
|
|||
|
||||
public static final String INSTITUTIONAL_REPO_TYPE = "institutional";
|
||||
|
||||
public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
|
||||
// public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
|
||||
|
||||
public static final String TRUE = "true";
|
||||
|
||||
|
@ -74,9 +76,6 @@ public class PropagationConstant {
|
|||
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID = "result:community:project";
|
||||
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME = " Propagation of result belonging to community through project";
|
||||
|
||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
|
||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";
|
||||
|
||||
public static final String ITERATION_ONE = "ExitAtFirstIteration";
|
||||
public static final String ITERATION_TWO = "ExitAtSecondIteration";
|
||||
public static final String ITERATION_THREE = "ExitAtThirdIteration";
|
||||
|
|
|
@ -1,43 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
public class AutoritativeAuthor {
|
||||
|
||||
private String name;
|
||||
private String surname;
|
||||
private String fullname;
|
||||
private String orcid;
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getSurname() {
|
||||
return surname;
|
||||
}
|
||||
|
||||
public void setSurname(String surname) {
|
||||
this.surname = surname;
|
||||
}
|
||||
|
||||
public String getFullname() {
|
||||
return fullname;
|
||||
}
|
||||
|
||||
public void setFullname(String fullname) {
|
||||
this.fullname = fullname;
|
||||
}
|
||||
|
||||
public String getOrcid() {
|
||||
return orcid;
|
||||
}
|
||||
|
||||
public void setOrcid(String orcid) {
|
||||
this.orcid = orcid;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
|
||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.dhp.utils.OrcidAuthor;
|
||||
|
||||
public class OrcidAuthors implements Serializable {
|
||||
List<OrcidAuthor> orcidAuthorList;
|
||||
|
||||
public List<OrcidAuthor> getOrcidAuthorList() {
|
||||
return orcidAuthorList;
|
||||
}
|
||||
|
||||
public void setOrcidAuthorList(List<OrcidAuthor> orcidAuthorList) {
|
||||
this.orcidAuthorList = orcidAuthorList;
|
||||
}
|
||||
}
|
|
@ -1,124 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class PrepareResultOrcidAssociationStep1 {
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConf = IOUtils
|
||||
.toString(
|
||||
PrepareResultOrcidAssociationStep1.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConf);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
final String resultClassName = parser.get("resultTableName");
|
||||
log.info("resultTableName: {}", resultClassName);
|
||||
|
||||
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
|
||||
|
||||
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
|
||||
log.info("resultType: {}", resultType);
|
||||
|
||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||
|
||||
String inputRelationPath = inputPath + "/relation";
|
||||
log.info("inputRelationPath: {}", inputRelationPath);
|
||||
|
||||
String inputResultPath = inputPath + "/" + resultType;
|
||||
log.info("inputResultPath: {}", inputResultPath);
|
||||
|
||||
String outputResultPath = outputPath + "/" + resultType;
|
||||
log.info("outputResultPath: {}", outputResultPath);
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
prepareInfo(
|
||||
spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel);
|
||||
});
|
||||
}
|
||||
|
||||
private static <R extends Result> void prepareInfo(
|
||||
SparkSession spark,
|
||||
String inputRelationPath,
|
||||
String inputResultPath,
|
||||
String outputResultPath,
|
||||
Class<R> resultClazz,
|
||||
List<String> allowedsemrel) {
|
||||
|
||||
Dataset<Relation> relation = readPath(spark, inputRelationPath, Relation.class);
|
||||
relation.createOrReplaceTempView("relation");
|
||||
|
||||
log.info("Reading Graph table from: {}", inputResultPath);
|
||||
Dataset<R> result = readPath(spark, inputResultPath, resultClazz);
|
||||
result.createOrReplaceTempView("result");
|
||||
|
||||
String query = "SELECT target resultId, author authorList"
|
||||
+ " FROM (SELECT id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author "
|
||||
+ " FROM ( "
|
||||
+ " SELECT DISTINCT id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid "
|
||||
+ " FROM result "
|
||||
+ " LATERAL VIEW EXPLODE (author) a AS MyT "
|
||||
+ " LATERAL VIEW EXPLODE (MyT.pid) p AS MyP "
|
||||
+ " WHERE lower(MyP.qualifier.classid) = '" + ModelConstants.ORCID + "' or "
|
||||
+ " lower(MyP.qualifier.classid) = '" + ModelConstants.ORCID_PENDING + "') tmp "
|
||||
+ " GROUP BY id) r_t "
|
||||
+ " JOIN ("
|
||||
+ " SELECT source, target "
|
||||
+ " FROM relation "
|
||||
+ " WHERE datainfo.deletedbyinference = false "
|
||||
+ getConstraintList(" lower(relclass) = '", allowedsemrel)
|
||||
+ " ) rel_rel "
|
||||
+ " ON source = id";
|
||||
|
||||
log.info("executedQuery: {}", query);
|
||||
spark
|
||||
.sql(query)
|
||||
.as(Encoders.bean(ResultOrcidList.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputResultPath);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,95 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class PrepareResultOrcidAssociationStep2 {
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep2.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareResultOrcidAssociationStep2.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
mergeInfo(spark, inputPath, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) {
|
||||
|
||||
Dataset<ResultOrcidList> resultOrcidAssoc = readPath(spark, inputPath + "/publication", ResultOrcidList.class)
|
||||
.union(readPath(spark, inputPath + "/dataset", ResultOrcidList.class))
|
||||
.union(readPath(spark, inputPath + "/otherresearchproduct", ResultOrcidList.class))
|
||||
.union(readPath(spark, inputPath + "/software", ResultOrcidList.class));
|
||||
|
||||
resultOrcidAssoc
|
||||
.toJavaRDD()
|
||||
.mapToPair(r -> new Tuple2<>(r.getResultId(), r))
|
||||
.reduceByKey(
|
||||
(a, b) -> {
|
||||
if (a == null) {
|
||||
return b;
|
||||
}
|
||||
if (b == null) {
|
||||
return a;
|
||||
}
|
||||
Set<String> orcid_set = new HashSet<>();
|
||||
a.getAuthorList().stream().forEach(aa -> orcid_set.add(aa.getOrcid()));
|
||||
b
|
||||
.getAuthorList()
|
||||
.stream()
|
||||
.forEach(
|
||||
aa -> {
|
||||
if (!orcid_set.contains(aa.getOrcid())) {
|
||||
a.getAuthorList().add(aa);
|
||||
orcid_set.add(aa.getOrcid());
|
||||
}
|
||||
});
|
||||
return a;
|
||||
})
|
||||
.map(Tuple2::_2)
|
||||
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
|
||||
.saveAsTextFile(outputPath, GzipCodec.class);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class ResultOrcidList implements Serializable {
|
||||
String resultId;
|
||||
List<AutoritativeAuthor> authorList = new ArrayList<>();
|
||||
|
||||
public String getResultId() {
|
||||
return resultId;
|
||||
}
|
||||
|
||||
public void setResultId(String resultId) {
|
||||
this.resultId = resultId;
|
||||
}
|
||||
|
||||
public List<AutoritativeAuthor> getAuthorList() {
|
||||
return authorList;
|
||||
}
|
||||
|
||||
public void setAuthorList(List<AutoritativeAuthor> authorList) {
|
||||
this.authorList = authorList;
|
||||
}
|
||||
}
|
|
@ -1,211 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.PacePerson;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class SparkOrcidToResultFromSemRelJob {
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
SparkOrcidToResultFromSemRelJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
final String possibleUpdates = parser.get("possibleUpdatesPath");
|
||||
log.info("possibleUpdatesPath: {}", possibleUpdates);
|
||||
|
||||
final String resultClassName = parser.get("resultTableName");
|
||||
log.info("resultTableName: {}", resultClassName);
|
||||
|
||||
final Boolean saveGraph = Optional
|
||||
.ofNullable(parser.get("saveGraph"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("saveGraph: {}", saveGraph);
|
||||
|
||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
if (saveGraph) {
|
||||
execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static <R extends Result> void execPropagation(
|
||||
SparkSession spark,
|
||||
String possibleUpdatesPath,
|
||||
String inputPath,
|
||||
String outputPath,
|
||||
Class<R> resultClazz) {
|
||||
|
||||
// read possible updates (resultId and list of possible orcid to add
|
||||
Dataset<ResultOrcidList> possible_updates = readPath(spark, possibleUpdatesPath, ResultOrcidList.class);
|
||||
// read the result we have been considering
|
||||
Dataset<R> result = readPath(spark, inputPath, resultClazz);
|
||||
// make join result left_outer with possible updates
|
||||
|
||||
result
|
||||
.joinWith(
|
||||
possible_updates,
|
||||
result.col("id").equalTo(possible_updates.col("resultId")),
|
||||
"left_outer")
|
||||
.map(authorEnrichFn(), Encoders.bean(resultClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
}
|
||||
|
||||
private static <R extends Result> MapFunction<Tuple2<R, ResultOrcidList>, R> authorEnrichFn() {
|
||||
return value -> {
|
||||
R ret = value._1();
|
||||
Optional<ResultOrcidList> rol = Optional.ofNullable(value._2());
|
||||
if (rol.isPresent() && Optional.ofNullable(ret.getAuthor()).isPresent()) {
|
||||
List<Author> toenrich_author = ret.getAuthor();
|
||||
List<AutoritativeAuthor> autoritativeAuthors = rol.get().getAuthorList();
|
||||
for (Author author : toenrich_author) {
|
||||
if (!containsAllowedPid(author)) {
|
||||
enrichAuthor(author, autoritativeAuthors);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
};
|
||||
}
|
||||
|
||||
private static void enrichAuthor(Author a, List<AutoritativeAuthor> au) {
|
||||
PacePerson pp = new PacePerson(a.getFullname(), false);
|
||||
for (AutoritativeAuthor aa : au) {
|
||||
if (enrichAuthor(aa, a, pp.getNormalisedFirstName(), pp.getNormalisedSurname())) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, Author author,
|
||||
String author_name,
|
||||
String author_surname) {
|
||||
boolean toaddpid = false;
|
||||
|
||||
if (StringUtils.isNotEmpty(autoritative_author.getSurname())) {
|
||||
if (StringUtils.isNotEmpty(author.getSurname())) {
|
||||
author_surname = author.getSurname();
|
||||
}
|
||||
if (StringUtils.isNotEmpty(author_surname)) {
|
||||
// have the same surname. Check the name
|
||||
if (autoritative_author
|
||||
.getSurname()
|
||||
.trim()
|
||||
.equalsIgnoreCase(author_surname.trim()) && StringUtils.isNotEmpty(autoritative_author.getName())) {
|
||||
if (StringUtils.isNotEmpty(author.getName())) {
|
||||
author_name = author.getName();
|
||||
}
|
||||
if (StringUtils.isNotEmpty(author_name)) {
|
||||
if (autoritative_author
|
||||
.getName()
|
||||
.trim()
|
||||
.equalsIgnoreCase(author_name.trim())) {
|
||||
toaddpid = true;
|
||||
}
|
||||
// they could be differently written (i.e. only the initials of the name
|
||||
// in one of the two
|
||||
else {
|
||||
if (autoritative_author
|
||||
.getName()
|
||||
.trim()
|
||||
.substring(0, 0)
|
||||
.equalsIgnoreCase(author_name.trim().substring(0, 0))) {
|
||||
toaddpid = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (toaddpid) {
|
||||
StructuredProperty p = new StructuredProperty();
|
||||
p.setValue(autoritative_author.getOrcid());
|
||||
p
|
||||
.setQualifier(
|
||||
getQualifier(
|
||||
ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
|
||||
p
|
||||
.setDataInfo(
|
||||
getDataInfo(
|
||||
PROPAGATION_DATA_INFO_TYPE,
|
||||
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID,
|
||||
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS));
|
||||
|
||||
Optional<List<StructuredProperty>> authorPid = Optional.ofNullable(author.getPid());
|
||||
if (authorPid.isPresent()) {
|
||||
authorPid.get().add(p);
|
||||
} else {
|
||||
author.setPid(Lists.newArrayList(p));
|
||||
}
|
||||
|
||||
}
|
||||
return toaddpid;
|
||||
}
|
||||
|
||||
private static boolean containsAllowedPid(Author a) {
|
||||
Optional<List<StructuredProperty>> pids = Optional.ofNullable(a.getPid());
|
||||
if (!pids.isPresent()) {
|
||||
return false;
|
||||
}
|
||||
for (StructuredProperty pid : pids.get()) {
|
||||
if (ModelConstants.ORCID_PENDING.equalsIgnoreCase(pid.getQualifier().getClassid()) ||
|
||||
ModelConstants.ORCID.equalsIgnoreCase(pid.getQualifier().getClassid())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,158 @@
|
|||
|
||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.utils.OrcidAuthor;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkPropagateOrcidAuthor.class);
|
||||
|
||||
public SparkPropagateOrcidAuthor(String propertyPath, String[] args, Logger log) {
|
||||
super(propertyPath, args, log);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
// Create instance and run the Spark application
|
||||
SparkPropagateOrcidAuthor app = new SparkPropagateOrcidAuthor(
|
||||
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json", args, log);
|
||||
app.initialize().run();
|
||||
|
||||
}
|
||||
|
||||
private static OrcidAuthors getOrcidAuthorsList(List<Author> authors) {
|
||||
OrcidAuthors oas = new OrcidAuthors();
|
||||
List<OrcidAuthor> tmp = authors
|
||||
.stream()
|
||||
.map(SparkPropagateOrcidAuthor::getOrcidAuthor)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList());
|
||||
oas.setOrcidAuthorList(tmp);
|
||||
return oas;
|
||||
}
|
||||
|
||||
private static OrcidAuthor getOrcidAuthor(Author a) {
|
||||
return Optional
|
||||
.ofNullable(getOrcid(a))
|
||||
.map(orcid -> {
|
||||
OrcidAuthor orcidAuthor = new OrcidAuthor(orcid, a.getSurname(), a.getName(), a.getFullname(), null);
|
||||
return orcidAuthor;
|
||||
})
|
||||
.orElse(null);
|
||||
|
||||
}
|
||||
|
||||
private static String getOrcid(Author a) {
|
||||
String orcid = null;
|
||||
if (a.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID)))
|
||||
orcid = a
|
||||
.getPid()
|
||||
.stream()
|
||||
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID))
|
||||
.findFirst()
|
||||
.get()
|
||||
.getValue();
|
||||
else if (a
|
||||
.getPid()
|
||||
.stream()
|
||||
.anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING)))
|
||||
orcid = a
|
||||
.getPid()
|
||||
.stream()
|
||||
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING))
|
||||
.findFirst()
|
||||
.get()
|
||||
.getValue();
|
||||
return orcid;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createTemporaryData(SparkSession spark, String graphPath, String orcidPath, String targetPath) {
|
||||
Dataset<Row> supplements = spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Relation.class).schema())
|
||||
.json(graphPath + "/" + "relation")
|
||||
.where(
|
||||
"relclass IN('" + ModelConstants.IS_SUPPLEMENT_TO + "', '" +
|
||||
ModelConstants.IS_SUPPLEMENTED_BY + "')")
|
||||
.selectExpr("source as id", "target");
|
||||
|
||||
Dataset<Row> result = spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Result.class).schema())
|
||||
.json(
|
||||
graphPath + "/dataset", graphPath + "/publication", graphPath + "/software",
|
||||
graphPath + "/otherresearchproduct")
|
||||
.as(Encoders.bean(Result.class))
|
||||
.selectExpr("id", "author as graph_authors");
|
||||
|
||||
ModelSupport.entityTypes
|
||||
.keySet()
|
||||
.stream()
|
||||
.filter(ModelSupport::isResult)
|
||||
.forEach(e -> {
|
||||
Dataset<Row> orcidDnet = spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Result.class).schema())
|
||||
.json(graphPath + "/" + e.name())
|
||||
.as(Encoders.bean(Result.class))
|
||||
.filter(
|
||||
(FilterFunction<Result>) SparkPropagateOrcidAuthor::hasOrcidAuthor)
|
||||
.map(
|
||||
(MapFunction<Result, Tuple2<String, OrcidAuthors>>) r -> new Tuple2<>(r.getId(),
|
||||
getOrcidAuthorsList(r.getAuthor())),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.bean(OrcidAuthors.class)))
|
||||
.selectExpr("_1 as target", "_2.orcidAuthorList as orcid_authors");
|
||||
|
||||
result
|
||||
.join(supplements, "id")
|
||||
.join(orcidDnet, "target")
|
||||
.drop("target")
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.parquet(targetPath + "/" + e.name() + "_unmatched");
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private static boolean hasOrcidAuthor(Result r) {
|
||||
|
||||
return r.getAuthor() != null &&
|
||||
isaBoolean(r);
|
||||
}
|
||||
|
||||
private static boolean isaBoolean(Result r) {
|
||||
boolean tmp = r
|
||||
.getAuthor()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
a -> a.getPid() != null && a
|
||||
.getPid()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID) ||
|
||||
p
|
||||
.getQualifier()
|
||||
.getClassid()
|
||||
.equalsIgnoreCase(ModelConstants.ORCID_PENDING)));
|
||||
return tmp;
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.projecttoresult;
|
|||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.resulttocommunityfromorganization;
|
|||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
|
|
@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.PropagationConstant.*;
|
|||
import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
|||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
|
|
@ -1,44 +1,32 @@
|
|||
[
|
||||
{
|
||||
"paramName":"s",
|
||||
"paramLongName":"sourcePath",
|
||||
"paramLongName":"graphPath",
|
||||
"paramDescription": "the path of the sequencial file to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName":"sg",
|
||||
"paramLongName":"saveGraph",
|
||||
"paramDescription": "true if the new version of the graph must be saved",
|
||||
"paramRequired": false
|
||||
},
|
||||
|
||||
{
|
||||
"paramName": "out",
|
||||
"paramLongName": "outputPath",
|
||||
"paramLongName": "targetPath",
|
||||
"paramDescription": "the path used to store temporary output files",
|
||||
"paramRequired": true
|
||||
},
|
||||
}, {
|
||||
"paramName": "o",
|
||||
"paramLongName": "orcidPath",
|
||||
"paramDescription": "the path used to store temporary output files",
|
||||
"paramRequired": true
|
||||
}, {
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingDir",
|
||||
"paramDescription": "the path used to store temporary output files",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ssm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "true if the spark session is managed, false otherwise",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName":"tn",
|
||||
"paramLongName":"resultTableName",
|
||||
"paramDescription": "the name of the result table we are currently working on",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName":"pu",
|
||||
"paramLongName":"possibleUpdatesPath",
|
||||
"paramDescription": "the path the the association resultId orcid author list can be found",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName":"test",
|
||||
"paramLongName":"isTest",
|
||||
"paramDescription": "true if it is executing a test",
|
||||
"paramName": "m",
|
||||
"paramLongName": "matchingSource",
|
||||
"paramDescription": "the path used to store temporary output files",
|
||||
"paramRequired": false
|
||||
}
|
||||
|
||||
]
|
|
@ -92,21 +92,14 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="copy_wait" to="fork_prepare_assoc_step1"/>
|
||||
<join name="copy_wait" to="exec_propagation"/>
|
||||
|
||||
<fork name="fork_prepare_assoc_step1">
|
||||
<path start="join_prepare_publication"/>
|
||||
<path start="join_prepare_dataset"/>
|
||||
<path start="join_prepare_otherresearchproduct"/>
|
||||
<path start="join_prepare_software"/>
|
||||
</fork>
|
||||
|
||||
<action name="join_prepare_publication">
|
||||
<action name="exec_propagation">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ORCIDPropagation-PreparePhase1-Publications</name>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
|
@ -119,239 +112,17 @@
|
|||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=8000
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
||||
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
|
||||
<arg>--graphPath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--orcidPath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--workingDir</arg><arg>${workingDir}</arg>
|
||||
<arg>--targetPath</arg><arg>${outputPath}</arg>
|
||||
<arg>--matchingSource</arg><arg>graph</arg>
|
||||
</spark>
|
||||
<ok to="wait"/>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_prepare_dataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ORCIDPropagation-PreparePhase1-Dataset</name>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
||||
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
|
||||
</spark>
|
||||
<ok to="wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_prepare_otherresearchproduct">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ORCIDPropagation-PreparePhase1-ORP</name>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
||||
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
|
||||
</spark>
|
||||
<ok to="wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_prepare_software">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ORCIDPropagation-PreparePhase1-Software</name>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
||||
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
|
||||
</spark>
|
||||
<ok to="wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait" to="prepare_assoc_step2"/>
|
||||
|
||||
<action name="prepare_assoc_step2">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ORCIDPropagation-PreparePhase2</name>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep2</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
||||
</spark>
|
||||
<ok to="fork-join-exec-propagation"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork-join-exec-propagation">
|
||||
<path start="join_propagate_publication"/>
|
||||
<path start="join_propagate_dataset"/>
|
||||
<path start="join_propagate_otherresearchproduct"/>
|
||||
<path start="join_propagate_software"/>
|
||||
</fork>
|
||||
|
||||
<action name="join_propagate_publication">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ORCIDPropagation-Publication</name>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
</spark-opts>
|
||||
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
|
||||
</spark>
|
||||
<ok to="wait2"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_propagate_dataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ORCIDPropagation-Dataset</name>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=8000
|
||||
</spark-opts>
|
||||
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
|
||||
</spark>
|
||||
<ok to="wait2"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_propagate_otherresearchproduct">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ORCIDPropagation-ORP</name>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=8000
|
||||
</spark-opts>
|
||||
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
|
||||
</spark>
|
||||
<ok to="wait2"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_propagate_software">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ORCIDPropagation-Software</name>
|
||||
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=4000
|
||||
</spark-opts>
|
||||
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
|
||||
</spark>
|
||||
<ok to="wait2"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait2" to="End"/>
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Country;
|
||||
|
@ -33,7 +34,8 @@ public class CountryPropagationJobTest {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(CountryPropagationJobTest.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
|
|
|
@ -15,11 +15,13 @@ import org.junit.jupiter.api.Assertions;
|
|||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
public class DatasourceCountryPreparationTest {
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
|
||||
import eu.dnetlib.dhp.KeyValueSet;
|
||||
import eu.dnetlib.dhp.PropagationConstant;
|
||||
import eu.dnetlib.dhp.common.enrichment.Constants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
|
@ -145,7 +146,7 @@ public class SparkJobTest {
|
|||
.foreach(
|
||||
r -> Assertions
|
||||
.assertEquals(
|
||||
PropagationConstant.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
|
||||
Constants.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
|
||||
result
|
||||
.foreach(
|
||||
r -> Assertions
|
||||
|
@ -428,7 +429,7 @@ public class SparkJobTest {
|
|||
.foreach(
|
||||
r -> Assertions
|
||||
.assertEquals(
|
||||
PropagationConstant.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
|
||||
Constants.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
|
||||
project
|
||||
.foreach(
|
||||
r -> Assertions
|
||||
|
|
|
@ -4,11 +4,16 @@ package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
|||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Optional;
|
||||
import java.util.logging.Filter;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.ForeachFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
@ -21,8 +26,11 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.common.enrichment.Constants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public class OrcidPropagationJobTest {
|
||||
|
||||
|
@ -71,23 +79,24 @@ public class OrcidPropagationJobTest {
|
|||
.getResource(
|
||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
|
||||
.getPath();
|
||||
SparkOrcidToResultFromSemRelJob
|
||||
SparkPropagateOrcidAuthor
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest", Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", sourcePath,
|
||||
"-hive_metastore_uris", "",
|
||||
"-saveGraph", "true",
|
||||
"-resultTableName", Dataset.class.getCanonicalName(),
|
||||
"-outputPath", workingDir.toString() + "/dataset",
|
||||
"-possibleUpdatesPath", possibleUpdatesPath
|
||||
"-graphPath",
|
||||
getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/noupdate")
|
||||
.getPath(),
|
||||
"-targetPath",
|
||||
workingDir.toString() + "/graph",
|
||||
"-orcidPath", "",
|
||||
"-workingDir", workingDir.toString()
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Dataset> tmp = sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.textFile(workingDir.toString() + "/graph/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
// tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
|
||||
|
@ -110,40 +119,27 @@ public class OrcidPropagationJobTest {
|
|||
|
||||
@Test
|
||||
void oneUpdateTest() throws Exception {
|
||||
SparkOrcidToResultFromSemRelJob
|
||||
SparkPropagateOrcidAuthor
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest",
|
||||
Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
getClass()
|
||||
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/oneupdate")
|
||||
.getPath(),
|
||||
"-hive_metastore_uris",
|
||||
"",
|
||||
"-saveGraph",
|
||||
"true",
|
||||
"-resultTableName",
|
||||
"eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath",
|
||||
workingDir.toString() + "/dataset",
|
||||
"-possibleUpdatesPath",
|
||||
"-graphPath",
|
||||
getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
|
||||
.getPath()
|
||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/oneupdate")
|
||||
.getPath(),
|
||||
"-targetPath",
|
||||
workingDir.toString() + "/graph",
|
||||
"-orcidPath", "",
|
||||
"-workingDir", workingDir.toString(),
|
||||
"-matchingSource", "propagation"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Dataset> tmp = sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.textFile(workingDir.toString() + "/graph/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
// tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
|
||||
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
|
||||
|
@ -158,6 +154,7 @@ public class OrcidPropagationJobTest {
|
|||
+ "where MyP.datainfo.inferenceprovenance = 'propagation'";
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> propagatedAuthors = spark.sql(query);
|
||||
propagatedAuthors.show(false);
|
||||
|
||||
Assertions.assertEquals(1, propagatedAuthors.count());
|
||||
|
||||
|
@ -167,47 +164,35 @@ public class OrcidPropagationJobTest {
|
|||
propagatedAuthors
|
||||
.filter(
|
||||
"id = '50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e' "
|
||||
+ "and name = 'Vajinder' and surname = 'Kumar' and pidType = '" +
|
||||
+ "and name = 'Nicole' and surname = 'Jung' and pidType = '" +
|
||||
|
||||
ModelConstants.ORCID_PENDING + "'")
|
||||
.count());
|
||||
|
||||
Assertions.assertEquals(1, propagatedAuthors.filter("pid = '0000-0002-8825-3517'").count());
|
||||
Assertions.assertEquals(1, propagatedAuthors.filter("pid = '0000-0001-9513-2468'").count());
|
||||
}
|
||||
|
||||
@Test
|
||||
void twoUpdatesTest() throws Exception {
|
||||
SparkOrcidToResultFromSemRelJob
|
||||
SparkPropagateOrcidAuthor
|
||||
.main(
|
||||
new String[] {
|
||||
"-isTest",
|
||||
Boolean.TRUE.toString(),
|
||||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-sourcePath",
|
||||
"-graphPath",
|
||||
getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/twoupdates")
|
||||
.getPath(),
|
||||
"-hive_metastore_uris",
|
||||
"",
|
||||
"-saveGraph",
|
||||
"true",
|
||||
"-resultTableName",
|
||||
"eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"-outputPath",
|
||||
workingDir.toString() + "/dataset",
|
||||
"-possibleUpdatesPath",
|
||||
getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
|
||||
.getPath()
|
||||
"-targetPath",
|
||||
workingDir.toString() + "/graph",
|
||||
"-orcidPath", "",
|
||||
"-workingDir", workingDir.toString(),
|
||||
"-matchingSource", "propagation"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Dataset> tmp = sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.textFile(workingDir.toString() + "/graph/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
Assertions.assertEquals(10, tmp.count());
|
||||
|
@ -229,10 +214,10 @@ public class OrcidPropagationJobTest {
|
|||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, propagatedAuthors.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
|
||||
1, propagatedAuthors.filter("name = 'Nicole' and surname = 'Jung'").count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1, propagatedAuthors.filter("name = 'Ruediger' and surname = 'Beckhaus'").count());
|
||||
1, propagatedAuthors.filter("name = 'Camilo José' and surname = 'Cela'").count());
|
||||
|
||||
query = "select id, MyT.name name, MyT.surname surname, MyP.value pid ,MyP.qualifier.classid pidType "
|
||||
+ "from dataset "
|
||||
|
@ -243,13 +228,27 @@ public class OrcidPropagationJobTest {
|
|||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2, authorsExplodedPids.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
|
||||
3, authorsExplodedPids.filter("name = 'Camilo José' and surname = 'Cela'").count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
authorsExplodedPids
|
||||
.filter(
|
||||
"name = 'Marc' and surname = 'Schmidtmann' and pidType = 'MAG Identifier'")
|
||||
"name = 'Camilo José' and surname = 'Cela' and pidType = 'MAG Identifier'")
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
authorsExplodedPids
|
||||
.filter(
|
||||
"name = 'Camilo José' and surname = 'Cela' and pidType = 'orcid'")
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
authorsExplodedPids
|
||||
.filter(
|
||||
"name = 'Camilo José' and surname = 'Cela' and pidType = 'orcid_pending'")
|
||||
.count());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest;
|
||||
|
@ -30,7 +31,8 @@ public class ResultToCommunityJobTest {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||
|
@ -34,7 +35,8 @@ public class ResultToCommunityJobTest {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
|
|
Binary file not shown.
File diff suppressed because one or more lines are too long
Binary file not shown.
|
@ -0,0 +1,2 @@
|
|||
{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
|
File diff suppressed because one or more lines are too long
Binary file not shown.
|
@ -0,0 +1,9 @@
|
|||
{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|od______3989::0f89464c4ac4c398fe0c71433b175a62", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "collectedfrom": [], "validated": false, "properties": []}
|
||||
{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|od______3989::0f89464c4ac4c398fe0c71433b175a62", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "collectedfrom": [], "validated": false, "properties": []}
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,129 +0,0 @@
|
|||
package eu.dnetlib.dhp.enrich.orcid
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||
import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
|
||||
import eu.dnetlib.dhp.schema.sx.OafUtils
|
||||
import eu.dnetlib.pace.util.AuthorMatchers
|
||||
|
||||
import java.util
|
||||
import scala.beans.BeanProperty
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.util.control.Breaks.{break, breakable}
|
||||
|
||||
case class ORCIDAuthorEnricherResult(
|
||||
@BeanProperty var id: String,
|
||||
@BeanProperty var enriched_author: java.util.List[Author],
|
||||
@BeanProperty var author_matched: java.util.List[MatchedAuthors],
|
||||
@BeanProperty var author_unmatched: java.util.List[Author],
|
||||
@BeanProperty var orcid_unmatched: java.util.List[OrcidAutor]
|
||||
)
|
||||
|
||||
object ORCIDAuthorEnricher extends Serializable {
|
||||
|
||||
def enrichOrcid(
|
||||
id: String,
|
||||
graph_authors: java.util.List[Author],
|
||||
orcid_authors: java.util.List[OrcidAutor]
|
||||
): ORCIDAuthorEnricherResult = {
|
||||
// Author enriching strategy:
|
||||
// 1) create a copy of graph author list in unmatched_authors
|
||||
// 2) find best match in unmatched_authors, remove it from unmatched_authors and enrich it so
|
||||
// that the enrichment is reflected in graph_authors (they share author instances)
|
||||
// 3) repeat (2) till the end of the list and then with different matching algorithms that have decreasing
|
||||
// trust in their output
|
||||
// At the end unmatched_authors will contain authors not matched with any of the matching algos
|
||||
val unmatched_authors = new util.ArrayList[Author](graph_authors)
|
||||
|
||||
val matches = {
|
||||
// Look after exact fullname match, reconstruct ORCID fullname as givenName + familyName
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
||||
"fullName"
|
||||
) ++
|
||||
// Look after exact reversed fullname match, reconstruct ORCID fullname as familyName + givenName
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
|
||||
"reversedFullName"
|
||||
) ++
|
||||
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
AuthorMatchers
|
||||
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
||||
"orderedTokens"
|
||||
) ++
|
||||
// look after exact matches of ORCID creditName
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) => AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
|
||||
"creditName"
|
||||
) ++
|
||||
// look after exact matches in ORCID otherNames
|
||||
extractAndEnrichMatches(
|
||||
unmatched_authors,
|
||||
orcid_authors,
|
||||
(author, orcid) =>
|
||||
orcid.otherNames != null && AuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
|
||||
"otherNames"
|
||||
)
|
||||
}
|
||||
|
||||
ORCIDAuthorEnricherResult(id, graph_authors, matches.asJava, unmatched_authors, orcid_authors)
|
||||
}
|
||||
|
||||
private def extractAndEnrichMatches(
|
||||
graph_authors: java.util.List[Author],
|
||||
orcid_authors: java.util.List[OrcidAutor],
|
||||
matchingFunc: (Author, OrcidAutor) => Boolean,
|
||||
matchName: String
|
||||
) = {
|
||||
val matched = scala.collection.mutable.ArrayBuffer.empty[MatchedAuthors]
|
||||
|
||||
if (graph_authors != null && !graph_authors.isEmpty) {
|
||||
val ait = graph_authors.iterator
|
||||
|
||||
while (ait.hasNext) {
|
||||
val author = ait.next()
|
||||
val oit = orcid_authors.iterator
|
||||
|
||||
breakable {
|
||||
while (oit.hasNext) {
|
||||
val orcid = oit.next()
|
||||
|
||||
if (matchingFunc(author, orcid)) {
|
||||
ait.remove()
|
||||
oit.remove()
|
||||
matched += MatchedAuthors(author, orcid, matchName)
|
||||
|
||||
if (author.getPid == null) {
|
||||
author.setPid(new util.ArrayList[StructuredProperty]())
|
||||
}
|
||||
|
||||
val orcidPID = OafUtils.createSP(orcid.orcid, ModelConstants.ORCID, ModelConstants.ORCID)
|
||||
orcidPID.setDataInfo(OafUtils.generateDataInfo())
|
||||
orcidPID.getDataInfo.setProvenanceaction(
|
||||
OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT")
|
||||
)
|
||||
|
||||
author.getPid.add(orcidPID)
|
||||
|
||||
break()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
matched
|
||||
}
|
||||
|
||||
}
|
|
@ -1,93 +1,23 @@
|
|||
package eu.dnetlib.dhp.enrich.orcid
|
||||
|
||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||
import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport
|
||||
import eu.dnetlib.dhp.schema.oaf._
|
||||
import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.beans.BeanProperty
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
case class OrcidAutor(
|
||||
@BeanProperty var orcid: String,
|
||||
@BeanProperty var familyName: String,
|
||||
@BeanProperty var givenName: String,
|
||||
@BeanProperty var creditName: String,
|
||||
@BeanProperty var otherNames: java.util.List[String]
|
||||
) {
|
||||
def this() = this("null", "null", "null", "null", null)
|
||||
}
|
||||
|
||||
case class MatchData(
|
||||
@BeanProperty var id: String,
|
||||
@BeanProperty var graph_authors: java.util.List[Author],
|
||||
@BeanProperty var orcid_authors: java.util.List[OrcidAutor]
|
||||
) {
|
||||
def this() = this("null", null, null)
|
||||
}
|
||||
|
||||
case class MatchedAuthors(
|
||||
@BeanProperty var author: Author,
|
||||
@BeanProperty var orcid: OrcidAutor,
|
||||
@BeanProperty var `type`: String
|
||||
)
|
||||
|
||||
class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
|
||||
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
|
||||
extends SparkEnrichWithOrcidAuthors(propertyPath, args, log: Logger) {
|
||||
|
||||
/** Here all the spark applications runs this method
|
||||
* where the whole logic of the spark node is defined
|
||||
*/
|
||||
override def run(): Unit = {
|
||||
val graphPath = parser.get("graphPath")
|
||||
log.info(s"graphPath is '$graphPath'")
|
||||
val orcidPath = parser.get("orcidPath")
|
||||
log.info(s"orcidPath is '$orcidPath'")
|
||||
val targetPath = parser.get("targetPath")
|
||||
log.info(s"targetPath is '$targetPath'")
|
||||
val workingDir = parser.get("workingDir")
|
||||
log.info(s"targetPath is '$workingDir'")
|
||||
|
||||
createTemporaryData(graphPath, orcidPath, workingDir)
|
||||
analisys(workingDir)
|
||||
generateGraph(graphPath, workingDir, targetPath)
|
||||
}
|
||||
|
||||
private def generateGraph(graphPath: String, workingDir: String, targetPath: String): Unit = {
|
||||
|
||||
ModelSupport.entityTypes.asScala
|
||||
.filter(e => ModelSupport.isResult(e._1))
|
||||
.foreach(e => {
|
||||
val resultType = e._1.name()
|
||||
val enc = Encoders.bean(e._2)
|
||||
|
||||
val matched = spark.read
|
||||
.schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
|
||||
.parquet(s"${workingDir}/${resultType}_matched")
|
||||
.selectExpr("id", "enriched_author")
|
||||
|
||||
spark.read
|
||||
.schema(enc.schema)
|
||||
.json(s"$graphPath/$resultType")
|
||||
.join(matched, Seq("id"), "left")
|
||||
.withColumn(
|
||||
"author",
|
||||
when(size(col("enriched_author")).gt(0), col("enriched_author"))
|
||||
.otherwise(col("author"))
|
||||
)
|
||||
.drop("enriched_author")
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(s"${targetPath}/${resultType}")
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
private def createTemporaryData(graphPath: String, orcidPath: String, targetPath: String): Unit = {
|
||||
override def createTemporaryData(
|
||||
spark: SparkSession,
|
||||
graphPath: String,
|
||||
orcidPath: String,
|
||||
targetPath: String
|
||||
): Unit = {
|
||||
val orcidAuthors =
|
||||
spark.read.load(s"$orcidPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
|
||||
|
||||
|
@ -157,25 +87,6 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
|
|||
orcidWorksWithAuthors.unpersist()
|
||||
}
|
||||
|
||||
private def analisys(targetPath: String): Unit = {
|
||||
ModelSupport.entityTypes.asScala
|
||||
.filter(e => ModelSupport.isResult(e._1))
|
||||
.foreach(e => {
|
||||
val resultType = e._1.name()
|
||||
|
||||
spark.read
|
||||
.parquet(s"$targetPath/${resultType}_unmatched")
|
||||
.where("size(graph_authors) > 0")
|
||||
.as[MatchData](Encoders.bean(classOf[MatchData]))
|
||||
.map(md => {
|
||||
ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors)
|
||||
})(Encoders.bean(classOf[ORCIDAuthorEnricherResult]))
|
||||
.write
|
||||
.option("compression", "gzip")
|
||||
.mode("overwrite")
|
||||
.parquet(s"$targetPath/${resultType}_matched")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
object SparkEnrichGraphWithOrcidAuthors {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package eu.dnetlib.dhp.incremental
|
||||
|
||||
import eu.dnetlib.dhp.PropagationConstant
|
||||
import eu.dnetlib.dhp.common.enrichment.Constants
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import eu.dnetlib.dhp.bulktag.community.TaggingConstants
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport
|
||||
|
@ -63,7 +63,7 @@ object SparkAppendContextCleanedGraph {
|
|||
c.getDataInfo.asScala
|
||||
.filter(
|
||||
di =>
|
||||
!di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
|
||||
!di.getInferenceprovenance.equals(Constants.PROPAGATION_DATA_INFO_TYPE)
|
||||
&& !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)
|
||||
)
|
||||
.toList
|
||||
|
|
Loading…
Reference in New Issue