[Person] extract relations from data on publishers' web pages

This commit is contained in:
Miriam Baglioni 2024-10-31 09:07:12 +01:00
parent 8fbfa56d45
commit 07345fae75
1 changed file with 120 additions and 10 deletions

View File

@ -157,7 +157,7 @@ public class ExtractPerson implements Serializable {
}
private static void extractInfoForActionSetFromPublisher(SparkSession spark, String inputPath, String workingDir) {
//Read the publishers output
Dataset<Row> df = spark
.read()
.schema(
@ -170,19 +170,115 @@ public class ExtractPerson implements Serializable {
.json(inputPath)
.where("DOI is not null");
df.selectExpr("DOI", "explode(Authors) as author")
.selectExpr("DOI", "author.Name.Full as fullname", "author.Name.First as firstname",
"author.Name.Last as lastname", "author.Contributor_roles as roles",
"author.Corresponding as corresponding", "author.Matchings as affs",
"authors.PIDs as pid")
.where("pid.Schema=='ORCID'")
.selectExpr("explode affs as affiliation", "DOI", "fullname", "firstname", "lastname", "roles", "pid.Value as orcid")
.where("aff.Status == 'active");
//Select the relevant information
Dataset<Row> authors = df.selectExpr("DOI", "explode(Authors) as author")
.selectExpr("DOI", "author.Contributor_roles as roles",
"author.Corresponding as corresponding", "author.Matchings as affs",
"authors.PIDs as pid")
.where("pid.Schema=='ORCID'")
.selectExpr("explode affs as affiliation", "DOI", "corresponding", "roles", "pid.Value as orcid")
.where("aff.Status == 'active")
.selectExpr("affiliation", "DOI", "corresponding", "explode roles as role", "orcid");
//create the relation dataset with possible redundant relations
Dataset<Relation> relations = authors.flatMap((FlatMapFunction<Row, Relation>) a ->
Arrays.asList(getAuthorshipRelation(a), getAffiliationRelation(a)).iterator(), Encoders.bean(Relation.class))
.unionAll(
df.selectExpr("DOI","explode Authors as author").where("author.PIDs.Schema == 'ORCID")
.selectExpr("DOI", "author.PIDs.value as orcid")
.groupByKey((MapFunction<Row, String>) r -> r.getAs("DOI"), Encoders.STRING() )
.mapGroups((MapGroupsFunction<String, Row, Coauthors>) (k,it) -> extractCoAuthorsRow(it),
Encoders.bean(Coauthors.class))
.flatMap(
(FlatMapFunction<Coauthors, Relation>) c -> new CoAuthorshipIterator(c.getCoauthors()),
Encoders.bean(Relation.class))
.groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class)));
//produce one dataset with only one relation per source, target and semantics. Eventually extend the list of properties
relations.groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getRelClass() + r.getTarget(), Encoders.STRING() )
.mapGroups((MapGroupsFunction<String, Relation, Relation>) (k,it) -> mergeRelation(it), Encoders.bean(Relation.class) )
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(workingDir + "/publishers");
}
/**
 * Folds an iterator of relations (same source/relClass/target group) into a single
 * relation by repeatedly applying {@link MergeUtils#mergeRelation}.
 * Assumes the iterator holds at least one element (guaranteed by mapGroups).
 */
private static Relation mergeRelation(Iterator<Relation> it){
    Relation merged = it.next();
    while (it.hasNext()) {
        merged = MergeUtils.mergeRelation(merged, it.next());
    }
    return merged;
}
/**
 * Builds the person -> result authorship relation from a publisher-provided row.
 * Optional properties (declared affiliation, corresponding flag, contributor role)
 * are attached as KeyValue entries when present on the row.
 *
 * Fixes: the original added a fresh, empty {@code new KeyValue()} to the properties
 * list instead of the populated {@code kv}, so every emitted property was blank.
 */
