[SKG-IF] refactoring and fixing issues

Miriam Baglioni 2024-03-01 09:35:15 +01:00
parent 0c887ca015
commit 752fd896e4
32 changed files with 2082 additions and 1496 deletions

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;
@ -7,7 +8,8 @@ import java.io.Serializable;
* @Date 22/02/24
*/
public class Contributor implements Serializable {
private String person; //I would not map it because we have only information regarding the person (if any) associated to the leading organization
private String person; // I would not map it because we have only information regarding the person (if any)
// associated to the leading organization
private String organization; // contributors.person
private String role;// private

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;
@ -14,19 +15,22 @@ public class Datasource implements Serializable {
private String submission_policy_url;// submissionpolicyurl
private String preservation_policy_url;// preservationpolicyurl
private Boolean version_control;// versioncontrol bool
private List<PersistentIdentitySystems> persistent_identity_systems;//. product_type researchentitytype list type to be remapped to the eosc types
private List<PersistentIdentitySystems> persistent_identity_systems;// . product_type researchentitytype list type
// to be remapped to the eosc types
// persistent_identity_systems. pid_scheme pidsystems.value when not null. It can be a string with multiple values
private String jurisdiction;// jurisdiction.classname
private String data_source_classification;// eoscdatasourcetype.classname
private List<String> research_product_type;// researchentitytype list type to be remapped to the eosc types
private Boolean thematic;// thematic bool
private List<Licence> research_product_license; // .name not mappable listresearch_product_license.url not mappable
private List<String> research_product_access_policy;// "databaseaccesstype if open => open access (https://vocabularies.coar-repositories.org/access_rights/c_abf2/)
private List<String> research_product_access_policy;// "databaseaccesstype if open => open access
// (https://vocabularies.coar-repositories.org/access_rights/c_abf2/)
// if restricted => restricted access (https://vocabularies.coar-repositories.org/access_rights/c_16ec/)
// if closed => metadata only access (https://vocabularies.coar-repositories.org/access_rights/c_14cb/) " list
private List<Licence> research_product_metadata_license; // .name not mappable list
// research_product_metadata_license.url not mappable
private List<String>research_product_metadata_access_policy ;//researchproductmetadataccesspolicies list with the same mapping of research_product_access_policy
private List<String> research_product_metadata_access_policy;// researchproductmetadataccesspolicies list with the
// same mapping of research_product_access_policy
public String getLocal_identifier() {
return local_identifier;

View File

@ -1,11 +1,11 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;
import java.util.List;
import org.codehaus.jackson.annotate.JsonProperty;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 22/02/24
@ -28,7 +28,8 @@ public class Grant implements Serializable {
private String start_date;// startdate.value
private String end_date;// enddate.value
private String website;// websiteurl.value
private List<String> beneficiaries;// organization.id for the organizations in the relation with semantic class isParticipant produces the list of organization internal identifiers
private List<String> beneficiaries;// organization.id for the organizations in the relation with semantic class
// isParticipant produces the list of organization internal identifiers
private List<Contributor> contributors;//
public String getLocal_identifier() {

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.skgif.model;
public enum OrganizationTypes {
@ -5,13 +6,9 @@ public enum OrganizationTypes {
COMPANY("company"),
EDUCATION("education"),
FACILITY("facility"),
GOVERNMENT("government"),
HEALTHCARE("healthcare"),
NONPROFIT("nonprofit"),
FUNDER("funder"),
OTHER("other");
EDUCATION("education"), FACILITY("facility"), GOVERNMENT("government"), HEALTHCARE("healthcare"), NONPROFIT(
"nonprofit"), FUNDER("funder"), OTHER("other");
public final String label;
private OrganizationTypes(String label) {

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.skgif.model;
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;
@ -19,9 +19,7 @@ public enum Prefixes implements Serializable {
TEMPORARY_PERSON("temp_person_::"),
DATASOURCE("datasource__::"),
TOPIC("topic_______::"),
VENUE("venue_______::");
DATASOURCE("datasource__::"), TOPIC("topic_______::"), VENUE("venue_______::");
public final String label;
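
The Utils.getIdentifier(Prefixes, String) helper used by all of the dumpers below is not touched by this commit; judging from the "prefix + DHPUtils.md5(...)" pattern visible in the commented-out DumpResult code further down, a minimal sketch (assumed, not taken from this diff) would be:

import eu.dnetlib.dhp.utils.DHPUtils;

public static String getIdentifier(Prefixes prefix, String id) {
    // assumed behaviour: prefix label (e.g. "datasource__::") followed by the md5 of the OpenAIRE id
    return prefix.label + DHPUtils.md5(id);
}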

View File

@ -8,15 +8,10 @@ import java.io.Serializable;
* @Date 05/09/23
*/
public enum RelationType implements Serializable {
RESULT_OUTCOME_FUNDING("isProducedBy"),
RESULT_AFFILIATIED_TO_ORGANIZATION("hasAuthorInstitution"),
ORGANIZATION_PARTICIPANT_IN_PROJECT("isParticipant"),
SUPPLEMENT("IsSupplementedBy"),
DOCUMENTS(
"IsDocumentedBy"),
PART("IsPartOf"),
VERSION("IsNewVersionOf"),
CITATION("Cites");
RESULT_OUTCOME_FUNDING("isProducedBy"), RESULT_AFFILIATIED_TO_ORGANIZATION(
"hasAuthorInstitution"), ORGANIZATION_PARTICIPANT_IN_PROJECT("isParticipant"), SUPPLEMENT(
"IsSupplementedBy"), DOCUMENTS(
"IsDocumentedBy"), PART("IsPartOf"), VERSION("IsNewVersionOf"), CITATION("Cites");
public final String label;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;

View File

@ -1,18 +1,12 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;
public enum VenueIdentifierType implements Serializable {
EISSN("eissn"),
ISSN("issn"),
LISSN("lissn"),
ISBN("isbn"),
OPENDOAR(
"opendoar"),
R3DATA("re3data.org"),
FAIRSHARING("fairsharing");
EISSN("eissn"), ISSN("issn"), LISSN("lissn"), ISBN("isbn"), OPENDOAR(
"opendoar"), R3DATA("re3data.org"), FAIRSHARING("fairsharing");
public final String label;

View File

@ -1,17 +1,12 @@
package eu.dnetlib.dhp.skgif.model;
import java.io.Serializable;
public enum VenueType implements Serializable {
REPOSITORY("repository"),
JOURNAL("journal"),
CONFERENCE("conference"),
BOOK("book"),
OTHER(
"other"),
UNKNOWN("unknown");
REPOSITORY("repository"), JOURNAL("journal"), CONFERENCE("conference"), BOOK("book"), OTHER(
"other"), UNKNOWN("unknown");
public final String label;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump.exceptions;
public class CardinalityTooHighException extends Exception {

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump.exceptions;
public class NoAvailableEntityTypeException extends Exception {

View File

@ -1,9 +1,12 @@
package eu.dnetlib.dhp.oa.graph.dump.skgif;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.skgif.model.Identifier;
import eu.dnetlib.dhp.skgif.model.Prefixes;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -15,11 +18,10 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.skgif.model.Identifier;
import eu.dnetlib.dhp.skgif.model.Prefixes;
/**
* @author miriam.baglioni
@ -67,33 +69,51 @@ public class DumpDatasource implements Serializable {
}
private static void mapDatasource(SparkSession spark, String inputPath, String outputPath) {
Utils.readPath(spark, inputPath + "datasource", Datasource.class)
.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getInvisible() && ! d.getDataInfo().getDeletedbyinference())
Utils
.readPath(spark, inputPath + "datasource", Datasource.class)
.filter(
(FilterFunction<Datasource>) d -> !d.getDataInfo().getInvisible()
&& !d.getDataInfo().getDeletedbyinference())
.map((MapFunction<Datasource, eu.dnetlib.dhp.skgif.model.Datasource>) d -> {
eu.dnetlib.dhp.skgif.model.Datasource datasource = new eu.dnetlib.dhp.skgif.model.Datasource();
datasource.setLocal_identifier(Utils.getIdentifier(Prefixes.DATASOURCE, d.getId()));
datasource.setIdentifiers(d.getPid()
datasource
.setIdentifiers(
d
.getPid()
.stream()
.map(p -> Identifier.newInstance(p.getQualifier().getClassid(), p.getValue()))
.collect(Collectors.toList()));
datasource.setName(d.getOfficialname().getValue());
datasource.setSubmission_policy_url(d.getSubmissionpolicyurl());
datasource.setJurisdiction(Optional.ofNullable(d.getJurisdiction())
.map(v -> v.getClassid()).
orElse(new String()));
datasource
.setJurisdiction(
Optional
.ofNullable(d.getJurisdiction())
.map(v -> v.getClassid())
.orElse(new String()));
datasource.setPreservation_policy_url(d.getPreservationpolicyurl());
datasource.setVersion_control(d.getVersioncontrol());
datasource.setData_source_classification(Optional.ofNullable(d.getEoscdatasourcetype())
.map(v -> v.getClassname()).
orElse(new String()));
datasource
.setData_source_classification(
Optional
.ofNullable(d.getEoscdatasourcetype())
.map(v -> v.getClassname())
.orElse(new String()));
datasource.setResearch_product_type(getEoscProductType(d.getResearchentitytypes()));
datasource.setThematic(d.getThematic());
datasource.setResearch_product_access_policy(Optional.ofNullable(d.getDatabaseaccesstype())
datasource
.setResearch_product_access_policy(
Optional
.ofNullable(d.getDatabaseaccesstype())
.map(v -> getResearchProductAccessPolicy(d.getDatabaseaccesstype().getValue()))
.orElse(new ArrayList<>()));
datasource.setResearch_product_metadata_access_policy(Optional.ofNullable(d.getResearchproductmetadataaccesspolicies())
datasource
.setResearch_product_metadata_access_policy(
Optional
.ofNullable(d.getResearchproductmetadataaccesspolicies())
.map(v -> getResearchProductAccessPolicy(d.getResearchproductmetadataaccesspolicies()))
.orElse(new ArrayList<>()));
return datasource;
@ -106,9 +126,15 @@ public class DumpDatasource implements Serializable {
private static List<String> getResearchProductAccessPolicy(List<String> value) {
return value.stream().map(v -> getResearchProductAccessPolicy(v)).filter(Objects::nonNull)
.map(v -> v.get(0)).distinct().collect(Collectors.toList());
return value
.stream()
.map(v -> getResearchProductAccessPolicy(v))
.filter(Objects::nonNull)
.map(v -> v.get(0))
.distinct()
.collect(Collectors.toList());
}
private static List<String> getResearchProductAccessPolicy(String value) {
// "databaseaccesstype if open => open access (https://vocabularies.coar-repositories.org/access_rights/c_abf2/)
// if restricted => restricted access (https://vocabularies.coar-repositories.org/access_rights/c_16ec/)
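
The hunk stops before the body of getResearchProductAccessPolicy(String); a minimal sketch of the mapping the comments above describe (label strings assumed, null for unmapped values so the caller's Objects::nonNull filter drops them):

private static List<String> getResearchProductAccessPolicy(String value) {
    switch (value.toLowerCase()) {
        case "open":
            // https://vocabularies.coar-repositories.org/access_rights/c_abf2/
            return Arrays.asList("open access");
        case "restricted":
            // https://vocabularies.coar-repositories.org/access_rights/c_16ec/
            return Arrays.asList("restricted access");
        case "closed":
            // https://vocabularies.coar-repositories.org/access_rights/c_14cb/
            return Arrays.asList("metadata only access");
        default:
            return null;
    }
}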

View File

@ -1,13 +1,16 @@
package eu.dnetlib.dhp.oa.graph.dump.skgif;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.skgif.model.Grant;
import eu.dnetlib.dhp.skgif.model.Identifier;
import eu.dnetlib.dhp.skgif.model.Prefixes;
import eu.dnetlib.dhp.skgif.model.RelationType;
import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -22,18 +25,16 @@ import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.skgif.model.Grant;
import eu.dnetlib.dhp.skgif.model.Identifier;
import eu.dnetlib.dhp.skgif.model.Prefixes;
import eu.dnetlib.dhp.skgif.model.RelationType;
import scala.Tuple2;
import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* @author miriam.baglioni
* @Date 22/02/24
@ -80,14 +81,19 @@ public class DumpGrant implements Serializable {
}
private static void mapGrants(SparkSession spark, String inputPath, String outputPath) {
Dataset<Project> projects = Utils.readPath(spark, inputPath + "project", Project.class)
.filter((FilterFunction<Project>) p -> !p.getDataInfo().getDeletedbyinference() &&
Dataset<Project> projects = Utils
.readPath(spark, inputPath + "project", Project.class)
.filter(
(FilterFunction<Project>) p -> !p.getDataInfo().getDeletedbyinference() &&
!p.getDataInfo().getInvisible());
Dataset<Relation> relations = Utils.readPath(spark, inputPath + "relation", Relation.class)
.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
Dataset<Relation> relations = Utils
.readPath(spark, inputPath + "relation", Relation.class)
.filter(
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
!r.getDataInfo().getInvisible() &&
r.getRelClass().equalsIgnoreCase(RelationType.ORGANIZATION_PARTICIPANT_IN_PROJECT.label));
projects.joinWith(relations, projects.col("id").equalTo(relations.col("target")), "left")
projects
.joinWith(relations, projects.col("id").equalTo(relations.col("target")), "left")
.groupByKey((MapFunction<Tuple2<Project, Relation>, String>) t2 -> t2._1().getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<Project, Relation>, Grant>) (k, v) -> {
Grant g = new Grant();
@ -95,29 +101,65 @@ public class DumpGrant implements Serializable {
g.setLocal_identifier(Utils.getIdentifier(Prefixes.GRANT, k));
g.setIdentifiers(getProjectIdentifier(first._1()));
g.setTitle(first._1().getTitle().getValue());
g.setSummary(Optional.ofNullable(first._1().getSummary())
.map(value->value.getValue()).orElse(new String()));
g.setAcronym(Optional.ofNullable(first._1().getAcronym())
.map(value->value.getValue()).orElse(new String()));
g
.setSummary(
Optional
.ofNullable(first._1().getSummary())
.map(value -> value.getValue())
.orElse(new String()));
g
.setAcronym(
Optional
.ofNullable(first._1().getAcronym())
.map(value -> value.getValue())
.orElse(new String()));
g.setFunder(getFunderName(first._1().getFundingtree().get(0).getValue()));
// * private String funding_stream;// fundingtree to be used the xpath //funding_level_[n]
g.setFunding_stream(getFundingStream(first._1().getFundingtree().get(0).getValue()));
g.setCurrency(Optional.ofNullable(first._1().getCurrency())
.map(value -> value.getValue()).orElse(new String()));
g.setFunded_amount(Optional.ofNullable(first._1().getFundedamount())
g
.setCurrency(
Optional
.ofNullable(first._1().getCurrency())
.map(value -> value.getValue())
.orElse(new String()));
g
.setFunded_amount(
Optional
.ofNullable(first._1().getFundedamount())
.orElse(null));
g.setKeywords(first._1().getSubjects()
.stream().map(s -> s.getValue()).collect(Collectors.toList()));
g.setStart_date(Optional.ofNullable(first._1().getStartdate())
.map(value -> value.getValue()).orElse(new String()));
g.setEnd_date(Optional.ofNullable(first._1().getEnddate())
.map(value -> value.getValue()).orElse(new String()));
g.setWebsite(Optional.ofNullable(first._1().getWebsiteurl())
.map(value -> value.getValue()).orElse(new String()));
g
.setKeywords(
first
._1()
.getSubjects()
.stream()
.map(s -> s.getValue())
.collect(Collectors.toList()));
g
.setStart_date(
Optional
.ofNullable(first._1().getStartdate())
.map(value -> value.getValue())
.orElse(new String()));
g
.setEnd_date(
Optional
.ofNullable(first._1().getEnddate())
.map(value -> value.getValue())
.orElse(new String()));
g
.setWebsite(
Optional
.ofNullable(first._1().getWebsiteurl())
.map(value -> value.getValue())
.orElse(new String()));
if (Optional.ofNullable(first._2()).isPresent()) {
List<String> relevantOrganizatios = new ArrayList<>();
relevantOrganizatios.add(Utils.getIdentifier(Prefixes.ORGANIZATION, first._2().getSource()));
v.forEachRemaining(t2 -> relevantOrganizatios.add(Utils.getIdentifier(Prefixes.ORGANIZATION, t2._2().getSource())));
v
.forEachRemaining(
t2 -> relevantOrganizatios
.add(Utils.getIdentifier(Prefixes.ORGANIZATION, t2._2().getSource())));
g.setBeneficiaries(relevantOrganizatios);
}
return g;
@ -149,14 +191,19 @@ public class DumpGrant implements Serializable {
}
private static List<Identifier> getProjectIdentifier(Project project) {
private static List<Identifier> getProjectIdentifier(Project project) throws DocumentException {
List<Identifier> identifiers = new ArrayList<>();
if (project.getPid().size() > 0)
return project.getPid().stream().map(p -> Identifier.newInstance(p.getQualifier().getClassid(), p.getValue())).collect(Collectors.toList());
return new ArrayList<>();
// private List<Identifier> identifiers;//.schema pid.qualifier.classid identifiers.value pid.value
//identifiers.schema funder acronym to be used the xpath //fundingtree/funder/shortname
//identifiers.value project.code
project
.getPid()
.stream()
.forEach(p -> identifiers.add(Identifier.newInstance(p.getQualifier().getClassid(), p.getValue())));
identifiers
.add(
Identifier
.newInstance(
getFunderName(project.getFundingtree().get(0).getValue()), project.getCode().getValue()));
return identifiers;
}
}
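
getProjectIdentifier now declares DocumentException because getFunderName parses the serialized funding tree; a sketch of that helper, assuming the //fundingtree/funder/shortname xpath mentioned in the comments and the dom4j classes this class already imports:

private static String getFunderName(String fundingtree) throws DocumentException {
    // read the funder acronym (short name) out of the funding tree XML
    org.dom4j.Document doc = new SAXReader().read(new StringReader(fundingtree));
    return ((org.dom4j.Node) doc.selectNodes("//fundingtree/funder/shortname").get(0)).getText();
}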

View File

@ -1,10 +1,12 @@
package eu.dnetlib.dhp.oa.graph.dump.skgif;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.skgif.model.Identifier;
import eu.dnetlib.dhp.skgif.model.OrganizationTypes;
import eu.dnetlib.dhp.skgif.model.Prefixes;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -16,11 +18,11 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Optional;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.skgif.model.Identifier;
import eu.dnetlib.dhp.skgif.model.OrganizationTypes;
import eu.dnetlib.dhp.skgif.model.Prefixes;
/**
* @author miriam.baglioni
@ -66,28 +68,46 @@ public class DumpOrganization implements Serializable {
private static void mapOrganization(SparkSession spark, String inputPath, String outputPath) {
Dataset<Organization> organizations = Utils.readPath(spark, inputPath + "organization", Organization.class);
organizations.filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference() && !o.getDataInfo().getInvisible())
organizations
.filter(
(FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference()
&& !o.getDataInfo().getInvisible())
.map((MapFunction<Organization, eu.dnetlib.dhp.skgif.model.Organization>) o -> {
eu.dnetlib.dhp.skgif.model.Organization organization = new eu.dnetlib.dhp.skgif.model.Organization();
organization.setLocal_identifier(Utils.getIdentifier(Prefixes.ORGANIZATION, o.getId()));
organization.setCountry(Optional.ofNullable(o.getCountry().getClassid())
organization
.setCountry(
Optional
.ofNullable(o.getCountry().getClassid())
.orElse(new String()));
organization.setName(Optional.ofNullable(o.getLegalname().getValue())
organization
.setName(
Optional
.ofNullable(o.getLegalname().getValue())
.orElse(new String()));
organization.setShort_name(Optional.ofNullable(o.getLegalshortname())
organization
.setShort_name(
Optional
.ofNullable(o.getLegalshortname())
.map(v -> v.getValue())
.orElse(new String()));
organization.setIdentifiers(o.getPid()
organization
.setIdentifiers(
o
.getPid()
.stream()
.map(p -> Identifier.newInstance(p.getQualifier().getClassid(), p.getValue()))
.collect(Collectors.toList()));
organization.setOther_names(o.getAlternativeNames().stream()
organization
.setOther_names(
o
.getAlternativeNames()
.stream()
.map(a -> a.getValue())
.collect(Collectors.toList()));
organization.setType(getOrganizationType(o));
return organization;
}
, Encoders.bean(eu.dnetlib.dhp.skgif.model.Organization.class))
}, Encoders.bean(eu.dnetlib.dhp.skgif.model.Organization.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
@ -95,13 +115,17 @@ public class DumpOrganization implements Serializable {
}
private static String getOrganizationType(Organization o) {
if(Optional.ofNullable(o.getEcenterprise()).isPresent() && o.getEcenterprise().getValue().equalsIgnoreCase("true"))
if (Optional.ofNullable(o.getEcenterprise()).isPresent()
&& o.getEcenterprise().getValue().equalsIgnoreCase("true"))
return OrganizationTypes.COMPANY.label;
if(Optional.ofNullable(o.getEchighereducation()).isPresent() && o.getEchighereducation().getValue().equalsIgnoreCase("true"))
if (Optional.ofNullable(o.getEchighereducation()).isPresent()
&& o.getEchighereducation().getValue().equalsIgnoreCase("true"))
return OrganizationTypes.EDUCATION.label;
if(Optional.ofNullable(o.getEcresearchorganization()).isPresent() && o.getEcresearchorganization().getValue().equalsIgnoreCase("true"))
if (Optional.ofNullable(o.getEcresearchorganization()).isPresent()
&& o.getEcresearchorganization().getValue().equalsIgnoreCase("true"))
return OrganizationTypes.EDUCATION.label;
if(Optional.ofNullable(o.getEcnonprofit()).isPresent() && o.getEcnonprofit().getValue().equalsIgnoreCase("true"))
if (Optional.ofNullable(o.getEcnonprofit()).isPresent()
&& o.getEcnonprofit().getValue().equalsIgnoreCase("true"))
return OrganizationTypes.NONPROFIT.label;
return OrganizationTypes.OTHER.label;

View File

@ -6,11 +6,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.PartialResearchProduct;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.RelationPerProduct;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -22,9 +17,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.PartialResearchProduct;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.RelationPerProduct;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.skgif.model.AccessRight;
import eu.dnetlib.dhp.utils.DHPUtils;
@ -106,15 +105,20 @@ public class DumpResult implements Serializable {
Dataset<Datasource> datasource = Utils
.readPath(spark, inputPath + "/datasource", Datasource.class)
.filter(
(FilterFunction<Datasource>) d -> Optional.ofNullable(d.getEosctype()).isPresent() &&
(FilterFunction<Datasource>) d -> Optional.ofNullable(d.getEoscdatasourcetype()).isPresent() &&
d.getEoscdatasourcetype().getClassid().equalsIgnoreCase("Journal archive"));
Dataset<EmitPerManifestation> man = Utils
.readPath(spark, workingDir + e.name() + "/manifestation", EmitPerManifestation.class);
Dataset<PartialResearchProduct> partialResearchProduct = man.joinWith(datasource, man.col("instance.hostedby.key").equalTo(datasource.col("id")), "left")
.groupByKey((MapFunction<Tuple2<EmitPerManifestation, Datasource>, String>) t2 -> t2._1().getResultId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<EmitPerManifestation, Datasource>, PartialResearchProduct>) (k, v) -> {
Dataset<PartialResearchProduct> partialResearchProduct = man
.joinWith(datasource, man.col("instance.hostedby.key").equalTo(datasource.col("id")), "left")
.groupByKey(
(MapFunction<Tuple2<EmitPerManifestation, Datasource>, String>) t2 -> t2._1().getResultId(),
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<EmitPerManifestation, Datasource>, PartialResearchProduct>) (
k, v) -> {
PartialResearchProduct prp = new PartialResearchProduct();
prp.setResultId(k);
List<Manifestation> manifestationList = new ArrayList<>();
@ -124,8 +128,11 @@ public class DumpResult implements Serializable {
return prp;
}, Encoders.bean(PartialResearchProduct.class));
partialResearchProduct
.joinWith(aggRelations, partialResearchProduct.col("resultId").equalTo(aggRelations.col("resultId")), "left")
.map((MapFunction<Tuple2<PartialResearchProduct, RelationPerProduct>, PartialResearchProduct>) t2 -> {
.joinWith(
aggRelations, partialResearchProduct.col("resultId").equalTo(aggRelations.col("resultId")),
"left")
.map(
(MapFunction<Tuple2<PartialResearchProduct, RelationPerProduct>, PartialResearchProduct>) t2 -> {
PartialResearchProduct prp = t2._1();
if (Optional.ofNullable(t2._2()).isPresent()) {
prp.setRelated_products(t2._2().getRelatedProduct());
@ -192,14 +199,20 @@ public class DumpResult implements Serializable {
manifestation.setAccess_right(AccessRight.UNAVAILABLE.label);
}
manifestation.setLicence(Optional.ofNullable(epm.getInstance().getLicense())
manifestation
.setLicence(
Optional
.ofNullable(epm.getInstance().getLicense())
.map(value -> value.getValue())
.orElse(null));
manifestation.setUrl(Optional.ofNullable(epm.getInstance().getUrl())
manifestation
.setUrl(
Optional
.ofNullable(epm.getInstance().getUrl())
.map(value -> value.get(0))
.orElse(null));
if (Optional.ofNullable(epm.getInstance().getPid()).isPresent()) {
if (Optional.ofNullable(epm.getInstance().getPid()).isPresent() && epm.getInstance().getPid().size() > 0) {
manifestation.setPid(epm.getInstance().getPid().get(0).getValue());
}
if (Optional.ofNullable(t2._2()).isPresent()) {
@ -215,77 +228,6 @@ public class DumpResult implements Serializable {
return manifestation;
}
// private static List<Manifestation> getManifestationList(Dataset<EmitPerManifestation> emitformanifestation,
// Dataset<Datasource> datasource) {
// return emitformanifestation
// .joinWith(
// datasource, emitformanifestation
// .col("hostedBy")
// .equalTo(datasource.col("id")),
// "left")
// .map((MapFunction<Tuple2<EmitPerManifestation, Datasource>, Manifestation>) t2 -> {
// // se il lato sinistro c'e' allora ho la biblio e la venue
// // se non c'e' allora ho solo gli altri valori
// EmitPerManifestation epm = t2._1();
// Manifestation manifestation = new Manifestation();
// manifestation.setProduct_local_type_schema(epm.getInstance().getInstancetype().getClassname());
// manifestation.setProduct_local_type_schema(epm.getInstance().getInstancetype().getSchemename());
// manifestation
// .setDates(
// Arrays
// .asList(
// Dates.newInstance(epm.getInstance().getDateofacceptance().getValue(), "publishing")));
// if (Optional.ofNullable(epm.getInstance().getRefereed()).isPresent())
// switch (epm.getInstance().getRefereed().getClassid()) {
// case "0000":
// manifestation.setPeer_review(PeerReview.UNAVAILABLE.label);
// break;
// case "0001":
// manifestation.setPeer_review(PeerReview.PEER_REVIEWED.label);
// break;
// case "0002":
// manifestation.setPeer_review(PeerReview.NON_PEER_REVIEWED.label);
// break;
// }
//
// manifestation.setMetadata_curation("unavailable");
// if (Optional.ofNullable(epm.getInstance().getAccessright()).isPresent())
// switch (epm.getInstance().getAccessright().getClassid()) {
// case "OPEN":
// case "OPEN DATA":
// case "OPEN SOURCE":
// manifestation.setAccess_right(AccessRight.OPEN.label);
// break;
// case "CLOSED":
// manifestation.setAccess_right(AccessRight.CLOSED.label);
// break;
// case "RESTRICTED":
// manifestation.setAccess_right(AccessRight.RESTRICTED.label);
// break;
// case "EMBARGO":
// case "12MONTHS":
// case "6MONTHS":
// manifestation.setAccess_right(AccessRight.EMBARGO.label);
// break;
// default:
// manifestation.setAccess_right(AccessRight.UNAVAILABLE.label);
//
// }
// manifestation.setLicence(epm.getInstance().getLicense().getValue());
// manifestation.setUrl(epm.getInstance().getUrl().get(0));
// if (Optional.ofNullable(epm.getInstance().getPid()).isPresent()) {
// manifestation.setPid(epm.getInstance().getPid().get(0).getValue());
// }
// if (Optional.ofNullable(t2._2()).isPresent())
// manifestation.setBiblio(getBiblio(epm));
// manifestation.setVenue("venue_______::" + DHPUtils.md5(epm.getInstance().getHostedby().getKey()));
// manifestation
// .setHosting_datasource("datasource__::" + DHPUtils.md5(epm.getInstance().getHostedby().getKey()));
// return manifestation;
// }, Encoders.bean(Manifestation.class))
// .collectAsList();
// }
private static Biblio getBiblio(EmitPerManifestation epm) {
Biblio biblio = new Biblio();
biblio.setEdition(epm.getJournal().getEdition());
@ -335,7 +277,8 @@ public class DumpResult implements Serializable {
Dataset<ResearchProduct> researchProducts = spark.emptyDataset(Encoders.bean(ResearchProduct.class));
for (EntityType e : ModelSupport.entityTypes.keySet()) {
if (ModelSupport.isResult(e))
researchProducts = researchProducts.union(Utils.readPath(spark,workingDir + e.name() + "/researchproduct", ResearchProduct.class));
researchProducts = researchProducts
.union(Utils.readPath(spark, workingDir + e.name() + "/researchproduct", ResearchProduct.class));
}
researchProducts
.write()
@ -346,11 +289,17 @@ public class DumpResult implements Serializable {
}
private static void selectRelations(SparkSession spark, String inputPath, String workingDir) {
Dataset<Relation> relation = Utils.readPath(spark,
Dataset<Relation> relation = Utils
.readPath(
spark,
inputPath + "relation", Relation.class)
.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
.filter(
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
!r.getDataInfo().getInvisible())
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase(RelationType.RESULT_AFFILIATIED_TO_ORGANIZATION.label) ||
.filter(
(FilterFunction<Relation>) r -> r
.getRelClass()
.equalsIgnoreCase(RelationType.RESULT_AFFILIATIED_TO_ORGANIZATION.label) ||
r.getRelClass().equalsIgnoreCase(RelationType.RESULT_OUTCOME_FUNDING.label) ||
r.getRelClass().equalsIgnoreCase(RelationType.SUPPLEMENT.label) ||
r.getRelClass().equalsIgnoreCase(RelationType.DOCUMENTS.label) ||
@ -378,7 +327,9 @@ public class DumpResult implements Serializable {
default:
if (!remainignRelations.keySet().contains(relClass))
remainignRelations.put(relClass, new ArrayList<>());
remainignRelations.get(relClass).add(Utils.getIdentifier(Prefixes.RESEARCH_PRODUCT, target));
remainignRelations
.get(relClass)
.add(Utils.getIdentifier(Prefixes.RESEARCH_PRODUCT, target));
}
}
for (String key : remainignRelations.keySet())

View File

@ -1,25 +1,30 @@
package eu.dnetlib.dhp.oa.graph.dump.skgif;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.skgif.model.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.skgif.model.*;
import scala.Tuple2;
/**
* @author miriam.baglioni
@ -62,16 +67,26 @@ public class DumpVenue implements Serializable {
spark -> {
Utils.removeOutputDir(spark, outputPath + "Venue");
mapDatasource(spark, inputPath, outputPath, workingDir);
mapVenue(spark, inputPath, outputPath, workingDir);
});
}
private static void mapDatasource(SparkSession spark, String inputPath, String outputPath, String workingDir) {
Utils.readPath(spark, inputPath + "datasource", Datasource.class)
.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getInvisible() && ! d.getDataInfo().getDeletedbyinference()
&& d.getEoscdatasourcetype().getClassid().equalsIgnoreCase("Journal archive"))
.map((MapFunction<Datasource, eu.dnetlib.dhp.skgif.model.Venue>) d -> {
private static void mapVenue(SparkSession spark, String inputPath, String outputPath, String workingDir) {
Dataset<EmitPerManifestation> manifestationDataset = Utils
.readPath(spark, workingDir + "datasourcePublisher", EmitPerManifestation.class);
Dataset<Datasource> datasourceDataset = Utils
.readPath(spark, inputPath + "datasource", Datasource.class)
.filter(
(FilterFunction<Datasource>) d -> !d.getDataInfo().getInvisible()
&& !d.getDataInfo().getDeletedbyinference()
&& d.getEoscdatasourcetype().getClassid().equalsIgnoreCase("Journal archive"));
datasourceDataset
.joinWith(
manifestationDataset, datasourceDataset.col("id").equalTo(manifestationDataset.col("hostedby.key")),
"left")
.map((MapFunction<Tuple2<Datasource, EmitPerManifestation>, Venue>) t2 -> {
Venue venue = new Venue();
Datasource d = t2._1();
if (Optional.ofNullable(d.getJournal().getIssnPrinted()).isPresent())
venue.setLocal_identifier(Utils.getIdentifier(Prefixes.VENUE, d.getJournal().getIssnPrinted()));
else if (Optional.ofNullable(d.getJournal().getIssnOnline()).isPresent())
@ -79,8 +94,8 @@ public class DumpVenue implements Serializable {
venue.setIdentifiers(getVenueIdentifier(d.getJournal()));
venue.setName(d.getOfficialname().getValue());
venue.setType(VenueType.JOURNAL.label);
//todo add map for publisher. Get from results?
venue.setPublisher("find it from result");
if (Optional.ofNullable(t2._2()).isPresent())
venue.setPublisher(t2._2().getPublisher());
venue.setAcronym(null);
venue.setSeries(null);
venue.setIs_currently_full_oa(null);
@ -88,12 +103,14 @@ public class DumpVenue implements Serializable {
venue.setContributions(null);
return venue;
}, Encoders.bean(Venue.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "Venues");
Utils.readPath(spark, workingDir + "Venues", Venue.class)
Utils
.readPath(spark, workingDir + "Venues", Venue.class)
.groupByKey((MapFunction<Venue, String>) v -> v.getLocal_identifier(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Venue, Venue>) (k, v) -> v.next(), Encoders.bean(Venue.class))
.write()
@ -115,9 +132,15 @@ public class DumpVenue implements Serializable {
private static List<String> getResearchProductAccessPolicy(List<String> value) {
return value.stream().map(v -> getResearchProductAccessPolicy(v)).filter(Objects::nonNull)
.map(v -> v.get(0)).distinct().collect(Collectors.toList());
return value
.stream()
.map(v -> getResearchProductAccessPolicy(v))
.filter(Objects::nonNull)
.map(v -> v.get(0))
.distinct()
.collect(Collectors.toList());
}
private static List<String> getResearchProductAccessPolicy(String value) {
// "databaseaccesstype if open => open access (https://vocabularies.coar-repositories.org/access_rights/c_abf2/)
// if restricted => restricted access (https://vocabularies.coar-repositories.org/access_rights/c_16ec/)
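
The body of getVenueIdentifier(Journal), called when building the venue identifiers above, is outside this hunk; a sketch of what it presumably does, with the Journal getters (issnPrinted/issnOnline/issnLinking) assumed from the eu.dnetlib.dhp.schema.oaf model:

private static List<Identifier> getVenueIdentifier(Journal journal) {
    List<Identifier> identifiers = new ArrayList<>();
    // add one Identifier per ISSN variant actually present on the journal
    if (Optional.ofNullable(journal.getIssnPrinted()).isPresent())
        identifiers.add(Identifier.newInstance(VenueIdentifierType.ISSN.label, journal.getIssnPrinted()));
    if (Optional.ofNullable(journal.getIssnOnline()).isPresent())
        identifiers.add(Identifier.newInstance(VenueIdentifierType.EISSN.label, journal.getIssnOnline()));
    if (Optional.ofNullable(journal.getIssnLinking()).isPresent())
        identifiers.add(Identifier.newInstance(VenueIdentifierType.LISSN.label, journal.getIssnLinking()));
    return identifiers;
}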

View File

@ -7,8 +7,6 @@ import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -23,10 +21,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
@ -91,17 +90,21 @@ public class EmitFromResults implements Serializable {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Utils
.readPath(spark, inputPath + e.name(), resultClazz)
.filter((FilterFunction<R>) r -> Optional.of(r.getSubject()).isPresent())
.filter((FilterFunction<R>) r -> Optional.ofNullable(r.getSubject()).isPresent())
.flatMap(
(FlatMapFunction<R, Topic>) r -> r
.getSubject()
.stream()
.filter(s -> s.getQualifier().getClassid().equalsIgnoreCase("fos") || s.getQualifier().getClassid().equalsIgnoreCase("sdg"))
.filter(
s -> s.getQualifier().getClassid().equalsIgnoreCase("fos")
|| s.getQualifier().getClassid().equalsIgnoreCase("sdg"))
.map(s -> {
Topic t = new Topic();
t
.setLocal_identifier(
Utils.getIdentifier(Prefixes.TOPIC ,s.getQualifier().getClassid() + s.getValue()));
Utils
.getIdentifier(
Prefixes.TOPIC, s.getQualifier().getClassid() + s.getValue()));
t
.setIdentifiers(
Arrays
@ -154,7 +157,8 @@ public class EmitFromResults implements Serializable {
p.setGiven_name(a.getName());
String identifier = new String();
if (Optional.ofNullable(a.getPid()).isPresent()) {
Tuple2<String, Boolean> orcid = eu.dnetlib.dhp.oa.graph.dump.skgif.Utils.getOrcid(a.getPid());
Tuple2<String, Boolean> orcid = eu.dnetlib.dhp.oa.graph.dump.skgif.Utils
.getOrcid(a.getPid());
if (orcid != null) {
identifier = Utils.getIdentifier(Prefixes.PERSON, orcid._1() + orcid._2());
if (orcid._2())
@ -164,12 +168,15 @@ public class EmitFromResults implements Serializable {
else
p
.setIdentifiers(
Arrays.asList(Identifier.newInstance("inferred_orcid", orcid._1())));
Arrays
.asList(Identifier.newInstance("inferred_orcid", orcid._1())));
} else {
if (Optional.ofNullable(a.getRank()).isPresent()) {
identifier = Utils.getIdentifier(Prefixes.TEMPORARY_PERSON,r.getId() + a.getRank());
identifier = Utils
.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + a.getRank());
} else {
identifier = Utils.getIdentifier(Prefixes.TEMPORARY_PERSON,r.getId() + count);
identifier = Utils
.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + count);
}
}
@ -243,6 +250,32 @@ public class EmitFromResults implements Serializable {
}
});
Dataset<EmitPerManifestation> emitPerManifestationDataset = Utils
.readPath(
spark, workingDir + "software/manifestation", EmitPerManifestation.class)
.union(
Utils
.readPath(
spark, workingDir + "dataset/manifestation", EmitPerManifestation.class))
.union(
Utils
.readPath(
spark, workingDir + "publication/manifestation", EmitPerManifestation.class))
.union(
Utils
.readPath(
spark, workingDir + "otherresearchproduct/manifestation", EmitPerManifestation.class));
emitPerManifestationDataset
.groupByKey((MapFunction<EmitPerManifestation, String>) p -> p.getHostedBy(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, EmitPerManifestation, EmitPerManifestation>) (k, v) -> v.next(),
Encoders.bean(EmitPerManifestation.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/datasourcePublisher");
}
}

View File

@ -5,10 +5,9 @@ import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.oa.graph.dump.skgif.exception.NoAllowedTypeException;
import eu.dnetlib.dhp.oa.graph.dump.skgif.exception.NoTitleFoundException;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
@ -59,7 +58,8 @@ public class ResultMapper implements Serializable {
contribution.setPerson(Utils.getIdentifier(Prefixes.PERSON, orcid._1() + orcid._2()));
} else {
if (Optional.ofNullable(a.getRank()).isPresent()) {
contribution.setPerson(Utils.getIdentifier(Prefixes.TEMPORARY_PERSON,input.getId() + a.getRank()));
contribution
.setPerson(Utils.getIdentifier(Prefixes.TEMPORARY_PERSON, input.getId() + a.getRank()));
} else {
contribution.setPerson(Utils.getIdentifier(Prefixes.TEMPORARY_PERSON, input.getId() + count));
}
@ -83,11 +83,14 @@ public class ResultMapper implements Serializable {
input
.getSubject()
.stream()
.filter(s -> s.getQualifier().getClassid().equalsIgnoreCase("fos") ||
.filter(
s -> s.getQualifier().getClassid().equalsIgnoreCase("fos") ||
s.getQualifier().getClassid().equalsIgnoreCase("sdg"))
.map(s -> {
ResultTopic topic = new ResultTopic();
topic.setTopic(Utils.getIdentifier(Prefixes.TOPIC, s.getQualifier().getClassid() + s.getValue()));
topic
.setTopic(
Utils.getIdentifier(Prefixes.TOPIC, s.getQualifier().getClassid() + s.getValue()));
if (Optional.ofNullable(s.getDataInfo()).isPresent()) {
Provenance provenance = new Provenance();
provenance.setTrust(Double.valueOf(s.getDataInfo().getTrust()));
@ -101,7 +104,6 @@ public class ResultMapper implements Serializable {
}
}
private static <E extends Result> void mapType(ResearchProduct out, E input) throws NoAllowedTypeException {
switch (input.getResulttype().getClassid()) {
case "publication":

View File

@ -5,16 +5,18 @@ import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.skgif.model.Prefixes;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
/**

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,216 @@
<workflow-app name="dump_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="emit_from_result"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="emit_from_result">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extraction</name>
<class>eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromResults</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="dump_result"/>
<error to="Kill"/>
</action>
<action name="dump_result">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table project </name>
<class>eu.dnetlib.dhp.oa.graph.dump.skgif.DumpResult</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/</arg>
</spark>
<ok to="dump_datasource"/>
<error to="Kill"/>
</action>
<action name="dump_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table project </name>
<class>eu.dnetlib.dhp.oa.graph.dump.skgif.DumpDatasource</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/</arg>
</spark>
<ok to="dump_venue"/>
<error to="Kill"/>
</action>
<action name="dump_venue">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table project </name>
<class>eu.dnetlib.dhp.oa.graph.dump.skgif.DumpVenue</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/</arg>
</spark>
<ok to="dump_organization"/>
<error to="Kill"/>
</action>
<action name="dump_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table project </name>
<class>eu.dnetlib.dhp.oa.graph.dump.skgif.DumpOrganization</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/</arg>
</spark>
<ok to="dump_grant"/>
<error to="Kill"/>
</action>
<action name="dump_grant">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table project </name>
<class>eu.dnetlib.dhp.oa.graph.dump.skgif.DumpGrant</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -7,7 +7,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromResultJobTest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.jupiter.api.Assertions;
@ -17,10 +16,10 @@ import org.junit.jupiter.api.Test;
import com.google.gson.Gson;
import eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromResultJobTest;
import eu.dnetlib.dhp.oa.zenodoapi.MissingConceptDoiException;
import eu.dnetlib.dhp.oa.zenodoapi.ZenodoAPIClient;
@Disabled
public class ZenodoUploadTest {
@ -162,8 +161,6 @@ public class ZenodoUploadTest {
}
@Test
void depositBigFile() throws MissingConceptDoiException, IOException {
ZenodoAPIClient client = new ZenodoAPIClient(URL_STRING,

View File

@ -1,8 +1,11 @@
package eu.dnetlib.dhp.oa.graph.dump.skgif;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.skgif.model.Datasource;
import eu.dnetlib.dhp.skgif.model.Organization;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -18,10 +21,10 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.skgif.model.Datasource;
import eu.dnetlib.dhp.skgif.model.Organization;
/**
* @author miriam.baglioni
@ -70,8 +73,8 @@ public class DumpDatasourceTest implements Serializable {
.getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graph/")
.getPath();
DumpDatasource.main(
DumpDatasource
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
@ -123,8 +126,8 @@ Assertions.assertEquals(5,datasourceDataset.count());
.getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graph_complete_entities/")
.getPath();
DumpDatasource.main(
DumpDatasource
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
@ -140,7 +143,8 @@ Assertions.assertEquals(5,datasourceDataset.count());
Dataset<Datasource> datasourceDataset = spark.createDataset(datasource.rdd(), Encoders.bean(Datasource.class));
datasourceDataset.foreach((ForeachFunction<Datasource>) d -> System.out.println(OBJECT_MAPPER.writeValueAsString(d)));
datasourceDataset
.foreach((ForeachFunction<Datasource>) d -> System.out.println(OBJECT_MAPPER.writeValueAsString(d)));
// Assertions.assertEquals(7, relationDataset.count());
// RelationPerProduct temp = relationDataset.filter((FilterFunction<RelationPerProduct>) r -> r.getResultId().equalsIgnoreCase("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).first();
// Assertions.assertEquals(3, temp.getFunding().size()+temp.getRelatedProduct().size()+temp.getOrganizations().size());

View File

@ -1,8 +1,11 @@
package eu.dnetlib.dhp.oa.graph.dump.skgif;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.skgif.model.Datasource;
import eu.dnetlib.dhp.skgif.model.Grant;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -17,10 +20,10 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.skgif.model.Datasource;
import eu.dnetlib.dhp.skgif.model.Grant;
/**
* @author miriam.baglioni
@ -69,8 +72,8 @@ public class DumpGrantTest implements Serializable {
.getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graph/")
.getPath();
DumpGrant.main(
DumpGrant
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,

View File

@ -1,10 +1,11 @@
package eu.dnetlib.dhp.oa.graph.dump.skgif;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.RelationPerProduct;
import eu.dnetlib.dhp.skgif.model.Organization;
import eu.dnetlib.dhp.skgif.model.Prefixes;
import eu.dnetlib.dhp.skgif.model.ResearchProduct;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -20,10 +21,12 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.RelationPerProduct;
import eu.dnetlib.dhp.skgif.model.Organization;
import eu.dnetlib.dhp.skgif.model.Prefixes;
import eu.dnetlib.dhp.skgif.model.ResearchProduct;
/**
* @author miriam.baglioni
@ -72,7 +75,6 @@ public class DumpOrganizationTest implements Serializable {
.getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graph/")
.getPath();
DumpOrganization
.main(
new String[] {
@ -88,7 +90,8 @@ public class DumpOrganizationTest implements Serializable {
.textFile(workingDir.toString() + "/Organization")
.map(item -> OBJECT_MAPPER.readValue(item, Organization.class));
Dataset<Organization> organizationDataset = spark.createDataset(organization.rdd(), Encoders.bean(Organization.class));
Dataset<Organization> organizationDataset = spark
.createDataset(organization.rdd(), Encoders.bean(Organization.class));
Assertions.assertEquals(34 - 19, organizationDataset.count());
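// 34 - 19: the input fixture contains 34 organizations, 19 of which are not expected to reach the dump
// (reading of the expression as written; the filtering criterion itself is not visible in this diff).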
organizationDataset.show(false);
// Assertions.assertEquals(7, relationDataset.count());

View File

@ -1,10 +1,13 @@
package eu.dnetlib.dhp.oa.graph.dump.skgif;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.RelationPerProduct;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;
import javax.validation.constraints.AssertTrue;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -21,11 +24,12 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.constraints.AssertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.RelationPerProduct;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.utils.DHPUtils;
/**
* @author miriam.baglioni
@ -93,16 +97,38 @@ public class DumpResultTest {
.textFile(workingDir + "/aggrelation")
.map(item -> OBJECT_MAPPER.readValue(item, RelationPerProduct.class));
Dataset<RelationPerProduct> relationDataset = spark.createDataset(relation.rdd(), Encoders.bean(RelationPerProduct.class));
Dataset<RelationPerProduct> relationDataset = spark
.createDataset(relation.rdd(), Encoders.bean(RelationPerProduct.class));
relationDataset.show(false);
Assertions.assertEquals(7, relationDataset.count());
RelationPerProduct temp = relationDataset.filter((FilterFunction<RelationPerProduct>) r -> r.getResultId().equalsIgnoreCase("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).first();
Assertions.assertEquals(3, temp.getFunding().size()+temp.getRelatedProduct().size()+temp.getOrganizations().size());
RelationPerProduct temp = relationDataset
.filter(
(FilterFunction<RelationPerProduct>) r -> r
.getResultId()
.equalsIgnoreCase("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
.first();
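// The explicit (FilterFunction<RelationPerProduct>) cast here and in the filters below disambiguates
// Dataset.filter, which is overloaded for Column, String, the Java FilterFunction and the Scala
// Function1 variants; a bare Java lambda would otherwise be ambiguous, while the cast keeps the
// filter strongly typed.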
Assertions
.assertEquals(
3, temp.getFunding().size() + temp.getRelatedProduct().size() + temp.getOrganizations().size());
Assertions.assertEquals(1, temp.getFunding().size());
Assertions.assertEquals(2, temp.getRelatedProduct().size());
Assertions.assertEquals(1, temp.getRelatedProduct().stream().filter(rp -> rp.getRelation_type().equalsIgnoreCase("issupplementedby")).count());
Assertions.assertEquals(1, temp.getRelatedProduct().stream().filter(rp -> rp.getRelation_type().equalsIgnoreCase("isdocumentedby")).count());
Assertions
.assertEquals(
1,
temp
.getRelatedProduct()
.stream()
.filter(rp -> rp.getRelation_type().equalsIgnoreCase("issupplementedby"))
.count());
Assertions
.assertEquals(
1,
temp
.getRelatedProduct()
.stream()
.filter(rp -> rp.getRelation_type().equalsIgnoreCase("isdocumentedby"))
.count());
JavaRDD<ResearchProduct> researchProduct = sc
.textFile(workingDir.toString() + "/publication/researchproduct")
@ -111,8 +137,28 @@ public class DumpResultTest {
org.apache.spark.sql.Dataset<ResearchProduct> researchProductDataset = spark
.createDataset(researchProduct.rdd(), Encoders.bean(ResearchProduct.class));
Assertions.assertEquals(1, researchProductDataset.filter((FilterFunction<ResearchProduct>) p -> p.getLocal_identifier().equalsIgnoreCase(Utils.getIdentifier(Prefixes.RESEARCH_PRODUCT, "50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))).count());
ResearchProduct product = researchProductDataset.filter((FilterFunction<ResearchProduct>) p -> p.getLocal_identifier().equalsIgnoreCase(Utils.getIdentifier(Prefixes.RESEARCH_PRODUCT, "50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))).first();
Assertions
.assertEquals(
1,
researchProductDataset
.filter(
(FilterFunction<ResearchProduct>) p -> p
.getLocal_identifier()
.equalsIgnoreCase(
Utils
.getIdentifier(
Prefixes.RESEARCH_PRODUCT,
"50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")))
.count());
ResearchProduct product = researchProductDataset
.filter(
(FilterFunction<ResearchProduct>) p -> p
.getLocal_identifier()
.equalsIgnoreCase(
Utils
.getIdentifier(
Prefixes.RESEARCH_PRODUCT, "50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")))
.first();
Assertions.assertEquals(2, product.getRelevant_organizations().size());
Assertions.assertEquals(1, product.getFunding().size());
Assertions.assertEquals(0, product.getRelated_products().size());
@ -121,8 +167,6 @@ public class DumpResultTest {
researchProductDataset.show(false);
}
@Test
@ -163,12 +207,42 @@ public class DumpResultTest {
// check the pids of the result
Assertions.assertEquals(3, rp.getIdentifiers().size());
Assertions.assertEquals(1, rp.getIdentifiers().stream().filter(p->p.getScheme().equalsIgnoreCase("doi")).count());
Assertions.assertEquals("10.1007/s40199-021-00403-x", rp.getIdentifiers().stream().filter(p->p.getScheme().equalsIgnoreCase("doi")).collect(Collectors.toList()).get(0).getValue());
Assertions.assertEquals(1, rp.getIdentifiers().stream().filter(p->p.getScheme().equalsIgnoreCase("pmid")).count());
Assertions.assertEquals("34327650", rp.getIdentifiers().stream().filter(p->p.getScheme().equalsIgnoreCase("pmid")).collect(Collectors.toList()).get(0).getValue());
Assertions.assertEquals(1, rp.getIdentifiers().stream().filter(p->p.getScheme().equalsIgnoreCase("pmc")).count());
Assertions.assertEquals("PMC8602609", rp.getIdentifiers().stream().filter(p->p.getScheme().equalsIgnoreCase("pmc")).collect(Collectors.toList()).get(0).getValue());
Assertions
.assertEquals(1, rp.getIdentifiers().stream().filter(p -> p.getScheme().equalsIgnoreCase("doi")).count());
Assertions
.assertEquals(
"10.1007/s40199-021-00403-x",
rp
.getIdentifiers()
.stream()
.filter(p -> p.getScheme().equalsIgnoreCase("doi"))
.collect(Collectors.toList())
.get(0)
.getValue());
Assertions
.assertEquals(1, rp.getIdentifiers().stream().filter(p -> p.getScheme().equalsIgnoreCase("pmid")).count());
Assertions
.assertEquals(
"34327650",
rp
.getIdentifiers()
.stream()
.filter(p -> p.getScheme().equalsIgnoreCase("pmid"))
.collect(Collectors.toList())
.get(0)
.getValue());
Assertions
.assertEquals(1, rp.getIdentifiers().stream().filter(p -> p.getScheme().equalsIgnoreCase("pmc")).count());
Assertions
.assertEquals(
"PMC8602609",
rp
.getIdentifiers()
.stream()
.filter(p -> p.getScheme().equalsIgnoreCase("pmc"))
.collect(Collectors.toList())
.get(0)
.getValue());
// check the title
Assertions.assertEquals(1, rp.getTitles().keySet().size());
@ -185,27 +259,84 @@ public class DumpResultTest {
// check topics
Assertions.assertEquals(3, rp.getTopics().size());
Assertions.assertTrue(rp.getTopics().stream().anyMatch(t -> t.getTopic().equalsIgnoreCase(Prefixes.TOPIC.label + DHPUtils.md5("FOSSustained delivery"))));
Assertions
.assertTrue(
rp
.getTopics()
.stream()
.anyMatch(
t -> t
.getTopic()
.equalsIgnoreCase(Prefixes.TOPIC.label + DHPUtils.md5("FOSSustained delivery"))));
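// The expected value above hints at the local-identifier convention exercised throughout these tests:
// the entity prefix label concatenated with the md5 of the source key, i.e. (assumption inferred from
// the assertions, not from the Utils implementation) Utils.getIdentifier(prefix, key) is expected to
// behave like prefix.label + DHPUtils.md5(key).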
// check contributions
Assertions.assertEquals(4, rp.getContributions().size());
Assertions.assertEquals(3, rp.getContributions().stream().filter(c -> c.getPerson().startsWith("person")).count());
Assertions.assertEquals(1, rp.getContributions().stream().filter(c -> c.getPerson().startsWith("temp")).count());
Assertions
.assertEquals(3, rp.getContributions().stream().filter(c -> c.getPerson().startsWith("person")).count());
Assertions
.assertEquals(1, rp.getContributions().stream().filter(c -> c.getPerson().startsWith("temp")).count());
rp.getContributions().forEach(c -> Assertions.assertTrue(c.getDeclared_affiliation() == null));
Assertions.assertEquals(1, rp.getContributions().stream().filter(c -> c.getPerson().equals(Utils.getIdentifier(Prefixes.PERSON, "0000-0001-8284-6269true")))
.collect(Collectors.toList()).get(0).getRank());
Assertions.assertEquals(2, rp.getContributions().stream().filter(c -> c.getPerson().equals(Utils.getIdentifier(Prefixes.PERSON, "0000-0002-0940-893xtrue")))
.collect(Collectors.toList()).get(0).getRank());
Assertions.assertEquals(3, rp.getContributions().stream().filter(c -> c.getPerson().equals(Utils.getIdentifier(Prefixes.PERSON, "0000-0001-5291-577xtrue")))
.collect(Collectors.toList()).get(0).getRank());
Assertions.assertEquals(4, rp.getContributions().stream().filter(c -> c.getPerson().equals(Utils.getIdentifier(Prefixes.TEMPORARY_PERSON, "50|doi_dedup___::0000661be7c602727bae9690778b16514")))
.collect(Collectors.toList()).get(0).getRank());
Assertions
.assertEquals(
1,
rp
.getContributions()
.stream()
.filter(c -> c.getPerson().equals(Utils.getIdentifier(Prefixes.PERSON, "0000-0001-8284-6269true")))
.collect(Collectors.toList())
.get(0)
.getRank());
Assertions
.assertEquals(
2,
rp
.getContributions()
.stream()
.filter(c -> c.getPerson().equals(Utils.getIdentifier(Prefixes.PERSON, "0000-0002-0940-893xtrue")))
.collect(Collectors.toList())
.get(0)
.getRank());
Assertions
.assertEquals(
3,
rp
.getContributions()
.stream()
.filter(c -> c.getPerson().equals(Utils.getIdentifier(Prefixes.PERSON, "0000-0001-5291-577xtrue")))
.collect(Collectors.toList())
.get(0)
.getRank());
Assertions
.assertEquals(
4,
rp
.getContributions()
.stream()
.filter(
c -> c
.getPerson()
.equals(
Utils
.getIdentifier(
Prefixes.TEMPORARY_PERSON,
"50|doi_dedup___::0000661be7c602727bae9690778b16514")))
.collect(Collectors.toList())
.get(0)
.getRank());
researchProductDataset.show(10, 100, true);
// check manifestation 1
Assertions.assertEquals(3, rp.getManifestations().size());
Manifestation manifestation = rp.getManifestations().stream().filter(m -> m.getHosting_datasource().equals(Utils.getIdentifier(Prefixes.DATASOURCE , "10|doajarticles::6107489403b31fc7cf37cb7fda35f7f1")))
.collect(Collectors.toList()).get(0);
Manifestation manifestation = rp
.getManifestations()
.stream()
.filter(
m -> m
.getHosting_datasource()
.equals(
Utils.getIdentifier(Prefixes.DATASOURCE, "10|doajarticles::6107489403b31fc7cf37cb7fda35f7f1")))
.collect(Collectors.toList())
.get(0);
Assertions.assertEquals("Article", manifestation.getProduct_local_type());
Assertions.assertEquals("dnet:publication_resource", manifestation.getProduct_local_type_schema());
Assertions.assertEquals(1, manifestation.getDates().size());
@ -227,8 +358,16 @@ public class DumpResultTest {
Assertions.assertEquals("438", biblio.getEnd_page());
// check manifestation 2
manifestation = rp.getManifestations().stream().filter(m -> m.getHosting_datasource().equals(Utils.getIdentifier(Prefixes.DATASOURCE , "10|openaire____::55045bd2a65019fd8e6741a755395c8c")))
.collect(Collectors.toList()).get(0);
manifestation = rp
.getManifestations()
.stream()
.filter(
m -> m
.getHosting_datasource()
.equals(
Utils.getIdentifier(Prefixes.DATASOURCE, "10|openaire____::55045bd2a65019fd8e6741a755395c8c")))
.collect(Collectors.toList())
.get(0);
Assertions.assertEquals("Article", manifestation.getProduct_local_type());
Assertions.assertEquals("dnet:publication_resource", manifestation.getProduct_local_type_schema());
Assertions.assertEquals(1, manifestation.getDates().size());
@ -243,8 +382,16 @@ public class DumpResultTest {
Assertions.assertTrue(manifestation.getBiblio() == null);
// check manifestation 3
manifestation = rp.getManifestations().stream().filter(m -> m.getHosting_datasource().equals(Utils.getIdentifier(Prefixes.DATASOURCE , "10|opendoar____::8b6dd7db9af49e67306feb59a8bdc52c")))
.collect(Collectors.toList()).get(0);
manifestation = rp
.getManifestations()
.stream()
.filter(
m -> m
.getHosting_datasource()
.equals(
Utils.getIdentifier(Prefixes.DATASOURCE, "10|opendoar____::8b6dd7db9af49e67306feb59a8bdc52c")))
.collect(Collectors.toList())
.get(0);
Assertions.assertEquals("Other literature type", manifestation.getProduct_local_type());
Assertions.assertEquals("dnet:publication_resource", manifestation.getProduct_local_type_schema());
Assertions.assertEquals(1, manifestation.getDates().size());
@ -260,8 +407,10 @@ public class DumpResultTest {
// check relevant organization
Assertions.assertEquals(1, rp.getRelevant_organizations().size());
Assertions.assertEquals(Prefixes.ORGANIZATION.label + "601e510b1fda7cc6cb03329531502171", rp.getRelevant_organizations().get(0));
Assertions
.assertEquals(
Prefixes.ORGANIZATION.label + "601e510b1fda7cc6cb03329531502171",
rp.getRelevant_organizations().get(0));
// check funding
Assertions.assertEquals(1, rp.getFunding().size());
@ -269,12 +418,61 @@ public class DumpResultTest {
// check related products
Assertions.assertEquals(5, rp.getRelated_products().size());
Assertions.assertEquals(4, rp.getRelated_products().stream().filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.CITATION.label)).collect(Collectors.toList()).get(0).getProduct_list().size());
Assertions.assertEquals(1, rp.getRelated_products().stream().filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.DOCUMENTS.label)).collect(Collectors.toList()).get(0).getProduct_list().size());
Assertions.assertEquals(1, rp.getRelated_products().stream().filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.PART.label)).collect(Collectors.toList()).get(0).getProduct_list().size());
Assertions.assertEquals(1, rp.getRelated_products().stream().filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.SUPPLEMENT.label)).collect(Collectors.toList()).get(0).getProduct_list().size());
Assertions.assertEquals(1, rp.getRelated_products().stream().filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.VERSION.label)).collect(Collectors.toList()).get(0).getProduct_list().size());
Assertions
.assertEquals(
4,
rp
.getRelated_products()
.stream()
.filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.CITATION.label))
.collect(Collectors.toList())
.get(0)
.getProduct_list()
.size());
Assertions
.assertEquals(
1,
rp
.getRelated_products()
.stream()
.filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.DOCUMENTS.label))
.collect(Collectors.toList())
.get(0)
.getProduct_list()
.size());
Assertions
.assertEquals(
1,
rp
.getRelated_products()
.stream()
.filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.PART.label))
.collect(Collectors.toList())
.get(0)
.getProduct_list()
.size());
Assertions
.assertEquals(
1,
rp
.getRelated_products()
.stream()
.filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.SUPPLEMENT.label))
.collect(Collectors.toList())
.get(0)
.getProduct_list()
.size());
Assertions
.assertEquals(
1,
rp
.getRelated_products()
.stream()
.filter(r -> r.getRelation_type().equalsIgnoreCase(RelationType.VERSION.label))
.collect(Collectors.toList())
.get(0)
.getProduct_list()
.size());
}

View File

@ -5,7 +5,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import eu.dnetlib.dhp.skgif.model.Topic;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -24,9 +23,9 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.skgif.model.Persons;
import eu.dnetlib.dhp.skgif.model.Topic;
//@Disabled
public class EmitFromResultJobTest {
@ -101,17 +100,54 @@ public class EmitFromResultJobTest {
.createDataset(persons.rdd(), Encoders.bean(Persons.class));
personsDataset.show(false);
Persons claudiaBorer = personsDataset.filter((FilterFunction<Persons>) p -> p.getLocal_identifier().equalsIgnoreCase("tmp_person__::2c1eea261f7d9a97ab7ca8c4200781db"))
Persons claudiaBorer = personsDataset
.filter(
(FilterFunction<Persons>) p -> p
.getLocal_identifier()
.equalsIgnoreCase("tmp_person__::2c1eea261f7d9a97ab7ca8c4200781db"))
.first();
Assertions.assertEquals(2, personsDataset.filter((FilterFunction<Persons>) p -> p.getGiven_name().equalsIgnoreCase("claudia") && p.getFamily_name().equalsIgnoreCase("borer")).count());
Assertions.assertEquals(1, personsDataset.filter((FilterFunction<Persons>) p -> p.getGiven_name().equalsIgnoreCase("claudia") && p.getFamily_name().equalsIgnoreCase("borer") && !p.getLocal_identifier().equalsIgnoreCase("tmp_person__::2c1eea261f7d9a97ab7ca8c4200781db")).count());
Assertions
.assertEquals(
2,
personsDataset
.filter(
(FilterFunction<Persons>) p -> p.getGiven_name().equalsIgnoreCase("claudia")
&& p.getFamily_name().equalsIgnoreCase("borer"))
.count());
Assertions
.assertEquals(
1,
personsDataset
.filter(
(FilterFunction<Persons>) p -> p.getGiven_name().equalsIgnoreCase("claudia")
&& p.getFamily_name().equalsIgnoreCase("borer")
&& !p
.getLocal_identifier()
.equalsIgnoreCase("tmp_person__::2c1eea261f7d9a97ab7ca8c4200781db"))
.count());
Assertions.assertEquals("claudia", claudiaBorer.getGiven_name().toLowerCase());
Assertions.assertEquals("borer", claudiaBorer.getFamily_name().toLowerCase());
Assertions.assertEquals(2, personsDataset.filter((FilterFunction<Persons>) p -> p.getLocal_identifier().startsWith("person")).count());
Assertions.assertEquals(1, personsDataset.filter((FilterFunction<Persons>) p -> p.getLocal_identifier().startsWith("person") && p.getIdentifiers().get(0).getValue().equals("0000-0002-5597-4916")).count());
Persons orcidPerson = personsDataset.filter((FilterFunction<Persons>) p -> p.getLocal_identifier().startsWith("person") && p.getIdentifiers().get(0).getValue().equals("0000-0002-5597-4916")).first();
Assertions
.assertEquals(
2,
personsDataset
.filter((FilterFunction<Persons>) p -> p.getLocal_identifier().startsWith("person"))
.count());
Assertions
.assertEquals(
1,
personsDataset
.filter(
(FilterFunction<Persons>) p -> p.getLocal_identifier().startsWith("person")
&& p.getIdentifiers().get(0).getValue().equals("0000-0002-5597-4916"))
.count());
Persons orcidPerson = personsDataset
.filter(
(FilterFunction<Persons>) p -> p.getLocal_identifier().startsWith("person")
&& p.getIdentifiers().get(0).getValue().equals("0000-0002-5597-4916"))
.first();
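// Taken together, these checks suggest the emission strategy for authors: contributors with an ORCID
// get a stable "person"-prefixed identifier carrying the ORCID as pid, while contributors without one
// fall back to a hash-based "tmp_person__::" identifier (inference from the assertions above).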
Assertions.assertEquals("M.", orcidPerson.getGiven_name());
Assertions.assertEquals("Kooi", orcidPerson.getFamily_name());
Assertions.assertEquals(1, orcidPerson.getIdentifiers().size());
@ -129,7 +165,6 @@ public class EmitFromResultJobTest {
}
@Test
public void testEmitFromResultComplete() throws Exception {
final String sourcePath = getClass()
@ -194,6 +229,5 @@ public class EmitFromResultJobTest {
// Assertions.assertEquals(4, manifestationDataset.count());
//
}
}
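
Each test above repeats the same three-step read of a dump job's output: load the JSON-lines folder with textFile, deserialize every line with the shared ObjectMapper, and wrap the result in a typed Dataset through Encoders.bean. Below is a minimal, self-contained sketch of that pattern; the class and method names (SkgifTestReadSketch, readAs) are illustrative only and are not part of this commit.

package eu.dnetlib.dhp.oa.graph.dump.skgif;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SkgifTestReadSketch {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // Reads a folder of JSON lines produced by a dump job and exposes it as a typed Dataset,
    // mirroring the textFile -> readValue -> createDataset sequence inlined in the tests above.
    public static <T> Dataset<T> readAs(SparkSession spark, String path, Class<T> clazz) {
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<T> rdd = sc.textFile(path).map(item -> OBJECT_MAPPER.readValue(item, clazz));
        return spark.createDataset(rdd.rdd(), Encoders.bean(clazz));
    }
}

Used in place of the inlined statements, a call such as readAs(spark, workingDir + "/Organization", Organization.class) would yield the same Dataset the assertions operate on, assuming the output folder and bean class match.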