Added extention to dump Organizations and also relations of type resultOrganization

This commit is contained in:
Miriam Baglioni 2023-10-24 17:01:03 +02:00
parent 64f10b6d31
commit a623883b62
10 changed files with 252 additions and 55 deletions

View File

@ -0,0 +1,46 @@
package eu.dnetlib.dhp.eosc.model;
import java.io.Serializable;
import java.util.List;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
/**
* @author miriam.baglioni
* @Date 13/09/22
*/
public class Affiliation implements Serializable {
@JsonSchema(description = "the OpenAIRE id of the organizaiton")
private String id;
@JsonSchema(description = "the name of the organization")
private String name;
@JsonSchema(description = "the list of pids we have in OpenAIRE for the organization")
private List<OrganizationPid> pid;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<OrganizationPid> getPid() {
return pid;
}
public void setPid(List<OrganizationPid> pid) {
this.pid = pid;
}
}

View File

@ -1,4 +1,3 @@
package eu.dnetlib.dhp.eosc.model;
import java.io.Serializable;
@ -7,40 +6,86 @@ import java.util.List;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
/**
* @author miriam.baglioni
* @Date 13/09/22
* To represent the generic organizaiton. It has the following parameters:
* - private String legalshortname to store the legalshortname of the organizaiton
* - private String legalname to store the legal name of the organization
* - private String websiteurl to store the websiteurl of the organization
* - private List<String> alternativenames to store the alternative names of the organization
* - private Country country to store the country of the organization
* - private String id to store the openaire id of the organization
* - private List<OrganizationPid> pid to store the list of pids for the organization
*/
public class Organization implements Serializable {
@JsonSchema(description = "the OpenAIRE id of the organizaiton")
private String id;
private String legalshortname;
private String legalname;
private String websiteurl;
@JsonSchema(description = "the name of the organization")
private String name;
@JsonSchema(description = "Alternative names that identify the organisation")
private List<String> alternativenames;
@JsonSchema(description = "the list of pids we have in OpenAIRE for the organization")
private List<OrganizationPid> pid;
@JsonSchema(description = "The organisation country")
private Country country;
public String getId() {
return id;
}
@JsonSchema(description = "The OpenAIRE id for the organisation")
private String id;
public void setId(String id) {
this.id = id;
}
@JsonSchema(description = "Persistent identifiers for the organisation i.e. isni 0000000090326370")
private List<OrganizationPid> pid;
public String getName() {
return name;
}
public String getLegalshortname() {
return legalshortname;
}
public void setName(String name) {
this.name = name;
}
public void setLegalshortname(String legalshortname) {
this.legalshortname = legalshortname;
}
public List<OrganizationPid> getPid() {
return pid;
}
public String getLegalname() {
return legalname;
}
public void setLegalname(String legalname) {
this.legalname = legalname;
}
public String getWebsiteurl() {
return websiteurl;
}
public void setWebsiteurl(String websiteurl) {
this.websiteurl = websiteurl;
}
public List<String> getAlternativenames() {
return alternativenames;
}
public void setAlternativenames(List<String> alternativenames) {
this.alternativenames = alternativenames;
}
public Country getCountry() {
return country;
}
public void setCountry(Country country) {
this.country = country;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public List<OrganizationPid> getPid() {
return pid;
}
public void setPid(List<OrganizationPid> pid) {
this.pid = pid;
}
public void setPid(List<OrganizationPid> pid) {
this.pid = pid;
}
}

View File

@ -32,4 +32,12 @@ public class OrganizationPid implements Serializable {
public void setValue(String value) {
this.value = value;
}
public static OrganizationPid newInstance(String type, String value){
OrganizationPid op = new OrganizationPid();
op.type = type;
op.value = value;
return op;
}
}

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.eosc.model;
import java.io.Serializable;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;
/**
@ -20,12 +21,15 @@ public class Relation implements Serializable {
private String target;
@JsonSchema(description = "To represent the semantics of a relation between two entities")
@JsonIgnoreProperties(ignoreUnknown = true)
private RelType reltype;
@JsonSchema(description = "The reason why OpenAIRE holds the relation ")
@JsonIgnoreProperties(ignoreUnknown = true)
private Provenance provenance;
@JsonSchema(description = "The result type of the target for this relation")
@JsonIgnoreProperties(ignoreUnknown = true)
private String targetType;
public String getTargetType() {
@ -82,4 +86,12 @@ public class Relation implements Serializable {
relation.provenance = provenance;
return relation;
}
public static Relation newInstance(String source, String target) {
Relation relation = new Relation();
relation.source = source;
relation.target = target;
return relation;
}
}

View File

@ -24,7 +24,7 @@ public class Result implements Serializable {
private List<String> keywords;
@JsonSchema(description = "The list of organizations the result is affiliated to")
private List<Organization> affiliation;
private List<Affiliation> affiliation;
@JsonSchema(description = "The indicators for this result")
private Indicator indicator;
@ -465,11 +465,11 @@ public class Result implements Serializable {
this.subject = subject;
}
public List<Organization> getAffiliation() {
public List<Affiliation> getAffiliation() {
return affiliation;
}
public void setAffiliation(List<Organization> affiliation) {
public void setAffiliation(List<Affiliation> affiliation) {
this.affiliation = affiliation;
}
}

View File

@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import eu.dnetlib.dhp.eosc.model.Affiliation;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -91,7 +92,7 @@ public class ExtendEoscResultWithOrganization implements Serializable {
if (t2._2() != null) {
ResultOrganizations rOrg = new ResultOrganizations();
rOrg.setResultId(t2._1().getTarget());
eu.dnetlib.dhp.eosc.model.Organization org = new eu.dnetlib.dhp.eosc.model.Organization();
Affiliation org = new Affiliation();
org.setId(t2._2().getId());
if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) {
org.setName(t2._2().getLegalname().getValue());
@ -135,7 +136,7 @@ public class ExtendEoscResultWithOrganization implements Serializable {
return first._1();
}
Result ret = first._1();
List<eu.dnetlib.dhp.eosc.model.Organization> affiliation = new ArrayList<>();
List<Affiliation> affiliation = new ArrayList<>();
Set<String> alreadyInsertedAffiliations = new HashSet<>();
affiliation.add(first._2().getAffiliation());
alreadyInsertedAffiliations.add(first._2().getAffiliation().getId());

View File

@ -5,7 +5,10 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.eosc.model.Affiliation;
import eu.dnetlib.dhp.eosc.model.Country;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -33,6 +36,8 @@ import scala.Tuple2;
public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ExtendEoscResultWithOrganizationStep2.class);
private final static String UNKNOWN = "UNKNOWN";
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
@ -52,11 +57,11 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String resultPath = parser.get("resultPath");
log.info("resultPath: {}", resultPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
// final String outputPath = parser.get("outputPath");
// log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
@ -64,15 +69,15 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
addOrganizations(spark, inputPath, outputPath, resultPath);
Utils.removeOutputDir(spark, workingPath + "publicationextendedaffiliation");
addOrganizations(spark, inputPath, workingPath );
});
}
private static void addOrganizations(SparkSession spark, String inputPath, String outputPath,
String resultPath) {
private static void addOrganizations(SparkSession spark, String inputPath, String workingPath) {
Dataset<Result> results = Utils
.readPath(spark, resultPath, Result.class);
.readPath(spark, workingPath + "publication", Result.class);
Dataset<Relation> relations = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
@ -88,7 +93,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
if (t2._2() != null) {
ResultOrganizations rOrg = new ResultOrganizations();
rOrg.setResultId(t2._1().getTarget());
eu.dnetlib.dhp.eosc.model.Organization org = new eu.dnetlib.dhp.eosc.model.Organization();
Affiliation org = new Affiliation();
org.setId(t2._2().getId());
if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) {
org.setName(t2._2().getLegalname().getValue());
@ -131,7 +136,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
return first._1();
}
Result ret = first._1();
List<eu.dnetlib.dhp.eosc.model.Organization> affiliation = new ArrayList<>();
List<Affiliation> affiliation = new ArrayList<>();
Set<String> alreadyInsertedAffiliations = new HashSet<>();
affiliation.add(first._2().getAffiliation());
alreadyInsertedAffiliations.add(first._2().getAffiliation().getId());
@ -148,8 +153,88 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
.json(workingPath + "publicationextendedaffiliation");
relations
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")))
.map((MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(t2._2()),Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(workingPath + "organization");
relations
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")))
.map((MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation.newInstance(t2._1().getSource(), t2._1().getTarget()), Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class) )
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(workingPath + "resultOrganization");
}
}
private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org){
if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference()))
return null;
if (!Optional.ofNullable(org.getLegalname()).isPresent()
&& !Optional.ofNullable(org.getLegalshortname()).isPresent())
return null;
eu.dnetlib.dhp.eosc.model.Organization organization = new eu.dnetlib.dhp.eosc.model.Organization();
Optional
.ofNullable(org.getLegalshortname())
.ifPresent(value -> organization.setLegalshortname(value.getValue()));
Optional
.ofNullable(org.getLegalname())
.ifPresent(value -> organization.setLegalname(value.getValue()));
Optional
.ofNullable(org.getWebsiteurl())
.ifPresent(value -> organization.setWebsiteurl(value.getValue()));
Optional
.ofNullable(org.getAlternativeNames())
.ifPresent(
value -> organization
.setAlternativenames(
value
.stream()
.map(v -> v.getValue())
.collect(Collectors.toList())));
Optional
.ofNullable(org.getCountry())
.ifPresent(
value -> {
if (!value.getClassid().equals(UNKNOWN)) {
organization
.setCountry(
Country.newInstance(value.getClassid(), value.getClassname()));
}
});
Optional
.ofNullable(org.getId())
.ifPresent(value -> organization.setId(value));
Optional
.ofNullable(org.getPid())
.ifPresent(
value -> organization
.setPid(
value
.stream()
.map(p -> OrganizationPid.newInstance(p.getQualifier().getClassid(), p.getValue()))
.collect(Collectors.toList())));
return organization;
}
}

View File

@ -2,9 +2,8 @@
package eu.dnetlib.dhp.oa.graph.dump.eosc;
import java.io.Serializable;
import java.util.List;
import eu.dnetlib.dhp.eosc.model.Organization;
import eu.dnetlib.dhp.eosc.model.Affiliation;
/**
* @author miriam.baglioni
@ -12,7 +11,7 @@ import eu.dnetlib.dhp.eosc.model.Organization;
*/
public class ResultOrganizations implements Serializable {
private String resultId;
private Organization affiliation;
private Affiliation affiliation;
public String getResultId() {
return resultId;
@ -22,11 +21,11 @@ public class ResultOrganizations implements Serializable {
this.resultId = resultId;
}
public Organization getAffiliation() {
public Affiliation getAffiliation() {
return affiliation;
}
public void setAffiliation(Organization affiliation) {
public void setAffiliation(Affiliation affiliation) {
this.affiliation = affiliation;
}
}

View File

@ -164,8 +164,9 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--resultPath</arg><arg>${workingDir}/dump/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/publicationextendedaffiliation</arg>
<arg>--workingPath</arg><arg>${workingDir}/dump/</arg>
<!-- <arg>&#45;&#45;resultPath</arg><arg>${workingDir}/dump/publication</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${workingDir}/dump/publicationextendedaffiliation</arg>-->
</spark>
<ok to="wait_eosc_dump"/>
<error to="Kill"/>

View File

@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.eosc.model.Indicator;
import eu.dnetlib.dhp.eosc.model.Organization;
import eu.dnetlib.dhp.eosc.model.Affiliation;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import scala.Tuple2;
@ -174,7 +174,7 @@ public class SelectEoscResultTest {
.getAffiliation()
.size());
List<Organization> affiliations = tmp
List<Affiliation> affiliations = tmp
.filter(r -> r.getId().equalsIgnoreCase("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba"))
.first()
.getAffiliation();
@ -184,7 +184,7 @@ public class SelectEoscResultTest {
affiliations.stream().anyMatch(a -> a.getName().equalsIgnoreCase("Doris Engineering (France)")));
Assertions.assertTrue(affiliations.stream().anyMatch(a -> a.getName().equalsIgnoreCase("RENNES METROPOLE")));
Organization organization = affiliations
Affiliation organization = affiliations
.stream()
.filter(a -> a.getId().equalsIgnoreCase("20|13811704aa70::51a6ade52065e3b371d1ae822e07f1ff"))
.findFirst()