WIP: [OrcidPropagation] Make ORCID enrichment/propagation code more generic and reusable #501

Draft
giambattista.bloisi wants to merge 38 commits from propagateorcid into beta
50 changed files with 708 additions and 1091 deletions

View File

@ -0,0 +1,9 @@
package eu.dnetlib.dhp.common.enrichment;
public class Constants {
public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";
}

View File

@ -621,7 +621,7 @@ public class MergeUtils {
return m; return m;
} }
private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) { public static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
List<List<Author>> authors = new ArrayList<>(); List<List<Author>> authors = new ArrayList<>();
if (author != null) { if (author != null) {
authors.add(author); authors.add(author);

View File

@ -0,0 +1,134 @@
package eu.dnetlib.dhp.common.author
import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.schema.common.{EntityType, ModelConstants, ModelSupport}
import eu.dnetlib.dhp.utils.{MatchData, ORCIDAuthorEnricher, ORCIDAuthorEnricherResult}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.slf4j.{Logger, LoggerFactory}
import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE
import eu.dnetlib.dhp.schema.oaf.{OafEntity, Result}
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
import scala.collection.JavaConverters._
abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
/** Here all the spark applications runs this method
* where the whole logic of the spark node is defined
*/
override def run(): Unit = {
val graphPath = parser.get("graphPath")
log.info(s"graphPath is '$graphPath'")
val orcidPath = parser.get("orcidPath")
log.info(s"orcidPath is '$orcidPath'")
val targetPath = parser.get("targetPath")
log.info(s"targetPath is '$targetPath'")
val workingDir = parser.get("workingDir")
log.info(s"targetPath is '$workingDir'")
val classid =
Option(parser.get("matchingSource")).map(_ => ModelConstants.ORCID_PENDING).getOrElse(ModelConstants.ORCID)
log.info(s"classid is '$classid'")
val provenance =
Option(parser.get("matchingSource")).map(_ => PROPAGATION_DATA_INFO_TYPE).getOrElse("ORCID_ENRICHMENT")
log.info(s"targetPath is '$workingDir'")
createTemporaryData(spark, graphPath, orcidPath, workingDir)
analisys(workingDir, classid, provenance)
generateGraph(spark, graphPath, workingDir, targetPath)
}
private def processAndMerge(
spark: SparkSession,
inputPath: String,
outputPath: String,
clazz: Class[Result],
encoder: Encoder[Result]
): Unit = {
var tmp = spark.read
.schema(Encoders.bean(clazz).schema)
.json(inputPath)
.as(encoder)
tmp
.groupByKey(r => r.getId)(Encoders.STRING)
.mapGroups((k, it) => {
val p: Result = it.next
it.foldLeft(p.getAuthor)((x, r) => MergeUtils.mergeAuthors(x, r.getAuthor, 0))
p
})(encoder)
.write
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath)
}
private def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = {
ModelSupport.entityTypes
.keySet()
.asScala
.filter(ModelSupport.isResult)
.foreach((e: EntityType) => {
val resultClazz: Class[Result] = ModelSupport.entityTypes.get(e).asInstanceOf[Class[Result]]
val matched: Dataset[Row] = spark.read
.schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
.parquet(workingDir + "/" + e.name + "_matched")
.selectExpr("id", "enriched_author")
val result: Dataset[Row] = spark.read
.schema(Encoders.bean(resultClazz).schema)
.json(graphPath + "/" + e.name)
result
.join(matched, Seq("id"), "left")
.withColumn(
"author",
when(size(col("enriched_author")).gt(0), col("enriched_author"))
.otherwise(col("author"))
)
.drop("enriched_author")
.as(Encoders.bean(resultClazz))
.groupByKey(r => r.getId)(Encoders.STRING)
.mapGroups((k, it) => {
val p: Result = it.next
p.setAuthor(it.foldLeft(p.getAuthor)((x, r) => MergeUtils.mergeAuthors(x, r.getAuthor, 0)))
p
})(Encoders.bean(resultClazz))
.write
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(targetPath + "/" + e.name)
})
}
def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit
private def analisys(targetPath: String, classid: String, provenance: String): Unit = {
ModelSupport.entityTypes.asScala
.filter(e => ModelSupport.isResult(e._1))
.foreach(e => {
val resultType = e._1.name()
val c = classid
val p = provenance
spark.read
.parquet(s"$targetPath/${resultType}_unmatched")
.where("size(graph_authors) > 0")
.as[MatchData](Encoders.bean(classOf[MatchData]))
.map(md => {
ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors, c, p)
})(Encoders.bean(classOf[ORCIDAuthorEnricherResult]))
.write
.option("compression", "gzip")
.mode("overwrite")
.parquet(s"$targetPath/${resultType}_matched")
})
}
}

View File

