Merge pull request 'PersonEntity' (#459) from person into beta
Reviewed-on: #459
Commit: c25b048e12
CoAuthorshipIterator.java (new file):
@@ -0,0 +1,80 @@

package eu.dnetlib.dhp.actionmanager.personentity;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;

public class CoAuthorshipIterator implements Iterator<Relation> {
    private int firstIndex;
    private int secondIndex;
    private boolean firstRelation;
    private List<String> authors;
    private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______::";
    private static final String OPENAIRE_PREFIX = "openaire____";
    private static final String SEPARATOR = "::";
    private static final String ORCID_KEY = "10|" + OPENAIRE_PREFIX + SEPARATOR
        + DHPUtils.md5(ModelConstants.ORCID.toLowerCase());
    public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
    public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";

    @Override
    public boolean hasNext() {
        return firstIndex < authors.size() - 1;
    }

    @Override
    public Relation next() {
        Relation rel = null;
        if (firstRelation) {
            rel = getRelation(authors.get(firstIndex), authors.get(secondIndex));
            firstRelation = Boolean.FALSE;
        } else {
            rel = getRelation(authors.get(secondIndex), authors.get(firstIndex));
            firstRelation = Boolean.TRUE;
            secondIndex += 1;
            if (secondIndex >= authors.size()) {
                firstIndex += 1;
                secondIndex = firstIndex + 1;
            }
        }

        return rel;
    }

    public CoAuthorshipIterator(List<String> authors) {
        this.authors = authors;
        this.firstIndex = 0;
        this.secondIndex = 1;
        this.firstRelation = Boolean.TRUE;
    }

    private Relation getRelation(String orcid1, String orcid2) {
        String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
        String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
        return OafMapperUtils
            .getRelation(
                source, target, ModelConstants.PERSON_PERSON_RELTYPE,
                ModelConstants.PERSON_PERSON_SUBRELTYPE,
                ModelConstants.PERSON_PERSON_HASCOAUTHORED,
                Arrays.asList(OafMapperUtils.keyValue(ORCID_KEY, ModelConstants.ORCID_DS)),
                OafMapperUtils
                    .dataInfo(
                        false, null, false, false,
                        OafMapperUtils
                            .qualifier(
                                ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
                                ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
                        "0.91"),
                null);
    }
}
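Aside (not part of the commit): for n distinct co-author ORCID iDs the iterator above yields n*(n-1) directed hasCoAuthored relations, two per unordered pair. A minimal usage sketch, assuming the dhp schema classes referenced above are on the classpath; the ORCID iDs are fictitious.

import java.util.Arrays;
import java.util.List;

import eu.dnetlib.dhp.actionmanager.personentity.CoAuthorshipIterator;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class CoAuthorshipIteratorSketch {
    public static void main(String[] args) {
        // three fictitious ORCID iDs -> 3 unordered pairs -> 6 directed relations
        List<String> orcids = Arrays
            .asList("0000-0001-0000-0001", "0000-0002-0000-0002", "0000-0003-0000-0003");
        CoAuthorshipIterator it = new CoAuthorshipIterator(orcids);
        while (it.hasNext()) {
            Relation r = it.next();
            System.out.println(r.getSource() + " --hasCoAuthored--> " + r.getTarget());
        }
    }
}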
Coauthors.java (new file):
@@ -0,0 +1,20 @@

package eu.dnetlib.dhp.actionmanager.personentity;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import eu.dnetlib.dhp.schema.oaf.Relation;

public class Coauthors implements Serializable {
    private List<String> coauthors;

    public List<String> getCoauthors() {
        return coauthors;
    }

    public void setCoauthors(List<String> coauthors) {
        this.coauthors = coauthors;
    }
}
Couples.java (new file):
@@ -0,0 +1,40 @@

package eu.dnetlib.dhp.actionmanager.personentity;

import java.io.Serializable;

import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;

public class Couples implements Serializable {
    Person p;
    Relation r;

    public Couples() {

    }

    public Person getP() {
        return p;
    }

    public void setP(Person p) {
        this.p = p;
    }

    public Relation getR() {
        return r;
    }

    public void setR(Relation r) {
        this.r = r;
    }

    public static Couples newInstance(Tuple2<Person, Relation> couple) {
        Couples c = new Couples();
        c.p = couple._1();
        c.r = couple._2();
        return c;
    }
}
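Aside (not part of the commit): a small sketch of how the helper above could wrap the two sides of a Dataset.joinWith result; the method and variable names are placeholders.

import eu.dnetlib.dhp.actionmanager.personentity.Couples;
import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;

class CouplesSketch {
    // pair a Person with one of its Relations, e.g. one side of a joinWith tuple
    static Couples pair(Person person, Relation relation) {
        return Couples.newInstance(new Tuple2<>(person, relation));
    }
}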
ExtractPerson.java (new file):
@@ -0,0 +1,431 @@

package eu.dnetlib.dhp.actionmanager.personentity;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.*;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.jetty.util.StringUtil;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.orcid.model.Author;
import eu.dnetlib.dhp.collection.orcid.model.Employment;
import eu.dnetlib.dhp.collection.orcid.model.Work;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Pid;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;

public class ExtractPerson implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String OPENAIRE_PREFIX = "openaire____";
    private static final String SEPARATOR = "::";
    private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR
        + DHPUtils.md5(ModelConstants.ORCID.toLowerCase());

    private static final String DOI_PREFIX = "50|doi_________::";

    private static final String PMID_PREFIX = "50|pmid________::";
    private static final String ARXIV_PREFIX = "50|arXiv_______::";

    private static final String PMCID_PREFIX = "50|pmcid_______::";
    private static final String ROR_PREFIX = "20|ror_________::";
    private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
    public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
    public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";

    public static void main(final String[] args) throws IOException, ParseException {

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
                .toString(
                    Objects
                        .requireNonNull(
                            ExtractPerson.class
                                .getResourceAsStream(
                                    "/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json"))));

        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);

        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("inputPath");
        log.info("inputPath {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath {}", outputPath);

        final String workingDir = parser.get("workingDir");
        log.info("workingDir {}", workingDir);

        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
                createActionSet(spark, inputPath, outputPath, workingDir);
            });

    }

    private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) {

        Dataset<Author> authors = spark
            .read()
            .parquet(inputPath + "Authors")
            .as(Encoders.bean(Author.class));

        Dataset<Work> works = spark
            .read()
            .parquet(inputPath + "Works")
            .as(Encoders.bean(Work.class))
            .filter(
                (FilterFunction<Work>) w -> Optional.ofNullable(w.getPids()).isPresent() &&
                    w
                        .getPids()
                        .stream()
                        .anyMatch(
                            p -> p.getSchema().equalsIgnoreCase("doi") ||
                                p.getSchema().equalsIgnoreCase("pmc") ||
                                p.getSchema().equalsIgnoreCase("pmid") ||
                                p.getSchema().equalsIgnoreCase("arxiv")));

        Dataset<Employment> employmentDataset = spark
            .read()
            .parquet(inputPath + "Employments")
            .as(Encoders.bean(Employment.class));

        Dataset<Author> peopleToMap = authors
            .joinWith(works, authors.col("orcid").equalTo(works.col("orcid")))
            .map((MapFunction<Tuple2<Author, Work>, Author>) t2 -> t2._1(), Encoders.bean(Author.class))
            .groupByKey((MapFunction<Author, String>) a -> a.getOrcid(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Author, Author>) (k, it) -> it.next(), Encoders.bean(Author.class));

        Dataset<Employment> employment = employmentDataset
            .joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid")))
            .map((MapFunction<Tuple2<Employment, Author>, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class));

        Dataset<Person> people;
        peopleToMap.map((MapFunction<Author, Person>) op -> {
            Person person = new Person();
            person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX));
            person
                .setBiography(
                    Optional
                        .ofNullable(op.getBiography())
                        .orElse(""));
            KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS);
            kv.setDataInfo(null);
            person.setCollectedfrom(Arrays.asList(kv));
            person
                .setAlternativeNames(
                    Optional
                        .ofNullable(op.getOtherNames())
                        .orElse(new ArrayList<>()));
            person
                .setFamilyName(
                    Optional
                        .ofNullable(op.getFamilyName())
                        .orElse(""));
            person
                .setGivenName(
                    Optional
                        .ofNullable(op.getGivenName())
                        .orElse(""));
            person
                .setPid(
                    Optional
                        .ofNullable(op.getOtherPids())
                        .map(
                            v -> v
                                .stream()
                                .map(p -> Pid.newInstance(p.getSchema(), p.getValue()))
                                .collect(Collectors.toList()))
                        .orElse(new ArrayList<>()));
            person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid()));
            person.setDateofcollection(op.getLastModifiedDate());
            person.setOriginalId(Arrays.asList(op.getOrcid()));
            return person;
        }, Encoders.bean(Person.class))
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .json(workingDir + "/people");

        works
            .flatMap(
                (FlatMapFunction<Work, Relation>) ExtractPerson::getAuthorshipRelationIterator,
                Encoders.bean(Relation.class))
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .json(workingDir + "/authorship");

        Dataset<Relation> coauthorship = works
            .flatMap((FlatMapFunction<Work, Tuple2<String, String>>) w -> {
                List<Tuple2<String, String>> lista = new ArrayList<>();
                w.getPids().stream().forEach(p -> {
                    if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc")
                        || p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv"))
                        lista.add(new Tuple2<>(p.getValue(), w.getOrcid()));
                });
                return lista.iterator();
            }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
            .groupByKey((MapFunction<Tuple2<String, String>, String>) Tuple2::_1, Encoders.STRING())
            .mapGroups(
                (MapGroupsFunction<String, Tuple2<String, String>, Coauthors>) (k, it) -> extractCoAuthors(it),
                Encoders.bean(Coauthors.class))
            .flatMap(
                (FlatMapFunction<Coauthors, Relation>) c -> new CoAuthorshipIterator(c.getCoauthors()),
                Encoders.bean(Relation.class))
            .groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
            .mapGroups(
                (MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class));

        coauthorship
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .json(workingDir + "/coauthorship");

        employment
            .filter((FilterFunction<Employment>) e -> Optional.ofNullable(e.getAffiliationId()).isPresent())
            .filter((FilterFunction<Employment>) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror"))
            .map(
                (MapFunction<Employment, Relation>) ExtractPerson::getAffiliationRelation,
                Encoders.bean(Relation.class))
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .json(workingDir + "/affiliation");

        people = spark
            .read()
            .textFile(workingDir + "/people")
            .map(
                (MapFunction<String, Person>) value -> OBJECT_MAPPER
                    .readValue(value, Person.class),
                Encoders.bean(Person.class));

        people.show(false);
        people
            .toJavaRDD()
            .map(p -> new AtomicAction(p.getClass(), p))
            .union(
                getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
            .union(
                getRelations(spark, workingDir + "/coauthorship")
                    .toJavaRDD()
                    .map(r -> new AtomicAction(r.getClass(), r)))
            .union(
                getRelations(spark, workingDir + "/affiliation")
                    .toJavaRDD()
                    .map(r -> new AtomicAction(r.getClass(), r)))
            .mapToPair(
                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
                    new Text(OBJECT_MAPPER.writeValueAsString(aa))))
            .saveAsHadoopFile(
                outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
    }

    private static Dataset<Relation> getRelations(SparkSession spark, String path) {
        return spark
            .read()
            .textFile(path)
            .map(
                (MapFunction<String, Relation>) value -> OBJECT_MAPPER
                    .readValue(value, Relation.class),
                Encoders.bean(Relation.class));// spark.read().json(path).as(Encoders.bean(Relation.class));
    }

    private static Coauthors extractCoAuthors(Iterator<Tuple2<String, String>> it) {
        Coauthors coauth = new Coauthors();
        List<String> coauthors = new ArrayList<>();
        while (it.hasNext())
            coauthors.add(it.next()._2());
        coauth.setCoauthors(coauthors);

        return coauth;
    }

    private static Relation getAffiliationRelation(Employment row) {
        String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid());
        String target = ROR_PREFIX
            + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue()));
        List<KeyValue> properties = new ArrayList<>();

        Relation relation = OafMapperUtils
            .getRelation(
                source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE,
                ModelConstants.ORG_PERSON_PARTICIPATES,
                Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
                OafMapperUtils
                    .dataInfo(
                        false, null, false, false,
                        OafMapperUtils
                            .qualifier(
                                ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
                                ModelConstants.DNET_PROVENANCE_ACTIONS),
                        "0.91"),
                null);

        if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) {
            KeyValue kv = new KeyValue();
            kv.setKey("startDate");
            kv.setValue(row.getStartDate());
            properties.add(kv);
        }
        if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) {
            KeyValue kv = new KeyValue();
            kv.setKey("endDate");
            kv.setValue(row.getEndDate());
            properties.add(kv);
        }

        if (properties.size() > 0)
            relation.setProperties(properties);
        return relation;

    }

    private static Collection<? extends Relation> getCoAuthorshipRelations(String orcid1, String orcid2) {
        String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid1);
        String target = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid2);

        return Arrays
            .asList(
                OafMapperUtils
                    .getRelation(
                        source, target, ModelConstants.PERSON_PERSON_RELTYPE,
                        ModelConstants.PERSON_PERSON_SUBRELTYPE,
                        ModelConstants.PERSON_PERSON_HASCOAUTHORED,
                        Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
                        OafMapperUtils
                            .dataInfo(
                                false, null, false, false,
                                OafMapperUtils
                                    .qualifier(
                                        ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
                                        ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
                                "0.91"),
                        null),
                OafMapperUtils
                    .getRelation(
                        target, source, ModelConstants.PERSON_PERSON_RELTYPE,
                        ModelConstants.PERSON_PERSON_SUBRELTYPE,
                        ModelConstants.PERSON_PERSON_HASCOAUTHORED,
                        Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
                        OafMapperUtils
                            .dataInfo(
                                false, null, false, false,
                                OafMapperUtils
                                    .qualifier(
                                        ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
                                        ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
                                "0.91"),
                        null));

    }

    private static @NotNull Iterator<Relation> getAuthorshipRelationIterator(Work w) {

        if (Optional.ofNullable(w.getPids()).isPresent())
            return w
                .getPids()
                .stream()
                .map(pid -> getRelation(w.getOrcid(), pid))
                .filter(Objects::nonNull)
                .collect(Collectors.toList())
                .iterator();
        List<Relation> ret = new ArrayList<>();
        return ret.iterator();
    }

    private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid) {
        String target;
        String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
        switch (pid.getSchema()) {
            case "doi":
                target = DOI_PREFIX
                    + IdentifierFactory
                        .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue()));
                break;
            case "pmid":
                target = PMID_PREFIX
                    + IdentifierFactory
                        .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue()));
                break;
            case "arxiv":
                target = ARXIV_PREFIX
                    + IdentifierFactory
                        .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue()));
                break;
            case "pmcid":
                target = PMCID_PREFIX
                    + IdentifierFactory
                        .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue()));
                break;

            default:
                return null;
        }

        return OafMapperUtils
            .getRelation(
                source, target, ModelConstants.RESULT_PERSON_RELTYPE,
                ModelConstants.RESULT_PERSON_SUBRELTYPE,
                ModelConstants.RESULT_PERSON_HASAUTHORED,
                Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
                OafMapperUtils
                    .dataInfo(
                        false, null, false, false,
                        OafMapperUtils
                            .qualifier(
                                ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
                                ModelConstants.DNET_PROVENANCE_ACTIONS),
                        "0.91"),
                null);
    }
}
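Aside (not part of the commit): the job stores intermediate people/authorship/coauthorship/affiliation JSON under workingDir, then packs everything into a SequenceFile of (payload class name, serialized AtomicAction) pairs. A sketch of reading relations back from such an action set, mirroring the pattern used in the test class further below; the path argument is a placeholder.

import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Relation;

class ActionSetReadSketch {
    static JavaRDD<Relation> readRelations(SparkSession spark, String actionSetPath) {
        ObjectMapper mapper = new ObjectMapper();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        return sc
            .sequenceFile(actionSetPath, Text.class, Text.class)
            // the key carries the payload class name, the value the serialized AtomicAction
            .filter(t -> "eu.dnetlib.dhp.schema.oaf.Relation".equalsIgnoreCase(t._1().toString()))
            .map(t -> (Relation) mapper.readValue(t._2().toString(), AtomicAction.class).getPayload());
    }
}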
WorkList.java (new file):
@@ -0,0 +1,25 @@

package eu.dnetlib.dhp.actionmanager.personentity;

import java.io.Serializable;
import java.util.ArrayList;

import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;

import eu.dnetlib.dhp.collection.orcid.model.Work;

public class WorkList implements Serializable {
    private ArrayList<Work> workArrayList;

    public ArrayList<Work> getWorkArrayList() {
        return workArrayList;
    }

    public void setWorkArrayList(ArrayList<Work> workArrayList) {
        this.workArrayList = workArrayList;
    }

    public WorkList() {
        workArrayList = new ArrayList<>();
    }
}
@@ -5,7 +5,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
 import java.util.*;
-import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -30,7 +29,6 @@ import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
 import eu.dnetlib.dhp.schema.oaf.utils.PidType;
-import io.netty.util.Constant;
 import scala.Tuple2;
 
 /**
Author.java:
@@ -20,6 +20,9 @@ public class Author extends ORCIDItem {
 
     private String lastModifiedDate;
 
+    public Author() {
+    }
+
     public String getBiography() {
         return biography;
     }
ORCIDItem.java:
@@ -11,4 +11,7 @@ public class ORCIDItem {
     public void setOrcid(String orcid) {
         this.orcid = orcid;
     }
+
+    public ORCIDItem() {
+    }
 }
Work.java:
@@ -32,4 +32,6 @@ public class Work extends ORCIDItem {
         pids.add(pid);
     }
 
+    public Work() {
+    }
 }
as_parameters.json (new file):
@@ -0,0 +1,25 @@
[
  {
    "paramName": "ip",
    "paramLongName": "inputPath",
    "paramDescription": "the path of the ORCID parquet tables (Authors, Works, Employments)",
    "paramRequired": true
  },
  {
    "paramName": "op",
    "paramLongName": "outputPath",
    "paramDescription": "the path where to store the actionset",
    "paramRequired": true
  },
  {
    "paramName": "issm",
    "paramLongName": "isSparkSessionManaged",
    "paramDescription": "true if the Spark session is managed by the application",
    "paramRequired": false
  },
  {
    "paramName": "wd",
    "paramLongName": "workingDir",
    "paramDescription": "the path of the working directory for intermediate results",
    "paramRequired": false
  }
]
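Aside (not part of the commit): an illustrative local invocation using the four parameters defined above, mirroring the arguments passed by the test class below; the paths are placeholders and a local SparkSession is assumed to be available, as in the test setup.

import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson;

class ExtractPersonLocalRun {
    public static void main(String[] args) throws Exception {
        ExtractPerson
            .main(
                new String[] {
                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
                    "-inputPath", "/tmp/orcid_tables/",     // placeholder: directory holding the Authors/Works/Employments parquet
                    "-outputPath", "/tmp/person_actionset", // placeholder: where the SequenceFile action set is written
                    "-workingDir", "/tmp/person_working"    // placeholder: scratch area for intermediate JSON
                });
    }
}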
Job properties (new file):
@@ -0,0 +1,2 @@
inputPath=/data/orcid_2023/tables/
outputPath=/user/miriam.baglioni/peopleAS
Oozie default configuration (new file):
@@ -0,0 +1,30 @@
<configuration>
    <property>
        <name>jobTracker</name>
        <value>yarnRM</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://nameservice1</value>
    </property>
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
    </property>
    <property>
        <name>hiveMetastoreUris</name>
        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
    </property>
    <property>
        <name>hiveJdbcUrl</name>
        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
    </property>
    <property>
        <name>hiveDbName</name>
        <value>openaire</value>
    </property>
    <property>
        <name>oozie.launcher.mapreduce.user.classpath.first</name>
        <value>true</value>
    </property>
</configuration>
Oozie workflow definition (new file):
@@ -0,0 +1,111 @@
<workflow-app name="PersonEntity" xmlns="uri:oozie:workflow:0.5">
    <parameters>

        <property>
            <name>inputPath</name>
            <description>inputPath</description>
        </property>
        <property>
            <name>outputPath</name>
            <description>the path where to store the actionset</description>
        </property>
        <property>
            <name>sparkDriverMemory</name>
            <description>memory for driver process</description>
        </property>
        <property>
            <name>sparkExecutorMemory</name>
            <description>memory for individual executor</description>
        </property>
        <property>
            <name>sparkExecutorCores</name>
            <description>number of cores used by single executor</description>
        </property>
        <property>
            <name>oozieActionShareLibForSpark2</name>
            <description>oozie action sharelib for spark 2.*</description>
        </property>
        <property>
            <name>spark2ExtraListeners</name>
            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
            <description>spark 2.* extra listeners classname</description>
        </property>
        <property>
            <name>spark2SqlQueryExecutionListeners</name>
            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
            <description>spark 2.* sql query execution listeners classname</description>
        </property>
        <property>
            <name>spark2YarnHistoryServerAddress</name>
            <description>spark 2.* yarn history server address</description>
        </property>
        <property>
            <name>spark2EventLogDir</name>
            <description>spark 2.* event log dir location</description>
        </property>
    </parameters>

    <global>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapreduce.job.queuename</name>
                <value>${queueName}</value>
            </property>
            <property>
                <name>oozie.launcher.mapred.job.queue.name</name>
                <value>${oozieLauncherQueueName}</value>
            </property>
            <property>
                <name>oozie.action.sharelib.for.spark</name>
                <value>${oozieActionShareLibForSpark2}</value>
            </property>

        </configuration>
    </global>
    <start to="deleteoutputpath"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="deleteoutputpath">
        <fs>
            <delete path="${outputPath}"/>
            <mkdir path="${outputPath}"/>
            <delete path="${workingDir}"/>
            <mkdir path="${workingDir}"/>
        </fs>
        <ok to="atomicactions"/>
        <error to="Kill"/>
    </action>

    <action name="atomicactions">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Produces the ActionSet for Person entity and relevant relations</name>
            <class>eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson</class>
            <jar>dhp-aggregation-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-cores=4
                --executor-memory=4G
                --driver-memory=${sparkDriverMemory}
                --conf spark.executor.memoryOverhead=5G
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                --conf spark.sql.shuffle.partitions=15000
            </spark-opts>
            <arg>--inputPath</arg><arg>${inputPath}</arg>
            <arg>--outputPath</arg><arg>${outputPath}</arg>
            <arg>--workingDir</arg><arg>${workingDir}</arg>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>

    <end name="End"/>
</workflow-app>
CreatePersonAS.java (new test):
@@ -0,0 +1,224 @@

package eu.dnetlib.dhp.actionmanager.person;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob;
import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson;
import eu.dnetlib.dhp.collection.orcid.model.Author;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.utils.DHPUtils;

public class CreatePersonAS {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static SparkSession spark;

    private static Path workingDir;
    private static final Logger log = LoggerFactory
        .getLogger(CreatePersonAS.class);

    @BeforeAll
    public static void beforeAll() throws IOException {
        workingDir = Files
            .createTempDirectory(CreatePersonAS.class.getSimpleName());
        log.info("using work dir {}", workingDir);

        SparkConf conf = new SparkConf();
        conf.setAppName(CreatePersonAS.class.getSimpleName());

        conf.setMaster("local[*]");
        conf.set("spark.driver.host", "localhost");
        conf.set("hive.metastore.local", "true");
        conf.set("spark.ui.enabled", "false");
        conf.set("spark.sql.codegen.wholeStage", "false");
        conf.set("spark.sql.warehouse.dir", workingDir.toString());
        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

        spark = SparkSession
            .builder()
            .appName(CreatePersonAS.class.getSimpleName())
            .config(conf)
            .getOrCreate();
    }

    @AfterAll
    public static void afterAll() throws IOException {
        FileUtils.deleteDirectory(workingDir.toFile());
        spark.stop();
    }

    @Test
    void testAuthors() throws Exception {

        String inputPath = getClass()
            .getResource(
                "/eu/dnetlib/dhp/actionmanager/person/")
            .getPath();

//        spark
//            .read()
//            .parquet(inputPath + "Authors")
//            .as(Encoders.bean(Author.class))
//            .filter((FilterFunction<Author>) a -> Optional.ofNullable(a.getOtherNames()).isPresent() &&
//                Optional.ofNullable(a.getBiography()).isPresent())
//            .write()
//            .mode(SaveMode.Overwrite)
//            .parquet(workingDir.toString() + "AuthorsSubset");

        ExtractPerson
            .main(
                new String[] {
                    "-isSparkSessionManaged",
                    Boolean.FALSE.toString(),
                    "-inputPath",
                    inputPath,
                    "-outputPath",
                    workingDir.toString() + "/actionSet1",
                    "-workingDir",
                    workingDir.toString() + "/working"
                });

        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<Relation> relations = sc
            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
            .filter(v -> "eu.dnetlib.dhp.schema.oaf.Relation".equalsIgnoreCase(v._1().toString()))
            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
            .map(aa -> ((Relation) aa.getPayload()));
//
        JavaRDD<Person> people = sc
            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
            .filter(v -> "eu.dnetlib.dhp.schema.oaf.Person".equalsIgnoreCase(v._1().toString()))
            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
            .map(aa -> ((Person) aa.getPayload()));
//
        Assertions.assertEquals(7, people.count());
        Assertions
            .assertEquals(
                "Paulo",
                people
                    .filter(
                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
                    .first()
                    .getGivenName());
        Assertions
            .assertEquals(
                "Tavares",
                people
                    .filter(
                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
                    .first()
                    .getFamilyName());
        Assertions
            .assertEquals(
                4,
                people
                    .filter(
                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
                    .first()
                    .getAlternativeNames()
                    .size());
        Assertions
            .assertEquals(
                4,
                people
                    .filter(
                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
                    .first()
                    .getPid()
                    .size());
        Assertions
            .assertTrue(
                people
                    .filter(
                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
                    .first()
                    .getPid()
                    .stream()
                    .anyMatch(
                        p -> p.getSchema().equalsIgnoreCase("Scopus Author ID")
                            && p.getValue().equalsIgnoreCase("15119405200")));

        Assertions
            .assertEquals(
                16,
                relations
                    .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED))
                    .count());
        Assertions
            .assertEquals(
                14,
                relations
                    .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED))
                    .count());
        Assertions
            .assertEquals(
                3,
                relations
                    .filter(
                        r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
                            && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED))
                    .count());
        Assertions
            .assertEquals(
                2,
                relations
                    .filter(
                        r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
                            && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)
                            && r.getTarget().startsWith("50|doi"))
                    .count());
        Assertions
            .assertEquals(
                1,
                relations
                    .filter(
                        r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
                            && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)
                            && r.getTarget().startsWith("50|arXiv"))
                    .count());

        Assertions
            .assertEquals(
                1,
                relations
                    .filter(
                        r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
                            && r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED))
                    .count());
        Assertions.assertEquals(33, relations.count());

    }

}
Test resource: ORCID works sample (new file):
@@ -0,0 +1,10 @@
{"orcid":"0000-0001-6291-9619","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold Nanoparticle-Functionalized 3D Flower-like MoS<sub>2</sub>/TiO<sub>2</sub> Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]}
{"orcid":"0000-0002-3210-3034","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold Nanoparticle-Functionalized 3D Flower-like MoS<sub>2</sub>/TiO<sub>2</sub> Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]}
{"orcid":"0000-0001-6291-9619","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the <math display=\"inline\"><mrow><mi>q</mi><mi>q</mi><mi>b</mi><mi>b</mi></mrow></math> Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]}
{"orcid":"0000-0002-3210-3034","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the <math display=\"inline\"><mrow><mi>q</mi><mi>q</mi><mi>b</mi><mi>b</mi></mrow></math> Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]}
{"orcid":"0000-0002-9030-7609","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"},{"value":"14346052 14346044","schema":"issn"}]}
{"orcid":"0000-0003-2552-9691","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in $\\sqrt{s}$ = 13 TeV $pp$ collisions at the LHC using the ATLAS detector","pids":[{"value":"1473744","schema":"other-id"},{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"1606.09150","schema":"arxiv"}]}
{"orcid":"0000-0003-0305-8980","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"}]}
{"orcid":"0000-0002-9030-7609","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]}
{"orcid":"0000-0003-2629-4046","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]}
{"orcid":"0000-0001-8582-8912","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]}
pom.xml (19 lines added):
@@ -969,6 +969,25 @@
 
     <!-- Build with scala 12 and Spark 3.4 -->
     <profiles>
+        <!-- Activate ARM-compatible snappy dependency on new Silicon Macs -->
+        <profile>
+            <id>arm-silicon-mac</id>
+            <activation>
+                <os>
+                    <arch>aarch64</arch>
+                    <family>mac</family>
+                </os>
+            </activation>
+            <dependencyManagement>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.xerial.snappy</groupId>
+                        <artifactId>snappy-java</artifactId>
+                        <version>1.1.8.4</version>
+                    </dependency>
+                </dependencies>
+            </dependencyManagement>
+        </profile>
         <profile>
             <id>spark-34</id>
             <properties>