diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java index a755c875f..f550178d7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java @@ -1,19 +1,19 @@ -package eu.dnetlib.dhp.actionmanager.personentity; -import eu.dnetlib.dhp.schema.oaf.Relation; +package eu.dnetlib.dhp.actionmanager.personentity; import java.io.Serializable; import java.util.ArrayList; +import eu.dnetlib.dhp.schema.oaf.Relation; public class Coauthors implements Serializable { - private ArrayList<Relation> coauthors; + private ArrayList<Relation> coauthors; - public ArrayList<Relation> getCoauthors() { - return coauthors; - } + public ArrayList<Relation> getCoauthors() { + return coauthors; + } - public void setCoauthors(ArrayList<Relation> coauthors) { - this.coauthors = coauthors; - } + public void setCoauthors(ArrayList<Relation> coauthors) { + this.coauthors = coauthors; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java index 65670b22e..d052b52b6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java @@ -1,40 +1,40 @@ + package eu.dnetlib.dhp.actionmanager.personentity; +import java.io.Serializable; + import eu.dnetlib.dhp.schema.oaf.Person; import eu.dnetlib.dhp.schema.oaf.Relation; import scala.Tuple2; - -import java.io.Serializable; - public class Couples implements Serializable { - Person p ; - Relation r; + Person p; + Relation r; - public Couples() { + public Couples() { - } + } - public Person getP() { - return p; - } + public Person getP() { + return p; + } - public void setP(Person p) { - this.p = p; - } + public void setP(Person p) { + this.p = p; + } - public Relation getR() { - return r; - } + public Relation getR() { + return r; + } - public void setR(Relation r) { - this.r = r; - } + public void setR(Relation r) { + this.r = r; + } - public static Couples newInstance(Tuple2<Person, Relation> couple){ - Couples c = new Couples(); - c.p = couple._1(); - c.r = couple._2(); - return c; - } + public static Couples newInstance(Tuple2<Person, Relation> couple) { + Couples c = new Couples(); + c.p = couple._1(); + c.r = couple._2(); + return c; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java index 0bbda0343..b7d5f4367 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java @@ -1,27 +1,18 @@ + package eu.dnetlib.dhp.actionmanager.personentity; -import com.fasterxml.jackson.databind.ObjectMapper; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static org.apache.spark.sql.functions.*; + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; -import eu.dnetlib.dhp.actionmanager.Constants; -import 
eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.collection.orcid.model.Author; -import eu.dnetlib.dhp.collection.orcid.model.Employment; -import eu.dnetlib.dhp.collection.orcid.model.Work; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Person; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import eu.dnetlib.dhp.schema.oaf.Pid; -import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; -import eu.dnetlib.dhp.schema.oaf.utils.PidType; -import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; @@ -31,301 +22,399 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.spark_project.jetty.util.StringUtil; -import scala.Tuple2; -import static org.apache.spark.sql.functions.*; -import java.io.IOException; -import java.io.Serializable; -import java.util.*; -import java.util.stream.Collectors; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.Constants; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.collection.orcid.model.Author; +import eu.dnetlib.dhp.collection.orcid.model.Employment; +import eu.dnetlib.dhp.collection.orcid.model.Work; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Person; +import eu.dnetlib.dhp.schema.oaf.Pid; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; +import eu.dnetlib.dhp.utils.DHPUtils; +import scala.Tuple2; public class ExtractPerson implements Serializable { - private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class); + private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String OPENAIRE_PREFIX = "openaire____"; - private static final String SEPARATOR = "::"; - private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(ModelConstants.ORCID.toLowerCase()); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String OPENAIRE_PREFIX = "openaire____"; + private static final String SEPARATOR = "::"; + private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR + + DHPUtils.md5(ModelConstants.ORCID.toLowerCase()); - private static final String DOI_PREFIX = "50|doi_________::"; + private static final String DOI_PREFIX = "50|doi_________::"; - private static final String PMID_PREFIX = "50|pmid________::"; - 
private static final String ARXIV_PREFIX = "50|arXiv_______::"; + private static final String PMID_PREFIX = "50|pmid________::"; + private static final String ARXIV_PREFIX = "50|arXiv_______::"; - private static final String PMCID_PREFIX = "50|pmcid_______::"; - private static final String ROR_PREFIX = "20|ror_________::"; - private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______"; - public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid"; - public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID"; + private static final String PMCID_PREFIX = "50|pmcid_______::"; + private static final String ROR_PREFIX = "20|ror_________::"; + private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______"; + public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid"; + public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID"; + public static void main(final String[] args) throws IOException, ParseException { - public static void main(final String[] args) throws IOException, ParseException { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + ExtractPerson.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json")))); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - Objects - .requireNonNull( - ExtractPerson.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json")))); + parser.parseArgument(args); - parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath); - final String inputPath = parser.get("inputPath"); - log.info("inputPath {}", inputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}", outputPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir {}", workingDir); - final String workingDir = parser.get("workingDir"); - log.info("workingDir {}", workingDir); + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + createActionSet(spark, inputPath, outputPath, workingDir); + }); - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> createActionSet(spark, inputPath, outputPath, workingDir)); + } - } + private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) { - private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) { + Dataset authors = spark + .read() + .parquet(inputPath + "Authors") + .as(Encoders.bean(Author.class)); - Dataset authors = spark - .read() - .parquet(inputPath + "Authors").as(Encoders.bean(Author.class)); + Dataset works = 
spark + .read() + .parquet(inputPath + "Works") + .as(Encoders.bean(Work.class)) + .filter( + (FilterFunction) w -> Optional.ofNullable(w.getPids()).isPresent() && + w + .getPids() + .stream() + .anyMatch( + p -> p.getSchema().equalsIgnoreCase("doi") || + p.getSchema().equalsIgnoreCase("pmc") || + p.getSchema().equalsIgnoreCase("pmid") || + p.getSchema().equalsIgnoreCase("arxiv"))); - Dataset works = spark - .read() - .parquet(inputPath + "Works") - .as(Encoders.bean(Work.class)) - .filter((FilterFunction) w -> Optional.ofNullable(w.getPids()).isPresent() && - w.getPids().stream().anyMatch(p->p.getSchema().equalsIgnoreCase("doi") || - p.getSchema().equalsIgnoreCase("pmc") || - p.getSchema().equalsIgnoreCase("pmid") || - p.getSchema().equalsIgnoreCase("arxiv"))); + Dataset employmentDataset = spark + .read() + .parquet(inputPath + "Employments") + .as(Encoders.bean(Employment.class)); - Dataset employmentDataset = spark - .read() - .parquet(inputPath + "Employments") - .as(Encoders.bean(Employment.class)); + Dataset peopleToMap = authors + .joinWith(works, authors.col("orcid").equalTo(works.col("orcid"))) + .map((MapFunction, Author>) t2 -> t2._1(), Encoders.bean(Author.class)) + .groupByKey((MapFunction) a -> a.getOrcid(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Author.class)); + Dataset employment = employmentDataset + .joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid"))) + .map((MapFunction, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class)); - Dataset peopleToMap = authors.joinWith(works, authors.col("orcid").equalTo(works.col("orcid"))) - .map((MapFunction, Author>) t2 -> t2._1(), Encoders.bean(Author.class)) - .groupByKey((MapFunction) a -> a.getOrcid(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Author.class)); + Dataset people; + peopleToMap.map((MapFunction) op -> { + Person person = new Person(); + person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX)); + person + .setBiography( + Optional + .ofNullable(op.getBiography()) + .orElse("")); + KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS); + kv.setDataInfo(null); + person.setCollectedfrom(Arrays.asList(kv)); + person + .setAlternativeNames( + Optional + .ofNullable(op.getOtherNames()) - Dataset employment = employmentDataset.joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid"))) - .map((MapFunction, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class)); + .orElse(new ArrayList<>())); + person + .setFamilyName( + Optional + .ofNullable(op.getFamilyName()) - peopleToMap.show(false); + .orElse("")); + person + .setGivenName( + Optional + .ofNullable(op.getGivenName()) - Dataset people; - people = peopleToMap.map((MapFunction) op -> { - Person person = new Person(); - person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX)); - person.setBiography(Optional.ofNullable(op.getBiography()) + .orElse("")); + person + .setPid( + Optional + .ofNullable(op.getOtherPids()) + .map( + v -> v + .stream() + .map(p -> Pid.newInstance(p.getSchema(), p.getValue())) + .collect(Collectors.toList())) + .orElse(new ArrayList<>())); + person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid())); + person.setDateofcollection(op.getLastModifiedDate()); + person.setOriginalId(Arrays.asList(op.getOrcid())); + return person; + }, Encoders.bean(Person.class)) + .write() + .option("compression", "gzip") + 
.mode(SaveMode.Overwrite) + .json(workingDir + "/people"); - .orElse("")); - KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS); - kv.setDataInfo(null); - person.setCollectedfrom(Arrays.asList(kv)); - person.setAlternativeNames(Optional.ofNullable(op.getOtherNames()) + works + .flatMap( + (FlatMapFunction) ExtractPerson::getAuthorshipRelationIterator, + Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(workingDir + "/authorship"); - .orElse(new ArrayList<>())); - person.setFamilyName(Optional.ofNullable(op.getFamilyName()) + works + .flatMap((FlatMapFunction>) w -> { + List> lista = new ArrayList<>(); + w.getPids().stream().forEach(p -> { + if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc") + || p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv")) + lista.add(new Tuple2<>(p.getValue(), w.getOrcid())); + }); + return lista.iterator(); + }, Encoders.tuple(Encoders.STRING(), Encoders.STRING())) + .groupByKey((MapFunction, String>) Tuple2::_1, Encoders.STRING()) + .mapGroups( + (MapGroupsFunction, Coauthors>) (k, it) -> extractCoAuthors(it), + Encoders.bean(Coauthors.class)) + .flatMap( + (FlatMapFunction) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class)) + .groupByKey((MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING()) + .mapGroups( + (MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(workingDir + "/coauthorship"); - .orElse("")); - person.setGivenName(Optional.ofNullable(op.getGivenName()) + employment + .filter((FilterFunction) e -> Optional.ofNullable(e.getAffiliationId()).isPresent()) + .filter((FilterFunction) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror")) + .map( + (MapFunction) ExtractPerson::getAffiliationRelation, + Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(workingDir + "/affiliation"); - .orElse("")); - person.setPid(Optional.ofNullable(op.getOtherPids()) - .map(v -> v.stream().map(p -> Pid.newInstance(p.getSchema(), p.getValue())).collect(Collectors.toList())) - .orElse(new ArrayList<>()) - ); - person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid())); - person.setDateofcollection(op.getLastModifiedDate()); - person.setOriginalId(Arrays.asList(op.getOrcid())); - return person; - }, Encoders.bean(Person.class)); + spark + .read() + .json(workingDir + "/people") + .as(Encoders.bean(Person.class)) + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .union( + getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/coauthorship") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/affiliation") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); + } + private static Dataset getRelations(SparkSession spark, String path) { + return spark.read().json(path).as(Encoders.bean(Relation.class)); + } - people.show(false); + private static Coauthors extractCoAuthors(Iterator> it) { + Coauthors coauth = new 
Coauthors(); + ArrayList ret = new ArrayList<>(); + List coauthors = new ArrayList<>(); + while (it.hasNext()) + coauthors.add(it.next()._2()); + for (int i = 0; i < coauthors.size() - 1; i++) + for (int j = i + 1; j < coauthors.size(); j++) + ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j))); + coauth.setCoauthors(ret); - Dataset authorship; - authorship = works - .flatMap((FlatMapFunction) ExtractPerson::getAuthorshipRelationIterator - , Encoders.bean(Relation.class)); + return coauth; + } + private static Relation getAffiliationRelation(Employment row) { + String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid()); + String target = ROR_PREFIX + + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue())); + List properties = new ArrayList<>(); - authorship.show(false); + Relation relation = OafMapperUtils + .getRelation( + source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE, + ModelConstants.ORG_PERSON_PARTICIPATES, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"), + null); + if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) { + KeyValue kv = new KeyValue(); + kv.setKey("startDate"); + kv.setValue(row.getStartDate()); + properties.add(kv); + } + if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) { + KeyValue kv = new KeyValue(); + kv.setKey("endDate"); + kv.setValue(row.getEndDate()); + properties.add(kv); + } - Dataset coauthorship = works - .flatMap((FlatMapFunction>) w -> { - List> lista = new ArrayList<>(); - w.getPids().stream().forEach(p -> { - if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc") || p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv")) - lista.add(new Tuple2<>(p.getValue(), w.getOrcid())); - }); - return lista.iterator(); - }, Encoders.tuple(Encoders.STRING(), Encoders.STRING())) - .groupByKey((MapFunction, String>) Tuple2::_1, Encoders.STRING()) - .mapGroups((MapGroupsFunction, Coauthors>) (k, it) -> - extractCoAuthors(it), Encoders.bean(Coauthors.class)) - .flatMap((FlatMapFunction) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class)) - .groupByKey((MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Relation.class)); + if (properties.size() > 0) + relation.setProperties(properties); + return relation; - coauthorship.show(false); - Dataset affiliation = employment - .filter((FilterFunction) e -> Optional.ofNullable(e.getAffiliationId()).isPresent()) - .filter((FilterFunction) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror")) - .map((MapFunction) ExtractPerson::getAffiliationRelation - , Encoders.bean(Relation.class)); + } - affiliation.show(false); + private static Collection getCoAuthorshipRelations(String orcid1, String orcid2) { + String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid1); + String target = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid2); - people.toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p)) - .union(authorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r))) - 
.union(coauthorship.toJavaRDD().map(r-> new AtomicAction(r.getClass(),r))) - .union(affiliation.toJavaRDD().map(r->new AtomicAction(r.getClass(),r))) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile( - outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);//, GzipCodec.class); - } + return Arrays + .asList( + OafMapperUtils + .getRelation( + source, target, ModelConstants.PERSON_PERSON_RELTYPE, + ModelConstants.PERSON_PERSON_SUBRELTYPE, + ModelConstants.PERSON_PERSON_HASCOAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"), + null), + OafMapperUtils + .getRelation( + target, source, ModelConstants.PERSON_PERSON_RELTYPE, + ModelConstants.PERSON_PERSON_SUBRELTYPE, + ModelConstants.PERSON_PERSON_HASCOAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"), + null)); - private static Coauthors extractCoAuthors(Iterator> it) { - Coauthors coauth = new Coauthors(); - ArrayList ret = new ArrayList<>(); - List coauthors = new ArrayList<>(); - while(it.hasNext()) - coauthors.add(it.next()._2()); - for (int i = 0; i < coauthors.size() -1; i++ ) - for(int j = i + 1; j < coauthors.size(); j++) - ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j))); + } - coauth.setCoauthors(ret); + private static @NotNull Iterator getAuthorshipRelationIterator(Work w) { - return coauth; - } + if (Optional.ofNullable(w.getPids()).isPresent()) + return w + .getPids() + .stream() + .map(pid -> getRelation(w.getOrcid(), pid)) + .filter(Objects::nonNull) + .collect(Collectors.toList()) + .iterator(); + List ret = new ArrayList<>(); + return ret.iterator(); + } - private static Relation getAffiliationRelation(Employment row) { - String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid()); - String target = ROR_PREFIX - + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue())); - List properties = new ArrayList<>() ; + private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid) { + String target; + String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); + switch (pid.getSchema()) { + case "doi": + target = DOI_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue())); + break; + case "pmid": + target = PMID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue())); + break; + case "arxiv": + target = ARXIV_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue())); + break; + case "pmcid": + target = PMCID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue())); + break; - Relation relation = - OafMapperUtils.getRelation(source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE, ModelConstants.ORG_PERSON_PARTICIPATES , - 
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils.dataInfo(false, null, false, false, - OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), - null); + default: + return null; + } - if(Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())){ - KeyValue kv = new KeyValue(); - kv.setKey("startDate"); - kv.setValue(row.getStartDate()); - properties.add(kv); - } - if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) { - KeyValue kv = new KeyValue(); - kv.setKey("endDate"); - kv.setValue(row.getEndDate()); - properties.add(kv); - } - - if (properties.size() > 0) - relation.setProperties(properties); - return relation; - - - } - - private static Collection getCoAuthorshipRelations(String orcid1, String orcid2) { - String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1); - String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2); - - return Arrays.asList(OafMapperUtils.getRelation(source, target,ModelConstants.PERSON_PERSON_RELTYPE, - ModelConstants.PERSON_PERSON_SUBRELTYPE, - ModelConstants.PERSON_PERSON_HASCOAUTHORED, - Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils.dataInfo(false, null, false, false, - OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), - null), - OafMapperUtils.getRelation(target, source,ModelConstants.PERSON_PERSON_RELTYPE, - ModelConstants.PERSON_PERSON_SUBRELTYPE, - ModelConstants.PERSON_PERSON_HASCOAUTHORED, - Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils.dataInfo(false, null, false, false, - OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), - null)); - - } - - private static @NotNull Iterator getAuthorshipRelationIterator(Work w) { - - if(Optional.ofNullable(w.getPids()).isPresent()) - return w.getPids() - .stream() - .map(pid -> getRelation(w.getOrcid(), pid)) - .filter(Objects::nonNull).collect(Collectors.toList()).iterator(); - List ret = new ArrayList<>(); - return ret.iterator(); - } - - - private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid){ - String target ; - String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); - switch (pid.getSchema()){ - case "doi": - target = DOI_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue())); - break; - case "pmid": - target = PMID_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue())); - break; - case "arxiv": - target = ARXIV_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue())); - break; - case "pmcid": - target = PMCID_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue())); - break; - - default: - return null; - } - - - return OafMapperUtils.getRelation(source, target,ModelConstants.RESULT_PERSON_RELTYPE, - ModelConstants.RESULT_PERSON_SUBRELTYPE, - ModelConstants.RESULT_PERSON_HASAUTHORED, - Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils.dataInfo(false, null, false, 
false, - OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), - null); - } + return OafMapperUtils + .getRelation( + source, target, ModelConstants.RESULT_PERSON_RELTYPE, + ModelConstants.RESULT_PERSON_SUBRELTYPE, + ModelConstants.RESULT_PERSON_HASAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"), + null); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java index 71d2b491a..92842bfcf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java @@ -1,23 +1,25 @@ -package eu.dnetlib.dhp.actionmanager.personentity; -import eu.dnetlib.dhp.collection.orcid.model.Work; -import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; +package eu.dnetlib.dhp.actionmanager.personentity; import java.io.Serializable; import java.util.ArrayList; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; + +import eu.dnetlib.dhp.collection.orcid.model.Work; + public class WorkList implements Serializable { - private ArrayList workArrayList; + private ArrayList workArrayList; - public ArrayList getWorkArrayList() { - return workArrayList; - } + public ArrayList getWorkArrayList() { + return workArrayList; + } - public void setWorkArrayList(ArrayList workArrayList) { - this.workArrayList = workArrayList; - } + public void setWorkArrayList(ArrayList workArrayList) { + this.workArrayList = workArrayList; + } - public WorkList() { - workArrayList = new ArrayList<>(); - } + public WorkList() { + workArrayList = new ArrayList<>(); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json index 3310f16e4..5175552e7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json @@ -16,11 +16,10 @@ "paramLongName": "isSparkSessionManaged", "paramDescription": "the hdfs name node", "paramRequired": false - }, - { - "paramName": "wd", - "paramLongName": "workingDir", - "paramDescription": "the hdfs name node", - "paramRequired": false - } + }, { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the hdfs name node", + "paramRequired": false +} ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties new file mode 100644 index 000000000..d2269718c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties @@ -0,0 +1,2 @@ +inputPath=/data/orcid_2023/tables/ +outputPath=/user/miriam.baglioni/peopleAS \ No newline at end of file 
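For reference, a minimal self-contained sketch (not part of the patch) of the co-authorship pairing performed by extractCoAuthors in ExtractPerson.java above: the ORCIDs sharing one work PID are combined into all unordered pairs, each pair later yielding two directed hasCoAuthored relations, which the job then deduplicates by source+target before writing. The class and method names below (CoauthorPairsSketch, makePairs) and the sample ORCID strings are illustrative only.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CoauthorPairsSketch {

	// Same nested-loop pairing used inline by extractCoAuthors: i < j over the
	// co-author list, so each unordered pair is emitted exactly once.
	static List<String[]> makePairs(List<String> orcids) {
		List<String[]> pairs = new ArrayList<>();
		for (int i = 0; i < orcids.size() - 1; i++)
			for (int j = i + 1; j < orcids.size(); j++)
				pairs.add(new String[] { orcids.get(i), orcids.get(j) });
		return pairs;
	}

	public static void main(String[] args) {
		// Illustrative ORCIDs; in the patch they come from works sharing a doi/pmid/pmc/arxiv PID.
		List<String> coauthors = Arrays.asList("orcid-a", "orcid-b", "orcid-c");
		// 3 co-authors -> 3 unordered pairs -> 6 directed hasCoAuthored relations.
		for (String[] p : makePairs(coauthors))
			System.out.println(p[0] + " <-> " + p[1]);
	}
}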
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml new file mode 100644 index 000000000..166e7bb9c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml @@ -0,0 +1,111 @@ + + + + + inputPath + inputPath + + + outputPath + the path where to store the actionset + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + + yarn + cluster + Produces the ActionSet for Person entity and relevant relations + eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson + dhp-aggregation-${projectVersion}.jar + + --executor-cores=4 + --executor-memory=4G + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=5G + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf spark.sql.shuffle.partitions=15000 + + --inputPath${inputPath} + --outputPath${outputPath} + --workingDir${workingDir} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java index a4d56ba2e..2e7b21010 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java +++ 
b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java @@ -1,15 +1,13 @@ package eu.dnetlib.dhp.actionmanager.person; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob; -import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson; -import eu.dnetlib.dhp.collection.orcid.model.Author; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -27,12 +25,18 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Optional; +import com.fasterxml.jackson.databind.ObjectMapper; -import static org.junit.jupiter.api.Assertions.assertEquals; +import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob; +import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson; +import eu.dnetlib.dhp.collection.orcid.model.Author; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Person; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.utils.DHPUtils; public class CreatePersonAS { @@ -57,7 +61,7 @@ public class CreatePersonAS { conf.set("spark.driver.host", "localhost"); conf.set("hive.metastore.local", "true"); conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.codegen.wholeStage","false"); + conf.set("spark.sql.codegen.wholeStage", "false"); conf.set("spark.sql.warehouse.dir", workingDir.toString()); conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); @@ -92,7 +96,6 @@ public class CreatePersonAS { // .mode(SaveMode.Overwrite) // .parquet(workingDir.toString() + "AuthorsSubset"); - ExtractPerson .main( new String[] { @@ -102,13 +105,120 @@ public class CreatePersonAS { inputPath, "-outputPath", workingDir.toString() + "/actionSet1", - "-workingDir", - workingDir.toString() + "/working" + "-workingDir", + workingDir.toString() + "/working" }); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + JavaRDD relations = sc + .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class) + .filter(v -> "eu.dnetlib.dhp.schema.oaf.Relation".equalsIgnoreCase(v._1().toString())) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); +// + JavaRDD people = sc + .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class) + .filter(v -> "eu.dnetlib.dhp.schema.oaf.Person".equalsIgnoreCase(v._1().toString())) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Person) aa.getPayload())); +// + Assertions.assertEquals(7, people.count()); + Assertions + .assertEquals( + "Paulo", + people + .filter( + p -> 
p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getGivenName()); + Assertions + .assertEquals( + "Tavares", + people + .filter( + p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getFamilyName()); + Assertions + .assertEquals( + 4, + people + .filter( + p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getAlternativeNames() + .size()); + Assertions + .assertEquals( + 4, + people + .filter( + p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getPid() + .size()); + Assertions + .assertTrue( + people + .filter( + p -> p.getPid().stream().anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0002-3210-3034"))) + .first() + .getPid() + .stream() + .anyMatch( + p -> p.getSchema().equalsIgnoreCase("Scopus Author ID") + && p.getValue().equalsIgnoreCase("15119405200"))); + + Assertions + .assertEquals( + 16, + relations + .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)) + .count()); + Assertions + .assertEquals( + 14, + relations + .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED)) + .count()); + Assertions + .assertEquals( + 3, + relations + .filter( + r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619")) + && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)) + .count()); + Assertions + .assertEquals( + 2, + relations + .filter( + r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619")) + && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED) + && r.getTarget().startsWith("50|doi")) + .count()); + Assertions + .assertEquals( + 1, + relations + .filter( + r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619")) + && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED) + && r.getTarget().startsWith("50|arXiv")) + .count()); + + Assertions + .assertEquals( + 1, + relations + .filter( + r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619")) + && r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED)) + .count()); + Assertions.assertEquals(33, relations.count()); } - - } +}
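The identifiers asserted in CreatePersonAS above follow the OpenAIRE convention of a type prefix plus an MD5 of the PID value, e.g. "30|orcid_______::" + md5(orcid) for persons and "50|doi_________::..." for results. A rough, self-contained illustration of that shape is sketched below; it uses plain MessageDigest rather than the project's DHPUtils.md5 / IdentifierFactory.md5 helpers, so the exact digest may differ if those helpers normalize their input first, and the class name PersonIdSketch is illustrative only.

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class PersonIdSketch {

	// Lower-case hex MD5; a stand-in for the md5 helpers used by the patch.
	static String md5(String s) throws Exception {
		MessageDigest md = MessageDigest.getInstance("MD5");
		return String.format("%032x", new BigInteger(1, md.digest(s.getBytes(StandardCharsets.UTF_8))));
	}

	public static void main(String[] args) throws Exception {
		String orcid = "0000-0001-6291-9619"; // ORCID used in the assertions above
		// Person id shape checked by the test: "30|orcid_______::" + md5(orcid)
		System.out.println("30|orcid_______::" + md5(orcid));
	}
}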