@ -1,11 +1,12 @@
package eu.dnetlib.pace.util package eu.dnetlib.dhp.utils
import java.text.Normalizer
import java.util.Locale import java.util.Locale
import java.util.regex.Pattern import java.util.regex.Pattern
import scala.util.control.Breaks.{break, breakable} import scala.util.control.Breaks.{break, breakable}
object AuthorMatchers { object AuthorMatchers {
val SPLIT_REGEX = Pattern.compile("[\\s,\\.]+") val SPLIT_REGEX = Pattern.compile("[\\s\\p{Punct}\\p{Pd}]+")
val WORD_DIFF = 2 val WORD_DIFF = 2
@ -24,9 +25,16 @@ object AuthorMatchers {
} }
} }
def normalize(s: String): Array[String] = {
SPLIT_REGEX
.split(Normalizer.normalize(s, Normalizer.Form.NFC).toLowerCase(Locale.ROOT))
.filter(_.nonEmpty)
.sorted
}
def matchOrderedTokenAndAbbreviations(a1: String, a2: String): Boolean = { def matchOrderedTokenAndAbbreviations(a1: String, a2: String): Boolean = {
val p1: Array[String] = SPLIT_REGEX.split(a1.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted val p1: Array[String] = normalize(a1)
val p2: Array[String] = SPLIT_REGEX.split(a2.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).sorted val p2: Array[String] = normalize(a2)
if (p1.length < 2 || p2.length < 2) return false if (p1.length < 2 || p2.length < 2) return false
if (Math.abs(p1.length - p2.length) > WORD_DIFF) return false // use alternative comparison algo if (Math.abs(p1.length - p2.length) > WORD_DIFF) return false // use alternative comparison algo
@ -73,7 +81,6 @@ object AuthorMatchers {
removeMatches(graph_authors, orcid_authors, (a, b) => matchingFunc(a, b)) removeMatches(graph_authors, orcid_authors, (a, b) => matchingFunc(a, b))
} }
def removeMatches( def removeMatches(
graph_authors: java.util.List[String], graph_authors: java.util.List[String],
orcid_authors: java.util.List[String], orcid_authors: java.util.List[String],

View File

@ -0,0 +1,197 @@
package eu.dnetlib.dhp.utils
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
import eu.dnetlib.dhp.schema.sx.OafUtils
import java.util
import scala.beans.BeanProperty
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks.{break, breakable}
import eu.dnetlib.dhp.common.enrichment.Constants._
case class OrcidAuthor(
@BeanProperty var orcid: String,
@BeanProperty var familyName: String,
@BeanProperty var givenName: String,
@BeanProperty var creditName: String,
@BeanProperty var otherNames: java.util.List[String]
) {
def this() = this("null", "null", "null", "null", null)
}
case class MatchedAuthors(
@BeanProperty var author: Author,
@BeanProperty var orcid: OrcidAuthor,
@BeanProperty var `type`: String
)
case class MatchData(
@BeanProperty var id: String,
@BeanProperty var graph_authors: java.util.List[Author],
@BeanProperty var orcid_authors: java.util.List[OrcidAuthor]
) {
def this() = this("null", null, null)
}
case class ORCIDAuthorEnricherResult(
@BeanProperty var id: String,
@BeanProperty var enriched_author: java.util.List[Author],
@BeanProperty var author_matched: java.util.List[MatchedAuthors],
@BeanProperty var author_unmatched: java.util.List[Author],
@BeanProperty var orcid_unmatched: java.util.List[OrcidAuthor]
)
object ORCIDAuthorEnricher extends Serializable {
def enrichOrcid(
id: String,
graph_authors: java.util.List[Author],
orcid_authors: java.util.List[OrcidAuthor],
classid: String,
provenance: String
): ORCIDAuthorEnricherResult = {
// Author enriching strategy:
// 1) create a copy of graph author list in unmatched_authors
// 2) find best match in unmatched_authors, remove it from unmatched_authors and enrich it so
// that the enrichment is reflected in graph_authors (they share author instances).
// Do not match in case of ambiguity: two authors match and at least one of them has affiliation string
// 3) repeat (2) till the end of the list and then with different matching algorithms that have decreasing
// trust in their output
// At the end unmatched_authors will contain authors not matched with any of the matching algos
val unmatched_authors = new util.ArrayList[Author](graph_authors)
val matches = {
// Look after exact fullname match, reconstruct ORCID fullname as givenName + familyName
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) =>
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
"fullName",
classid,
provenance
) ++
// Look after exact reversed fullname match, reconstruct ORCID fullname as familyName + givenName
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) =>
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
"reversedFullName",
classid,
provenance
) ++
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) =>
AuthorMatchers
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
"orderedTokens-1",
classid,
provenance,
skipAmbiguities = true
) ++
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) =>
AuthorMatchers
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
"orderedTokens-2",
classid,
provenance
) ++
// look after exact matches of ORCID creditName
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) => AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
"creditName",
classid,
provenance
) ++
// look after exact matches in ORCID otherNames
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) =>
orcid.otherNames != null && AuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
"otherNames",
classid,
provenance
)
}
ORCIDAuthorEnricherResult(id, graph_authors, matches.asJava, unmatched_authors, orcid_authors)
}
private def extractAndEnrichMatches(
unmatched_authors: java.util.List[Author],
orcid_authors: java.util.List[OrcidAuthor],
matchingFunc: (Author, OrcidAuthor) => Boolean,
matchName: String,
classid: String,
provenance: String,
skipAmbiguities: Boolean = false
): ArrayBuffer[MatchedAuthors] = {
val matched = ArrayBuffer.empty[MatchedAuthors]
if (unmatched_authors == null || unmatched_authors.isEmpty) {
return matched
}
val oit = orcid_authors.iterator
while (oit.hasNext) {
val orcid = oit.next()
val candidates = unmatched_authors.asScala.foldLeft(ArrayBuffer[Author]())((res, author) => {
if (matchingFunc(author, orcid)) {
res += author
}
res
})
if (
candidates.size == 1 ||
(candidates.size > 1 && !skipAmbiguities && !candidates
.exists(a => a.getRawAffiliationString != null && !a.getRawAffiliationString.isEmpty))
) {
val author = candidates(0)
unmatched_authors.remove(author)
oit.remove()
matched += MatchedAuthors(author, orcid, matchName)
if (author.getPid == null) {
author.setPid(new util.ArrayList[StructuredProperty]())
}
val orcidPID = OafUtils.createSP(orcid.orcid, classid, classid)
orcidPID.setDataInfo(OafUtils.generateDataInfo())
if (provenance.equalsIgnoreCase(PROPAGATION_DATA_INFO_TYPE)) {
orcidPID.getDataInfo.setInferenceprovenance(PROPAGATION_DATA_INFO_TYPE);
orcidPID.getDataInfo.setInferred(true);
orcidPID.getDataInfo.setProvenanceaction(
OafUtils.createQualifier(
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID,
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME
)
)
} else
orcidPID.getDataInfo.setProvenanceaction(
OafUtils.createQualifier(provenance, provenance)
)
author.getPid.add(orcidPID)
}
}
matched
}
}

View File

