forked from D-Net/dnet-hadoop
[Person]new implementation for the extraction of the coAuthorship relations
This commit is contained in:
parent
ddd20e7f8e
commit
c465835061
|
@ -0,0 +1,80 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Person;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||||
|
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||||
|
|
||||||
|
public class CoAuthorshipIterator implements Iterator<Relation> {
|
||||||
|
private int firstIndex;
|
||||||
|
private int secondIndex;
|
||||||
|
private boolean firstRelation;
|
||||||
|
private List<String> authors;
|
||||||
|
private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______::";
|
||||||
|
private static final String OPENAIRE_PREFIX = "openaire____";
|
||||||
|
private static final String SEPARATOR = "::";
|
||||||
|
private static final String ORCID_KEY = "10|" + OPENAIRE_PREFIX + SEPARATOR
|
||||||
|
+ DHPUtils.md5(ModelConstants.ORCID.toLowerCase());
|
||||||
|
public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
|
||||||
|
public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return firstIndex < authors.size() - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Relation next() {
|
||||||
|
Relation rel = null;
|
||||||
|
if (firstRelation) {
|
||||||
|
rel = getRelation(authors.get(firstIndex), authors.get(secondIndex));
|
||||||
|
firstRelation = Boolean.FALSE;
|
||||||
|
} else {
|
||||||
|
rel = getRelation(authors.get(secondIndex), authors.get(firstIndex));
|
||||||
|
firstRelation = Boolean.TRUE;
|
||||||
|
secondIndex += 1;
|
||||||
|
if (secondIndex >= authors.size()) {
|
||||||
|
firstIndex += 1;
|
||||||
|
secondIndex = firstIndex + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CoAuthorshipIterator(List<String> authors) {
|
||||||
|
this.authors = authors;
|
||||||
|
this.firstIndex = 0;
|
||||||
|
this.secondIndex = 1;
|
||||||
|
this.firstRelation = Boolean.TRUE;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private Relation getRelation(String orcid1, String orcid2) {
|
||||||
|
String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
|
||||||
|
String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
|
||||||
|
return OafMapperUtils
|
||||||
|
.getRelation(
|
||||||
|
source, target, ModelConstants.PERSON_PERSON_RELTYPE,
|
||||||
|
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||||
|
ModelConstants.PERSON_PERSON_HASCOAUTHORED,
|
||||||
|
Arrays.asList(OafMapperUtils.keyValue(ORCID_KEY, ModelConstants.ORCID_DS)),
|
||||||
|
OafMapperUtils
|
||||||
|
.dataInfo(
|
||||||
|
false, null, false, false,
|
||||||
|
OafMapperUtils
|
||||||
|
.qualifier(
|
||||||
|
ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
|
||||||
|
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||||
|
"0.91"),
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,17 +3,18 @@ package eu.dnetlib.dhp.actionmanager.personentity;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
/**
 * Serializable bean carrying the list of co-author identifiers collected for one group.
 *
 * <p>Exposes a no-argument constructor plus a getter/setter pair for its single property,
 * and is used with {@code Encoders.bean(Coauthors.class)}.
 */
public class Coauthors implements Serializable {

    /** Identifiers of the co-authors, as plain strings; may be null until set. */
    private List<String> coauthors;

    /**
     * @return the co-author identifiers, or {@code null} if none were set
     */
    public List<String> getCoauthors() {
        return coauthors;
    }

    /**
     * @param coauthors the co-author identifiers to store (kept by reference, not copied)
     */
    public void setCoauthors(List<String> coauthors) {
        this.coauthors = coauthors;
    }
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.spark_project.jetty.util.StringUtil;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.actionmanager.Constants;
|
import eu.dnetlib.dhp.actionmanager.Constants;
|
||||||
|
import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.collection.orcid.model.Author;
|
import eu.dnetlib.dhp.collection.orcid.model.Author;
|
||||||
import eu.dnetlib.dhp.collection.orcid.model.Employment;
|
import eu.dnetlib.dhp.collection.orcid.model.Employment;
|
||||||
|
@ -202,7 +203,7 @@ public class ExtractPerson implements Serializable {
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.json(workingDir + "/authorship");
|
.json(workingDir + "/authorship");
|
||||||
|
|
||||||
works
|
Dataset<Relation> coauthorship = works
|
||||||
.flatMap((FlatMapFunction<Work, Tuple2<String, String>>) w -> {
|
.flatMap((FlatMapFunction<Work, Tuple2<String, String>>) w -> {
|
||||||
List<Tuple2<String, String>> lista = new ArrayList<>();
|
List<Tuple2<String, String>> lista = new ArrayList<>();
|
||||||
w.getPids().stream().forEach(p -> {
|
w.getPids().stream().forEach(p -> {
|
||||||
|
@ -217,10 +218,13 @@ public class ExtractPerson implements Serializable {
|
||||||
(MapGroupsFunction<String, Tuple2<String, String>, Coauthors>) (k, it) -> extractCoAuthors(it),
|
(MapGroupsFunction<String, Tuple2<String, String>, Coauthors>) (k, it) -> extractCoAuthors(it),
|
||||||
Encoders.bean(Coauthors.class))
|
Encoders.bean(Coauthors.class))
|
||||||
.flatMap(
|
.flatMap(
|
||||||
(FlatMapFunction<Coauthors, Relation>) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class))
|
(FlatMapFunction<Coauthors, Relation>) c -> new CoAuthorshipIterator(c.getCoauthors()),
|
||||||
|
Encoders.bean(Relation.class))
|
||||||
.groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
|
.groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
|
||||||
.mapGroups(
|
.mapGroups(
|
||||||
(MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class))
|
(MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class));
|
||||||
|
|
||||||
|
coauthorship
|
||||||
.write()
|
.write()
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
|
@ -237,10 +241,16 @@ public class ExtractPerson implements Serializable {
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.json(workingDir + "/affiliation");
|
.json(workingDir + "/affiliation");
|
||||||
|
|
||||||
spark
|
people = spark
|
||||||
.read()
|
.read()
|
||||||
.json(workingDir + "/people")
|
.textFile(workingDir + "/people")
|
||||||
.as(Encoders.bean(Person.class))
|
.map(
|
||||||
|
(MapFunction<String, Person>) value -> OBJECT_MAPPER
|
||||||
|
.readValue(value, Person.class),
|
||||||
|
Encoders.bean(Person.class));
|
||||||
|
|
||||||
|
people.show(false);
|
||||||
|
people
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
.map(p -> new AtomicAction(p.getClass(), p))
|
.map(p -> new AtomicAction(p.getClass(), p))
|
||||||
.union(
|
.union(
|
||||||
|
@ -261,20 +271,21 @@ public class ExtractPerson implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Dataset<Relation> getRelations(SparkSession spark, String path) {
|
private static Dataset<Relation> getRelations(SparkSession spark, String path) {
|
||||||
return spark.read().json(path).as(Encoders.bean(Relation.class));
|
return spark
|
||||||
|
.read()
|
||||||
|
.textFile(path)
|
||||||
|
.map(
|
||||||
|
(MapFunction<String, Relation>) value -> OBJECT_MAPPER
|
||||||
|
.readValue(value, Relation.class),
|
||||||
|
Encoders.bean(Relation.class));// spark.read().json(path).as(Encoders.bean(Relation.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Coauthors extractCoAuthors(Iterator<Tuple2<String, String>> it) {
|
private static Coauthors extractCoAuthors(Iterator<Tuple2<String, String>> it) {
|
||||||
Coauthors coauth = new Coauthors();
|
Coauthors coauth = new Coauthors();
|
||||||
ArrayList<Relation> ret = new ArrayList<>();
|
|
||||||
List<String> coauthors = new ArrayList<>();
|
List<String> coauthors = new ArrayList<>();
|
||||||
while (it.hasNext())
|
while (it.hasNext())
|
||||||
coauthors.add(it.next()._2());
|
coauthors.add(it.next()._2());
|
||||||
for (int i = 0; i < coauthors.size() - 1; i++)
|
coauth.setCoauthors(coauthors);
|
||||||
for (int j = i + 1; j < coauthors.size(); j++)
|
|
||||||
ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j)));
|
|
||||||
|
|
||||||
coauth.setCoauthors(ret);
|
|
||||||
|
|
||||||
return coauth;
|
return coauth;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue