From 7fca920b5fa64474e9c9fb216a5ff540c3e0c013 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 25 Oct 2023 11:14:46 +0200 Subject: [PATCH] Added extention to dump Projects and also relations of type resultProject --- .../eu/dnetlib/dhp/eosc/model/Funder.java | 64 ++--- .../dnetlib/dhp/eosc/model/FunderShort.java | 58 +++++ .../eu/dnetlib/dhp/eosc/model/Fundings.java | 44 ++++ .../eu/dnetlib/dhp/eosc/model/Granted.java | 66 +++++ .../eu/dnetlib/dhp/eosc/model/Programme.java | 46 ++++ .../eu/dnetlib/dhp/eosc/model/Project.java | 245 +++++++++++++----- .../dhp/eosc/model/ProjectSummary.java | 97 +++++++ .../eu/dnetlib/dhp/eosc/model/Result.java | 6 +- ...ExtendEoscResultWithOrganizationStep2.java | 12 +- .../dhp/oa/graph/dump/eosc/ResultProject.java | 8 +- .../dump/eosc/SparkPrepareResultProject.java | 18 +- .../dump/eosc/SparkUpdateProjectInfo.java | 213 ++++++++++++++- .../oa/graph/dump/eosc/oozie_app/workflow.xml | 118 +-------- .../oa/graph/dump/UpdateProjectInfoTest.java | 12 +- 14 files changed, 749 insertions(+), 258 deletions(-) create mode 100644 dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/FunderShort.java create mode 100644 dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Fundings.java create mode 100644 dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Granted.java create mode 100644 dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Programme.java create mode 100644 dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/ProjectSummary.java diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Funder.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Funder.java index cea8c3e..f2b198d 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Funder.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Funder.java @@ -1,58 +1,28 @@ - package eu.dnetlib.dhp.eosc.model; -import java.io.Serializable; - -import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; - /** * @author miriam.baglioni - * @Date 26/01/23 + * @Date 25/10/23 */ -public class Funder implements Serializable { - @JsonSchema(description = "The short name of the funder (EC)") - private String shortName; - @JsonSchema(description = "The name of the funder (European Commission)") - private String name; + import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; - @JsonSchema( - description = "Geographical jurisdiction (e.g. for European Commission is EU, for Croatian Science Foundation is HR)") - private String jurisdiction; +/** + * To store information about the funder funding the project related to the result. It extends + * eu.dnetlib.dhp.schema.dump.oaf.Funder with the following parameter: - - private + * eu.dnetdlib.dhp.schema.dump.oaf.graph.Fundings funding_stream to store the fundingstream + */ + public class Funder extends FunderShort { - public String getJurisdiction() { - return jurisdiction; - } + @JsonSchema(description = "Description of the funding stream") + private Fundings funding_stream; - public void setJurisdiction(String jurisdiction) { - this.jurisdiction = jurisdiction; - } + public Fundings getFunding_stream() { + return funding_stream; + } - public String getShortName() { - return shortName; - } - - public void setShortName(String shortName) { - this.shortName = shortName; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - @JsonSchema(description = "Stream of funding (e.g. for European Commission can be H2020 or FP7)") - private String fundingStream; - - public String getFundingStream() { - return fundingStream; - } - - public void setFundingStream(String fundingStream) { - this.fundingStream = fundingStream; - } -} + public void setFunding_stream(Fundings funding_stream) { + this.funding_stream = funding_stream; + } + } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/FunderShort.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/FunderShort.java new file mode 100644 index 0000000..32711cf --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/FunderShort.java @@ -0,0 +1,58 @@ + +package eu.dnetlib.dhp.eosc.model; + +import java.io.Serializable; + +import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; + +/** + * @author miriam.baglioni + * @Date 26/01/23 + */ +public class FunderShort implements Serializable { + + @JsonSchema(description = "The short name of the funder (EC)") + private String shortName; + + @JsonSchema(description = "The name of the funder (European Commission)") + private String name; + + @JsonSchema( + description = "Geographical jurisdiction (e.g. for European Commission is EU, for Croatian Science Foundation is HR)") + private String jurisdiction; + + public String getJurisdiction() { + return jurisdiction; + } + + public void setJurisdiction(String jurisdiction) { + this.jurisdiction = jurisdiction; + } + + public String getShortName() { + return shortName; + } + + public void setShortName(String shortName) { + this.shortName = shortName; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @JsonSchema(description = "Stream of funding (e.g. for European Commission can be H2020 or FP7)") + private String fundingStream; + + public String getFundingStream() { + return fundingStream; + } + + public void setFundingStream(String fundingStream) { + this.fundingStream = fundingStream; + } +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Fundings.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Fundings.java new file mode 100644 index 0000000..245a140 --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Fundings.java @@ -0,0 +1,44 @@ +package eu.dnetlib.dhp.eosc.model; + +/** + * @author miriam.baglioni + * @Date 25/10/23 + */ + + + +import java.io.Serializable; + +import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; + +/** + * To store inforamtion about the funding stream. It has two parameters: - private String id to store the id of the + * fundings stream. The id is created by appending the shortname of the funder to the name of each level in the xml + * representing the fundng stream. For example: if the funder is the European Commission, the funding level 0 name is + * FP7, the funding level 1 name is SP3 and the funding level 2 name is PEOPLE then the id will be: EC::FP7::SP3::PEOPLE + * - private String description to describe the funding stream. It is created by concatenating the description of each + * funding level so for the example above the description would be: SEVENTH FRAMEWORK PROGRAMME - SP3-People - + * Marie-Curie Actions + */ +public class Fundings implements Serializable { + + @JsonSchema(description = "Id of the funding stream") + private String id; + private String description; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Granted.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Granted.java new file mode 100644 index 0000000..4202ea2 --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Granted.java @@ -0,0 +1,66 @@ +package eu.dnetlib.dhp.eosc.model; + +/** + * @author miriam.baglioni + * @Date 25/10/23 + */ + + +import java.io.Serializable; + +import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; + +/** + * To describe the funded amount. It has the following parameters: - private String currency to store the currency of + * the fund - private float totalcost to store the total cost of the project - private float fundedamount to store the + * funded amount by the funder + */ +public class Granted implements Serializable { + @JsonSchema(description = "The currency of the granted amount (e.g. EUR)") + private String currency; + + @JsonSchema(description = "The total cost of the project") + private float totalcost; + + @JsonSchema(description = "The funded amount") + private float fundedamount; + + public String getCurrency() { + return currency; + } + + public void setCurrency(String currency) { + this.currency = currency; + } + + public float getTotalcost() { + return totalcost; + } + + public void setTotalcost(float totalcost) { + this.totalcost = totalcost; + } + + public float getFundedamount() { + return fundedamount; + } + + public void setFundedamount(float fundedamount) { + this.fundedamount = fundedamount; + } + + public static Granted newInstance(String currency, float totalcost, float fundedamount) { + Granted granted = new Granted(); + granted.currency = currency; + granted.totalcost = totalcost; + granted.fundedamount = fundedamount; + return granted; + } + + public static Granted newInstance(String currency, float fundedamount) { + Granted granted = new Granted(); + granted.currency = currency; + granted.fundedamount = fundedamount; + return granted; + } +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Programme.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Programme.java new file mode 100644 index 0000000..76e9846 --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Programme.java @@ -0,0 +1,46 @@ +package eu.dnetlib.dhp.eosc.model; + +/** + * @author miriam.baglioni + * @Date 25/10/23 + */ + + +import java.io.Serializable; + +import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; + +/** + * To store information about the ec programme for the project. It has the following parameters: - private String code + * to store the code of the programme - private String description to store the description of the programme + */ +public class Programme implements Serializable { + @JsonSchema(description = "The code of the programme") + private String code; + + @JsonSchema(description = "The description of the programme") + private String description; + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public static Programme newInstance(String code, String description) { + Programme p = new Programme(); + p.code = code; + p.description = description; + return p; + } +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Project.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Project.java index 810b657..e6a9d5b 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Project.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Project.java @@ -1,97 +1,206 @@ - package eu.dnetlib.dhp.eosc.model; -import java.io.Serializable; - -import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; - /** * @author miriam.baglioni - * @Date 26/01/23 + * @Date 25/10/23 */ + + +import java.io.Serializable; +import java.util.List; + +import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; + + +/** + * This is the class representing the Project in the model used for the dumps of the whole graph. At the moment the dump + * of the Projects differs from the other dumps because we do not create relations between Funders (Organization) and + * Projects but we put the information about the Funder within the Project representation. We also removed the + * collected from element from the Project. No relation between the Project and the Datasource entity from which it is + * collected will be created. We will never create relations between Project and Datasource. In case some relation will + * be extracted from the Project they will refer the Funder and will be of type ( organization -> funds -> project, + * project -> isFundedBy -> organization) We also removed the duration parameter because the most of times it is set to + * 0. It has the following parameters: + * - private String id to store the id of the project (OpenAIRE id) + * - private String websiteurl to store the websiteurl of the project + * - private String code to store the grant agreement of the project + * - private String acronym to store the acronym of the project + * - private String title to store the tile of the project + * - private String startdate to store the start date + * - private String enddate to store the end date + * - private String callidentifier to store the call indentifier + * - private String keywords to store the keywords + * - private boolean openaccessmandateforpublications to store if the project must accomplish to the open access mandate + * for publications. This value will be set to true if one of the field in the project represented in the internal model + * is set to true + * - private boolean openaccessmandatefordataset to store if the project must accomplish to the open access mandate for + * dataset. It is set to the value in the corresponding filed of the project represented in the internal model + * - private List subject to store the list of subjects of the project + * - private List funding to store the list of funder of the project + * - private String summary to store the summary of the project + * - private Granted granted to store the granted amount + * - private List h2020programme to store the list of programmes the project is related to + */ + public class Project implements Serializable { - @JsonSchema(description = "The OpenAIRE id for the project") - protected String id;// OpenAIRE id + private String id; - @JsonSchema(description = "The grant agreement number") - protected String code; + private String websiteurl; + private String code; + private String acronym; + private String title; - @JsonSchema(description = "The acronym of the project") - protected String acronym; + private String startdate; - protected String title; + private String enddate; - @JsonSchema(description = "Information about the funder funding the project") - private Funder funder; + private String callidentifier; - private Provenance provenance; + private String keywords; - private Validated validated; + private boolean openaccessmandateforpublications; - public void setValidated(Validated validated) { - this.validated = validated; - } + private boolean openaccessmandatefordataset; + private List subject; - public Validated getValidated() { - return validated; - } + @JsonSchema(description = "Funding information for the project") + private List funding; - public Provenance getProvenance() { - return provenance; - } + private String summary; - public void setProvenance(Provenance provenance) { - this.provenance = provenance; - } + @JsonSchema(description = "The money granted to the project") + private Granted granted; - public Funder getFunder() { - return funder; - } + @JsonSchema(description = "The h2020 programme funding the project") + private List h2020programme; - public void setFunder(Funder funders) { - this.funder = funders; - } - public String getId() { - return id; - } - public void setId(String id) { - this.id = id; - } + public String getId() { + return id; + } - public String getCode() { - return code; - } + public void setId(String id) { + this.id = id; + } - public void setCode(String code) { - this.code = code; - } + public String getWebsiteurl() { + return websiteurl; + } - public String getAcronym() { - return acronym; - } + public void setWebsiteurl(String websiteurl) { + this.websiteurl = websiteurl; + } - public void setAcronym(String acronym) { - this.acronym = acronym; - } + public String getCode() { + return code; + } - public String getTitle() { - return title; - } + public void setCode(String code) { + this.code = code; + } - public void setTitle(String title) { - this.title = title; - } + public String getAcronym() { + return acronym; + } - public static Project newInstance(String id, String code, String acronym, String title, Funder funder) { - Project project = new Project(); - project.setAcronym(acronym); - project.setCode(code); - project.setFunder(funder); - project.setId(id); - project.setTitle(title); - return project; - } + public void setAcronym(String acronym) { + this.acronym = acronym; + } + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getStartdate() { + return startdate; + } + + public void setStartdate(String startdate) { + this.startdate = startdate; + } + + public String getEnddate() { + return enddate; + } + + public void setEnddate(String enddate) { + this.enddate = enddate; + } + + public String getCallidentifier() { + return callidentifier; + } + + public void setCallidentifier(String callidentifier) { + this.callidentifier = callidentifier; + } + + public String getKeywords() { + return keywords; + } + + public void setKeywords(String keywords) { + this.keywords = keywords; + } + + public boolean isOpenaccessmandateforpublications() { + return openaccessmandateforpublications; + } + + public void setOpenaccessmandateforpublications(boolean openaccessmandateforpublications) { + this.openaccessmandateforpublications = openaccessmandateforpublications; + } + + public boolean isOpenaccessmandatefordataset() { + return openaccessmandatefordataset; + } + + public void setOpenaccessmandatefordataset(boolean openaccessmandatefordataset) { + this.openaccessmandatefordataset = openaccessmandatefordataset; + } + + public List getSubject() { + return subject; + } + + public void setSubject(List subject) { + this.subject = subject; + } + + public List getFunding() { + return funding; + } + + public void setFunding(List funding) { + this.funding = funding; + } + + public String getSummary() { + return summary; + } + + public void setSummary(String summary) { + this.summary = summary; + } + + public Granted getGranted() { + return granted; + } + + public void setGranted(Granted granted) { + this.granted = granted; + } + + public List getH2020programme() { + return h2020programme; + } + + public void setH2020programme(List h2020programme) { + this.h2020programme = h2020programme; + } } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/ProjectSummary.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/ProjectSummary.java new file mode 100644 index 0000000..d5011a0 --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/ProjectSummary.java @@ -0,0 +1,97 @@ + +package eu.dnetlib.dhp.eosc.model; + +import java.io.Serializable; + +import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; + +/** + * @author miriam.baglioni + * @Date 26/01/23 + */ +public class ProjectSummary implements Serializable { + @JsonSchema(description = "The OpenAIRE id for the project") + protected String id;// OpenAIRE id + + @JsonSchema(description = "The grant agreement number") + protected String code; + + @JsonSchema(description = "The acronym of the project") + protected String acronym; + + protected String title; + + @JsonSchema(description = "Information about the funder funding the project") + private FunderShort funder; + + private Provenance provenance; + + private Validated validated; + + public void setValidated(Validated validated) { + this.validated = validated; + } + + public Validated getValidated() { + return validated; + } + + public Provenance getProvenance() { + return provenance; + } + + public void setProvenance(Provenance provenance) { + this.provenance = provenance; + } + + public FunderShort getFunder() { + return funder; + } + + public void setFunder(FunderShort funders) { + this.funder = funders; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public String getAcronym() { + return acronym; + } + + public void setAcronym(String acronym) { + this.acronym = acronym; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public static ProjectSummary newInstance(String id, String code, String acronym, String title, FunderShort funder) { + ProjectSummary project = new ProjectSummary(); + project.setAcronym(acronym); + project.setCode(code); + project.setFunder(funder); + project.setId(id); + project.setTitle(title); + return project; + } + +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Result.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Result.java index e3ef5b5..df9325e 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Result.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Result.java @@ -30,7 +30,7 @@ public class Result implements Serializable { private Indicator indicator; @JsonSchema(description = "List of projects (i.e. grants) that (co-)funded the production ofn the research results") - private List projects; + private List projects; @JsonSchema( description = "Reference to a relevant research infrastructure, initiative or community (RI/RC) among those collaborating with OpenAIRE. Please see https://connect.openaire.eu") @@ -409,11 +409,11 @@ public class Result implements Serializable { this.collectedfrom = collectedfrom; } - public List getProjects() { + public List getProjects() { return projects; } - public void setProjects(List projects) { + public void setProjects(List projects) { this.projects = projects; } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java index 9c0f785..14f4836 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java @@ -60,8 +60,8 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { final String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); -// final String outputPath = parser.get("outputPath"); -// log.info("outputPath: {}", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); SparkConf conf = new SparkConf(); @@ -70,11 +70,11 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, workingPath + "publicationextendedaffiliation"); - addOrganizations(spark, inputPath, workingPath ); + addOrganizations(spark, inputPath, workingPath , outputPath); }); } - private static void addOrganizations(SparkSession spark, String inputPath, String workingPath) { + private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String outputPath) { Dataset results = Utils .readPath(spark, workingPath + "publication", Result.class); @@ -162,7 +162,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression","gzip") - .json(workingPath + "organization"); + .json(outputPath + "organization"); relations .joinWith(organizations, relations.col("source").equalTo(organizations.col("id"))) @@ -170,7 +170,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression","gzip") - .json(workingPath + "resultOrganization"); + .json(outputPath + "resultOrganization"); } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ResultProject.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ResultProject.java index 4035d17..def635e 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ResultProject.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ResultProject.java @@ -4,11 +4,11 @@ package eu.dnetlib.dhp.oa.graph.dump.eosc; import java.io.Serializable; import java.util.List; -import eu.dnetlib.dhp.eosc.model.Project; +import eu.dnetlib.dhp.eosc.model.ProjectSummary; public class ResultProject implements Serializable { private String resultId; - private List projectsList; + private List projectsList; public String getResultId() { return resultId; @@ -18,11 +18,11 @@ public class ResultProject implements Serializable { this.resultId = resultId; } - public List getProjectsList() { + public List getProjectsList() { return projectsList; } - public void setProjectsList(List projectsList) { + public void setProjectsList(List projectsList) { this.projectsList = projectsList; } } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkPrepareResultProject.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkPrepareResultProject.java index 03630b6..3a0770f 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkPrepareResultProject.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkPrepareResultProject.java @@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.eosc.model.Funder; -import eu.dnetlib.dhp.eosc.model.Project; +import eu.dnetlib.dhp.eosc.model.FunderShort; +import eu.dnetlib.dhp.eosc.model.ProjectSummary; import eu.dnetlib.dhp.eosc.model.Provenance; import eu.dnetlib.dhp.eosc.model.Validated; import eu.dnetlib.dhp.schema.common.ModelConstants; @@ -103,9 +103,9 @@ public class SparkPrepareResultProject implements Serializable { rp.setResultId(s); eu.dnetlib.dhp.schema.oaf.Project p = first._1(); projectSet.add(p.getId()); - Project ps = getProject(p, first._2); + ProjectSummary ps = getProject(p, first._2); - List projList = new ArrayList<>(); + List projList = new ArrayList<>(); projList.add(ps); rp.setProjectsList(projList); it.forEachRemaining(c -> { @@ -132,8 +132,8 @@ public class SparkPrepareResultProject implements Serializable { .json(outputPath); } - private static Project getProject(eu.dnetlib.dhp.schema.oaf.Project op, Relation relation) { - Project p = Project + private static ProjectSummary getProject(eu.dnetlib.dhp.schema.oaf.Project op, Relation relation) { + ProjectSummary p = ProjectSummary .newInstance( op.getId(), op.getCode().getValue(), @@ -148,7 +148,7 @@ public class SparkPrepareResultProject implements Serializable { Optional .ofNullable(op.getFundingtree()) .map(value -> { - List tmp = value + List tmp = value .stream() .map(ft -> getFunder(ft.getValue())) .collect(Collectors.toList()); @@ -174,8 +174,8 @@ public class SparkPrepareResultProject implements Serializable { } - private static Funder getFunder(String fundingtree) { - final Funder f = new Funder(); + private static FunderShort getFunder(String fundingtree) { + final FunderShort f = new FunderShort(); final Document doc; try { final SAXReader reader = new SAXReader(); diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java index 376a677..2bd2978 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java @@ -4,23 +4,35 @@ package eu.dnetlib.dhp.oa.graph.dump.eosc; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; +import eu.dnetlib.dhp.eosc.model.*; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Project; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.Node; +import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.eosc.model.Result; import eu.dnetlib.dhp.oa.graph.dump.Constants; +import scala.Array; import scala.Tuple2; public class SparkUpdateProjectInfo implements Serializable { @@ -47,12 +59,15 @@ public class SparkUpdateProjectInfo implements Serializable { final String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); final String preparedInfoPath = parser.get("preparedInfoPath"); log.info("preparedInfoPath: {}", preparedInfoPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + final String dumpType = Optional .ofNullable(parser.get("dumpType")) .orElse(Constants.DUMPTYPE.COMMUNITY.getType()); @@ -64,18 +79,19 @@ public class SparkUpdateProjectInfo implements Serializable { conf, isSparkSessionManaged, spark -> { - Utils.removeOutputDir(spark, outputPath); - extend(spark, inputPath, outputPath, preparedInfoPath); + Utils.removeOutputDir(spark, workingPath + "publicationextendedproject"); + extend(spark, inputPath, workingPath, preparedInfoPath, outputPath); }); } private static void extend( SparkSession spark, String inputPath, - String outputPath, - String preparedInfoPath) { + String workingPath, + String preparedInfoPath, + String outputPath) { - Dataset result = Utils.readPath(spark, inputPath, Result.class); + Dataset result = Utils.readPath(spark, workingPath + "publicationextendedaffiliation", Result.class); Dataset resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class); result @@ -90,7 +106,186 @@ public class SparkUpdateProjectInfo implements Serializable { .write() .option("compression", "gzip") .mode(SaveMode.Append) - .json(outputPath); + .json(workingPath + "publicationextendedproject"); + + Dataset project = Utils.readPath(spark, inputPath + "/project", Project.class); + + Dataset projectIds = result.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId"))) + .flatMap((FlatMapFunction, String>) t2 -> t2._2().getProjectsList() + .stream().map(p -> p.getId()).collect(Collectors.toList()).iterator(), Encoders.STRING()) + .distinct(); + + projectIds.joinWith(project, projectIds.col("value").equalTo(project.col("id"))) + .map((MapFunction, eu.dnetlib.dhp.eosc.model.Project>)t2->mapProject(t2._2()), Encoders.bean(eu.dnetlib.dhp.eosc.model.Project.class) ) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(outputPath + "project"); + + resultProject.flatMap((FlatMapFunction) rp -> + rp.getProjectsList().stream().map(p -> Relation.newInstance(rp.getResultId(), p.getId())) + .collect(Collectors.toList()).iterator(), Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(outputPath + "resultProject"); + } + + private static eu.dnetlib.dhp.eosc.model.Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p) throws DocumentException { + if (Boolean.TRUE.equals(p.getDataInfo().getDeletedbyinference())) + return null; + + eu.dnetlib.dhp.eosc.model.Project project = new eu.dnetlib.dhp.eosc.model.Project(); + + Optional + .ofNullable(p.getId()) + .ifPresent(id -> project.setId(id)); + + Optional + .ofNullable(p.getWebsiteurl()) + .ifPresent(w -> project.setWebsiteurl(w.getValue())); + + Optional + .ofNullable(p.getCode()) + .ifPresent(code -> project.setCode(code.getValue())); + + Optional + .ofNullable(p.getAcronym()) + .ifPresent(acronynim -> project.setAcronym(acronynim.getValue())); + + Optional + .ofNullable(p.getTitle()) + .ifPresent(title -> project.setTitle(title.getValue())); + + Optional + .ofNullable(p.getStartdate()) + .ifPresent(sdate -> project.setStartdate(sdate.getValue())); + + Optional + .ofNullable(p.getEnddate()) + .ifPresent(edate -> project.setEnddate(edate.getValue())); + + Optional + .ofNullable(p.getCallidentifier()) + .ifPresent(cide -> project.setCallidentifier(cide.getValue())); + + Optional + .ofNullable(p.getKeywords()) + .ifPresent(key -> project.setKeywords(key.getValue())); + + Optional> omandate = Optional.ofNullable(p.getOamandatepublications()); + Optional> oecsc39 = Optional.ofNullable(p.getEcsc39()); + boolean mandate = false; + if (omandate.isPresent()) { + if (omandate.get().getValue().equals("true")) { + mandate = true; + } + } + if (oecsc39.isPresent()) { + if (oecsc39.get().getValue().equals("true")) { + mandate = true; + } + } + + project.setOpenaccessmandateforpublications(mandate); + project.setOpenaccessmandatefordataset(false); + + Optional + .ofNullable(p.getEcarticle29_3()) + .ifPresent(oamandate -> project.setOpenaccessmandatefordataset(oamandate.getValue().equals("true"))); + + project + .setSubject( + Optional + .ofNullable(p.getSubjects()) + .map(subjs -> subjs.stream().map(s -> s.getValue()).collect(Collectors.toList())) + .orElse(new ArrayList<>())); + + Optional + .ofNullable(p.getSummary()) + .ifPresent(summary -> project.setSummary(summary.getValue())); + + Optional ofundedamount = Optional.ofNullable(p.getFundedamount()); + Optional> ocurrency = Optional.ofNullable(p.getCurrency()); + Optional ototalcost = Optional.ofNullable(p.getTotalcost()); + + if (ocurrency.isPresent()) { + if (ofundedamount.isPresent()) { + if (ototalcost.isPresent()) { + project + .setGranted( + Granted.newInstance(ocurrency.get().getValue(), ototalcost.get(), ofundedamount.get())); + } else { + project.setGranted(Granted.newInstance(ocurrency.get().getValue(), ofundedamount.get())); + } + } + } + + project + .setH2020programme( + Optional + .ofNullable(p.getH2020classification()) + .map( + classification -> classification + .stream() + .map( + c -> Programme + .newInstance( + c.getH2020Programme().getCode(), c.getH2020Programme().getDescription())) + .collect(Collectors.toList())) + .orElse(new ArrayList<>())); + + Optional>> ofundTree = Optional + .ofNullable(p.getFundingtree()); + List funList = new ArrayList<>(); + if (ofundTree.isPresent()) { + for (Field fundingtree : ofundTree.get()) { + funList.add(getFunder(fundingtree.getValue())); + } + } + project.setFunding(funList); + + return project; + } + + public static Funder getFunder(String fundingtree) throws DocumentException { + Funder f = new Funder(); + final Document doc; + + doc = new SAXReader().read(new StringReader(fundingtree)); + f.setShortName(((org.dom4j.Node) (doc.selectNodes("//funder/shortname").get(0))).getText()); + f.setName(((org.dom4j.Node) (doc.selectNodes("//funder/name").get(0))).getText()); + f.setJurisdiction(((org.dom4j.Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText()); + + String id = ""; + + StringBuilder bld = new StringBuilder(); + + int level = 0; + List nodes = doc.selectNodes("//funding_level_" + level); + while (!nodes.isEmpty()) { + for (org.dom4j.Node n : nodes) { + + List node = n.selectNodes("./id"); + id = ((org.dom4j.Node) node.get(0)).getText(); + id = id.substring(id.indexOf("::") + 2); + + node = n.selectNodes("./description"); + bld.append(((Node) node.get(0)).getText() + " - "); + + } + level += 1; + nodes = doc.selectNodes("//funding_level_" + level); + } + String description = bld.toString(); + if (!id.equals("")) { + Fundings fundings = new Fundings(); + fundings.setId(id); + fundings.setDescription(description.substring(0, description.length() - 3).trim()); + f.setFunding_stream(fundings); + } + + return f; } diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml index 631986f..def9f0c 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml @@ -165,32 +165,7 @@ --sourcePath${sourcePath} --workingPath${workingDir}/dump/ - - - - - - - - - yarn - cluster - Extend Dump Publication with indicators - eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendWithUsageCounts - dump-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - --actionSetPath${actionSetPath} - --resultPath${workingDir}/dump/publicationextendedaffiliation - --outputPath${workingDir}/dump/publicationextended + --outputPath${outputPath}/dump/ @@ -241,38 +216,15 @@ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --sourcePath${sourcePath} - --resultPath${workingDir}/dump/dataset - --outputPath${workingDir}/dump/datasetextendedaffiliation - - - - - - - yarn - cluster - Extend Dump Dataset with indicators - eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendWithUsageCounts - dump-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - --actionSetPath${actionSetPath} - --resultPath${workingDir}/dump/datasetextendedaffiliation - --outputPath${workingDir}/dump/datasetextended + --workingPath${workingDir}/dump/ + --outputPath${outputPath}/dump/ + yarn @@ -317,32 +269,8 @@ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --sourcePath${sourcePath} - --resultPath${workingDir}/dump/otherresearchproduct - --outputPath${workingDir}/dump/otherresearchproductextendedaffiliation - - - - - - - yarn - cluster - Extend Dump ORP with indicators - eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendWithUsageCounts - dump-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - --actionSetPath${actionSetPath} - --resultPath${workingDir}/dump/otherresearchproductextendedaffiliation - --outputPath${workingDir}/dump/otherresearchproductextended + --workingPath${workingDir}/dump/ + --outputPath${outputPath}/dump/ @@ -393,37 +321,14 @@ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --sourcePath${sourcePath} - --resultPath${workingDir}/dump/software - --outputPath${workingDir}/dump/softwareextendedaffiliation - - - - - - - yarn - cluster - Extend Dump ORP with indicators - eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendWithUsageCounts - dump-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - --actionSetPath${actionSetPath} - --resultPath${workingDir}/dump/softwareextendedaffiliation - --outputPath${workingDir}/dump/softwareextended + --workingPath${workingDir}/dump/ + --outputPath${outputPath}/dump/ + @@ -474,10 +379,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --sourcePath${workingDir}/dump/publicationextendedaffiliation - --outputPath${workingDir}/dump/publicationextendedproject + --sourcePath${sourcePath} + --workingPath${workingDir}/dump/ --preparedInfoPath${workingDir}/preparedInfo --dumpTypeeosc + --outputPath${outputPath}/dump/ diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java index 33c1963..e75d2ca 100644 --- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java +++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java @@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.eosc.model.Project; +import eu.dnetlib.dhp.eosc.model.ProjectSummary; import eu.dnetlib.dhp.eosc.model.Result; import eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo; @@ -194,15 +194,15 @@ public class UpdateProjectInfoTest { .filter("id = '50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2' and code = '119027'") .count()); - Project project = verificationDataset + ProjectSummary project = verificationDataset .map( - (MapFunction) cr -> cr + (MapFunction) cr -> cr .getProjects() .stream() .filter(p -> p.getValidated() != null) .collect(Collectors.toList()) .get(0), - Encoders.bean(Project.class)) + Encoders.bean(ProjectSummary.class)) .first(); Assertions.assertTrue(project.getFunder().getName().equals("Academy of Finland")); @@ -213,13 +213,13 @@ public class UpdateProjectInfoTest { project = verificationDataset .map( - (MapFunction) cr -> cr + (MapFunction) cr -> cr .getProjects() .stream() .filter(p -> p.getValidated() == null) .collect(Collectors.toList()) .get(0), - Encoders.bean(Project.class)) + Encoders.bean(ProjectSummary.class)) .first(); Assertions.assertTrue(project.getFunder().getName().equals("European Commission"));