@ -1,10 +1,10 @@
package eu.dnetlib.dhp.enrich.orcid package eu.dnetlib.dhp.utils
import eu.dnetlib.pace.util.AuthorMatchers.matchOrderedTokenAndAbbreviations import eu.dnetlib.dhp.utils.AuthorMatchers.matchOrderedTokenAndAbbreviations
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue} import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
class ORCIDAuthorMatchersTest { class AuthorMatchersTest {
@Test def testShortNames(): Unit = { @Test def testShortNames(): Unit = {
assertTrue(matchOrderedTokenAndAbbreviations("Lasagni Mariozzi Federico", "Lasagni F. Mariozzi")) assertTrue(matchOrderedTokenAndAbbreviations("Lasagni Mariozzi Federico", "Lasagni F. Mariozzi"))

View File

@ -9,11 +9,11 @@ import java.util.stream.Collectors;
import com.wcohen.ss.AbstractStringDistance; import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.dhp.utils.AuthorMatchers;
import eu.dnetlib.pace.config.Config; import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Person; import eu.dnetlib.pace.model.Person;
import eu.dnetlib.pace.tree.support.AbstractListComparator; import eu.dnetlib.pace.tree.support.AbstractListComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass; import eu.dnetlib.pace.tree.support.ComparatorClass;
import eu.dnetlib.pace.util.AuthorMatchers;
@ComparatorClass("authorsMatch") @ComparatorClass("authorsMatch")
public class AuthorsMatch extends AbstractListComparator { public class AuthorsMatch extends AbstractListComparator {

View File

@ -21,9 +21,13 @@ class DecisionTreeTest {
void testJPath() throws IOException { void testJPath() throws IOException {
DedupConfig conf = DedupConfig DedupConfig conf = DedupConfig
.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json"))); .load(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
final String org = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json")); final String org = IOUtils
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
Row row = SparkModel.apply(conf).rowFromJson(org); Row row = SparkModel.apply(conf).rowFromJson(org);
@ -42,7 +46,8 @@ class DecisionTreeTest {
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
final String org = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization_example1.json")); final String org = IOUtils
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization_example1.json"));
Row row = SparkModel.apply(conf).rowFromJson(org); Row row = SparkModel.apply(conf).rowFromJson(org);
// to check that the same parsing returns the same row // to check that the same parsing returns the same row

View File

@ -440,7 +440,8 @@ public class SparkDedupTest implements Serializable {
.count(); .count();
final List<Relation> merges = pubs final List<Relation> merges = pubs
.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")// and relClass = '"+ModelConstants.MERGES+"'") .filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")// and relClass =
// '"+ModelConstants.MERGES+"'")
.collectAsList(); .collectAsList();
assertEquals(4, merges.size()); assertEquals(4, merges.size());
Set<String> dups = Sets Set<String> dups = Sets

View File

@ -19,9 +19,13 @@ class JsonPathTest {
void testJPath() throws IOException { void testJPath() throws IOException {
DedupConfig conf = DedupConfig DedupConfig conf = DedupConfig
.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json"))); .load(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/dedup_conf_organization.json")));
final String org = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json")); final String org = IOUtils
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/jpath/organization.json"));
Row row = SparkModel.apply(conf).rowFromJson(org); Row row = SparkModel.apply(conf).rowFromJson(org);

View File

@ -1,6 +1,8 @@
package eu.dnetlib.dhp; package eu.dnetlib.dhp;
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -46,7 +48,7 @@ public class PropagationConstant {
public static final String INSTITUTIONAL_REPO_TYPE = "institutional"; public static final String INSTITUTIONAL_REPO_TYPE = "institutional";
public static final String PROPAGATION_DATA_INFO_TYPE = "propagation"; // public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
public static final String TRUE = "true"; public static final String TRUE = "true";
@ -74,9 +76,6 @@ public class PropagationConstant {
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID = "result:community:project"; public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID = "result:community:project";
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME = " Propagation of result belonging to community through project"; public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME = " Propagation of result belonging to community through project";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";
public static final String ITERATION_ONE = "ExitAtFirstIteration"; public static final String ITERATION_ONE = "ExitAtFirstIteration";
public static final String ITERATION_TWO = "ExitAtSecondIteration"; public static final String ITERATION_TWO = "ExitAtSecondIteration";
public static final String ITERATION_THREE = "ExitAtThirdIteration"; public static final String ITERATION_THREE = "ExitAtThirdIteration";

View File

@ -1,43 +0,0 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
public class AutoritativeAuthor {
private String name;
private String surname;
private String fullname;
private String orcid;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSurname() {
return surname;
}
public void setSurname(String surname) {
this.surname = surname;
}
public String getFullname() {
return fullname;
}
public void setFullname(String fullname) {
this.fullname = fullname;
}
public String getOrcid() {
return orcid;
}
public void setOrcid(String orcid) {
this.orcid = orcid;
}
}

View File

@ -0,0 +1,19 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
import java.io.Serializable;
import java.util.List;
import eu.dnetlib.dhp.utils.OrcidAuthor;
public class OrcidAuthors implements Serializable {
List<OrcidAuthor> orcidAuthorList;
public List<OrcidAuthor> getOrcidAuthorList() {
return orcidAuthorList;
}
public void setOrcidAuthorList(List<OrcidAuthor> orcidAuthorList) {
this.orcidAuthorList = orcidAuthorList;
}
}

View File

@ -1,124 +0,0 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
public class PrepareResultOrcidAssociationStep1 {
private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class);
public static void main(String[] args) throws Exception {
String jsonConf = IOUtils
.toString(
PrepareResultOrcidAssociationStep1.class
.getResourceAsStream(
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConf);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
log.info("resultType: {}", resultType);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
String inputRelationPath = inputPath + "/relation";
log.info("inputRelationPath: {}", inputRelationPath);
String inputResultPath = inputPath + "/" + resultType;
log.info("inputResultPath: {}", inputResultPath);
String outputResultPath = outputPath + "/" + resultType;
log.info("outputResultPath: {}", outputResultPath);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareInfo(
spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel);
});
}
private static <R extends Result> void prepareInfo(
SparkSession spark,
String inputRelationPath,
String inputResultPath,
String outputResultPath,
Class<R> resultClazz,
List<String> allowedsemrel) {
Dataset<Relation> relation = readPath(spark, inputRelationPath, Relation.class);
relation.createOrReplaceTempView("relation");
log.info("Reading Graph table from: {}", inputResultPath);
Dataset<R> result = readPath(spark, inputResultPath, resultClazz);
result.createOrReplaceTempView("result");
String query = "SELECT target resultId, author authorList"
+ " FROM (SELECT id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author "
+ " FROM ( "
+ " SELECT DISTINCT id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid "
+ " FROM result "
+ " LATERAL VIEW EXPLODE (author) a AS MyT "
+ " LATERAL VIEW EXPLODE (MyT.pid) p AS MyP "
+ " WHERE lower(MyP.qualifier.classid) = '" + ModelConstants.ORCID + "' or "
+ " lower(MyP.qualifier.classid) = '" + ModelConstants.ORCID_PENDING + "') tmp "
+ " GROUP BY id) r_t "
+ " JOIN ("
+ " SELECT source, target "
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ getConstraintList(" lower(relclass) = '", allowedsemrel)
+ " ) rel_rel "
+ " ON source = id";
log.info("executedQuery: {}", query);
spark
.sql(query)
.as(Encoders.bean(ResultOrcidList.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputResultPath);
}
}

View File

@ -1,95 +0,0 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import scala.Tuple2;
public class PrepareResultOrcidAssociationStep2 {
private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep2.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareResultOrcidAssociationStep2.class
.getResourceAsStream(
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
mergeInfo(spark, inputPath, outputPath);
});
}
private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) {
Dataset<ResultOrcidList> resultOrcidAssoc = readPath(spark, inputPath + "/publication", ResultOrcidList.class)
.union(readPath(spark, inputPath + "/dataset", ResultOrcidList.class))
.union(readPath(spark, inputPath + "/otherresearchproduct", ResultOrcidList.class))
.union(readPath(spark, inputPath + "/software", ResultOrcidList.class));
resultOrcidAssoc
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getResultId(), r))
.reduceByKey(
(a, b) -> {
if (a == null) {
return b;
}
if (b == null) {
return a;
}
Set<String> orcid_set = new HashSet<>();
a.getAuthorList().stream().forEach(aa -> orcid_set.add(aa.getOrcid()));
b
.getAuthorList()
.stream()
.forEach(
aa -> {
if (!orcid_set.contains(aa.getOrcid())) {
a.getAuthorList().add(aa);
orcid_set.add(aa.getOrcid());
}
});
return a;
})
.map(Tuple2::_2)
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath, GzipCodec.class);
}
}

View File

@ -1,27 +0,0 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ResultOrcidList implements Serializable {
String resultId;
List<AutoritativeAuthor> authorList = new ArrayList<>();
public String getResultId() {
return resultId;
}
public void setResultId(String resultId) {
this.resultId = resultId;
}
public List<AutoritativeAuthor> getAuthorList() {
return authorList;
}
public void setAuthorList(List<AutoritativeAuthor> authorList) {
this.authorList = authorList;
}
}

View File

@ -1,211 +0,0 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import scala.Tuple2;
public class SparkOrcidToResultFromSemRelJob {
private static final Logger log = LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkOrcidToResultFromSemRelJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String possibleUpdates = parser.get("possibleUpdatesPath");
log.info("possibleUpdatesPath: {}", possibleUpdates);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
if (saveGraph) {
execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz);
}
});
}
private static <R extends Result> void execPropagation(
SparkSession spark,
String possibleUpdatesPath,
String inputPath,
String outputPath,
Class<R> resultClazz) {
// read possible updates (resultId and list of possible orcid to add
Dataset<ResultOrcidList> possible_updates = readPath(spark, possibleUpdatesPath, ResultOrcidList.class);
// read the result we have been considering
Dataset<R> result = readPath(spark, inputPath, resultClazz);
// make join result left_outer with possible updates
result
.joinWith(
possible_updates,
result.col("id").equalTo(possible_updates.col("resultId")),
"left_outer")
.map(authorEnrichFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
private static <R extends Result> MapFunction<Tuple2<R, ResultOrcidList>, R> authorEnrichFn() {
return value -> {
R ret = value._1();
Optional<ResultOrcidList> rol = Optional.ofNullable(value._2());
if (rol.isPresent() && Optional.ofNullable(ret.getAuthor()).isPresent()) {
List<Author> toenrich_author = ret.getAuthor();
List<AutoritativeAuthor> autoritativeAuthors = rol.get().getAuthorList();
for (Author author : toenrich_author) {
if (!containsAllowedPid(author)) {
enrichAuthor(author, autoritativeAuthors);
}
}
}
return ret;
};
}
private static void enrichAuthor(Author a, List<AutoritativeAuthor> au) {
PacePerson pp = new PacePerson(a.getFullname(), false);
for (AutoritativeAuthor aa : au) {
if (enrichAuthor(aa, a, pp.getNormalisedFirstName(), pp.getNormalisedSurname())) {
return;
}
}
}
private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, Author author,
String author_name,
String author_surname) {
boolean toaddpid = false;
if (StringUtils.isNotEmpty(autoritative_author.getSurname())) {
if (StringUtils.isNotEmpty(author.getSurname())) {
author_surname = author.getSurname();
}
if (StringUtils.isNotEmpty(author_surname)) {
// have the same surname. Check the name
if (autoritative_author
.getSurname()
.trim()
.equalsIgnoreCase(author_surname.trim()) && StringUtils.isNotEmpty(autoritative_author.getName())) {
if (StringUtils.isNotEmpty(author.getName())) {
author_name = author.getName();
}
if (StringUtils.isNotEmpty(author_name)) {
if (autoritative_author
.getName()
.trim()
.equalsIgnoreCase(author_name.trim())) {
toaddpid = true;
}
// they could be differently written (i.e. only the initials of the name
// in one of the two
else {
if (autoritative_author
.getName()
.trim()
.substring(0, 0)
.equalsIgnoreCase(author_name.trim().substring(0, 0))) {
toaddpid = true;
}
}
}
}
}
}
if (toaddpid) {
StructuredProperty p = new StructuredProperty();
p.setValue(autoritative_author.getOrcid());
p
.setQualifier(
getQualifier(
ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
p
.setDataInfo(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID,
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS));
Optional<List<StructuredProperty>> authorPid = Optional.ofNullable(author.getPid());
if (authorPid.isPresent()) {
authorPid.get().add(p);
} else {
author.setPid(Lists.newArrayList(p));
}
}
return toaddpid;
}
private static boolean containsAllowedPid(Author a) {
Optional<List<StructuredProperty>> pids = Optional.ofNullable(a.getPid());
if (!pids.isPresent()) {
return false;
}
for (StructuredProperty pid : pids.get()) {
if (ModelConstants.ORCID_PENDING.equalsIgnoreCase(pid.getQualifier().getClassid()) ||
ModelConstants.ORCID.equalsIgnoreCase(pid.getQualifier().getClassid())) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,158 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.OrcidAuthor;
import scala.Tuple2;
public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
private static final Logger log = LoggerFactory.getLogger(SparkPropagateOrcidAuthor.class);
public SparkPropagateOrcidAuthor(String propertyPath, String[] args, Logger log) {
super(propertyPath, args, log);
}
public static void main(String[] args) throws Exception {
// Create instance and run the Spark application
SparkPropagateOrcidAuthor app = new SparkPropagateOrcidAuthor(
"/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json", args, log);
app.initialize().run();
}
private static OrcidAuthors getOrcidAuthorsList(List<Author> authors) {
OrcidAuthors oas = new OrcidAuthors();
List<OrcidAuthor> tmp = authors
.stream()
.map(SparkPropagateOrcidAuthor::getOrcidAuthor)
.filter(Objects::nonNull)
.collect(Collectors.toList());
oas.setOrcidAuthorList(tmp);
return oas;
}
private static OrcidAuthor getOrcidAuthor(Author a) {
return Optional
.ofNullable(getOrcid(a))
.map(orcid -> {
OrcidAuthor orcidAuthor = new OrcidAuthor(orcid, a.getSurname(), a.getName(), a.getFullname(), null);
return orcidAuthor;
})
.orElse(null);
}
private static String getOrcid(Author a) {
String orcid = null;
if (a.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID)))
orcid = a
.getPid()
.stream()
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID))
.findFirst()
.get()
.getValue();
else if (a
.getPid()
.stream()
.anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING)))
orcid = a
.getPid()
.stream()
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID_PENDING))
.findFirst()
.get()
.getValue();
return orcid;
}
@Override
public void createTemporaryData(SparkSession spark, String graphPath, String orcidPath, String targetPath) {
Dataset<Row> supplements = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(graphPath + "/" + "relation")
.where(
"relclass IN('" + ModelConstants.IS_SUPPLEMENT_TO + "', '" +
ModelConstants.IS_SUPPLEMENTED_BY + "')")
.selectExpr("source as id", "target");
Dataset<Row> result = spark
.read()
.schema(Encoders.bean(Result.class).schema())
.json(
graphPath + "/dataset", graphPath + "/publication", graphPath + "/software",
graphPath + "/otherresearchproduct")
.as(Encoders.bean(Result.class))
.selectExpr("id", "author as graph_authors");
ModelSupport.entityTypes
.keySet()
.stream()
.filter(ModelSupport::isResult)
.forEach(e -> {
Dataset<Row> orcidDnet = spark
.read()
.schema(Encoders.bean(Result.class).schema())
.json(graphPath + "/" + e.name())
.as(Encoders.bean(Result.class))
.filter(
(FilterFunction<Result>) SparkPropagateOrcidAuthor::hasOrcidAuthor)
.map(
(MapFunction<Result, Tuple2<String, OrcidAuthors>>) r -> new Tuple2<>(r.getId(),
getOrcidAuthorsList(r.getAuthor())),
Encoders.tuple(Encoders.STRING(), Encoders.bean(OrcidAuthors.class)))
.selectExpr("_1 as target", "_2.orcidAuthorList as orcid_authors");
result
.join(supplements, "id")
.join(orcidDnet, "target")
.drop("target")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.parquet(targetPath + "/" + e.name() + "_unmatched");
});
}
private static boolean hasOrcidAuthor(Result r) {
return r.getAuthor() != null &&
isaBoolean(r);
}
private static boolean isaBoolean(Result r) {
boolean tmp = r
.getAuthor()
.stream()
.anyMatch(
a -> a.getPid() != null && a
.getPid()
.stream()
.anyMatch(
p -> p.getQualifier().getClassid().equalsIgnoreCase(ModelConstants.ORCID) ||
p
.getQualifier()
.getClassid()
.equalsIgnoreCase(ModelConstants.ORCID_PENDING)));
return tmp;
}
}

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.projecttoresult;
import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.resulttocommunityfromorganization;
import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;

