From 64511dad2c008944712df9554e7940fcd8a040b9 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 23 Jan 2023 16:08:50 +0200 Subject: [PATCH] adding relations to the dump --- .../eu/dnetlib/dhp/eosc/model/Relation.java | 78 +++++++++ ...ExtendEoscResultWithOrganizationStep2.java | 6 +- .../dump/eosc/SelectEoscResultsJobStep1.java | 1 - .../oa/graph/dump/eosc/SparkDumpRelation.java | 122 ++++++++++++++ .../graph/dump/eosc/SparkSelectRelation.java | 155 +++++++++--------- .../dump/eoscdump/oozie_app/workflow.xml | 27 ++- .../dump/projectsubset/oozie_app/workflow.xml | 2 +- .../graph/dump/wf/main/oozie_app/workflow.xml | 4 +- .../community/oozie_app/workflow.xml | 10 +- .../funder/oozie_app/workflow.xml | 2 +- 10 files changed, 315 insertions(+), 92 deletions(-) create mode 100644 dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Relation.java create mode 100644 dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpRelation.java diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Relation.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Relation.java new file mode 100644 index 0000000..0cfb774 --- /dev/null +++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Relation.java @@ -0,0 +1,78 @@ + +package eu.dnetlib.dhp.eosc.model; + +import java.io.Serializable; +import java.util.Objects; + +import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema; + +import eu.dnetlib.dhp.oa.model.Provenance; +import eu.dnetlib.dhp.oa.model.graph.Node; +import eu.dnetlib.dhp.oa.model.graph.RelType; + +/** + * To represent the gereric relation between two entities. It has the following parameters: - private Node source to + * represent the entity source of the relation - private Node target to represent the entity target of the relation - + * private RelType reltype to represent the semantics of the relation - private Provenance provenance to represent the + * provenance of the relation + */ +public class Relation implements Serializable { + @JsonSchema(description = "The node source in the relation") + private String source; + + @JsonSchema(description = "The node target in the relation") + private String target; + + @JsonSchema(description = "To represent the semantics of a relation between two entities") + private RelType reltype; + + @JsonSchema(description = "The reason why OpenAIRE holds the relation ") + private Provenance provenance; + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + public String getTarget() { + return target; + } + + public void setTarget(String target) { + this.target = target; + } + + public RelType getReltype() { + return reltype; + } + + public void setReltype(RelType reltype) { + this.reltype = reltype; + } + + public Provenance getProvenance() { + return provenance; + } + + public void setProvenance(Provenance provenance) { + this.provenance = provenance; + } + + @Override + public int hashCode() { + + return Objects.hash(source, target, reltype.getType() + ":" + reltype.getName()); + } + + public static Relation newInstance(String source, String target, RelType reltype, Provenance provenance) { + Relation relation = new Relation(); + relation.source = source; + relation.target = target; + relation.reltype = reltype; + relation.provenance = provenance; + return relation; + } +} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java index 9ca1f2f..6f785d2 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java @@ -93,7 +93,11 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { rOrg.setResultId(t2._1().getTarget()); eu.dnetlib.dhp.eosc.model.Organization org = new eu.dnetlib.dhp.eosc.model.Organization(); org.setId(t2._2().getId()); - org.setName(t2._2().getLegalname().getValue()); + if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) { + org.setName(t2._2().getLegalname().getValue()); + } else { + org.setName(""); + } HashMap> organizationPids = new HashMap<>(); if (Optional.ofNullable(t2._2().getPid()).isPresent()) t2._2().getPid().forEach(p -> { diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java index 04f4c5a..ee9e1e5 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java @@ -22,7 +22,6 @@ import eu.dnetlib.dhp.oa.graph.dump.Constants; import eu.dnetlib.dhp.oa.graph.dump.ResultMapper; import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; -import eu.dnetlib.dhp.oa.model.graph.GraphResult; import eu.dnetlib.dhp.schema.oaf.Result; /** diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpRelation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpRelation.java new file mode 100644 index 0000000..fd1a5fc --- /dev/null +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpRelation.java @@ -0,0 +1,122 @@ + +package eu.dnetlib.dhp.oa.graph.dump.eosc; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.oa.model.Provenance; +import eu.dnetlib.dhp.oa.model.graph.Node; +import eu.dnetlib.dhp.oa.model.graph.RelType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Relation; + +/** + * @author miriam.baglioni + * @Date 12/01/23 + */ +public class SparkDumpRelation implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkDumpRelation.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpRelation.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + dumpRelation(spark, inputPath, outputPath); + + }); + + } + + private static void dumpRelation(SparkSession spark, String inputPath, String outputPath) { + Dataset relations = Utils.readPath(spark, inputPath, Relation.class); + relations + + .map((MapFunction) relation -> { + eu.dnetlib.dhp.eosc.model.Relation relNew = new eu.dnetlib.dhp.eosc.model.Relation(); + relNew + .setSource( + + relation.getSource()); + + relNew + .setTarget( + + relation.getTarget()); + + relNew + .setReltype( + RelType + .newInstance( + relation.getRelClass(), + relation.getSubRelType())); + + Optional odInfo = Optional.ofNullable(relation.getDataInfo()); + if (odInfo.isPresent()) { + DataInfo dInfo = odInfo.get(); + if (Optional.ofNullable(dInfo.getProvenanceaction()).isPresent() && + Optional.ofNullable(dInfo.getProvenanceaction().getClassname()).isPresent()) { + relNew + .setProvenance( + Provenance + .newInstance( + dInfo.getProvenanceaction().getClassname(), + dInfo.getTrust())); + } + } + + return relNew; + + }, Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Append) + .json(outputPath); + + } + +} diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkSelectRelation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkSelectRelation.java index aee49dd..b4edf47 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkSelectRelation.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkSelectRelation.java @@ -1,10 +1,11 @@ + package eu.dnetlib.dhp.oa.graph.dump.eosc; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.eosc.model.EoscResult; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.*; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -17,103 +18,101 @@ import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.eosc.model.EoscResult; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; -import java.io.Serializable; -import java.util.*; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - /** * @author miriam.baglioni * @Date 12/01/23 */ -public class SparkSelectRelation Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkSelectRelation.class); +public class SparkSelectRelation implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkSelectRelation.class); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkSelectRelation.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json")); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkSelectRelation.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - Optional rs = Optional.ofNullable(parser.get("removeSet")); - final Set removeSet = new HashSet<>(); - if (rs.isPresent()) { - Collections.addAll(removeSet, rs.get().split(";")); - } + Optional rs = Optional.ofNullable(parser.get("removeSet")); + final Set removeSet = new HashSet<>(); + if (rs.isPresent()) { + Collections.addAll(removeSet, rs.get().split(";")); + } - SparkConf conf = new SparkConf(); + SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - selectSubset(spark, inputPath, outputPath, removeSet); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + selectSubset(spark, inputPath, outputPath, removeSet); - }); + }); - } + } - private static void selectSubset(SparkSession spark, String inputPath, String outputPath, Set removeSet) { - Dataset relation = Utils - .readPath(spark, inputPath + "/relation", Relation.class) - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() - && !removeSet.contains(r.getRelClass())); + private static void selectSubset(SparkSession spark, String inputPath, String outputPath, Set removeSet) { + Dataset relation = Utils + .readPath(spark, inputPath + "/relation", Relation.class) + .filter( + (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() + && !removeSet.contains(r.getRelClass())); - Dataset resultIds = Utils - .readPath(spark, outputPath + "/publication", EoscResult.class) + Dataset resultIds = Utils + .readPath(spark, outputPath + "/publication", EoscResult.class) - .map((MapFunction) p -> p.getId(), Encoders.STRING()) - .union( - Utils - .readPath(spark, outputPath + "/dataset", EoscResult.class) + .map((MapFunction) p -> p.getId(), Encoders.STRING()) + .union( + Utils + .readPath(spark, outputPath + "/dataset", EoscResult.class) - .map((MapFunction) d -> d.getId(), Encoders.STRING())) - .union( - Utils - .readPath(spark, outputPath + "/software", EoscResult.class) + .map((MapFunction) d -> d.getId(), Encoders.STRING())) + .union( + Utils + .readPath(spark, outputPath + "/software", EoscResult.class) - .map((MapFunction) s -> s.getId(), Encoders.STRING())) - .union( - Utils - .readPath(spark, outputPath + "/otherresearchproduct", EoscResult.class) + .map((MapFunction) s -> s.getId(), Encoders.STRING())) + .union( + Utils + .readPath(spark, outputPath + "/otherresearchproduct", EoscResult.class) - .map((MapFunction) o -> o.getId(), Encoders.STRING())); + .map((MapFunction) o -> o.getId(), Encoders.STRING())); - // select result -> result relations - Dataset relResultResult = relation - .joinWith(resultIds, relation.col("source").equalTo(resultIds.col("value"))) - .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)); + // select result -> result relations + Dataset relResultResult = relation + .joinWith(resultIds, relation.col("source").equalTo(resultIds.col("value"))) + .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)); - relResultResult - .joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("value"))) - .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)) - .write() - .option("compression", "gzip") - .mode(SaveMode.Overwrite) - .json(outputPath + "/relation"); + relResultResult + .joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("value"))) + .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath + "/relation"); - - - } + } } - diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eoscdump/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eoscdump/oozie_app/workflow.xml index 1967c8d..452c3c0 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eoscdump/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eoscdump/oozie_app/workflow.xml @@ -568,13 +568,34 @@ --sourcePath${sourcePath} --outputPath${workingDir}/dump - --preparedInfoPath${workingDir}/preparedInfo - --dumpTypeeosc + --removeSet${removeSet} - + + + yarn + cluster + Select the set of relations between the results in the selected set + eu.dnetlib.dhp.oa.graph.dump.eosc.SparkDumpRelation + dump-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${workingDir}/dump/relation + --outputPath${workingDir}/tar/relation + + + + eu.dnetlib.dhp.oa.graph.dump.MakeTar diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml index 620f761..bfb443e 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml @@ -144,7 +144,7 @@ - eu.dnetlib.dhp.oa.graph.dump.MakeTar + eu.dnetlib.dhp.oa.graph.dump.eosc.MakeTar --hdfsPath${outputPath} --nameNode${nameNode} --sourcePath${workingDir}/tar diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml index 2a612de..5b57282 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml @@ -158,7 +158,7 @@ - eu.dnetlib.dhp.oa.graph.dump.SaveCommunityMap + eu.dnetlib.dhp.oa.graph.dump.eosc.SaveCommunityMap --outputPath${workingDir}/communityMap --nameNode${nameNode} --isLookUpUrl${isLookUpUrl} @@ -269,7 +269,7 @@ - eu.dnetlib.dhp.oa.graph.dump.MakeTar + eu.dnetlib.dhp.oa.graph.dump.eosc.MakeTar --hdfsPath${outputPath} --nameNode${nameNode} --sourcePath${workingDir}/tar diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/community/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/community/oozie_app/workflow.xml index a39980e..fdaf099 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/community/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/community/oozie_app/workflow.xml @@ -202,7 +202,7 @@ yarn cluster Prepare association result subset of project info - eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject + eu.dnetlib.dhp.oa.graph.dump.eosc.SparkPrepareResultProject dump-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -233,7 +233,7 @@ yarn cluster Extend dumped publications with information about project - eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo + eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo dump-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -258,7 +258,7 @@ yarn cluster Extend dumped dataset with information about project - eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo + eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo dump-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -283,7 +283,7 @@ yarn cluster Extend dumped ORP with information about project - eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo + eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo dump-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -308,7 +308,7 @@ yarn cluster Extend dumped software with information about project - eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo + eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo dump-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml index 75124cf..d4c4cd7 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml @@ -88,7 +88,7 @@ yarn cluster Prepare association result subset of project info - eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject + eu.dnetlib.dhp.oa.graph.dump.eosc.SparkPrepareResultProject dump-${projectVersion}.jar --executor-memory=${sparkExecutorMemory}