diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Contribution.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Contribution.java index 3c23506..bbc54f7 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Contribution.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Contribution.java @@ -11,17 +11,17 @@ import com.fasterxml.jackson.annotation.JsonProperty; * @Date 01/09/23 */ public class Contribution implements Serializable { - private String person; + private MinPerson person; @JsonProperty("declared_affiliations") private List declared_affiliation; private List roles; private Integer rank; - public String getPerson() { + public MinPerson getPerson() { return person; } - public void setPerson(String person) { + public void setPerson(MinPerson person) { this.person = person; } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Contributor.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Contributor.java index b47c0f5..4004ae2 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Contributor.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Contributor.java @@ -8,9 +8,33 @@ import java.io.Serializable; * @Date 22/02/24 */ public class Contributor implements Serializable { - private String person; // I would not map it because we have only information regarding the person (if any) + private MinPerson person; // I would not map it because we have only information regarding the person (if any) // associated to the leading organization private String organization; // contributors.person private String role;// private + + public MinPerson getPerson() { + return person; + } + + public void setPerson(MinPerson person) { + this.person = person; + } + + public String getOrganization() { + return organization; + } + + public void setOrganization(String organization) { + this.organization = organization; + } + + public String getRole() { + return role; + } + + public void setRole(String role) { + this.role = role; + } } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Datasource.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Datasource.java index d8b94ec..6353517 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Datasource.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Datasource.java @@ -31,6 +31,15 @@ public class Datasource implements Serializable { // research_product_metadata_license.url not mappable private List research_product_metadata_access_policy;// researchproductmetadataccesspolicies list with the // same mapping of research_product_access_policy + private List organization; + + public List getOrganization() { + return organization; + } + + public void setOrganization(List organization) { + this.organization = organization; + } public String getLocal_identifier() { return local_identifier; diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Grant.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Grant.java index 05abf66..52dd5a8 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Grant.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Grant.java @@ -28,7 +28,7 @@ public class Grant implements Serializable { private String start_date;// startdate.value private String end_date;// enddate.value private String website;// websiteurl.value - private List beneficiaries;// organization.id for the organizations in the relation with semantic class + private List beneficiaries;// 
organization.id for the organizations in the relation with semantic class // isParticipant produces the list of organization internal identifiers private List contributors;// @@ -136,11 +136,11 @@ public class Grant implements Serializable { this.website = website; } - public List getBeneficiaries() { + public List getBeneficiaries() { return beneficiaries; } - public void setBeneficiaries(List beneficiaries) { + public void setBeneficiaries(List beneficiaries) { this.beneficiaries = beneficiaries; } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Manifestation.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Manifestation.java index 11c470d..70c2631 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Manifestation.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Manifestation.java @@ -28,9 +28,9 @@ public class Manifestation implements Serializable { @JsonProperty("licance_schema") private String licence_schema; private Biblio biblio; - private String venue; + private MinVenue venue; @JsonProperty("hosting_datasource") - private String hosting_datasource; + private MinVenue hosting_datasource; public String getProduct_local_type() { return product_local_type; @@ -120,19 +120,19 @@ public class Manifestation implements Serializable { this.biblio = biblio; } - public String getVenue() { + public MinVenue getVenue() { return venue; } - public void setVenue(String venue) { + public void setVenue(MinVenue venue) { this.venue = venue; } - public String getHosting_datasource() { + public MinVenue getHosting_datasource() { return hosting_datasource; } - public void setHosting_datasource(String hosting_datasource) { + public void setHosting_datasource(MinVenue hosting_datasource) { this.hosting_datasource = hosting_datasource; } } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinGrant.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinGrant.java new file mode 100644 index 0000000..a4a1b6e --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinGrant.java @@ -0,0 +1,37 @@ +package eu.dnetlib.dhp.skgif.model; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 04/03/24 + */ +public class MinGrant implements Serializable { + private String local_identifier; + private String funder; + private String code; + + public String getLocal_identifier() { + return local_identifier; + } + + public void setLocal_identifier(String local_identifier) { + this.local_identifier = local_identifier; + } + + public String getFunder() { + return funder; + } + + public void setFunder(String funder) { + this.funder = funder; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinOrganization.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinOrganization.java new file mode 100644 index 0000000..c9bf5d5 --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinOrganization.java @@ -0,0 +1,48 @@ +package eu.dnetlib.dhp.skgif.model; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 04/03/24 + */ +public class MinOrganization implements Serializable { + private String local_identifier; + private String name; + private String ror; + private String isni; + + public String getLocal_identifier() { + return local_identifier; + } + + public void setLocal_identifier(String local_identifier) { + 
this.local_identifier = local_identifier; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getRor() { + return ror; + } + + public void setRor(String ror) { + this.ror = ror; + } + + public String getIsni() { + return isni; + } + + public void setIsni(String isni) { + this.isni = isni; + } + + +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinPerson.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinPerson.java new file mode 100644 index 0000000..ca64492 --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinPerson.java @@ -0,0 +1,37 @@ +package eu.dnetlib.dhp.skgif.model; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 04/03/24 + */ +public class MinPerson implements Serializable { + private String local_identifier; + private String full_name; + private String orcid; + + public String getLocal_identifier() { + return local_identifier; + } + + public void setLocal_identifier(String local_identifier) { + this.local_identifier = local_identifier; + } + + public String getFull_name() { + return full_name; + } + + public void setFull_name(String full_name) { + this.full_name = full_name; + } + + public String getOrcid() { + return orcid; + } + + public void setOrcid(String orcid) { + this.orcid = orcid; + } +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinProduct.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinProduct.java new file mode 100644 index 0000000..20b9e0d --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinProduct.java @@ -0,0 +1,55 @@ +package eu.dnetlib.dhp.skgif.model; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 04/03/24 + */ +public class MinProduct implements Serializable { + private String local_identifier; + private String title; + private String doi; + private String pmcid; + private String arxivid; + + public String getLocal_identifier() { + return local_identifier; + } + + public void setLocal_identifier(String local_identifier) { + this.local_identifier = local_identifier; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getDoi() { + return doi; + } + + public void setDoi(String doi) { + this.doi = doi; + } + + public String getPmcid() { + return pmcid; + } + + public void setPmcid(String pmcid) { + this.pmcid = pmcid; + } + + public String getArxivid() { + return arxivid; + } + + public void setArxivid(String arxivid) { + this.arxivid = arxivid; + } +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinTopic.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinTopic.java new file mode 100644 index 0000000..93d16ff --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinTopic.java @@ -0,0 +1,28 @@ +package eu.dnetlib.dhp.skgif.model; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 04/03/24 + */ +public class MinTopic implements Serializable { + private String local_identifier; + private String value; + + public String getLocal_identifier() { + return local_identifier; + } + + public void setLocal_identifier(String local_identifier) { + this.local_identifier = local_identifier; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git 
a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinVenue.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinVenue.java new file mode 100644 index 0000000..4473cbb --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/MinVenue.java @@ -0,0 +1,35 @@ +package eu.dnetlib.dhp.skgif.model; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 04/03/24 + */ +public class MinVenue implements Serializable { + private String local_identifier; + private String name; + + public String getLocal_identifier() { + return local_identifier; + } + + public void setLocal_identifier(String local_identifier) { + this.local_identifier = local_identifier; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public static MinVenue newInstance(String local_identifier, String name){ + MinVenue minVenue = new MinVenue(); + minVenue.local_identifier = local_identifier; + minVenue.name = name; + return minVenue; + } +} diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/RelationType.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/RelationType.java index 477bf77..0d4f1d3 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/RelationType.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/RelationType.java @@ -8,10 +8,18 @@ import java.io.Serializable; * @Date 05/09/23 */ public enum RelationType implements Serializable { - RESULT_OUTCOME_FUNDING("isProducedBy"), RESULT_AFFILIATIED_TO_ORGANIZATION( - "hasAuthorInstitution"), ORGANIZATION_PARTICIPANT_IN_PROJECT("isParticipant"), SUPPLEMENT( - "IsSupplementedBy"), DOCUMENTS( - "IsDocumentedBy"), PART("IsPartOf"), VERSION("IsNewVersionOf"), CITATION("Cites"); + RESULT_OUTCOME_FUNDING("isProducedBy"), + RESULT_AFFILIATIED_TO_ORGANIZATION( + "hasAuthorInstitution"), + DATASOURCE_PROVIDED_BY_ORGANIZATION ("isProvidedBy"), + ORGANIZATION_PARTICIPANT_IN_PROJECT("isParticipant"), + SUPPLEMENT( + "IsSupplementedBy"), + DOCUMENTS( + "IsDocumentedBy"), + PART("IsPartOf"), + VERSION("IsNewVersionOf"), + CITATION("Cites"); public final String label; diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Relations.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Relations.java index 1e22c69..8ac3945 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Relations.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/Relations.java @@ -14,9 +14,9 @@ public class Relations implements Serializable { @JsonProperty("relation_type") private String relation_type; @JsonProperty("product_list") - private List product_list; + private List product_list; - public static Relations newInstance(String relClass, List target) { + public static Relations newInstance(String relClass, List target) { Relations r = new Relations(); r.relation_type = relClass; r.product_list = target; @@ -31,11 +31,11 @@ public class Relations implements Serializable { this.relation_type = relation_type; } - public List getProduct_list() { + public List getProduct_list() { return product_list; } - public void setProduct_list(List product_list) { + public void setProduct_list(List product_list) { this.product_list = product_list; } } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/ResearchProduct.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/ResearchProduct.java index 3b52fd6..d2da138 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/ResearchProduct.java +++ 
b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/ResearchProduct.java @@ -23,11 +23,12 @@ public class ResearchProduct implements Serializable { private List contributions; private List manifestations; @JsonProperty("relevant_organizations") - private List relevant_organizations; - private List funding; + private List relevant_organizations; + private List funding; @JsonProperty("related_products") private List related_products; + public String getLocal_identifier() { return local_identifier; } @@ -92,19 +93,19 @@ public class ResearchProduct implements Serializable { this.manifestations = manifestations; } - public List getRelevant_organizations() { + public List getRelevant_organizations() { return relevant_organizations; } - public void setRelevant_organizations(List relevant_organizations) { + public void setRelevant_organizations(List relevant_organizations) { this.relevant_organizations = relevant_organizations; } - public List getFunding() { + public List getFunding() { return funding; } - public void setFunding(List funding) { + public void setFunding(List funding) { this.funding = funding; } diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/ResultTopic.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/ResultTopic.java index 2b9840c..d004da0 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/ResultTopic.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/ResultTopic.java @@ -8,14 +8,14 @@ import java.io.Serializable; * @Date 16/02/24 */ public class ResultTopic implements Serializable { - private String topic; + private MinTopic topic; private Provenance provenance; - public String getTopic() { + public MinTopic getTopic() { return topic; } - public void setTopic(String topic) { + public void setTopic(MinTopic topic) { this.topic = topic; } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java index 6081b33..91442ec 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java @@ -7,10 +7,16 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EncloseMinElement; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.skgif.model.MinOrganization; +import eu.dnetlib.dhp.skgif.model.RelationType; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -22,6 +28,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.skgif.model.Identifier; import eu.dnetlib.dhp.skgif.model.Prefixes; +import scala.Tuple2; /** * @author miriam.baglioni @@ -64,60 +71,82 @@ public class DumpDatasource implements Serializable { spark -> { Utils.removeOutputDir(spark, outputPath + "Datasources"); - mapDatasource(spark, inputPath, outputPath); + mapDatasource(spark, inputPath, outputPath, workingDir); }); } - private static void mapDatasource(SparkSession spark, String inputPath, String outputPath) { - Utils - 
.readPath(spark, inputPath + "datasource", Datasource.class) - .filter( - (FilterFunction) d -> !d.getDataInfo().getInvisible() - && !d.getDataInfo().getDeletedbyinference()) - .map((MapFunction) d -> { - eu.dnetlib.dhp.skgif.model.Datasource datasource = new eu.dnetlib.dhp.skgif.model.Datasource(); - datasource.setLocal_identifier(Utils.getIdentifier(Prefixes.DATASOURCE, d.getId())); - datasource - .setIdentifiers( - d - .getPid() - .stream() - .map(p -> Identifier.newInstance(p.getQualifier().getClassid(), p.getValue())) - .collect(Collectors.toList())); + private static void mapDatasource(SparkSession spark, String inputPath, String outputPath, String workingDir) { + Dataset relation = Utils.readPath(spark, inputPath + "relation", Relation.class) + .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference()) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase(RelationType.DATASOURCE_PROVIDED_BY_ORGANIZATION.label)); - datasource.setName(d.getOfficialname().getValue()); - datasource.setSubmission_policy_url(d.getSubmissionpolicyurl()); - datasource - .setJurisdiction( - Optional - .ofNullable(d.getJurisdiction()) - .map(v -> v.getClassid()) - .orElse(new String())); - datasource.setPreservation_policy_url(d.getPreservationpolicyurl()); - datasource.setVersion_control(d.getVersioncontrol()); + Dataset eme = Utils.readPath(spark, workingDir + "minEntity", EncloseMinElement.class) + .filter((FilterFunction) e -> Optional.ofNullable(e.getMinOrganization()).isPresent()); + + Dataset datasourceDataset = Utils + .readPath(spark, inputPath + "datasource", Datasource.class) + .filter( + (FilterFunction) d -> !d.getDataInfo().getInvisible() + && !d.getDataInfo().getDeletedbyinference()); + Dataset> datasourceOrganization = relation.joinWith(eme, relation.col("target").equalTo(eme.col("enclosedEntityId"))) + .map((MapFunction, Tuple2>) t2 -> new Tuple2<>(t2._1().getSource(), t2._2()), Encoders.tuple(Encoders.STRING(), Encoders.bean(EncloseMinElement.class))); + + datasourceDataset.joinWith(datasourceOrganization, datasourceDataset.col("id").equalTo(datasourceOrganization.col("_1")), "left") + .groupByKey((MapFunction>, String>) t2 -> t2._1().getId(), Encoders.STRING() ) + .mapGroups((MapGroupsFunction>, eu.dnetlib.dhp.skgif.model.Datasource>) (k,vs) -> { + eu.dnetlib.dhp.skgif.model.Datasource datasource = new eu.dnetlib.dhp.skgif.model.Datasource(); + Tuple2> first = vs.next(); + Datasource d = first._1(); + datasource.setLocal_identifier(Utils.getIdentifier(Prefixes.DATASOURCE, d.getId())); + datasource + .setIdentifiers( + d + .getPid() + .stream() + .map(p -> Identifier.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList())); + + datasource.setName(d.getOfficialname().getValue()); + datasource.setSubmission_policy_url(d.getSubmissionpolicyurl()); + datasource + .setJurisdiction( + Optional + .ofNullable(d.getJurisdiction()) + .map(v -> v.getClassid()) + .orElse(new String())); + datasource.setPreservation_policy_url(d.getPreservationpolicyurl()); + datasource.setVersion_control(d.getVersioncontrol()); + + datasource + .setData_source_classification( + Optional + .ofNullable(d.getEoscdatasourcetype()) + .map(v -> v.getClassname()) + .orElse(new String())); + datasource.setResearch_product_type(getEoscProductType(d.getResearchentitytypes())); + datasource.setThematic(d.getThematic()); + datasource + .setResearch_product_access_policy( + Optional + .ofNullable(d.getDatabaseaccesstype()) + .map(v -> 
getResearchProductAccessPolicy(d.getDatabaseaccesstype().getValue())) + .orElse(new ArrayList<>())); + datasource + .setResearch_product_metadata_access_policy( + Optional + .ofNullable(d.getResearchproductmetadataaccesspolicies()) + .map(v -> getResearchProductAccessPolicy(d.getResearchproductmetadataaccesspolicies())) + .orElse(new ArrayList<>())); + if(Optional.ofNullable(first._2()).isPresent()){ + List organizations = new ArrayList<>(); + organizations.add(first._2()._2().getMinOrganization()); + vs.forEachRemaining(org -> organizations.add(org._2()._2().getMinOrganization())); + datasource.setOrganization(organizations); + } + return datasource; + + }, Encoders.bean( eu.dnetlib.dhp.skgif.model.Datasource.class)) - datasource - .setData_source_classification( - Optional - .ofNullable(d.getEoscdatasourcetype()) - .map(v -> v.getClassname()) - .orElse(new String())); - datasource.setResearch_product_type(getEoscProductType(d.getResearchentitytypes())); - datasource.setThematic(d.getThematic()); - datasource - .setResearch_product_access_policy( - Optional - .ofNullable(d.getDatabaseaccesstype()) - .map(v -> getResearchProductAccessPolicy(d.getDatabaseaccesstype().getValue())) - .orElse(new ArrayList<>())); - datasource - .setResearch_product_metadata_access_policy( - Optional - .ofNullable(d.getResearchproductmetadataaccesspolicies()) - .map(v -> getResearchProductAccessPolicy(d.getResearchproductmetadataaccesspolicies())) - .orElse(new ArrayList<>())); - return datasource; - }, Encoders.bean(eu.dnetlib.dhp.skgif.model.Datasource.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java index cc1c684..8bc6d0b 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java @@ -10,6 +10,8 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EncloseMinElement; +import eu.dnetlib.dhp.skgif.model.*; import org.apache.avro.generic.GenericData; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -29,10 +31,6 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.skgif.model.Grant; -import eu.dnetlib.dhp.skgif.model.Identifier; -import eu.dnetlib.dhp.skgif.model.Prefixes; -import eu.dnetlib.dhp.skgif.model.RelationType; import scala.Tuple2; /** @@ -76,11 +74,11 @@ public class DumpGrant implements Serializable { spark -> { Utils.removeOutputDir(spark, outputPath + "Grant"); - mapGrants(spark, inputPath, outputPath); + mapGrants(spark, inputPath, outputPath, workingDir); }); } - private static void mapGrants(SparkSession spark, String inputPath, String outputPath) { + private static void mapGrants(SparkSession spark, String inputPath, String outputPath, String workingDir) { Dataset projects = Utils .readPath(spark, inputPath + "project", Project.class) .filter( @@ -92,78 +90,84 @@ public class DumpGrant implements Serializable { (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && !r.getDataInfo().getInvisible() && r.getRelClass().equalsIgnoreCase(RelationType.ORGANIZATION_PARTICIPANT_IN_PROJECT.label)); + Dataset eme = Utils.readPath(spark, workingDir + "minEntity", 
EncloseMinElement.class) + .filter((FilterFunction) e -> Optional.ofNullable(e.getMinOrganization()).isPresent()); + + Dataset> partecipantOrganization = relations.joinWith(eme, relations.col("source").equalTo(eme.col("enclosedEntityId"))) + .map((MapFunction, Tuple2>) t2 -> new Tuple2<>(t2._1().getTarget(), t2._2()), Encoders.tuple(Encoders.STRING(), Encoders.bean(EncloseMinElement.class))); + projects - .joinWith(relations, projects.col("id").equalTo(relations.col("target")), "left") - .groupByKey((MapFunction, String>) t2 -> t2._1().getId(), Encoders.STRING()) - .mapGroups((MapGroupsFunction, Grant>) (k, v) -> { - Grant g = new Grant(); - Tuple2 first = v.next(); - g.setLocal_identifier(Utils.getIdentifier(Prefixes.GRANT, k)); - g.setIdentifiers(getProjectIdentifier(first._1())); - g.setTitle(first._1().getTitle().getValue()); - g - .setSummary( - Optional - .ofNullable(first._1().getSummary()) - .map(value -> value.getValue()) - .orElse(new String())); - g - .setAcronym( - Optional - .ofNullable(first._1().getAcronym()) - .map(value -> value.getValue()) - .orElse(new String())); - g.setFunder(getFunderName(first._1().getFundingtree().get(0).getValue())); - // * private String funding_stream;// fundingtree to be used the xpath //funding_level_[n] - g.setFunding_stream(getFundingStream(first._1().getFundingtree().get(0).getValue())); - g - .setCurrency( - Optional - .ofNullable(first._1().getCurrency()) - .map(value -> value.getValue()) - .orElse(new String())); - g - .setFunded_amount( - Optional - .ofNullable(first._1().getFundedamount()) - .orElse(null)); - g - .setKeywords( - first - ._1() - .getSubjects() - .stream() - .map(s -> s.getValue()) - .collect(Collectors.toList())); - g - .setStart_date( - Optional - .ofNullable(first._1().getStartdate()) - .map(value -> value.getValue()) - .orElse(new String())); - g - .setEnd_date( - Optional - .ofNullable(first._1().getEnddate()) - .map(value -> value.getValue()) - .orElse(new String())); - g - .setWebsite( - Optional - .ofNullable(first._1().getWebsiteurl()) - .map(value -> value.getValue()) - .orElse(new String())); - if (Optional.ofNullable(first._2()).isPresent()) { - List relevantOrganizatios = new ArrayList<>(); - relevantOrganizatios.add(Utils.getIdentifier(Prefixes.ORGANIZATION, first._2().getSource())); - v - .forEachRemaining( - t2 -> relevantOrganizatios - .add(Utils.getIdentifier(Prefixes.ORGANIZATION, t2._2().getSource()))); - g.setBeneficiaries(relevantOrganizatios); - } - return g; - }, Encoders.bean(Grant.class)) + .joinWith(partecipantOrganization, projects.col("id").equalTo(partecipantOrganization.col("_1")), "left") + .groupByKey((MapFunction>, String>) t2 -> t2._1().getId(), Encoders.STRING() ) + .mapGroups((MapGroupsFunction>, Grant>) (k,v) -> { + Grant g = new Grant(); + Tuple2> first = v.next(); + g.setLocal_identifier(Utils.getIdentifier(Prefixes.GRANT, k)); + g.setIdentifiers(getProjectIdentifier(first._1())); + g.setTitle(first._1().getTitle().getValue()); + g + .setSummary( + Optional + .ofNullable(first._1().getSummary()) + .map(value -> value.getValue()) + .orElse(new String())); + g + .setAcronym( + Optional + .ofNullable(first._1().getAcronym()) + .map(value -> value.getValue()) + .orElse(new String())); + g.setFunder(Utils.getFunderName(first._1().getFundingtree().get(0).getValue())); + // * private String funding_stream;// fundingtree to be used the xpath //funding_level_[n] + g.setFunding_stream(getFundingStream(first._1().getFundingtree().get(0).getValue())); + g + .setCurrency( + Optional + 
.ofNullable(first._1().getCurrency()) + .map(value -> value.getValue()) + .orElse(new String())); + g + .setFunded_amount( + Optional + .ofNullable(first._1().getFundedamount()) + .orElse(null)); + g + .setKeywords( + first + ._1() + .getSubjects() + .stream() + .map(s -> s.getValue()) + .collect(Collectors.toList())); + g + .setStart_date( + Optional + .ofNullable(first._1().getStartdate()) + .map(value -> value.getValue()) + .orElse(new String())); + g + .setEnd_date( + Optional + .ofNullable(first._1().getEnddate()) + .map(value -> value.getValue()) + .orElse(new String())); + g + .setWebsite( + Optional + .ofNullable(first._1().getWebsiteurl()) + .map(value -> value.getValue()) + .orElse(new String())); + if (Optional.ofNullable(first._2()).isPresent()) { + List relevantOrganizatios = new ArrayList<>(); + relevantOrganizatios.add(first._2()._2().getMinOrganization()); + v + .forEachRemaining( + t2 -> relevantOrganizatios + .add(t2._2()._2().getMinOrganization())); + g.setBeneficiaries(relevantOrganizatios); + } + return g; + }, Encoders.bean(Grant.class) ) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") @@ -181,15 +185,7 @@ public class DumpGrant implements Serializable { } - private static String getFunderName(String fundingtree) throws DocumentException { - final Document doc; - doc = new SAXReader().read(new StringReader(fundingtree)); - // f.setShortName(((org.dom4j.Node) (doc.selectNodes("//funder/shortname").get(0))).getText()); - return ((org.dom4j.Node) (doc.selectNodes("//funder/name").get(0))).getText(); - // f.setJurisdiction(((org.dom4j.Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText()); - - } private static List getProjectIdentifier(Project project) throws DocumentException { List identifiers = new ArrayList<>(); @@ -202,7 +198,7 @@ public class DumpGrant implements Serializable { .add( Identifier .newInstance( - getFunderName(project.getFundingtree().get(0).getValue()), project.getCode().getValue())); + Utils.getFunderName(project.getFundingtree().get(0).getValue()), project.getCode().getValue())); return identifiers; } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java index 1e8d465..5914aea 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java @@ -5,7 +5,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.*; +import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.*; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -13,17 +15,16 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation; -import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.PartialResearchProduct; -import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.RelationPerProduct; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import 
eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.skgif.model.*; import eu.dnetlib.dhp.skgif.model.AccessRight; import eu.dnetlib.dhp.utils.DHPUtils; @@ -79,6 +80,9 @@ public class DumpResult implements Serializable { public static void mapResult(SparkSession spark, String inputPath, String workingDir, String outputPath) { + //emit the snippet of the entities to be included in other entities for the dematerialization + // emitMinEntities(spark, inputPath, workingDir); + // selection of the relevant relations from result type to other entity. Only teh semantic relevant ones are // considered selectRelations(spark, inputPath, workingDir); @@ -91,10 +95,50 @@ public class DumpResult implements Serializable { } +// private static void emitMinEntities(SparkSession spark, String inputPath, String workingDir) { +// +// Utils.readPath(spark, inputPath + "organization", Organization.class) +// .filter((FilterFunction) o -> !o.getDataInfo().getDeletedbyinference()) +// .map((MapFunction) o -> { +// EncloseMinElement eme = new EncloseMinElement(); +// eme.setEnclosedEntityId(o.getId()); +// eme.setMinOrganization(Utils.getMinOrganization(o)); +// return eme; +// }, Encoders.bean(EncloseMinElement.class) ) +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression","gzip") +// .json(workingDir + "encloseMinEntity"); +// +// Utils.readPath(spark, inputPath + "project", Project.class) +// .filter((FilterFunction) p -> !p.getDataInfo().getDeletedbyinference()) +// .map((MapFunction) p -> { +// EncloseMinElement eme = new EncloseMinElement(); +// eme.setEnclosedEntityId(p.getId()); +// eme.setMinGrant(Utils.getMinGrant(p)); +// return eme; +// }, Encoders.bean(EncloseMinElement.class)) +// .write() +// .mode(SaveMode.Append) +// .option("compression","gzip") +// .json(workingDir + "encloseMinEntity"); +// +// getMinProduct(spark, inputPath + "publication" , Publication.class) +// .union(getMinProduct(spark, inputPath + "dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)) +// .union(getMinProduct(spark, inputPath + "software", Software.class)) +// .union(getMinProduct(spark, inputPath + "otherresearchproduct", OtherResearchProduct.class)) +// .write() +// .mode(SaveMode.Append) +// .option("compression","gzip") +// .json(workingDir + "encloseMinEntity"); +// +// +// } + private static void getRelationAndManifestation(SparkSession spark, String workingDir, String inputPath) { Dataset aggRelations = Utils .readPath(spark, workingDir + "aggrelation", RelationPerProduct.class); - aggRelations.count(); + ModelSupport.entityTypes .keySet() .stream() @@ -135,7 +179,8 @@ public class DumpResult implements Serializable { (MapFunction, PartialResearchProduct>) t2 -> { PartialResearchProduct prp = t2._1(); if (Optional.ofNullable(t2._2()).isPresent()) { - prp.setRelated_products(t2._2().getRelatedProduct()); + prp.setRelated_products(t2._2().getRelatedProduct().keySet() + .stream().map(key -> Relations.newInstance(key, t2._2().getRelatedProduct().get(key))).collect(Collectors.toList())); prp.setRelevant_organizations(t2._2().getOrganizations()); prp.setFunding(t2._2().getFunding()); } @@ -218,12 +263,12 @@ public class DumpResult implements Serializable { if (Optional.ofNullable(t2._2()).isPresent()) { manifestation.setBiblio(getBiblio(epm)); if (Optional.ofNullable(t2._2().getJournal().getIssnPrinted()).isPresent()) - manifestation.setVenue(Utils.getIdentifier(Prefixes.VENUE, t2._1().getJournal().getIssnPrinted())); + 
manifestation.setVenue(MinVenue.newInstance(Utils.getIdentifier(Prefixes.VENUE, t2._1().getJournal().getIssnPrinted()),t2._1().getJournal().getName())); else if (Optional.ofNullable(t2._2().getJournal().getIssnOnline()).isPresent()) - manifestation.setVenue(Utils.getIdentifier(Prefixes.VENUE, t2._1().getJournal().getIssnOnline())); + manifestation.setVenue(MinVenue.newInstance(Utils.getIdentifier(Prefixes.VENUE, t2._1().getJournal().getIssnOnline()),t2._1().getJournal().getName())); } manifestation - .setHosting_datasource(Utils.getIdentifier(Prefixes.DATASOURCE, epm.getInstance().getHostedby().getKey())); + .setHosting_datasource(MinVenue.newInstance(Utils.getIdentifier(Prefixes.DATASOURCE, epm.getInstance().getHostedby().getKey()), epm.getInstance().getHostedby().getValue())); return manifestation; } @@ -306,40 +351,56 @@ public class DumpResult implements Serializable { r.getRelClass().equalsIgnoreCase(RelationType.PART.label) || r.getRelClass().equalsIgnoreCase(RelationType.VERSION.label) || r.getRelClass().equalsIgnoreCase(RelationType.CITATION.label)); + Dataset encloseMinEntity = Utils.readPath(spark, workingDir + "minEntity", EncloseMinElement.class); - relation - .groupByKey((MapFunction) r -> r.getSource(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, v) -> { - RelationPerProduct rpp = new RelationPerProduct(); - rpp.setResultId(k); - Map> remainignRelations = new HashMap<>(); - while (v.hasNext()) { - Relation rel = v.next(); - String target = rel.getTarget(); - String relClass = rel.getRelClass(); - switch (rel.getRelClass().toLowerCase()) { - case "hasauthorinstitution": - rpp.getOrganizations().add(Utils.getIdentifier(Prefixes.ORGANIZATION, target)); - break; - case "isproducedby": - rpp.getFunding().add(Utils.getIdentifier(Prefixes.GRANT, target)); - break; - default: - if (!remainignRelations.keySet().contains(relClass)) - remainignRelations.put(relClass, new ArrayList<>()); - remainignRelations - .get(relClass) - .add(Utils.getIdentifier(Prefixes.RESEARCH_PRODUCT, target)); - } - } - for (String key : remainignRelations.keySet()) - rpp.getRelatedProduct().add(Relations.newInstance(key, remainignRelations.get(key))); - return rpp; - }, Encoders.bean(RelationPerProduct.class)) + relation.joinWith(encloseMinEntity, relation.col("target").equalTo(encloseMinEntity.col("enclosedEntityId"))) + .map((MapFunction, EncloseMinElement>) t2 -> + { + EncloseMinElement eme = t2._2(); + eme.setResultId(t2._1().getSource()); + eme.setSemantics(t2._1().getRelClass()); + return eme; + }, Encoders.bean(EncloseMinElement.class)) + .groupByKey((MapFunction) eme -> eme.getResultId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k,v) -> + { + RelationPerProduct rpp = new RelationPerProduct(); + rpp.setResultId(k); + insertEnclosedElement(rpp,v.next()); + v.forEachRemaining(e -> insertEnclosedElement(rpp,e)); + return rpp; + }, Encoders.bean(RelationPerProduct.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(workingDir + "/aggrelation"); } + private static void insertEnclosedElement(RelationPerProduct rpp, EncloseMinElement element) { + if(Optional.ofNullable(element.getMinOrganization()).isPresent()) + rpp.getOrganizations().add(element.getMinOrganization()); + if(Optional.ofNullable(element.getMinGrant()).isPresent()) + rpp.getFunding().add(element.getMinGrant()); + if(Optional.ofNullable(element.getMinProduct()).isPresent()){ + String sem = element.getSemantics(); + if(!rpp.getRelatedProduct().containsKey(sem)) + rpp.getRelatedProduct().put(sem, 
new ArrayList<>()); + rpp.getRelatedProduct().get(sem).add(element.getMinProduct()); + } + + } + + + private static Dataset getMinProduct(SparkSession spark, String inputPath, Class clazz) { + return Utils.readPath(spark, inputPath , clazz) + .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + !r.getDataInfo().getInvisible()) + .map((MapFunction) r -> { + EncloseMinElement eme = new EncloseMinElement(); + eme.setEnclosedEntityId(r.getId()); + eme.setMinProduct(Utils.getMinProduct(r)); + return eme; + }, Encoders.bean(EncloseMinElement.class)); + } + } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromResults.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java similarity index 68% rename from dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromResults.java rename to dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java index 3bd4624..b7b1691 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromResults.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java @@ -7,6 +7,8 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EncloseMinElement; +import eu.dnetlib.dhp.schema.oaf.Organization; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -27,21 +29,20 @@ import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.skgif.model.*; -import eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; /** * @author miriam.baglioni * @Date 06/02/24 */ -public class EmitFromResults implements Serializable { +public class EmitFromEntities implements Serializable { - private static final Logger log = LoggerFactory.getLogger(EmitFromResults.class); + private static final Logger log = LoggerFactory.getLogger(EmitFromEntities.class); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - EmitFromResults.class + EmitFromEntities.class .getResourceAsStream( "/eu/dnetlib/dhp/oa/graph/dump/emit_biblio_parameters.json")); @@ -71,18 +72,108 @@ public class EmitFromResults implements Serializable { spark -> { Utils.removeOutputDir(spark, outputPath); emitFromResult(spark, inputPath, outputPath, workingDir); + emitFromDatasource(spark, inputPath, workingDir); + emitFromOrganization(spark, inputPath, workingDir); + emitFromProject(spark, inputPath, workingDir); + }); } -//per ogni result emetto id + journal se esiste + istanza + hosted by dell'istanza + private static void emitFromProject(SparkSession spark, String inputPath, String workingDir) { + Utils.readPath(spark, inputPath + "project" , Project.class) + .filter((FilterFunction) p -> !p.getDataInfo().getDeletedbyinference()) + .map((MapFunction) p->{ + EncloseMinElement eme = new EncloseMinElement(); + eme.setEnclosedEntityId(p.getId()); + eme.setMinGrant(Utils.getMinGrant(p)); + return eme;}, Encoders.bean(EncloseMinElement.class) ) + .write() + .mode(SaveMode.Append) + .option("compression","gzip") + .json(workingDir + "/minEntity"); + + } + + private static void emitFromOrganization(SparkSession spark, String inputPath, String workingDir) { + Utils.readPath(spark, inputPath + "organization", Organization.class) + .filter((FilterFunction) o -> !o.getDataInfo().getDeletedbyinference()) + 
.map((MapFunction) o -> { + EncloseMinElement eme = new EncloseMinElement(); + eme.setMinOrganization(Utils.getMinOrganization(o)); + eme.setEnclosedEntityId(o.getId()); + return eme;}, + Encoders.bean(EncloseMinElement.class)) + .write() + .mode(SaveMode.Append) + .option("compression","gzip") + .json(workingDir + "/minEntity"); + } + + private static void emitFromDatasource(SparkSession spark, String inputPath, String workingDir) { + Utils.readPath(spark, inputPath + "datasource", Datasource.class) + .filter((FilterFunction) d -> !d.getDataInfo().getDeletedbyinference()) + .map((MapFunction) d -> { + EncloseMinElement eme = new EncloseMinElement(); + eme.setMinDatsource(MinVenue.newInstance(Utils.getIdentifier(Prefixes.DATASOURCE, d.getId()), d.getOfficialname().getValue())); + eme.setEnclosedEntityId(d.getId()); + return eme; + } + , Encoders.bean(EncloseMinElement.class)) + .write() + .mode(SaveMode.Append) + .option("compression","gzip") + .json(workingDir + "/minEntity"); + + Utils.readPath(spark, inputPath + "datasource", Datasource.class) + .filter((FilterFunction) d -> !d.getDataInfo().getDeletedbyinference()) + .filter((FilterFunction) d-> d.getEoscdatasourcetype().getClassid().equalsIgnoreCase("Journal archive")) + .map((MapFunction) d-> { + EncloseMinElement eme = new EncloseMinElement(); + eme.setEnclosedEntityId(d.getId()); + if(Optional.ofNullable(d.getJournal().getIssnPrinted()).isPresent()) + eme.setMinVenue( MinVenue.newInstance(Utils.getIdentifier(Prefixes.VENUE, d.getJournal().getIssnPrinted()), d.getOfficialname().getValue())); + if(Optional.ofNullable(d.getJournal().getIssnOnline()).isPresent()) + eme.setMinVenue( MinVenue.newInstance(Utils.getIdentifier(Prefixes.VENUE, d.getJournal().getIssnOnline()), d.getOfficialname().getValue())); + return eme.getMinVenue() != null ? eme : null; + },Encoders.bean(EncloseMinElement.class) ) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Append) + .option("compression","gzip") + .json(workingDir + "/minEntity"); + + } + + // for each result emit the id + the journal (if any) + the instance + the hostedby of the instance public static void emitFromResult(SparkSession spark, String inputPath, String outputPath, String workingDir) { emitManifestation(spark, inputPath, workingDir); emitPerson(spark, inputPath, outputPath, workingDir); emitTopic(spark, inputPath, outputPath, workingDir); + emitMinProduct(spark, inputPath, workingDir); } + private static void emitMinProduct(SparkSession spark, String inputPath, String workingDir) { + Utils.removeOutputDir(spark, workingDir + "minEntity"); + ModelSupport.entityTypes.keySet().forEach(e -> { + if (ModelSupport.isResult(e)) { + Class resultClazz = ModelSupport.entityTypes.get(e); + + Utils + .readPath(spark, inputPath + e.name(), resultClazz) + .filter((FilterFunction) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible()) + .map((MapFunction) p -> { + EncloseMinElement eme = new EncloseMinElement(); + eme.setEnclosedEntityId(p.getId()); + eme.setMinProduct(Utils.getMinProduct(p)); + return eme; + }, Encoders.bean(EncloseMinElement.class)) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(workingDir + "/minEntity"); + + + } + + }); + } + private static void emitTopic(SparkSession spark, String inputPath, String outputPath, String workingDir) { ModelSupport.entityTypes.keySet().forEach(e -> { @@ -213,16 +304,10 @@ public class EmitFromResults implements Serializable { } private static void emitManifestation(SparkSession spark, String inputPath, String workingDir) { - Dataset datasource = Utils - .readPath(spark, inputPath + "datasource", Datasource.class) - .filter( - (FilterFunction) d -> Optional.ofNullable(d.getEosctype()).isPresent() && - d.getEosctype().getClassname().equalsIgnoreCase("Journal archive")); - 
ModelSupport.entityTypes.keySet().forEach(e -> { if (ModelSupport.isResult(e)) { Class resultClazz = ModelSupport.entityTypes.get(e); -// Dataset emitformanifestation = + Utils .readPath(spark, inputPath + e.name(), resultClazz) .flatMap((FlatMapFunction) p -> p.getInstance().stream().map(i -> { @@ -245,7 +330,7 @@ public class EmitFromResults implements Serializable { .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(workingDir + e.name() + "/manifestation"); - ; + } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/ResultMapper.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/ResultMapper.java index 3062779..4e6ac81 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/ResultMapper.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/ResultMapper.java @@ -54,14 +54,20 @@ public class ResultMapper implements Serializable { count += 1; Contribution contribution = new Contribution(); Tuple2 orcid = Utils.getOrcid(a.getPid()); + MinPerson minPerson = new MinPerson(); + minPerson.setFull_name(a.getFullname()); if (orcid != null) { - contribution.setPerson(Utils.getIdentifier(Prefixes.PERSON, orcid._1() + orcid._2())); + minPerson.setLocal_identifier(Utils.getIdentifier(Prefixes.PERSON, orcid._1() + orcid._2())); + minPerson.setOrcid(orcid._1()); + contribution.setPerson(minPerson); } else { if (Optional.ofNullable(a.getRank()).isPresent()) { + minPerson.setLocal_identifier(Utils.getIdentifier(Prefixes.TEMPORARY_PERSON, input.getId() + a.getRank())); contribution - .setPerson(Utils.getIdentifier(Prefixes.TEMPORARY_PERSON, input.getId() + a.getRank())); + .setPerson(minPerson); } else { - contribution.setPerson(Utils.getIdentifier(Prefixes.TEMPORARY_PERSON, input.getId() + count)); + minPerson.setLocal_identifier(Utils.getIdentifier(Prefixes.TEMPORARY_PERSON, input.getId() + count)); + contribution.setPerson(minPerson); } } @@ -88,9 +94,12 @@ public class ResultMapper implements Serializable { s.getQualifier().getClassid().equalsIgnoreCase("sdg")) .map(s -> { ResultTopic topic = new ResultTopic(); + MinTopic minTopic = new MinTopic(); + minTopic.setLocal_identifier(Utils.getIdentifier(Prefixes.TOPIC, s.getQualifier().getClassid() + s.getValue())); + minTopic.setValue(s.getValue()); topic - .setTopic( - Utils.getIdentifier(Prefixes.TOPIC, s.getQualifier().getClassid() + s.getValue())); + .setTopic(minTopic + ); if (Optional.ofNullable(s.getDataInfo()).isPresent()) { Provenance provenance = new Provenance(); provenance.setTrust(Double.valueOf(s.getDataInfo().getTrust())); diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/Utils.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/Utils.java index d578a2c..05892ea 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/Utils.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/Utils.java @@ -2,9 +2,16 @@ package eu.dnetlib.dhp.oa.graph.dump.skgif; import java.io.Serializable; +import java.io.StringReader; import java.util.List; import java.util.Optional; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.skgif.model.MinGrant; +import eu.dnetlib.dhp.skgif.model.MinOrganization; +import eu.dnetlib.dhp.skgif.model.MinProduct; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -17,6 +24,9 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import 
eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.skgif.model.Prefixes; import eu.dnetlib.dhp.utils.DHPUtils; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.io.SAXReader; import scala.Tuple2; /** @@ -63,4 +73,64 @@ public class Utils implements Serializable { return entity.label + DHPUtils.md5(id); } + + public static String getFunderName(String fundingtree) throws DocumentException { + final Document doc; + + doc = new SAXReader().read(new StringReader(fundingtree)); + // f.setShortName(((org.dom4j.Node) (doc.selectNodes("//funder/shortname").get(0))).getText()); + return ((org.dom4j.Node) (doc.selectNodes("//funder/name").get(0))).getText(); + // f.setJurisdiction(((org.dom4j.Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText()); + + } + + public static MinOrganization getMinOrganization(Organization o) { + MinOrganization mo = new MinOrganization(); + mo.setLocal_identifier(Utils.getIdentifier(Prefixes.ORGANIZATION, o.getId())); + mo.setName(o.getLegalname().getValue()); + for(StructuredProperty pid : o.getPid()){ + switch (pid.getQualifier().getClassid().toLowerCase()){ + case "ror": + mo.setRor(pid.getValue()); + break; + case "isni": + mo.setIsni(pid.getValue()); + break; + + } + } + return mo; + } + + public static MinGrant getMinGrant(Project p) throws DocumentException { + MinGrant mg = new MinGrant(); + mg.setLocal_identifier(Utils.getIdentifier(Prefixes.GRANT, p.getId())); + mg.setCode(p.getCode().getValue()); + mg.setFunder(getFunderName(p.getFundingtree().get(0).getValue())); + return mg; + } + + public static MinProduct getMinProduct(R r) { + MinProduct mp = new MinProduct(); + mp.setLocal_identifier(Utils.getIdentifier(Prefixes.RESEARCH_PRODUCT, r.getId())); + for (StructuredProperty title : r.getTitle()) { + if (title.getQualifier().getClassid().equalsIgnoreCase("main title")) { + mp.setTitle(title.getValue()); + } + } + for (StructuredProperty pid : r.getPid()) { + switch (pid.getQualifier().getClassid().toLowerCase()) { + case "doi": + mp.setDoi(pid.getValue()); + break; + case "pmcid": + mp.setPmcid(pid.getValue()); + break; + case "arxiv": + mp.setArxivid(pid.getValue()); + break; + } + } + return mp; + } } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/Couple.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/Couple.java new file mode 100644 index 0000000..a81982a --- /dev/null +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/Couple.java @@ -0,0 +1,35 @@ +package eu.dnetlib.dhp.oa.graph.dump.skgif.beans; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 04/03/24 + */ +public class Couple implements Serializable { + private String originalIdentifier; + private String localIdentifier; + + public String getOriginalIdentifier() { + return originalIdentifier; + } + + public void setOriginalIdentifier(String originalIdentifier) { + this.originalIdentifier = originalIdentifier; + } + + public String getLocalIdentifier() { + return localIdentifier; + } + + public void setLocalIdentifier(String localIdentifier) { + this.localIdentifier = localIdentifier; + } + + public static Couple newInstance(String originalIdentifier, String localIdentifier){ + Couple couple = new Couple(); + couple.originalIdentifier = originalIdentifier; + couple.localIdentifier = localIdentifier; + return couple; + } +} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/EncloseMinElement.java 
b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/EncloseMinElement.java new file mode 100644 index 0000000..5fc7d93 --- /dev/null +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/EncloseMinElement.java @@ -0,0 +1,87 @@ +package eu.dnetlib.dhp.oa.graph.dump.skgif.beans; + +import eu.dnetlib.dhp.skgif.model.MinGrant; +import eu.dnetlib.dhp.skgif.model.MinOrganization; +import eu.dnetlib.dhp.skgif.model.MinProduct; +import eu.dnetlib.dhp.skgif.model.MinVenue; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 04/03/24 + */ +public class EncloseMinElement implements Serializable { + private String resultId; + private String enclosedEntityId; + private MinOrganization minOrganization; + private MinVenue minVenue; + private MinVenue minDatsource; + private MinGrant minGrant; + private MinProduct minProduct; + private String semantics; + + public MinVenue getMinVenue() { + return minVenue; + } + + public void setMinVenue(MinVenue minVenue) { + this.minVenue = minVenue; + } + + public MinVenue getMinDatsource() { + return minDatsource; + } + + public void setMinDatsource(MinVenue minDatsource) { + this.minDatsource = minDatsource; + } + + public String getSemantics() { + return semantics; + } + + public void setSemantics(String semantics) { + this.semantics = semantics; + } + + public String getResultId() { + return resultId; + } + + public void setResultId(String resultId) { + this.resultId = resultId; + } + + public String getEnclosedEntityId() { + return enclosedEntityId; + } + + public void setEnclosedEntityId(String enclosedEntityId) { + this.enclosedEntityId = enclosedEntityId; + } + + public MinOrganization getMinOrganization() { + return minOrganization; + } + + public void setMinOrganization(MinOrganization minOrganization) { + this.minOrganization = minOrganization; + } + + public MinGrant getMinGrant() { + return minGrant; + } + + public void setMinGrant(MinGrant minGrant) { + this.minGrant = minGrant; + } + + public MinProduct getMinProduct() { + return minProduct; + } + + public void setMinProduct(MinProduct minProduct) { + this.minProduct = minProduct; + } +} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/RelationPerProduct.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/RelationPerProduct.java index e164f54..9b239a5 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/RelationPerProduct.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/RelationPerProduct.java @@ -3,9 +3,13 @@ package eu.dnetlib.dhp.oa.graph.dump.skgif.beans; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; -import eu.dnetlib.dhp.skgif.model.Relations; +import eu.dnetlib.dhp.skgif.model.MinGrant; +import eu.dnetlib.dhp.skgif.model.MinOrganization; +import eu.dnetlib.dhp.skgif.model.MinProduct; /** * @author miriam.baglioni @@ -14,14 +18,14 @@ import eu.dnetlib.dhp.skgif.model.Relations; public class RelationPerProduct implements Serializable { private String resultId; - private List organizations; - private List funding; - private List relatedProduct; + private List organizations; + private List funding; + private Map> relatedProduct; public RelationPerProduct() { organizations = new ArrayList<>(); funding = new ArrayList<>(); - relatedProduct = new ArrayList<>(); + relatedProduct = new HashMap<>(); } public String getResultId() { @@ -32,27 +36,27 @@ public class RelationPerProduct implements 
Serializable { this.resultId = resultId; } - public List getOrganizations() { + public List getOrganizations() { return organizations; } - public void setOrganizations(List organizations) { + public void setOrganizations(List organizations) { this.organizations = organizations; } - public List getFunding() { + public List getFunding() { return funding; } - public void setFunding(List funding) { + public void setFunding(List funding) { this.funding = funding; } - public List getRelatedProduct() { + public Map> getRelatedProduct() { return relatedProduct; } - public void setRelatedProduct(List relatedProduct) { + public void setRelatedProduct(Map> relatedProduct) { this.relatedProduct = relatedProduct; } } diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml index de19f37..705b8cc 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml @@ -71,7 +71,7 @@ yarn cluster Extraction - eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromResults + eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromEntities dump-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResultTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResultTest.java index 473d128..45c371a 100644 --- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResultTest.java +++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResultTest.java @@ -267,14 +267,15 @@ public class DumpResultTest { .anyMatch( t -> t .getTopic() + .getLocal_identifier() .equalsIgnoreCase(Prefixes.TOPIC.label + DHPUtils.md5("FOSSustained delivery")))); // check contributions Assertions.assertEquals(4, rp.getContributions().size()); Assertions - .assertEquals(3, rp.getContributions().stream().filter(c -> c.getPerson().startsWith("person")).count()); + .assertEquals(3, rp.getContributions().stream().filter(c -> c.getPerson().getLocal_identifier().startsWith("person")).count()); Assertions - .assertEquals(1, rp.getContributions().stream().filter(c -> c.getPerson().startsWith("temp")).count()); + .assertEquals(1, rp.getContributions().stream().filter(c -> c.getPerson().getLocal_identifier().startsWith("temp")).count()); rp.getContributions().forEach(c -> Assertions.assertTrue(c.getDeclared_affiliation() == null)); Assertions .assertEquals( diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromResultJobTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromResultJobTest.java index 1ac3cfc..7519d84 100644 --- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromResultJobTest.java +++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromResultJobTest.java @@ -72,7 +72,7 @@ public class EmitFromResultJobTest { .getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graph/") .getPath(); - EmitFromResults + EmitFromEntities .main( new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), @@ -171,7 +171,7 @@ public class EmitFromResultJobTest { .getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graph_complete_entities/") .getPath(); - EmitFromResults + EmitFromEntities .main( new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(),
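
Review note on the Utils.java hunk: getMinOrganization and getMinProduct dereference o.getLegalname(), o.getPid(), r.getTitle() and r.getPid() without null checks, so a record missing any of those fields makes the emit jobs fail with a NullPointerException. A minimal defensive sketch for the organization case, assuming the same accessors used in the patch and the usual java.util imports already present in Utils; the null guards are the only addition and are not part of the patch:

    public static MinOrganization getMinOrganization(Organization o) {
        MinOrganization mo = new MinOrganization();
        mo.setLocal_identifier(Utils.getIdentifier(Prefixes.ORGANIZATION, o.getId()));
        // legalname is not guaranteed on every organization record
        if (Optional.ofNullable(o.getLegalname()).isPresent())
            mo.setName(o.getLegalname().getValue());
        // pid may be null or contain entries without qualifier/value
        for (StructuredProperty pid : Optional.ofNullable(o.getPid()).orElse(new ArrayList<>())) {
            if (pid.getQualifier() == null || pid.getValue() == null)
                continue;
            switch (pid.getQualifier().getClassid().toLowerCase()) {
                case "ror":
                    mo.setRor(pid.getValue());
                    break;
                case "isni":
                    mo.setIsni(pid.getValue());
                    break;
            }
        }
        return mo;
    }

The same pattern would apply to getMinProduct for the title and pid lists.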
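Review note on EmitFromEntities.emitFromDatasource: the journal-archive pass filters on d.getEoscdatasourcetype().getClassid() and then reads d.getJournal() directly, while the removed emitManifestation code guarded the EOSC type with Optional; both fields can be absent. A possible guard, sketched as a helper that could sit next to emitFromDatasource (the helper name and placement are an assumption, not part of the patch):

    // Sketch: true only for datasources that declare the "Journal archive" EOSC type and carry journal info.
    private static boolean isJournalArchive(Datasource d) {
        return Optional
            .ofNullable(d.getEoscdatasourcetype())
            .map(t -> "Journal archive".equalsIgnoreCase(t.getClassid()))
            .orElse(false)
            && Optional.ofNullable(d.getJournal()).isPresent();
    }

The second pass would then use .filter((FilterFunction<Datasource>) EmitFromEntities::isJournalArchive) instead of the unguarded lambda.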
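Usage note on Utils.getFunderName (moved here from DumpGrant): it parses the funding tree with dom4j and returns the text of the first //funder/name node, throwing DocumentException on malformed XML and an IndexOutOfBoundsException when the node is missing. A minimal illustration; the XML literal below is only an example shaped to satisfy that XPath, not a real funding tree from the graph:

    // Any XML containing a <funder><name> element satisfies the XPath used by getFunderName.
    String fundingtree = "<fundingtree><funder><name>European Commission</name></funder></fundingtree>";
    String funder = Utils.getFunderName(fundingtree); // -> "European Commission"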
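Note on the new RelationPerProduct shape: related products are now grouped in a Map keyed by the relation semantics, and DumpResult.getRelationAndManifestation flattens that map back into the dumped Relations list. An equivalent formulation over entrySet(), shown only as an illustration (rpp stands for the RelationPerProduct instance joined to the product; it saves the extra lookup per key done by the keySet() stream in the patch):

    // Equivalent to the keySet()-based stream used when building the PartialResearchProduct.
    List<Relations> relatedProducts = rpp
        .getRelatedProduct()
        .entrySet()
        .stream()
        .map(e -> Relations.newInstance(e.getKey(), e.getValue()))
        .collect(Collectors.toList());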