View File

@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME; import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel;
import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;

View File

@ -1,44 +1,32 @@
[ [
{ {
"paramName":"s", "paramName":"s",
"paramLongName":"sourcePath", "paramLongName":"graphPath",
"paramDescription": "the path of the sequencial file to read", "paramDescription": "the path of the sequencial file to read",
"paramRequired": true "paramRequired": true
}, },
{
"paramName":"sg",
"paramLongName":"saveGraph",
"paramDescription": "true if the new version of the graph must be saved",
"paramRequired": false
},
{ {
"paramName": "out", "paramName": "out",
"paramLongName": "outputPath", "paramLongName": "targetPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
}, {
"paramName": "o",
"paramLongName": "orcidPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
}, {
"paramName": "w",
"paramLongName": "workingDir",
"paramDescription": "the path used to store temporary output files", "paramDescription": "the path used to store temporary output files",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "ssm", "paramName": "m",
"paramLongName": "isSparkSessionManaged", "paramLongName": "matchingSource",
"paramDescription": "true if the spark session is managed, false otherwise", "paramDescription": "the path used to store temporary output files",
"paramRequired": false
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName":"pu",
"paramLongName":"possibleUpdatesPath",
"paramDescription": "the path the the association resultId orcid author list can be found",
"paramRequired": true
},
{
"paramName":"test",
"paramLongName":"isTest",
"paramDescription": "true if it is executing a test",
"paramRequired": false "paramRequired": false
} }
] ]

