package eu.dnetlib.dhp.oa.graph.dump.eosc;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.eosc.model.EoscResult;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;

/**
 * Selects the subset of relations whose source AND target both belong to the set of
 * results already dumped for EOSC, and writes them (gzip JSON) under
 * {@code outputPath/relation}.
 *
 * <p>NOTE(review): this file was reconstructed from a tag-stripped patch; all generic
 * type parameters (eaten as pseudo-HTML tags) and the {@code implements} keyword have
 * been restored. Confirm against the original commit d5420960.</p>
 *
 * @author miriam.baglioni
 * @Date 12/01/23
 */
public class SparkSelectRelation implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(SparkSelectRelation.class);

	/**
	 * Entry point. Expected arguments (see input_relationdump_parameters.json):
	 * <ul>
	 * <li>{@code sourcePath} — root of the input graph (relations read from
	 * {@code sourcePath/relation})</li>
	 * <li>{@code outputPath} — root of the already-dumped EOSC results; selected
	 * relations are written to {@code outputPath/relation}</li>
	 * <li>{@code removeSet} — optional, semicolon-separated list of relClass values to
	 * exclude</li>
	 * <li>{@code isSparkSessionManaged} — optional, defaults to true</li>
	 * </ul>
	 */
	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkSelectRelation.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		// relClass values listed here (";"-separated) are dropped from the dump
		Optional<String> rs = Optional.ofNullable(parser.get("removeSet"));
		final Set<String> removeSet = new HashSet<>();
		if (rs.isPresent()) {
			Collections.addAll(removeSet, rs.get().split(";"));
		}

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> selectSubset(spark, inputPath, outputPath, removeSet));
	}

	/**
	 * Keeps only the relations that (a) are not deleted by inference, (b) whose relClass
	 * is not in {@code removeSet}, and (c) whose source and target ids both appear among
	 * the EOSC results previously dumped under {@code outputPath}.
	 *
	 * @param spark      the active Spark session
	 * @param inputPath  graph root; relations are read from {@code inputPath/relation}
	 * @param outputPath dump root; result ids are collected from the four result-type
	 *                   subdirectories and the selection is written to
	 *                   {@code outputPath/relation}
	 * @param removeSet  relClass values to filter out (may be empty)
	 */
	private static void selectSubset(SparkSession spark, String inputPath, String outputPath,
		Set<String> removeSet) {
		Dataset<Relation> relation = Utils
			.readPath(spark, inputPath + "/relation", Relation.class)
			.filter(
				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
					&& !removeSet.contains(r.getRelClass()));

		// union of the ids of every result already selected for the EOSC dump
		Dataset<String> resultIds = Utils
			.readPath(spark, outputPath + "/publication", EoscResult.class)
			.map((MapFunction<EoscResult, String>) p -> p.getId(), Encoders.STRING())
			.union(
				Utils
					.readPath(spark, outputPath + "/dataset", EoscResult.class)
					.map((MapFunction<EoscResult, String>) d -> d.getId(), Encoders.STRING()))
			.union(
				Utils
					.readPath(spark, outputPath + "/software", EoscResult.class)
					.map((MapFunction<EoscResult, String>) s -> s.getId(), Encoders.STRING()))
			.union(
				Utils
					.readPath(spark, outputPath + "/otherresearchproduct", EoscResult.class)
					.map((MapFunction<EoscResult, String>) o -> o.getId(), Encoders.STRING()));

		// first join: keep relations whose SOURCE is one of the selected results
		Dataset<Relation> relResultResult = relation
			.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("value")))
			.map(
				(MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(),
				Encoders.bean(Relation.class));

		// second join: of those, keep relations whose TARGET is also a selected result
		relResultResult
			.joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("value")))
			.map(
				(MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(),
				Encoders.bean(Relation.class))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(outputPath + "/relation");
	}

}