From 87370338bbfe98334244bdafcd85e5cec94b9c35 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 8 Apr 2024 10:26:48 +0200
Subject: [PATCH] [SKG-IF-EOSC] added the possibility to filter for the LOT1
 constraints and for results related to specific communities

---
 .../SelectCommunityEntities.java              |  88 ++++++++++++
 .../filterentities/SelectEOSCEntities.java    |  13 --
 .../filterentities/SelectLOT1Entities.java    | 133 ++++++++++++++++++
 .../dump/skgif/eosc_entities_parameters.json  |   7 +-
 .../graph/dump/skgif/oozie_app/workflow.xml   |  58 +++++++-
 5 files changed, 283 insertions(+), 16 deletions(-)
 create mode 100644 dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectCommunityEntities.java
 create mode 100644 dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectLOT1Entities.java

diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectCommunityEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectCommunityEntities.java
new file mode 100644
index 0000000..6593e5c
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectCommunityEntities.java
@@ -0,0 +1,88 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.filterentities;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+/**
+ * @author miriam.baglioni
+ * @Date 20/03/24
+ */
+public class SelectCommunityEntities implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SelectCommunityEntities.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                FilterEntities.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/skgif/eosc_entities_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String filterPath = parser.get("filterPath");
+        log.info("filterPath: {}", filterPath);
+
+        final String communityId = parser.get("communityId");
+        log.info("communityId: {}", communityId);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                selectEntities(spark, inputPath, filterPath, communityId);
+            });
+    }
+
+    private static void selectEntities(SparkSession spark, String inputPath, String filterPath, String communityId) {
+        ModelSupport.entityTypes.keySet().forEach(e -> {
+            if (ModelSupport.isResult(e)) {
+                spark
+                    .read()
+                    .schema(Encoders.bean(Result.class).schema())
+                    .json(inputPath + e.name())
+                    .where("datainfo.deletedbyinference != true and datainfo.invisible != true")
+                    .select("id", "context")
+                    .where("array_contains(context.id, '" + communityId + "')")
+                    .drop("context")
+                    .distinct()
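+                    // only the distinct result identifiers are persisted: the
+                    // <entity>_ids parquet written below presumably drives the
+                    // subsequent graph filtering phase (see FilterEntities)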
+                    .write()
+                    .mode(SaveMode.Overwrite)
+                    .option("compression", "gzip")
+                    .parquet(filterPath + e.name() + "_ids");
+            }
+        });
+    }
+
+}
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectEOSCEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectEOSCEntities.java
index 72ed5e7..9e9fcef 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectEOSCEntities.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectEOSCEntities.java
@@ -28,7 +28,6 @@ import scala.Tuple2;
  */
 public class SelectEOSCEntities implements Serializable {
     private static final Logger log = LoggerFactory.getLogger(SelectEOSCEntities.class);
-    private static final String B2FIND_IDENTIFIER = "10|re3data_____::730f562f9efe8a3b3742d2da510d4335";
 
     public static void main(String[] args) throws Exception {
         String jsonConfiguration = IOUtils
@@ -68,17 +67,6 @@ public class SelectEOSCEntities implements Serializable {
     private static void selectEntities(SparkSession spark, String inputPath, String filterPath) {
         ModelSupport.entityTypes.keySet().forEach(e -> {
             if (ModelSupport.isResult(e)) {
-//                Utils
-//                    .readPath(spark, inputPath + e.name(), ModelSupport.entityTypes.get(e))
-//                    .filter(
-//                        (FilterFunction<Result>) r -> !r.getDataInfo().getDeletedbyinference()
-//                            && !r.getDataInfo().getInvisible()
-//                            && (r.getContext().stream().anyMatch(c -> c.getId().equals("eosc")) ||
-//                                r
-//                                    .getCollectedfrom()
-//                                    .stream()
-//                                    .anyMatch(cf -> cf.getValue().equalsIgnoreCase("B2FIND"))))
-//                    .map((MapFunction<Result, String>) r -> r.getId(), Encoders.STRING())
 
                 spark
                     .read()
@@ -94,7 +82,6 @@ public class SelectEOSCEntities implements Serializable {
                     .option("compression", "gzip")
                     .parquet(filterPath + e.name() + "_ids");
 
-//                }
 
             }
         });
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectLOT1Entities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectLOT1Entities.java
new file mode 100644
index 0000000..4e756ca
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectLOT1Entities.java
@@ -0,0 +1,133 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.filterentities;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.max;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+/**
+ * @author miriam.baglioni
+ * @Date 20/03/24
+ */
+public class SelectLOT1Entities implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SelectLOT1Entities.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                FilterEntities.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/skgif/eosc_entities_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
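+
+        // note: the LOT1 selection reuses eosc_entities_parameters.json, so it
+        // is driven by the same sourcePath/filterPath switches as the EOSC one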
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String filterPath = parser.get("filterPath");
+        log.info("filterPath: {}", filterPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                selectEntities(spark, inputPath, filterPath);
+            });
+    }
+
+    private static void selectEntities(SparkSession spark, String inputPath, String filterPath) {
+        selectPublications(spark, inputPath, filterPath);
+        selectDataset(spark, inputPath, filterPath);
+        selectSoftware(spark, inputPath, filterPath);
+        selectOthers(spark, inputPath, filterPath);
+    }
+
+    private static void selectOthers(SparkSession spark, String inputPath, String filterPath) {
+        spark
+            .read()
+            .schema(Encoders.bean(OtherResearchProduct.class).schema())
+            .json(inputPath + "otherresearchproduct")
+            .where("datainfo.deletedbyinference != true AND datainfo.invisible != true")
+            .selectExpr("id", "instance", "explode(pid) as pid")
+            .where("pid.qualifier.classid IN ('doi', 'handle')") // filter by pid type
+            .selectExpr("id", "explode(instance) as instance")
+            // CCL = 1 when at least one instance carries a Creative Commons license
+            .withColumn("CCL", expr("CASE WHEN instance.license.value LIKE 'CC%' OR instance.license.value LIKE '%/creativecommons.org/%' THEN 1 ELSE 0 END"))
+            .groupBy("id")
+            .agg(max(col("CCL")).as("CCL"))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .parquet(filterPath + "otherresearchproduct_ids");
+    }
+
+    private static void selectSoftware(SparkSession spark, String inputPath, String filterPath) {
+        spark
+            .read()
+            .schema(Encoders.bean(Software.class).schema())
+            .json(inputPath + "software")
+            .where("datainfo.deletedbyinference != true AND datainfo.invisible != true")
+            .selectExpr("id", "instance", "explode(pid) as pid")
+            .where("pid.qualifier.classid IN ('doi', 'swhid')") // filter by pid type
+            .selectExpr("id", "explode(instance) as instance")
+            .withColumn("CCL", expr("CASE WHEN instance.license.value LIKE 'CC%' OR instance.license.value LIKE '%/creativecommons.org/%' THEN 1 ELSE 0 END"))
+            .groupBy("id")
+            .agg(max(col("CCL")).as("CCL"))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .parquet(filterPath + "software_ids");
+    }
+
+    private static void selectDataset(SparkSession spark, String inputPath, String filterPath) {
+        spark
+            .read()
+            .schema(Encoders.bean(Dataset.class).schema())
+            .json(inputPath + "dataset")
+            .where("datainfo.deletedbyinference != true AND datainfo.invisible != true")
+            .selectExpr("id", "instance", "explode(pid) as pid")
+            .where("pid.qualifier.classid IN ('doi', 'handle', 'pdb', 'ena', 'uniprot')") // filter by pid type
+            .selectExpr("id", "explode(instance) as instance")
+            .withColumn("CCL", expr("CASE WHEN instance.license.value LIKE 'CC%' OR instance.license.value LIKE '%/creativecommons.org/%' THEN 1 ELSE 0 END"))
+            .groupBy("id")
+            .agg(max(col("CCL")).as("CCL"))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .parquet(filterPath + "dataset_ids");
+    }
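+
+    // note: unlike the other result types, publications are additionally
+    // filtered by instance type, keeping only the literature typologies below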
+    private static void selectPublications(SparkSession spark, String inputPath, String filterPath) {
+        spark
+            .read()
+            .schema(Encoders.bean(Publication.class).schema())
+            .json(inputPath + "publication")
+            .where("datainfo.deletedbyinference != true AND datainfo.invisible != true")
+            .selectExpr("id", "instance", "explode(pid) as pid")
+            .where("pid.qualifier.classid IN ('doi', 'arXiv', 'pmid', 'handle')") // filter by pid type
+            .selectExpr("id", "explode(instance) as instance")
+            .where("instance.instancetype.classname IN ('Book', 'Article', 'Journal', 'Data Paper', 'Software Paper', 'Preprint', 'Part of book or chapter of book', 'Thesis', 'Master thesis', 'Bachelor thesis', 'Doctoral thesis', 'Conference object', 'Research', 'Other literature type')")
+            .withColumn("CCL", expr("CASE WHEN instance.license.value LIKE 'CC%' OR instance.license.value LIKE '%/creativecommons.org/%' THEN 1 ELSE 0 END"))
+            .groupBy("id")
+            .agg(max(col("CCL")).as("CCL"))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .parquet(filterPath + "publication_ids");
+    }
+
+}
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/eosc_entities_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/eosc_entities_parameters.json
index 6194cc3..1985774 100644
--- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/eosc_entities_parameters.json
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/eosc_entities_parameters.json
@@ -16,5 +16,10 @@
     "paramLongName": "isSparkSessionManaged",
     "paramDescription": "true if the spark session is managed, false otherwise",
     "paramRequired": false
-  }
+  },{
+    "paramName": "ci",
+    "paramLongName": "communityId",
+    "paramDescription": "the identifier of the community whose results should be selected",
+    "paramRequired": false
+}
 ]
\ No newline at end of file
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml
index 1a0f489..c850095 100644
--- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml
@@ -77,7 +77,9 @@
     <decision name="select_filtering">
         <switch>
-            <case to="select_eosc_entities">${wf:conf('filter') eq true}</case>
+            <case to="select_eosc_entities">${wf:conf('filter') eq 'EOSC'}</case>
+            <case to="select_lot1_entities">${wf:conf('filter') eq 'LOT1'}</case>
+            <case to="select_community_entities">${wf:conf('filter') eq 'Community'}</case>
             <default to="filter_entities"/>
         </switch>
     </decision>
 
@@ -113,7 +115,59 @@
             </spark>
             <ok to="filter_entities"/>
             <error to="Kill"/>
         </action>
-
+
+        <action name="select_lot1_entities">
+            <spark xmlns="uri:oozie:spark-action:0.2">
+                <master>yarn</master>
+                <mode>cluster</mode>
+                <name>Selecting graph results ids relevant for LOT1</name>
+                <class>eu.dnetlib.dhp.oa.graph.dump.filterentities.SelectLOT1Entities</class>
+                <jar>dump-${projectVersion}.jar</jar>
+                <spark-opts>
+                    --executor-cores=4
+                    --executor-memory=4G
+                    --driver-memory=${sparkDriverMemory}
+                    --conf spark.executor.memoryOverhead=5G
+                    --conf spark.extraListeners=${spark2ExtraListeners}
+                    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                    --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+                    --conf spark.sql.shuffle.partitions=15000
+                </spark-opts>
+                <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+                <arg>--filterPath</arg><arg>${filterPath}/eoscIds/</arg>
+            </spark>
+            <ok to="filter_entities"/>
+            <error to="Kill"/>
+        </action>
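+
+        <!-- the community selection mirrors the LOT1 action above, adding the
+             communityId argument that is matched against the context of the results -->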
+        <action name="select_community_entities">
+            <spark xmlns="uri:oozie:spark-action:0.2">
+                <master>yarn</master>
+                <mode>cluster</mode>
+                <name>Selecting graph results ids relevant for Communities</name>
+                <class>eu.dnetlib.dhp.oa.graph.dump.filterentities.SelectCommunityEntities</class>
+                <jar>dump-${projectVersion}.jar</jar>
+                <spark-opts>
+                    --executor-cores=4
+                    --executor-memory=4G
+                    --driver-memory=${sparkDriverMemory}
+                    --conf spark.executor.memoryOverhead=5G
+                    --conf spark.extraListeners=${spark2ExtraListeners}
+                    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                    --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+                    --conf spark.sql.shuffle.partitions=15000
+                </spark-opts>
+                <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+                <arg>--filterPath</arg><arg>${filterPath}/eoscIds/</arg>
+                <arg>--communityId</arg><arg>${communityId}</arg>
+            </spark>
+            <ok to="filter_entities"/>
+            <error to="Kill"/>
+        </action>
+
                 <master>yarn</master>
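
For reference, a minimal sketch of how the new community selection could be exercised outside of Oozie, e.g. as a local smoke test. The driver class, the paths, the community identifier and the local master below are illustrative assumptions, not values taken from this patch:

    // hypothetical driver, to be launched e.g. via
    // spark-submit --master local[*] --class SelectCommunityEntitiesExample dump-<version>.jar
    public class SelectCommunityEntitiesExample {
        public static void main(String[] args) throws Exception {
            eu.dnetlib.dhp.oa.graph.dump.filterentities.SelectCommunityEntities.main(new String[] {
                "--sourcePath", "/tmp/graph/", // assumed location of the input graph
                "--filterPath", "/tmp/filter/communityIds/", // assumed output for the <entity>_ids parquet
                "--communityId", "dh-ch" // assumed community identifier
            });
        }
    }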