View File

@ -92,21 +92,14 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="copy_wait" to="fork_prepare_assoc_step1"/> <join name="copy_wait" to="exec_propagation"/>
<fork name="fork_prepare_assoc_step1"> <action name="exec_propagation">
<path start="join_prepare_publication"/>
<path start="join_prepare_dataset"/>
<path start="join_prepare_otherresearchproduct"/>
<path start="join_prepare_software"/>
</fork>
<action name="join_prepare_publication">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>ORCIDPropagation-PreparePhase1-Publications</name> <name>ORCIDPropagation-PreparePhase1-Publications</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class> <class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar> <jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -119,239 +112,17 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=8000 --conf spark.sql.shuffle.partitions=8000
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--orcidPath</arg><arg>${sourcePath}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg> <arg>--targetPath</arg><arg>${outputPath}</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg> <arg>--matchingSource</arg><arg>graph</arg>
</spark> </spark>
<ok to="wait"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="join_prepare_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-PreparePhase1-Dataset</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="join_prepare_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-PreparePhase1-ORP</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="join_prepare_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-PreparePhase1-Software</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep1</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="prepare_assoc_step2"/>
<action name="prepare_assoc_step2">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-PreparePhase2</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.PrepareResultOrcidAssociationStep2</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
</spark>
<ok to="fork-join-exec-propagation"/>
<error to="Kill"/>
</action>
<fork name="fork-join-exec-propagation">
<path start="join_propagate_publication"/>
<path start="join_propagate_dataset"/>
<path start="join_propagate_otherresearchproduct"/>
<path start="join_propagate_software"/>
</fork>
<action name="join_propagate_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-Publication</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<action name="join_propagate_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-Dataset</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<action name="join_propagate_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-ORP</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<action name="join_propagate_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-Software</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=4000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<join name="wait2" to="End"/>
<end name="End"/> <end name="End"/>

