Added extension to dump Projects and also relations of type resultProject

This commit is contained in:
Miriam Baglioni 2023-10-25 11:14:46 +02:00
parent a623883b62
commit 7fca920b5f
14 changed files with 749 additions and 258 deletions

View File

@ -1,58 +1,28 @@
package eu.dnetlib.dhp.eosc.model;
import java.io.Serializable;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
/**
* @author miriam.baglioni
* @Date 26/01/23
* @Date 25/10/23
*/
public class Funder implements Serializable {
@JsonSchema(description = "The short name of the funder (EC)")
private String shortName;
@JsonSchema(description = "The name of the funder (European Commission)")
private String name;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
@JsonSchema(
description = "Geographical jurisdiction (e.g. for European Commission is EU, for Croatian Science Foundation is HR)")
private String jurisdiction;
/**
* To store information about the funder funding the project related to the result. It extends
* eu.dnetlib.dhp.schema.dump.oaf.Funder with the following parameter: - - private
* eu.dnetdlib.dhp.schema.dump.oaf.graph.Fundings funding_stream to store the fundingstream
*/
public class Funder extends FunderShort {
public String getJurisdiction() {
return jurisdiction;
}
@JsonSchema(description = "Description of the funding stream")
private Fundings funding_stream;
public void setJurisdiction(String jurisdiction) {
this.jurisdiction = jurisdiction;
}
public Fundings getFunding_stream() {
return funding_stream;
}
public String getShortName() {
return shortName;
}
public void setShortName(String shortName) {
this.shortName = shortName;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@JsonSchema(description = "Stream of funding (e.g. for European Commission can be H2020 or FP7)")
private String fundingStream;
public String getFundingStream() {
return fundingStream;
}
public void setFundingStream(String fundingStream) {
this.fundingStream = fundingStream;
}
}
public void setFunding_stream(Fundings funding_stream) {
this.funding_stream = funding_stream;
}
}

View File

@ -0,0 +1,58 @@
package eu.dnetlib.dhp.eosc.model;

import java.io.Serializable;

import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;

/**
 * @author miriam.baglioni
 * @Date 26/01/23
 */
/**
 * Minimal serializable view of a funder: its short name, full name, geographical
 * jurisdiction and the funding stream label. Used where the full Funder (with
 * nested funding-stream details) is not needed.
 */
public class FunderShort implements Serializable {

	@JsonSchema(description = "The short name of the funder (EC)")
	private String shortName;

	@JsonSchema(description = "The name of the funder (European Commission)")
	private String name;

	@JsonSchema(
		description = "Geographical jurisdiction (e.g. for European Commission is EU, for Croatian Science Foundation is HR)")
	private String jurisdiction;

	@JsonSchema(description = "Stream of funding (e.g. for European Commission can be H2020 or FP7)")
	private String fundingStream;

	public String getShortName() {
		return shortName;
	}

	public void setShortName(String shortName) {
		this.shortName = shortName;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getJurisdiction() {
		return jurisdiction;
	}

	public void setJurisdiction(String jurisdiction) {
		this.jurisdiction = jurisdiction;
	}

	public String getFundingStream() {
		return fundingStream;
	}

	public void setFundingStream(String fundingStream) {
		this.fundingStream = fundingStream;
	}
}

View File

@ -0,0 +1,44 @@
package eu.dnetlib.dhp.eosc.model;

/**
 * @author miriam.baglioni
 * @Date 25/10/23
 */
import java.io.Serializable;

import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;

/**
 * To store information about the funding stream. It has two parameters: - private String id to store the id of the
 * fundings stream. The id is created by appending the shortname of the funder to the name of each level in the xml
 * representing the funding stream. For example: if the funder is the European Commission, the funding level 0 name is
 * FP7, the funding level 1 name is SP3 and the funding level 2 name is PEOPLE then the id will be: EC::FP7::SP3::PEOPLE
 * - private String description to describe the funding stream. It is created by concatenating the description of each
 * funding level so for the example above the description would be: SEVENTH FRAMEWORK PROGRAMME - SP3-People -
 * Marie-Curie Actions
 */
public class Fundings implements Serializable {

