forked from D-Net/dnet-hadoop
[Person]first implementation of the action set to include Person entity in the graph starting from the orcid data
This commit is contained in:
parent
67ff783e65
commit
ddd20e7f8e
|
@ -1,19 +1,19 @@
|
|||
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
public class Coauthors implements Serializable {
|
||||
private ArrayList<Relation> coauthors;
|
||||
private ArrayList<Relation> coauthors;
|
||||
|
||||
public ArrayList<Relation> getCoauthors() {
|
||||
return coauthors;
|
||||
}
|
||||
public ArrayList<Relation> getCoauthors() {
|
||||
return coauthors;
|
||||
}
|
||||
|
||||
public void setCoauthors(ArrayList<Relation> coauthors) {
|
||||
this.coauthors = coauthors;
|
||||
}
|
||||
public void setCoauthors(ArrayList<Relation> coauthors) {
|
||||
this.coauthors = coauthors;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,40 +1,40 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Person;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import scala.Tuple2;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class Couples implements Serializable {
|
||||
Person p ;
|
||||
Relation r;
|
||||
Person p;
|
||||
Relation r;
|
||||
|
||||
public Couples() {
|
||||
public Couples() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public Person getP() {
|
||||
return p;
|
||||
}
|
||||
public Person getP() {
|
||||
return p;
|
||||
}
|
||||
|
||||
public void setP(Person p) {
|
||||
this.p = p;
|
||||
}
|
||||
public void setP(Person p) {
|
||||
this.p = p;
|
||||
}
|
||||
|
||||
public Relation getR() {
|
||||
return r;
|
||||
}
|
||||
public Relation getR() {
|
||||
return r;
|
||||
}
|
||||
|
||||
public void setR(Relation r) {
|
||||
this.r = r;
|
||||
}
|
||||
public void setR(Relation r) {
|
||||
this.r = r;
|
||||
}
|
||||
|
||||
public static <Tuples> Couples newInstance(Tuple2<Person, Relation> couple){
|
||||
Couples c = new Couples();
|
||||
c.p = couple._1();
|
||||
c.r = couple._2();
|
||||
return c;
|
||||
}
|
||||
public static <Tuples> Couples newInstance(Tuple2<Person, Relation> couple) {
|
||||
Couples c = new Couples();
|
||||
c.p = couple._1();
|
||||
c.r = couple._2();
|
||||
return c;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,27 +1,18 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static org.apache.spark.sql.functions.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.Constants;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Author;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Employment;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Work;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Person;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import eu.dnetlib.dhp.schema.oaf.Pid;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.BZip2Codec;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -31,301 +22,399 @@ import org.jetbrains.annotations.NotNull;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.spark_project.jetty.util.StringUtil;
|
||||
import scala.Tuple2;
|
||||
import static org.apache.spark.sql.functions.*;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.Constants;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Author;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Employment;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Work;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Person;
|
||||
import eu.dnetlib.dhp.schema.oaf.Pid;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class ExtractPerson implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final String OPENAIRE_PREFIX = "openaire____";
|
||||
private static final String SEPARATOR = "::";
|
||||
private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(ModelConstants.ORCID.toLowerCase());
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final String OPENAIRE_PREFIX = "openaire____";
|
||||
private static final String SEPARATOR = "::";
|
||||
private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR
|
||||
+ DHPUtils.md5(ModelConstants.ORCID.toLowerCase());
|
||||
|
||||
private static final String DOI_PREFIX = "50|doi_________::";
|
||||
private static final String DOI_PREFIX = "50|doi_________::";
|
||||
|
||||
private static final String PMID_PREFIX = "50|pmid________::";
|
||||
private static final String ARXIV_PREFIX = "50|arXiv_______::";
|
||||
private static final String PMID_PREFIX = "50|pmid________::";
|
||||
private static final String ARXIV_PREFIX = "50|arXiv_______::";
|
||||
|
||||
private static final String PMCID_PREFIX = "50|pmcid_______::";
|
||||
private static final String ROR_PREFIX = "20|ror_________::";
|
||||
private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
|
||||
public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
|
||||
public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
|
||||
private static final String PMCID_PREFIX = "50|pmcid_______::";
|
||||
private static final String ROR_PREFIX = "20|ror_________::";
|
||||
private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
|
||||
public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
|
||||
public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
|
||||
|
||||
public static void main(final String[] args) throws IOException, ParseException {
|
||||
|
||||
public static void main(final String[] args) throws IOException, ParseException {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
ExtractPerson.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json"))));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
ExtractPerson.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json"))));
|
||||
parser.parseArgument(args);
|
||||
|
||||
parser.parseArgument(args);
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
final String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath {}", inputPath);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath {}", inputPath);
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath {}", outputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath {}", outputPath);
|
||||
final String workingDir = parser.get("workingDir");
|
||||
log.info("workingDir {}", workingDir);
|
||||
|
||||
final String workingDir = parser.get("workingDir");
|
||||
log.info("workingDir {}", workingDir);
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
||||
createActionSet(spark, inputPath, outputPath, workingDir);
|
||||
});
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> createActionSet(spark, inputPath, outputPath, workingDir));
|
||||
}
|
||||
|
||||
}
|
||||
private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) {
|
||||
|
||||
private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) {
|
||||
Dataset<Author> authors = spark
|
||||
.read()
|
||||
.parquet(inputPath + "Authors")
|
||||
.as(Encoders.bean(Author.class));
|
||||
|
||||
Dataset<Author> authors = spark
|
||||
.read()
|
||||
.parquet(inputPath + "Authors").as(Encoders.bean(Author.class));
|
||||
Dataset<Work> works = spark
|
||||
.read()
|
||||
.parquet(inputPath + "Works")
|
||||
.as(Encoders.bean(Work.class))
|
||||
.filter(
|
||||
(FilterFunction<Work>) w -> Optional.ofNullable(w.getPids()).isPresent() &&
|
||||
w
|
||||
.getPids()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
p -> p.getSchema().equalsIgnoreCase("doi") ||
|
||||
p.getSchema().equalsIgnoreCase("pmc") ||
|
||||
p.getSchema().equalsIgnoreCase("pmid") ||
|
||||
p.getSchema().equalsIgnoreCase("arxiv")));
|
||||
|
||||
Dataset<Work> works = spark
|
||||
.read()
|
||||
.parquet(inputPath + "Works")
|
||||
.as(Encoders.bean(Work.class))
|
||||
.filter((FilterFunction<Work>) w -> Optional.ofNullable(w.getPids()).isPresent() &&
|
||||
w.getPids().stream().anyMatch(p->p.getSchema().equalsIgnoreCase("doi") ||
|
||||
p.getSchema().equalsIgnoreCase("pmc") ||
|
||||
p.getSchema().equalsIgnoreCase("pmid") ||
|
||||
p.getSchema().equalsIgnoreCase("arxiv")));
|
||||
Dataset<Employment> employmentDataset = spark
|
||||
.read()
|
||||
.parquet(inputPath + "Employments")
|
||||
.as(Encoders.bean(Employment.class));
|
||||
|
||||
Dataset<Employment> employmentDataset = spark
|
||||
.read()
|
||||
.parquet(inputPath + "Employments")
|
||||
.as(Encoders.bean(Employment.class));
|
||||
Dataset<Author> peopleToMap = authors
|
||||
.joinWith(works, authors.col("orcid").equalTo(works.col("orcid")))
|
||||
.map((MapFunction<Tuple2<Author, Work>, Author>) t2 -> t2._1(), Encoders.bean(Author.class))
|
||||
.groupByKey((MapFunction<Author, String>) a -> a.getOrcid(), Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, Author, Author>) (k, it) -> it.next(), Encoders.bean(Author.class));
|
||||
|
||||
Dataset<Employment> employment = employmentDataset
|
||||
.joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid")))
|
||||
.map((MapFunction<Tuple2<Employment, Author>, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class));
|
||||
|
||||
Dataset<Author> peopleToMap = authors.joinWith(works, authors.col("orcid").equalTo(works.col("orcid")))
|
||||
.map((MapFunction<Tuple2<Author, Work>, Author>) t2 -> t2._1(), Encoders.bean(Author.class))
|
||||
.groupByKey((MapFunction<Author, String>) a -> a.getOrcid(), Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, Author, Author>) (k, it) -> it.next(), Encoders.bean(Author.class));
|
||||
Dataset<Person> people;
|
||||
peopleToMap.map((MapFunction<Author, Person>) op -> {
|
||||
Person person = new Person();
|
||||
person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX));
|
||||
person
|
||||
.setBiography(
|
||||
Optional
|
||||
.ofNullable(op.getBiography())
|
||||
|
||||
.orElse(""));
|
||||
KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS);
|
||||
kv.setDataInfo(null);
|
||||
person.setCollectedfrom(Arrays.asList(kv));
|
||||
person
|
||||
.setAlternativeNames(
|
||||
Optional
|
||||
.ofNullable(op.getOtherNames())
|
||||
|
||||
Dataset<Employment> employment = employmentDataset.joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid")))
|
||||
.map((MapFunction<Tuple2<Employment, Author>, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class));
|
||||
.orElse(new ArrayList<>()));
|
||||
person
|
||||
.setFamilyName(
|
||||
Optional
|
||||
.ofNullable(op.getFamilyName())
|
||||
|
||||
peopleToMap.show(false);
|
||||
.orElse(""));
|
||||
person
|
||||
.setGivenName(
|
||||
Optional
|
||||
.ofNullable(op.getGivenName())
|
||||
|
||||
Dataset<Person> people;
|
||||
people = peopleToMap.map((MapFunction<Author, Person>) op -> {
|
||||
Person person = new Person();
|
||||
person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX));
|
||||
person.setBiography(Optional.ofNullable(op.getBiography())
|
||||
.orElse(""));
|
||||
person
|
||||
.setPid(
|
||||
Optional
|
||||
.ofNullable(op.getOtherPids())
|
||||
.map(
|
||||
v -> v
|
||||
.stream()
|
||||
.map(p -> Pid.newInstance(p.getSchema(), p.getValue()))
|
||||
.collect(Collectors.toList()))
|
||||
.orElse(new ArrayList<>()));
|
||||
person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid()));
|
||||
person.setDateofcollection(op.getLastModifiedDate());
|
||||
person.setOriginalId(Arrays.asList(op.getOrcid()));
|
||||
return person;
|
||||
}, Encoders.bean(Person.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingDir + "/people");
|
||||
|
||||
.orElse(""));
|
||||
KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS);
|
||||
kv.setDataInfo(null);
|
||||
person.setCollectedfrom(Arrays.asList(kv));
|
||||
person.setAlternativeNames(Optional.ofNullable(op.getOtherNames())
|
||||
works
|
||||
.flatMap(
|
||||
(FlatMapFunction<Work, Relation>) ExtractPerson::getAuthorshipRelationIterator,
|
||||
Encoders.bean(Relation.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingDir + "/authorship");
|
||||
|
||||
.orElse(new ArrayList<>()));
|
||||
person.setFamilyName(Optional.ofNullable(op.getFamilyName())
|
||||
works
|
||||
.flatMap((FlatMapFunction<Work, Tuple2<String, String>>) w -> {
|
||||
List<Tuple2<String, String>> lista = new ArrayList<>();
|
||||
w.getPids().stream().forEach(p -> {
|
||||
if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc")
|
||||
|| p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv"))
|
||||
lista.add(new Tuple2<>(p.getValue(), w.getOrcid()));
|
||||
});
|
||||
return lista.iterator();
|
||||
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
|
||||
.groupByKey((MapFunction<Tuple2<String, String>, String>) Tuple2::_1, Encoders.STRING())
|
||||
.mapGroups(
|
||||
(MapGroupsFunction<String, Tuple2<String, String>, Coauthors>) (k, it) -> extractCoAuthors(it),
|
||||
Encoders.bean(Coauthors.class))
|
||||
.flatMap(
|
||||
(FlatMapFunction<Coauthors, Relation>) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class))
|
||||
.groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
|
||||
.mapGroups(
|
||||
(MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingDir + "/coauthorship");
|
||||
|
||||
.orElse(""));
|
||||
person.setGivenName(Optional.ofNullable(op.getGivenName())
|
||||
employment
|
||||
.filter((FilterFunction<Employment>) e -> Optional.ofNullable(e.getAffiliationId()).isPresent())
|
||||
.filter((FilterFunction<Employment>) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror"))
|
||||
.map(
|
||||
(MapFunction<Employment, Relation>) ExtractPerson::getAffiliationRelation,
|
||||
Encoders.bean(Relation.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingDir + "/affiliation");
|
||||
|
||||
.orElse(""));
|
||||
person.setPid(Optional.ofNullable(op.getOtherPids())
|
||||
.map(v -> v.stream().map(p -> Pid.newInstance(p.getSchema(), p.getValue())).collect(Collectors.toList()))
|
||||
.orElse(new ArrayList<>())
|
||||
);
|
||||
person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid()));
|
||||
person.setDateofcollection(op.getLastModifiedDate());
|
||||
person.setOriginalId(Arrays.asList(op.getOrcid()));
|
||||
return person;
|
||||
}, Encoders.bean(Person.class));
|
||||
spark
|
||||
.read()
|
||||
.json(workingDir + "/people")
|
||||
.as(Encoders.bean(Person.class))
|
||||
.toJavaRDD()
|
||||
.map(p -> new AtomicAction(p.getClass(), p))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/coauthorship")
|
||||
.toJavaRDD()
|
||||
.map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/affiliation")
|
||||
.toJavaRDD()
|
||||
.map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.mapToPair(
|
||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||
.saveAsHadoopFile(
|
||||
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
|
||||
}
|
||||
|
||||
private static Dataset<Relation> getRelations(SparkSession spark, String path) {
|
||||
return spark.read().json(path).as(Encoders.bean(Relation.class));
|
||||
}
|
||||
|
||||
people.show(false);
|
||||
private static Coauthors extractCoAuthors(Iterator<Tuple2<String, String>> it) {
|
||||
Coauthors coauth = new Coauthors();
|
||||
ArrayList<Relation> ret = new ArrayList<>();
|
||||
List<String> coauthors = new ArrayList<>();
|
||||
while (it.hasNext())
|
||||
coauthors.add(it.next()._2());
|
||||
for (int i = 0; i < coauthors.size() - 1; i++)
|
||||
for (int j = i + 1; j < coauthors.size(); j++)
|
||||
ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j)));
|
||||
|
||||
coauth.setCoauthors(ret);
|
||||
|
||||
Dataset<Relation> authorship;
|
||||
authorship = works
|
||||
.flatMap((FlatMapFunction<Work, Relation>) ExtractPerson::getAuthorshipRelationIterator
|
||||
, Encoders.bean(Relation.class));
|
||||
return coauth;
|
||||
}
|
||||
|
||||
private static Relation getAffiliationRelation(Employment row) {
|
||||
String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid());
|
||||
String target = ROR_PREFIX
|
||||
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue()));
|
||||
List<KeyValue> properties = new ArrayList<>();
|
||||
|
||||
authorship.show(false);
|
||||
Relation relation = OafMapperUtils
|
||||
.getRelation(
|
||||
source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE,
|
||||
ModelConstants.ORG_PERSON_PARTICIPATES,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, null, false, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91"),
|
||||
null);
|
||||
|
||||
if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) {
|
||||
KeyValue kv = new KeyValue();
|
||||
kv.setKey("startDate");
|
||||
kv.setValue(row.getStartDate());
|
||||
properties.add(kv);
|
||||
}
|
||||
if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) {
|
||||
KeyValue kv = new KeyValue();
|
||||
kv.setKey("endDate");
|
||||
kv.setValue(row.getEndDate());
|
||||
properties.add(kv);
|
||||
}
|
||||
|
||||
Dataset<Relation> coauthorship = works
|
||||
.flatMap((FlatMapFunction<Work, Tuple2<String, String>>) w -> {
|
||||
List<Tuple2<String, String>> lista = new ArrayList<>();
|
||||
w.getPids().stream().forEach(p -> {
|
||||
if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc") || p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv"))
|
||||
lista.add(new Tuple2<>(p.getValue(), w.getOrcid()));
|
||||
});
|
||||
return lista.iterator();
|
||||
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
|
||||
.groupByKey((MapFunction<Tuple2<String, String>, String>) Tuple2::_1, Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, Tuple2<String, String>, Coauthors>) (k, it) ->
|
||||
extractCoAuthors(it), Encoders.bean(Coauthors.class))
|
||||
.flatMap((FlatMapFunction<Coauthors, Relation>) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class))
|
||||
.groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class));
|
||||
if (properties.size() > 0)
|
||||
relation.setProperties(properties);
|
||||
return relation;
|
||||
|
||||
coauthorship.show(false);
|
||||
Dataset<Relation> affiliation = employment
|
||||
.filter((FilterFunction<Employment>) e -> Optional.ofNullable(e.getAffiliationId()).isPresent())
|
||||
.filter((FilterFunction<Employment>) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror"))
|
||||
.map((MapFunction<Employment, Relation>) ExtractPerson::getAffiliationRelation
|
||||
, Encoders.bean(Relation.class));
|
||||
}
|
||||
|
||||
affiliation.show(false);
|
||||
private static Collection<? extends Relation> getCoAuthorshipRelations(String orcid1, String orcid2) {
|
||||
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid1);
|
||||
String target = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid2);
|
||||
|
||||
people.toJavaRDD()
|
||||
.map(p -> new AtomicAction(p.getClass(), p))
|
||||
.union(authorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r)))
|
||||
.union(coauthorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r)))
|
||||
.union(affiliation.toJavaRDD().map(r->new AtomicAction(r.getClass(),r)))
|
||||
.mapToPair(
|
||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||
.saveAsHadoopFile(
|
||||
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);//, GzipCodec.class);
|
||||
}
|
||||
return Arrays
|
||||
.asList(
|
||||
OafMapperUtils
|
||||
.getRelation(
|
||||
source, target, ModelConstants.PERSON_PERSON_RELTYPE,
|
||||
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||
ModelConstants.PERSON_PERSON_HASCOAUTHORED,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, null, false, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91"),
|
||||
null),
|
||||
OafMapperUtils
|
||||
.getRelation(
|
||||
target, source, ModelConstants.PERSON_PERSON_RELTYPE,
|
||||
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||
ModelConstants.PERSON_PERSON_HASCOAUTHORED,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, null, false, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91"),
|
||||
null));
|
||||
|
||||
private static Coauthors extractCoAuthors(Iterator<Tuple2<String, String>> it) {
|
||||
Coauthors coauth = new Coauthors();
|
||||
ArrayList<Relation> ret = new ArrayList<>();
|
||||
List<String> coauthors = new ArrayList<>();
|
||||
while(it.hasNext())
|
||||
coauthors.add(it.next()._2());
|
||||
for (int i = 0; i < coauthors.size() -1; i++ )
|
||||
for(int j = i + 1; j < coauthors.size(); j++)
|
||||
ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j)));
|
||||
}
|
||||
|
||||
coauth.setCoauthors(ret);
|
||||
private static @NotNull Iterator<Relation> getAuthorshipRelationIterator(Work w) {
|
||||
|
||||
return coauth;
|
||||
}
|
||||
if (Optional.ofNullable(w.getPids()).isPresent())
|
||||
return w
|
||||
.getPids()
|
||||
.stream()
|
||||
.map(pid -> getRelation(w.getOrcid(), pid))
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList())
|
||||
.iterator();
|
||||
List<Relation> ret = new ArrayList<>();
|
||||
return ret.iterator();
|
||||
}
|
||||
|
||||
private static Relation getAffiliationRelation(Employment row) {
|
||||
String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid());
|
||||
String target = ROR_PREFIX
|
||||
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue()));
|
||||
List<KeyValue> properties = new ArrayList<>() ;
|
||||
private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid) {
|
||||
String target;
|
||||
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
|
||||
switch (pid.getSchema()) {
|
||||
case "doi":
|
||||
target = DOI_PREFIX
|
||||
+ IdentifierFactory
|
||||
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue()));
|
||||
break;
|
||||
case "pmid":
|
||||
target = PMID_PREFIX
|
||||
+ IdentifierFactory
|
||||
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue()));
|
||||
break;
|
||||
case "arxiv":
|
||||
target = ARXIV_PREFIX
|
||||
+ IdentifierFactory
|
||||
.md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue()));
|
||||
break;
|
||||
case "pmcid":
|
||||
target = PMCID_PREFIX
|
||||
+ IdentifierFactory
|
||||
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue()));
|
||||
break;
|
||||
|
||||
Relation relation =
|
||||
OafMapperUtils.getRelation(source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE, ModelConstants.ORG_PERSON_PARTICIPATES ,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils.dataInfo(false, null, false, false,
|
||||
OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"),
|
||||
null);
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
||||
if(Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())){
|
||||
KeyValue kv = new KeyValue();
|
||||
kv.setKey("startDate");
|
||||
kv.setValue(row.getStartDate());
|
||||
properties.add(kv);
|
||||
}
|
||||
if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) {
|
||||
KeyValue kv = new KeyValue();
|
||||
kv.setKey("endDate");
|
||||
kv.setValue(row.getEndDate());
|
||||
properties.add(kv);
|
||||
}
|
||||
|
||||
if (properties.size() > 0)
|
||||
relation.setProperties(properties);
|
||||
return relation;
|
||||
|
||||
|
||||
}
|
||||
|
||||
private static Collection<? extends Relation> getCoAuthorshipRelations(String orcid1, String orcid2) {
|
||||
String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
|
||||
String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
|
||||
|
||||
return Arrays.asList(OafMapperUtils.getRelation(source, target,ModelConstants.PERSON_PERSON_RELTYPE,
|
||||
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||
ModelConstants.PERSON_PERSON_HASCOAUTHORED,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils.dataInfo(false, null, false, false,
|
||||
OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"),
|
||||
null),
|
||||
OafMapperUtils.getRelation(target, source,ModelConstants.PERSON_PERSON_RELTYPE,
|
||||
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||
ModelConstants.PERSON_PERSON_HASCOAUTHORED,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils.dataInfo(false, null, false, false,
|
||||
OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"),
|
||||
null));
|
||||
|
||||
}
|
||||
|
||||
private static @NotNull Iterator<Relation> getAuthorshipRelationIterator(Work w) {
|
||||
|
||||
if(Optional.ofNullable(w.getPids()).isPresent())
|
||||
return w.getPids()
|
||||
.stream()
|
||||
.map(pid -> getRelation(w.getOrcid(), pid))
|
||||
.filter(Objects::nonNull).collect(Collectors.toList()).iterator();
|
||||
List<Relation> ret = new ArrayList<>();
|
||||
return ret.iterator();
|
||||
}
|
||||
|
||||
|
||||
private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid){
|
||||
String target ;
|
||||
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
|
||||
switch (pid.getSchema()){
|
||||
case "doi":
|
||||
target = DOI_PREFIX
|
||||
+ IdentifierFactory
|
||||
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue()));
|
||||
break;
|
||||
case "pmid":
|
||||
target = PMID_PREFIX
|
||||
+ IdentifierFactory
|
||||
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue()));
|
||||
break;
|
||||
case "arxiv":
|
||||
target = ARXIV_PREFIX
|
||||
+ IdentifierFactory
|
||||
.md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue()));
|
||||
break;
|
||||
case "pmcid":
|
||||
target = PMCID_PREFIX
|
||||
+ IdentifierFactory
|
||||
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue()));
|
||||
break;
|
||||
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
return OafMapperUtils.getRelation(source, target,ModelConstants.RESULT_PERSON_RELTYPE,
|
||||
ModelConstants.RESULT_PERSON_SUBRELTYPE,
|
||||
ModelConstants.RESULT_PERSON_HASAUTHORED,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils.dataInfo(false, null, false, false,
|
||||
OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"),
|
||||
null);
|
||||
}
|
||||
return OafMapperUtils
|
||||
.getRelation(
|
||||
source, target, ModelConstants.RESULT_PERSON_RELTYPE,
|
||||
ModelConstants.RESULT_PERSON_SUBRELTYPE,
|
||||
ModelConstants.RESULT_PERSON_HASAUTHORED,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, null, false, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91"),
|
||||
null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,23 +1,25 @@
|
|||
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Work;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Work;
|
||||
|
||||
public class WorkList implements Serializable {
|
||||
private ArrayList<Work> workArrayList;
|
||||
private ArrayList<Work> workArrayList;
|
||||
|
||||
public ArrayList<Work> getWorkArrayList() {
|
||||
return workArrayList;
|
||||
}
|
||||
public ArrayList<Work> getWorkArrayList() {
|
||||
return workArrayList;
|
||||
}
|
||||
|
||||
public void setWorkArrayList(ArrayList<Work> workArrayList) {
|
||||
this.workArrayList = workArrayList;
|
||||
}
|
||||
public void setWorkArrayList(ArrayList<Work> workArrayList) {
|
||||
this.workArrayList = workArrayList;
|
||||
}
|
||||
|
||||
public WorkList() {
|
||||
workArrayList = new ArrayList<>();
|
||||
}
|
||||
public WorkList() {
|
||||
workArrayList = new ArrayList<>();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,11 +16,10 @@
|
|||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "the hdfs name node",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "wd",
|
||||
"paramLongName": "workingDir",
|
||||
"paramDescription": "the hdfs name node",
|
||||
"paramRequired": false
|
||||
}
|
||||
}, {
|
||||
"paramName": "wd",
|
||||
"paramLongName": "workingDir",
|
||||
"paramDescription": "the hdfs name node",
|
||||
"paramRequired": false
|
||||
}
|
||||
]
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
inputPath=/data/orcid_2023/tables/
|
||||
outputPath=/user/miriam.baglioni/peopleAS
|
|
@ -0,0 +1,30 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveJdbcUrl</name>
|
||||
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveDbName</name>
|
||||
<value>openaire</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,111 @@
|
|||
<workflow-app name="PersonEntity" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
|
||||
<property>
|
||||
<name>inputPath</name>
|
||||
<description>inputPath</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>outputPath</name>
|
||||
<description>the path where to store the actionset</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
</global>
|
||||
<start to="deleteoutputpath"/>
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
<action name="deleteoutputpath">
|
||||
<fs>
|
||||
<delete path="${outputPath}"/>
|
||||
<mkdir path="${outputPath}"/>
|
||||
<delete path="${workingDir}"/>
|
||||
<mkdir path="${workingDir}"/>
|
||||
</fs>
|
||||
<ok to="atomicactions"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="atomicactions">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Produces the ActionSet for Person entity and relevant relations</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=4
|
||||
--executor-memory=4G
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=5G
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||
<arg>--workingDir</arg><arg>${workingDir}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -1,15 +1,13 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.person;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob;
|
||||
import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Author;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -27,12 +25,18 @@ import org.junit.jupiter.api.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Optional;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob;
|
||||
import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Author;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Person;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
|
||||
public class CreatePersonAS {
|
||||
|
||||
|
@ -57,7 +61,7 @@ public class CreatePersonAS {
|
|||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.codegen.wholeStage","false");
|
||||
conf.set("spark.sql.codegen.wholeStage", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
|
@ -92,7 +96,6 @@ public class CreatePersonAS {
|
|||
// .mode(SaveMode.Overwrite)
|
||||
// .parquet(workingDir.toString() + "AuthorsSubset");
|
||||
|
||||
|
||||
ExtractPerson
|
||||
.main(
|
||||
new String[] {
|
||||
|
@ -102,13 +105,120 @@ public class CreatePersonAS {
|
|||
inputPath,
|
||||
"-outputPath",
|
||||
workingDir.toString() + "/actionSet1",
|
||||
"-workingDir",
|
||||
workingDir.toString() + "/working"
|
||||
"-workingDir",
|
||||
workingDir.toString() + "/working"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Relation> relations = sc
|
||||
.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
|
||||
.filter(v -> "eu.dnetlib.dhp.schema.oaf.Relation".equalsIgnoreCase(v._1().toString()))
|
||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||
.map(aa -> ((Relation) aa.getPayload()));
|
||||
//
|
||||
JavaRDD<Person> people = sc
|
||||
.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
|
||||
.filter(v -> "eu.dnetlib.dhp.schema.oaf.Person".equalsIgnoreCase(v._1().toString()))
|
||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||
.map(aa -> ((Person) aa.getPayload()));
|
||||
//
|
||||
Assertions.assertEquals(7, people.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Paulo",
|
||||
people
|
||||
.filter(
|
||||
p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
|
||||
.first()
|
||||
.getGivenName());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Tavares",
|
||||
people
|
||||
.filter(
|
||||
p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
|
||||
.first()
|
||||
.getFamilyName());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
4,
|
||||
people
|
||||
.filter(
|
||||
p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
|
||||
.first()
|
||||
.getAlternativeNames()
|
||||
.size());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
4,
|
||||
people
|
||||
.filter(
|
||||
p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
|
||||
.first()
|
||||
.getPid()
|
||||
.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
people
|
||||
.filter(
|
||||
p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
|
||||
.first()
|
||||
.getPid()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
p -> p.getSchema().equalsIgnoreCase("Scopus Author ID")
|
||||
&& p.getValue().equalsIgnoreCase("15119405200")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
16,
|
||||
relations
|
||||
.filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
14,
|
||||
relations
|
||||
.filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
3,
|
||||
relations
|
||||
.filter(
|
||||
r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
|
||||
&& r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2,
|
||||
relations
|
||||
.filter(
|
||||
r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
|
||||
&& r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)
|
||||
&& r.getTarget().startsWith("50|doi"))
|
||||
.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
relations
|
||||
.filter(
|
||||
r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
|
||||
&& r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)
|
||||
&& r.getTarget().startsWith("50|arXiv"))
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
relations
|
||||
.filter(
|
||||
r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
|
||||
&& r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED))
|
||||
.count());
|
||||
Assertions.assertEquals(33, relations.count());
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue