diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
new file mode 100644
index 000000000..a755c875f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java
@@ -0,0 +1,19 @@
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+
+public class Coauthors implements Serializable {
+    private ArrayList<Relation> coauthors;
+
+    public ArrayList<Relation> getCoauthors() {
+        return coauthors;
+    }
+
+    public void setCoauthors(ArrayList<Relation> coauthors) {
+        this.coauthors = coauthors;
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java
new file mode 100644
index 000000000..65670b22e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Couples.java
@@ -0,0 +1,40 @@
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import eu.dnetlib.dhp.schema.oaf.Person;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
+
+
+import java.io.Serializable;
+
+public class Couples implements Serializable {
+    Person p;
+    Relation r;
+
+    public Couples() {
+
+    }
+
+    public Person getP() {
+        return p;
+    }
+
+    public void setP(Person p) {
+        this.p = p;
+    }
+
+    public Relation getR() {
+        return r;
+    }
+
+    public void setR(Relation r) {
+        this.r = r;
+    }
+
+    public static Couples newInstance(Tuple2<Person, Relation> couple) {
+        Couples c = new Couples();
+        c.p = couple._1();
+        c.r = couple._2();
+        return c;
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
new file mode 100644
index 000000000..0bbda0343
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
@@ -0,0 +1,331 @@
+package eu.dnetlib.dhp.actionmanager.personentity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.Constants;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.collection.orcid.model.Author;
+import eu.dnetlib.dhp.collection.orcid.model.Employment;
+import eu.dnetlib.dhp.collection.orcid.model.Work;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Person;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import eu.dnetlib.dhp.schema.oaf.Pid;
+import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
+import eu.dnetlib.dhp.schema.oaf.utils.PidType;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.sql.*;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spark_project.jetty.util.StringUtil;
+import scala.Tuple2;
+
+import static org.apache.spark.sql.functions.*;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class ExtractPerson implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String OPENAIRE_PREFIX = "openaire____";
+    private static final String SEPARATOR = "::";
+    private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR
+        + DHPUtils.md5(ModelConstants.ORCID.toLowerCase());
+
+    private static final String DOI_PREFIX = "50|doi_________::";
+
+    private static final String PMID_PREFIX = "50|pmid________::";
+    private static final String ARXIV_PREFIX = "50|arXiv_______::";
+
+    private static final String PMCID_PREFIX = "50|pmcid_______::";
+    private static final String ROR_PREFIX = "20|ror_________::";
+    private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
+    public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
+    public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
+
+
+    public static void main(final String[] args) throws IOException, ParseException {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    Objects
+                        .requireNonNull(
+                            ExtractPerson.class
+                                .getResourceAsStream(
+                                    "/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json"))));
+
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("inputPath");
+        log.info("inputPath {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath {}", outputPath);
+
+        final String workingDir = parser.get("workingDir");
+        log.info("workingDir {}", workingDir);
+
+        SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> createActionSet(spark, inputPath, outputPath, workingDir));
+
+    }
+
+    private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) {
+
+        Dataset<Author> authors = spark
+            .read()
+            .parquet(inputPath + "Authors")
+            .as(Encoders.bean(Author.class));
+
+        Dataset<Work> works = spark
+            .read()
+            .parquet(inputPath + "Works")
+            .as(Encoders.bean(Work.class))
+            .filter((FilterFunction<Work>) w -> Optional.ofNullable(w.getPids()).isPresent() &&
+                w.getPids().stream().anyMatch(p -> p.getSchema().equalsIgnoreCase("doi") ||
+                    p.getSchema().equalsIgnoreCase("pmc") ||
+                    p.getSchema().equalsIgnoreCase("pmid") ||
+                    p.getSchema().equalsIgnoreCase("arxiv")));
+
+        Dataset<Employment> employmentDataset = spark
+            .read()
+            .parquet(inputPath + "Employments")
+            .as(Encoders.bean(Employment.class));
+
+
+        Dataset<Author> peopleToMap = authors
+            .joinWith(works, authors.col("orcid").equalTo(works.col("orcid")))
+            .map((MapFunction<Tuple2<Author, Work>, Author>) t2 -> t2._1(), Encoders.bean(Author.class))
+            .groupByKey((MapFunction<Author, String>) a -> a.getOrcid(), Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, Author, Author>) (k, it) -> it.next(),
+                Encoders.bean(Author.class));
+
+
+        Dataset<Employment> employment = employmentDataset
+            .joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid")))
+            .map((MapFunction<Tuple2<Employment, Author>, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class));
+
+        peopleToMap.show(false);
+
+        Dataset<Person> people;
+        people = peopleToMap.map((MapFunction<Author, Person>) op -> {
+            Person person = new Person();
+            person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX));
+            person.setBiography(Optional.ofNullable(op.getBiography())
+                .orElse(""));
+            KeyValue kv = OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS);
+            kv.setDataInfo(null);
+            person.setCollectedfrom(Arrays.asList(kv));
+            person.setAlternativeNames(Optional.ofNullable(op.getOtherNames())
+                .orElse(new ArrayList<>()));
+            person.setFamilyName(Optional.ofNullable(op.getFamilyName())
+                .orElse(""));
+            person.setGivenName(Optional.ofNullable(op.getGivenName())
+                .orElse(""));
+            person.setPid(Optional.ofNullable(op.getOtherPids())
+                .map(v -> v.stream().map(p -> Pid.newInstance(p.getSchema(), p.getValue())).collect(Collectors.toList()))
+                .orElse(new ArrayList<>()));
+            person.getPid().add(Pid.newInstance(ModelConstants.ORCID, op.getOrcid()));
+            person.setDateofcollection(op.getLastModifiedDate());
+            person.setOriginalId(Arrays.asList(op.getOrcid()));
+            return person;
+        }, Encoders.bean(Person.class));
+
+
+        people.show(false);
+
+
+        Dataset<Relation> authorship;
+        authorship = works
+            .flatMap((FlatMapFunction<Work, Relation>) ExtractPerson::getAuthorshipRelationIterator,
+                Encoders.bean(Relation.class));
+
+
+        authorship.show(false);
+
+
+        Dataset<Relation> coauthorship = works
+            .flatMap((FlatMapFunction<Work, Tuple2<String, String>>) w -> {
+                List<Tuple2<String, String>> lista = new ArrayList<>();
+                w.getPids().stream().forEach(p -> {
+                    if (p.getSchema().equalsIgnoreCase("doi") || p.getSchema().equalsIgnoreCase("pmc")
+                        || p.getSchema().equalsIgnoreCase("pmid") || p.getSchema().equalsIgnoreCase("arxiv"))
+                        lista.add(new Tuple2<>(p.getValue(), w.getOrcid()));
+                });
+                return lista.iterator();
+            }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+            .groupByKey((MapFunction<Tuple2<String, String>, String>) Tuple2::_1, Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, Tuple2<String, String>, Coauthors>) (k, it) ->
+                extractCoAuthors(it), Encoders.bean(Coauthors.class))
+            .flatMap((FlatMapFunction<Coauthors, Relation>) c -> c.getCoauthors().iterator(), Encoders.bean(Relation.class))
+            .groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(), Encoders.bean(Relation.class));
+
+        coauthorship.show(false);
+
+        Dataset<Relation> affiliation = employment
+            .filter((FilterFunction<Employment>) e -> Optional.ofNullable(e.getAffiliationId()).isPresent())
+            .filter((FilterFunction<Employment>) e -> e.getAffiliationId().getSchema().equalsIgnoreCase("ror"))
+            .map((MapFunction<Employment, Relation>) ExtractPerson::getAffiliationRelation,
+                Encoders.bean(Relation.class));
+
+        affiliation.show(false);
+
+        people.toJavaRDD()
+            .map(p -> new AtomicAction(p.getClass(), p))
+            .union(authorship.toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
+            .union(coauthorship.toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
+            .union(affiliation.toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
+            .mapToPair(
+                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+                    new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+            .saveAsHadoopFile(
+                outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);// , GzipCodec.class);
+    }
+
+    private static Coauthors extractCoAuthors(Iterator<Tuple2<String, String>> it) {
+        Coauthors coauth = new Coauthors();
+        ArrayList<Relation> ret = new ArrayList<>();
+        List<String> coauthors = new ArrayList<>();
+        while (it.hasNext())
+            coauthors.add(it.next()._2());
+        for (int i = 0; i < coauthors.size() - 1; i++)
+            for (int j = i + 1; j < coauthors.size(); j++)
+                ret.addAll(getCoAuthorshipRelations(coauthors.get(i), coauthors.get(j)));
+
+        coauth.setCoauthors(ret);
+
+        return coauth;
+    }
+
+    private static Relation getAffiliationRelation(Employment row) {
+        String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid());
+        String target = ROR_PREFIX
+            + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue()));
+        List<KeyValue> properties = new ArrayList<>();
+
+        Relation relation = OafMapperUtils
+            .getRelation(
+                source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE,
+                ModelConstants.ORG_PERSON_PARTICIPATES,
+                Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
+                OafMapperUtils.dataInfo(false, null, false, false,
+                    OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
+                        ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+                    "0.91"),
+                null);
+
+        if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) {
+            KeyValue kv = new KeyValue();
+            kv.setKey("startDate");
+            kv.setValue(row.getStartDate());
+            properties.add(kv);
+        }
+        if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) {
+            KeyValue kv = new KeyValue();
+            kv.setKey("endDate");
+            kv.setValue(row.getEndDate());
+            properties.add(kv);
+        }
+
+        if (properties.size() > 0)
+            relation.setProperties(properties);
+        return relation;
+
+    }
+
+    private static Collection<Relation> getCoAuthorshipRelations(String orcid1, String orcid2) {
+        String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
+        String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
+
+        return Arrays
+            .asList(
+                OafMapperUtils
+                    .getRelation(source, target, ModelConstants.PERSON_PERSON_RELTYPE,
+                        ModelConstants.PERSON_PERSON_SUBRELTYPE,
+                        ModelConstants.PERSON_PERSON_HASCOAUTHORED,
+                        Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
+                        OafMapperUtils.dataInfo(false, null, false, false,
+                            OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
+                                ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+                            "0.91"),
+                        null),
+                OafMapperUtils
+                    .getRelation(target, source, ModelConstants.PERSON_PERSON_RELTYPE,
+                        ModelConstants.PERSON_PERSON_SUBRELTYPE,
+                        ModelConstants.PERSON_PERSON_HASCOAUTHORED,
+                        Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
+                        OafMapperUtils.dataInfo(false, null, false, false,
+                            OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
+                                ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+                            "0.91"),
+                        null));
+
+    }
+
+    private static @NotNull Iterator<Relation> getAuthorshipRelationIterator(Work w) {
+
+        if (Optional.ofNullable(w.getPids()).isPresent())
+            return w.getPids()
+                .stream()
+                .map(pid -> getRelation(w.getOrcid(), pid))
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList())
+                .iterator();
+        List<Relation> ret = new ArrayList<>();
+        return ret.iterator();
+    }
+
+
+    private static Relation getRelation(String orcid, eu.dnetlib.dhp.collection.orcid.model.Pid pid) {
+        String target;
+        String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
+        switch (pid.getSchema()) {
+            case "doi":
+                target = DOI_PREFIX
+                    + IdentifierFactory
+                        .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), pid.getValue()));
+                break;
case "pmid": + target = PMID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pid.getValue())); + break; + case "arxiv": + target = ARXIV_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), pid.getValue())); + break; + case "pmcid": + target = PMCID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), pid.getValue())); + break; + + default: + return null; + } + + + return OafMapperUtils.getRelation(source, target,ModelConstants.RESULT_PERSON_RELTYPE, + ModelConstants.RESULT_PERSON_SUBRELTYPE, + ModelConstants.RESULT_PERSON_HASAUTHORED, + Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), + OafMapperUtils.dataInfo(false, null, false, false, + OafMapperUtils.qualifier(ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), + null); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java new file mode 100644 index 000000000..71d2b491a --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/WorkList.java @@ -0,0 +1,23 @@ +package eu.dnetlib.dhp.actionmanager.personentity; + +import eu.dnetlib.dhp.collection.orcid.model.Work; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; + +import java.io.Serializable; +import java.util.ArrayList; + +public class WorkList implements Serializable { + private ArrayList workArrayList; + + public ArrayList getWorkArrayList() { + return workArrayList; + } + + public void setWorkArrayList(ArrayList workArrayList) { + this.workArrayList = workArrayList; + } + + public WorkList() { + workArrayList = new ArrayList<>(); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java index 27970f2c3..263179d6f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java @@ -104,8 +104,8 @@ public class CreateActionSetFromWebEntries implements Serializable { final String ror = ROR_PREFIX + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror"))); ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror)); - ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror)); - ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror)); +// ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror)); +// ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror)); return ret .iterator(); @@ -139,11 +139,17 @@ public class CreateActionSetFromWebEntries implements Serializable { "institution", functions .explode( functions.col("institutions"))) + .selectExpr( - "id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror", + "id", "doi", "institution.ror as ror", "institution.country_code as country_code", "publication_year") .distinct(); +// .selectExpr( +// "id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror", +// "institution.country_code as 
country_code", "publication_year") +// .distinct(); + } private static Dataset readBlackList(SparkSession spark, String inputPath) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java index df87e4333..a1545ebfe 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Author.java @@ -20,6 +20,9 @@ public class Author extends ORCIDItem { private String lastModifiedDate; + public Author() { + } + public String getBiography() { return biography; } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java index 6bc47bc26..419823cb1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/ORCIDItem.java @@ -11,4 +11,7 @@ public class ORCIDItem { public void setOrcid(String orcid) { this.orcid = orcid; } + + public ORCIDItem() { + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java index 670170323..a8683aaaf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/model/Work.java @@ -32,4 +32,6 @@ public class Work extends ORCIDItem { pids.add(pid); } + public Work() { + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json new file mode 100644 index 000000000..3310f16e4 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "ip", + "paramLongName": "inputPath", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, + { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the hdfs name node", + "paramRequired": false + } +] diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java new file mode 100644 index 000000000..a4d56ba2e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/person/CreatePersonAS.java @@ -0,0 +1,114 @@ + +package eu.dnetlib.dhp.actionmanager.person; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob; +import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson; +import eu.dnetlib.dhp.collection.orcid.model.Author; +import 
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CreatePersonAS {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static SparkSession spark;
+
+    private static Path workingDir;
+    private static final Logger log = LoggerFactory
+        .getLogger(CreatePersonAS.class);
+
+    @BeforeAll
+    public static void beforeAll() throws IOException {
+        workingDir = Files
+            .createTempDirectory(CreatePersonAS.class.getSimpleName());
+        log.info("using work dir {}", workingDir);
+
+        SparkConf conf = new SparkConf();
+        conf.setAppName(CreatePersonAS.class.getSimpleName());
+
+        conf.setMaster("local[*]");
+        conf.set("spark.driver.host", "localhost");
+        conf.set("hive.metastore.local", "true");
+        conf.set("spark.ui.enabled", "false");
+        conf.set("spark.sql.codegen.wholeStage", "false");
+        conf.set("spark.sql.warehouse.dir", workingDir.toString());
+        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+        spark = SparkSession
+            .builder()
+            .appName(CreatePersonAS.class.getSimpleName())
+            .config(conf)
+            .getOrCreate();
+    }
+
+    @AfterAll
+    public static void afterAll() throws IOException {
+        FileUtils.deleteDirectory(workingDir.toFile());
+        spark.stop();
+    }
+
+    @Test
+    void testAuthors() throws Exception {
+
+        String inputPath = getClass()
+            .getResource(
+                "/eu/dnetlib/dhp/actionmanager/person/")
+            .getPath();
+
+//        spark
+//            .read()
+//            .parquet(inputPath + "Authors")
+//            .as(Encoders.bean(Author.class))
+//            .filter((FilterFunction<Author>) a -> Optional.ofNullable(a.getOtherNames()).isPresent() &&
+//                Optional.ofNullable(a.getBiography()).isPresent())
+//            .write()
+//            .mode(SaveMode.Overwrite)
+//            .parquet(workingDir.toString() + "AuthorsSubset");
+
+        ExtractPerson
+            .main(
+                new String[] {
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-inputPath",
+                    inputPath,
+                    "-outputPath",
+                    workingDir.toString() + "/actionSet1",
+                    "-workingDir",
+                    workingDir.toString() + "/working"
+                });
+
+    }
+
+}
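The test above runs ExtractPerson without asserting on its output. A minimal sketch of a check that could follow the ExtractPerson.main(...) call, assuming the action set is the Text/Text sequence file of serialized AtomicAction objects written by createActionSet; the expected count depends on the fixtures and is left as a generic assertion, and the payload instanceof check relies on the AtomicAction JSON deserialization used elsewhere in the project. It would also put the JavaSparkContext, JavaRDD, Text and Assertions imports already present in the test to use.

        // Hypothetical follow-up to testAuthors(): read the action set back and count Person payloads.
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<AtomicAction> actions = sc
            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class));

        // Assumption: the payload is materialized as the concrete Oaf subclass on deserialization.
        long persons = actions
            .filter(aa -> aa.getPayload() instanceof eu.dnetlib.dhp.schema.oaf.Person)
            .count();

        // Fixture-dependent: assert on whatever the Authors/Works test data actually yields.
        Assertions.assertTrue(persons > 0);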
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000
new file mode 100644
index 000000000..636595d49
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/person/WorkJson/part-00000
@@ -0,0 +1,10 @@
+{"orcid":"0000-0001-6291-9619","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold Nanoparticle-Functionalized 3D Flower-like MoS2/TiO2 Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]}
+{"orcid":"0000-0002-3210-3034","title":"A Visible Light Driven Photoelectrochemical Chloramphenicol Aptasensor Based on a Gold Nanoparticle-Functionalized 3D Flower-like MoS2/TiO2 Heterostructure","pids":[{"value":"10.1021/acs.langmuir.1c02956","schema":"doi"},{"value":"2-s2.0-85124885368","schema":"eid"},{"value":"15205827 07437463","schema":"issn"}]}
+{"orcid":"0000-0001-6291-9619","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the qqbb Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]}
+{"orcid":"0000-0002-3210-3034","title":"Study of High-Transverse-Momentum Higgs Boson Production in Association with a Vector Boson in the qqbb Final State with the ATLAS Detector","pids":[{"value":"2736741","schema":"other-id"},{"value":"10.1103/PhysRevLett.132.131802","schema":"doi"},{"value":"2312.07605","schema":"arxiv"}]}
+{"orcid":"0000-0002-9030-7609","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"},{"value":"14346052 14346044","schema":"issn"}]}
+{"orcid":"0000-0003-2552-9691","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in $\\sqrt{s}$ = 13 TeV $pp$ collisions at the LHC using the ATLAS detector","pids":[{"value":"1473744","schema":"other-id"},{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"1606.09150","schema":"arxiv"}]}
+{"orcid":"0000-0003-0305-8980","title":"Search for supersymmetry in a final state containing two photons and missing transverse momentum in √s = 13 TeV pp collisions at the LHC using the ATLAS detector","pids":[{"value":"10.1140/epjc/s10052-016-4344-x","schema":"doi"},{"value":"2-s2.0-84988710988","schema":"eid"}]}
+{"orcid":"0000-0002-9030-7609","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]}
+{"orcid":"0000-0003-2629-4046","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]}
+{"orcid":"0000-0001-8582-8912","title":"Measurement of the energy response of the ATLAS calorimeter to charged pions from $W^{\\pm }\\rightarrow \\tau ^{\\pm }(\\rightarrow \\pi ^{\\pm }\\nu _{\\tau })\\nu _{\\tau }$ events in Run 2 data","pids":[{"value":"1909507","schema":"other-id"},{"value":"10.1140/epjc/s10052-022-10117-2","schema":"doi"},{"value":"2108.09043","schema":"arxiv"}]}
\ No newline at end of file
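Each line of the fixture above is one ORCID work; in ExtractPerson only pids whose schema is doi, pmid, pmc or arxiv pass the filter and feed the authorship and co-authorship relations, while eid and issn entries are ignored. A small standalone sketch of that schema check, reading one shortened record with Jackson directly rather than through the project's Work bean (class and variable names here are illustrative only):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.List;

public class WorkPidFilterSketch {
    private static final List<String> ACCEPTED = Arrays.asList("doi", "pmid", "pmc", "arxiv");

    public static void main(String[] args) throws Exception {
        // One record from the fixture, shortened; the eid pid is present but will not match.
        String line = "{\"orcid\":\"0000-0001-6291-9619\",\"pids\":["
            + "{\"value\":\"10.1021/acs.langmuir.1c02956\",\"schema\":\"doi\"},"
            + "{\"value\":\"2-s2.0-85124885368\",\"schema\":\"eid\"}]}";

        JsonNode work = new ObjectMapper().readTree(line);
        long usable = 0;
        for (JsonNode pid : work.get("pids")) {
            // Same case-insensitive schema test as the FilterFunction in createActionSet.
            if (ACCEPTED.contains(pid.get("schema").asText().toLowerCase())) {
                usable++;
            }
        }
        System.out.println(usable); // 1: only the DOI qualifies
    }
}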
diff --git a/pom.xml b/pom.xml
index 9e554204d..c8d7a980a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -994,6 +994,25 @@
 			</dependencies>
 		</profile>
 
+		<profile>
+			<id>arm-silicon-mac</id>
+			<activation>
+				<os>
+					<arch>aarch64</arch>
+					<family>mac</family>
+				</os>
+			</activation>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+					<version>1.1.8.4</version>
+				</dependency>
+			</dependencies>
+
+		</profile>
+
 		<profile>
 			<id>spark-34</id>
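For reference on the new co-authorship step: extractCoAuthors in ExtractPerson emits two directed hasCoAuthored relations for every unordered pair of ORCIDs that share a work pid, so n co-authors on one pid yield n*(n-1) relations before the final source+target de-duplication. A standalone sketch of that pairing logic (hypothetical class, not part of the patch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: mirrors the nested loop in ExtractPerson.extractCoAuthors,
// emitting both directions (a -> b and b -> a) for each unordered pair of ORCIDs.
public class CoauthorPairingSketch {
    public static List<String[]> directedPairs(List<String> orcids) {
        List<String[]> pairs = new ArrayList<>();
        for (int i = 0; i < orcids.size() - 1; i++) {
            for (int j = i + 1; j < orcids.size(); j++) {
                pairs.add(new String[] { orcids.get(i), orcids.get(j) });
                pairs.add(new String[] { orcids.get(j), orcids.get(i) });
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // 3 ORCIDs sharing one DOI -> 3 * 2 = 6 directed relations before de-duplication.
        System.out.println(directedPairs(Arrays.asList("0000-0001", "0000-0002", "0000-0003")).size());
    }
}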