forked from D-Net/dnet-hadoop
[Person]First implementation to include Person entity in the graph
This commit is contained in:
parent
d35edac212
commit
67ff783e65
|
@ -0,0 +1,19 @@
|
||||||
|
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
|
||||||
|
public class Coauthors implements Serializable {
|
||||||
|
private ArrayList<Relation> coauthors;
|
||||||
|
|
||||||
|
public ArrayList<Relation> getCoauthors() {
|
||||||
|
return coauthors;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCoauthors(ArrayList<Relation> coauthors) {
|
||||||
|
this.coauthors = coauthors;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Person;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class Couples implements Serializable {
|
||||||
|
Person p ;
|
||||||
|
Relation r;
|
||||||
|
|
||||||
|
public Couples() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public Person getP() {
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setP(Person p) {
|
||||||
|
this.p = p;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Relation getR() {
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setR(Relation r) {
|
||||||
|
this.r = r;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <Tuples> Couples newInstance(Tuple2<Person, Relation> couple){
|
||||||
|
Couples c = new Couples();
|
||||||
|
c.p = couple._1();
|
||||||
|
c.r = couple._2();
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,331 @@
|
||||||
|
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.Constants;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.collection.orcid.model.Author;
|
||||||
|
import eu.dnetlib.dhp.collection.orcid.model.Employment;
|
||||||
|
import eu.dnetlib.dhp.collection.orcid.model.Work;
|
||||||
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Person;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Pid;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||||
|
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||||
|
import org.apache.commons.cli.ParseException;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.*;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.spark_project.jetty.util.StringUtil;
|
||||||
|
import scala.Tuple2;
|
||||||
|
import static org.apache.spark.sql.functions.*;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
public class ExtractPerson implements Serializable {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
private static final String OPENAIRE_PREFIX = "openaire____";
|
||||||
|
private static final String SEPARATOR = "::";
|
||||||
|
private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(ModelConstants.ORCID.toLowerCase());
|
||||||
|
|
||||||
|
private static final String DOI_PREFIX = "50|doi_________::";
|
||||||
|
|
||||||
|
private static final String PMID_PREFIX = "50|pmid________::";
|
||||||
|
private static final String ARXIV_PREFIX = "50|arXiv_______::";
|
||||||
|
|
||||||
|
private static final String PMCID_PREFIX = "50|pmcid_______::";
|
||||||
|
private static final String ROR_PREFIX = "20|ror_________::";
|
||||||
|
private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
|
||||||
|
public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
|
||||||
|
public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
|
||||||
|
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws IOException, ParseException {
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
Objects
|
||||||
|
.requireNonNull(
|
||||||
|
ExtractPerson.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json"))));
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String inputPath = parser.get("inputPath");
|
||||||
|
log.info("inputPath {}", inputPath);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath {}", outputPath);
|
||||||
|
|
||||||
|
final String workingDir = parser.get("workingDir");
|
||||||
|
log.info("workingDir {}", workingDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
runWithSparkSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> createActionSet(spark, inputPath, outputPath, workingDir));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) {
|
||||||
|
|
||||||
|
Dataset<Author> authors = spark
|
||||||
|
.read()
|
||||||
|
.parquet(inputPath + "Authors").as(Encoders.bean(Author.class));
|
||||||
|
|
||||||
|
Dataset<Work> works = spark
|
||||||
|
.read()
|
||||||
|
.parquet(inputPath + "Works")
|
||||||
|
.as(Encoders.bean(Work.class))
|
||||||
|
.filter((FilterFunction<Work>) w -> Optional.ofNullable(w.getPids()).isPresent() &&
|
||||||
|
w.getPids().stream().anyMatch(p->p.getSchema().equalsIgnoreCase("doi") ||
|
||||||
|
p.getSchema().equalsIgnoreCase("pmc") ||
|
||||||
|
p.getSchema().equalsIgnoreCase("pmid") ||
|
||||||
|
p.getSchema().equalsIgnoreCase("arxiv")));
|
||||||
|
|
||||||
|
Dataset<Employment> employmentDataset = spark
|
||||||
|
.read()
|
||||||
|
.parquet(inputPath + "Employments")
|
||||||
|
.as(Encoders.bean(Employment.class));
|
||||||
|
|
||||||
|
|
||||||
|
Dataset<Author> peopleToMap = authors.joinWith(works, authors.col("orcid").equalTo(works.col("orcid")))
|
||||||
|
.map((MapFunction<Tuple2<Author, Work>, Author>) t2 -> t2._1(), Encoders.bean(Author.class))
|
||||||
|
.groupByKey((MapFunction<Author, String>) a -> a.getOrcid(), Encoders.STRING())
|
||||||
|
.mapGroups((MapGroupsFunction<String, Author, Author>) (k, it) -> it.next(), Encoders.bean(Author.class));
|
||||||
|
|
||||||
|
|
||||||
|
Dataset<Employment> employment = employmentDataset.joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid")))
|
||||||
|
.map((MapFunction<Tuple2<Employment, Author>, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class));
|
||||||
|
|
||||||
|
peopleToMap.show(false);
|
||||||
|
|
||||||
|
Dataset<Person> people;
|
||||||
|
people = peopleToMap.map((MapFunction<Author, Person>) op -> {
|
||||||
|
Person person = new Person();
|
||||||
|
person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX));
|
||||||
|
person.setBiography(Optional.ofNullable(op.getBiography())
|
||||||
|
|
||||||
|
.orElse(""));
|
||||||
|
KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS);
|
||||||
|
kv.setDataInfo(null);
|
||||||
|
person.setCollectedfrom(Arrays.asList(kv));
|
||||||
|
person.setAlternativeNames(Optional.ofNullable(op.getOtherNames())
|
||||||
|
|
||||||
|
.orElse(new ArrayList<>()));
|
||||||
|
person.setFamilyName(Optional.ofNullable(op.getFamilyName())
|
||||||
|
|
||||||
|
.orElse(""));
|
||||||
|
person.setGivenName(Optional.ofNullable(op.getGivenName())
|
||||||
|
|
||||||
|
.orElse(""));
|
||||||
|
person.setPid(Optional.ofNullable(op.getOtherPids())
|
||||||
|
.map(v -> v.stream().map(p -> Pid.newInstance(p.getSchema(), p.getValue())).collect(Collectors.toList()))
|
||||||
|
.orElse(new ArrayList<>())
|
||||||
|
);
|
||||||
|
person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid()));
|
||||||
|
person.setDateofcollection(op.getLastModifiedDate());
|
||||||
|
person.setOriginalId(Arrays.asList(op.getOrcid()));
|
||||||
|
return person;
|
||||||
|
}, Encoders.bean(Person.class));
|
||||||
|
|
||||||
|
|
||||||
|
people.show(false);
|
||||||
|
|
||||||
|
|
||||||
|
Dataset<Relation> authorship;
|
||||||
|
authorship = works
|
||||||
|
.flatMap((FlatMapFunction<Work, Relation>) ExtractPerson::getAuthorshipRelationIterator
|
||||||
|
, Encoders.bean(Relation.class));
|
||||||
|
|
||||||
|
|
||||||
|
authorship.show(false);
|
||||||
|
|
||||||
|
|
||||||
|
Dataset<Relation> coauthorship = works
|
||||||
|
.flatMap((FlatMapFunction<Work, Tuple2<String, String>>) w -> {
|
||||||
|
List<Tuple2<String, String>> lista = new ArrayList<>();
|
||||||
|
w.getPids().stream().forEach(p -> {
|
||||||
|
if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc") || p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv"))
|
||||||
|
lista.add(new Tuple2<>(p.getValue(), w.getOrcid()));
|
||||||
|
});
|
||||||
|
return lista.iterator();
|
||||||
|
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
|
||||||
|
.groupByKey((MapFunction<Tuple2<String, String>, String>) Tuple2::_1, Encoders.STRING())
|
||||||
|
.mapGroups((MapGroupsFunction<String, Tuple2<String, String>, Coauthors>) (k, it) ->
|
||||||
|
extractCoAuthors(it), Encoders.bean(Coauthors.class))
|
||||||
|
.flatMap((FlatMapFunction<Coauthors, Relation>) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class))
|
||||||
|
.groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
|
||||||
|
.mapGroups((MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class));
|
||||||
|
|
||||||
|
coauthorship.show(false);
|
||||||
|
Dataset<Relation> affiliation = employment
|
||||||
|
.filter((FilterFunction<Employment>) e -> Optional.ofNullable(e.getAffiliationId()).isPresent())
|
||||||
|
.filter((FilterFunction<Employment>) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror"))
|
||||||
|
.map((MapFunction<Employment, Relation>) ExtractPerson::getAffiliationRelation
|
||||||
|
, Encoders.bean(Relation.class));
|
||||||
|
|
||||||
|
affiliation.show(false);
|
||||||
|
|
||||||
|
people.toJavaRDD()
|
||||||
|
.map(p -> new AtomicAction(p.getClass(), p))
|
||||||
|
.union(authorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r)))
|
||||||
|
.union(coauthorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r)))
|
||||||
|
.union(affiliation.toJavaRDD().map(r->new AtomicAction(r.getClass(),r)))
|
||||||
|
.mapToPair(
|
||||||
|
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||||
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||||
|
.saveAsHadoopFile(
|
||||||
|
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);//, GzipCodec.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Coauthors extractCoAuthors(Iterator<Tuple2<String, String>> it) {
|
||||||
|
Coauthors coauth = new Coauthors();
|
||||||
|
ArrayList<Relation> ret = new ArrayList<>();
|
||||||
|
List<String> coauthors = new ArrayList<>();
|
||||||
|
while(it.hasNext())
|
||||||
|
coauthors.add(it.next()._2());
|
||||||
|
for (int i = 0; i < coauthors.size() -1; i++ )
|
||||||
|
for(int j = i + 1; j < coauthors.size(); j++)
|
||||||
|
ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j)));
|
||||||
|
|
||||||
|
coauth.setCoauthors(ret);
|
||||||
|
|
||||||
|
return coauth;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Relation getAffiliationRelation(Employment row) {
|
||||||
|
String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid());
|
||||||
|
String target = ROR_PREFIX
|
||||||
|
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue()));
|
||||||
|
List<KeyValue> properties = new ArrayList<>() ;
|
||||||
|
|
||||||
|
Relation relation =
|
||||||
|
OafMapperUtils.getRelation(source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE, ModelConstants.ORG_PERSON_PARTICIPATES ,
|
||||||
|
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||||
|
OafMapperUtils.dataInfo(false, null, false, false,
|
||||||
|
OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"),
|
||||||
|
null);
|
||||||
|
|
||||||
|
if(Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())){
|
||||||
|
KeyValue kv = new KeyValue();
|
||||||
|
kv.setKey("startDate");
|
||||||
|
kv.setValue(row.getStartDate());
|
||||||
|
properties.add(kv);
|
||||||
|
}
|
||||||
|
if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) {
|
||||||
|
KeyValue kv = new KeyValue();
|
||||||
|
kv.setKey("endDate");
|
||||||
|
kv.setValue(row.getEndDate());
|
||||||
|
properties.add(kv);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (properties.size() > 0)
|
||||||
|
relation.setProperties(properties);
|
||||||
|
return relation;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Collection<? extends Relation> getCoAuthorshipRelations(String orcid1, String orcid2) {
|
||||||
|
String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
|
||||||
|
String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
|
||||||
|
|
||||||
|
return Arrays.asList(OafMapperUtils.getRelation(source, target,ModelConstants.PERSON_PERSON_RELTYPE,
|
||||||
|
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||||
|
ModelConstants.PERSON_PERSON_HASCOAUTHORED,
|
||||||
|
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||||
|
OafMapperUtils.dataInfo(false, null, false, false,
|
||||||
|
OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"),
|
||||||
|
null),
|
||||||
|
OafMapperUtils.getRelation(target, source,ModelConstants.PERSON_PERSON_RELTYPE,
|
||||||
|
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||||
|
ModelConstants.PERSON_PERSON_HASCOAUTHORED,
|
||||||
|
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||||
|
OafMapperUtils.dataInfo(false, null, false, false,
|
||||||
|
OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"),
|
||||||
|
null));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static @NotNull Iterator<Relation> getAuthorshipRelationIterator(Work w) {
|
||||||
|
|
||||||
|
if(Optional.ofNullable(w.getPids()).isPresent())
|
||||||
|
return w.getPids()
|
||||||
|
.stream()
|
||||||
|
.map(pid -> getRelation(w.getOrcid(), pid))
|
||||||
|
.filter(Objects::nonNull).collect(Collectors.toList()).iterator();
|
||||||
|
List<Relation> ret = new ArrayList<>();
|
||||||
|
return ret.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid){
|
||||||
|
String target ;
|
||||||
|
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
|
||||||
|
switch (pid.getSchema()){
|
||||||
|
case "doi":
|
||||||
|
target = DOI_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue()));
|
||||||
|
break;
|
||||||
|
case "pmid":
|
||||||
|
target = PMID_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue()));
|
||||||
|
break;
|
||||||
|
case "arxiv":
|
||||||
|
target = ARXIV_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue()));
|
||||||
|
break;
|
||||||
|
case "pmcid":
|
||||||
|
target = PMCID_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue()));
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
return OafMapperUtils.getRelation(source, target,ModelConstants.RESULT_PERSON_RELTYPE,
|
||||||
|
ModelConstants.RESULT_PERSON_SUBRELTYPE,
|
||||||
|
ModelConstants.RESULT_PERSON_HASAUTHORED,
|
||||||
|
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||||
|
OafMapperUtils.dataInfo(false, null, false, false,
|
||||||
|
OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"),
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,23 @@
|
||||||
|
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.collection.orcid.model.Work;
|
||||||
|
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
public class WorkList implements Serializable {
|
||||||
|
private ArrayList<Work> workArrayList;
|
||||||
|
|
||||||
|
public ArrayList<Work> getWorkArrayList() {
|
||||||
|
return workArrayList;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setWorkArrayList(ArrayList<Work> workArrayList) {
|
||||||
|
this.workArrayList = workArrayList;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkList() {
|
||||||
|
workArrayList = new ArrayList<>();
|
||||||
|
}
|
||||||
|
}
|
|
@ -104,8 +104,8 @@ public class CreateActionSetFromWebEntries implements Serializable {
|
||||||
final String ror = ROR_PREFIX
|
final String ror = ROR_PREFIX
|
||||||
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
|
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
|
||||||
ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
|
ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
|
||||||
ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
|
// ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
|
||||||
ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
|
// ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
.iterator();
|
.iterator();
|
||||||
|
@ -139,11 +139,17 @@ public class CreateActionSetFromWebEntries implements Serializable {
|
||||||
"institution", functions
|
"institution", functions
|
||||||
.explode(
|
.explode(
|
||||||
functions.col("institutions")))
|
functions.col("institutions")))
|
||||||
|
|
||||||
.selectExpr(
|
.selectExpr(
|
||||||
"id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror",
|
"id", "doi", "institution.ror as ror",
|
||||||
"institution.country_code as country_code", "publication_year")
|
"institution.country_code as country_code", "publication_year")
|
||||||
.distinct();
|
.distinct();
|
||||||
|
|
||||||
|
// .selectExpr(
|
||||||
|
// "id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror",
|
||||||
|
// "institution.country_code as country_code", "publication_year")
|
||||||
|
// .distinct();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Dataset<Row> readBlackList(SparkSession spark, String inputPath) {
|
private static Dataset<Row> readBlackList(SparkSession spark, String inputPath) {
|
||||||
|
|
|
@ -20,6 +20,9 @@ public class Author extends ORCIDItem {
|
||||||
|
|
||||||
private String lastModifiedDate;
|
private String lastModifiedDate;
|
||||||
|
|
||||||
|
public Author() {
|
||||||
|
}
|
||||||
|
|
||||||
public String getBiography() {
|
public String getBiography() {
|
||||||
return biography;
|
return biography;
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,4 +11,7 @@ public class ORCIDItem {
|
||||||
public void setOrcid(String orcid) {
|
public void setOrcid(String orcid) {
|
||||||
this.orcid = orcid;
|
this.orcid = orcid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ORCIDItem() {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,4 +32,6 @@ public class Work extends ORCIDItem {
|
||||||
pids.add(pid);
|
pids.add(pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Work() {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "ip",
|
||||||
|
"paramLongName": "inputPath",
|
||||||
|
"paramDescription": "the zipped opencitations file",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "op",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the working path",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "wd",
|
||||||
|
"paramLongName": "workingDir",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": false
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,114 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.person;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob;
|
||||||
|
import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson;
|
||||||
|
import eu.dnetlib.dhp.collection.orcid.model.Author;
|
||||||
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
public class CreatePersonAS {
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path workingDir;
|
||||||
|
private static final Logger log = LoggerFactory
|
||||||
|
.getLogger(CreatePersonAS.class);
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files
|
||||||
|
.createTempDirectory(CreatePersonAS.class.getSimpleName());
|
||||||
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setAppName(CreatePersonAS.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
conf.set("hive.metastore.local", "true");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
conf.set("spark.sql.codegen.wholeStage","false");
|
||||||
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(CreatePersonAS.class.getSimpleName())
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
|
spark.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testAuthors() throws Exception {
|
||||||
|
|
||||||
|
String inputPath = getClass()
|
||||||
|
.getResource(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/person/")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
// spark
|
||||||
|
// .read()
|
||||||
|
// .parquet(inputPath + "Authors")
|
||||||
|
// .as(Encoders.bean(Author.class))
|
||||||
|
// .filter((FilterFunction<Author>) a -> Optional.ofNullable(a.getOtherNames()).isPresent() &&
|
||||||
|
// Optional.ofNullable(a.getBiography()).isPresent())
|
||||||
|
// .write()
|
||||||
|
// .mode(SaveMode.Overwrite)
|
||||||
|
// .parquet(workingDir.toString() + "AuthorsSubset");
|
||||||
|
|
||||||
|
|
||||||
|
ExtractPerson
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isSparkSessionManaged",
|
||||||
|
Boolean.FALSE.toString(),
|
||||||
|
"-inputPath",
|
||||||
|
inputPath,
|
||||||
|
"-outputPath",
|
||||||
|
workingDir.toString() + "/actionSet1",
|
||||||
|
"-workingDir",
|
||||||
|
workingDir.toString() + "/working"
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,10 @@
|
||||||
|
{"orcid":"0000-0001-6291-9619","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold Nanoparticle-Functionalized 3D Flower-like MoS<sub>2</sub>/TiO<sub>2</sub> Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]}
|
||||||
|
{"orcid":"0000-0002-3210-3034","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold Nanoparticle-Functionalized 3D Flower-like MoS<sub>2</sub>/TiO<sub>2</sub> Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]}
|
||||||
|
{"orcid":"0000-0001-6291-9619","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the <math display=\"inline\"><mrow><mi>q</mi><mi>q</mi><mi>b</mi><mi>b</mi></mrow></math> Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]}
|
||||||
|
{"orcid":"0000-0002-3210-3034","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the <math display=\"inline\"><mrow><mi>q</mi><mi>q</mi><mi>b</mi><mi>b</mi></mrow></math> Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]}
|
||||||
|
{"orcid":"0000-0002-9030-7609","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"},{"value":"14346052 14346044","schema":"issn"}]}
|
||||||
|
{"orcid":"0000-0003-2552-9691","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in $\\sqrt{s}$ = 13 TeV $pp$ collisions at the LHC using the ATLAS detector","pids":[{"value":"1473744","schema":"other-id"},{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"1606.09150","schema":"arxiv"}]}
|
||||||
|
{"orcid":"0000-0003-0305-8980","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"}]}
|
||||||
|
{"orcid":"0000-0002-9030-7609","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]}
|
||||||
|
{"orcid":"0000-0003-2629-4046","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]}
|
||||||
|
{"orcid":"0000-0001-8582-8912","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]}
|
19
pom.xml
19
pom.xml
|
@ -994,6 +994,25 @@
|
||||||
|
|
||||||
<!-- Build with scala 12 and Spark 3.4 -->
|
<!-- Build with scala 12 and Spark 3.4 -->
|
||||||
<profiles>
|
<profiles>
|
||||||
|
<!-- Activate ARM-compatible snappy dependency on new Silicon Macs -->
|
||||||
|
<profile>
|
||||||
|
<id>arm-silicon-mac</id>
|
||||||
|
<activation>
|
||||||
|
<os>
|
||||||
|
<arch>aarch64</arch>
|
||||||
|
<family>mac</family>
|
||||||
|
</os>
|
||||||
|
</activation>
|
||||||
|
<dependencyManagement>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.xerial.snappy</groupId>
|
||||||
|
<artifactId>snappy-java</artifactId>
|
||||||
|
<version>1.1.8.4</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</dependencyManagement>
|
||||||
|
</profile>
|
||||||
<profile>
|
<profile>
|
||||||
<id>spark-34</id>
|
<id>spark-34</id>
|
||||||
<properties>
|
<properties>
|
||||||
|
|
Loading…
Reference in New Issue