From 67ff783e650d3b42894dae334b6643778effc9b5 Mon Sep 17 00:00:00 2001
From: Miriam Baglioni
Date: Sat, 29 Jun 2024 17:13:01 +0200
Subject: [PATCH 1/3] [Person]First implementation to include Person entity in the graph

---
 .../actionmanager/personentity/Coauthors.java |  19 +
 .../actionmanager/personentity/Couples.java   |  40 +++
 .../personentity/ExtractPerson.java           | 331 ++++++++++++++++++
 .../actionmanager/personentity/WorkList.java  |  23 ++
 .../CreateActionSetFromWebEntries.java        |  12 +-
 .../dhp/collection/orcid/model/Author.java    |   3 +
 .../dhp/collection/orcid/model/ORCIDItem.java |   3 +
 .../dhp/collection/orcid/model/Work.java      |   2 +
 .../personentity/as_parameters.json           |  26 ++
 .../actionmanager/person/CreatePersonAS.java  | 114 ++++++
 .../actionmanager/person/WorkJson/part-00000  |  10 +
 pom.xml                                       |  19 +
 12 files changed, 599 insertions(+), 3 deletions(-)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
new file mode 100644
index 000000000..a755c875f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
@@ -0,0 +1,19 @@
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+
+public class Coauthors implements Serializable {
+    private ArrayList coauthors;
+
+    public ArrayList getCoauthors() {
+        return coauthors;
+    }
+
+    public void setCoauthors(ArrayList coauthors) {
+        this.coauthors = coauthors;
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java
new file mode 100644
index 000000000..65670b22e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java
@@ -0,0 +1,40 @@
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import eu.dnetlib.dhp.schema.oaf.Person;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
+
+
+import java.io.Serializable;
+
+public class Couples implements Serializable {
+    Person p ;
+    Relation r;
+
+    public Couples() {
+
+    }
+
+    public Person getP() {
+        return p;
+    }
+
+    public void setP(Person p) {
+        this.p = p;
+    }
+
+    public Relation getR() {
+        return r;
+    }
+
+    public void setR(Relation r) {
+        this.r = r;
+    }
+
+    public static Couples newInstance(Tuple2 couple){
+        Couples c = new Couples();
+        c.p = couple._1();
+        c.r = couple._2();
+        return c;
+    }
+}
diff
--git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java new file mode 100644 index 000000000..0bbda0343 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java @@ -0,0 +1,331 @@ +package eu.dnetlib.dhp.actionmanager.personentity; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.Constants; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.collection.orcid.model.Author; +import eu.dnetlib.dhp.collection.orcid.model.Employment; +import eu.dnetlib.dhp.collection.orcid.model.Work; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Person; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.schema.oaf.Pid; +import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; +import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.*; +import org.apache.spark.sql.*; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.spark_project.jetty.util.StringUtil; +import scala.Tuple2; +import static org.apache.spark.sql.functions.*; +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class ExtractPerson implements Serializable { + private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String OPENAIRE_PREFIX = "openaire____"; + private static final String SEPARATOR = "::"; + private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(ModelConstants.ORCID.toLowerCase()); + + private static final String DOI_PREFIX = "50|doi_________::"; + + private static final String PMID_PREFIX = "50|pmid________::"; + private static final String ARXIV_PREFIX = "50|arXiv_______::"; + + private static final String PMCID_PREFIX = "50|pmcid_______::"; + private static final String ROR_PREFIX = "20|ror_________::"; + private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______"; + public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid"; + public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID"; + + + public static void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + ExtractPerson.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json")))); + + parser.parseArgument(args); + + 
Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + final String workingDir = parser.get("workingDir"); + log.info("workingDir {}", workingDir); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> createActionSet(spark, inputPath, outputPath, workingDir)); + + } + + private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) { + + Dataset authors = spark + .read() + .parquet(inputPath + "Authors").as(Encoders.bean(Author.class)); + + Dataset works = spark + .read() + .parquet(inputPath + "Works") + .as(Encoders.bean(Work.class)) + .filter((FilterFunction) w -> Optional.ofNullable(w.getPids()).isPresent() && + w.getPids().stream().anyMatch(p->p.getSchema().equalsIgnoreCase("doi") || + p.getSchema().equalsIgnoreCase("pmc") || + p.getSchema().equalsIgnoreCase("pmid") || + p.getSchema().equalsIgnoreCase("arxiv"))); + + Dataset employmentDataset = spark + .read() + .parquet(inputPath + "Employments") + .as(Encoders.bean(Employment.class)); + + + Dataset peopleToMap = authors.joinWith(works, authors.col("orcid").equalTo(works.col("orcid"))) + .map((MapFunction, Author>) t2 -> t2._1(), Encoders.bean(Author.class)) + .groupByKey((MapFunction) a -> a.getOrcid(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Author.class)); + + + Dataset employment = employmentDataset.joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid"))) + .map((MapFunction, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class)); + + peopleToMap.show(false); + + Dataset people; + people = peopleToMap.map((MapFunction) op -> { + Person person = new Person(); + person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX)); + person.setBiography(Optional.ofNullable(op.getBiography()) + + .orElse("")); + KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS); + kv.setDataInfo(null); + person.setCollectedfrom(Arrays.asList(kv)); + person.setAlternativeNames(Optional.ofNullable(op.getOtherNames()) + + .orElse(new ArrayList<>())); + person.setFamilyName(Optional.ofNullable(op.getFamilyName()) + + .orElse("")); + person.setGivenName(Optional.ofNullable(op.getGivenName()) + + .orElse("")); + person.setPid(Optional.ofNullable(op.getOtherPids()) + .map(v -> v.stream().map(p -> Pid.newInstance(p.getSchema(), p.getValue())).collect(Collectors.toList())) + .orElse(new ArrayList<>()) + ); + person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid())); + person.setDateofcollection(op.getLastModifiedDate()); + person.setOriginalId(Arrays.asList(op.getOrcid())); + return person; + }, Encoders.bean(Person.class)); + + + people.show(false); + + + Dataset authorship; + authorship = works + .flatMap((FlatMapFunction) ExtractPerson::getAuthorshipRelationIterator + , Encoders.bean(Relation.class)); + + + authorship.show(false); + + + Dataset coauthorship = works + .flatMap((FlatMapFunction>) w -> { + List> lista = new ArrayList<>(); + w.getPids().stream().forEach(p -> { + if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc") || 
p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv")) + lista.add(new Tuple2<>(p.getValue(), w.getOrcid())); + }); + return lista.iterator(); + }, Encoders.tuple(Encoders.STRING(), Encoders.STRING())) + .groupByKey((MapFunction, String>) Tuple2::_1, Encoders.STRING()) + .mapGroups((MapGroupsFunction, Coauthors>) (k, it) -> + extractCoAuthors(it), Encoders.bean(Coauthors.class)) + .flatMap((FlatMapFunction) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class)) + .groupByKey((MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Relation.class)); + + coauthorship.show(false); + Dataset affiliation = employment + .filter((FilterFunction) e -> Optional.ofNullable(e.getAffiliationId()).isPresent()) + .filter((FilterFunction) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror")) + .map((MapFunction) ExtractPerson::getAffiliationRelation + , Encoders.bean(Relation.class)); + + affiliation.show(false); + + people.toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .union(authorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r))) + .union(coauthorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r))) + .union(affiliation.toJavaRDD().map(r->new AtomicAction(r.getClass(),r))) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);//, GzipCodec.class); + } + + private static Coauthors extractCoAuthors(Iterator> it) { + Coauthors coauth = new Coauthors(); + ArrayList ret = new ArrayList<>(); + List coauthors = new ArrayList<>(); + while(it.hasNext()) + coauthors.add(it.next()._2()); + for (int i = 0; i < coauthors.size() -1; i++ ) + for(int j = i + 1; j < coauthors.size(); j++) + ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j))); + + coauth.setCoauthors(ret); + + return coauth; + } + + private static Relation getAffiliationRelation(Employment row) { + String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid()); + String target = ROR_PREFIX + + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue())); + List properties = new ArrayList<>() ; + + Relation relation = + OafMapperUtils.getRelation(source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE, ModelConstants.ORG_PERSON_PARTICIPATES , + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils.dataInfo(false, null, false, false, + OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), + null); + + if(Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())){ + KeyValue kv = new KeyValue(); + kv.setKey("startDate"); + kv.setValue(row.getStartDate()); + properties.add(kv); + } + if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) { + KeyValue kv = new KeyValue(); + kv.setKey("endDate"); + kv.setValue(row.getEndDate()); + properties.add(kv); + } + + if (properties.size() > 0) + relation.setProperties(properties); + return relation; + + + } + + private static Collection getCoAuthorshipRelations(String orcid1, String orcid2) { + String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1); + String target = 
PERSON_PREFIX + IdentifierFactory.md5(orcid2); + + return Arrays.asList(OafMapperUtils.getRelation(source, target,ModelConstants.PERSON_PERSON_RELTYPE, + ModelConstants.PERSON_PERSON_SUBRELTYPE, + ModelConstants.PERSON_PERSON_HASCOAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils.dataInfo(false, null, false, false, + OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), + null), + OafMapperUtils.getRelation(target, source,ModelConstants.PERSON_PERSON_RELTYPE, + ModelConstants.PERSON_PERSON_SUBRELTYPE, + ModelConstants.PERSON_PERSON_HASCOAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils.dataInfo(false, null, false, false, + OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), + null)); + + } + + private static @NotNull Iterator getAuthorshipRelationIterator(Work w) { + + if(Optional.ofNullable(w.getPids()).isPresent()) + return w.getPids() + .stream() + .map(pid -> getRelation(w.getOrcid(), pid)) + .filter(Objects::nonNull).collect(Collectors.toList()).iterator(); + List ret = new ArrayList<>(); + return ret.iterator(); + } + + + private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid){ + String target ; + String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); + switch (pid.getSchema()){ + case "doi": + target = DOI_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue())); + break; + case "pmid": + target = PMID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue())); + break; + case "arxiv": + target = ARXIV_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue())); + break; + case "pmcid": + target = PMCID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue())); + break; + + default: + return null; + } + + + return OafMapperUtils.getRelation(source, target,ModelConstants.RESULT_PERSON_RELTYPE, + ModelConstants.RESULT_PERSON_SUBRELTYPE, + ModelConstants.RESULT_PERSON_HASAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils.dataInfo(false, null, false, false, + OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), + null); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java new file mode 100644 index 000000000..71d2b491a --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java @@ -0,0 +1,23 @@ +package eu.dnetlib.dhp.actionmanager.personentity; + +import eu.dnetlib.dhp.collection.orcid.model.Work; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; + +import java.io.Serializable; +import java.util.ArrayList; + +public class WorkList implements Serializable { + private ArrayList workArrayList; + + public ArrayList getWorkArrayList() { + return workArrayList; + } + + public void setWorkArrayList(ArrayList workArrayList) { + 
this.workArrayList = workArrayList; + } + + public WorkList() { + workArrayList = new ArrayList<>(); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java index 27970f2c3..263179d6f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java @@ -104,8 +104,8 @@ public class CreateActionSetFromWebEntries implements Serializable { final String ror = ROR_PREFIX + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror"))); ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror)); - ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror)); - ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror)); +// ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror)); +// ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror)); return ret .iterator(); @@ -139,11 +139,17 @@ public class CreateActionSetFromWebEntries implements Serializable { "institution", functions .explode( functions.col("institutions"))) + .selectExpr( - "id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror", + "id", "doi", "institution.ror as ror", "institution.country_code as country_code", "publication_year") .distinct(); +// .selectExpr( +// "id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror", +// "institution.country_code as country_code", "publication_year") +// .distinct(); + } private static Dataset readBlackList(SparkSession spark, String inputPath) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java index df87e4333..a1545ebfe 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java @@ -20,6 +20,9 @@ public class Author extends ORCIDItem { private String lastModifiedDate; + public Author() { + } + public String getBiography() { return biography; } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java index 6bc47bc26..419823cb1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java @@ -11,4 +11,7 @@ public class ORCIDItem { public void setOrcid(String orcid) { this.orcid = orcid; } + + public ORCIDItem() { + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java index 670170323..a8683aaaf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java @@ -32,4 +32,6 @@ public class Work extends ORCIDItem { 
pids.add(pid); } + public Work() { + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json new file mode 100644 index 000000000..3310f16e4 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "ip", + "paramLongName": "inputPath", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, + { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the hdfs name node", + "paramRequired": false + } +] diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java new file mode 100644 index 000000000..a4d56ba2e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java @@ -0,0 +1,114 @@ + +package eu.dnetlib.dhp.actionmanager.person; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob; +import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson; +import eu.dnetlib.dhp.collection.orcid.model.Author; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CreatePersonAS { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(CreatePersonAS.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(CreatePersonAS.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(CreatePersonAS.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + 
conf.set("spark.sql.codegen.wholeStage","false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(CreatePersonAS.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testAuthors() throws Exception { + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/person/") + .getPath(); + +// spark +// .read() +// .parquet(inputPath + "Authors") +// .as(Encoders.bean(Author.class)) +// .filter((FilterFunction) a -> Optional.ofNullable(a.getOtherNames()).isPresent() && +// Optional.ofNullable(a.getBiography()).isPresent()) +// .write() +// .mode(SaveMode.Overwrite) +// .parquet(workingDir.toString() + "AuthorsSubset"); + + + ExtractPerson + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet1", + "-workingDir", + workingDir.toString() + "/working" + }); + + + + } + + + } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000 new file mode 100644 index 000000000..636595d49 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000 @@ -0,0 +1,10 @@ +{"orcid":"0000-0001-6291-9619","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold Nanoparticle-Functionalized 3D Flower-like MoS2/TiO2 Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]} +{"orcid":"0000-0002-3210-3034","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold Nanoparticle-Functionalized 3D Flower-like MoS2/TiO2 Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]} +{"orcid":"0000-0001-6291-9619","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the qqbb Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]} +{"orcid":"0000-0002-3210-3034","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the qqbb Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]} +{"orcid":"0000-0002-9030-7609","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"},{"value":"14346052 14346044","schema":"issn"}]} +{"orcid":"0000-0003-2552-9691","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in $\\sqrt{s}$ = 13 TeV $pp$ collisions 
at the LHC using the ATLAS detector","pids":[{"value":"1473744","schema":"other-id"},{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"1606.09150","schema":"arxiv"}]} +{"orcid":"0000-0003-0305-8980","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"}]} +{"orcid":"0000-0002-9030-7609","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]} +{"orcid":"0000-0003-2629-4046","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]} +{"orcid":"0000-0001-8582-8912","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9e554204d..c8d7a980a 100644 --- a/pom.xml +++ b/pom.xml @@ -994,6 +994,25 @@ + + + arm-silicon-mac + + + aarch64 + mac + + + + + + org.xerial.snappy + snappy-java + 1.1.8.4 + + + + spark-34 From ddd20e7f8edb69e5493e8c5af80d9931da08c582 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Thu, 4 Jul 2024 12:08:46 +0200 Subject: [PATCH 2/3] [Person]first implementation of the action set to include Person entity in the graph starting from the orcid data --- .../actionmanager/personentity/Coauthors.java | 18 +- .../actionmanager/personentity/Couples.java | 50 +- .../personentity/ExtractPerson.java | 621 ++++++++++-------- .../actionmanager/personentity/WorkList.java | 28 +- .../personentity/as_parameters.json | 13 +- .../actionmanager/personentity/job.properties | 2 + .../personentity/oozie_app/config-default.xml | 30 + .../personentity/oozie_app/workflow.xml | 111 ++++ .../actionmanager/person/CreatePersonAS.java | 150 ++++- 9 files changed, 683 insertions(+), 340 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java index a755c875f..f550178d7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java +++ 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java @@ -1,19 +1,19 @@ -package eu.dnetlib.dhp.actionmanager.personentity; -import eu.dnetlib.dhp.schema.oaf.Relation; +package eu.dnetlib.dhp.actionmanager.personentity; import java.io.Serializable; import java.util.ArrayList; +import eu.dnetlib.dhp.schema.oaf.Relation; public class Coauthors implements Serializable { - private ArrayList coauthors; + private ArrayList coauthors; - public ArrayList getCoauthors() { - return coauthors; - } + public ArrayList getCoauthors() { + return coauthors; + } - public void setCoauthors(ArrayList coauthors) { - this.coauthors = coauthors; - } + public void setCoauthors(ArrayList coauthors) { + this.coauthors = coauthors; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java index 65670b22e..d052b52b6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java @@ -1,40 +1,40 @@ + package eu.dnetlib.dhp.actionmanager.personentity; +import java.io.Serializable; + import eu.dnetlib.dhp.schema.oaf.Person; import eu.dnetlib.dhp.schema.oaf.Relation; import scala.Tuple2; - -import java.io.Serializable; - public class Couples implements Serializable { - Person p ; - Relation r; + Person p; + Relation r; - public Couples() { + public Couples() { - } + } - public Person getP() { - return p; - } + public Person getP() { + return p; + } - public void setP(Person p) { - this.p = p; - } + public void setP(Person p) { + this.p = p; + } - public Relation getR() { - return r; - } + public Relation getR() { + return r; + } - public void setR(Relation r) { - this.r = r; - } + public void setR(Relation r) { + this.r = r; + } - public static Couples newInstance(Tuple2 couple){ - Couples c = new Couples(); - c.p = couple._1(); - c.r = couple._2(); - return c; - } + public static Couples newInstance(Tuple2 couple) { + Couples c = new Couples(); + c.p = couple._1(); + c.r = couple._2(); + return c; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java index 0bbda0343..b7d5f4367 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java @@ -1,27 +1,18 @@ + package eu.dnetlib.dhp.actionmanager.personentity; -import com.fasterxml.jackson.databind.ObjectMapper; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static org.apache.spark.sql.functions.*; + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; -import eu.dnetlib.dhp.actionmanager.Constants; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.collection.orcid.model.Author; -import eu.dnetlib.dhp.collection.orcid.model.Employment; -import eu.dnetlib.dhp.collection.orcid.model.Work; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.common.ModelSupport; 
-import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Person; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import eu.dnetlib.dhp.schema.oaf.Pid; -import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; -import eu.dnetlib.dhp.schema.oaf.utils.PidType; -import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; @@ -31,301 +22,399 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.spark_project.jetty.util.StringUtil; -import scala.Tuple2; -import static org.apache.spark.sql.functions.*; -import java.io.IOException; -import java.io.Serializable; -import java.util.*; -import java.util.stream.Collectors; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.Constants; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.collection.orcid.model.Author; +import eu.dnetlib.dhp.collection.orcid.model.Employment; +import eu.dnetlib.dhp.collection.orcid.model.Work; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Person; +import eu.dnetlib.dhp.schema.oaf.Pid; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; +import eu.dnetlib.dhp.utils.DHPUtils; +import scala.Tuple2; public class ExtractPerson implements Serializable { - private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class); + private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String OPENAIRE_PREFIX = "openaire____"; - private static final String SEPARATOR = "::"; - private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(ModelConstants.ORCID.toLowerCase()); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String OPENAIRE_PREFIX = "openaire____"; + private static final String SEPARATOR = "::"; + private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR + + DHPUtils.md5(ModelConstants.ORCID.toLowerCase()); - private static final String DOI_PREFIX = "50|doi_________::"; + private static final String DOI_PREFIX = "50|doi_________::"; - private static final String PMID_PREFIX = "50|pmid________::"; - private static final String ARXIV_PREFIX = "50|arXiv_______::"; + private static final String PMID_PREFIX = "50|pmid________::"; + private static final String ARXIV_PREFIX = "50|arXiv_______::"; - private static final String PMCID_PREFIX = "50|pmcid_______::"; - private static final String ROR_PREFIX = "20|ror_________::"; - private static final String PERSON_PREFIX = 
ModelSupport.getIdPrefix(Person.class) + "|orcid_______"; - public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid"; - public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID"; + private static final String PMCID_PREFIX = "50|pmcid_______::"; + private static final String ROR_PREFIX = "20|ror_________::"; + private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______"; + public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid"; + public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID"; + public static void main(final String[] args) throws IOException, ParseException { - public static void main(final String[] args) throws IOException, ParseException { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + ExtractPerson.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json")))); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - Objects - .requireNonNull( - ExtractPerson.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json")))); + parser.parseArgument(args); - parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath); - final String inputPath = parser.get("inputPath"); - log.info("inputPath {}", inputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}", outputPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir {}", workingDir); - final String workingDir = parser.get("workingDir"); - log.info("workingDir {}", workingDir); + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + createActionSet(spark, inputPath, outputPath, workingDir); + }); - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> createActionSet(spark, inputPath, outputPath, workingDir)); + } - } + private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) { - private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) { + Dataset authors = spark + .read() + .parquet(inputPath + "Authors") + .as(Encoders.bean(Author.class)); - Dataset authors = spark - .read() - .parquet(inputPath + "Authors").as(Encoders.bean(Author.class)); + Dataset works = spark + .read() + .parquet(inputPath + "Works") + .as(Encoders.bean(Work.class)) + .filter( + (FilterFunction) w -> Optional.ofNullable(w.getPids()).isPresent() && + w + .getPids() + .stream() + .anyMatch( + p -> p.getSchema().equalsIgnoreCase("doi") || + p.getSchema().equalsIgnoreCase("pmc") || + p.getSchema().equalsIgnoreCase("pmid") || + 
p.getSchema().equalsIgnoreCase("arxiv"))); - Dataset works = spark - .read() - .parquet(inputPath + "Works") - .as(Encoders.bean(Work.class)) - .filter((FilterFunction) w -> Optional.ofNullable(w.getPids()).isPresent() && - w.getPids().stream().anyMatch(p->p.getSchema().equalsIgnoreCase("doi") || - p.getSchema().equalsIgnoreCase("pmc") || - p.getSchema().equalsIgnoreCase("pmid") || - p.getSchema().equalsIgnoreCase("arxiv"))); + Dataset employmentDataset = spark + .read() + .parquet(inputPath + "Employments") + .as(Encoders.bean(Employment.class)); - Dataset employmentDataset = spark - .read() - .parquet(inputPath + "Employments") - .as(Encoders.bean(Employment.class)); + Dataset peopleToMap = authors + .joinWith(works, authors.col("orcid").equalTo(works.col("orcid"))) + .map((MapFunction, Author>) t2 -> t2._1(), Encoders.bean(Author.class)) + .groupByKey((MapFunction) a -> a.getOrcid(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Author.class)); + Dataset employment = employmentDataset + .joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid"))) + .map((MapFunction, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class)); - Dataset peopleToMap = authors.joinWith(works, authors.col("orcid").equalTo(works.col("orcid"))) - .map((MapFunction, Author>) t2 -> t2._1(), Encoders.bean(Author.class)) - .groupByKey((MapFunction) a -> a.getOrcid(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Author.class)); + Dataset people; + peopleToMap.map((MapFunction) op -> { + Person person = new Person(); + person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX)); + person + .setBiography( + Optional + .ofNullable(op.getBiography()) + .orElse("")); + KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS); + kv.setDataInfo(null); + person.setCollectedfrom(Arrays.asList(kv)); + person + .setAlternativeNames( + Optional + .ofNullable(op.getOtherNames()) - Dataset employment = employmentDataset.joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid"))) - .map((MapFunction, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class)); + .orElse(new ArrayList<>())); + person + .setFamilyName( + Optional + .ofNullable(op.getFamilyName()) - peopleToMap.show(false); + .orElse("")); + person + .setGivenName( + Optional + .ofNullable(op.getGivenName()) - Dataset people; - people = peopleToMap.map((MapFunction) op -> { - Person person = new Person(); - person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX)); - person.setBiography(Optional.ofNullable(op.getBiography()) + .orElse("")); + person + .setPid( + Optional + .ofNullable(op.getOtherPids()) + .map( + v -> v + .stream() + .map(p -> Pid.newInstance(p.getSchema(), p.getValue())) + .collect(Collectors.toList())) + .orElse(new ArrayList<>())); + person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid())); + person.setDateofcollection(op.getLastModifiedDate()); + person.setOriginalId(Arrays.asList(op.getOrcid())); + return person; + }, Encoders.bean(Person.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(workingDir + "/people"); - .orElse("")); - KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS); - kv.setDataInfo(null); - person.setCollectedfrom(Arrays.asList(kv)); - person.setAlternativeNames(Optional.ofNullable(op.getOtherNames()) + works + .flatMap( + (FlatMapFunction) 
ExtractPerson::getAuthorshipRelationIterator, + Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(workingDir + "/authorship"); - .orElse(new ArrayList<>())); - person.setFamilyName(Optional.ofNullable(op.getFamilyName()) + works + .flatMap((FlatMapFunction>) w -> { + List> lista = new ArrayList<>(); + w.getPids().stream().forEach(p -> { + if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc") + || p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv")) + lista.add(new Tuple2<>(p.getValue(), w.getOrcid())); + }); + return lista.iterator(); + }, Encoders.tuple(Encoders.STRING(), Encoders.STRING())) + .groupByKey((MapFunction, String>) Tuple2::_1, Encoders.STRING()) + .mapGroups( + (MapGroupsFunction, Coauthors>) (k, it) -> extractCoAuthors(it), + Encoders.bean(Coauthors.class)) + .flatMap( + (FlatMapFunction) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class)) + .groupByKey((MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING()) + .mapGroups( + (MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(workingDir + "/coauthorship"); - .orElse("")); - person.setGivenName(Optional.ofNullable(op.getGivenName()) + employment + .filter((FilterFunction) e -> Optional.ofNullable(e.getAffiliationId()).isPresent()) + .filter((FilterFunction) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror")) + .map( + (MapFunction) ExtractPerson::getAffiliationRelation, + Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(workingDir + "/affiliation"); - .orElse("")); - person.setPid(Optional.ofNullable(op.getOtherPids()) - .map(v -> v.stream().map(p -> Pid.newInstance(p.getSchema(), p.getValue())).collect(Collectors.toList())) - .orElse(new ArrayList<>()) - ); - person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid())); - person.setDateofcollection(op.getLastModifiedDate()); - person.setOriginalId(Arrays.asList(op.getOrcid())); - return person; - }, Encoders.bean(Person.class)); + spark + .read() + .json(workingDir + "/people") + .as(Encoders.bean(Person.class)) + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .union( + getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/coauthorship") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/affiliation") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); + } + private static Dataset getRelations(SparkSession spark, String path) { + return spark.read().json(path).as(Encoders.bean(Relation.class)); + } - people.show(false); + private static Coauthors extractCoAuthors(Iterator> it) { + Coauthors coauth = new Coauthors(); + ArrayList ret = new ArrayList<>(); + List coauthors = new ArrayList<>(); + while (it.hasNext()) + coauthors.add(it.next()._2()); + for (int i = 0; i < coauthors.size() - 1; i++) + for (int j = i + 1; j < coauthors.size(); j++) + ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j))); + 
coauth.setCoauthors(ret); - Dataset authorship; - authorship = works - .flatMap((FlatMapFunction) ExtractPerson::getAuthorshipRelationIterator - , Encoders.bean(Relation.class)); + return coauth; + } + private static Relation getAffiliationRelation(Employment row) { + String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid()); + String target = ROR_PREFIX + + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue())); + List properties = new ArrayList<>(); - authorship.show(false); + Relation relation = OafMapperUtils + .getRelation( + source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE, + ModelConstants.ORG_PERSON_PARTICIPATES, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"), + null); + if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) { + KeyValue kv = new KeyValue(); + kv.setKey("startDate"); + kv.setValue(row.getStartDate()); + properties.add(kv); + } + if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) { + KeyValue kv = new KeyValue(); + kv.setKey("endDate"); + kv.setValue(row.getEndDate()); + properties.add(kv); + } - Dataset coauthorship = works - .flatMap((FlatMapFunction>) w -> { - List> lista = new ArrayList<>(); - w.getPids().stream().forEach(p -> { - if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc") || p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv")) - lista.add(new Tuple2<>(p.getValue(), w.getOrcid())); - }); - return lista.iterator(); - }, Encoders.tuple(Encoders.STRING(), Encoders.STRING())) - .groupByKey((MapFunction, String>) Tuple2::_1, Encoders.STRING()) - .mapGroups((MapGroupsFunction, Coauthors>) (k, it) -> - extractCoAuthors(it), Encoders.bean(Coauthors.class)) - .flatMap((FlatMapFunction) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class)) - .groupByKey((MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Relation.class)); + if (properties.size() > 0) + relation.setProperties(properties); + return relation; - coauthorship.show(false); - Dataset affiliation = employment - .filter((FilterFunction) e -> Optional.ofNullable(e.getAffiliationId()).isPresent()) - .filter((FilterFunction) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror")) - .map((MapFunction) ExtractPerson::getAffiliationRelation - , Encoders.bean(Relation.class)); + } - affiliation.show(false); + private static Collection getCoAuthorshipRelations(String orcid1, String orcid2) { + String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid1); + String target = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid2); - people.toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p)) - .union(authorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r))) - .union(coauthorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r))) - .union(affiliation.toJavaRDD().map(r->new AtomicAction(r.getClass(),r))) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile( - outputPath, Text.class, Text.class, 
SequenceFileOutputFormat.class);//, GzipCodec.class); - } + return Arrays + .asList( + OafMapperUtils + .getRelation( + source, target, ModelConstants.PERSON_PERSON_RELTYPE, + ModelConstants.PERSON_PERSON_SUBRELTYPE, + ModelConstants.PERSON_PERSON_HASCOAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"), + null), + OafMapperUtils + .getRelation( + target, source, ModelConstants.PERSON_PERSON_RELTYPE, + ModelConstants.PERSON_PERSON_SUBRELTYPE, + ModelConstants.PERSON_PERSON_HASCOAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"), + null)); - private static Coauthors extractCoAuthors(Iterator> it) { - Coauthors coauth = new Coauthors(); - ArrayList ret = new ArrayList<>(); - List coauthors = new ArrayList<>(); - while(it.hasNext()) - coauthors.add(it.next()._2()); - for (int i = 0; i < coauthors.size() -1; i++ ) - for(int j = i + 1; j < coauthors.size(); j++) - ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j))); + } - coauth.setCoauthors(ret); + private static @NotNull Iterator getAuthorshipRelationIterator(Work w) { - return coauth; - } + if (Optional.ofNullable(w.getPids()).isPresent()) + return w + .getPids() + .stream() + .map(pid -> getRelation(w.getOrcid(), pid)) + .filter(Objects::nonNull) + .collect(Collectors.toList()) + .iterator(); + List ret = new ArrayList<>(); + return ret.iterator(); + } - private static Relation getAffiliationRelation(Employment row) { - String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid()); - String target = ROR_PREFIX - + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue())); - List properties = new ArrayList<>() ; + private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid) { + String target; + String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); + switch (pid.getSchema()) { + case "doi": + target = DOI_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue())); + break; + case "pmid": + target = PMID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue())); + break; + case "arxiv": + target = ARXIV_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue())); + break; + case "pmcid": + target = PMCID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue())); + break; - Relation relation = - OafMapperUtils.getRelation(source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE, ModelConstants.ORG_PERSON_PARTICIPATES , - Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils.dataInfo(false, null, false, false, - OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), - null); + default: + return null; + } - 
if(Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())){ - KeyValue kv = new KeyValue(); - kv.setKey("startDate"); - kv.setValue(row.getStartDate()); - properties.add(kv); - } - if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) { - KeyValue kv = new KeyValue(); - kv.setKey("endDate"); - kv.setValue(row.getEndDate()); - properties.add(kv); - } - - if (properties.size() > 0) - relation.setProperties(properties); - return relation; - - - } - - private static Collection getCoAuthorshipRelations(String orcid1, String orcid2) { - String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1); - String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2); - - return Arrays.asList(OafMapperUtils.getRelation(source, target,ModelConstants.PERSON_PERSON_RELTYPE, - ModelConstants.PERSON_PERSON_SUBRELTYPE, - ModelConstants.PERSON_PERSON_HASCOAUTHORED, - Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils.dataInfo(false, null, false, false, - OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), - null), - OafMapperUtils.getRelation(target, source,ModelConstants.PERSON_PERSON_RELTYPE, - ModelConstants.PERSON_PERSON_SUBRELTYPE, - ModelConstants.PERSON_PERSON_HASCOAUTHORED, - Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils.dataInfo(false, null, false, false, - OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), - null)); - - } - - private static @NotNull Iterator getAuthorshipRelationIterator(Work w) { - - if(Optional.ofNullable(w.getPids()).isPresent()) - return w.getPids() - .stream() - .map(pid -> getRelation(w.getOrcid(), pid)) - .filter(Objects::nonNull).collect(Collectors.toList()).iterator(); - List ret = new ArrayList<>(); - return ret.iterator(); - } - - - private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid){ - String target ; - String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); - switch (pid.getSchema()){ - case "doi": - target = DOI_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue())); - break; - case "pmid": - target = PMID_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue())); - break; - case "arxiv": - target = ARXIV_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue())); - break; - case "pmcid": - target = PMCID_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue())); - break; - - default: - return null; - } - - - return OafMapperUtils.getRelation(source, target,ModelConstants.RESULT_PERSON_RELTYPE, - ModelConstants.RESULT_PERSON_SUBRELTYPE, - ModelConstants.RESULT_PERSON_HASAUTHORED, - Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils.dataInfo(false, null, false, false, - OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), - null); - } + return OafMapperUtils + .getRelation( + source, target, ModelConstants.RESULT_PERSON_RELTYPE, + ModelConstants.RESULT_PERSON_SUBRELTYPE, + 
ModelConstants.RESULT_PERSON_HASAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"), + null); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java index 71d2b491a..92842bfcf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java @@ -1,23 +1,25 @@ -package eu.dnetlib.dhp.actionmanager.personentity; -import eu.dnetlib.dhp.collection.orcid.model.Work; -import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; +package eu.dnetlib.dhp.actionmanager.personentity; import java.io.Serializable; import java.util.ArrayList; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; + +import eu.dnetlib.dhp.collection.orcid.model.Work; + public class WorkList implements Serializable { - private ArrayList workArrayList; + private ArrayList workArrayList; - public ArrayList getWorkArrayList() { - return workArrayList; - } + public ArrayList getWorkArrayList() { + return workArrayList; + } - public void setWorkArrayList(ArrayList workArrayList) { - this.workArrayList = workArrayList; - } + public void setWorkArrayList(ArrayList workArrayList) { + this.workArrayList = workArrayList; + } - public WorkList() { - workArrayList = new ArrayList<>(); - } + public WorkList() { + workArrayList = new ArrayList<>(); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json index 3310f16e4..5175552e7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json @@ -16,11 +16,10 @@ "paramLongName": "isSparkSessionManaged", "paramDescription": "the hdfs name node", "paramRequired": false - }, - { - "paramName": "wd", - "paramLongName": "workingDir", - "paramDescription": "the hdfs name node", - "paramRequired": false - } + }, { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the hdfs name node", + "paramRequired": false +} ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties new file mode 100644 index 000000000..d2269718c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties @@ -0,0 +1,2 @@ +inputPath=/data/orcid_2023/tables/ +outputPath=/user/miriam.baglioni/peopleAS \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ 
b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml new file mode 100644 index 000000000..166e7bb9c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml @@ -0,0 +1,111 @@ + + + + + inputPath + inputPath + + + outputPath + the path where to store the actionset + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + + yarn + cluster + Produces the ActionSet for Person entity and relevant relations + eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson + dhp-aggregation-${projectVersion}.jar + + --executor-cores=4 + --executor-memory=4G + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=5G + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf spark.sql.shuffle.partitions=15000 + + --inputPath${inputPath} + --outputPath${outputPath} + --workingDir${workingDir} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java index a4d56ba2e..2e7b21010 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java @@ -1,15 +1,13 @@ package eu.dnetlib.dhp.actionmanager.person; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob; -import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson; -import 
eu.dnetlib.dhp.collection.orcid.model.Author; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -27,12 +25,18 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Optional; +import com.fasterxml.jackson.databind.ObjectMapper; -import static org.junit.jupiter.api.Assertions.assertEquals; +import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob; +import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson; +import eu.dnetlib.dhp.collection.orcid.model.Author; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Person; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.utils.DHPUtils; public class CreatePersonAS { @@ -57,7 +61,7 @@ public class CreatePersonAS { conf.set("spark.driver.host", "localhost"); conf.set("hive.metastore.local", "true"); conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.codegen.wholeStage","false"); + conf.set("spark.sql.codegen.wholeStage", "false"); conf.set("spark.sql.warehouse.dir", workingDir.toString()); conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); @@ -92,7 +96,6 @@ public class CreatePersonAS { // .mode(SaveMode.Overwrite) // .parquet(workingDir.toString() + "AuthorsSubset"); - ExtractPerson .main( new String[] { @@ -102,13 +105,120 @@ public class CreatePersonAS { inputPath, "-outputPath", workingDir.toString() + "/actionSet1", - "-workingDir", - workingDir.toString() + "/working" + "-workingDir", + workingDir.toString() + "/working" }); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + JavaRDD relations = sc + .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class) + .filter(v -> "eu.dnetlib.dhp.schema.oaf.Relation".equalsIgnoreCase(v._1().toString())) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); +// + JavaRDD people = sc + .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class) + .filter(v -> "eu.dnetlib.dhp.schema.oaf.Person".equalsIgnoreCase(v._1().toString())) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Person) aa.getPayload())); +// + Assertions.assertEquals(7, people.count()); + Assertions + .assertEquals( + "Paulo", + people + .filter( + p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getGivenName()); + Assertions + .assertEquals( + "Tavares", + people + .filter( + p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getFamilyName()); + Assertions + .assertEquals( + 4, + people + .filter( + p -> 
p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getAlternativeNames() + .size()); + Assertions + .assertEquals( + 4, + people + .filter( + p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getPid() + .size()); + Assertions + .assertTrue( + people + .filter( + p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getPid() + .stream() + .anyMatch( + p -> p.getSchema().equalsIgnoreCase("Scopus Author ID") + && p.getValue().equalsIgnoreCase("15119405200"))); + + Assertions + .assertEquals( + 16, + relations + .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)) + .count()); + Assertions + .assertEquals( + 14, + relations + .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED)) + .count()); + Assertions + .assertEquals( + 3, + relations + .filter( + r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619")) + && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)) + .count()); + Assertions + .assertEquals( + 2, + relations + .filter( + r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619")) + && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED) + && r.getTarget().startsWith("50|doi")) + .count()); + Assertions + .assertEquals( + 1, + relations + .filter( + r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619")) + && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED) + && r.getTarget().startsWith("50|arXiv")) + .count()); + + Assertions + .assertEquals( + 1, + relations + .filter( + r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619")) + && r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED)) + .count()); + Assertions.assertEquals(33, relations.count()); } - - } +} From c4658350613a3d124a3119a73efff7e33d8df443 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Tue, 9 Jul 2024 12:29:55 +0200 Subject: [PATCH 3/3] [Person]new implementation for the extraction of the coAuthorship relations --- .../personentity/CoAuthorshipIterator.java | 80 +++++++++++++++++++ .../actionmanager/personentity/Coauthors.java | 7 +- .../personentity/ExtractPerson.java | 37 ++++++--- 3 files changed, 108 insertions(+), 16 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java new file mode 100644 index 000000000..76e4c4851 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java @@ -0,0 +1,80 @@ + +package eu.dnetlib.dhp.actionmanager.personentity; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Person; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import 
eu.dnetlib.dhp.utils.DHPUtils; + +public class CoAuthorshipIterator implements Iterator { + private int firstIndex; + private int secondIndex; + private boolean firstRelation; + private List authors; + private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______::"; + private static final String OPENAIRE_PREFIX = "openaire____"; + private static final String SEPARATOR = "::"; + private static final String ORCID_KEY = "10|" + OPENAIRE_PREFIX + SEPARATOR + + DHPUtils.md5(ModelConstants.ORCID.toLowerCase()); + public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid"; + public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID"; + + @Override + public boolean hasNext() { + return firstIndex < authors.size() - 1; + } + + @Override + public Relation next() { + Relation rel = null; + if (firstRelation) { + rel = getRelation(authors.get(firstIndex), authors.get(secondIndex)); + firstRelation = Boolean.FALSE; + } else { + rel = getRelation(authors.get(secondIndex), authors.get(firstIndex)); + firstRelation = Boolean.TRUE; + secondIndex += 1; + if (secondIndex >= authors.size()) { + firstIndex += 1; + secondIndex = firstIndex + 1; + } + } + + return rel; + } + + public CoAuthorshipIterator(List authors) { + this.authors = authors; + this.firstIndex = 0; + this.secondIndex = 1; + this.firstRelation = Boolean.TRUE; + + } + + private Relation getRelation(String orcid1, String orcid2) { + String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1); + String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2); + return OafMapperUtils + .getRelation( + source, target, ModelConstants.PERSON_PERSON_RELTYPE, + ModelConstants.PERSON_PERSON_SUBRELTYPE, + ModelConstants.PERSON_PERSON_HASCOAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(ORCID_KEY, ModelConstants.ORCID_DS)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"), + null); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java index f550178d7..17f46d5c7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java @@ -3,17 +3,18 @@ package eu.dnetlib.dhp.actionmanager.personentity; import java.io.Serializable; import java.util.ArrayList; +import java.util.List; import eu.dnetlib.dhp.schema.oaf.Relation; public class Coauthors implements Serializable { - private ArrayList coauthors; + private List coauthors; - public ArrayList getCoauthors() { + public List getCoauthors() { return coauthors; } - public void setCoauthors(ArrayList coauthors) { + public void setCoauthors(List coauthors) { this.coauthors = coauthors; } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java index b7d5f4367..064fb41a1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java +++ 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java @@ -26,6 +26,7 @@ import org.spark_project.jetty.util.StringUtil; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.actionmanager.Constants; +import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.collection.orcid.model.Author; import eu.dnetlib.dhp.collection.orcid.model.Employment; @@ -202,7 +203,7 @@ public class ExtractPerson implements Serializable { .mode(SaveMode.Overwrite) .json(workingDir + "/authorship"); - works + Dataset coauthorship = works .flatMap((FlatMapFunction>) w -> { List> lista = new ArrayList<>(); w.getPids().stream().forEach(p -> { @@ -217,10 +218,13 @@ public class ExtractPerson implements Serializable { (MapGroupsFunction, Coauthors>) (k, it) -> extractCoAuthors(it), Encoders.bean(Coauthors.class)) .flatMap( - (FlatMapFunction) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class)) + (FlatMapFunction) c -> new CoAuthorshipIterator(c.getCoauthors()), + Encoders.bean(Relation.class)) .groupByKey((MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING()) .mapGroups( - (MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Relation.class)) + (MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Relation.class)); + + coauthorship .write() .option("compression", "gzip") .mode(SaveMode.Overwrite) @@ -237,10 +241,16 @@ public class ExtractPerson implements Serializable { .mode(SaveMode.Overwrite) .json(workingDir + "/affiliation"); - spark + people = spark .read() - .json(workingDir + "/people") - .as(Encoders.bean(Person.class)) + .textFile(workingDir + "/people") + .map( + (MapFunction) value -> OBJECT_MAPPER + .readValue(value, Person.class), + Encoders.bean(Person.class)); + + people.show(false); + people .toJavaRDD() .map(p -> new AtomicAction(p.getClass(), p)) .union( @@ -261,20 +271,21 @@ public class ExtractPerson implements Serializable { } private static Dataset getRelations(SparkSession spark, String path) { - return spark.read().json(path).as(Encoders.bean(Relation.class)); + return spark + .read() + .textFile(path) + .map( + (MapFunction) value -> OBJECT_MAPPER + .readValue(value, Relation.class), + Encoders.bean(Relation.class));// spark.read().json(path).as(Encoders.bean(Relation.class)); } private static Coauthors extractCoAuthors(Iterator> it) { Coauthors coauth = new Coauthors(); - ArrayList ret = new ArrayList<>(); List coauthors = new ArrayList<>(); while (it.hasNext()) coauthors.add(it.next()._2()); - for (int i = 0; i < coauthors.size() - 1; i++) - for (int j = i + 1; j < coauthors.size(); j++) - ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j))); - - coauth.setCoauthors(ret); + coauth.setCoauthors(coauthors); return coauth; }
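[Editor's note] For reviewers of PATCH 3/3, a minimal standalone sketch of the pair enumeration performed by the new CoAuthorshipIterator. It is not part of the patch: the OafMapperUtils/Relation wiring and ORCID-based identifier hashing shown in the diff are replaced by a hypothetical String[] pair type, and the ORCID values in main are placeholders, so the iteration order can be run and inspected in isolation.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for CoAuthorshipIterator: emits ordered ORCID pairs
// (source, target) instead of fully populated OAF Relation objects.
public class CoAuthorPairsSketch implements Iterator<String[]> {

	private final List<String> authors;
	private int first = 0;          // index of the first author of the current pair
	private int second = 1;         // index of the second author of the current pair
	private boolean forward = true; // emit (first, second) before (second, first)

	public CoAuthorPairsSketch(List<String> authors) {
		this.authors = authors;
	}

	@Override
	public boolean hasNext() {
		return first < authors.size() - 1;
	}

	@Override
	public String[] next() {
		String[] pair;
		if (forward) {
			pair = new String[] { authors.get(first), authors.get(second) };
			forward = false;
		} else {
			pair = new String[] { authors.get(second), authors.get(first) };
			forward = true;
			second++;
			if (second >= authors.size()) { // move on to the next "first" author
				first++;
				second = first + 1;
			}
		}
		return pair;
	}

	// For n co-authors this yields n * (n - 1) directed pairs, i.e. both
	// directions of every unordered pair, mirroring the two getRelation
	// calls per pair in the patch. The ORCIDs below are placeholders.
	public static void main(String[] args) {
		new CoAuthorPairsSketch(Arrays.asList("0000-0001", "0000-0002", "0000-0003"))
			.forEachRemaining(p -> System.out.println(p[0] + " -> " + p[1]));
	}
}
```

Read this way, grouping works by co-author ORCID and flat-mapping each group through the iterator replaces the earlier nested-loop extractCoAuthors logic while keeping the same output: one hasCoAuthored relation in each direction per co-author pair, deduplicated afterwards by the groupByKey on source + target.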