	@JsonSchema(description = "Id of the funding stream")
	private String id;

	// annotation added for consistency: every other model field carries a @JsonSchema description,
	// and without it the generated JSON schema documents nothing for this field
	@JsonSchema(description = "Description of the funding stream")
	private String description;

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getDescription() {
		return description;
	}

	public void setDescription(String description) {
		this.description = description;
	}
}

View File

@ -0,0 +1,66 @@
package eu.dnetlib.dhp.eosc.model;

/**
 * @author miriam.baglioni
 * @Date 25/10/23
 */
import java.io.Serializable;

import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;

/**
 * To describe the funded amount. It has the following parameters: - private String currency to store the currency of
 * the fund - private float totalcost to store the total cost of the project - private float fundedamount to store the
 * funded amount by the funder
 */
public class Granted implements Serializable {

	@JsonSchema(description = "The currency of the granted amount (e.g. EUR)")
	private String currency;

	@JsonSchema(description = "The total cost of the project")
	private float totalcost;

	@JsonSchema(description = "The funded amount")
	private float fundedamount;

	/** Factory for the case where the total project cost is known. */
	public static Granted newInstance(String currency, float totalcost, float fundedamount) {
		Granted instance = new Granted();
		instance.setCurrency(currency);
		instance.setTotalcost(totalcost);
		instance.setFundedamount(fundedamount);
		return instance;
	}

	/** Factory for the case where only the funded amount is known (totalcost stays 0). */
	public static Granted newInstance(String currency, float fundedamount) {
		Granted instance = new Granted();
		instance.setCurrency(currency);
		instance.setFundedamount(fundedamount);
		return instance;
	}

	public String getCurrency() {
		return currency;
	}

	public void setCurrency(String currency) {
		this.currency = currency;
	}

	public float getTotalcost() {
		return totalcost;
	}

	public void setTotalcost(float totalcost) {
		this.totalcost = totalcost;
	}

	public float getFundedamount() {
		return fundedamount;
	}

	public void setFundedamount(float fundedamount) {
		this.fundedamount = fundedamount;
	}
}

View File

@ -0,0 +1,46 @@
package eu.dnetlib.dhp.eosc.model;

/**
 * @author miriam.baglioni
 * @Date 25/10/23
 */
import java.io.Serializable;

import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;

/**
 * To store information about the ec programme for the project. It has the following parameters: - private String code
 * to store the code of the programme - private String description to store the description of the programme
 */
public class Programme implements Serializable {

	@JsonSchema(description = "The code of the programme")
	private String code;

	@JsonSchema(description = "The description of the programme")
	private String description;

	/** Convenience factory building a fully populated Programme. */
	public static Programme newInstance(String code, String description) {
		Programme programme = new Programme();
		programme.setCode(code);
		programme.setDescription(description);
		return programme;
	}

	public String getCode() {
		return code;
	}

	public void setCode(String code) {
		this.code = code;
	}

	public String getDescription() {
		return description;
	}

