From a821371af2ffa3bec53c47f949eb1a90fad3363f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 25 Oct 2023 11:46:10 +0200 Subject: [PATCH] added code to dump the relations between organizaiton and projects in the subset of entities relevant for EOSC --- .../eu/dnetlib/dhp/eosc/model/Funder.java | 25 +- .../eu/dnetlib/dhp/eosc/model/Fundings.java | 34 ++- .../eu/dnetlib/dhp/eosc/model/Granted.java | 77 +++--- .../dnetlib/dhp/eosc/model/Organization.java | 107 ++++---- .../dhp/eosc/model/OrganizationPid.java | 2 +- .../eu/dnetlib/dhp/eosc/model/Programme.java | 47 ++-- .../eu/dnetlib/dhp/eosc/model/Project.java | 236 +++++++++--------- .../ExtendEoscResultWithOrganization.java | 2 +- ...ExtendEoscResultWithOrganizationStep2.java | 136 +++++----- ...java => SparkDumpOrganizationProject.java} | 63 ++--- .../dump/eosc/SparkUpdateProjectInfo.java | 144 ++++++----- ...osc_dump_organizationprojectrelations.json | 26 ++ .../graph/dump/eosc/SelectEoscResultTest.java | 2 +- 13 files changed, 459 insertions(+), 442 deletions(-) rename dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/{SparkDumpRelation.java => SparkDumpOrganizationProject.java} (56%) create mode 100644 dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_dump_organizationprojectrelations.json diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Funder.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Funder.java index f2b198d..eb8df14 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Funder.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Funder.java @@ -1,28 +1,27 @@ + package eu.dnetlib.dhp.eosc.model; /** * @author miriam.baglioni * @Date 25/10/23 */ - - - import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; +import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; /** * To store information about the funder funding the project related to the result. It extends * eu.dnetlib.dhp.schema.dump.oaf.Funder with the following parameter: - - private * eu.dnetdlib.dhp.schema.dump.oaf.graph.Fundings funding_stream to store the fundingstream */ - public class Funder extends FunderShort { +public class Funder extends FunderShort { - @JsonSchema(description = "Description of the funding stream") - private Fundings funding_stream; + @JsonSchema(description = "Description of the funding stream") + private Fundings funding_stream; - public Fundings getFunding_stream() { - return funding_stream; - } + public Fundings getFunding_stream() { + return funding_stream; + } - public void setFunding_stream(Fundings funding_stream) { - this.funding_stream = funding_stream; - } - } + public void setFunding_stream(Fundings funding_stream) { + this.funding_stream = funding_stream; + } +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Fundings.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Fundings.java index 245a140..0440be3 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Fundings.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Fundings.java @@ -1,12 +1,10 @@ + package eu.dnetlib.dhp.eosc.model; /** * @author miriam.baglioni * @Date 25/10/23 */ - - - import java.io.Serializable; import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; @@ -22,23 +20,23 @@ import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; */ public class Fundings implements Serializable { - @JsonSchema(description = "Id of the funding stream") - private String id; - private String description; + @JsonSchema(description = "Id of the funding stream") + private String id; + private String description; - public String getId() { - return id; - } + public String getId() { + return id; + } - public void setId(String id) { - this.id = id; - } + public void setId(String id) { + this.id = id; + } - public String getDescription() { - return description; - } + public String getDescription() { + return description; + } - public void setDescription(String description) { - this.description = description; - } + public void setDescription(String description) { + this.description = description; + } } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Granted.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Granted.java index 4202ea2..26b28ef 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Granted.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Granted.java @@ -1,11 +1,10 @@ + package eu.dnetlib.dhp.eosc.model; /** * @author miriam.baglioni * @Date 25/10/23 */ - - import java.io.Serializable; import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; @@ -16,51 +15,51 @@ import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; * funded amount by the funder */ public class Granted implements Serializable { - @JsonSchema(description = "The currency of the granted amount (e.g. EUR)") - private String currency; + @JsonSchema(description = "The currency of the granted amount (e.g. EUR)") + private String currency; - @JsonSchema(description = "The total cost of the project") - private float totalcost; + @JsonSchema(description = "The total cost of the project") + private float totalcost; - @JsonSchema(description = "The funded amount") - private float fundedamount; + @JsonSchema(description = "The funded amount") + private float fundedamount; - public String getCurrency() { - return currency; - } + public String getCurrency() { + return currency; + } - public void setCurrency(String currency) { - this.currency = currency; - } + public void setCurrency(String currency) { + this.currency = currency; + } - public float getTotalcost() { - return totalcost; - } + public float getTotalcost() { + return totalcost; + } - public void setTotalcost(float totalcost) { - this.totalcost = totalcost; - } + public void setTotalcost(float totalcost) { + this.totalcost = totalcost; + } - public float getFundedamount() { - return fundedamount; - } + public float getFundedamount() { + return fundedamount; + } - public void setFundedamount(float fundedamount) { - this.fundedamount = fundedamount; - } + public void setFundedamount(float fundedamount) { + this.fundedamount = fundedamount; + } - public static Granted newInstance(String currency, float totalcost, float fundedamount) { - Granted granted = new Granted(); - granted.currency = currency; - granted.totalcost = totalcost; - granted.fundedamount = fundedamount; - return granted; - } + public static Granted newInstance(String currency, float totalcost, float fundedamount) { + Granted granted = new Granted(); + granted.currency = currency; + granted.totalcost = totalcost; + granted.fundedamount = fundedamount; + return granted; + } - public static Granted newInstance(String currency, float fundedamount) { - Granted granted = new Granted(); - granted.currency = currency; - granted.fundedamount = fundedamount; - return granted; - } + public static Granted newInstance(String currency, float fundedamount) { + Granted granted = new Granted(); + granted.currency = currency; + granted.fundedamount = fundedamount; + return granted; + } } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Organization.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Organization.java index 395ac24..0c0a270 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Organization.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Organization.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.eosc.model; import java.io.Serializable; @@ -16,76 +17,76 @@ import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; * - private List pid to store the list of pids for the organization */ public class Organization implements Serializable { - private String legalshortname; - private String legalname; - private String websiteurl; + private String legalshortname; + private String legalname; + private String websiteurl; - @JsonSchema(description = "Alternative names that identify the organisation") - private List alternativenames; + @JsonSchema(description = "Alternative names that identify the organisation") + private List alternativenames; - @JsonSchema(description = "The organisation country") - private Country country; + @JsonSchema(description = "The organisation country") + private Country country; - @JsonSchema(description = "The OpenAIRE id for the organisation") - private String id; + @JsonSchema(description = "The OpenAIRE id for the organisation") + private String id; - @JsonSchema(description = "Persistent identifiers for the organisation i.e. isni 0000000090326370") - private List pid; + @JsonSchema(description = "Persistent identifiers for the organisation i.e. isni 0000000090326370") + private List pid; - public String getLegalshortname() { - return legalshortname; - } + public String getLegalshortname() { + return legalshortname; + } - public void setLegalshortname(String legalshortname) { - this.legalshortname = legalshortname; - } + public void setLegalshortname(String legalshortname) { + this.legalshortname = legalshortname; + } - public String getLegalname() { - return legalname; - } + public String getLegalname() { + return legalname; + } - public void setLegalname(String legalname) { - this.legalname = legalname; - } + public void setLegalname(String legalname) { + this.legalname = legalname; + } - public String getWebsiteurl() { - return websiteurl; - } + public String getWebsiteurl() { + return websiteurl; + } - public void setWebsiteurl(String websiteurl) { - this.websiteurl = websiteurl; - } + public void setWebsiteurl(String websiteurl) { + this.websiteurl = websiteurl; + } - public List getAlternativenames() { - return alternativenames; - } + public List getAlternativenames() { + return alternativenames; + } - public void setAlternativenames(List alternativenames) { - this.alternativenames = alternativenames; - } + public void setAlternativenames(List alternativenames) { + this.alternativenames = alternativenames; + } - public Country getCountry() { - return country; - } + public Country getCountry() { + return country; + } - public void setCountry(Country country) { - this.country = country; - } + public void setCountry(Country country) { + this.country = country; + } - public String getId() { - return id; - } + public String getId() { + return id; + } - public void setId(String id) { - this.id = id; - } + public void setId(String id) { + this.id = id; + } - public List getPid() { - return pid; - } + public List getPid() { + return pid; + } - public void setPid(List pid) { - this.pid = pid; - } + public void setPid(List pid) { + this.pid = pid; + } } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/OrganizationPid.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/OrganizationPid.java index 4613d4d..fb99d97 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/OrganizationPid.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/OrganizationPid.java @@ -33,7 +33,7 @@ public class OrganizationPid implements Serializable { this.value = value; } - public static OrganizationPid newInstance(String type, String value){ + public static OrganizationPid newInstance(String type, String value) { OrganizationPid op = new OrganizationPid(); op.type = type; op.value = value; diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Programme.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Programme.java index 76e9846..0d90081 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Programme.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Programme.java @@ -1,11 +1,10 @@ + package eu.dnetlib.dhp.eosc.model; /** * @author miriam.baglioni * @Date 25/10/23 */ - - import java.io.Serializable; import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; @@ -15,32 +14,32 @@ import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; * to store the code of the programme - private String description to store the description of the programme */ public class Programme implements Serializable { - @JsonSchema(description = "The code of the programme") - private String code; + @JsonSchema(description = "The code of the programme") + private String code; - @JsonSchema(description = "The description of the programme") - private String description; + @JsonSchema(description = "The description of the programme") + private String description; - public String getCode() { - return code; - } + public String getCode() { + return code; + } - public void setCode(String code) { - this.code = code; - } + public void setCode(String code) { + this.code = code; + } - public String getDescription() { - return description; - } + public String getDescription() { + return description; + } - public void setDescription(String description) { - this.description = description; - } + public void setDescription(String description) { + this.description = description; + } - public static Programme newInstance(String code, String description) { - Programme p = new Programme(); - p.code = code; - p.description = description; - return p; - } + public static Programme newInstance(String code, String description) { + Programme p = new Programme(); + p.code = code; + p.description = description; + return p; + } } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Project.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Project.java index e6a9d5b..6e71764 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Project.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Project.java @@ -1,17 +1,15 @@ + package eu.dnetlib.dhp.eosc.model; /** * @author miriam.baglioni * @Date 25/10/23 */ - - import java.io.Serializable; import java.util.List; import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; - /** * This is the class representing the Project in the model used for the dumps of the whole graph. At the moment the dump * of the Projects differs from the other dumps because we do not create relations between Funders (Organization) and @@ -43,164 +41,162 @@ import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; */ public class Project implements Serializable { - private String id; + private String id; - private String websiteurl; - private String code; - private String acronym; - private String title; + private String websiteurl; + private String code; + private String acronym; + private String title; - private String startdate; + private String startdate; - private String enddate; + private String enddate; - private String callidentifier; + private String callidentifier; - private String keywords; + private String keywords; - private boolean openaccessmandateforpublications; + private boolean openaccessmandateforpublications; - private boolean openaccessmandatefordataset; - private List subject; + private boolean openaccessmandatefordataset; + private List subject; - @JsonSchema(description = "Funding information for the project") - private List funding; + @JsonSchema(description = "Funding information for the project") + private List funding; - private String summary; + private String summary; - @JsonSchema(description = "The money granted to the project") - private Granted granted; + @JsonSchema(description = "The money granted to the project") + private Granted granted; - @JsonSchema(description = "The h2020 programme funding the project") - private List h2020programme; + @JsonSchema(description = "The h2020 programme funding the project") + private List h2020programme; + public String getId() { + return id; + } + public void setId(String id) { + this.id = id; + } - public String getId() { - return id; - } + public String getWebsiteurl() { + return websiteurl; + } - public void setId(String id) { - this.id = id; - } + public void setWebsiteurl(String websiteurl) { + this.websiteurl = websiteurl; + } - public String getWebsiteurl() { - return websiteurl; - } + public String getCode() { + return code; + } - public void setWebsiteurl(String websiteurl) { - this.websiteurl = websiteurl; - } + public void setCode(String code) { + this.code = code; + } - public String getCode() { - return code; - } + public String getAcronym() { + return acronym; + } - public void setCode(String code) { - this.code = code; - } + public void setAcronym(String acronym) { + this.acronym = acronym; + } - public String getAcronym() { - return acronym; - } + public String getTitle() { + return title; + } - public void setAcronym(String acronym) { - this.acronym = acronym; - } + public void setTitle(String title) { + this.title = title; + } - public String getTitle() { - return title; - } + public String getStartdate() { + return startdate; + } - public void setTitle(String title) { - this.title = title; - } + public void setStartdate(String startdate) { + this.startdate = startdate; + } - public String getStartdate() { - return startdate; - } + public String getEnddate() { + return enddate; + } - public void setStartdate(String startdate) { - this.startdate = startdate; - } + public void setEnddate(String enddate) { + this.enddate = enddate; + } - public String getEnddate() { - return enddate; - } + public String getCallidentifier() { + return callidentifier; + } - public void setEnddate(String enddate) { - this.enddate = enddate; - } + public void setCallidentifier(String callidentifier) { + this.callidentifier = callidentifier; + } - public String getCallidentifier() { - return callidentifier; - } + public String getKeywords() { + return keywords; + } - public void setCallidentifier(String callidentifier) { - this.callidentifier = callidentifier; - } + public void setKeywords(String keywords) { + this.keywords = keywords; + } - public String getKeywords() { - return keywords; - } + public boolean isOpenaccessmandateforpublications() { + return openaccessmandateforpublications; + } - public void setKeywords(String keywords) { - this.keywords = keywords; - } + public void setOpenaccessmandateforpublications(boolean openaccessmandateforpublications) { + this.openaccessmandateforpublications = openaccessmandateforpublications; + } - public boolean isOpenaccessmandateforpublications() { - return openaccessmandateforpublications; - } + public boolean isOpenaccessmandatefordataset() { + return openaccessmandatefordataset; + } - public void setOpenaccessmandateforpublications(boolean openaccessmandateforpublications) { - this.openaccessmandateforpublications = openaccessmandateforpublications; - } + public void setOpenaccessmandatefordataset(boolean openaccessmandatefordataset) { + this.openaccessmandatefordataset = openaccessmandatefordataset; + } - public boolean isOpenaccessmandatefordataset() { - return openaccessmandatefordataset; - } + public List getSubject() { + return subject; + } - public void setOpenaccessmandatefordataset(boolean openaccessmandatefordataset) { - this.openaccessmandatefordataset = openaccessmandatefordataset; - } + public void setSubject(List subject) { + this.subject = subject; + } - public List getSubject() { - return subject; - } + public List getFunding() { + return funding; + } - public void setSubject(List subject) { - this.subject = subject; - } + public void setFunding(List funding) { + this.funding = funding; + } - public List getFunding() { - return funding; - } + public String getSummary() { + return summary; + } - public void setFunding(List funding) { - this.funding = funding; - } + public void setSummary(String summary) { + this.summary = summary; + } - public String getSummary() { - return summary; - } + public Granted getGranted() { + return granted; + } - public void setSummary(String summary) { - this.summary = summary; - } + public void setGranted(Granted granted) { + this.granted = granted; + } - public Granted getGranted() { - return granted; - } + public List getH2020programme() { + return h2020programme; + } - public void setGranted(Granted granted) { - this.granted = granted; - } - - public List getH2020programme() { - return h2020programme; - } - - public void setH2020programme(List h2020programme) { - this.h2020programme = h2020programme; - } + public void setH2020programme(List h2020programme) { + this.h2020programme = h2020programme; + } } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganization.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganization.java index ff40538..7aa692a 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganization.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganization.java @@ -6,7 +6,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.*; -import eu.dnetlib.dhp.eosc.model.Affiliation; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -20,6 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.eosc.model.Affiliation; import eu.dnetlib.dhp.eosc.model.OrganizationPid; import eu.dnetlib.dhp.eosc.model.Result; import eu.dnetlib.dhp.schema.common.ModelConstants; diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java index 14f4836..1ca6f20 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java @@ -7,8 +7,6 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.eosc.model.Affiliation; -import eu.dnetlib.dhp.eosc.model.Country; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -22,6 +20,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.eosc.model.Affiliation; +import eu.dnetlib.dhp.eosc.model.Country; import eu.dnetlib.dhp.eosc.model.OrganizationPid; import eu.dnetlib.dhp.eosc.model.Result; import eu.dnetlib.dhp.schema.common.ModelConstants; @@ -70,7 +70,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, workingPath + "publicationextendedaffiliation"); - addOrganizations(spark, inputPath, workingPath , outputPath); + addOrganizations(spark, inputPath, workingPath, outputPath); }); } @@ -156,85 +156,89 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .json(workingPath + "publicationextendedaffiliation"); relations - .joinWith(organizations, relations.col("source").equalTo(organizations.col("id"))) - .map((MapFunction, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(t2._2()),Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class)) - .filter(Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath + "organization"); + .joinWith(organizations, relations.col("source").equalTo(organizations.col("id"))) + .map( + (MapFunction, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization( + t2._2()), + Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "organization"); relations - .joinWith(organizations, relations.col("source").equalTo(organizations.col("id"))) - .map((MapFunction, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation.newInstance(t2._1().getSource(), t2._1().getTarget()), Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class) ) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath + "resultOrganization"); + .joinWith(organizations, relations.col("source").equalTo(organizations.col("id"))) + .map( + (MapFunction, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation + .newInstance(t2._1().getSource(), t2._1().getTarget()), + Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "resultOrganization"); } - private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org){ + private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org) { - if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference())) - return null; - if (!Optional.ofNullable(org.getLegalname()).isPresent() - && !Optional.ofNullable(org.getLegalshortname()).isPresent()) - return null; + if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference())) + return null; + if (!Optional.ofNullable(org.getLegalname()).isPresent() + && !Optional.ofNullable(org.getLegalshortname()).isPresent()) + return null; eu.dnetlib.dhp.eosc.model.Organization organization = new eu.dnetlib.dhp.eosc.model.Organization(); - Optional - .ofNullable(org.getLegalshortname()) - .ifPresent(value -> organization.setLegalshortname(value.getValue())); + Optional + .ofNullable(org.getLegalshortname()) + .ifPresent(value -> organization.setLegalshortname(value.getValue())); - Optional - .ofNullable(org.getLegalname()) - .ifPresent(value -> organization.setLegalname(value.getValue())); + Optional + .ofNullable(org.getLegalname()) + .ifPresent(value -> organization.setLegalname(value.getValue())); - Optional - .ofNullable(org.getWebsiteurl()) - .ifPresent(value -> organization.setWebsiteurl(value.getValue())); + Optional + .ofNullable(org.getWebsiteurl()) + .ifPresent(value -> organization.setWebsiteurl(value.getValue())); - Optional - .ofNullable(org.getAlternativeNames()) - .ifPresent( - value -> organization - .setAlternativenames( - value - .stream() - .map(v -> v.getValue()) - .collect(Collectors.toList()))); + Optional + .ofNullable(org.getAlternativeNames()) + .ifPresent( + value -> organization + .setAlternativenames( + value + .stream() + .map(v -> v.getValue()) + .collect(Collectors.toList()))); - Optional - .ofNullable(org.getCountry()) - .ifPresent( - value -> { - if (!value.getClassid().equals(UNKNOWN)) { - organization - .setCountry( - Country.newInstance(value.getClassid(), value.getClassname())); - } + Optional + .ofNullable(org.getCountry()) + .ifPresent( + value -> { + if (!value.getClassid().equals(UNKNOWN)) { + organization + .setCountry( + Country.newInstance(value.getClassid(), value.getClassname())); + } - }); + }); - Optional - .ofNullable(org.getId()) - .ifPresent(value -> organization.setId(value)); + Optional + .ofNullable(org.getId()) + .ifPresent(value -> organization.setId(value)); - Optional - .ofNullable(org.getPid()) - .ifPresent( - value -> organization - .setPid( - value - .stream() - .map(p -> OrganizationPid.newInstance(p.getQualifier().getClassid(), p.getValue())) - .collect(Collectors.toList()))); - - return organization; - } + Optional + .ofNullable(org.getPid()) + .ifPresent( + value -> organization + .setPid( + value + .stream() + .map(p -> OrganizationPid.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList()))); + return organization; } - +} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpRelation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpOrganizationProject.java similarity index 56% rename from dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpRelation.java rename to dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpOrganizationProject.java index 607b014..8fcbce8 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpRelation.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpOrganizationProject.java @@ -6,8 +6,12 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.Optional; +import eu.dnetlib.dhp.eosc.model.Organization; +import eu.dnetlib.dhp.eosc.model.Project; +import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -21,19 +25,20 @@ import eu.dnetlib.dhp.eosc.model.Provenance; import eu.dnetlib.dhp.eosc.model.RelType; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; +import scala.Tuple2; /** * @author miriam.baglioni * @Date 12/01/23 */ -public class SparkDumpRelation implements Serializable { +public class SparkDumpOrganizationProject implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkDumpRelation.class); + private static final Logger log = LoggerFactory.getLogger(SparkDumpOrganizationProject.class); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - SparkDumpRelation.class + SparkDumpOrganizationProject.class .getResourceAsStream( "/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json")); @@ -66,49 +71,21 @@ public class SparkDumpRelation implements Serializable { } private static void dumpRelation(SparkSession spark, String inputPath, String outputPath) { - Dataset relations = Utils.readPath(spark, inputPath, Relation.class); - relations + Dataset organization = Utils.readPath(spark, outputPath + "organization", Organization.class); + Dataset project = Utils.readPath(spark, outputPath + "project", Project.class); - .map((MapFunction) relation -> { - eu.dnetlib.dhp.eosc.model.Relation relNew = new eu.dnetlib.dhp.eosc.model.Relation(); - relNew - .setSource( + Dataset relation = Utils.readPath(spark, inputPath + "/relation", Relation.class) + .filter((FilterFunction) r-> !r.getDataInfo().getDeletedbyinference() && r.getRelClass().equalsIgnoreCase(ModelConstants.IS_PARTICIPANT)); - relation.getSource()); + Dataset eoscOrgs = relation.joinWith(organization, relation.col("source").equalTo(organization.col("id"))) + .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)); - relNew - .setTarget( - - relation.getTarget()); - - relNew - .setReltype( - RelType - .newInstance( - relation.getRelClass(), - relation.getSubRelType())); - - Optional odInfo = Optional.ofNullable(relation.getDataInfo()); - if (odInfo.isPresent()) { - DataInfo dInfo = odInfo.get(); - if (Optional.ofNullable(dInfo.getProvenanceaction()).isPresent() && - Optional.ofNullable(dInfo.getProvenanceaction().getClassname()).isPresent()) { - relNew - .setProvenance( - Provenance - .newInstance( - dInfo.getProvenanceaction().getClassname(), - dInfo.getTrust())); - } - } - - return relNew; - - }, Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class)) - .write() - .option("compression", "gzip") - .mode(SaveMode.Append) - .json(outputPath); + eoscOrgs.joinWith(project, eoscOrgs.col("target").equalTo(project.col("id"))) + .map((MapFunction, eu.dnetlib.dhp.eosc.model.Relation>) t2-> eu.dnetlib.dhp.eosc.model.Relation.newInstance(t2._1().getSource(), t2._1().getTarget()), Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(outputPath + "organizationProject"); } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java index 2bd2978..cb9eaaf 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java @@ -10,9 +10,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.eosc.model.*; -import eu.dnetlib.dhp.schema.oaf.Field; -import eu.dnetlib.dhp.schema.oaf.Project; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; @@ -31,7 +28,10 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.eosc.model.*; import eu.dnetlib.dhp.oa.graph.dump.Constants; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Project; import scala.Array; import scala.Tuple2; @@ -110,68 +110,86 @@ public class SparkUpdateProjectInfo implements Serializable { Dataset project = Utils.readPath(spark, inputPath + "/project", Project.class); - Dataset projectIds = result.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId"))) - .flatMap((FlatMapFunction, String>) t2 -> t2._2().getProjectsList() - .stream().map(p -> p.getId()).collect(Collectors.toList()).iterator(), Encoders.STRING()) - .distinct(); + Dataset projectIds = result + .joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId"))) + .flatMap( + (FlatMapFunction, String>) t2 -> t2 + ._2() + .getProjectsList() + .stream() + .map(p -> p.getId()) + .collect(Collectors.toList()) + .iterator(), + Encoders.STRING()) + .distinct(); - projectIds.joinWith(project, projectIds.col("value").equalTo(project.col("id"))) - .map((MapFunction, eu.dnetlib.dhp.eosc.model.Project>)t2->mapProject(t2._2()), Encoders.bean(eu.dnetlib.dhp.eosc.model.Project.class) ) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath + "project"); + projectIds + .joinWith(project, projectIds.col("value").equalTo(project.col("id"))) + .map( + (MapFunction, eu.dnetlib.dhp.eosc.model.Project>) t2 -> mapProject(t2._2()), + Encoders.bean(eu.dnetlib.dhp.eosc.model.Project.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "project"); - resultProject.flatMap((FlatMapFunction) rp -> - rp.getProjectsList().stream().map(p -> Relation.newInstance(rp.getResultId(), p.getId())) - .collect(Collectors.toList()).iterator(), Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath + "resultProject"); + resultProject + .flatMap( + (FlatMapFunction) rp -> rp + .getProjectsList() + .stream() + .map(p -> Relation.newInstance(rp.getResultId(), p.getId())) + .collect(Collectors.toList()) + .iterator(), + Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "resultProject"); } - private static eu.dnetlib.dhp.eosc.model.Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p) throws DocumentException { + private static eu.dnetlib.dhp.eosc.model.Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p) + throws DocumentException { if (Boolean.TRUE.equals(p.getDataInfo().getDeletedbyinference())) return null; eu.dnetlib.dhp.eosc.model.Project project = new eu.dnetlib.dhp.eosc.model.Project(); Optional - .ofNullable(p.getId()) - .ifPresent(id -> project.setId(id)); + .ofNullable(p.getId()) + .ifPresent(id -> project.setId(id)); Optional - .ofNullable(p.getWebsiteurl()) - .ifPresent(w -> project.setWebsiteurl(w.getValue())); + .ofNullable(p.getWebsiteurl()) + .ifPresent(w -> project.setWebsiteurl(w.getValue())); Optional - .ofNullable(p.getCode()) - .ifPresent(code -> project.setCode(code.getValue())); + .ofNullable(p.getCode()) + .ifPresent(code -> project.setCode(code.getValue())); Optional - .ofNullable(p.getAcronym()) - .ifPresent(acronynim -> project.setAcronym(acronynim.getValue())); + .ofNullable(p.getAcronym()) + .ifPresent(acronynim -> project.setAcronym(acronynim.getValue())); Optional - .ofNullable(p.getTitle()) - .ifPresent(title -> project.setTitle(title.getValue())); + .ofNullable(p.getTitle()) + .ifPresent(title -> project.setTitle(title.getValue())); Optional - .ofNullable(p.getStartdate()) - .ifPresent(sdate -> project.setStartdate(sdate.getValue())); + .ofNullable(p.getStartdate()) + .ifPresent(sdate -> project.setStartdate(sdate.getValue())); Optional - .ofNullable(p.getEnddate()) - .ifPresent(edate -> project.setEnddate(edate.getValue())); + .ofNullable(p.getEnddate()) + .ifPresent(edate -> project.setEnddate(edate.getValue())); Optional - .ofNullable(p.getCallidentifier()) - .ifPresent(cide -> project.setCallidentifier(cide.getValue())); + .ofNullable(p.getCallidentifier()) + .ifPresent(cide -> project.setCallidentifier(cide.getValue())); Optional - .ofNullable(p.getKeywords()) - .ifPresent(key -> project.setKeywords(key.getValue())); + .ofNullable(p.getKeywords()) + .ifPresent(key -> project.setKeywords(key.getValue())); Optional> omandate = Optional.ofNullable(p.getOamandatepublications()); Optional> oecsc39 = Optional.ofNullable(p.getEcsc39()); @@ -191,19 +209,19 @@ public class SparkUpdateProjectInfo implements Serializable { project.setOpenaccessmandatefordataset(false); Optional - .ofNullable(p.getEcarticle29_3()) - .ifPresent(oamandate -> project.setOpenaccessmandatefordataset(oamandate.getValue().equals("true"))); + .ofNullable(p.getEcarticle29_3()) + .ifPresent(oamandate -> project.setOpenaccessmandatefordataset(oamandate.getValue().equals("true"))); project - .setSubject( - Optional - .ofNullable(p.getSubjects()) - .map(subjs -> subjs.stream().map(s -> s.getValue()).collect(Collectors.toList())) - .orElse(new ArrayList<>())); + .setSubject( + Optional + .ofNullable(p.getSubjects()) + .map(subjs -> subjs.stream().map(s -> s.getValue()).collect(Collectors.toList())) + .orElse(new ArrayList<>())); Optional - .ofNullable(p.getSummary()) - .ifPresent(summary -> project.setSummary(summary.getValue())); + .ofNullable(p.getSummary()) + .ifPresent(summary -> project.setSummary(summary.getValue())); Optional ofundedamount = Optional.ofNullable(p.getFundedamount()); Optional> ocurrency = Optional.ofNullable(p.getCurrency()); @@ -213,8 +231,8 @@ public class SparkUpdateProjectInfo implements Serializable { if (ofundedamount.isPresent()) { if (ototalcost.isPresent()) { project - .setGranted( - Granted.newInstance(ocurrency.get().getValue(), ototalcost.get(), ofundedamount.get())); + .setGranted( + Granted.newInstance(ocurrency.get().getValue(), ototalcost.get(), ofundedamount.get())); } else { project.setGranted(Granted.newInstance(ocurrency.get().getValue(), ofundedamount.get())); } @@ -222,21 +240,21 @@ public class SparkUpdateProjectInfo implements Serializable { } project - .setH2020programme( - Optional - .ofNullable(p.getH2020classification()) - .map( - classification -> classification - .stream() - .map( - c -> Programme - .newInstance( - c.getH2020Programme().getCode(), c.getH2020Programme().getDescription())) - .collect(Collectors.toList())) - .orElse(new ArrayList<>())); + .setH2020programme( + Optional + .ofNullable(p.getH2020classification()) + .map( + classification -> classification + .stream() + .map( + c -> Programme + .newInstance( + c.getH2020Programme().getCode(), c.getH2020Programme().getDescription())) + .collect(Collectors.toList())) + .orElse(new ArrayList<>())); Optional>> ofundTree = Optional - .ofNullable(p.getFundingtree()); + .ofNullable(p.getFundingtree()); List funList = new ArrayList<>(); if (ofundTree.isPresent()) { for (Field fundingtree : ofundTree.get()) { diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_dump_organizationprojectrelations.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_dump_organizationprojectrelations.json new file mode 100644 index 0000000..763e0df --- /dev/null +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_dump_organizationprojectrelations.json @@ -0,0 +1,26 @@ + +[ + + + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the name node", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "the path used to store temporary output files", + "paramRequired": false + } +] + + + diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultTest.java index 691cdab..13eb61c 100644 --- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultTest.java +++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultTest.java @@ -24,8 +24,8 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.eosc.model.Indicator; import eu.dnetlib.dhp.eosc.model.Affiliation; +import eu.dnetlib.dhp.eosc.model.Indicator; import eu.dnetlib.dhp.eosc.model.Result; import eu.dnetlib.dhp.schema.action.AtomicAction; import scala.Tuple2;