diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/EnrichExternalDataWithGraphORCID.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/EnrichExternalDataWithGraphORCID.java
index de7d7a4df..46052ba35 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/EnrichExternalDataWithGraphORCID.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/EnrichExternalDataWithGraphORCID.java
@@ -1,14 +1,13 @@
 package eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata;
 
-import com.azul.tooling.in.Model;
 import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors;
+import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
+import eu.dnetlib.dhp.common.person.Coauthors;
 import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidAuthors;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
 import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
-import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
 import eu.dnetlib.dhp.utils.DHPUtils;
 import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult;
 import eu.dnetlib.dhp.utils.OrcidAuthor;
@@ -57,6 +56,8 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthors {
 
     }
 
+
+
     private static OrcidAuthors getOrcidAuthorsList(List<Author> authors) {
         OrcidAuthors oas = new OrcidAuthors();
         List<OrcidAuthor> tmp = authors
@@ -102,13 +103,12 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthors {
     @Override
     public void generateGraph(SparkSession spark, String graphPath, String workingDir, String targetPath) {
 
-        //creates new relations
+        //creates new authorship relations
         Dataset<Relation> newRelations = spark.read().schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
             .parquet(workingDir + "/publication_matched")
             .selectExpr("id as doi", "enriched_author")
             .flatMap((FlatMapFunction<Row, Relation>) this::getRelationsList, Encoders.bean(Relation.class));
 
-        //redirects new relations versus representatives if any
         Dataset<Relation> graph_relations = spark.read().schema(Encoders.bean(Relation.class).schema())
             .json(graphPath + "/relation")
@@ -154,14 +154,24 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthors {
         eauthors.forEach(author -> {
             List<Row> pids = author.getAs("pid");
-            pids.forEach(p -> {
-                if(p.getAs("scheme")==ModelConstants.ORCID)
-                    relationList.add(getRelations(r.getAs("doi"), author.getAs("raw_affiliation_string"), p.getAs("value")));
-            });
+            List<Row> pidList = pids.stream()
+                .filter(p -> ModelConstants.ORCID.equalsIgnoreCase(p.getAs("schema"))
+                    || ModelConstants.ORCID_PENDING.equalsIgnoreCase(p.getAs("schema")))
+                .collect(Collectors.toList());
+            pidList.forEach(p -> relationList.add(
+                getRelations(r.getAs("doi"), author.getAs("raw_affiliation_string"), p.getAs("value"))));
+            new CoAuthorshipIterator(extractCoAuthors(pidList)).forEachRemaining(relationList::add);
         });
         return relationList.iterator();
     }
 
+    private static List<String> extractCoAuthors(List<Row> pidList) {
+        List<String> coauthors = new ArrayList<>();
+        for (Row pid : pidList)
+            coauthors.add(pid.getAs("value"));
+        return coauthors;
+    }
+
     private Relation getRelations(String doi, List<String> rawAffiliationString, String orcid) {
         Relation rel = OafMapperUtils.getRelation(
             "30|orcid_______::" + DHPUtils.md5(orcid),
             "50|doi_________::" + DHPUtils.md5(doi),
             ModelConstants.RESULT_PERSON_RELTYPE,
             ModelConstants.RESULT_PERSON_SUBRELTYPE,
             ModelConstants.RESULT_PERSON_HASAUTHORED,
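Note for reviewers: CoAuthorshipIterator lives in dhp-common and is not part of this diff. The following self-contained sketch only illustrates the shape of the pairwise expansion it is fed here (one relation per unordered pair of co-author ORCIDs); the class name and the printed output are hypothetical stand-ins, not the actual implementation.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CoAuthorshipSketch {
    // Expand the ORCIDs found on one publication into unordered co-author pairs.
    static List<String[]> coAuthorPairs(List<String> orcids) {
        List<String[]> pairs = new ArrayList<>();
        for (int i = 0; i < orcids.size(); i++)
            for (int j = i + 1; j < orcids.size(); j++)
                pairs.add(new String[] { orcids.get(i), orcids.get(j) });
        return pairs;
    }

    public static void main(String[] args) {
        // Three co-authors yield three pairs: (1,2), (1,3), (2,3).
        coAuthorPairs(Arrays.asList("0000-0000-0000-0001", "0000-0000-0000-0002", "0000-0000-0000-0003"))
            .forEach(p -> System.out.println(p[0] + " <-> " + p[1]));
    }
}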
@@ -272,9 +282,12 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthors {
         List<StructuredProperty> pids = new ArrayList<>();
         List<String> affs = new ArrayList<>();
 
-        ((List<Row>) a.getAs("pids")).forEach(pid -> pids.add(getPid(pid)));
+        List<Row> publisherPids = a.getAs("pids");
+        if (Optional.ofNullable(publisherPids).isPresent())
+            publisherPids.forEach(pid -> pids.add(getPid(pid)));
+        List<Row> affiliations = a.getAs("affiliations");
         // `Matchings`: ARRAY<STRUCT<PID: STRING, Value: STRING, Confidence: DOUBLE, Status: BOOLEAN>>
-        ((List<Row>) a.getAs("affiliations")).forEach(aff -> {
+        affiliations.forEach(aff -> {
             if (aff.getAs("Status").equals(Boolean.TRUE))
                 affs.add(aff.getAs("PID") + "@@" + aff.getAs("Value") + "@@" + String.valueOf(aff.getAs("Confidence")));
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/SparkCopyEnrichedPublisherAuthors.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/SparkCopyEnrichedPublisherAuthors.java
new file mode 100644
index 000000000..0acd1fcac
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/SparkCopyEnrichedPublisherAuthors.java
@@ -0,0 +1,59 @@
+package eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class SparkCopyEnrichedPublisherAuthors implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkCopyEnrichedPublisherAuthors.class);
+
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkCopyEnrichedPublisherAuthors.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/enrich_publisher_orcid_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String workingDir = parser.get("workingDir");
+        log.info("workingDir: {}", workingDir);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> copyEnrichedAuthors(spark, workingDir, outputPath));
+    }
+
+    private static void copyEnrichedAuthors(SparkSession spark, String workingDir, String persistedPath) {
+        spark.read().schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
+            .parquet(workingDir + "/publication_matched")
+            .selectExpr("id as doi", "enriched_author")
+            .write()
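The copy job above is a schema-projected parquet read followed by a gzip-compressed JSON dump. Below is a minimal, self-contained sketch of that same Spark pattern, using a stand-in bean and hypothetical local paths rather than the real ORCIDAuthorEnricherResult and workflow directories.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CopyPatternSketch {
    // Stand-in bean: Encoders.bean() reflects on getters/setters to derive the read schema.
    public static class Matched {
        private String id;
        private String enriched_author;
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getEnriched_author() { return enriched_author; }
        public void setEnriched_author(String v) { this.enriched_author = v; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("copy-sketch").getOrCreate();
        Dataset<Row> matched = spark.read()
            .schema(Encoders.bean(Matched.class).schema()) // project the parquet onto the bean's columns
            .parquet("/tmp/publication_matched");          // hypothetical input path
        matched.selectExpr("id as doi", "enriched_author")
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")                 // gzip-compressed JSON part files
            .json("/tmp/publisherEnrichedAuthors");        // hypothetical output path
        spark.stop();
    }
}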
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(persistedPath + "/publisherEnrichedAuthors");
+    }
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/enrich_publisher_orcid_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/enrich_publisher_orcid_parameters.json
new file mode 100644
index 000000000..228298b1e
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/enrich_publisher_orcid_parameters.json
@@ -0,0 +1,20 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  },
+  {
+    "paramName": "op",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path where the publisher-enriched authors will be stored",
+    "paramRequired": true
+  },
+  {
+    "paramName": "wd",
+    "paramLongName": "workingDir",
+    "paramDescription": "the working directory holding the intermediate enrichment results",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
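For completeness, a rough sketch of how the parameter spec above reaches the job. The parser calls mirror the job's own code; the invocation values and the short-option form ("-issm", "-wd", "-op", derived from the paramName fields) are assumptions about ArgumentApplicationParser's command-line handling, not verified behavior.

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;

public class ParameterSpecSketch {
    public static void main(String[] args) throws Exception {
        // Load the parameter spec added above (the same resource the job reads).
        String spec = IOUtils.toString(
            ParameterSpecSketch.class.getResourceAsStream(
                "/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/enrich_publisher_orcid_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(spec);
        // Hypothetical invocation values; paths are placeholders.
        parser.parseArgument(new String[] {
            "-issm", "false",
            "-wd", "/tmp/workingDir",
            "-op", "/tmp/output"
        });
        System.out.println(parser.get("workingDir"));
        System.out.println(parser.get("outputPath"));
    }
}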