View File

@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Country; import eu.dnetlib.dhp.schema.oaf.Country;
@ -33,7 +34,8 @@ public class CountryPropagationJobTest {
private static final Logger log = LoggerFactory.getLogger(CountryPropagationJobTest.class); private static final Logger log = LoggerFactory.getLogger(CountryPropagationJobTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static SparkSession spark; private static SparkSession spark;

View File

@ -15,11 +15,13 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
public class DatasourceCountryPreparationTest { public class DatasourceCountryPreparationTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static SparkSession spark; private static SparkSession spark;

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.KeyValueSet; import eu.dnetlib.dhp.KeyValueSet;
import eu.dnetlib.dhp.PropagationConstant; import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.common.enrichment.Constants;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
@ -145,7 +146,7 @@ public class SparkJobTest {
.foreach( .foreach(
r -> Assertions r -> Assertions
.assertEquals( .assertEquals(
PropagationConstant.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance())); Constants.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
result result
.foreach( .foreach(
r -> Assertions r -> Assertions
@ -428,7 +429,7 @@ public class SparkJobTest {
.foreach( .foreach(
r -> Assertions r -> Assertions
.assertEquals( .assertEquals(
PropagationConstant.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance())); Constants.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance()));
project project
.foreach( .foreach(
r -> Assertions r -> Assertions

View File

@ -4,11 +4,16 @@ package eu.dnetlib.dhp.orcidtoresultfromsemrel;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Optional;
import java.util.logging.Filter;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -21,8 +26,11 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.enrichment.Constants;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class OrcidPropagationJobTest { public class OrcidPropagationJobTest {
@ -71,23 +79,24 @@ public class OrcidPropagationJobTest {
.getResource( .getResource(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc") "/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
.getPath(); .getPath();
SparkOrcidToResultFromSemRelJob SparkPropagateOrcidAuthor
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(), "-graphPath",
"-isSparkSessionManaged", Boolean.FALSE.toString(), getClass()
"-sourcePath", sourcePath, .getResource(
"-hive_metastore_uris", "", "/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/noupdate")
"-saveGraph", "true", .getPath(),
"-resultTableName", Dataset.class.getCanonicalName(), "-targetPath",
"-outputPath", workingDir.toString() + "/dataset", workingDir.toString() + "/graph",
"-possibleUpdatesPath", possibleUpdatesPath "-orcidPath", "",
"-workingDir", workingDir.toString()
}); });
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset") .textFile(workingDir.toString() + "/graph/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
// tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s)); // tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
@ -110,40 +119,27 @@ public class OrcidPropagationJobTest {
@Test @Test
void oneUpdateTest() throws Exception { void oneUpdateTest() throws Exception {
SparkOrcidToResultFromSemRelJob SparkPropagateOrcidAuthor
.main( .main(
new String[] { new String[] {
"-isTest", "-graphPath",
Boolean.TRUE.toString(),
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-sourcePath",
getClass()
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/oneupdate")
.getPath(),
"-hive_metastore_uris",
"",
"-saveGraph",
"true",
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath",
workingDir.toString() + "/dataset",
"-possibleUpdatesPath",
getClass() getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc") "/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/oneupdate")
.getPath() .getPath(),
"-targetPath",
workingDir.toString() + "/graph",
"-orcidPath", "",
"-workingDir", workingDir.toString(),
"-matchingSource", "propagation"
}); });
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset") .textFile(workingDir.toString() + "/graph/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
// tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
Assertions.assertEquals(10, tmp.count()); Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
@ -158,6 +154,7 @@ public class OrcidPropagationJobTest {
+ "where MyP.datainfo.inferenceprovenance = 'propagation'"; + "where MyP.datainfo.inferenceprovenance = 'propagation'";
org.apache.spark.sql.Dataset<Row> propagatedAuthors = spark.sql(query); org.apache.spark.sql.Dataset<Row> propagatedAuthors = spark.sql(query);
propagatedAuthors.show(false);
Assertions.assertEquals(1, propagatedAuthors.count()); Assertions.assertEquals(1, propagatedAuthors.count());
@ -167,47 +164,35 @@ public class OrcidPropagationJobTest {
propagatedAuthors propagatedAuthors
.filter( .filter(
"id = '50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e' " "id = '50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e' "
+ "and name = 'Vajinder' and surname = 'Kumar' and pidType = '" + + "and name = 'Nicole' and surname = 'Jung' and pidType = '" +
ModelConstants.ORCID_PENDING + "'") ModelConstants.ORCID_PENDING + "'")
.count()); .count());
Assertions.assertEquals(1, propagatedAuthors.filter("pid = '0000-0002-8825-3517'").count()); Assertions.assertEquals(1, propagatedAuthors.filter("pid = '0000-0001-9513-2468'").count());
} }
@Test @Test
void twoUpdatesTest() throws Exception { void twoUpdatesTest() throws Exception {
SparkOrcidToResultFromSemRelJob SparkPropagateOrcidAuthor
.main( .main(
new String[] { new String[] {
"-isTest", "-graphPath",
Boolean.TRUE.toString(),
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-sourcePath",
getClass() getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/twoupdates") "/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/twoupdates")
.getPath(), .getPath(),
"-hive_metastore_uris", "-targetPath",
"", workingDir.toString() + "/graph",
"-saveGraph", "-orcidPath", "",
"true", "-workingDir", workingDir.toString(),
"-resultTableName", "-matchingSource", "propagation"
"eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath",
workingDir.toString() + "/dataset",
"-possibleUpdatesPath",
getClass()
.getResource(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
.getPath()
}); });
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset") .textFile(workingDir.toString() + "/graph/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count()); Assertions.assertEquals(10, tmp.count());
@ -229,10 +214,10 @@ public class OrcidPropagationJobTest {
Assertions Assertions
.assertEquals( .assertEquals(
1, propagatedAuthors.filter("name = 'Marc' and surname = 'Schmidtmann'").count()); 1, propagatedAuthors.filter("name = 'Nicole' and surname = 'Jung'").count());
Assertions Assertions
.assertEquals( .assertEquals(
1, propagatedAuthors.filter("name = 'Ruediger' and surname = 'Beckhaus'").count()); 1, propagatedAuthors.filter("name = 'Camilo José' and surname = 'Cela'").count());
query = "select id, MyT.name name, MyT.surname surname, MyP.value pid ,MyP.qualifier.classid pidType " query = "select id, MyT.name name, MyT.surname surname, MyP.value pid ,MyP.qualifier.classid pidType "
+ "from dataset " + "from dataset "
@ -243,13 +228,27 @@ public class OrcidPropagationJobTest {
Assertions Assertions
.assertEquals( .assertEquals(
2, authorsExplodedPids.filter("name = 'Marc' and surname = 'Schmidtmann'").count()); 3, authorsExplodedPids.filter("name = 'Camilo José' and surname = 'Cela'").count());
Assertions Assertions
.assertEquals( .assertEquals(
1, 1,
authorsExplodedPids authorsExplodedPids
.filter( .filter(
"name = 'Marc' and surname = 'Schmidtmann' and pidType = 'MAG Identifier'") "name = 'Camilo José' and surname = 'Cela' and pidType = 'MAG Identifier'")
.count());
Assertions
.assertEquals(
1,
authorsExplodedPids
.filter(
"name = 'Camilo José' and surname = 'Cela' and pidType = 'orcid'")
.count());
Assertions
.assertEquals(
1,
authorsExplodedPids
.filter(
"name = 'Camilo José' and surname = 'Cela' and pidType = 'orcid_pending'")
.count()); .count());
} }
} }

View File

@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest; import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest;
@ -30,7 +31,8 @@ public class ResultToCommunityJobTest {
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class); private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static SparkSession spark; private static SparkSession spark;

View File

@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
@ -34,7 +35,8 @@ public class ResultToCommunityJobTest {
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class); private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static SparkSession spark; private static SparkSession spark;

View File

@ -0,0 +1,2 @@
{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}

View File

@ -0,0 +1,9 @@
{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|od______3989::0f89464c4ac4c398fe0c71433b175a62", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "collectedfrom": [], "validated": false, "properties": []}
{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|od______3989::0f89464c4ac4c398fe0c71433b175a62", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "collectedfrom": [], "validated": false, "properties": []}

View File

@ -1,129 +0,0 @@
package eu.dnetlib.dhp.enrich.orcid
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
import eu.dnetlib.dhp.schema.sx.OafUtils
import eu.dnetlib.pace.util.AuthorMatchers
import java.util
import scala.beans.BeanProperty
import scala.collection.JavaConverters._
import scala.util.control.Breaks.{break, breakable}
case class ORCIDAuthorEnricherResult(
@BeanProperty var id: String,
@BeanProperty var enriched_author: java.util.List[Author],
@BeanProperty var author_matched: java.util.List[MatchedAuthors],
@BeanProperty var author_unmatched: java.util.List[Author],
@BeanProperty var orcid_unmatched: java.util.List[OrcidAutor]
)
object ORCIDAuthorEnricher extends Serializable {
def enrichOrcid(
id: String,
graph_authors: java.util.List[Author],
orcid_authors: java.util.List[OrcidAutor]
): ORCIDAuthorEnricherResult = {
// Author enriching strategy:
// 1) create a copy of graph author list in unmatched_authors
// 2) find best match in unmatched_authors, remove it from unmatched_authors and enrich it so
// that the enrichment is reflected in graph_authors (they share author instances)
// 3) repeat (2) till the end of the list and then with different matching algorithms that have decreasing
// trust in their output
// At the end unmatched_authors will contain authors not matched with any of the matching algos
val unmatched_authors = new util.ArrayList[Author](graph_authors)
val matches = {
// Look after exact fullname match, reconstruct ORCID fullname as givenName + familyName
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) =>
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
"fullName"
) ++
// Look after exact reversed fullname match, reconstruct ORCID fullname as familyName + givenName
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) =>
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
"reversedFullName"
) ++
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) =>
AuthorMatchers
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
"orderedTokens"
) ++
// look after exact matches of ORCID creditName
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) => AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
"creditName"
) ++
// look after exact matches in ORCID otherNames
extractAndEnrichMatches(
unmatched_authors,
orcid_authors,
(author, orcid) =>
orcid.otherNames != null && AuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
"otherNames"
)
}
ORCIDAuthorEnricherResult(id, graph_authors, matches.asJava, unmatched_authors, orcid_authors)
}
private def extractAndEnrichMatches(
graph_authors: java.util.List[Author],
orcid_authors: java.util.List[OrcidAutor],
matchingFunc: (Author, OrcidAutor) => Boolean,
matchName: String
) = {
val matched = scala.collection.mutable.ArrayBuffer.empty[MatchedAuthors]
if (graph_authors != null && !graph_authors.isEmpty) {
val ait = graph_authors.iterator
while (ait.hasNext) {
val author = ait.next()
val oit = orcid_authors.iterator
breakable {
while (oit.hasNext) {
val orcid = oit.next()
if (matchingFunc(author, orcid)) {
ait.remove()
oit.remove()
matched += MatchedAuthors(author, orcid, matchName)
if (author.getPid == null) {
author.setPid(new util.ArrayList[StructuredProperty]())
}
val orcidPID = OafUtils.createSP(orcid.orcid, ModelConstants.ORCID, ModelConstants.ORCID)
orcidPID.setDataInfo(OafUtils.generateDataInfo())
orcidPID.getDataInfo.setProvenanceaction(
OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT")
)
author.getPid.add(orcidPID)
break()
}
}
}
}
}
matched
}
}

View File

@ -1,93 +1,23 @@
package eu.dnetlib.dhp.enrich.orcid package eu.dnetlib.dhp.enrich.orcid
import eu.dnetlib.dhp.application.AbstractScalaApplication import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors
import eu.dnetlib.dhp.schema.common.ModelSupport import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult
import org.apache.spark.sql._ import org.apache.spark.sql._
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import scala.beans.BeanProperty
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
case class OrcidAutor(
@BeanProperty var orcid: String,
@BeanProperty var familyName: String,
@BeanProperty var givenName: String,
@BeanProperty var creditName: String,
@BeanProperty var otherNames: java.util.List[String]
) {
def this() = this("null", "null", "null", "null", null)
}
case class MatchData(
@BeanProperty var id: String,
@BeanProperty var graph_authors: java.util.List[Author],
@BeanProperty var orcid_authors: java.util.List[OrcidAutor]
) {
def this() = this("null", null, null)
}
case class MatchedAuthors(
@BeanProperty var author: Author,
@BeanProperty var orcid: OrcidAutor,
@BeanProperty var `type`: String
)
class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger) class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
extends AbstractScalaApplication(propertyPath, args, log: Logger) { extends SparkEnrichWithOrcidAuthors(propertyPath, args, log: Logger) {
/** Here all the spark applications runs this method override def createTemporaryData(
* where the whole logic of the spark node is defined spark: SparkSession,
*/ graphPath: String,
override def run(): Unit = { orcidPath: String,
val graphPath = parser.get("graphPath") targetPath: String
log.info(s"graphPath is '$graphPath'") ): Unit = {
val orcidPath = parser.get("orcidPath")
log.info(s"orcidPath is '$orcidPath'")
val targetPath = parser.get("targetPath")
log.info(s"targetPath is '$targetPath'")
val workingDir = parser.get("workingDir")
log.info(s"targetPath is '$workingDir'")
createTemporaryData(graphPath, orcidPath, workingDir)
analisys(workingDir)
generateGraph(graphPath, workingDir, targetPath)
}
private def generateGraph(graphPath: String, workingDir: String, targetPath: String): Unit = {
ModelSupport.entityTypes.asScala
.filter(e => ModelSupport.isResult(e._1))
.foreach(e => {
val resultType = e._1.name()
val enc = Encoders.bean(e._2)
val matched = spark.read
.schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
.parquet(s"${workingDir}/${resultType}_matched")
.selectExpr("id", "enriched_author")
spark.read
.schema(enc.schema)
.json(s"$graphPath/$resultType")
.join(matched, Seq("id"), "left")
.withColumn(
"author",
when(size(col("enriched_author")).gt(0), col("enriched_author"))
.otherwise(col("author"))
)
.drop("enriched_author")
.write
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(s"${targetPath}/${resultType}")
})
}
private def createTemporaryData(graphPath: String, orcidPath: String, targetPath: String): Unit = {
val orcidAuthors = val orcidAuthors =
spark.read.load(s"$orcidPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames") spark.read.load(s"$orcidPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
@ -157,25 +87,6 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
orcidWorksWithAuthors.unpersist() orcidWorksWithAuthors.unpersist()
} }
private def analisys(targetPath: String): Unit = {
ModelSupport.entityTypes.asScala
.filter(e => ModelSupport.isResult(e._1))
.foreach(e => {
val resultType = e._1.name()
spark.read
.parquet(s"$targetPath/${resultType}_unmatched")
.where("size(graph_authors) > 0")
.as[MatchData](Encoders.bean(classOf[MatchData]))
.map(md => {
ORCIDAuthorEnricher.enrichOrcid(md.id, md.graph_authors, md.orcid_authors)
})(Encoders.bean(classOf[ORCIDAuthorEnricherResult]))
.write
.option("compression", "gzip")
.mode("overwrite")
.parquet(s"$targetPath/${resultType}_matched")
})
}
} }
object SparkEnrichGraphWithOrcidAuthors { object SparkEnrichGraphWithOrcidAuthors {

View File

@ -1,6 +1,6 @@
package eu.dnetlib.dhp.incremental package eu.dnetlib.dhp.incremental
import eu.dnetlib.dhp.PropagationConstant import eu.dnetlib.dhp.common.enrichment.Constants
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.bulktag.community.TaggingConstants import eu.dnetlib.dhp.bulktag.community.TaggingConstants
import eu.dnetlib.dhp.schema.common.ModelSupport import eu.dnetlib.dhp.schema.common.ModelSupport
@ -63,7 +63,7 @@ object SparkAppendContextCleanedGraph {
c.getDataInfo.asScala c.getDataInfo.asScala
.filter( .filter(
di => di =>
!di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE) !di.getInferenceprovenance.equals(Constants.PROPAGATION_DATA_INFO_TYPE)
&& !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE) && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)
) )
.toList .toList