From d378de4f0d013ec1ac5524cf48743c1422070d85 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Thu, 14 Nov 2024 18:04:13 +0100 Subject: [PATCH] [person] added merge of relations and production of all possible relations (not only authorship but also co-authorship) Note: some of the properties present in the publishers file may be missing (role and corresponding author, since there is no place to put them). Note: the confidence of the affiliation is not present in the property of the relation --- .../EnrichExternalDataWithGraphORCID.java | 35 +++++++---- .../SparkCopyEnrichedPublisherAuthors.java | 63 +++++++++++++++++++ .../enrich_publisher_orcid_parameters.json | 20 ++++++ 3 files changed, 107 insertions(+), 11 deletions(-) create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/SparkCopyEnrichedPublisherAuthors.java create mode 100644 dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/enrich_publisher_orcid_parameters.json diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/EnrichExternalDataWithGraphORCID.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/EnrichExternalDataWithGraphORCID.java index de7d7a4df..46052ba35 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/EnrichExternalDataWithGraphORCID.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/EnrichExternalDataWithGraphORCID.java @@ -1,14 +1,13 @@ package eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata; -import com.azul.tooling.in.Model; import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors; +import eu.dnetlib.dhp.common.person.CoAuthorshipIterator; +import eu.dnetlib.dhp.common.person.Coauthors; import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidAuthors; import 
eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult; import eu.dnetlib.dhp.utils.OrcidAuthor; @@ -57,6 +56,8 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthor } + + private static OrcidAuthors getOrcidAuthorsList(List authors) { OrcidAuthors oas = new OrcidAuthors(); List tmp = authors @@ -102,13 +103,12 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthor @Override public void generateGraph(SparkSession spark, String graphPath, String workingDir, String targetPath) { - //creates new relations + //creates new relations of authorship Dataset newRelations = spark.read().schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema()) .parquet(workingDir + "/publication_matched") .selectExpr("id as doi", "enriched_author") .flatMap((FlatMapFunction) this::getRelationsList, Encoders.bean(Relation.class)); - //redirects new relations versus representatives if any Dataset graph_relations = spark.read().schema(Encoders.bean(Relation.class).schema()) .json(graphPath + "/relation") @@ -154,14 +154,24 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthor eauthors.forEach(author -> { List pids =author.getAs("pid"); - pids.forEach(p -> { - if(p.getAs("scheme")==ModelConstants.ORCID) - relationList.add(getRelations(r.getAs("doi"),author.getAs("raw_affiliation_string"), p.getAs("value"))); - }); + List pidList = pids.stream().filter(p -> ModelConstants.ORCID.equalsIgnoreCase(p.getAs("schema")) || ModelConstants.ORCID_PENDING.equalsIgnoreCase(p.getAs("schema"))).collect(Collectors.toList()); + pidList.forEach(p -> 
relationList.add(getRelations(r.getAs("doi"),author.getAs("raw_affiliation_string"), p.getAs("value"))) + ); + new CoAuthorshipIterator(extractCoAuthors(pidList)).forEachRemaining(relationList::add); + }); return relationList.iterator(); } + private static List extractCoAuthors(List pidList) { + + List coauthors = new ArrayList<>(); + for(Row pid : pidList) + coauthors.add(pid.getAs("Value")); + + + return coauthors; + } private Relation getRelations(String doi, List rawAffiliationString, String orcid) { Relation rel = OafMapperUtils.getRelation("30|orcid_______::"+ DHPUtils.md5(orcid) , "50|doi_________::" + DHPUtils.md5(doi) ,ModelConstants.RESULT_PERSON_RELTYPE, ModelConstants.RESULT_PERSON_SUBRELTYPE, ModelConstants.RESULT_PERSON_HASAUTHORED, @@ -272,9 +282,12 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthor List pids = new ArrayList<>(); List affs = new ArrayList<>(); - ((List)a.getAs("pids")).forEach(pid -> pids.add(getPid(pid))); + List publisherPids = a.getAs("pids"); + if(Optional.ofNullable(publisherPids).isPresent()) + publisherPids.forEach(pid -> pids.add(getPid(pid))); + Listaffiliations = a.getAs("affiliations"); //"`Matchings`: ARRAY>, - ((List)a.getAs("affiliations")).forEach(aff -> { + affiliations.forEach(aff -> { if(aff.getAs("Status").equals(Boolean.TRUE)) affs.add(aff.getAs("PID") + "@@" + aff.getAs("Value") + "@@" + String.valueOf(aff.getAs("Confidence"))); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/SparkCopyEnrichedPublisherAuthors.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/SparkCopyEnrichedPublisherAuthors.java new file mode 100644 index 000000000..0acd1fcac --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/SparkCopyEnrichedPublisherAuthors.java @@ -0,0 +1,63 @@ +package 
eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob; +import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob; +import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Optional; + +import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class SparkCopyEnrichedPublisherAuthors implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob.class); + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkCopyEnrichedPublisherAuthors.class + .getResourceAsStream( + "/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/input_propagation_parameter.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> copyEnrichedAuthors(spark, workingDir, outputPath)); + } + + + private static void copyEnrichedAuthors(SparkSession 
spark, String workingDir, String persistedPath) { + spark.read().schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema()) + .parquet(workingDir + "/publication_matched") + .selectExpr("id as doi", "enriched_author") + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(persistedPath + "/publisherEnrichedAuthors"); + } +} diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/enrich_publisher_orcid_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/enrich_publisher_orcid_parameters.json new file mode 100644 index 000000000..228298b1e --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/enrich_publisher_orcid_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "should be local or yarn", + "paramRequired": false + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the path of the orcid Table generated by the dump", + "paramRequired": true + }, + { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the path of the graph we want to apply enrichment", + "paramRequired": true + } +] \ No newline at end of file