	public void setDescription(String description) {
		this.description = description;
	}
}

View File

@ -1,97 +1,206 @@
package eu.dnetlib.dhp.eosc.model;
import java.io.Serializable;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
/**
* @author miriam.baglioni
* @Date 26/01/23
* @Date 25/10/23
*/
import java.io.Serializable;
import java.util.List;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
/**
* This is the class representing the Project in the model used for the dumps of the whole graph. At the moment the dump
* of the Projects differs from the other dumps because we do not create relations between Funders (Organization) and
* Projects but we put the information about the Funder within the Project representation. We also removed the
* collected from element from the Project. No relation between the Project and the Datasource entity from which it is
* collected will be created. We will never create relations between Project and Datasource. In case some relation will
* be extracted from the Project they will refer the Funder and will be of type ( organization -> funds -> project,
 * project -> isFundedBy -> organization) We also removed the duration parameter because most of the time it is set to
* 0. It has the following parameters:
* - private String id to store the id of the project (OpenAIRE id)
* - private String websiteurl to store the websiteurl of the project
* - private String code to store the grant agreement of the project
* - private String acronym to store the acronym of the project
 * - private String title to store the title of the project
* - private String startdate to store the start date
* - private String enddate to store the end date
 * - private String callidentifier to store the call identifier
* - private String keywords to store the keywords
* - private boolean openaccessmandateforpublications to store if the project must accomplish to the open access mandate
* for publications. This value will be set to true if one of the field in the project represented in the internal model
* is set to true
* - private boolean openaccessmandatefordataset to store if the project must accomplish to the open access mandate for
* dataset. It is set to the value in the corresponding filed of the project represented in the internal model
* - private List<String> subject to store the list of subjects of the project
* - private List<Funder> funding to store the list of funder of the project
* - private String summary to store the summary of the project
* - private Granted granted to store the granted amount
* - private List<Programme> h2020programme to store the list of programmes the project is related to
*/
public class Project implements Serializable {
@JsonSchema(description = "The OpenAIRE id for the project")
protected String id;// OpenAIRE id
private String id;
@JsonSchema(description = "The grant agreement number")
protected String code;
private String websiteurl;
private String code;
private String acronym;
private String title;
@JsonSchema(description = "The acronym of the project")
protected String acronym;
private String startdate;
protected String title;
private String enddate;
@JsonSchema(description = "Information about the funder funding the project")
private Funder funder;
private String callidentifier;
private Provenance provenance;
private String keywords;
private Validated validated;
private boolean openaccessmandateforpublications;
public void setValidated(Validated validated) {
this.validated = validated;
}
private boolean openaccessmandatefordataset;
private List<String> subject;
public Validated getValidated() {
return validated;
}
@JsonSchema(description = "Funding information for the project")
private List<Funder> funding;
public Provenance getProvenance() {
return provenance;
}
private String summary;
public void setProvenance(Provenance provenance) {
this.provenance = provenance;
}
@JsonSchema(description = "The money granted to the project")
private Granted granted;
public Funder getFunder() {
return funder;
}
@JsonSchema(description = "The h2020 programme funding the project")
private List<Programme> h2020programme;
public void setFunder(Funder funders) {
this.funder = funders;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}
public String getCode() {
return code;
}
public void setId(String id) {
this.id = id;
}
public void setCode(String code) {
this.code = code;
}
public String getWebsiteurl() {
return websiteurl;
}
public String getAcronym() {
return acronym;
}
public void setWebsiteurl(String websiteurl) {
this.websiteurl = websiteurl;
}
public void setAcronym(String acronym) {
this.acronym = acronym;
}
public String getCode() {
return code;
}
public String getTitle() {
return title;
}
public void setCode(String code) {
this.code = code;
}
public void setTitle(String title) {
this.title = title;
}
public String getAcronym() {
return acronym;
}
public static Project newInstance(String id, String code, String acronym, String title, Funder funder) {
Project project = new Project();
project.setAcronym(acronym);
project.setCode(code);
project.setFunder(funder);
project.setId(id);
project.setTitle(title);
return project;
}
public void setAcronym(String acronym) {
this.acronym = acronym;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getStartdate() {
return startdate;
}
public void setStartdate(String startdate) {
this.startdate = startdate;
}
public String getEnddate() {
return enddate;
}
public void setEnddate(String enddate) {
this.enddate = enddate;
}
public String getCallidentifier() {
return callidentifier;
}
public void setCallidentifier(String callidentifier) {
this.callidentifier = callidentifier;
}
public String getKeywords() {
return keywords;
}
public void setKeywords(String keywords) {
this.keywords = keywords;
}
public boolean isOpenaccessmandateforpublications() {
return openaccessmandateforpublications;
}
public void setOpenaccessmandateforpublications(boolean openaccessmandateforpublications) {
this.openaccessmandateforpublications = openaccessmandateforpublications;
}
public boolean isOpenaccessmandatefordataset() {
return openaccessmandatefordataset;
}
public void setOpenaccessmandatefordataset(boolean openaccessmandatefordataset) {
this.openaccessmandatefordataset = openaccessmandatefordataset;
}
public List<String> getSubject() {
return subject;
}
public void setSubject(List<String> subject) {
this.subject = subject;
}
public List<Funder> getFunding() {
return funding;
}
public void setFunding(List<Funder> funding) {
this.funding = funding;
}
public String getSummary() {
return summary;
}
public void setSummary(String summary) {
this.summary = summary;
}
public Granted getGranted() {
return granted;
}
public void setGranted(Granted granted) {
this.granted = granted;
}
public List<Programme> getH2020programme() {
return h2020programme;
}
public void setH2020programme(List<Programme> h2020programme) {
this.h2020programme = h2020programme;
}
}

View File

@ -0,0 +1,97 @@
package eu.dnetlib.dhp.eosc.model;

import java.io.Serializable;

import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;

/**
 * @author miriam.baglioni
 * @Date 26/01/23
 */
/**
 * Compact project representation embedded in a Result: identifier, grant code,
 * acronym, title, a short funder description, plus provenance and validation
 * information for the result-project link.
 */
public class ProjectSummary implements Serializable {

