diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java
new file mode 100644
index 000000000..76e4c4851
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java
@@ -0,0 +1,80 @@
+
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Person;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import eu.dnetlib.dhp.utils.DHPUtils;
+
+public class CoAuthorshipIterator implements Iterator<Relation> {
+	private int firstIndex;
+	private int secondIndex;
+	private boolean firstRelation;
+	private List<String> authors;
+	private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______::";
+	private static final String OPENAIRE_PREFIX = "openaire____";
+	private static final String SEPARATOR = "::";
+	private static final String ORCID_KEY = "10|" + OPENAIRE_PREFIX + SEPARATOR
+		+ DHPUtils.md5(ModelConstants.ORCID.toLowerCase());
+	public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
+	public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
+
+	@Override
+	public boolean hasNext() {
+		return firstIndex < authors.size() - 1;
+	}
+
+	@Override
+	public Relation next() {
+		Relation rel = null;
+		if (firstRelation) {
+			rel = getRelation(authors.get(firstIndex), authors.get(secondIndex));
+			firstRelation = Boolean.FALSE;
+		} else {
+			rel = getRelation(authors.get(secondIndex), authors.get(firstIndex));
+			firstRelation = Boolean.TRUE;
+			secondIndex += 1;
+			if (secondIndex >= authors.size()) {
+				firstIndex += 1;
+				secondIndex = firstIndex + 1;
+			}
+		}
+
+		return rel;
+	}
+
+	public CoAuthorshipIterator(List<String> authors) {
+		this.authors = authors;
+		this.firstIndex = 0;
+		this.secondIndex = 1;
+		this.firstRelation = Boolean.TRUE;
+
+	}
+
+	private Relation getRelation(String orcid1, String orcid2) {
+		String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
+		String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
+		return OafMapperUtils
+			.getRelation(
+				source, target, ModelConstants.PERSON_PERSON_RELTYPE,
+				ModelConstants.PERSON_PERSON_SUBRELTYPE,
+				ModelConstants.PERSON_PERSON_HASCOAUTHORED,
+				Arrays.asList(OafMapperUtils.keyValue(ORCID_KEY, ModelConstants.ORCID_DS)),
+				OafMapperUtils
+					.dataInfo(
+						false, null, false, false,
+						OafMapperUtils
+							.qualifier(
+								ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
+								ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+						"0.91"),
+				null);
+	}
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
index f550178d7..17f46d5c7 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
@@ -3,17 +3,18 @@ package eu.dnetlib.dhp.actionmanager.personentity;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.List;
 
 import eu.dnetlib.dhp.schema.oaf.Relation;
 
 public class Coauthors implements Serializable {
-	private ArrayList<Relation> coauthors;
+	private List<String> coauthors;
 
-	public ArrayList<Relation> getCoauthors() {
+	public List<String> getCoauthors() {
 		return coauthors;
 	}
 
-	public void setCoauthors(ArrayList<Relation> coauthors) {
+	public void setCoauthors(List<String> coauthors) {
 		this.coauthors = coauthors;
 	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
index b7d5f4367..064fb41a1 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
@@ -26,6 +26,7 @@ import org.spark_project.jetty.util.StringUtil;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.actionmanager.Constants;
+import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.collection.orcid.model.Author;
 import eu.dnetlib.dhp.collection.orcid.model.Employment;
@@ -202,7 +203,7 @@ public class ExtractPerson implements Serializable {
 				.mode(SaveMode.Overwrite)
 				.json(workingDir + "/authorship");
 
-		works
+		Dataset<Relation> coauthorship = works
 			.flatMap((FlatMapFunction<Work, Tuple2<String, String>>) w -> {
 				List<Tuple2<String, String>> lista = new ArrayList<>();
 				w.getPids().stream().forEach(p -> {
@@ -217,10 +218,13 @@ public class ExtractPerson implements Serializable {
 					(MapGroupsFunction<String, Tuple2<String, String>, Coauthors>) (k, it) -> extractCoAuthors(it),
 					Encoders.bean(Coauthors.class))
 			.flatMap(
-				(FlatMapFunction<Coauthors, Relation>) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class))
+				(FlatMapFunction<Coauthors, Relation>) c -> new CoAuthorshipIterator(c.getCoauthors()),
+				Encoders.bean(Relation.class))
 			.groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
 			.mapGroups(
-				(MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class))
+				(MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class));
+
+		coauthorship
 			.write()
 			.option("compression", "gzip")
 			.mode(SaveMode.Overwrite)
@@ -237,10 +241,16 @@ public class ExtractPerson implements Serializable {
 				.mode(SaveMode.Overwrite)
 				.json(workingDir + "/affiliation");
 
-		spark
+		people = spark
 			.read()
-			.json(workingDir + "/people")
-			.as(Encoders.bean(Person.class))
+			.textFile(workingDir + "/people")
+			.map(
+				(MapFunction<String, Person>) value -> OBJECT_MAPPER
+					.readValue(value, Person.class),
+				Encoders.bean(Person.class));
+
+		people.show(false);
+		people
 			.toJavaRDD()
 			.map(p -> new AtomicAction(p.getClass(), p))
 			.union(
@@ -261,20 +271,21 @@ public class ExtractPerson implements Serializable {
 	}
 
 	private static Dataset<Relation> getRelations(SparkSession spark, String path) {
-		return spark.read().json(path).as(Encoders.bean(Relation.class));
+		return spark
+			.read()
+			.textFile(path)
+			.map(
+				(MapFunction<String, Relation>) value -> OBJECT_MAPPER
+					.readValue(value, Relation.class),
+				Encoders.bean(Relation.class));// spark.read().json(path).as(Encoders.bean(Relation.class));
 	}
 
 	private static Coauthors extractCoAuthors(Iterator<Tuple2<String, String>> it) {
 		Coauthors coauth = new Coauthors();
-		ArrayList<Relation> ret = new ArrayList<>();
 		List<String> coauthors = new ArrayList<>();
 		while (it.hasNext())
 			coauthors.add(it.next()._2());
-		for (int i = 0; i < coauthors.size() - 1; i++)
-			for (int j = i + 1; j < coauthors.size(); j++)
-				ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j)));
-
-		coauth.setCoauthors(ret);
+		coauth.setCoauthors(coauthors);
 		return coauth;
 	}
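
For context, a minimal usage sketch of the new CoAuthorshipIterator (not part of the patch; the demo class name and ORCID values are made up, and it assumes the dhp-schemas and dhp-common modules are on the classpath). Given the ORCID iDs of the co-authors of one work, the iterator lazily emits both directed hasCoAuthored relations for every pair, so n authors yield n*(n-1) relations.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import eu.dnetlib.dhp.actionmanager.personentity.CoAuthorshipIterator;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class CoAuthorshipIteratorDemo {
	public static void main(String[] args) {
		// three made-up ORCID iDs standing in for the co-authors of a single work
		List<String> orcids = Arrays
			.asList("0000-0001-0000-0001", "0000-0002-0000-0002", "0000-0003-0000-0003");
		Iterator<Relation> it = new CoAuthorshipIterator(orcids);
		int count = 0;
		while (it.hasNext()) {
			Relation r = it.next();
			// source and target are Person identifiers built from the md5 of each ORCID
			System.out.println(r.getSource() + " -> " + r.getTarget());
			count++;
		}
		// 3 authors -> 3 unordered pairs -> 6 relations (both directions of each pair)
		System.out.println(count);
	}
}

This is also why ExtractPerson no longer materializes the relation list in extractCoAuthors: the Coauthors bean now carries only the ORCID strings, and the pairwise relations are generated on the fly by the iterator inside the flatMap.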