private static @NotNull Relation getAuthorshipRelation(Row a) {
    String target = DOI_PREFIX
        + IdentifierFactory
            .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), a.getAs("DOI")));
    String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(a.getAs("orcid"));
    Relation relation = OafMapperUtils
        .getRelation(
            source, target, ModelConstants.RESULT_PERSON_RELTYPE,
            ModelConstants.RESULT_PERSON_SUBRELTYPE,
            ModelConstants.RESULT_PERSON_HASAUTHORED,
            OafMapperUtils.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME),
            null,
            null);
    // NOTE(review): assumes getRelation initializes a non-null, mutable properties
    // list — confirm, otherwise the add() calls below will NPE.
    if(StringUtil.isNotBlank(a.getAs("aff.Value"))){
        KeyValue kv = new KeyValue();
        kv.setKey("declared_affiliation");
        kv.setValue(ROR_PREFIX
            + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", a.getAs("aff.Value"))));
        relation.getProperties().add(kv);
    }
    if(StringUtil.isNotBlank(a.getAs("corresponding")) && a.getAs("corresponding").equals("true")){
        KeyValue kv = new KeyValue();
        kv.setKey("corresponding");
        kv.setValue("true");
        relation.getProperties().add(kv);
    }
    if(StringUtil.isNotBlank(a.getAs("role"))){
        KeyValue kv = new KeyValue();
        kv.setKey("role");
        // NOTE(review): field casing is inconsistent ("role.Schema" vs "role.value")
        // — verify against the publisher JSON schema.
        String role = (String) a.getAs("role.Schema")
            +(String) a.getAs("role.value");
        kv.setValue(role);
        relation.getProperties().add(kv);
    }
    return relation;
}
/**
 * Builds the person -> organization affiliation relation from a publisher-provided
 * row: the person id is derived from the ORCID, the organization id from the ROR
 * value carried by the active affiliation match.
 */
private static @NotNull Relation getAffiliationRelation(Row a) {
    final String personId = PERSON_PREFIX + "::" + IdentifierFactory.md5(a.getAs("orcid"));
    final String organizationId = ROR_PREFIX
        + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", a.getAs("aff.Value")));
    return OafMapperUtils
        .getRelation(
            personId, organizationId, ModelConstants.ORG_PERSON_RELTYPE,
            ModelConstants.ORG_PERSON_SUBRELTYPE,
            ModelConstants.ORG_PERSON_PARTICIPATES,
            OafMapperUtils.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME),
            null,
            null);
}
private static void extractInfoForActionSetFromProjects(SparkSession spark, String inputPath, String workingDir,
String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) throws IOException {
@ -280,6 +376,10 @@ public class ExtractPerson implements Serializable {
getRelations(spark, workingDir + "/project")
.toJavaRDD()
.map(r -> new AtomicAction(r.getClass(), r)))
.union(
getRelations(spark, workingDir + "/publishers")
.toJavaRDD()
.map(r -> new AtomicAction(r.getClass(), r)))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
@ -448,6 +548,16 @@ public class ExtractPerson implements Serializable {
return coauth;
}
/**
 * Collects the ORCID of every author row in the group (all rows share one DOI)
 * into a {@link Coauthors} bean.
 */
private static Coauthors extractCoAuthorsRow(Iterator<Row> it) {
    final List<String> orcids = new ArrayList<>();
    it.forEachRemaining(row -> orcids.add(row.getAs("orcid")));
    final Coauthors coauthors = new Coauthors();
    coauthors.setCoauthors(orcids);
    return coauthors;
}
private static Relation getAffiliationRelation(Employment row) {
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(row.getOrcid());
String target = ROR_PREFIX
@ -529,7 +639,7 @@ public class ExtractPerson implements Serializable {
source, target, ModelConstants.RESULT_PERSON_RELTYPE,
ModelConstants.RESULT_PERSON_SUBRELTYPE,
ModelConstants.RESULT_PERSON_HASAUTHORED,
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
Collections.singletonList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
ORCIDDATAINFO,
null);
relation.setValidated(true);