	@JsonSchema(description = "The OpenAIRE id for the project")
	protected String id;// OpenAIRE id

	@JsonSchema(description = "The grant agreement number")
	protected String code;

	@JsonSchema(description = "The acronym of the project")
	protected String acronym;

	protected String title;

	@JsonSchema(description = "Information about the funder funding the project")
	private FunderShort funder;

	private Provenance provenance;

	private Validated validated;

	/** Builds a summary from the identifying fields; provenance/validated are left unset. */
	public static ProjectSummary newInstance(String id, String code, String acronym, String title, FunderShort funder) {
		ProjectSummary summary = new ProjectSummary();
		summary.setId(id);
		summary.setCode(code);
		summary.setAcronym(acronym);
		summary.setTitle(title);
		summary.setFunder(funder);
		return summary;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getCode() {
		return code;
	}

	public void setCode(String code) {
		this.code = code;
	}

	public String getAcronym() {
		return acronym;
	}

	public void setAcronym(String acronym) {
		this.acronym = acronym;
	}

	public String getTitle() {
		return title;
	}

	public void setTitle(String title) {
		this.title = title;
	}

	public FunderShort getFunder() {
		return funder;
	}

	public void setFunder(FunderShort funder) {
		this.funder = funder;
	}

	public Provenance getProvenance() {
		return provenance;
	}

	public void setProvenance(Provenance provenance) {
		this.provenance = provenance;
	}

	public Validated getValidated() {
		return validated;
	}

	public void setValidated(Validated validated) {
		this.validated = validated;
	}
}

View File

@ -30,7 +30,7 @@ public class Result implements Serializable {
private Indicator indicator;
@JsonSchema(description = "List of projects (i.e. grants) that (co-)funded the production ofn the research results")
private List<Project> projects;
private List<ProjectSummary> projects;
@JsonSchema(
description = "Reference to a relevant research infrastructure, initiative or community (RI/RC) among those collaborating with OpenAIRE. Please see https://connect.openaire.eu")
@ -409,11 +409,11 @@ public class Result implements Serializable {
this.collectedfrom = collectedfrom;
}
public List<Project> getProjects() {
public List<ProjectSummary> getProjects() {
return projects;
}
public void setProjects(List<Project> projects) {
public void setProjects(List<ProjectSummary> projects) {
this.projects = projects;
}

View File

@ -60,8 +60,8 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
// final String outputPath = parser.get("outputPath");
// log.info("outputPath: {}", outputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
@ -70,11 +70,11 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, workingPath + "publicationextendedaffiliation");
addOrganizations(spark, inputPath, workingPath );
addOrganizations(spark, inputPath, workingPath , outputPath);
});
}
private static void addOrganizations(SparkSession spark, String inputPath, String workingPath) {
private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String outputPath) {
Dataset<Result> results = Utils
.readPath(spark, workingPath + "publication", Result.class);
@ -162,7 +162,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(workingPath + "organization");
.json(outputPath + "organization");
relations
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")))
@ -170,7 +170,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(workingPath + "resultOrganization");
.json(outputPath + "resultOrganization");
}

View File

@ -4,11 +4,11 @@ package eu.dnetlib.dhp.oa.graph.dump.eosc;
import java.io.Serializable;
import java.util.List;
import eu.dnetlib.dhp.eosc.model.Project;
import eu.dnetlib.dhp.eosc.model.ProjectSummary;
public class ResultProject implements Serializable {
private String resultId;
private List<Project> projectsList;
private List<ProjectSummary> projectsList;
public String getResultId() {
return resultId;
@ -18,11 +18,11 @@ public class ResultProject implements Serializable {
this.resultId = resultId;
}
public List<Project> getProjectsList() {
public List<ProjectSummary> getProjectsList() {
return projectsList;
}
public void setProjectsList(List<Project> projectsList) {
public void setProjectsList(List<ProjectSummary> projectsList) {
this.projectsList = projectsList;
}
}

View File

@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.eosc.model.Funder;
import eu.dnetlib.dhp.eosc.model.Project;
import eu.dnetlib.dhp.eosc.model.FunderShort;
import eu.dnetlib.dhp.eosc.model.ProjectSummary;
import eu.dnetlib.dhp.eosc.model.Provenance;
import eu.dnetlib.dhp.eosc.model.Validated;
import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -103,9 +103,9 @@ public class SparkPrepareResultProject implements Serializable {
rp.setResultId(s);
eu.dnetlib.dhp.schema.oaf.Project p = first._1();
projectSet.add(p.getId());
Project ps = getProject(p, first._2);
ProjectSummary ps = getProject(p, first._2);
List<Project> projList = new ArrayList<>();
List<ProjectSummary> projList = new ArrayList<>();
projList.add(ps);
rp.setProjectsList(projList);
it.forEachRemaining(c -> {
@ -132,8 +132,8 @@ public class SparkPrepareResultProject implements Serializable {
.json(outputPath);
}
private static Project getProject(eu.dnetlib.dhp.schema.oaf.Project op, Relation relation) {
Project p = Project
private static ProjectSummary getProject(eu.dnetlib.dhp.schema.oaf.Project op, Relation relation) {
ProjectSummary p = ProjectSummary
.newInstance(
op.getId(),
op.getCode().getValue(),
@ -148,7 +148,7 @@ public class SparkPrepareResultProject implements Serializable {
Optional
.ofNullable(op.getFundingtree())
.map(value -> {
List<Funder> tmp = value
List<FunderShort> tmp = value
.stream()
.map(ft -> getFunder(ft.getValue()))
.collect(Collectors.toList());
@ -174,8 +174,8 @@ public class SparkPrepareResultProject implements Serializable {
}
private static Funder getFunder(String fundingtree) {
final Funder f = new Funder();
private static FunderShort getFunder(String fundingtree) {
final FunderShort f = new FunderShort();
final Document doc;
try {
final SAXReader reader = new SAXReader();

View File

@ -4,23 +4,35 @@ package eu.dnetlib.dhp.oa.graph.dump.eosc;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.eosc.model.*;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Project;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import scala.Array;
import scala.Tuple2;
public class SparkUpdateProjectInfo implements Serializable {
@ -47,12 +59,15 @@ public class SparkUpdateProjectInfo implements Serializable {
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", preparedInfoPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String dumpType = Optional
.ofNullable(parser.get("dumpType"))
.orElse(Constants.DUMPTYPE.COMMUNITY.getType());
@ -64,18 +79,19 @@ public class SparkUpdateProjectInfo implements Serializable {
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
extend(spark, inputPath, outputPath, preparedInfoPath);
Utils.removeOutputDir(spark, workingPath + "publicationextendedproject");
extend(spark, inputPath, workingPath, preparedInfoPath, outputPath);
});
}
private static void extend(
SparkSession spark,
String inputPath,
String outputPath,
String preparedInfoPath) {
String workingPath,
String preparedInfoPath,
String outputPath) {
Dataset<Result> result = Utils.readPath(spark, inputPath, Result.class);
Dataset<Result> result = Utils.readPath(spark, workingPath + "publicationextendedaffiliation", Result.class);
Dataset<ResultProject> resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class);
result
@ -90,7 +106,186 @@ public class SparkUpdateProjectInfo implements Serializable {
.write()
.option("compression", "gzip")
.mode(SaveMode.Append)
.json(outputPath);
.json(workingPath + "publicationextendedproject");
Dataset<Project> project = Utils.readPath(spark, inputPath + "/project", Project.class);
Dataset<String> projectIds = result.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")))
.flatMap((FlatMapFunction<Tuple2<Result, ResultProject>, String>) t2 -> t2._2().getProjectsList()
.stream().map(p -> p.getId()).collect(Collectors.toList()).iterator(), Encoders.STRING())
.distinct();
projectIds.joinWith(project, projectIds.col("value").equalTo(project.col("id")))
.map((MapFunction<Tuple2<String, Project>, eu.dnetlib.dhp.eosc.model.Project>)t2->mapProject(t2._2()), Encoders.bean(eu.dnetlib.dhp.eosc.model.Project.class) )
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(outputPath + "project");
resultProject.flatMap((FlatMapFunction<ResultProject, Relation>) rp ->
rp.getProjectsList().stream().map(p -> Relation.newInstance(rp.getResultId(), p.getId()))
.collect(Collectors.toList()).iterator(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(outputPath + "resultProject");
}
private static eu.dnetlib.dhp.eosc.model.Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p) throws DocumentException {
if (Boolean.TRUE.equals(p.getDataInfo().getDeletedbyinference()))
return null;
eu.dnetlib.dhp.eosc.model.Project project = new eu.dnetlib.dhp.eosc.model.Project();
Optional
.ofNullable(p.getId())
.ifPresent(id -> project.setId(id));
Optional
.ofNullable(p.getWebsiteurl())
.ifPresent(w -> project.setWebsiteurl(w.getValue()));
Optional
.ofNullable(p.getCode())
.ifPresent(code -> project.setCode(code.getValue()));
Optional
.ofNullable(p.getAcronym())
.ifPresent(acronynim -> project.setAcronym(acronynim.getValue()));
Optional
.ofNullable(p.getTitle())
.ifPresent(title -> project.setTitle(title.getValue()));
Optional
.ofNullable(p.getStartdate())
.ifPresent(sdate -> project.setStartdate(sdate.getValue()));
Optional
.ofNullable(p.getEnddate())
.ifPresent(edate -> project.setEnddate(edate.getValue()));
Optional
.ofNullable(p.getCallidentifier())
.ifPresent(cide -> project.setCallidentifier(cide.getValue()));
Optional
.ofNullable(p.getKeywords())
.ifPresent(key -> project.setKeywords(key.getValue()));
Optional<Field<String>> omandate = Optional.ofNullable(p.getOamandatepublications());
Optional<Field<String>> oecsc39 = Optional.ofNullable(p.getEcsc39());
boolean mandate = false;
if (omandate.isPresent()) {
if (omandate.get().getValue().equals("true")) {
mandate = true;
}
}
if (oecsc39.isPresent()) {
if (oecsc39.get().getValue().equals("true")) {
mandate = true;
}
}
project.setOpenaccessmandateforpublications(mandate);
project.setOpenaccessmandatefordataset(false);
Optional
.ofNullable(p.getEcarticle29_3())
.ifPresent(oamandate -> project.setOpenaccessmandatefordataset(oamandate.getValue().equals("true")));
project
.setSubject(
Optional
.ofNullable(p.getSubjects())
.map(subjs -> subjs.stream().map(s -> s.getValue()).collect(Collectors.toList()))
.orElse(new ArrayList<>()));
Optional
.ofNullable(p.getSummary())
.ifPresent(summary -> project.setSummary(summary.getValue()));
Optional<Float> ofundedamount = Optional.ofNullable(p.getFundedamount());
Optional<Field<String>> ocurrency = Optional.ofNullable(p.getCurrency());
Optional<Float> ototalcost = Optional.ofNullable(p.getTotalcost());
if (ocurrency.isPresent()) {
if (ofundedamount.isPresent()) {
if (ototalcost.isPresent()) {
project
.setGranted(
Granted.newInstance(ocurrency.get().getValue(), ototalcost.get(), ofundedamount.get()));
} else {
project.setGranted(Granted.newInstance(ocurrency.get().getValue(), ofundedamount.get()));
}
}
}
project
.setH2020programme(
Optional
.ofNullable(p.getH2020classification())
.map(
classification -> classification
.stream()
.map(
c -> Programme
.newInstance(
c.getH2020Programme().getCode(), c.getH2020Programme().getDescription()))
.collect(Collectors.toList()))
.orElse(new ArrayList<>()));
Optional<List<Field<String>>> ofundTree = Optional
.ofNullable(p.getFundingtree());
List<Funder> funList = new ArrayList<>();
if (ofundTree.isPresent()) {
for (Field<String> fundingtree : ofundTree.get()) {
funList.add(getFunder(fundingtree.getValue()));
}
}
project.setFunding(funList);
return project;
}
public static Funder getFunder(String fundingtree) throws DocumentException {
Funder f = new Funder();
final Document doc;
doc = new SAXReader().read(new StringReader(fundingtree));
f.setShortName(((org.dom4j.Node) (doc.selectNodes("//funder/shortname").get(0))).getText());
f.setName(((org.dom4j.Node) (doc.selectNodes("//funder/name").get(0))).getText());
f.setJurisdiction(((org.dom4j.Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText());
String id = "";
StringBuilder bld = new StringBuilder();
int level = 0;
List<org.dom4j.Node> nodes = doc.selectNodes("//funding_level_" + level);
while (!nodes.isEmpty()) {
for (org.dom4j.Node n : nodes) {
List node = n.selectNodes("./id");
id = ((org.dom4j.Node) node.get(0)).getText();
id = id.substring(id.indexOf("::") + 2);
node = n.selectNodes("./description");
bld.append(((Node) node.get(0)).getText() + " - ");
}
level += 1;
nodes = doc.selectNodes("//funding_level_" + level);
}
String description = bld.toString();
if (!id.equals("")) {
Fundings fundings = new Fundings();
fundings.setId(id);
fundings.setDescription(description.substring(0, description.length() - 3).trim());
f.setFunding_stream(fundings);
}
return f;
}

View File

@ -165,32 +165,7 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
<!-- <arg>&#45;&#45;resultPath</arg><arg>${workingDir}/dump/publication</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${workingDir}/dump/publicationextendedaffiliation</arg>-->
</spark>
<ok to="wait_eosc_dump"/>
<error to="Kill"/>
</action>
<action name="extend_publication_with_indicators">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend Dump Publication with indicators </name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendWithUsageCounts</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--actionSetPath</arg><arg>${actionSetPath}</arg>
<arg>--resultPath</arg><arg>${workingDir}/dump/publicationextendedaffiliation</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/publicationextended</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
</spark>
<ok to="wait_eosc_dump"/>
<error to="Kill"/>
@ -241,38 +216,15 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--resultPath</arg><arg>${workingDir}/dump/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/datasetextendedaffiliation</arg>
</spark>
<ok to="wait_eosc_dump"/>
<error to="Kill"/>
</action>
<action name="extend_dataset_with_indicators">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend Dump Dataset with indicators </name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendWithUsageCounts</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--actionSetPath</arg><arg>${actionSetPath}</arg>
<arg>--resultPath</arg><arg>${workingDir}/dump/datasetextendedaffiliation</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/datasetextended</arg>
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
</spark>
<ok to="wait_eosc_dump"/>
<error to="Kill"/>
</action>
<action name="dump_eosc_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -317,32 +269,8 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--resultPath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/otherresearchproductextendedaffiliation</arg>
</spark>
<ok to="wait_eosc_dump"/>
<error to="Kill"/>
</action>
<!-- Workflow action: enrich the dumped EOSC "other research product" (ORP)
     records with usage-count indicators via the same ExtendWithUsageCounts
     Spark job used by the sibling publication/dataset/software branches.
     Joins at "wait_eosc_dump" on success; routes to "Kill" on error. -->
<action name="extend_orp_with_indicators">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend Dump ORP with indicators </name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendWithUsageCounts</class>
<jar>dump-${projectVersion}.jar</jar>
<!-- Standard Spark tuning/monitoring options shared by all dump actions. -->
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<!-- NOTE(review): "outputPath" is supplied twice (a ${workingDir}-scoped value
     and an ${outputPath}-scoped one) alongside both the old actionSetPath/
     resultPath pair and the new workingPath argument. Presumably only one set
     is read by ExtendWithUsageCounts — verify and remove the stale arguments. -->
<arg>--actionSetPath</arg><arg>${actionSetPath}</arg>
<arg>--resultPath</arg><arg>${workingDir}/dump/otherresearchproductextendedaffiliation</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/otherresearchproductextended</arg>
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
</spark>
<ok to="wait_eosc_dump"/>
<error to="Kill"/>
@ -393,37 +321,14 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--resultPath</arg><arg>${workingDir}/dump/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/softwareextendedaffiliation</arg>
</spark>
<ok to="wait_eosc_dump"/>
<error to="Kill"/>
</action>
<!-- Workflow action: enrich the dumped EOSC "software" records with usage-count
     indicators via the ExtendWithUsageCounts Spark job (same job class as the
     publication/dataset/ORP branches). Joins at "wait_eosc_dump" on success;
     routes to "Kill" on error. -->
<action name="extend_software_with_indicators">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<!-- Fixed copy-paste defect: the display name previously read
     "Extend Dump ORP with indicators", duplicating the ORP action's name in
     the job tracker UI. The name is purely informational, so this is safe. -->
<name>Extend Dump Software with indicators </name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendWithUsageCounts</class>
<jar>dump-${projectVersion}.jar</jar>
<!-- Standard Spark tuning/monitoring options shared by all dump actions. -->
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<!-- NOTE(review): "outputPath" is supplied twice and both the legacy
     actionSetPath/resultPath arguments and the newer workingPath argument are
     present. Confirm which set ExtendWithUsageCounts.main parses and remove
     the stale arguments; left untouched here to avoid guessing. -->
<arg>--actionSetPath</arg><arg>${actionSetPath}</arg>
<arg>--resultPath</arg><arg>${workingDir}/dump/softwareextendedaffiliation</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/softwareextended</arg>
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
</spark>
<ok to="wait_eosc_dump"/>
<error to="Kill"/>
</action>
<join name="wait_eosc_dump" to="prepareResultProject"/>
<action name="prepareResultProject">
@ -474,10 +379,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/publicationextendedaffiliation</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/publicationextendedproject</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--dumpType</arg><arg>eosc</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>

View File

@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.eosc.model.Project;
import eu.dnetlib.dhp.eosc.model.ProjectSummary;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo;
@ -194,15 +194,15 @@ public class UpdateProjectInfoTest {
.filter("id = '50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2' and code = '119027'")
.count());
Project project = verificationDataset
ProjectSummary project = verificationDataset
.map(
(MapFunction<Result, Project>) cr -> cr
(MapFunction<Result, ProjectSummary>) cr -> cr
.getProjects()
.stream()
.filter(p -> p.getValidated() != null)
.collect(Collectors.toList())
.get(0),
Encoders.bean(Project.class))
Encoders.bean(ProjectSummary.class))
.first();
Assertions.assertTrue(project.getFunder().getName().equals("Academy of Finland"));
@ -213,13 +213,13 @@ public class UpdateProjectInfoTest {
project = verificationDataset
.map(
(MapFunction<Result, Project>) cr -> cr
(MapFunction<Result, ProjectSummary>) cr -> cr
.getProjects()
.stream()
.filter(p -> p.getValidated() == null)
.collect(Collectors.toList())
.get(0),
Encoders.bean(Project.class))
Encoders.bean(ProjectSummary.class))
.first();
Assertions.assertTrue(project.getFunder().getName().equals("European Commission"));