added code to dump the relations between organizations and projects in the subset of entities relevant for EOSC

This commit is contained in:
Miriam Baglioni 2023-10-25 11:46:10 +02:00
parent da19f117d8
commit a821371af2
13 changed files with 459 additions and 442 deletions
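In essence, the new step (SparkDumpOrganizationProject, shown in full below) is one filter plus two joins. A condensed, hedged sketch using the same helpers and types that appear in the diff (variable names are illustrative):

// Inputs: the already-dumped EOSC organizations and projects, plus the raw relations.
Dataset<Organization> organization = Utils.readPath(spark, outputPath + "organization", Organization.class);
Dataset<Project> project = Utils.readPath(spark, outputPath + "project", Project.class);
// Keep only non-deleted participant links, which connect organizations to projects.
Dataset<Relation> participants = Utils
	.readPath(spark, inputPath + "/relation", Relation.class)
	.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
		&& r.getRelClass().equalsIgnoreCase(ModelConstants.IS_PARTICIPANT));
// Two joins then restrict those links to entities present in the EOSC dump, and each
// surviving pair is written out as a slim source/target record (see dumpRelation below).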

View File

@@ -1,19 +1,18 @@
package eu.dnetlib.dhp.eosc.model;
/**
* @author miriam.baglioni
* @Date 25/10/23
*/
-import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
+import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
/**
* To store information about the funder funding the project related to the result. It extends
* eu.dnetlib.dhp.schema.dump.oaf.Funder with the following parameter: - private
* eu.dnetlib.dhp.schema.dump.oaf.graph.Fundings funding_stream to store the funding stream
*/
-public class Funder extends FunderShort {
+public class Funder extends FunderShort {
@JsonSchema(description = "Description of the funding stream")
private Fundings funding_stream;
@@ -25,4 +24,4 @@ package eu.dnetlib.dhp.eosc.model;
public void setFunding_stream(Fundings funding_stream) {
this.funding_stream = funding_stream;
}
-}
+}
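A minimal usage sketch for the class above (hypothetical: Fundings is the dump-model bean named in the javadoc, and its construction details are not shown in this diff):

// Hypothetical: attach a funding stream to the funder of a dumped result.
Funder funder = new Funder();
Fundings stream = new Fundings(); // assumed no-arg bean from the dump model
funder.setFunding_stream(stream); // setter shown in the diff above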

View File

@@ -1,12 +1,10 @@
package eu.dnetlib.dhp.eosc.model;
/**
* @author miriam.baglioni
* @Date 25/10/23
*/
import java.io.Serializable;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;

View File

@@ -1,11 +1,10 @@
package eu.dnetlib.dhp.eosc.model;
/**
* @author miriam.baglioni
* @Date 25/10/23
*/
import java.io.Serializable;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;

View File

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.eosc.model;
import java.io.Serializable;

View File

@@ -33,7 +33,7 @@ public class OrganizationPid implements Serializable {
this.value = value;
}
-public static OrganizationPid newInstance(String type, String value){
+public static OrganizationPid newInstance(String type, String value) {
OrganizationPid op = new OrganizationPid();
op.type = type;
op.value = value;

View File

@@ -1,11 +1,10 @@
package eu.dnetlib.dhp.eosc.model;
/**
* @author miriam.baglioni
* @Date 25/10/23
*/
import java.io.Serializable;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;

View File

@@ -1,17 +1,15 @@
package eu.dnetlib.dhp.eosc.model;
/**
* @author miriam.baglioni
* @Date 25/10/23
*/
import java.io.Serializable;
import java.util.List;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
/**
* This is the class representing the Project in the model used for the dumps of the whole graph. At the moment the dump
* of the Projects differs from the other dumps because we do not create relations between Funders (Organization) and
@@ -74,8 +72,6 @@ public class Project implements Serializable {
@JsonSchema(description = "The h2020 programme funding the project")
private List<Programme> h2020programme;
public String getId() {
return id;
}
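Since the dump jobs write these beans as JSON lines, a hedged round-trip sketch (Jackson's ObjectMapper is already used elsewhere in this module; setId is assumed to mirror the getId shown above):

// Hypothetical: serialize a dump Project the way the Spark jobs emit it.
// Note: writeValueAsString throws JsonProcessingException.
eu.dnetlib.dhp.eosc.model.Project prj = new eu.dnetlib.dhp.eosc.model.Project();
prj.setId("40|corda__h2020::xxxx"); // illustrative identifier
String json = new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(prj);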

View File

@@ -6,7 +6,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
-import eu.dnetlib.dhp.eosc.model.Affiliation;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@@ -20,6 +19,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.eosc.model.Affiliation;
import eu.dnetlib.dhp.eosc.model.OrganizationPid;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.schema.common.ModelConstants;

View File

@@ -7,8 +7,6 @@ import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
-import eu.dnetlib.dhp.eosc.model.Affiliation;
-import eu.dnetlib.dhp.eosc.model.Country;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@@ -22,6 +20,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.eosc.model.Affiliation;
+import eu.dnetlib.dhp.eosc.model.Country;
import eu.dnetlib.dhp.eosc.model.OrganizationPid;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.schema.common.ModelConstants;
@@ -70,7 +70,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, workingPath + "publicationextendedaffiliation");
-addOrganizations(spark, inputPath, workingPath , outputPath);
+addOrganizations(spark, inputPath, workingPath, outputPath);
});
}
@@ -157,24 +157,30 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
relations
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")))
-.map((MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(t2._2()),Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
+.map(
+(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(
+t2._2()),
+Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.option("compression", "gzip")
.json(outputPath + "organization");
relations
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")))
-.map((MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation.newInstance(t2._1().getSource(), t2._1().getTarget()), Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class) )
+.map(
+(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation
+.newInstance(t2._1().getSource(), t2._1().getTarget()),
+Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.option("compression", "gzip")
.json(outputPath + "resultOrganization");
}
-private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org){
+private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org) {
if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference()))
return null;
@@ -235,6 +241,4 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
return organization;
}
-}
+}

View File

@@ -6,8 +6,12 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
+import eu.dnetlib.dhp.eosc.model.Organization;
+import eu.dnetlib.dhp.eosc.model.Project;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -21,19 +25,20 @@ import eu.dnetlib.dhp.eosc.model.Provenance;
import eu.dnetlib.dhp.eosc.model.RelType;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
/**
* @author miriam.baglioni
* @Date 12/01/23
*/
-public class SparkDumpRelation implements Serializable {
+public class SparkDumpOrganizationProject implements Serializable {
-private static final Logger log = LoggerFactory.getLogger(SparkDumpRelation.class);
+private static final Logger log = LoggerFactory.getLogger(SparkDumpOrganizationProject.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
-SparkDumpRelation.class
+SparkDumpOrganizationProject.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));
@@ -66,49 +71,21 @@ public class SparkDumpRelation implements Serializable {
}
private static void dumpRelation(SparkSession spark, String inputPath, String outputPath) {
-Dataset<Relation> relations = Utils.readPath(spark, inputPath, Relation.class);
-relations
-.map((MapFunction<Relation, eu.dnetlib.dhp.eosc.model.Relation>) relation -> {
-eu.dnetlib.dhp.eosc.model.Relation relNew = new eu.dnetlib.dhp.eosc.model.Relation();
-relNew
-.setSource(
-relation.getSource());
-relNew
-.setTarget(
-relation.getTarget());
-relNew
-.setReltype(
-RelType
-.newInstance(
-relation.getRelClass(),
-relation.getSubRelType()));
-Optional<DataInfo> odInfo = Optional.ofNullable(relation.getDataInfo());
-if (odInfo.isPresent()) {
-DataInfo dInfo = odInfo.get();
-if (Optional.ofNullable(dInfo.getProvenanceaction()).isPresent() &&
-Optional.ofNullable(dInfo.getProvenanceaction().getClassname()).isPresent()) {
-relNew
-.setProvenance(
-Provenance
-.newInstance(
-dInfo.getProvenanceaction().getClassname(),
-dInfo.getTrust()));
-}
-}
-return relNew;
-}, Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class))
+Dataset<Organization> organization = Utils.readPath(spark, outputPath + "organization", Organization.class);
+Dataset<Project> project = Utils.readPath(spark, outputPath + "project", Project.class);
+Dataset<Relation> relation = Utils.readPath(spark, inputPath + "/relation", Relation.class)
+.filter((FilterFunction<Relation>) r-> !r.getDataInfo().getDeletedbyinference() && r.getRelClass().equalsIgnoreCase(ModelConstants.IS_PARTICIPANT));
+Dataset<Relation> eoscOrgs = relation.joinWith(organization, relation.col("source").equalTo(organization.col("id")))
+.map((MapFunction<Tuple2<Relation, Organization>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
+eoscOrgs.joinWith(project, eoscOrgs.col("target").equalTo(project.col("id")))
+.map((MapFunction<Tuple2<Relation, Project>, eu.dnetlib.dhp.eosc.model.Relation>) t2-> eu.dnetlib.dhp.eosc.model.Relation.newInstance(t2._1().getSource(), t2._1().getTarget()), Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class))
.write()
-.option("compression", "gzip")
-.mode(SaveMode.Append)
-.json(outputPath);
+.mode(SaveMode.Overwrite)
+.option("compression","gzip")
+.json(outputPath + "organizationProject");
}
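The rewritten method therefore keeps a participant relation only when its source matches a dumped EOSC organization and its target matches a dumped project. A hedged sanity-check sketch for the new output (session and paths as above):

// Read back the new dump; each row holds an organization id (source) and a project id (target).
Dataset<eu.dnetlib.dhp.eosc.model.Relation> orgProj = Utils
	.readPath(spark, outputPath + "organizationProject", eu.dnetlib.dhp.eosc.model.Relation.class);
orgProj.show(5, false); // small sample, untruncated columns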

View File

@@ -10,9 +10,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
-import eu.dnetlib.dhp.eosc.model.*;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.Project;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -31,7 +28,10 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.eosc.model.*;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.Project;
import scala.Array;
import scala.Tuple2;
@@ -110,28 +110,46 @@ public class SparkUpdateProjectInfo implements Serializable {
Dataset<Project> project = Utils.readPath(spark, inputPath + "/project", Project.class);
-Dataset<String> projectIds = result.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")))
-.flatMap((FlatMapFunction<Tuple2<Result, ResultProject>, String>) t2 -> t2._2().getProjectsList()
-.stream().map(p -> p.getId()).collect(Collectors.toList()).iterator(), Encoders.STRING())
+Dataset<String> projectIds = result
+.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")))
+.flatMap(
+(FlatMapFunction<Tuple2<Result, ResultProject>, String>) t2 -> t2
+._2()
+.getProjectsList()
+.stream()
+.map(p -> p.getId())
+.collect(Collectors.toList())
+.iterator(),
+Encoders.STRING())
.distinct();
-projectIds.joinWith(project, projectIds.col("value").equalTo(project.col("id")))
-.map((MapFunction<Tuple2<String, Project>, eu.dnetlib.dhp.eosc.model.Project>)t2->mapProject(t2._2()), Encoders.bean(eu.dnetlib.dhp.eosc.model.Project.class) )
+projectIds
+.joinWith(project, projectIds.col("value").equalTo(project.col("id")))
+.map(
+(MapFunction<Tuple2<String, Project>, eu.dnetlib.dhp.eosc.model.Project>) t2 -> mapProject(t2._2()),
+Encoders.bean(eu.dnetlib.dhp.eosc.model.Project.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.option("compression", "gzip")
.json(outputPath + "project");
-resultProject.flatMap((FlatMapFunction<ResultProject, Relation>) rp ->
-rp.getProjectsList().stream().map(p -> Relation.newInstance(rp.getResultId(), p.getId()))
-.collect(Collectors.toList()).iterator(), Encoders.bean(Relation.class))
+resultProject
+.flatMap(
+(FlatMapFunction<ResultProject, Relation>) rp -> rp
+.getProjectsList()
+.stream()
+.map(p -> Relation.newInstance(rp.getResultId(), p.getId()))
+.collect(Collectors.toList())
+.iterator(),
+Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.option("compression", "gzip")
.json(outputPath + "resultProject");
}
-private static eu.dnetlib.dhp.eosc.model.Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p) throws DocumentException {
+private static eu.dnetlib.dhp.eosc.model.Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p)
+throws DocumentException {
if (Boolean.TRUE.equals(p.getDataInfo().getDeletedbyinference()))
return null;
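Each ResultProject row thus fans out into one relation per linked project; for a row rp, the flatMap above is equivalent to this sketch:

// One source/target Relation per project funding the result (rp as bound in the flatMap).
List<Relation> fanOut = rp
	.getProjectsList()
	.stream()
	.map(p -> Relation.newInstance(rp.getResultId(), p.getId()))
	.collect(Collectors.toList());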

View File

@@ -0,0 +1,26 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the name node",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "the path used to store temporary output files",
"paramRequired": false
}
]
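Assuming this parameter file is read through ArgumentApplicationParser like the other steps in this module, a launch would pass the long parameter names as flags (a sketch; paths are illustrative and the oozie workflow wiring is not part of this commit):

// Hypothetical invocation of the new dump step with the three parameters above.
String[] args = new String[] {
	"--sourcePath", "/tmp/prod_provision/graph", // illustrative input graph location
	"--outputPath", "/tmp/eosc_dump/", // illustrative dump location
	"--isSparkSessionManaged", Boolean.FALSE.toString()
};
SparkDumpOrganizationProject.main(args);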

View File

@@ -24,8 +24,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.eosc.model.Indicator;
import eu.dnetlib.dhp.eosc.model.Affiliation;
+import eu.dnetlib.dhp.eosc.model.Indicator;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import scala.Tuple2;