From e8f19ad003ff40edc9f79c91e1e2bdea5b2d2bd3 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 13 Mar 2024 15:22:56 +0100
Subject: [PATCH] [SKG-IF] selection of a subset of relevant results from the set provided via input

---
 .../dump/filterentities/FilterEntities.java   |  93 ++++++++
 .../dump/filterentities/Identifiers.java      |  30 +++
 .../SelectConnectedEntities.java              | 215 ++++++++++++++++++
 .../oa/graph/dump/skgif/DumpDatasource.java   |   2 +-
 .../dhp/oa/graph/dump/skgif/DumpGrant.java    |   2 +-
 .../oa/graph/dump/skgif/DumpOrganization.java |   2 +-
 .../dhp/oa/graph/dump/skgif/DumpResult.java   |   2 +-
 .../dhp/oa/graph/dump/skgif/DumpVenue.java    |   2 +-
 .../oa/graph/dump/skgif/EmitFromEntities.java |   2 +-
 .../dhp/oa/graph/dump/skgif/Utils.java        |   2 +-
 .../dump/filter_entities_parameters.json      |  26 +++
 .../select_connected_entities_parameters.json |  26 +++
 .../dump_datasource_parameters.json           |   0
 .../{ => skgif}/dump_grant_parameters.json    |   0
 .../dump_organization_parameters.json         |   0
 .../{ => skgif}/dump_result_parameters.json   |   0
 .../{ => skgif}/emit_biblio_parameters.json   |   0
 .../graph/dump/skgif/oozie_app/workflow.xml   | 188 ++++++++++++++-
 .../oa/graph/dump/skgif/DumpGrantTest.java    |   5 +
 19 files changed, 578 insertions(+), 19 deletions(-)
 create mode 100644 dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/FilterEntities.java
 create mode 100644 dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/Identifiers.java
 create mode 100644 dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java
 create mode 100644 dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/filter_entities_parameters.json
 create mode 100644 dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/select_connected_entities_parameters.json
 rename dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/{ => skgif}/dump_datasource_parameters.json (100%)
 rename dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/{ => skgif}/dump_grant_parameters.json (100%)
 rename dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/{ => skgif}/dump_organization_parameters.json (100%)
 rename dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/{ => skgif}/dump_result_parameters.json (100%)
 rename dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/{ => skgif}/emit_biblio_parameters.json (100%)

diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/FilterEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/FilterEntities.java
new file mode 100644
index 0000000..e3c2723
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/FilterEntities.java
@@ -0,0 +1,93 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.filterentities;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromEntities;
+import eu.dnetlib.dhp.oa.graph.dump.skgif.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import scala.Tuple2;
+
+/**
+ * @author miriam.baglioni
+ * @Date 12/03/24
+ */
+public class FilterEntities implements Serializable {
+	private static final Logger log = LoggerFactory.getLogger(FilterEntities.class);
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				FilterEntities.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/oa/graph/dump/filter_entities_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
+
+		final String filterPath = parser.get("filterPath");
+		log.info("filterPath: {}", filterPath);
+
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				filterEntities(spark, inputPath, filterPath, workingDir);
+			});
+	}
+
+	private static <R extends Result> void filterEntities(SparkSession spark, String inputPath, String filterPath,
+		String workingDir) {
+		ModelSupport.entityTypes.keySet().forEach(e -> {
+			if (ModelSupport.isResult(e)) {
+				Class<R> resultClazz = ModelSupport.entityTypes.get(e);
+
+				Dataset<R> result = Utils
+					.readPath(spark, inputPath + e.name(), resultClazz);
+
+				Dataset<Row> filterIds = spark.read().parquet(filterPath + e.name() + "_ids");
+
+				result
+					.joinWith(filterIds, result.col("id").equalTo(filterIds.col("id")))
+					.map((MapFunction<Tuple2<R, Row>, R>) t2 -> t2._1(), Encoders.bean(resultClazz))
+					.write()
+					.mode(SaveMode.Overwrite)
+					.option("compression", "gzip")
+					.json(workingDir + e.name());
+			}
+		});
+	}
+}
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/Identifiers.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/Identifiers.java
new file mode 100644
index 0000000..9633479
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/Identifiers.java
@@ -0,0 +1,30 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.filterentities;
+
+import java.io.Serializable;
+
+/**
+ * @author miriam.baglioni
+ * @Date 13/03/24
+ */
+public class Identifiers implements Serializable {
+
+	private String id;
+	private String CCL;
+
+	public String getId() {
+		return id;
+	}
+
+	public void setId(String id) {
+		this.id = id;
+	}
+
+	public String getCCL() {
+		return CCL;
+	}
+
+	public void setCCL(String CCL) {
+		this.CCL = CCL;
+	}
+}
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java
new file mode 100644
index 0000000..b191e7e
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java
@@ -0,0 +1,215 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.filterentities;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromEntities;
+import eu.dnetlib.dhp.oa.graph.dump.skgif.Utils;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import scala.Tuple2;
+
+/**
+ * @author miriam.baglioni
+ * @Date 12/03/24
+ */
+public class SelectConnectedEntities implements Serializable {
+	private static final Logger log = LoggerFactory.getLogger(SelectConnectedEntities.class);
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				SelectConnectedEntities.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/oa/graph/dump/select_connected_entities_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
+		final String filterPath = parser.get("filterPath");
+		log.info("filterPath: {}", filterPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				selectConnectedEntities(spark, inputPath, filterPath, workingDir);
+			});
+	}
+
+	private static <R extends Result> void selectConnectedEntities(SparkSession spark, String inputPath,
+		String filterPath,
+		String workingDir) {
+
+		Dataset<Identifiers> resultIds = spark.emptyDataset(Encoders.bean(Identifiers.class));
+		for (EntityType entity : ModelSupport.entityTypes.keySet())
+			if (ModelSupport.isResult(entity))
+				resultIds = resultIds
+					.union(
+						spark
+							.read()
+							.parquet(filterPath + entity.name() + "_ids")
+							.as(Encoders.bean(Identifiers.class)));
+
+		Dataset<Relation> relation = Utils
+			.readPath(spark, inputPath + "relation", Relation.class)
+			.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference());
+		Dataset<Organization> organizations = Utils
+			.readPath(spark, inputPath + "organization", Organization.class)
+			.filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference());
+		Dataset<Project> projects = Utils
+			.readPath(spark, inputPath + "project", Project.class)
+			.filter((FilterFunction<Project>) p -> !p.getDataInfo().getDeletedbyinference())
+			.filter(
+				(FilterFunction<Project>) p -> Optional.ofNullable(p.getFundingtree()).isPresent() &&
+					p.getFundingtree().size() > 0 &&
+					Utils
+						.getFunderName(p.getFundingtree().get(0).getValue())
+						.equalsIgnoreCase("European Commission"));
+		Dataset<Datasource> datasources = Utils
+			.readPath(spark, inputPath + "datasource", Datasource.class)
+			.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference());
+
+		// select relations having source in the set of identifiers selected for eosc
+		Dataset<Relation> resultSource = resultIds
+			.joinWith(relation, resultIds.col("id").equalTo(relation.col("source")))
+			.map((MapFunction<Tuple2<Identifiers, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
+
+		// write relations having source and target in the set
+		resultIds
+			.joinWith(resultSource, resultIds.col("id").equalTo(resultSource.col("target")))
+			.map((MapFunction<Tuple2<Identifiers, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class))
+			.write()
+			.option("compression", "gzip")
+			.mode(SaveMode.Overwrite)
+			.json(workingDir + "relation");
+
+		// write relations between results and organizations
+		resultSource
+			.joinWith(organizations, resultSource.col("target").equalTo(organizations.col("id")))
+			.map((MapFunction<Tuple2<Relation, Organization>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
+			.write()
+			.mode(SaveMode.Append)
+			.option("compression", "gzip")
+			.json(workingDir + "relation");
+
+		// write relations between results and projects
+		resultSource
+			.joinWith(projects, resultSource.col("target").equalTo(projects.col("id")))
+			.map((MapFunction<Tuple2<Relation, Project>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
+			.write()
+			.mode(SaveMode.Append)
+			.option("compression", "gzip")
+			.json(workingDir + "relation");
+
+		// write organizations linked to results in the set
+		resultSource
+			.joinWith(organizations, resultSource.col("target").equalTo(organizations.col("id")))
+			.map(
+				(MapFunction<Tuple2<Relation, Organization>, Organization>) t2 -> t2._2(),
+				Encoders.bean(Organization.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingDir + "organization");
+
+		// write projects linked to results in the set
+		resultSource
+			.joinWith(projects, resultSource.col("target").equalTo(projects.col("id")))
+			.map((MapFunction<Tuple2<Relation, Project>, Project>) t2 -> t2._2(), Encoders.bean(Project.class))
+			.write()
+			.mode(SaveMode.Append)
+			.option("compression", "gzip")
+			.json(workingDir + "project");
+
+		// read the results and select all the distinct instance.hostedbykey
+		Dataset<String> hostedbyIds = spark.emptyDataset(Encoders.STRING());
+		for (EntityType entity : ModelSupport.entityTypes.keySet())
+			if (ModelSupport.isResult(entity)) {
+				Class<R> resultClazz = ModelSupport.entityTypes.get(entity);
+				hostedbyIds = hostedbyIds
+					.union(
+						Utils
+							.readPath(spark, workingDir + entity.name(), resultClazz)
+							.flatMap(
+								(FlatMapFunction<R, String>) r -> r
+									.getInstance()
+									.stream()
+									.map(i -> i.getHostedby().getKey())
+									.collect(Collectors.toList())
+									.iterator(),
+								Encoders.STRING()));
+			}
+
+		// join with the datasources and write the datasource in the join
+		hostedbyIds
+			.joinWith(datasources, hostedbyIds.col("value").equalTo(datasources.col("id")))
+			.map((MapFunction<Tuple2<String, Datasource>, Datasource>) t2 -> t2._2(), Encoders.bean(Datasource.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingDir + "datasource");
+
+		// selecting relations between organizations and projects in the selected set
+		Dataset<Organization> organizationSbs = Utils.readPath(spark, workingDir + "organization", Organization.class);
+		Dataset<Project> projectSbs = Utils.readPath(spark, workingDir + "project", Project.class);
+		Dataset<Relation> orgSourceRels = organizationSbs
+			.joinWith(relation, organizationSbs.col("id").equalTo(relation.col("source")))
+			.map((MapFunction<Tuple2<Organization, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
+		orgSourceRels
+			.joinWith(projectSbs, orgSourceRels.col("target").equalTo(projectSbs.col("id")))
+			.map((MapFunction<Tuple2<Relation, Project>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
+			.write()
+			.mode(SaveMode.Append)
+			.option("compression", "gzip")
+			.json(workingDir + "relation");
+
+		// selecting relations between datasources and organizations in the selected set
+		Dataset<Datasource> datasourceSbs = Utils.readPath(spark, workingDir + "datasource", Datasource.class);
+		Dataset<Relation> dsSourceRels = datasourceSbs
+			.joinWith(relation, datasourceSbs.col("id").equalTo(relation.col("source")))
+			.map((MapFunction<Tuple2<Datasource, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
+		dsSourceRels
+			.joinWith(organizationSbs, dsSourceRels.col("target").equalTo(organizationSbs.col("id")))
+			.map((MapFunction<Tuple2<Relation, Organization>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
+			.write()
+			.mode(SaveMode.Append)
+			.option("compression", "gzip")
+			.json(workingDir + "relation");
+
+		/**
+		 * DATASOURCE_PROVIDED_BY_ORGANIZATION(
+		 * "isProvidedBy"),
+		 */
+	}
+}
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java
index 25d0c74..3e3301d 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpDatasource.java
@@ -42,7 +42,7 @@ public class DumpDatasource implements Serializable {
 			.toString(
 				DumpDatasource.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/dump_datasource_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_datasource_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java
index b603ee6..ca4339d 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrant.java
@@ -45,7 +45,7 @@ public class DumpGrant implements Serializable {
 			.toString(
 				DumpGrant.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/dump_grant_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_grant_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpOrganization.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpOrganization.java
index 2746e3d..66901d1 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpOrganization.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpOrganization.java
@@ -36,7 +36,7 @@ public class DumpOrganization implements Serializable {
 			.toString(
 				DumpOrganization.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/dump_organization_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_organization_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java
index 34d4079..b0f862e 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java
@@ -43,7 +43,7 @@ public class DumpResult implements Serializable {
 			.toString(
 				DumpResult.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/dump_result_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_result_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpVenue.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpVenue.java
index 5a70156..eb206ad 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpVenue.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpVenue.java
@@ -38,7 +38,7 @@ public class DumpVenue implements Serializable {
 			.toString(
 				DumpVenue.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/dump_datasource_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_datasource_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java
index 8803272..212978f 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java
@@ -44,7 +44,7 @@ public class EmitFromEntities implements Serializable {
 			.toString(
 				EmitFromEntities.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/emit_biblio_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/skgif/emit_biblio_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/Utils.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/Utils.java
index 0b7a4ea..7c9fb62 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/Utils.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/Utils.java
@@ -123,7 +123,7 @@ public class Utils implements Serializable {
 		mg.setLocal_identifier(p.getId());
 		if (Optional.ofNullable(p.getCode()).isPresent())
 			mg.setCode(p.getCode().getValue());
-		if (Optional.ofNullable(p.getFundingtree()).isPresent())
+		if (Optional.ofNullable(p.getFundingtree()).isPresent() && p.getFundingtree().size() > 0)
 			mg.setFunder(getFunderName(p.getFundingtree().get(0).getValue()));
 		if (Optional.ofNullable(p.getAcronym()).isPresent())
 			mg.setTitle(p.getAcronym().getValue());
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/filter_entities_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/filter_entities_parameters.json
new file mode 100644
index 0000000..f2a1bb4
--- /dev/null
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/filter_entities_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the path of the sequential file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName": "fp",
+    "paramLongName": "filterPath",
+    "paramDescription": "the path used to store temporary output files",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ssm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  },
+  {
+    "paramName": "wd",
+    "paramLongName": "workingDir",
+    "paramDescription": "the path of the working directory",
+    "paramRequired": false
+  }
+]
\ No newline at end of file
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/select_connected_entities_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/select_connected_entities_parameters.json
new file mode 100644
index 0000000..f2a1bb4
--- /dev/null
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/select_connected_entities_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the path of the sequential file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName": "fp",
+    "paramLongName": "filterPath",
+    "paramDescription": "the path used to store temporary output files",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ssm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  },
+  {
+    "paramName": "wd",
+    "paramLongName": "workingDir",
+    "paramDescription": "the path of the working directory",
+    "paramRequired": false
+  }
+]
\ No newline at end of file
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/dump_datasource_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_datasource_parameters.json
similarity index 100%
rename from dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/dump_datasource_parameters.json
rename to dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_datasource_parameters.json
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/dump_grant_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_grant_parameters.json
similarity index 100%
rename from dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/dump_grant_parameters.json
rename to dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_grant_parameters.json
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/dump_organization_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_organization_parameters.json
similarity index 100%
rename from dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/dump_organization_parameters.json
rename to dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_organization_parameters.json
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/dump_result_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_result_parameters.json
similarity index 100%
rename from dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/dump_result_parameters.json
rename to dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_result_parameters.json
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/emit_biblio_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/emit_biblio_parameters.json
similarity index 100%
rename from dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/emit_biblio_parameters.json
rename to dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/emit_biblio_parameters.json
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml
index 705b8cc..04ace0a 100644
--- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml
@@ -62,15 +62,178 @@
-
+			${wf:conf('filter') eq "true"}
+			${jobTracker}
+			${nameNode}
+			${nameNode}/${sourcePath}/relation
+			${nameNode}/${workingDir}/graph/relation
+			${jobTracker}
+			${nameNode}
+			${nameNode}/${sourcePath}/organization
+			${nameNode}/${workingDir}/graph/organization
+			${jobTracker}
+			${nameNode}
+			${nameNode}/${sourcePath}/project
+			${nameNode}/${workingDir}/graph/project
+			${jobTracker}
+			${nameNode}
+			${nameNode}/${sourcePath}/datasource
+			${nameNode}/${workingDir}/graph/datasource
+			${jobTracker}
+			${nameNode}
+			${nameNode}/${sourcePath}/publication
+			${nameNode}/${workingDir}/graph/publication
+			${jobTracker}
+			${nameNode}
+			${nameNode}/${sourcePath}/dataset
+			${nameNode}/${workingDir}/graph/dataset
+			${jobTracker}
+			${nameNode}
+			${nameNode}/${sourcePath}/software
+			${nameNode}/${workingDir}/graph/software
+			${jobTracker}
+			${nameNode}
+			${nameNode}/${sourcePath}/otherresearchproduct
+			${nameNode}/${workingDir}/graph/otherresearchproduct
+		Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+			yarn
+			cluster
+			Selecting subset of results
+			eu.dnetlib.dhp.oa.graph.dump.filterentities.FilterEntities
+			dump-${projectVersion}.jar
+				--executor-cores=4
+				--executor-memory=4G
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.executor.memoryOverhead=5G
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+				--conf spark.sql.shuffle.partitions=15000
+			--sourcePath${sourcePath}
+			--workingDir${workingDir}/graph/
+			--filterPath${filterPath}
+			yarn
+			cluster
+			Selecting relevant linked relations and entities
+			eu.dnetlib.dhp.oa.graph.dump.filterentities.SelectConnectedEntities
+			dump-${projectVersion}.jar
+				--executor-memory=${sparkExecutorMemory}
+				--executor-cores=${sparkExecutorCores}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+			--sourcePath${sourcePath}
+			--workingDir${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/
+			--filterPath${filterPath}
+			${jobTracker}
+			${nameNode}
+			${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/*
+			${nameNode}/${workingDir}/graph/
 			yarn
 			cluster
-			Extraction
+			Emit from Result
 			eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromEntities
 			dump-${projectVersion}.jar
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-			--sourcePath${sourcePath}
+			--sourcePath${workingDir}/graph/
 			--workingDir${workingDir}/
 			--outputPath${outputPath}
@@ -94,7 +257,7 @@
 			yarn
 			cluster
-			Dump table project
+			Dump table results
 			eu.dnetlib.dhp.oa.graph.dump.skgif.DumpResult
 			dump-${projectVersion}.jar
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-			--sourcePath${sourcePath}
+			--sourcePath${workingDir}/graph/
 			--outputPath${outputPath}
 			--workingDir${workingDir}/
@@ -118,7 +281,7 @@
 			yarn
 			cluster
-			Dump table project
+			Dump table datasource
 			eu.dnetlib.dhp.oa.graph.dump.skgif.DumpDatasource
 			dump-${projectVersion}.jar
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-			--sourcePath${sourcePath}
+			--sourcePath${workingDir}/graph/
 			--outputPath${outputPath}
 			--workingDir${workingDir}/
@@ -142,7 +305,7 @@
 			yarn
 			cluster
-			Dump table project
+			Dump table venues
 			eu.dnetlib.dhp.oa.graph.dump.skgif.DumpVenue
 			dump-${projectVersion}.jar
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-			--sourcePath${sourcePath}
+			--sourcePath${workingDir}/graph/
 			--outputPath${outputPath}
 			--workingDir${workingDir}/
@@ -167,7 +330,7 @@
 			yarn
 			cluster
-			Dump table project
+			Dump table organization
 			eu.dnetlib.dhp.oa.graph.dump.skgif.DumpOrganization
 			dump-${projectVersion}.jar
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-			--sourcePath${sourcePath}
+			--sourcePath${workingDir}/graph/
 			--outputPath${outputPath}
 			--workingDir${workingDir}/
@@ -203,8 +366,9 @@
 				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
-			--sourcePath${sourcePath}
+			--sourcePath${workingDir}/graph/
 			--outputPath${outputPath}
 			--workingDir${workingDir}/
diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrantTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrantTest.java
index 4aeb45b..6ad9ab8 100644
--- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrantTest.java
+++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpGrantTest.java
@@ -148,4 +148,9 @@ public class DumpGrantTest implements Serializable {
 
 		grant.foreach(g -> System.out.println(OBJECT_MAPPER.writeValueAsString(g)));
 	}
+
+	@Test
+	public void testDumpFunder() throws Exception {
+
+	}
 }
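
Note on the selection pattern used by this patch: FilterEntities keeps only the results whose identifier appears in the *_ids parquet datasets under filterPath, and SelectConnectedEntities applies the same joinWith-based selection to relations and linked entities. The following self-contained sketch shows that filtering pattern in isolation; the Entity bean, the local master and every path are illustrative assumptions and are not part of dnet-hadoop or of this patch.

	package example;

	import java.io.Serializable;

	import org.apache.spark.api.java.function.MapFunction;
	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Encoders;
	import org.apache.spark.sql.Row;
	import org.apache.spark.sql.SaveMode;
	import org.apache.spark.sql.SparkSession;

	import scala.Tuple2;

	public class FilterByIdExample {

		// Hypothetical bean standing in for any result type; only "id" matters here.
		public static class Entity implements Serializable {
			private String id;
			private String name;

			public String getId() { return id; }
			public void setId(String id) { this.id = id; }
			public String getName() { return name; }
			public void setName(String name) { this.name = name; }
		}

		public static void main(String[] args) {
			SparkSession spark = SparkSession.builder().master("local[*]").appName("filter-by-id").getOrCreate();

			// Entities to filter (json) and the identifiers to keep (parquet); made-up paths.
			Dataset<Entity> entities = spark.read().json("/tmp/example/entities").as(Encoders.bean(Entity.class));
			Dataset<Row> keepIds = spark.read().parquet("/tmp/example/ids");

			// An inner joinWith on the id acts as a semi-join (assuming keepIds holds unique ids);
			// the subsequent map keeps only the left element, discarding the identifier row.
			entities
				.joinWith(keepIds, entities.col("id").equalTo(keepIds.col("id")))
				.map((MapFunction<Tuple2<Entity, Row>, Entity>) t2 -> t2._1(), Encoders.bean(Entity.class))
				.write()
				.mode(SaveMode.Overwrite)
				.option("compression", "gzip")
				.json("/tmp/example/filtered");

			spark.stop();
		}
	}

Against a real graph the same join is executed once per result type, with the identifier datasets produced upstream acting as the filter; duplicate identifiers on the right-hand side would duplicate the filtered entities, which is why the filter datasets are expected to contain one row per id.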