diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/RelationType.java b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/RelationType.java index 1376284..5d8133f 100644 --- a/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/RelationType.java +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/skgif/model/RelationType.java @@ -10,9 +10,10 @@ import java.io.Serializable; public enum RelationType implements Serializable { RESULT_OUTCOME_FUNDING("isProducedBy"), RESULT_AFFILIATIED_TO_ORGANIZATION( "hasAuthorInstitution"), DATASOURCE_PROVIDED_BY_ORGANIZATION( - "isProvidedBy"), ORGANIZATION_PARTICIPANT_IN_PROJECT("isParticipant"), SUPPLEMENT( + "isProvidedBy"), PROJECT_HAS_PARTICIPANT_ORGANIZATION("hasParticipant"), SUPPLEMENT( "IsSupplementedBy"), DOCUMENTS( - "IsDocumentedBy"), PART("IsPartOf"), VERSION("IsNewVersionOf"), CITATION("Cites"); + "IsDocumentedBy"), PART("IsPartOf"), VERSION( + "IsNewVersionOf"), CITATION("Cites"), ORGANIZATION_PARTICIPANT_IN_PROJECT("isParticipant"); public final String label; diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/Identifiers.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/Identifiers.java deleted file mode 100644 index 9633479..0000000 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/Identifiers.java +++ /dev/null @@ -1,30 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.filterentities; - -import java.io.Serializable; - -/** - * @author miriam.baglioni - * @Date 13/03/24 - */ -public class Identifiers implements Serializable { - - private String id; - private String CCL; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getCCL() { - return CCL; - } - - public void setCCL(String CCL) { - this.CCL = CCL; - } -} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java index efc6cf2..1da486a 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java @@ -18,6 +18,7 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,14 +71,19 @@ public class SelectConnectedEntities implements Serializable { isSparkSessionManaged, spark -> { - selectConnectedEntities(spark, inputPath, filterPath, workingDir); + selectConnectedEntities2(spark, inputPath, filterPath, workingDir); }); } - private static void selectConnectedEntities(SparkSession spark, String inputPath, - String filterPath, - String workingDir) throws JsonProcessingException { + private static void selectConnectedEntities2(SparkSession spark, String inputPath, String filterPath, + String workingDir) { + final StructType rp = new StructType() + .add( + "dataInfo", new StructType() + .add("deletedbyinference", DataTypes.BooleanType)) + .add("source", DataTypes.StringType) + .add("target", DataTypes.StringType); Dataset resultIds = spark.emptyDataset(Encoders.STRING()); for (EntityType entity : ModelSupport.entityTypes.keySet()) @@ -90,12 +96,29 @@ public class SelectConnectedEntities implements Serializable { 
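Note on the RelationType hunk above: the participant semantics is split in two, with the new PROJECT_HAS_PARTICIPANT_ORGANIZATION("hasParticipant") used when walking relations from the project side, while the original ORGANIZATION_PARTICIPANT_IN_PROJECT("isParticipant") is kept (moved to the end of the enum) so existing lookups still resolve. A minimal usage sketch, assuming the enum as patched:

    // Labels as declared in the patched enum (illustrative only, not part of the diff):
    String projectSide = RelationType.PROJECT_HAS_PARTICIPANT_ORGANIZATION.label; // "hasParticipant"
    String orgSide = RelationType.ORGANIZATION_PARTICIPANT_IN_PROJECT.label;      // "isParticipant"
    // Relation filters elsewhere in the patch compare relClass against these labels, e.g.:
    // r.getRelClass().equalsIgnoreCase(RelationType.PROJECT_HAS_PARTICIPANT_ORGANIZATION.label)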
.select("id") .as(Encoders.STRING())); - Dataset relation = Utils - .readPath(spark, inputPath + "relation", Relation.class) - .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference()); - Dataset organizations = Utils - .readPath(spark, inputPath + "organization", Organization.class) - .filter((FilterFunction) o -> !o.getDataInfo().getDeletedbyinference()); + Dataset relation = spark + .read() + .schema(rp) + .json(inputPath + "relation") + .filter(("datainfo.deletedbyinference != true")) + .drop("datainfo"); + + Dataset matchingRels = relation + .join( + resultIds, relation + .col("source") + .equalTo(resultIds.col("value")), + "leftsemi") + .select("target") + .distinct() + ; + + Dataset organization = spark + .read() + .schema(Encoders.bean(Organization.class).schema()) + .json(inputPath + "organization") + .filter("datainfo.deletedbyinference != true "); + Dataset projects = Utils .readPath(spark, inputPath + "project", Project.class) .filter((FilterFunction) p -> !p.getDataInfo().getDeletedbyinference()) @@ -106,53 +129,187 @@ public class SelectConnectedEntities implements Serializable { .getFunderName(p.getFundingtree().get(0).getValue()) .equalsIgnoreCase("European Commission")); - Dataset datasources = Utils - .readPath(spark, inputPath + "datasource", Datasource.class) - .filter((FilterFunction) d -> !d.getDataInfo().getDeletedbyinference()); - // select relations having source in the set of identifiers selected for eosc - Dataset relationSource = relation - .join(resultIds, resultIds.col("value").equalTo(relation.col("source")), "left_semi"); - relationSource - .join(resultIds, resultIds.col("value").equalTo(relation.col("target")), "left_semi") - .write() - .option("compression", "gzip") - .mode(SaveMode.Overwrite) - .json(workingDir + "resultrelation"); -// -// // write relations between results and organizations - relationSource - .joinWith(organizations, relation.col("target").equalTo(organizations.col("id")), "left_semi") - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingDir + "organizaitonrelation"); - - relationSource - .joinWith(projects, relation.col("target").equalTo(projects.col("id")), "left_semi") - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingDir + "projectrelation"); - - // write organizations linked to results in the set - - organizations - .join(relationSource, relationSource.col("target").equalTo(organizations.col("id")), "left_semi") + organization + .join(matchingRels, organization.col("id").equalTo(matchingRels.col("target")), "leftsemi") .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(workingDir + "organization"); - // write projects linked to results in the set + projects - .join(relationSource, relationSource.col("target").equalTo(projects.col("id"))) + .join(matchingRels, projects.col("id").equalTo(matchingRels.col("target")), "leftsemi") .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingDir + "project"); + .json(workingDir + "/project"); - // read the results and select all the distinct instance.hostedbykey + Dataset datasources = spark + .read() + .schema(Encoders.bean(Datasource.class).schema()) + .json(inputPath + "datasource") + .filter("datainfo.deletedbyinference != true"); + final Dataset datasourceReferencedIds = getDatasourceReferenceIdDataset(spark, workingDir); + datasources + .join( + datasourceReferencedIds, datasourceReferencedIds.col("value").equalTo(datasources.col("id")), + "left_semi") + 
.write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + "datasource"); + + } + +// private static void selectConnectedEntities(SparkSession spark, String inputPath, +// String filterPath, +// String workingDir) throws JsonProcessingException { +// +// Dataset resultIds = spark.emptyDataset(Encoders.STRING()); +// for (EntityType entity : ModelSupport.entityTypes.keySet()) +// if (ModelSupport.isResult(entity)) +// resultIds = resultIds +// .union( +// spark +// .read() +// .parquet(filterPath + entity.name() + "_ids") +// .select("id") +// .as(Encoders.STRING())); +// +// Dataset relation = Utils +// .readPath(spark, inputPath + "relation", Relation.class) +// .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference()); +// Dataset organizations = Utils +// .readPath(spark, inputPath + "organization", Organization.class) +// .filter((FilterFunction) o -> !o.getDataInfo().getDeletedbyinference()); +// Dataset projects = Utils +// .readPath(spark, inputPath + "project", Project.class) +// .filter((FilterFunction) p -> !p.getDataInfo().getDeletedbyinference()) +// .filter( +// (FilterFunction) p -> Optional.ofNullable(p.getFundingtree()).isPresent() && +// p.getFundingtree().size() > 0 && +// Utils +// .getFunderName(p.getFundingtree().get(0).getValue()) +// .equalsIgnoreCase("European Commission")); +// +// Dataset datasources = Utils +// .readPath(spark, inputPath + "datasource", Datasource.class) +// .filter((FilterFunction) d -> !d.getDataInfo().getDeletedbyinference()); +// +// // select relations having source in the set of identifiers selected for eosc +// Dataset relationSource = relation +// .join(resultIds, resultIds.col("value").equalTo(relation.col("source")), "left_semi"); +// relationSource +// .join(resultIds, resultIds.col("value").equalTo(relation.col("target")), "left_semi") +// .write() +// .option("compression", "gzip") +// .mode(SaveMode.Overwrite) +// .json(workingDir + "resultrelation"); +//// +//// // write relations between results and organizations +// relationSource +// .joinWith(organizations, relation.col("target").equalTo(organizations.col("id")), "left_semi") +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(workingDir + "organizaitonrelation"); +// +// relationSource +// .joinWith(projects, relation.col("target").equalTo(projects.col("id")), "left_semi") +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(workingDir + "projectrelation"); +// +// // write organizations linked to results in the set +// +// organizations +// .join(relationSource, relationSource.col("target").equalTo(organizations.col("id")), "left_semi") +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(workingDir + "organization"); +// +// // write projects linked to results in the set +// projects +// .join(relationSource, relationSource.col("target").equalTo(projects.col("id"))) +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(workingDir + "project"); +// +// // read the results and select all the distinct instance.hostedbykey +// final Dataset datasourceReferencedIds = getDatasourceReferenceIdDataset(spark, workingDir); +// // join with the datasources and write the datasource in the join +// datasources +// .joinWith( +// datasourceReferencedIds, datasourceReferencedIds.col("value").equalTo(datasources.col("id")), +// "left_semi") +// .write() +// .mode(SaveMode.Overwrite) +// 
.option("compression", "gzip") +// .json(workingDir + "datasource"); +// +// // selecting relations between organizations and projects in the selected set +// StructType tp = StructType.fromDDL("`id` STRING"); +// Dataset organizationSbs = spark +// .read() +// .schema(tp) +// .json(workingDir + "organization") +// .select("id"); +// +// Dataset projectSbs = spark +// .read() +// .schema(tp) +// .json(workingDir + "project") +// .select("id"); +//// +// Dataset tmpRel; +// tmpRel = relation +// .join( +// organizationSbs, organizationSbs +// .col("id") +// .equalTo(relation.col("source")), +// "left_semi"); +// tmpRel +// .join(projectSbs, tmpRel.col("target").equalTo(projectSbs.col("id")), "left_semi") +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(workingDir + "orgprojelation"); +// +// // selecting relations between datasources and organizations in the selected set +// Dataset datasourceSbs = spark +// .read() +// .schema(tp) +// .json(workingDir + "datasource") +// .select("id"); +// +// tmpRel = relation +// .join(datasourceSbs, datasourceSbs.col("id").equalTo(relation.col("source")), "left_semi"); +// tmpRel +// .join(organizationSbs, tmpRel.col("target").equalTo(organizationSbs.col("id")), "left_semi") +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(workingDir + "datsorgrelation"); +// +// Utils +// .readPath(spark, workingDir + "resultrelation", Relation.class) +// .union(Utils.readPath(spark, workingDir + "organizaitonrelation", Relation.class)) +// .union(Utils.readPath(spark, workingDir + "projectrelation", Relation.class)) +// .union(Utils.readPath(spark, workingDir + "orgprojelation", Relation.class)) +// .union(Utils.readPath(spark, workingDir + "datsorgrelation", Relation.class)) +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(workingDir + "relation"); +// +// } + + private static Dataset getDatasourceReferenceIdDataset(SparkSession spark, + String workingDir) { Dataset datasourceReferencedIds = spark.emptyDataset(Encoders.STRING()); for (EntityType entity : ModelSupport.entityTypes.keySet()) @@ -186,70 +343,6 @@ public class SelectConnectedEntities implements Serializable { Encoders.STRING())); } datasourceReferencedIds = datasourceReferencedIds.distinct(); - // join with the datasources and write the datasource in the join - datasources - .joinWith( - datasourceReferencedIds, datasourceReferencedIds.col("value").equalTo(datasources.col("id")), - "left_semi") - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingDir + "datasource"); - - // selecting relations between organizations and projects in the selected set - StructType tp = StructType.fromDDL("`id` STRING"); - Dataset organizationSbs = spark - .read() - .schema(tp) - .json(workingDir + "organization") - .select("id"); - - Dataset projectSbs = spark - .read() - .schema(tp) - .json(workingDir + "project") - .select("id"); -// - Dataset tmpRel; - tmpRel = relation - .join( - organizationSbs, organizationSbs - .col("id") - .equalTo(relation.col("source")), - "left_semi"); - tmpRel - .join(projectSbs, tmpRel.col("target").equalTo(projectSbs.col("id")), "left_semi") - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingDir + "orgprojelation"); - - // selecting relations between datasources and organizations in the selected set - Dataset datasourceSbs = spark - .read() - .schema(tp) - .json(workingDir + "datasource") - .select("id"); - - 
tmpRel = relation - .join(datasourceSbs, datasourceSbs.col("id").equalTo(relation.col("source")), "left_semi"); - tmpRel - .join(organizationSbs, tmpRel.col("target").equalTo(organizationSbs.col("id")), "left_semi") - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingDir + "datsorgrelation"); - - Utils - .readPath(spark, workingDir + "resultrelation", Relation.class) - .union(Utils.readPath(spark, workingDir + "organizaitonrelation", Relation.class)) - .union(Utils.readPath(spark, workingDir + "projectrelation", Relation.class)) - .union(Utils.readPath(spark, workingDir + "orgprojelation", Relation.class)) - .union(Utils.readPath(spark, workingDir + "datsorgrelation", Relation.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingDir + "relation"); - + return datasourceReferencedIds; } } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java index 3e3301d..e9ad7aa 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EncloseMinElement; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.ExtendingOrganization; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -76,92 +77,24 @@ public class DumpDatasource implements Serializable { } private static void mapDatasource(SparkSession spark, String inputPath, String outputPath, String workingDir) { - Dataset relation = Utils - .readPath(spark, inputPath + "relation", Relation.class) - .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference()) - .filter( - (FilterFunction) r -> r - .getRelClass() - .equalsIgnoreCase(RelationType.DATASOURCE_PROVIDED_BY_ORGANIZATION.label)); - - Dataset eme = Utils - .readPath(spark, workingDir + "minEntity", EncloseMinElement.class) - .filter((FilterFunction) e -> Optional.ofNullable(e.getMinOrganization()).isPresent()); - + Dataset organizations = Utils + .readPath(spark, workingDir + "/relations/datasource_providing_organization", ExtendingOrganization.class); Dataset datasourceDataset = Utils .readPath(spark, inputPath + "datasource", Datasource.class) .filter( (FilterFunction) d -> !d.getDataInfo().getInvisible() && !d.getDataInfo().getDeletedbyinference()); - Dataset> datasourceOrganization = relation - .joinWith(eme, relation.col("target").equalTo(eme.col("enclosedEntityId"))) - .map( - (MapFunction, Tuple2>) t2 -> new Tuple2<>( - t2._1().getSource(), t2._2()), - Encoders.tuple(Encoders.STRING(), Encoders.bean(EncloseMinElement.class))); datasourceDataset .joinWith( - datasourceOrganization, datasourceDataset.col("id").equalTo(datasourceOrganization.col("_1")), "left") - .groupByKey( - (MapFunction>, String>) t2 -> t2._1().getId(), - Encoders.STRING()) - .mapGroups( - (MapGroupsFunction>, eu.dnetlib.dhp.skgif.model.Datasource>) ( - k, vs) -> { - eu.dnetlib.dhp.skgif.model.Datasource datasource = new eu.dnetlib.dhp.skgif.model.Datasource(); - Tuple2> first = vs.next(); - Datasource d = first._1(); - // datasource.setLocal_identifier(Utils.getIdentifier(Prefixes.DATASOURCE, d.getId())); - datasource.setLocal_identifier(d.getId()); - datasource - .setIdentifiers( - d - 
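Note on the mapDatasource rewrite in the DumpDatasource hunk below: the relation plus EncloseMinElement join/groupBy is replaced by a single left join against the precomputed datasource_providing_organization dataset, followed by a typed map that attaches the organizations. The enrichment shape, sketched (ExtendingOrganization is assumed to expose entityId and relevant_organization, as the join condition implies):

    // Join-then-enrich: unmatched datasources keep the organization field unset.
    datasourceDataset
        .joinWith(organizations, datasourceDataset.col("id").equalTo(organizations.col("entityId")), "left")
        .map((MapFunction<Tuple2<Datasource, ExtendingOrganization>, eu.dnetlib.dhp.skgif.model.Datasource>) t2 -> {
            eu.dnetlib.dhp.skgif.model.Datasource mapped = dumpDatasource(t2._1());
            if (t2._2() != null)
                mapped.setOrganization(t2._2().getRelevant_organization());
            return mapped;
        }, Encoders.bean(eu.dnetlib.dhp.skgif.model.Datasource.class));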
.getPid() - .stream() - .map(p -> Identifier.newInstance(p.getQualifier().getClassid(), p.getValue())) - .collect(Collectors.toList())); - - datasource.setName(d.getOfficialname().getValue()); - datasource.setSubmission_policy_url(d.getSubmissionpolicyurl()); - datasource - .setJurisdiction( - Optional - .ofNullable(d.getJurisdiction()) - .map(v -> v.getClassid()) - .orElse(new String())); - datasource.setPreservation_policy_url(d.getPreservationpolicyurl()); - datasource.setVersion_control(d.getVersioncontrol()); - - datasource - .setData_source_classification( - Optional - .ofNullable(d.getEoscdatasourcetype()) - .map(v -> v.getClassname()) - .orElse(new String())); - datasource.setResearch_product_type(getEoscProductType(d.getResearchentitytypes())); - datasource.setThematic(d.getThematic()); - datasource - .setResearch_product_access_policy( - Optional - .ofNullable(d.getDatabaseaccesstype()) - .map(v -> getResearchProductAccessPolicy(d.getDatabaseaccesstype().getValue())) - .orElse(new ArrayList<>())); - datasource - .setResearch_product_metadata_access_policy( - Optional - .ofNullable(d.getResearchproductmetadataaccesspolicies()) - .map(v -> getResearchProductAccessPolicy(d.getResearchproductmetadataaccesspolicies())) - .orElse(new ArrayList<>())); - if (Optional.ofNullable(first._2()).isPresent()) { - List organizations = new ArrayList<>(); - organizations.add(first._2()._2().getMinOrganization()); - vs.forEachRemaining(org -> organizations.add(org._2()._2().getMinOrganization())); - datasource.setOrganization(organizations); - } - return datasource; - - }, Encoders.bean(eu.dnetlib.dhp.skgif.model.Datasource.class)) + organizations, datasourceDataset.col("id").equalTo(organizations.col("entityId")), "left") + .map((MapFunction, eu.dnetlib.dhp.skgif.model.Datasource>) t2 -> { + eu.dnetlib.dhp.skgif.model.Datasource datasource = dumpDatasource(t2._1()); + if (Optional.ofNullable(t2._2()).isPresent()) { + datasource.setOrganization(t2._2().getRelevant_organization()); + } + return datasource; + }, Encoders.bean(eu.dnetlib.dhp.skgif.model.Datasource.class)) .write() .mode(SaveMode.Overwrite) @@ -169,6 +102,51 @@ public class DumpDatasource implements Serializable { .json(outputPath + "Datasource"); } + private static eu.dnetlib.dhp.skgif.model.Datasource dumpDatasource(Datasource d) { + eu.dnetlib.dhp.skgif.model.Datasource datasource = new eu.dnetlib.dhp.skgif.model.Datasource(); + datasource.setLocal_identifier(d.getId()); + datasource + .setIdentifiers( + d + .getPid() + .stream() + .map(p -> Identifier.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList())); + + datasource.setName(d.getOfficialname().getValue()); + datasource.setSubmission_policy_url(d.getSubmissionpolicyurl()); + datasource + .setJurisdiction( + Optional + .ofNullable(d.getJurisdiction()) + .map(v -> v.getClassid()) + .orElse(new String())); + datasource.setPreservation_policy_url(d.getPreservationpolicyurl()); + datasource.setVersion_control(d.getVersioncontrol()); + + datasource + .setData_source_classification( + Optional + .ofNullable(d.getEoscdatasourcetype()) + .map(v -> v.getClassname()) + .orElse(new String())); + datasource.setResearch_product_type(getEoscProductType(d.getResearchentitytypes())); + datasource.setThematic(d.getThematic()); + datasource + .setResearch_product_access_policy( + Optional + .ofNullable(d.getDatabaseaccesstype()) + .map(v -> getResearchProductAccessPolicy(d.getDatabaseaccesstype().getValue())) + .orElse(new ArrayList<>())); + datasource + 
.setResearch_product_metadata_access_policy( + Optional + .ofNullable(d.getResearchproductmetadataaccesspolicies()) + .map(v -> getResearchProductAccessPolicy(d.getResearchproductmetadataaccesspolicies())) + .orElse(new ArrayList<>())); + return datasource; + } + private static List getResearchProductAccessPolicy(List value) { return value diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java index ca4339d..d12ab1e 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java @@ -16,10 +16,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.io.SAXReader; @@ -28,6 +25,8 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EncloseMinElement; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.ExtendingOrganization; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.GrantRelation; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.skgif.model.*; @@ -84,112 +83,102 @@ public class DumpGrant implements Serializable { .filter( (FilterFunction) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible()); - Dataset relations = Utils - .readPath(spark, inputPath + "relation", Relation.class) - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - !r.getDataInfo().getInvisible() && - r.getRelClass().equalsIgnoreCase(RelationType.ORGANIZATION_PARTICIPANT_IN_PROJECT.label)); - Dataset eme = Utils - .readPath(spark, workingDir + "minEntity", EncloseMinElement.class) - .filter((FilterFunction) e -> e.getMinOrganization() != null); - - Dataset> partecipantOrganization = relations - .joinWith(eme, relations.col("source").equalTo(eme.col("enclosedEntityId"))) - .map( - (MapFunction, Tuple2>) t2 -> new Tuple2<>( - t2._1().getTarget(), t2._2()), - Encoders.tuple(Encoders.STRING(), Encoders.bean(EncloseMinElement.class))); + Dataset partecipatingOrgs = Utils + .readPath(spark, workingDir + "relations/project_partecipating_organization", ExtendingOrganization.class); + projects = projects + .groupByKey((MapFunction) p -> p.getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, v) -> v.next(), Encoders.bean(Project.class)); projects - .joinWith(partecipantOrganization, projects.col("id").equalTo(partecipantOrganization.col("_1")), "left") - .groupByKey( - (MapFunction>, String>) t2 -> t2._1().getId(), - Encoders.STRING()) - .mapGroups( - (MapGroupsFunction>, Grant>) (k, v) -> { - Grant g = new Grant(); - Tuple2> first = v.next(); - g.setLocal_identifier(k); - g.setGrantCode(first._1().getCode().getValue()); - g.setIdentifiers(getProjectIdentifier(first._1())); - g.setTitle(first._1().getTitle().getValue()); - g - .setSummary( - Optional - .ofNullable(first._1().getSummary()) - .map(value -> value.getValue()) - .orElse(new String())); - g - .setAcronym( - Optional - .ofNullable(first._1().getAcronym()) 
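Note: the extracted dumpDatasource and dumpGrant methods repeat the Optional.ofNullable(x).map(v -> v.getValue()).orElse(new String()) idiom for almost every field. A tiny helper in the same spirit (hypothetical, not part of this patch; Field.getValue is assumed from the OAF schema) shows the intent:

    import java.util.Optional;
    import java.util.function.Function;

    // Null-safe getter: empty string when the container or its mapped value is missing.
    static <T> String orEmpty(T container, Function<T, String> getter) {
        return Optional.ofNullable(container).map(getter).orElse("");
    }

    // e.g. g.setSummary(orEmpty(project.getSummary(), field -> field.getValue()));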
- .map(value -> value.getValue()) - .orElse(new String())); - g.setFunder(Utils.getFunderName(first._1().getFundingtree().get(0).getValue())); - // * private String funding_stream;// fundingtree to be used the xpath //funding_level_[n] - g.setFunding_stream(getFundingStream(first._1().getFundingtree().get(0).getValue())); - g - .setCurrency( - Optional - .ofNullable(first._1().getCurrency()) - .map(value -> value.getValue()) - .orElse(new String())); - g - .setFunded_amount( - Optional - .ofNullable(first._1().getFundedamount()) - .orElse(null)); - g - .setKeywords( - first - ._1() - .getSubjects() - .stream() - .map(s -> s.getValue()) - .collect(Collectors.toList())); - g - .setStart_date( - Optional - .ofNullable(first._1().getStartdate()) - .map(value -> value.getValue()) - .orElse(new String())); - g - .setEnd_date( - Optional - .ofNullable(first._1().getEnddate()) - .map(value -> value.getValue()) - .orElse(new String())); - g - .setWebsite( - Optional - .ofNullable(first._1().getWebsiteurl()) - .map(value -> value.getValue()) - .orElse(new String())); - if (Optional.ofNullable(first._2()).isPresent()) { - List relevantOrganizatios = new ArrayList<>(); - relevantOrganizatios.add(first._2()._2().getMinOrganization()); - v - .forEachRemaining( - t2 -> relevantOrganizatios - .add(t2._2()._2().getMinOrganization())); - g.setBeneficiaries(relevantOrganizatios); - } - return g; - }, Encoders.bean(Grant.class)) + .joinWith(partecipatingOrgs, projects.col("id").equalTo(partecipatingOrgs.col("entityId")), "left") + .map((MapFunction, Grant>) t2 -> { + Grant g = dumpGrant(t2._1()); + if (Optional.ofNullable(t2._2()).isPresent()) + g.setBeneficiaries(t2._2().getRelevant_organization()); + return g; + }, Encoders.bean(Grant.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath + "Grant"); } + private static Grant dumpGrant(Project project) throws DocumentException { + Grant g = new Grant(); + + g.setLocal_identifier(project.getId()); + g.setGrantCode(project.getCode().getValue()); + g.setIdentifiers(getProjectIdentifier(project)); + if (Optional.ofNullable(project.getTitle()).isPresent()) + g.setTitle(project.getTitle().getValue()); + g + .setSummary( + Optional + .ofNullable(project.getSummary()) + .map(value -> value.getValue()) + .orElse(new String())); + g + .setAcronym( + Optional + .ofNullable(project.getAcronym()) + .map(value -> value.getValue()) + .orElse(new String())); + if (Optional.ofNullable(project.getFundingtree()).isPresent() && + project.getFundingtree().size() > 0) { + g.setFunder(Utils.getFunderName(project.getFundingtree().get(0).getValue())); + // * private String funding_stream;// fundingtree to be used the xpath //funding_level_[n] + g.setFunding_stream(getFundingStream(project.getFundingtree().get(0).getValue())); + } + + g + .setCurrency( + Optional + .ofNullable(project.getCurrency()) + .map(value -> value.getValue()) + .orElse(new String())); + g + .setFunded_amount( + Optional + .ofNullable(project.getFundedamount()) + .orElse(null)); + if (Optional.ofNullable(project.getSubjects()).isPresent()) + g + .setKeywords( + project + .getSubjects() + .stream() + .map(s -> s.getValue()) + .collect(Collectors.toList())); + g + .setStart_date( + Optional + .ofNullable(project.getStartdate()) + .map(value -> value.getValue()) + .orElse(new String())); + g + .setEnd_date( + Optional + .ofNullable(project.getEnddate()) + .map(value -> value.getValue()) + .orElse(new String())); + g + .setWebsite( + Optional + 
.ofNullable(project.getWebsiteurl()) + .map(value -> value.getValue()) + .orElse(new String())); + return g; + } + private static String getFundingStream(String fundingtree) throws DocumentException { final Document doc; doc = new SAXReader().read(new StringReader(fundingtree)); if (Optional.ofNullable(doc.selectNodes("//funding_level_0")).isPresent() && - doc.selectNodes("//funding_level_0").size() > 0) - return ((org.dom4j.Node) (doc.selectNodes("//funding_level_0").get(0))).getText(); + doc.selectNodes("//funding_level_0").size() > 0 && + Optional.ofNullable(doc.selectNodes("//funding_level_0/name")).isPresent() && + doc.selectNodes("//funding_level_0/name").size() > 0) + return ((org.dom4j.Node) (doc.selectNodes("//funding_level_0/name").get(0))).getText(); return new String(); } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpOrganization.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpOrganization.java index 66901d1..9cd7104 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpOrganization.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpOrganization.java @@ -11,6 +11,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -20,6 +21,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.skgif.model.Identifier; import eu.dnetlib.dhp.skgif.model.OrganizationTypes; import eu.dnetlib.dhp.skgif.model.Prefixes; @@ -68,47 +70,55 @@ public class DumpOrganization implements Serializable { private static void mapOrganization(SparkSession spark, String inputPath, String outputPath) { Dataset organizations = Utils.readPath(spark, inputPath + "organization", Organization.class); - organizations + organizations = organizations .filter( (FilterFunction) o -> !o.getDataInfo().getDeletedbyinference() && !o.getDataInfo().getInvisible()) - .map((MapFunction) o -> { - eu.dnetlib.dhp.skgif.model.Organization organization = new eu.dnetlib.dhp.skgif.model.Organization(); - // organization.setLocal_identifier(Utils.getIdentifier(Prefixes.ORGANIZATION, o.getId())); - organization.setLocal_identifier(o.getId()); - organization - .setCountry( - Optional - .ofNullable(o.getCountry().getClassid()) - .orElse(new String())); - organization - .setName( - Optional - .ofNullable(o.getLegalname().getValue()) - .orElse(new String())); - organization - .setShort_name( - Optional - .ofNullable(o.getLegalshortname()) - .map(v -> v.getValue()) - .orElse(new String())); - organization - .setIdentifiers( - o - .getPid() - .stream() - .map(p -> Identifier.newInstance(p.getQualifier().getClassid(), p.getValue())) - .collect(Collectors.toList())); - organization - .setOther_names( - o - .getAlternativeNames() - .stream() - .map(a -> a.getValue()) - .collect(Collectors.toList())); - organization.setType(getOrganizationType(o)); - return organization; - }, Encoders.bean(eu.dnetlib.dhp.skgif.model.Organization.class)) + .groupByKey((MapFunction) p -> p.getId(), Encoders.STRING()) + .mapGroups( + (MapGroupsFunction) (k, v) -> v.next(), + Encoders.bean(Organization.class)); + + 
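Note on the getFundingStream fix earlier in this hunk: the XPath now targets //funding_level_0/name instead of the bare //funding_level_0 element, whose getText() would not yield the funder stream name. An isolated sketch of the dom4j lookup (dom4j 2.x signatures assumed):

    import java.io.StringReader;
    import java.util.List;
    import org.dom4j.Document;
    import org.dom4j.Node;
    import org.dom4j.io.SAXReader;

    static String fundingLevel0Name(String fundingtree) throws Exception {
        Document doc = new SAXReader().read(new StringReader(fundingtree));
        List<Node> names = doc.selectNodes("//funding_level_0/name");
        return names.isEmpty() ? "" : names.get(0).getText(); // e.g. "H2020" for an EC tree
    }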
organizations.map((MapFunction) o -> { + if (!Optional.ofNullable(o.getPid()).isPresent() || o.getPid().size() == 0) + return null; + eu.dnetlib.dhp.skgif.model.Organization organization = new eu.dnetlib.dhp.skgif.model.Organization(); + // organization.setLocal_identifier(Utils.getIdentifier(Prefixes.ORGANIZATION, o.getId())); + organization.setLocal_identifier(o.getId()); + organization + .setCountry( + Optional + .ofNullable(o.getCountry().getClassid()) + .orElse(new String())); + organization + .setName( + Optional + .ofNullable(o.getLegalname().getValue()) + .orElse(new String())); + organization + .setShort_name( + Optional + .ofNullable(o.getLegalshortname()) + .map(v -> v.getValue()) + .orElse(new String())); + organization + .setIdentifiers( + o + .getPid() + .stream() + .map(p -> Identifier.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList())); + organization + .setOther_names( + o + .getAlternativeNames() + .stream() + .map(a -> a.getValue()) + .collect(Collectors.toList())); + organization.setType(getOrganizationType(o)); + return organization; + }, Encoders.bean(eu.dnetlib.dhp.skgif.model.Organization.class)) + .filter((FilterFunction) o -> o != null) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResearchProduct.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResearchProduct.java new file mode 100644 index 0000000..6023c7b --- /dev/null +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResearchProduct.java @@ -0,0 +1,408 @@ + +package eu.dnetlib.dhp.oa.graph.dump.skgif; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.*; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.skgif.model.*; +import eu.dnetlib.dhp.skgif.model.AccessRight; +import scala.Tuple2; + +/** + * @author miriam.baglioni + * @Date 06/02/24 + */ +public class DumpResearchProduct implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(DumpResearchProduct.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + DumpResearchProduct.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/skgif/emit_biblio_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + 
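Note on the DumpOrganization changes: the dataset is first deduplicated by id with groupByKey/mapGroups keeping one representative per key, then organizations without PIDs are dropped by mapping them to null and filtering the nulls out afterwards. Both patterns, sketched together under the patch's own types:

    import java.util.Objects;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.sql.*;

    static Dataset<Organization> dedupAndRequirePid(Dataset<Organization> orgs) {
        return orgs
            // one record per id: keep an arbitrary representative
            .groupByKey((MapFunction<Organization, String>) Organization::getId, Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Organization, Organization>) (k, v) -> v.next(),
                Encoders.bean(Organization.class))
            // emit null for PID-less organizations, then discard the null markers
            .map((MapFunction<Organization, Organization>) o ->
                (o.getPid() == null || o.getPid().isEmpty()) ? null : o,
                Encoders.bean(Organization.class))
            .filter((FilterFunction<Organization>) Objects::nonNull);
    }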
log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath + "ResearchProduct"); + emitFromResult(spark, inputPath, outputPath, workingDir); + + }); + } + + // per ogni result emetto id + journal se esiste + istanza + hosted by dell'istanza + public static void emitFromResult(SparkSession spark, String inputPath, String outputPath, + String workingDir) { + dumpResearchProduct(spark, inputPath, workingDir, outputPath); + moveDumpedProducts(spark, workingDir, outputPath); + + } + + private static void moveDumpedProducts(SparkSession spark, String workingDir, String outputPath) { + Dataset researchProducts = spark.emptyDataset(Encoders.bean(ResearchProduct.class)); + for (EntityType e : ModelSupport.entityTypes.keySet()) { + if (ModelSupport.isResult(e)) + researchProducts = researchProducts + .union(Utils.readPath(spark, workingDir + e.name() + "/researchproduct", ResearchProduct.class)); + } + researchProducts + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "ResearchProduct"); + } + + private static void dumpResearchProduct(SparkSession spark, String inputPath, String workingDir, + String outputPath) { + ModelSupport.entityTypes.keySet().forEach(e -> { + if (ModelSupport.isResult(e)) { + Class resultClazz = ModelSupport.entityTypes.get(e); + + if (e.name().equalsIgnoreCase("publication")) { + dumpPublication(spark, inputPath, workingDir, e, resultClazz); + } else { + dumpOtherResults(spark, inputPath, workingDir, e, resultClazz); + } + + includeRelevantOrganization(spark, workingDir, e); + includeFunding(spark, workingDir, e); + includeRelatedProducts(spark, workingDir, e); + + } + }); + } + + private static void includeRelatedProducts(SparkSession spark, String workingDir, EntityType e) { + Dataset pprWitGrants = spark + .read() + .schema(Encoders.bean(ResearchProduct.class).schema()) + .json(workingDir + e.name() + "/temp_researchproductgrant") + .as(Encoders.bean(ResearchProduct.class)); + Dataset relatedResults = Utils + .readPath(spark, workingDir + "/relations/related_products", ProductsRelation.class); + pprWitGrants + .joinWith( + relatedResults, pprWitGrants.col("local_identifier").equalTo(relatedResults.col("resultId")), + "left") + .map( + (MapFunction, ResearchProduct>) t2 -> { + if (t2._2() == null) + return t2._1(); + t2._1().setRelated_products(t2._2().getRelated_products()); + return t2._1(); + + }, Encoders.bean(ResearchProduct.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + e.name() + "/researchproduct"); + Utils.removeOutputDir(spark, workingDir + e.name() + "/temp_researchproductgrant"); + } + + private static void includeFunding(SparkSession spark, String workingDir, EntityType e) { + Dataset prrWithAffiliation = spark + .read() + .schema(Encoders.bean(ResearchProduct.class).schema()) + .json(workingDir + e.name() + "/temp_researchproductaff") + .as(Encoders.bean(ResearchProduct.class)); + + Dataset grants = Utils + .readPath(spark, workingDir + "relations/funding", GrantRelation.class); + + // Dataset pprWitGrants = + prrWithAffiliation + .joinWith( + grants, prrWithAffiliation.col("local_identifier").equalTo(grants.col("resultId")), "left") + 
.map((MapFunction, ResearchProduct>) t2 -> { + if (t2._2() == null) + return t2._1(); + t2._1().setFunding(t2._2().getFunding()); + return t2._1(); + }, Encoders.bean(ResearchProduct.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + e.name() + "/temp_researchproductgrant"); + + Utils.removeOutputDir(spark, workingDir + e.name() + "/temp_researchproductaff"); + } + + private static void includeRelevantOrganization(SparkSession spark, String workingDir, EntityType e) { + Dataset affiliations = Utils + .readPath( + spark, workingDir + "relations/result_relevant_organizations", ExtendingOrganization.class); + Dataset partialResearchProduct = spark + .read() + .schema(Encoders.bean(ResearchProduct.class).schema()) + .json(workingDir + e.name() + "/temp_researchProduct") + .as(Encoders.bean(ResearchProduct.class)); + // Dataset prrWithAffiliation = + partialResearchProduct + .joinWith( + affiliations, + partialResearchProduct.col("local_identifier").equalTo(affiliations.col("entityId")), + "left") + .map( + (MapFunction, ResearchProduct>) t2 -> { + if (t2._2() == null) + return t2._1(); + t2._1().setRelevant_organizations(t2._2().getRelevant_organization()); + return t2._1(); + }, Encoders.bean(ResearchProduct.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + e.name() + "/temp_researchproductaff"); + Utils.removeOutputDir(spark, workingDir + e.name() + "/temp_researchProduct"); + } + + private static void dumpOtherResults(SparkSession spark, String inputPath, String workingDir, EntityType e, Class resultClazz) { + Dataset results = Utils.readPath(spark, inputPath + e.name(), resultClazz); + results.map((MapFunction) r -> { + ArrayList journalHbIds = new ArrayList<>(); + + ResearchProduct rp = ResultMapper.map(r); + rp + .setManifestations( + r + .getInstance() + .stream() + .map(i -> getManifestation(i, journalHbIds, r)) + .collect(Collectors.toList())); + + return rp; + }, Encoders.bean(ResearchProduct.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + e.name() + "/temp_researchProduct"); + } + + private static void dumpPublication(SparkSession spark, String inputPath, String workingDir, EntityType e, Class resultClazz) { + Dataset> resultHostedBy = Utils + .readPath(spark, inputPath + e.name(), resultClazz) + .flatMap( + (FlatMapFunction>) p -> p + .getInstance() + .stream() + .map(i -> new Tuple2<>(p.getId(), i.getHostedby().getKey())) + .collect(Collectors.toList()) + .iterator(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + + Dataset journalIds = spark + .read() + .schema(Encoders.bean(Datasource.class).schema()) + .json(inputPath + "/datasource") + .filter( + "datainfo.deletedbyinference != true and " + + "eoscdatasourcetype.classid == 'Journal archive' ") + .select("id"); + + Dataset journalHostedByPerResult = resultHostedBy + .join( + journalIds, + resultHostedBy.col("_2").equalTo(journalIds.col("id")), "leftsemi") + .selectExpr("_1 as id", "_2 as journalHostedBy"); + Dataset results = Utils.readPath(spark, inputPath + e.name(), Publication.class); + results + .joinWith( + journalHostedByPerResult, results + .col("id") + .equalTo(journalHostedByPerResult.col("id")), + "left") + .groupByKey( + (MapFunction, String>) t2 -> t2._1().getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction, ResearchProduct>) (k, v) -> { + ArrayList journalHbIds = new ArrayList<>(); + Tuple2 first = v.next(); + if 
(Optional.ofNullable(first._2()).isPresent()) + journalHbIds.add(first._2().getAs("journalHostedBy")); + v.forEachRemaining(value -> journalHbIds.add(value._2().getAs("journalHostedBy"))); + Publication p = first._1(); + ResearchProduct rp = ResultMapper.map(p); + rp + .setManifestations( + p + .getInstance() + .stream() + .map(i -> getManifestation(i, journalHbIds, p)) + .collect(Collectors.toList())); + + return rp; + }, Encoders.bean(ResearchProduct.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + e.name() + "/temp_researchProduct"); + } + + @NotNull + private static Manifestation getManifestation(Instance i, ArrayList journalHbIds, R p) { + Manifestation m = new Manifestation(); + m.setProduct_local_type(i.getInstancetype().getClassname()); + m.setProduct_local_type_schema(i.getInstancetype().getSchemename()); + m.setPeer_review(getPeerReviewd(i)); + m.setAccess_right(getAccessRigth(i)); + m + .setLicence( + getLicence(i)); + if (Optional.ofNullable(i.getUrl()).isPresent() && i.getUrl().size() > 0) + m.setUrl(i.getUrl().get(0)); + else + m.setUrl(null); + if (Optional.ofNullable(i.getPid()).isPresent() && i.getPid().size() > 0) { + m.setPid(i.getPid().get(0).getValue()); + } + if (Optional.ofNullable(i.getDateofacceptance()).isPresent()) + m + .setDates( + Arrays + .asList( + Dates.newInstance(i.getDateofacceptance().getValue(), "publishing"))); + + if (p instanceof Publication) { + if (journalHbIds.contains(i.getHostedby().getKey()) + && Optional.ofNullable(((Publication) p).getJournal()).isPresent()) { + Biblio biblio = getBiblio(((Publication) p).getJournal()); + if (Optional.ofNullable(p.getPublisher()).isPresent()) + biblio.setPublisher(p.getPublisher().getValue()); + m.setBiblio(biblio); + if (Optional.ofNullable(((Publication) p).getJournal().getIssnPrinted()).isPresent()) + m + .setVenue( + MinVenue + .newInstance( + Utils + .getIdentifier(Prefixes.VENUE, ((Publication) p).getJournal().getIssnPrinted()), + i.getHostedby().getValue())); + else if (Optional.ofNullable(((Publication) p).getJournal().getIssnOnline()).isPresent()) + m + .setVenue( + MinVenue + .newInstance( + Utils.getIdentifier(Prefixes.VENUE, ((Publication) p).getJournal().getIssnOnline()), + i.getHostedby().getValue())); + } + } + m + .setHosting_datasource( + MinVenue + .newInstance( + // Utils.getIdentifier(Prefixes.DATASOURCE, epm.getInstance().getHostedby().getKey()), + i.getHostedby().getKey(), + i.getHostedby().getValue())); + return m; + } + + private static Biblio getBiblio(Journal epm) { + Biblio biblio = new Biblio(); + if (Optional.ofNullable(epm.getEdition()).isPresent()) + biblio.setEdition(epm.getEdition()); + if (Optional.ofNullable(epm.getIss()).isPresent()) + biblio.setIssue(epm.getIss()); + if (Optional.ofNullable(epm.getVol()).isPresent()) + biblio.setVolume(epm.getVol()); + if (Optional.ofNullable(epm.getEp()).isPresent()) + biblio.setEnd_page(epm.getEp()); + if (Optional.ofNullable(epm.getSp()).isPresent()) + biblio.setStart_page(epm.getSp()); + return biblio; + } + + @Nullable + private static String getLicence(Instance i) { + return Optional + .ofNullable(i.getLicense()) + .map(value -> value.getValue()) + .orElse(null); + } + + private static String getAccessRigth(Instance i) { + if (Optional.ofNullable(i.getAccessright()).isPresent()) + switch (i.getAccessright().getClassid()) { + case "OPEN": + case "OPEN DATA": + case "OPEN SOURCE": + return AccessRight.OPEN.label; + + case "CLOSED": + return AccessRight.CLOSED.label; + + case 
"RESTRICTED": + return AccessRight.RESTRICTED.label; + + case "EMBARGO": + case "12MONTHS": + case "6MONTHS": + return AccessRight.EMBARGO.label; + + default: + return AccessRight.UNAVAILABLE.label; + + } + return AccessRight.UNAVAILABLE.label; + + } + + private static String getPeerReviewd(Instance i) { + if (Optional.ofNullable(i.getRefereed()).isPresent()) + + switch (i.getRefereed().getClassid()) { + case "0000": + return PeerReview.UNAVAILABLE.label; + + case "0001": + return PeerReview.PEER_REVIEWED.label; + + case "0002": + return PeerReview.NON_PEER_REVIEWED.label; + } + return PeerReview.UNAVAILABLE.label; + } + +} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java index fea0a48..b0876ce 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java @@ -8,6 +8,8 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; +import javax.xml.crypto.Data; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -30,6 +32,7 @@ import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.skgif.model.*; import eu.dnetlib.dhp.skgif.model.AccessRight; +import eu.dnetlib.dhp.skgif.model.Organization; import scala.Tuple2; /** @@ -87,7 +90,7 @@ public class DumpResult implements Serializable { // selection of the relevant relations from result type to other entity. Only teh semantic relevant ones are // considered - selectRelations(spark, inputPath, workingDir); + // selectRelations(spark, inputPath, workingDir); // merge of relations and manifestation for the same result getRelationAndManifestation(spark, workingDir, inputPath); @@ -138,8 +141,18 @@ public class DumpResult implements Serializable { // } private static void getRelationAndManifestation(SparkSession spark, String workingDir, String inputPath) { - Dataset aggRelations = Utils - .readPath(spark, workingDir + "aggrelation", RelationPerProduct.class); +// Dataset aggRelations = Utils +// .readPath(spark, workingDir + "aggrelation", RelationPerProduct.class); + final StructType rp = new StructType() + .add( + "dataInfo", new StructType() + .add("deletedbyinference", DataTypes.BooleanType)) + .add( + "eoscdatasourcetype", new StructType() + .add("classid", DataTypes.StringType)) + .add("id", DataTypes.StringType) + + ; ModelSupport.entityTypes .keySet() @@ -147,165 +160,194 @@ public class DumpResult implements Serializable { .filter(ModelSupport::isResult) .forEach(e -> { Utils.removeOutputDir(spark, workingDir + e.name() + "/partialresearchproduct"); + log.info("executing on {}", e.name()); + Dataset datasource = spark + .read() + .schema(rp) + .json(inputPath + "/datasource") + .filter(("datainfo.deletedbyinference != true and eoscdatasourcetype.classid == 'Journal archive'")) + .drop("datainfo", "eoscdatasourcetype"); - Dataset datasource = Utils - .readPath(spark, inputPath + "/datasource", Datasource.class) - .filter( - (FilterFunction) d -> Optional.ofNullable(d.getEoscdatasourcetype()).isPresent() && - d.getEoscdatasourcetype().getClassid().equalsIgnoreCase("Journal archive")); +// Dataset man = Utils +// .readPath(spark, workingDir + e.name() + "/manifestation", EmitPerManifestation.class); + Dataset man = spark + .read() + .schema(Encoders.bean(EmitPerManifestation.class).schema()) + 
.json(workingDir + e.name() + "/manifestation"); + // Dataset partialResearchProduct = - Dataset man = Utils - .readPath(spark, workingDir + e.name() + "/manifestation", EmitPerManifestation.class); + man + .joinWith(datasource, man.col("hostedby").equalTo(datasource.col("id")), "left") - Dataset partialResearchProduct = man - .joinWith(datasource, man.col("instance.hostedby.key").equalTo(datasource.col("id")), "left") .groupByKey( - (MapFunction, String>) t2 -> t2._1().getResultId(), + (MapFunction, String>) t2 -> t2._1().getAs("resultId"), Encoders.STRING()) .mapGroups( - (MapGroupsFunction, PartialResearchProduct>) ( + (MapGroupsFunction, PartialResearchProduct>) ( k, v) -> { PartialResearchProduct prp = new PartialResearchProduct(); prp.setResultId(k); List manifestationList = new ArrayList<>(); - while (v.hasNext()) - manifestationList.add(getManifestation(v.next())); + Tuple2 first = v.next(); + manifestationList.add(getManifestation(first)); + v.forEachRemaining(value -> manifestationList.add(getManifestation(value))); prp.setManifestations(manifestationList); return prp; - }, Encoders.bean(PartialResearchProduct.class)); - partialResearchProduct - .joinWith( - aggRelations, partialResearchProduct.col("resultId").equalTo(aggRelations.col("resultId")), - "left") - .map( - (MapFunction, PartialResearchProduct>) t2 -> { - PartialResearchProduct prp = t2._1(); - if (Optional.ofNullable(t2._2()).isPresent()) { - prp - .setRelated_products( - t2 - ._2() - .getRelatedProduct() - .keySet() - .stream() - .map( - key -> Relations.newInstance(key, t2._2().getRelatedProduct().get(key))) - .collect(Collectors.toList())); - prp.setRelevant_organizations(t2._2().getOrganizations()); - prp.setFunding(t2._2().getFunding()); - } - return prp; + }, Encoders.bean(PartialResearchProduct.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + e.name() + "/temp_partitalresearchproduct"); + + Dataset partialResearchProduct = spark + .read() + .schema(Encoders.bean(PartialResearchProduct.class).schema()) + .json(workingDir + e.name() + "/temp_partitalresearchproduct") + .as(Encoders.bean(PartialResearchProduct.class)); + + Dataset affiliations = Utils + .readPath( + spark, workingDir + "relations/result_relevant_organizations", ExtendingOrganization.class); + // Dataset prrWithAffiliation = + partialResearchProduct + .joinWith( + affiliations, partialResearchProduct.col("resultId").equalTo(affiliations.col("entityId")), + "left") + .map( + (MapFunction, PartialResearchProduct>) t2 -> { + if (t2._2() == null) + return t2._1(); + t2._1().setRelevant_organizations(t2._2().getRelevant_organization()); + return t2._1(); + }, Encoders.bean(PartialResearchProduct.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + e.name() + "/temp_partitalresearchproductaff"); + + Utils.removeOutputDir(spark, workingDir + e.name() + "/temp_partitalresearchproduct"); + + Dataset prrWithAffiliation = spark + .read() + .schema(Encoders.bean(PartialResearchProduct.class).schema()) + .json(workingDir + e.name() + "/temp_partitalresearchproductaff") + .as(Encoders.bean(PartialResearchProduct.class)); + Dataset grants = Utils + .readPath(spark, workingDir + "relations/funding", GrantRelation.class); + + // Dataset pprWitGrants = + prrWithAffiliation + .joinWith(grants, prrWithAffiliation.col("resultId").equalTo(grants.col("resultId")), "left") + .map((MapFunction, PartialResearchProduct>) t2 -> { + if (t2._2() == null) + return t2._1(); + 
t2._1().setFunding(t2._2().getFunding()); + return t2._1(); + }, Encoders.bean(PartialResearchProduct.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + e.name() + "/temp_partitalresearchproductgrant"); + + Utils.removeOutputDir(spark, workingDir + e.name() + "/temp_partitalresearchproductaff"); + Dataset pprWitGrants = spark + .read() + .schema(Encoders.bean(PartialResearchProduct.class).schema()) + .json(workingDir + e.name() + "/temp_partitalresearchproductgrant") + .as(Encoders.bean(PartialResearchProduct.class)); + Dataset relatedResults = Utils + .readPath(spark, workingDir + "/relations/related_products", ProductsRelation.class); + pprWitGrants + .joinWith( + relatedResults, pprWitGrants.col("resultId").equalTo(relatedResults.col("resultId")), + "left") + .map( + (MapFunction, PartialResearchProduct>) t2 -> { + if (t2._2() == null) + return t2._1(); + t2._1().setRelated_products(t2._2().getRelated_products()); + return t2._1(); + }, Encoders.bean(PartialResearchProduct.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(workingDir + e.name() + "/partialresearchproduct"); + Utils.removeOutputDir(spark, workingDir + e.name() + "/temp_partitalresearchproductgrant"); }); } - private static Manifestation getManifestation(Tuple2 t2) { + private static Manifestation getManifestation(Tuple2 t2) { // if the left side is present, the biblio and the venue are available // otherwise only the other values are set - EmitPerManifestation epm = t2._1(); + Row epm = t2._1(); Manifestation manifestation = new Manifestation(); - manifestation.setProduct_local_type(epm.getInstance().getInstancetype().getClassname()); - manifestation.setProduct_local_type_schema(epm.getInstance().getInstancetype().getSchemename()); - if (Optional.ofNullable(epm.getInstance().getDateofacceptance()).isPresent()) + manifestation.setProduct_local_type(epm.getAs("product_local_type")); + manifestation.setProduct_local_type_schema(epm.getAs("product_local_type_schema")); + if (Optional.ofNullable(epm.getAs("publishing_date")).isPresent()) manifestation .setDates( Arrays .asList( - Dates.newInstance(epm.getInstance().getDateofacceptance().getValue(), "publishing"))); - if (Optional.ofNullable(epm.getInstance().getRefereed()).isPresent()) - switch (epm.getInstance().getRefereed().getClassid()) { - case "0000": - manifestation.setPeer_review(PeerReview.UNAVAILABLE.label); - break; - case "0001": - manifestation.setPeer_review(PeerReview.PEER_REVIEWED.label); - break; - case "0002": - manifestation.setPeer_review(PeerReview.NON_PEER_REVIEWED.label); - break; - } - + Dates.newInstance(epm.getAs("publishing_date"), "publishing"))); + manifestation.setPeer_review(epm.getAs("peer_reviewed")); manifestation.setMetadata_curation("unavailable"); - if (Optional.ofNullable(epm.getInstance().getAccessright()).isPresent()) - switch (epm.getInstance().getAccessright().getClassid()) { - case "OPEN": - case "OPEN DATA": - case "OPEN SOURCE": - manifestation.setAccess_right(AccessRight.OPEN.label); - break; - case "CLOSED": - manifestation.setAccess_right(AccessRight.CLOSED.label); - break; - case "RESTRICTED": - manifestation.setAccess_right(AccessRight.RESTRICTED.label); - break; - case "EMBARGO": - case "12MONTHS": - case "6MONTHS": - manifestation.setAccess_right(AccessRight.EMBARGO.label); - break; - default: - manifestation.setAccess_right(AccessRight.UNAVAILABLE.label); - - } - - manifestation - .setLicence( - Optional - .ofNullable(epm.getInstance().getLicense()) - 
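One caution on the Row-based getManifestation in this hunk: Row.getAs(String) resolves top-level field names only, so calls like epm.getAs("journal.issnPrinted") work only if a flattened column literally named that way was selected upstream; against a nested journal struct they throw IllegalArgumentException. A null-safe nested read, sketched with the field names used by EmitPerManifestation:

    import org.apache.spark.sql.Row;

    // Null-safe read of a nested struct field from a Row.
    static String nested(Row row, String structField, String innerField) {
        int idx = row.fieldIndex(structField);
        if (row.isNullAt(idx))
            return null;
        Row struct = row.getStruct(idx);
        int inner = struct.fieldIndex(innerField);
        return struct.isNullAt(inner) ? null : struct.getString(inner);
    }

    // usage: String issn = nested(epm, "journal", "issnPrinted");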
.map(value -> value.getValue()) - .orElse(null)); - if (Optional.ofNullable(epm.getInstance().getUrl()).isPresent() && epm.getInstance().getUrl().size() > 0) - manifestation - .setUrl(epm.getInstance().getUrl().get(0)); - else - manifestation.setUrl(null); - - if (Optional.ofNullable(epm.getInstance().getPid()).isPresent() && epm.getInstance().getPid().size() > 0) { - manifestation.setPid(epm.getInstance().getPid().get(0).getValue()); - } + manifestation.setAccess_right(epm.getAs("access_right")); + manifestation.setLicence(epm.getAs("licence")); + manifestation.setUrl(epm.getAs("url")); + manifestation.setPid(epm.getAs("pid")); if (Optional.ofNullable(t2._2()).isPresent()) { + Biblio biblio = getBiblio(epm); +// if (biblio == null) +// log.info("null biblio fo {} ", epm.getAs("resultId")); manifestation.setBiblio(getBiblio(epm)); - if (Optional.ofNullable(t2._2().getJournal().getIssnPrinted()).isPresent()) + if (Optional.ofNullable(epm.getAs("journal")).isPresent() && + Optional.ofNullable(epm.getAs("journal.issnPrinted")).isPresent()) manifestation .setVenue( MinVenue .newInstance( - Utils.getIdentifier(Prefixes.VENUE, t2._2().getJournal().getIssnPrinted()), - t2._1().getJournal().getName())); - else if (Optional.ofNullable(t2._2().getJournal().getIssnOnline()).isPresent()) + Utils.getIdentifier(Prefixes.VENUE, epm.getAs("journal.issnPrinted")), + epm.getAs("hostedbyvalue"))); + else if (Optional.ofNullable(epm.getAs("journal")).isPresent() && + Optional.ofNullable(epm.getAs("journal.issnOnline")).isPresent()) manifestation .setVenue( MinVenue .newInstance( - Utils.getIdentifier(Prefixes.VENUE, t2._1().getJournal().getIssnOnline()), - t2._1().getJournal().getName())); + Utils.getIdentifier(Prefixes.VENUE, epm.getAs("journal.issnOnline")), + epm.getAs("hostedbyvalue"))); } manifestation .setHosting_datasource( MinVenue .newInstance( // Utils.getIdentifier(Prefixes.DATASOURCE, epm.getInstance().getHostedby().getKey()), - epm.getInstance().getHostedby().getKey(), - epm.getInstance().getHostedby().getValue())); + epm.getAs("hostedBy"), + epm.getAs("hostedbyvalue"))); return manifestation; } - private static Biblio getBiblio(EmitPerManifestation epm) { + private static Biblio getBiblio(Row epm) { Biblio biblio = new Biblio(); - biblio.setEdition(epm.getJournal().getEdition()); - biblio.setIssue(epm.getJournal().getIss()); - biblio.setPublisher(epm.getPublisher()); - biblio.setVolume(epm.getJournal().getVol()); - biblio.setEnd_page(epm.getJournal().getEp()); - biblio.setStart_page(epm.getJournal().getSp()); + if (!Optional.ofNullable(epm.getAs("journal")).isPresent()) { + return null; + } + if (Optional.ofNullable(epm.getAs("journal.edition")).isPresent()) + biblio.setEdition(epm.getAs("journal.edition")); + if (Optional.ofNullable(epm.getAs("journal.iss")).isPresent()) + biblio.setIssue(epm.getAs("journal.iss")); + if (Optional.ofNullable(epm.getAs("publisher")).isPresent()) + biblio.setPublisher(epm.getAs("publisher")); + if (Optional.ofNullable(epm.getAs("journal.vol")).isPresent()) + biblio.setVolume(epm.getAs("journal.vol")); + if (Optional.ofNullable(epm.getAs("journal.ep")).isPresent()) + biblio.setEnd_page(epm.getAs("journal.ep")); + if (Optional.ofNullable(epm.getAs("journal.sp")).isPresent()) + biblio.setStart_page(epm.getAs("journal.sp")); return biblio; } @@ -369,6 +411,25 @@ public class DumpResult implements Serializable { RelationType.PART.label, RelationType.VERSION.label); + Dataset relation = spark + .read() + .schema(Encoders.bean(Relation.class).schema()) + .json(inputPath 
+ "relation") + .filter( + "datainfo.deletedbyinference != true and " + + "relClass == 'hasAuthorInstitution") + .select("source", "target"); + + Dataset organization = spark + .read() + .schema(Encoders.bean(Organization.class).schema()) + .json(inputPath + "organization") + .filter("datainfo.deletedbyinference != true") + .select("id", "pid", "legalname.value"); + +// result = spark.read().schema(Encoders.bean(Result.class).schema()) +// .json(inputPath + ) + // relationsProducts // .stream() // .forEach(r -> buildRelationPerProducts(spark, inputPath, workingDir, r)); diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpVenue.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpVenue.java index eb206ad..60c3bff 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpVenue.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpVenue.java @@ -85,6 +85,8 @@ public class DumpVenue implements Serializable { manifestationDataset, datasourceDataset.col("id").equalTo(manifestationDataset.col("hostedby")), "left") .map((MapFunction, Venue>) t2 -> { + if (!Optional.ofNullable(t2._1().getJournal()).isPresent()) + return null; Venue venue = new Venue(); Datasource d = t2._1(); if (Optional.ofNullable(d.getJournal()).isPresent() @@ -105,6 +107,7 @@ public class DumpVenue implements Serializable { venue.setContributions(null); return venue; }, Encoders.bean(Venue.class)) + .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java index 2a8e0f7..ff330d8 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java @@ -15,20 +15,18 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation; -import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EncloseMinElement; import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.skgif.model.*; +import eu.dnetlib.dhp.skgif.model.AccessRight; import scala.Tuple2; /** @@ -72,9 +70,9 @@ public class EmitFromEntities implements Serializable { spark -> { Utils.removeOutputDir(spark, outputPath); emitFromResult(spark, inputPath, outputPath, workingDir); - emitFromDatasource(spark, inputPath, workingDir); - emitFromOrganization(spark, inputPath, workingDir); - emitFromProject(spark, inputPath, workingDir); +// emitFromDatasource(spark, inputPath, workingDir); +// emitFromOrganization(spark, inputPath, workingDir); +// emitFromProject(spark, inputPath, workingDir); }); } @@ -196,13 +194,53 @@ public class EmitFromEntities implements Serializable { // per ogni result emetto id + journal se esiste + istanza + hosted by dell'istanza public static void emitFromResult(SparkSession spark, String inputPath, String outputPath, 
@@ -196,13 +194,53 @@ public class EmitFromEntities implements Serializable { // for each result we emit id + journal (if any) + instance + hostedby of the instance public static void emitFromResult(SparkSession spark, String inputPath, String outputPath, String workingDir) { - emitManifestation(spark, inputPath, workingDir); + // emitManifestation(spark, inputPath, workingDir); emitPerson(spark, inputPath, outputPath, workingDir); emitTopic(spark, inputPath, outputPath, workingDir); - emitMinProduct(spark, inputPath, workingDir); + emitDatasourcePublisher(spark, inputPath, workingDir); + // emitMinProduct(spark, inputPath, workingDir); } +// the publisher, like the journal information, is recorded at the level of the result. We do not know which instance's + // hostedby.key is the one of the journal + private static void emitDatasourcePublisher(SparkSession spark, String inputPath, String workingDir) { + Dataset journalIds = spark + .read() + .schema(Encoders.bean(Datasource.class).schema()) + .json((inputPath + "datasource")) + .filter( + "datainfo.deletedbyinference != true and " + + "eoscdatasourcetype.classid == 'Journal archive' ") + .select("id"); + + Dataset result = spark + .read() + .schema(Encoders.bean(Publication.class).schema()) + .json(inputPath + "publication") + .filter("datainfo.deletedbyinference != true ") + .as(Encoders.bean(Publication.class)); + + Dataset datasourcePublisher = result.flatMap((FlatMapFunction<Publication, Tuple2<String, String>>) r -> { + ArrayList<Tuple2<String, String>> dsPub = new ArrayList<>(); + if (Optional.ofNullable(r.getJournal()).isPresent() && + Optional.ofNullable(r.getPublisher()).isPresent()) { + for (Instance i : r.getInstance()) + dsPub.add(new Tuple2<>(i.getHostedby().getKey(), r.getPublisher().getValue())); + } + return dsPub.iterator(); + }, Encoders.tuple(Encoders.STRING(), Encoders.STRING())) + .selectExpr("_1 as hostedby", "_2 as publisher"); + + datasourcePublisher + .join(journalIds, datasourcePublisher.col("hostedby").equalTo(journalIds.col("id")), "leftsemi") + .distinct() + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + "/datasourcePublisher"); + } + private static void emitMinProduct(SparkSession spark, String inputPath, String workingDir) { Utils.removeOutputDir(spark, workingDir + "minProduct"); ModelSupport.entityTypes.keySet().forEach(e -> { @@ -365,8 +403,10 @@ public class EmitFromEntities implements Serializable { .flatMap((FlatMapFunction) p -> p.getInstance().stream().map(i -> { EmitPerManifestation epb = new EmitPerManifestation(); epb.setResultId(p.getId()); - epb.setInstance(i); + setInstanceFields(epb, i); + // epb.setInstance(i); epb.setHostedBy(i.getHostedby().getKey()); + epb.setHostedbyvalue(i.getHostedby().getValue()); epb .setPublisher( Optional @@ -414,4 +454,69 @@ public class EmitFromEntities implements Serializable { .json(workingDir + "/datasourcePublisher"); } + private static void setInstanceFields(EmitPerManifestation epb, Instance i) { + epb.setProduct_local_type(i.getInstancetype().getClassname()); + epb.setProduct_local_type_schema(i.getInstancetype().getSchemename()); + epb.setPeer_reviewed(getPeerReviewd(i)); + epb.setAccess_right(getAccessRigth(i)); + epb + .setLicence( + Optional + .ofNullable(i.getLicense()) + .map(value -> value.getValue()) + .orElse(null)); + if (Optional.ofNullable(i.getUrl()).isPresent() && i.getUrl().size() > 0) + epb.setUrl(i.getUrl().get(0)); + else + epb.setUrl(null); + if (Optional.ofNullable(i.getPid()).isPresent() && i.getPid().size() > 0) { + epb.setPid(i.getPid().get(0).getValue()); + } + if (Optional.ofNullable(i.getDateofacceptance()).isPresent()) + epb.setPublishing_date(i.getDateofacceptance().getValue()); + }
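emitDatasourcePublisher keeps only the (hostedby, publisher) pairs whose hostedby is a journal datasource, via a "leftsemi" join: the left side is filtered by the existence of a match on the right, and only left-side columns survive. A toy illustration of the semantics (not PR code, data values invented):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("local").getOrCreate();
Dataset<Row> pairs = spark.sql("SELECT * FROM VALUES ('ds1', 'ACME'), ('ds2', 'Other') AS t(hostedby, publisher)");
Dataset<Row> journalIds = spark.sql("SELECT * FROM VALUES ('ds1') AS t(id)");
pairs
    .join(journalIds, pairs.col("hostedby").equalTo(journalIds.col("id")), "leftsemi")
    .show(); // only ('ds1', 'ACME') survives, with no column taken from journalIds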
+ + private static String getAccessRigth(Instance i) { + if (Optional.ofNullable(i.getAccessright()).isPresent()) + switch (i.getAccessright().getClassid()) { + case "OPEN": + case "OPEN DATA": + case "OPEN SOURCE": + return AccessRight.OPEN.label; + + case "CLOSED": + return AccessRight.CLOSED.label; + + case "RESTRICTED": + return AccessRight.RESTRICTED.label; + + case "EMBARGO": + case "12MONTHS": + case "6MONTHS": + return AccessRight.EMBARGO.label; + + default: + return AccessRight.UNAVAILABLE.label; + + } + return AccessRight.UNAVAILABLE.label; + + } + + private static String getPeerReviewd(Instance i) { + if (Optional.ofNullable(i.getRefereed()).isPresent()) + + switch (i.getRefereed().getClassid()) { + case "0000": + return PeerReview.UNAVAILABLE.label; + + case "0001": + return PeerReview.PEER_REVIEWED.label; + + case "0002": + return PeerReview.NON_PEER_REVIEWED.label; + } + return PeerReview.UNAVAILABLE.label; + } + } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/SelectRelation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/SelectRelation.java new file mode 100644 index 0000000..af0cb17 --- /dev/null +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/SelectRelation.java @@ -0,0 +1,516 @@ + +package eu.dnetlib.dhp.oa.graph.dump.skgif; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.*; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.ExtendingOrganization; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.GrantRelation; +import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.ProductsRelation; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.skgif.model.*; +import scala.Tuple5; + +/** + * @author miriam.baglioni + * @Date 16/03/24 + */ +public class SelectRelation implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SelectRelation.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + DumpResult.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/skgif/select_relation_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); + + final String relationPath = parser.get("relationPath"); + log.info("relationPath: {}", relationPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { +// Utils.removeOutputDir(spark, workingDir + "aggrelation"); + + // selectAffiliationRelations(spark,
inputPath, workingDir, outputPath); + createOrganizationExtention( + spark, inputPath, RelationType.RESULT_AFFILIATIED_TO_ORGANIZATION.label, + workingDir + "relations/result_relevant_organizations", relationPath); + selectFundingRelations(spark, inputPath, workingDir, relationPath); + selectProductRelation(spark, inputPath, workingDir, relationPath); + // selectDatasourceOrganizationRelation(spark, inputPath, workingDir, outputPath); + createOrganizationExtention( + spark, inputPath, RelationType.DATASOURCE_PROVIDED_BY_ORGANIZATION.label, + workingDir + "relations/datasource_providing_organization", relationPath); + createOrganizationExtention( + spark, inputPath, RelationType.PROJECT_HAS_PARTICIPANT_ORGANIZATION.label, + workingDir + "relations/project_partecipating_organization", relationPath); + }); + } + + private static void createOrganizationExtention(SparkSession spark, String inputPath, String relationSem, + String outputPath, String relationPath) { + final StructType rp = new StructType() + .add( + "dataInfo", new StructType() + .add("deletedbyinference", DataTypes.BooleanType)) + .add("source", DataTypes.StringType) + .add("target", DataTypes.StringType) + .add("relClass", DataTypes.StringType); + + Dataset relation = spark + .read() + .schema(rp) + .json(relationPath) + .filter( + "datainfo.deletedbyinference != true and " + + "relClass == '" + relationSem + "'") + .drop("datainfo", "relClass"); + + final Dataset minOrganizations = getMinOrganizationDataset(spark, inputPath); + + relation + .join(minOrganizations, relation.col("target").equalTo(minOrganizations.col("local_identifier"))) + .drop("target") + .groupByKey((MapFunction) r -> r.getAs("source"), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, v) -> { + ExtendingOrganization ar = new ExtendingOrganization(); + ar.setEntityId(k); + addRelevantOrganization(ar, v); + return ar; + }, Encoders.bean(ExtendingOrganization.class)) + // .show(false); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + + } + + private static void selectDatasourceOrganizationRelation(SparkSession spark, String inputPath, String workingDir, + String outputPath) { + final StructType rp = new StructType() + .add( + "dataInfo", new StructType() + .add("deletedbyinference", DataTypes.BooleanType)) + .add("source", DataTypes.StringType) + .add("target", DataTypes.StringType) + .add("relClass", DataTypes.StringType); + + Dataset relation = spark + .read() + .schema(rp) + .json(inputPath + "relation") + .filter( + "datainfo.deletedbyinference != true and " + + "relClass == '" + RelationType.DATASOURCE_PROVIDED_BY_ORGANIZATION.label + "'") + .drop("datainfo", "relClass"); + + final Dataset minOrganizations = getMinOrganizationDataset(spark, inputPath); + + relation + .join(minOrganizations, relation.col("target").equalTo(minOrganizations.col("local_identifier"))) + .drop("target") + .groupByKey((MapFunction) r -> r.getAs("source"), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, v) -> { + ExtendingOrganization ar = new ExtendingOrganization(); + ar.setEntityId(k); + addRelevantOrganization(ar, v); + return ar; + }, Encoders.bean(ExtendingOrganization.class)) + // .show(false); + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json("/tmp/miriam/prova/providingOrganization"); + + }
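The same four-field relation schema is rebuilt by hand in createOrganizationExtention, selectDatasourceOrganizationRelation, selectProductRelation and selectFundingRelations. A hypothetical refactor, not part of this patch, would declare it once and reuse it in every selector:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// One shared definition of the pruned relation schema used by every selector in this class.
private static StructType relationSchema() {
    return new StructType()
        .add("dataInfo", new StructType()
            .add("deletedbyinference", DataTypes.BooleanType))
        .add("source", DataTypes.StringType)
        .add("target", DataTypes.StringType)
        .add("relClass", DataTypes.StringType);
}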
+ + private static void selectProductRelation(SparkSession spark, String inputPath, String workingDir, + String relationPath) { + + final StructType rp = new StructType() + .add( + "dataInfo", new StructType() + .add("deletedbyinference", DataTypes.BooleanType)) + .add("source", DataTypes.StringType) + .add("target", DataTypes.StringType) + .add("relClass", DataTypes.StringType); + + Dataset relation = spark + .read() + .schema(rp) + .json(relationPath) + .filter("datainfo.deletedbyinference != true") + .filter( + "relClass == '" + RelationType.DOCUMENTS.label + "' or " + + "relClass == '" + RelationType.CITATION.label + "' or " + + "relClass == '" + RelationType.PART.label + "' or " + + "relClass == '" + RelationType.SUPPLEMENT.label + "' or " + + "relClass == '" + RelationType.VERSION.label + "'") + .drop("datainfo"); + + Dataset result = spark + .read() + .schema(Encoders.bean(Result.class).schema()) + .json(inputPath + "publication") + .filter( + "datainfo.deletedbyinference != true and " + + "datainfo.invisible != true") + .selectExpr("id", "title[0].value as title", "pid"); + + result.createOrReplaceTempView("res"); + + String query = "select id, pide.qualifier.classid as schema, pide.value as pid, title " + + "from res " + + "lateral view explode (pid) p as pide "; + + Dataset minProduct = spark + .sql(query) + // .show(false); + .groupByKey((MapFunction) r -> r.getAs("id"), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, v) -> { + MinProduct mp = new MinProduct(); + mp.setLocal_identifier(k); + Row r = v.next(); + mp.setTitle(r.getAs("title")); + addProductPid(mp, r); + v.forEachRemaining(row -> addProductPid(mp, row)); + return mp; + }, Encoders.bean(MinProduct.class)); + + relation + .join(minProduct, relation.col("target").equalTo(minProduct.col("local_identifier"))) + .selectExpr("source", "local_identifier", "title", "doi", "pmcid", "pmid", "arxivid", "relClass as sem") + .groupByKey((MapFunction) r -> r.getAs("source"), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, v) -> { + ProductsRelation pr = new ProductsRelation(); + pr.setResultId(k); + addResulRelations(pr, v); + return pr; + }, Encoders.bean(ProductsRelation.class)) + // .show(false); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + "relations/related_products"); + + } + + private static void addResulRelations(ProductsRelation pr, Iterator v) { + pr.setRelated_products(new ArrayList<>()); + Map<String, List<MinProduct>> hashMap = new HashMap<>(); + while (v.hasNext()) { + Row next = v.next(); + String sem = next.getAs("sem"); + if (!hashMap.containsKey(sem)) + hashMap.put(sem, new ArrayList<>()); + hashMap.get(sem).add(getMinProduct(next)); + } + + hashMap + .keySet() + .stream() + .forEach(key -> pr.getRelated_products().add(Relations.newInstance(key, hashMap.get(key)))); + } + + private static MinProduct getMinProduct(Row next) { + MinProduct mp = new MinProduct(); + mp.setLocal_identifier(next.getAs("local_identifier")); + if (Optional.ofNullable(next.getAs("doi")).isPresent()) + mp.setDoi(next.getAs("doi")); + if (Optional.ofNullable(next.getAs("pmid")).isPresent()) + mp.setPmid(next.getAs("pmid")); + if (Optional.ofNullable(next.getAs("pmcid")).isPresent()) + mp.setPmcid(next.getAs("pmcid")); + if (Optional.ofNullable(next.getAs("arxivid")).isPresent()) + mp.setArxivid(next.getAs("arxivid")); + return mp; + } + + private static void addProductPid(MinProduct mp, Row next) { + String schema = next.getAs("schema"); + if (Optional.ofNullable(schema).isPresent()) { + switch (schema) { + case "doi": + mp.setDoi(next.getAs("pid")); + break; + case "pmcid": + mp.setPmcid(next.getAs("pid")); + break; + case "pmid": + mp.setPmid(next.getAs("pid")); + break; + case "arXiv": +
mp.setArxivid(next.getAs("pid")); + break; + + } + } + } + + private static void selectFundingRelations(SparkSession spark, String inputPath, String workingDir, + String relationPath) { + final StructType tp = new StructType() + .add( + "dataInfo", new StructType() + .add("deletedbyinference", DataTypes.BooleanType)) + .add("id", DataTypes.StringType); + final StructType rp = new StructType() + .add( + "dataInfo", new StructType() + .add("deletedbyinference", DataTypes.BooleanType)) + .add("source", DataTypes.StringType) + .add("target", DataTypes.StringType) + .add("relClass", DataTypes.StringType); + + Dataset relation = spark + .read() + .schema(rp) + .json(relationPath) + .filter( + "datainfo.deletedbyinference != true and " + + "relClass == '" + RelationType.RESULT_OUTCOME_FUNDING.label + "'") + .drop("datainfo", "relClass"); + Dataset projects = Utils + .readPath(spark, inputPath + "project", Project.class) + .filter( + (FilterFunction) p -> !p.getDataInfo().getDeletedbyinference() && + p.getFundingtree().size() > 0 + && + Utils + .getFunderName(p.getFundingtree().get(0).getValue()) + .equalsIgnoreCase("European Commission")) + + .map((MapFunction>) p -> { + String id = p.getId(); + String acronym = ""; + if (Optional.ofNullable(p.getAcronym()).isPresent()) + acronym = p.getAcronym().getValue(); + String title = ""; + if (Optional.ofNullable(p.getTitle()).isPresent()) + title = p.getTitle().getValue(); + String funder = Utils.getFunderName(p.getFundingtree().get(0).getValue()); + String code = p.getCode().getValue(); + return new Tuple5<>(id, acronym, title, funder, code); + }, Encoders + .tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING(), Encoders.STRING(), Encoders.STRING())) + .selectExpr("_1 as id", "_2 as acronym", "_3 as title", "_4 as funder", "_5 as code"); + + relation + .join(projects, relation.col("target").equalTo(projects.col("id"))) + .drop("target") + .groupByKey((MapFunction) r -> r.getAs("source"), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, v) -> { + GrantRelation gr = new GrantRelation(); + gr.setResultId(k); + addFunding(gr, v); + return gr; + }, Encoders.bean(GrantRelation.class)) +// .show(false); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingDir + "relations/funding"); + } + + private static void addFunding(GrantRelation gr, Iterator v) { + gr.setFunding(new ArrayList<>()); + while (v.hasNext()) { + gr.getFunding().add(getMinGrant(v.next())); + + } + } + + private static MinGrant getMinGrant(Row next) { + MinGrant mn = new MinGrant(); + mn.setCode(next.getAs("code")); + mn.setLocal_identifier(next.getAs("id")); + mn.setFunder(next.getAs("funder")); + if (Optional.ofNullable(next.getAs("acronym")).isPresent()) + mn.setTitle(next.getAs("acronym")); + else + mn.setTitle(next.getAs("title")); + return mn; + } + + private static void selectAffiliationRelations(SparkSession spark, String inputPath, String workingDir, + String outputPath) { + + final StructType rp = new StructType() + .add( + "dataInfo", new StructType() + .add("deletedbyinference", DataTypes.BooleanType)) + .add("source", DataTypes.StringType) + .add("target", DataTypes.StringType) + .add("relClass", DataTypes.StringType); + + Dataset relation = spark + .read() + .schema(rp) + .json(inputPath + "relation") + .filter( + "datainfo.deletedbyinference != true and " + + "relClass == '" + RelationType.RESULT_AFFILIATIED_TO_ORGANIZATION.label + "'") + .drop("datainfo", "relClass"); + + final Dataset minOrganizations = 
getMinOrganizationDataset(spark, inputPath); + + relation + .join(minOrganizations, relation.col("target").equalTo(minOrganizations.col("local_identifier"))) + .drop("target") + .groupByKey((MapFunction) r -> r.getAs("source"), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, v) -> { + ExtendingOrganization ar = new ExtendingOrganization(); + ar.setEntityId(k); + addRelevantOrganization(ar, v); + return ar; + }, Encoders.bean(ExtendingOrganization.class)) + // .show(false); + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json("/tmp/miriam/prova/relevantOrganization"); + + } + + private static Dataset getMinOrganizationDataset(SparkSession spark, String inputPath) { + Dataset organization = spark + .read() + .schema(Encoders.bean(Organization.class).schema()) + .json(inputPath + "organization") + .filter("datainfo.deletedbyinference != true") + .selectExpr("id", "legalname.value as name", "pid"); + + organization.createOrReplaceTempView("org"); + + String query = "select id, pide.qualifier.classid as schema, pide.value as pid, name " + + "from org " + + "lateral view explode (pid) p as pide "; + + Dataset minOrganizations = spark + .sql(query) + .groupByKey((MapFunction) r -> r.getAs("id"), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, v) -> { + MinOrganization mn = new MinOrganization(); + mn.setLocal_identifier(k); + Row r = v.next(); + mn.setName(r.getAs("name")); + addOrganizationPid(mn, r); + v.forEachRemaining(row -> addOrganizationPid(mn, row)); + return mn; + }, Encoders.bean(MinOrganization.class)); + return minOrganizations; + } + + private static void addOrganizationPid(MinOrganization mo, Row next) { + String schema = next.getAs("schema"); + if (Optional.ofNullable(schema).isPresent()) { + switch (schema) { + case "ROR": + mo.setRor(next.getAs("pid")); + break; + case "ISNI": + mo.setIsni(next.getAs("pid")); + break; + case "FundRef": + mo.setFundRef(next.getAs("pid")); + break; + case "RingGold": + mo.setRinGold(next.getAs("pid")); + break; + case "Wikidata": + mo.setWikidata(next.getAs("pid")); + break; + + } + } + } + + private static void addRelevantOrganization(ExtendingOrganization ar, Iterator v) { + ar.setRelevant_organization(new ArrayList<>()); + while (v.hasNext()) + ar.getRelevant_organization().add(getMinOrg(v.next())); + } + + private static MinOrganization getMinOrg(Row next) { + MinOrganization mo = new MinOrganization(); + + mo.setLocal_identifier(next.getAs("local_identifier")); + mo.setName(next.getAs("name")); + if (Optional.ofNullable(next.getAs("ror")).isPresent()) + mo.setRor(next.getAs("ror")); + if (Optional.ofNullable(next.getAs("isni")).isPresent()) + mo.setIsni(next.getAs("isni")); + if (Optional.ofNullable(next.getAs("fundRef")).isPresent()) + mo.setFundRef(next.getAs("fundRef")); + if (Optional.ofNullable(next.getAs("rinGold")).isPresent()) + mo.setRinGold(next.getAs("rinGold")); + if (Optional.ofNullable(next.getAs("wikidata")).isPresent()) + mo.setWikidata(next.getAs("wikidata")); +// return mo; +// } + // +// if (Optional.ofNullable(pids).isPresent()) +// pids.toStream().foreach(pid -> { +// if (Optional.ofNullable(pid.getQualifier()).isPresent() && +// Optional.ofNullable(pid.getQualifier().getClassid()).isPresent()) +// switch (pid.getQualifier().getClassid().toLowerCase()) { +// case "ror": +// mo.setRor(pid.getValue()); +// break; +// case "isni": +// mo.setIsni(pid.getValue()); +// break; +// case "fundref": +// mo.setFundRef(pid.getValue()); +// break; +// case "ringgold": +// 
mo.setRinGold(pid.getValue()); +// break; +// case "wikidata": +// mo.setWikidata(pid.getValue()); +// break; +// +// } +// return null; +// }); + return mo; + } +} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/EmitPerManifestation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/EmitPerManifestation.java index a3345ff..137c204 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/EmitPerManifestation.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/EmitPerManifestation.java @@ -3,9 +3,7 @@ package eu.dnetlib.dhp.oa.graph.dump.skgif.beans; import java.io.Serializable; -import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.Journal; -import eu.dnetlib.dhp.skgif.model.Biblio; /** * @author miriam.baglioni @@ -14,9 +12,94 @@ import eu.dnetlib.dhp.skgif.model.Biblio; public class EmitPerManifestation implements Serializable { private String resultId; private String hostedBy; + private String hostedbyvalue; private Journal journal; - private Instance instance; + // private Instance instance; private String publisher; + private String product_local_type; // instance.getinstancetype.getclassname + private String product_local_type_schema; // getInstance().getInstancetype().getSchemename() + private String publishing_date; + private String peer_reviewed; + + private String access_right; + + private String licence; + + private String url; + + private String pid; + + public String getProduct_local_type() { + return product_local_type; + } + + public void setProduct_local_type(String product_local_type) { + this.product_local_type = product_local_type; + } + + public String getProduct_local_type_schema() { + return product_local_type_schema; + } + + public void setProduct_local_type_schema(String product_local_type_schema) { + this.product_local_type_schema = product_local_type_schema; + } + + public String getPublishing_date() { + return publishing_date; + } + + public void setPublishing_date(String publishing_date) { + this.publishing_date = publishing_date; + } + + public String getPeer_reviewed() { + return peer_reviewed; + } + + public void setPeer_reviewed(String peer_reviewed) { + this.peer_reviewed = peer_reviewed; + } + + public String getAccess_right() { + return access_right; + } + + public void setAccess_right(String access_right) { + this.access_right = access_right; + } + + public String getLicence() { + return licence; + } + + public void setLicence(String licence) { + this.licence = licence; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getPid() { + return pid; + } + + public void setPid(String pid) { + this.pid = pid; + } + + public String getHostedbyvalue() { + return hostedbyvalue; + } + + public void setHostedbyvalue(String hostedbyvalue) { + this.hostedbyvalue = hostedbyvalue; + } public String getPublisher() { return publisher; @@ -50,11 +133,4 @@ public class EmitPerManifestation implements Serializable { this.journal = journal; } - public Instance getInstance() { - return instance; - } - - public void setInstance(Instance instance) { - this.instance = instance; - } } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/ExtendingOrganization.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/ExtendingOrganization.java new file mode 100644 index 0000000..590af1c --- /dev/null +++ 
b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/ExtendingOrganization.java @@ -0,0 +1,32 @@ + +package eu.dnetlib.dhp.oa.graph.dump.skgif.beans; + +import java.io.Serializable; +import java.util.List; + +import eu.dnetlib.dhp.skgif.model.MinOrganization; + +/** + * @author miriam.baglioni + * @Date 16/03/24 + */ +public class ExtendingOrganization implements Serializable { + private String entityId; + private java.util.List<MinOrganization> relevant_organization; + + public String getEntityId() { + return entityId; + } + + public void setEntityId(String entityId) { + this.entityId = entityId; + } + + public List<MinOrganization> getRelevant_organization() { + return relevant_organization; + } + + public void setRelevant_organization(List<MinOrganization> relevant_organization) { + this.relevant_organization = relevant_organization; + } +} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/GrantRelation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/GrantRelation.java new file mode 100644 index 0000000..f0662cc --- /dev/null +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/GrantRelation.java @@ -0,0 +1,32 @@ + +package eu.dnetlib.dhp.oa.graph.dump.skgif.beans; + +import java.io.Serializable; +import java.util.List; + +import eu.dnetlib.dhp.skgif.model.MinGrant; + +/** + * @author miriam.baglioni + * @Date 16/03/24 + */ +public class GrantRelation implements Serializable { + private String resultId; + private java.util.List<MinGrant> funding; + + public String getResultId() { + return resultId; + } + + public void setResultId(String resultId) { + this.resultId = resultId; + } + + public List<MinGrant> getFunding() { + return funding; + } + + public void setFunding(List<MinGrant> funding) { + this.funding = funding; + } +} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/ProductsRelation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/ProductsRelation.java new file mode 100644 index 0000000..b769cf5 --- /dev/null +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/beans/ProductsRelation.java @@ -0,0 +1,32 @@ + +package eu.dnetlib.dhp.oa.graph.dump.skgif.beans; + +import java.io.Serializable; +import java.util.List; + +import eu.dnetlib.dhp.skgif.model.Relations; + +/** + * @author miriam.baglioni + * @Date 16/03/24 + */ +public class ProductsRelation implements Serializable { + private String resultId; + private java.util.List<Relations> related_products; + + public String getResultId() { + return resultId; + } + + public void setResultId(String resultId) { + this.resultId = resultId; + } + + public List<Relations> getRelated_products() { + return related_products; + } + + public void setRelated_products(List<Relations> related_products) { + this.related_products = related_products; + } +} diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_organization_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_organization_parameters.json index 33ec33f..48e888b 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_organization_parameters.json +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_organization_parameters.json @@ -18,5 +18,11 @@ "paramLongName": "outputPath", "paramDescription": "the relationPath", "paramRequired": false + }, + { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the working directory path", + "paramRequired": false + } ] \ No newline at end of file diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml
b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml index 0107cee..c703eb8 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml @@ -62,110 +62,12 @@ - + - - - - ${wf:conf('filter') eq "true"} - - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/relation - ${nameNode}/${workingDir}/graph/relation - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/organization - ${nameNode}/${workingDir}/graph/organization - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/project - ${nameNode}/${workingDir}/graph/project - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/datasource - ${nameNode}/${workingDir}/graph/datasource - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/publication - ${nameNode}/${workingDir}/graph/publication - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/dataset - ${nameNode}/${workingDir}/graph/dataset - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/software - ${nameNode}/${workingDir}/graph/software - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/${sourcePath}/otherresearchproduct - ${nameNode}/${workingDir}/graph/otherresearchproduct - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn @@ -213,95 +115,46 @@ --conf spark.sql.shuffle.partitions=15000 --sourcePath${sourcePath} - - --workingDir/user/miriam.baglioni/oa/graph/dump/temp/graph/ + --workingDir${workingDir}/graph/ + --filterPath${filterPath} - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/publication - ${nameNode}/${workingDir}/graph/publication - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/dataset - ${nameNode}/${workingDir}/graph/dataset - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/otherresearchproduct - ${nameNode}/${workingDir}/graph/otherresearchproduct - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/software - ${nameNode}/${workingDir}/graph/software - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/datasource - ${nameNode}/${workingDir}/graph/datasource - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/project - ${nameNode}/${workingDir}/graph/project - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/organization - ${nameNode}/${workingDir}/graph/organization - - - - - - - ${jobTracker} - ${nameNode} - ${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/relation - ${nameNode}/${workingDir}/graph/relation - - + + + + + yarn + cluster + select relations + eu.dnetlib.dhp.oa.graph.dump.skgif.SelectRelation + dump-${projectVersion}.jar + + --executor-cores=4 + --executor-memory=4G + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=5G + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf 
spark.sql.shuffle.partitions=15000 + + + --sourcePath${sourcePath} + --relationPath${sourcePath}/relation + + --workingDir${workingDir}/ + + + + + + + yarn @@ -319,43 +172,44 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - + --sourcePath${workingDir}/graph/ + --workingDir${workingDir}/ --outputPath${outputPath} - --workingDir/user/miriam.baglioni/oa/graph/dump/temp/working_dir/ - --sourcePath/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/ + + - - - - - - yarn - cluster - Dump table results - eu.dnetlib.dhp.oa.graph.dump.skgif.DumpResult - dump-${projectVersion}.jar - - --executor-cores=4 - --executor-memory=8G - --driver-memory=${sparkDriverMemory} - --conf spark.executor.memoryOverhead=5G - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --conf spark.sql.shuffle.partitions=15000 - - - --sourcePath/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/ - --outputPath${outputPath} - - --workingDir/user/miriam.baglioni/oa/graph/dump/temp/working_dir/ - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + yarn @@ -373,11 +227,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - --sourcePath/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/ + --sourcePath${workingDir}/graph/ + --outputPath${outputPath} - - --workingDir/user/miriam.baglioni/oa/graph/dump/temp/working_dir/ + --workingDir${workingDir}/ + @@ -399,11 +253,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - --sourcePath/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/ + --sourcePath${workingDir}/graph/ + --outputPath${outputPath} - - --workingDir/user/miriam.baglioni/oa/graph/dump/temp/working_dir/ + --workingDir${workingDir}/ + @@ -426,10 +280,10 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - + --sourcePath/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/ --outputPath${outputPath} - + --workingDir/user/miriam.baglioni/oa/graph/dump/temp/working_dir/ @@ -453,15 +307,44 @@ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - + --sourcePath/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/ --outputPath${outputPath} - + --workingDir/user/miriam.baglioni/oa/graph/dump/temp/working_dir/ + + + yarn + cluster + Dump table results + eu.dnetlib.dhp.oa.graph.dump.skgif.DumpResearchProduct + dump-${projectVersion}.jar + + --executor-cores=4 + --executor-memory=4G + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=5G + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf spark.sql.shuffle.partitions=15000 + + --sourcePath${workingDir}/graph/ + + --outputPath${outputPath} + --workingDir${workingDir}/ + + + + + + \ No newline at end of file diff --git 
a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/select_relation_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/select_relation_parameters.json new file mode 100644 index 0000000..c4d2440 --- /dev/null +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/select_relation_parameters.json @@ -0,0 +1,27 @@ +[ + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequential file to read", + "paramRequired": true + }, + + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + }, + { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the working directory", + "paramRequired": false + }, + { + "paramName": "rp", + "paramLongName": "relationPath", + "paramDescription": "the path storing the relations", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrantTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrantTest.java index 6ad9ab8..0d63519 100644 --- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrantTest.java +++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrantTest.java @@ -3,8 +3,10 @@ package eu.dnetlib.dhp.oa.graph.dump.skgif; import java.io.IOException; import java.io.Serializable; +import java.io.StringReader; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Optional; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; @@ -13,6 +15,9 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.io.SAXReader; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -152,5 +157,51 @@ public class DumpGrantTest implements Serializable { @Test public void testDumpFunder() throws Exception { + String fundingtree = "<fundingtree>" + + " <funder>\n" + + " <id>ec__________::EC</id>\n" + + " <shortname>EC</shortname>\n" + + " <name>European Commission</name>\n" + + " <jurisdiction>EU</jurisdiction>\n" + + " </funder>\n" + + " <funding_level_2>\n" + + " <id>ec__________::EC::FP7::SP2::ERC</id>\n" + + " <description>ERC</description>\n" + + " <name>ERC</name>\n" + + " <class>ec:program</class>\n" + + " <parent>\n" + + " <funding_level_1>\n" + + " <id>ec__________::EC::FP7::SP2</id>\n" + + " <description>SP2-Ideas</description>\n" + + " <name>SP2</name>\n" + + " <class>ec:specificprogram</class>\n" + + " <parent>\n" + + " <funding_level_0>\n" + + " <id>ec__________::EC::FP7</id>\n" + + " <description>SEVENTH FRAMEWORK PROGRAMME</description>\n" + + " <name>FP7</name>\n" + + " <parent/>\n" + + " <class>ec:frameworkprogram</class>\n" + + " </funding_level_0>\n" + + " </parent>\n" + + " </funding_level_1>\n" + + " </parent>\n" + + " </funding_level_2>\n" + + " </fundingtree>".replace("\n", " "); + System.out.println(getFundingStream(fundingtree)); + + } + + private static String getFundingStream(String fundingtree) throws DocumentException { + final Document doc; + + doc = new SAXReader().read(new StringReader(fundingtree)); + if (Optional.ofNullable(doc.selectNodes("//funding_level_0")).isPresent() && + doc.selectNodes("//funding_level_0").size() > 0 && + Optional.ofNullable(doc.selectNodes("//funding_level_0/name")).isPresent() && + doc.selectNodes("//funding_level_0/name").size() > 0) + return ((org.dom4j.Node) (doc.selectNodes("//funding_level_0/name").get(0))).getText(); + return ""; + } }
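With the fixture above, the XPath //funding_level_0/name resolves to FP7, so the test could assert instead of printing. A possible tightening (the expected value is inferred from the fixture, the assertion is not part of the patch):

// instead of printing, verify the extracted funding_level_0 name
Assertions.assertEquals("FP7", getFundingStream(fundingtree));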
diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResultTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResultTest.java index 83ec0db..c7fc774 100644 --- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResultTest.java +++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResultTest.java @@ -867,4 +867,32 @@ public class DumpResultTest { } + @Test + public void testSelectRelation() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graphForAPIExample/") + .getPath(); + + final String workingDir = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/workingDirApiExample/") + .getPath(); + + SelectRelation + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-workingDir", workingDir, + "-relationPath", sourcePath + "relation" + + }); + +// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); +// +// JavaRDD researchProduct = sc +// .textFile(workingDir.toString() + "ResearchProduct") +// .map(item -> OBJECT_MAPPER.readValue(item, ResearchProduct.class)); +// +// researchProduct.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));} + } }
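To make testSelectRelation actually verify something, the commented block could be adapted to read back one of the outputs SelectRelation writes, for instance the result-to-organization relations. A sketch assuming the test class exposes the usual static spark session and OBJECT_MAPPER, plus imports for JavaRDD and ExtendingOrganization; the count expectation is a placeholder that depends on the example graph:

final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ExtendingOrganization> relevantOrganizations = sc
    .textFile(workingDir + "relations/result_relevant_organizations")
    .map(item -> OBJECT_MAPPER.readValue(item, ExtendingOrganization.class));
relevantOrganizations.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
Assertions.assertTrue(relevantOrganizations.count() > 0); // placeholder expectation, depends on the fixture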