diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java
new file mode 100644
index 000000000..76e4c4851
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java
@@ -0,0 +1,80 @@
+
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Person;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import eu.dnetlib.dhp.utils.DHPUtils;
+
+public class CoAuthorshipIterator implements Iterator<Relation> {
+    private int firstIndex;
+    private int secondIndex;
+    private boolean firstRelation;
+    private List<String> authors;
+    private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______::";
+    private static final String OPENAIRE_PREFIX = "openaire____";
+    private static final String SEPARATOR = "::";
+    private static final String ORCID_KEY = "10|" + OPENAIRE_PREFIX + SEPARATOR
+        + DHPUtils.md5(ModelConstants.ORCID.toLowerCase());
+    public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
+    public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
+
+    @Override
+    public boolean hasNext() {
+        return firstIndex < authors.size() - 1;
+    }
+
+    @Override
+    public Relation next() {
+        Relation rel = null;
+        if (firstRelation) {
+            rel = getRelation(authors.get(firstIndex), authors.get(secondIndex));
+            firstRelation = Boolean.FALSE;
+        } else {
+            rel = getRelation(authors.get(secondIndex), authors.get(firstIndex));
+            firstRelation = Boolean.TRUE;
+            secondIndex += 1;
+            if (secondIndex >= authors.size()) {
+                firstIndex += 1;
+                secondIndex = firstIndex + 1;
+            }
+        }
+
+        return rel;
+    }
+
+    public CoAuthorshipIterator(List<String> authors) {
+        this.authors = authors;
+        this.firstIndex = 0;
+        this.secondIndex = 1;
+        this.firstRelation = Boolean.TRUE;
+
+    }
+
+    private Relation getRelation(String orcid1, String orcid2) {
+        String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
+        String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
+        return OafMapperUtils
+            .getRelation(
+                source, target, ModelConstants.PERSON_PERSON_RELTYPE,
+                ModelConstants.PERSON_PERSON_SUBRELTYPE,
+                ModelConstants.PERSON_PERSON_HASCOAUTHORED,
+                Arrays.asList(OafMapperUtils.keyValue(ORCID_KEY, ModelConstants.ORCID_DS)),
+                OafMapperUtils
+                    .dataInfo(
+                        false, null, false, false,
+                        OafMapperUtils
+                            .qualifier(
+                                ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
+                                ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+                        "0.91"),
+                null);
+    }
+}
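For reference, a minimal usage sketch (not part of the patch) of the pair enumeration implemented by CoAuthorshipIterator: for every unordered pair of co-authors it returns the forward relation first and the inverse one next, advancing the pair indices only after the inverse has been emitted, so a list of n ORCID iDs yields n * (n - 1) relations. The author values below are placeholders, not real ORCID iDs.

    // Illustrative only: with authors [A, B, C] the emission order is
    // A->B, B->A, A->C, C->A, B->C, C->B
    Iterator<Relation> it = new CoAuthorshipIterator(Arrays.asList("A", "B", "C"));
    while (it.hasNext()) {
        Relation r = it.next();
        // source/target hold the hashed person identifiers built in getRelation()
        System.out.println(r.getSource() + " -> " + r.getTarget());
    }
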
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
new file mode 100644
index 000000000..17f46d5c7
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
@@ -0,0 +1,20 @@
+
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class Coauthors implements Serializable {
+    private List<String> coauthors;
+
+    public List<String> getCoauthors() {
+        return coauthors;
+    }
+
+    public void setCoauthors(List<String> coauthors) {
+        this.coauthors = coauthors;
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java
new file mode 100644
index 000000000..d052b52b6
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java
@@ -0,0 +1,40 @@
+
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import java.io.Serializable;
+
+import eu.dnetlib.dhp.schema.oaf.Person;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
+
+public class Couples implements Serializable {
+    Person p;
+    Relation r;
+
+    public Couples() {
+
+    }
+
+    public Person getP() {
+        return p;
+    }
+
+    public void setP(Person p) {
+        this.p = p;
+    }
+
+    public Relation getR() {
+        return r;
+    }
+
+    public void setR(Relation r) {
+        this.r = r;
+    }
+
+    public static Couples newInstance(Tuple2<Person, Relation> couple) {
+        Couples c = new Couples();
+        c.p = couple._1();
+        c.r = couple._2();
+        return c;
+    }
+}
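Coauthors and Couples are plain beans because the intermediate Datasets are serialized with Encoders.bean(...), which needs a public no-arg constructor plus getters and setters; presumably this is also why the patch adds empty constructors to Author, ORCIDItem and Work further down. A hedged sketch of the intended wrapping, assuming a Dataset<Tuple2<Person, Relation>> named joined (hypothetical) is already available:

    // Illustrative only: re-wrap a tuple Dataset into the Couples bean so it can use a bean encoder
    Dataset<Couples> couples = joined
        .map(
            (MapFunction<Tuple2<Person, Relation>, Couples>) Couples::newInstance,
            Encoders.bean(Couples.class));
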
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
new file mode 100644
index 000000000..064fb41a1
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
@@ -0,0 +1,431 @@
+
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.spark.sql.functions.*;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.sql.*;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spark_project.jetty.util.StringUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.Constants;
+import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.collection.orcid.model.Author;
+import eu.dnetlib.dhp.collection.orcid.model.Employment;
+import eu.dnetlib.dhp.collection.orcid.model.Work;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Person;
+import eu.dnetlib.dhp.schema.oaf.Pid;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
+import eu.dnetlib.dhp.schema.oaf.utils.PidType;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import scala.Tuple2;
+
+public class ExtractPerson implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String OPENAIRE_PREFIX = "openaire____";
+    private static final String SEPARATOR = "::";
+    private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR
+        + DHPUtils.md5(ModelConstants.ORCID.toLowerCase());
+
+    private static final String DOI_PREFIX = "50|doi_________::";
+
+    private static final String PMID_PREFIX = "50|pmid________::";
+    private static final String ARXIV_PREFIX = "50|arXiv_______::";
+
+    private static final String PMCID_PREFIX = "50|pmcid_______::";
+    private static final String ROR_PREFIX = "20|ror_________::";
+    private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
+    public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
+    public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
+
+    public static void main(final String[] args) throws IOException, ParseException {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    Objects
+                        .requireNonNull(
+                            ExtractPerson.class
+                                .getResourceAsStream(
+                                    "/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json"))));
+
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("inputPath");
+        log.info("inputPath {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath {}", outputPath);
+
+        final String workingDir = parser.get("workingDir");
+        log.info("workingDir {}", workingDir);
+
+        SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+                createActionSet(spark, inputPath, outputPath, workingDir);
+            });
+
+    }
+
+    private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) {
+
+        Dataset<Author> authors = spark
+            .read()
+            .parquet(inputPath + "Authors")
+            .as(Encoders.bean(Author.class));
+
+        Dataset<Work> works = spark
+            .read()
+            .parquet(inputPath + "Works")
+            .as(Encoders.bean(Work.class))
+            .filter(
+                (FilterFunction<Work>) w -> Optional.ofNullable(w.getPids()).isPresent() &&
+                    w
+                        .getPids()
+                        .stream()
+                        .anyMatch(
+                            p -> p.getSchema().equalsIgnoreCase("doi") ||
+                                p.getSchema().equalsIgnoreCase("pmc") ||
+                                p.getSchema().equalsIgnoreCase("pmid") ||
+                                p.getSchema().equalsIgnoreCase("arxiv")));
+
+        Dataset<Employment> employmentDataset = spark
+            .read()
+            .parquet(inputPath + "Employments")
+            .as(Encoders.bean(Employment.class));
+
+        Dataset<Author> peopleToMap = authors
+            .joinWith(works, authors.col("orcid").equalTo(works.col("orcid")))
+            .map((MapFunction<Tuple2<Author, Work>, Author>) t2 -> t2._1(), Encoders.bean(Author.class))
+            .groupByKey((MapFunction<Author, String>) a -> a.getOrcid(), Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, Author, Author>) (k, it) -> it.next(), Encoders.bean(Author.class));
+
+        Dataset<Employment> employment = employmentDataset
+            .joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid")))
+            .map(
+                (MapFunction<Tuple2<Employment, Author>, Employment>) t2 -> t2._1(),
+                Encoders.bean(Employment.class));
+
+        Dataset<Person> people;
+        peopleToMap.map((MapFunction<Author, Person>) op -> {
+            Person person = new Person();
+            person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX));
+            person
+                .setBiography(
+                    Optional
+                        .ofNullable(op.getBiography())
+
+                        .orElse(""));
+            KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS);
+            kv.setDataInfo(null);
+            person.setCollectedfrom(Arrays.asList(kv));
+            person
+                .setAlternativeNames(
+                    Optional
+                        .ofNullable(op.getOtherNames())
+
+                        .orElse(new ArrayList<>()));
+            person
+                .setFamilyName(
+                    Optional
+                        .ofNullable(op.getFamilyName())
+
+                        .orElse(""));
+            person
+                .setGivenName(
+                    Optional
+                        .ofNullable(op.getGivenName())
+
+                        .orElse(""));
+            person
+                .setPid(
+                    Optional
+                        .ofNullable(op.getOtherPids())
+                        .map(
+                            v -> v
+                                .stream()
+                                .map(p -> Pid.newInstance(p.getSchema(), p.getValue()))
+                                .collect(Collectors.toList()))
+                        .orElse(new ArrayList<>()));
+            person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid()));
+            person.setDateofcollection(op.getLastModifiedDate());
+            person.setOriginalId(Arrays.asList(op.getOrcid()));
+            return person;
+        }, Encoders.bean(Person.class))
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Overwrite)
+            .json(workingDir + "/people");
+
+        works
+            .flatMap(
+                (FlatMapFunction<Work, Relation>) ExtractPerson::getAuthorshipRelationIterator,
+                Encoders.bean(Relation.class))
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Overwrite)
+            .json(workingDir + "/authorship");
+
+        Dataset<Relation> coauthorship = works
+            .flatMap((FlatMapFunction<Work, Tuple2<String, String>>) w -> {
+                List<Tuple2<String, String>> lista = new ArrayList<>();
+                w.getPids().stream().forEach(p -> {
+                    if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc")
+                        || p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv"))
+                        lista.add(new Tuple2<>(p.getValue(), w.getOrcid()));
+                });
+                return lista.iterator();
+            }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+            .groupByKey((MapFunction<Tuple2<String, String>, String>) Tuple2::_1, Encoders.STRING())
+            .mapGroups(
+                (MapGroupsFunction<String, Tuple2<String, String>, Coauthors>) (k, it) -> extractCoAuthors(it),
+                Encoders.bean(Coauthors.class))
+            .flatMap(
+                (FlatMapFunction<Coauthors, Relation>) c -> new CoAuthorshipIterator(c.getCoauthors()),
+                Encoders.bean(Relation.class))
+            .groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
+            .mapGroups(
+                (MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class));
+
+        coauthorship
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Overwrite)
+            .json(workingDir + "/coauthorship");
+
+        employment
+            .filter((FilterFunction<Employment>) e -> Optional.ofNullable(e.getAffiliationId()).isPresent())
+            .filter((FilterFunction<Employment>) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror"))
+            .map(
+                (MapFunction<Employment, Relation>) ExtractPerson::getAffiliationRelation,
+                Encoders.bean(Relation.class))
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Overwrite)
+            .json(workingDir + "/affiliation");
+
+        people = spark
+            .read()
+            .textFile(workingDir + "/people")
+            .map(
+                (MapFunction<String, Person>) value -> OBJECT_MAPPER
+                    .readValue(value, Person.class),
+                Encoders.bean(Person.class));
+
+        people.show(false);
+        people
+            .toJavaRDD()
+            .map(p -> new AtomicAction(p.getClass(), p))
+            .union(
+                getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
+            .union(
+                getRelations(spark, workingDir + "/coauthorship")
+                    .toJavaRDD()
+                    .map(r -> new AtomicAction(r.getClass(), r)))
+            .union(
+                getRelations(spark, workingDir + "/affiliation")
+                    .toJavaRDD()
+                    .map(r -> new AtomicAction(r.getClass(), r)))
+            .mapToPair(
+                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+                    new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+            .saveAsHadoopFile(
+                outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
+    }
+
+    private static Dataset<Relation> getRelations(SparkSession spark, String path) {
+        return spark
+            .read()
+            .textFile(path)
+            .map(
+                (MapFunction<String, Relation>) value -> OBJECT_MAPPER
+                    .readValue(value, Relation.class),
+                Encoders.bean(Relation.class));// spark.read().json(path).as(Encoders.bean(Relation.class));
+    }
+
+    private static Coauthors extractCoAuthors(Iterator<Tuple2<String, String>> it) {
+        Coauthors coauth = new Coauthors();
+        List<String> coauthors = new ArrayList<>();
+        while (it.hasNext())
+            coauthors.add(it.next()._2());
+        coauth.setCoauthors(coauthors);
+
+        return coauth;
+    }
+
+    private static Relation getAffiliationRelation(Employment row) {
+        String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid());
+        String target = ROR_PREFIX
+            + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue()));
+        List<KeyValue> properties = new ArrayList<>();
+
+        Relation relation = OafMapperUtils
+            .getRelation(
+                source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE,
+                ModelConstants.ORG_PERSON_PARTICIPATES,
+                Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
+                OafMapperUtils
+                    .dataInfo(
+                        false, null, false, false,
+                        OafMapperUtils
+                            .qualifier(
+                                ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
+                                ModelConstants.DNET_PROVENANCE_ACTIONS),
+                        "0.91"),
+                null);
+
+        if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) {
+            KeyValue kv = new KeyValue();
+            kv.setKey("startDate");
+            kv.setValue(row.getStartDate());
+            properties.add(kv);
+        }
+        if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) {
+            KeyValue kv = new KeyValue();
+            kv.setKey("endDate");
+            kv.setValue(row.getEndDate());
+            properties.add(kv);
+        }
+
+        if (properties.size() > 0)
+            relation.setProperties(properties);
+        return relation;
+
+    }
+
+    private static Collection<Relation> getCoAuthorshipRelations(String orcid1, String orcid2) {
+        String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid1);
+        String target = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid2);
+
+        return Arrays
+            .asList(
+                OafMapperUtils
+                    .getRelation(
+                        source, target, ModelConstants.PERSON_PERSON_RELTYPE,
+                        ModelConstants.PERSON_PERSON_SUBRELTYPE,
+                        ModelConstants.PERSON_PERSON_HASCOAUTHORED,
+                        Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
+                        OafMapperUtils
+                            .dataInfo(
+                                false, null, false, false,
+                                OafMapperUtils
+                                    .qualifier(
+                                        ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
+                                        ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+                                "0.91"),
+                        null),
+                OafMapperUtils
+                    .getRelation(
+                        target, source, ModelConstants.PERSON_PERSON_RELTYPE,
+                        ModelConstants.PERSON_PERSON_SUBRELTYPE,
+                        ModelConstants.PERSON_PERSON_HASCOAUTHORED,
+                        Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
+                        OafMapperUtils
+                            .dataInfo(
+                                false, null, false, false,
+                                OafMapperUtils
+                                    .qualifier(
+                                        ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
+                                        ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+                                "0.91"),
+                        null));
+
+    }
+
+    private static @NotNull Iterator<Relation> getAuthorshipRelationIterator(Work w) {
+
+        if (Optional.ofNullable(w.getPids()).isPresent())
+            return w
+                .getPids()
+                .stream()
+                .map(pid -> getRelation(w.getOrcid(), pid))
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList())
+                .iterator();
+        List<Relation> ret = new ArrayList<>();
+        return ret.iterator();
+    }
+
+    private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid) {
+        String target;
+        String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
+        switch (pid.getSchema()) {
+            case "doi":
+                target = DOI_PREFIX
+                    + IdentifierFactory
+                        .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue()));
+                break;
+            case "pmid":
+                target = PMID_PREFIX
+                    + IdentifierFactory
+                        .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue()));
+                break;
+            case "arxiv":
+                target = ARXIV_PREFIX
+                    + IdentifierFactory
+                        .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue()));
+                break;
+            case "pmcid":
+                target = PMCID_PREFIX
+                    + IdentifierFactory
+                        .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue()));
+                break;
+
+            default:
+                return null;
+        }
+
+        return OafMapperUtils
+            .getRelation(
+                source, target, ModelConstants.RESULT_PERSON_RELTYPE,
+                ModelConstants.RESULT_PERSON_SUBRELTYPE,
+                ModelConstants.RESULT_PERSON_HASAUTHORED,
+                Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
+                OafMapperUtils
+                    .dataInfo(
+                        false, null, false, false,
+                        OafMapperUtils
+                            .qualifier(
+                                ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
+                                ModelConstants.DNET_PROVENANCE_ACTIONS),
+                        "0.91"),
+                null);
+    }
+}
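To make the identifier conventions above concrete, a small sketch (not part of the patch) of how the source and target of a hasAuthored relation are composed. The ORCID iD and DOI are sample values taken from the test data further down; the exact DOI normalization performed by PidCleaner is assumed here to amount to lower-casing.

    // Illustrative only: person id = person prefix + "::" + md5(orcid),
    // result id = result prefix + md5(normalized pid value)
    String orcid = "0000-0001-6291-9619";
    String doi = "10.1021/acs.langmuir.1c02956";
    String personId = "30|orcid_______::" + DHPUtils.md5(orcid);
    String resultId = "50|doi_________::" + DHPUtils.md5(doi.toLowerCase()); // assumed normalization
    System.out.println(personId + " hasAuthored " + resultId);
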
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java
new file mode 100644
index 000000000..92842bfcf
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java
@@ -0,0 +1,25 @@
+
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+
+import eu.dnetlib.dhp.collection.orcid.model.Work;
+
+public class WorkList implements Serializable {
+    private ArrayList<Work> workArrayList;
+
+    public ArrayList<Work> getWorkArrayList() {
+        return workArrayList;
+    }
+
+    public void setWorkArrayList(ArrayList<Work> workArrayList) {
+        this.workArrayList = workArrayList;
+    }
+
+    public WorkList() {
+        workArrayList = new ArrayList<>();
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java
index 07668f53b..9828ad907 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java
@@ -5,7 +5,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
 import java.util.*;
-import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -30,7 +29,6 @@ import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
 import eu.dnetlib.dhp.schema.oaf.utils.PidType;
-import io.netty.util.Constant;
 import scala.Tuple2;
 
 /**
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java
index df87e4333..a1545ebfe 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java
@@ -20,6 +20,9 @@ public class Author extends ORCIDItem {
 
 	private String lastModifiedDate;
 
+	public Author() {
+	}
+
 	public String getBiography() {
 		return biography;
 	}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java
index 6bc47bc26..419823cb1 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java
@@ -11,4 +11,7 @@ public class ORCIDItem {
 	public void setOrcid(String orcid) {
 		this.orcid = orcid;
 	}
+
+	public ORCIDItem() {
+	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java
index 670170323..a8683aaaf 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java
@@ -32,4 +32,6 @@ public class Work extends ORCIDItem {
 		pids.add(pid);
 	}
 
+	public Work() {
+	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json
new file mode 100644
index 000000000..5175552e7
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json
@@ -0,0 +1,25 @@
+[
+  {
+    "paramName": "ip",
+    "paramLongName": "inputPath",
+    "paramDescription": "the path of the ORCID tables (Authors, Works, Employments)",
+    "paramRequired": true
+  },
+  {
+    "paramName": "op",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path where to store the actionset",
+    "paramRequired": true
+  },
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  }, {
+    "paramName": "wd",
+    "paramLongName": "workingDir",
+    "paramDescription": "the path of the working directory",
+    "paramRequired": false
+}
+]
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties
new file mode 100644
index 000000000..d2269718c
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties
@@ -0,0 +1,2 @@
+inputPath=/data/orcid_2023/tables/
+outputPath=/user/miriam.baglioni/peopleAS
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml
b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml new file mode 100644 index 000000000..166e7bb9c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml @@ -0,0 +1,111 @@ + + + + + inputPath + inputPath + + + outputPath + the path where to store the actionset + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + + yarn + cluster + Produces the ActionSet for Person entity and relevant relations + eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson + dhp-aggregation-${projectVersion}.jar + + --executor-cores=4 + --executor-memory=4G + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=5G + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf spark.sql.shuffle.partitions=15000 + + --inputPath${inputPath} + --outputPath${outputPath} + --workingDir${workingDir} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java new file mode 100644 index 000000000..2e7b21010 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java @@ -0,0 +1,224 @@ + +package eu.dnetlib.dhp.actionmanager.person; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import 
java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob;
+import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson;
+import eu.dnetlib.dhp.collection.orcid.model.Author;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Person;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import eu.dnetlib.dhp.utils.DHPUtils;
+
+public class CreatePersonAS {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static SparkSession spark;
+
+    private static Path workingDir;
+    private static final Logger log = LoggerFactory
+        .getLogger(CreatePersonAS.class);
+
+    @BeforeAll
+    public static void beforeAll() throws IOException {
+        workingDir = Files
+            .createTempDirectory(CreatePersonAS.class.getSimpleName());
+        log.info("using work dir {}", workingDir);
+
+        SparkConf conf = new SparkConf();
+        conf.setAppName(CreatePersonAS.class.getSimpleName());
+
+        conf.setMaster("local[*]");
+        conf.set("spark.driver.host", "localhost");
+        conf.set("hive.metastore.local", "true");
+        conf.set("spark.ui.enabled", "false");
+        conf.set("spark.sql.codegen.wholeStage", "false");
+        conf.set("spark.sql.warehouse.dir", workingDir.toString());
+        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+        spark = SparkSession
+            .builder()
+            .appName(CreatePersonAS.class.getSimpleName())
+            .config(conf)
+            .getOrCreate();
+    }
+
+    @AfterAll
+    public static void afterAll() throws IOException {
+        FileUtils.deleteDirectory(workingDir.toFile());
+        spark.stop();
+    }
+
+    @Test
+    void testAuthors() throws Exception {
+
+        String inputPath = getClass()
+            .getResource(
+                "/eu/dnetlib/dhp/actionmanager/person/")
+            .getPath();
+
+//        spark
+//            .read()
+//            .parquet(inputPath + "Authors")
+//            .as(Encoders.bean(Author.class))
+//            .filter((FilterFunction<Author>) a -> Optional.ofNullable(a.getOtherNames()).isPresent() &&
+//                Optional.ofNullable(a.getBiography()).isPresent())
+//            .write()
+//            .mode(SaveMode.Overwrite)
+//            .parquet(workingDir.toString() + "AuthorsSubset");
+
+        ExtractPerson
+            .main(
+                new String[] {
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-inputPath",
+                    inputPath,
+                    "-outputPath",
+                    workingDir.toString() + "/actionSet1",
+                    "-workingDir",
+                    workingDir.toString() + "/working"
+                });
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<Relation> relations = sc
+            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
+            .filter(v -> "eu.dnetlib.dhp.schema.oaf.Relation".equalsIgnoreCase(v._1().toString()))
+            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+            .map(aa -> ((Relation) aa.getPayload()));
+//
+        JavaRDD<Person> people = sc
+            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
+            .filter(v -> "eu.dnetlib.dhp.schema.oaf.Person".equalsIgnoreCase(v._1().toString()))
+            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+            .map(aa -> ((Person) aa.getPayload()));
+//
+        Assertions.assertEquals(7, people.count());
+        Assertions
+            .assertEquals(
+                "Paulo",
+                people
+                    .filter(
+                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
+                    .first()
+                    .getGivenName());
+        Assertions
+            .assertEquals(
+                "Tavares",
+                people
+                    .filter(
+                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
+                    .first()
+                    .getFamilyName());
+        Assertions
+            .assertEquals(
+                4,
+                people
+                    .filter(
+                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
+                    .first()
+                    .getAlternativeNames()
+                    .size());
+        Assertions
+            .assertEquals(
+                4,
+                people
+                    .filter(
+                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
+                    .first()
+                    .getPid()
+                    .size());
+        Assertions
+            .assertTrue(
+                people
+                    .filter(
+                        p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034")))
+                    .first()
+                    .getPid()
+                    .stream()
+                    .anyMatch(
+                        p -> p.getSchema().equalsIgnoreCase("Scopus Author ID")
+                            && p.getValue().equalsIgnoreCase("15119405200")));
+
+        Assertions
+            .assertEquals(
+                16,
+                relations
+                    .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED))
+                    .count());
+        Assertions
+            .assertEquals(
+                14,
+                relations
+                    .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED))
+                    .count());
+        Assertions
+            .assertEquals(
+                3,
+                relations
+                    .filter(
+                        r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
+                            && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED))
+                    .count());
+        Assertions
+            .assertEquals(
+                2,
+                relations
+                    .filter(
+                        r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
+                            && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)
+                            && r.getTarget().startsWith("50|doi"))
+                    .count());
+        Assertions
+            .assertEquals(
+                1,
+                relations
+                    .filter(
+                        r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
+                            && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)
+                            && r.getTarget().startsWith("50|arXiv"))
+                    .count());
+
+        Assertions
+            .assertEquals(
+                1,
+                relations
+                    .filter(
+                        r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
+                            && r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED))
+                    .count());
+        Assertions.assertEquals(33, relations.count());
+
+    }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000
new file mode 100644
index 000000000..636595d49
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000
@@ -0,0 +1,10 @@
+{"orcid":"0000-0001-6291-9619","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold
Nanoparticle-Functionalized 3D Flower-like MoS2/TiO2 Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]} +{"orcid":"0000-0002-3210-3034","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold Nanoparticle-Functionalized 3D Flower-like MoS2/TiO2 Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]} +{"orcid":"0000-0001-6291-9619","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the qqbb Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]} +{"orcid":"0000-0002-3210-3034","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the qqbb Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]} +{"orcid":"0000-0002-9030-7609","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"},{"value":"14346052 14346044","schema":"issn"}]} +{"orcid":"0000-0003-2552-9691","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in $\\sqrt{s}$ = 13 TeV $pp$ collisions at the LHC using the ATLAS detector","pids":[{"value":"1473744","schema":"other-id"},{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"1606.09150","schema":"arxiv"}]} +{"orcid":"0000-0003-0305-8980","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"}]} +{"orcid":"0000-0002-9030-7609","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]} +{"orcid":"0000-0003-2629-4046","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]} +{"orcid":"0000-0001-8582-8912","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]} \ No newline at end of file diff --git a/pom.xml b/pom.xml index e3e57eb4c..dc1dd7308 100644 --- 
a/pom.xml
+++ b/pom.xml
@@ -969,6 +969,25 @@
 			</dependencies>
 		</profile>
 
+		<profile>
+			<id>arm-silicon-mac</id>
+			<activation>
+				<os>
+					<arch>aarch64</arch>
+					<family>mac</family>
+				</os>
+			</activation>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+					<version>1.1.8.4</version>
+				</dependency>
+			</dependencies>
+
+		</profile>
+
 		<profile>
 			<id>spark-34</id>