From 3adedd0a681e8a352fe96e850eed78ad227b210a Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 17 Jul 2020 11:58:11 +0200 Subject: [PATCH 1/9] trust truncated to 3 decimals --- .../dhp/oa/graph/raw/MigrateDbEntitiesApplication.java | 7 +++++-- .../graph/raw/datasourceorganization_resultset_entry.json | 4 ++-- .../dhp/oa/graph/raw/datasources_resultset_entry.json | 4 ++-- .../dhp/oa/graph/raw/organizations_resultset_entry.json | 4 ++-- .../oa/graph/raw/projectorganization_resultset_entry.json | 4 ++-- .../dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json | 4 ++-- 6 files changed, 15 insertions(+), 12 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index da2ba4723..3a6ab25f0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -496,9 +496,12 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final Boolean deletedbyinference = rs.getBoolean("deletedbyinference"); final String inferenceprovenance = rs.getString("inferenceprovenance"); final Boolean inferred = rs.getBoolean("inferred"); - final String trust = rs.getString("trust"); + + final double trust = rs.getDouble("trust"); + return dataInfo( - deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust); + deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, + String.format("%.3f", trust)); } private Qualifier prepareQualifierSplitting(final String s) { diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json index 2baf7c8f1..06b0d483b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json @@ -31,8 +31,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json index 0f1da7095..23809bb85 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json @@ -114,8 +114,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json index 38657a1e1..811a9079f 100644 --- 
a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json @@ -96,8 +96,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json index 4311086e7..a3305926d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json @@ -41,8 +41,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json index d6109cac1..37480fc94 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json @@ -86,8 +86,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", From 13e36a4da0297cc6186e9e014cdec7c0e59a284d Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 13 Nov 2020 10:05:02 +0100 Subject: [PATCH 2/9] WIP: added oozie workflow for grouping graph entities by id --- .../dhp/schema/oaf/OafMapperUtils.java | 2 + .../dhp/schema/oaf/ResultTypeComparator.java | 2 + .../groupbyid/DispatchEntitiesSparkJob.java | 87 ++++++ .../GroupEntitiesAndRelationsSparkJob.java | 186 ++++++++++++ ...kJob.java => MergeGraphTableSparkJob.java} | 4 +- .../raw/GenerateEntitiesApplication.java | 31 +- .../oa/graph/raw/common/OafMapperUtils.java | 273 ------------------ .../dispatch_entities_bytype_parameters.json | 20 ++ .../group_graph_entities_parameters.json | 20 ++ .../groupbyid/oozie_app/config-default.xml | 18 ++ .../oa/graph/groupbyid/oozie_app/workflow.xml | 271 +++++++++++++++++ .../dhp/oa/graph/merge/oozie_app/workflow.xml | 16 +- ....java => MergeGraphTableSparkJobTest.java} | 14 +- .../raw/GenerateEntitiesApplicationTest.java | 99 +++++++ 14 files changed, 727 insertions(+), 316 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java rename dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/{MergeGraphSparkJob.java => MergeGraphTableSparkJob.java} (98%) delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java create mode 100644 
dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml rename dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/{MergeGraphSparkJobTest.java => MergeGraphTableSparkJobTest.java} (90%) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java new file mode 100644 index 000000000..6ba918069 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java @@ -0,0 +1,2 @@ +package eu.dnetlib.dhp.schema.oaf;public class OafMapperUtils { +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java new file mode 100644 index 000000000..a2e64b83b --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java @@ -0,0 +1,2 @@ +package eu.dnetlib.dhp.schema.oaf;public class ResultTypeComparator { +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java new file mode 100644 index 000000000..6b90e9971 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java @@ -0,0 +1,87 @@ + +package eu.dnetlib.dhp.oa.graph.fuse; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class DispatchEntitiesSparkJob { + + private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + MigrateMongoMdstoresApplication.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json"))); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String sourcePath = 
parser.get("sourcePath"); + final String targetPath = parser.get("graphRawPath"); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, targetPath); + ModelSupport.oafTypes + .values() + .forEach(clazz -> processEntity(spark, clazz, sourcePath, targetPath)); + }); + } + + private static void processEntity( + final SparkSession spark, + final Class clazz, + final String sourcePath, + final String targetPath) { + final String type = clazz.getSimpleName().toLowerCase(); + + log.info("Processing entities ({}) in file: {}", type, sourcePath); + + spark + .read() + .textFile(sourcePath) + .filter((FilterFunction) value -> isEntityType(value, type)) + .map( + (MapFunction) l -> StringUtils.substringAfter(l, "|"), + Encoders.STRING()) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .text(targetPath + "/" + type); + } + + private static boolean isEntityType(final String line, final String type) { + return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java new file mode 100644 index 000000000..7ab8646ec --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java @@ -0,0 +1,186 @@ + +package eu.dnetlib.dhp.oa.graph.groupbyid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.JsonPath; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; + +/** + * Groups the graph content by entity identifier to ensure ID uniqueness + */ +public class GroupEntitiesSparkJob { + + private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); + + private final static String ID_JPATH = "$.id"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + GroupEntitiesSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + 
.map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String graphInputPath = parser.get("graphInputPath"); + log.info("graphInputPath: {}", graphInputPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + groupEntities(spark, graphInputPath, outputPath); + }); + } + + private static void groupEntities( + SparkSession spark, + String inputPath, + String outputPath) { + + TypedColumn aggregator = new GroupingAggregator().toColumn(); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + spark + .read() + .textFile(toSeq(listEntityPaths(inputPath, sc))) + .map((MapFunction) s -> parseEntity(s), Encoders.kryo(Oaf.class)) + .groupByKey((MapFunction) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) + .agg(aggregator) + .map((MapFunction, Oaf>) Tuple2::_2, Encoders.kryo(Oaf.class)) + .write() + .mode(SaveMode.Overwrite) + .save(outputPath); + } + + public static class GroupingAggregator extends Aggregator { + + @Override + public Oaf zero() { + return null; + } + + @Override + public Oaf reduce(Oaf b, Oaf a) { + return mergeAndGet(b, a); + } + + private Oaf mergeAndGet(Oaf b, Oaf a) { + if (Objects.nonNull(a) && Objects.nonNull(b)) { + return OafMapperUtils.merge(b, a); + } + return Objects.isNull(a) ? b : a; + } + + @Override + public Oaf merge(Oaf b, Oaf a) { + return mergeAndGet(b, a); + } + + @Override + public Oaf finish(Oaf j) { + return j; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(Oaf.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(Oaf.class); + } + + } + + private static Oaf parseEntity(String s) { + String prefix = StringUtils.substringBefore(jPath(ID_JPATH, s), "|"); + try { + switch (prefix) { + case "10": + return OBJECT_MAPPER.readValue(s, Datasource.class); + case "20": + return OBJECT_MAPPER.readValue(s, Organization.class); + case "40": + return OBJECT_MAPPER.readValue(s, Project.class); + case "50": + String resultType = jPath("$.resulttype.classid", s); + switch (resultType) { + case "publication": + return OBJECT_MAPPER.readValue(s, Publication.class); + case "dataset": + return OBJECT_MAPPER.readValue(s, eu.dnetlib.dhp.schema.oaf.Dataset.class); + case "software": + return OBJECT_MAPPER.readValue(s, Software.class); + case "other": + return OBJECT_MAPPER.readValue(s, OtherResearchProduct.class); + default: + throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType)); + } + default: + throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static List listEntityPaths(String inputPath, JavaSparkContext sc) { + return HdfsSupport + .listFiles(inputPath, sc.hadoopConfiguration()) + .stream() + .filter(p -> !p.contains("relation")) + .collect(Collectors.toList()); + } + + private static String jPath(final String path, final String json) { + Object o = JsonPath.read(json, path); + if (o instanceof String) + return (String) o; + throw new IllegalStateException(String.format("could not extract '%s' 
from:\n%s", path, json)); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java similarity index 98% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java index 037683604..e53f4ca30 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java @@ -33,9 +33,9 @@ import scala.Tuple2; * are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined * by eu.dnetlib.dhp.schema.common.ModelSupport#idFn() */ -public class MergeGraphSparkJob { +public class MergeGraphTableSparkJob { - private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(MergeGraphTableSparkJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 3568dc52a..99aa9c3f0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -4,11 +4,9 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -20,6 +18,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,16 +28,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; @@ -113,7 +103,7 @@ public class GenerateEntitiesApplication { inputRdd .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) - .reduceByKey((o1, o2) -> merge(o1, o2)) + .reduceByKey((o1, 
o2) -> OafMapperUtils.merge(o1, o2)) .map(Tuple2::_2) .map( oaf -> oaf.getClass().getSimpleName().toLowerCase() @@ -122,17 +112,6 @@ public class GenerateEntitiesApplication { .saveAsTextFile(targetPath, GzipCodec.class); } - private static Oaf merge(final Oaf o1, final Oaf o2) { - if (ModelSupport.isSubClass(o1, OafEntity.class)) { - ((OafEntity) o1).mergeFrom((OafEntity) o2); - } else if (ModelSupport.isSubClass(o1, Relation.class)) { - ((Relation) o1).mergeFrom((Relation) o2); - } else { - throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName()); - } - return o1; - } - private static List convertToListOaf( final String id, final String s, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java deleted file mode 100644 index 84b29e3d4..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java +++ /dev/null @@ -1,273 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.raw.common; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; - -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.ExtraInfo; -import eu.dnetlib.dhp.schema.oaf.Field; -import eu.dnetlib.dhp.schema.oaf.Journal; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.OAIProvenance; -import eu.dnetlib.dhp.schema.oaf.OriginDescription; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import eu.dnetlib.dhp.utils.DHPUtils; - -public class OafMapperUtils { - - public static KeyValue keyValue(final String k, final String v) { - final KeyValue kv = new KeyValue(); - kv.setKey(k); - kv.setValue(v); - return kv; - } - - public static List listKeyValues(final String... s) { - if (s.length % 2 > 0) { - throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)"); - } - - final List list = new ArrayList<>(); - for (int i = 0; i < s.length; i += 2) { - list.add(keyValue(s[i], s[i + 1])); - } - return list; - } - - public static Field field(final T value, final DataInfo info) { - if (value == null || StringUtils.isBlank(value.toString())) { - return null; - } - - final Field field = new Field<>(); - field.setValue(value); - field.setDataInfo(info); - return field; - } - - public static List> listFields(final DataInfo info, final String... 
values) { - return Arrays - .stream(values) - .map(v -> field(v, info)) - .filter(Objects::nonNull) - .filter(distinctByKey(f -> f.getValue())) - .collect(Collectors.toList()); - } - - public static List> listFields(final DataInfo info, final List values) { - return values - .stream() - .map(v -> field(v, info)) - .filter(Objects::nonNull) - .filter(distinctByKey(f -> f.getValue())) - .collect(Collectors.toList()); - } - - public static Qualifier unknown(final String schemeid, final String schemename) { - return qualifier("UNKNOWN", "Unknown", schemeid, schemename); - } - - public static Qualifier qualifier( - final String classid, - final String classname, - final String schemeid, - final String schemename) { - final Qualifier q = new Qualifier(); - q.setClassid(classid); - q.setClassname(classname); - q.setSchemeid(schemeid); - q.setSchemename(schemename); - return q; - } - - public static StructuredProperty structuredProperty( - final String value, - final String classid, - final String classname, - final String schemeid, - final String schemename, - final DataInfo dataInfo) { - - return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo); - } - - public static StructuredProperty structuredProperty( - final String value, - final Qualifier qualifier, - final DataInfo dataInfo) { - if (value == null) { - return null; - } - final StructuredProperty sp = new StructuredProperty(); - sp.setValue(value); - sp.setQualifier(qualifier); - sp.setDataInfo(dataInfo); - return sp; - } - - public static ExtraInfo extraInfo( - final String name, - final String value, - final String typology, - final String provenance, - final String trust) { - final ExtraInfo info = new ExtraInfo(); - info.setName(name); - info.setValue(value); - info.setTypology(typology); - info.setProvenance(provenance); - info.setTrust(trust); - return info; - } - - public static OAIProvenance oaiIProvenance( - final String identifier, - final String baseURL, - final String metadataNamespace, - final Boolean altered, - final String datestamp, - final String harvestDate) { - - final OriginDescription desc = new OriginDescription(); - desc.setIdentifier(identifier); - desc.setBaseURL(baseURL); - desc.setMetadataNamespace(metadataNamespace); - desc.setAltered(altered); - desc.setDatestamp(datestamp); - desc.setHarvestDate(harvestDate); - - final OAIProvenance p = new OAIProvenance(); - p.setOriginDescription(desc); - - return p; - } - - public static Journal journal( - final String name, - final String issnPrinted, - final String issnOnline, - final String issnLinking, - final DataInfo dataInfo) { - return journal( - name, - issnPrinted, - issnOnline, - issnLinking, - null, - null, - null, - null, - null, - null, - null, - dataInfo); - } - - public static Journal journal( - final String name, - final String issnPrinted, - final String issnOnline, - final String issnLinking, - final String ep, - final String iss, - final String sp, - final String vol, - final String edition, - final String conferenceplace, - final String conferencedate, - final DataInfo dataInfo) { - - if (StringUtils.isNotBlank(name) - || StringUtils.isNotBlank(issnPrinted) - || StringUtils.isNotBlank(issnOnline) - || StringUtils.isNotBlank(issnLinking)) { - final Journal j = new Journal(); - j.setName(name); - j.setIssnPrinted(issnPrinted); - j.setIssnOnline(issnOnline); - j.setIssnLinking(issnLinking); - j.setEp(ep); - j.setIss(iss); - j.setSp(sp); - j.setVol(vol); - j.setEdition(edition); - j.setConferenceplace(conferenceplace); 
- j.setConferencedate(conferencedate); - j.setDataInfo(dataInfo); - return j; - } else { - return null; - } - } - - public static DataInfo dataInfo( - final Boolean deletedbyinference, - final String inferenceprovenance, - final Boolean inferred, - final Boolean invisible, - final Qualifier provenanceaction, - final String trust) { - final DataInfo d = new DataInfo(); - d.setDeletedbyinference(deletedbyinference); - d.setInferenceprovenance(inferenceprovenance); - d.setInferred(inferred); - d.setInvisible(invisible); - d.setProvenanceaction(provenanceaction); - d.setTrust(trust); - return d; - } - - public static String createOpenaireId( - final int prefix, - final String originalId, - final boolean to_md5) { - if (StringUtils.isBlank(originalId)) { - return null; - } else if (to_md5) { - final String nsPrefix = StringUtils.substringBefore(originalId, "::"); - final String rest = StringUtils.substringAfter(originalId, "::"); - return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest)); - } else { - return String.format("%s|%s", prefix, originalId); - } - } - - public static String createOpenaireId( - final String type, - final String originalId, - final boolean to_md5) { - switch (type) { - case "datasource": - return createOpenaireId(10, originalId, to_md5); - case "organization": - return createOpenaireId(20, originalId, to_md5); - case "person": - return createOpenaireId(30, originalId, to_md5); - case "project": - return createOpenaireId(40, originalId, to_md5); - default: - return createOpenaireId(50, originalId, to_md5); - } - } - - public static String asString(final Object o) { - return o == null ? "" : o.toString(); - } - - public static Predicate distinctByKey( - final Function keyExtractor) { - final Map seen = new ConcurrentHashMap<>(); - return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json new file mode 100644 index 000000000..7d995f39a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the source path", + "paramRequired": true + }, + { + "paramName": "g", + "paramLongName": "graphRawPath", + "paramDescription": "the path of the graph Raw in hdfs", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json new file mode 100644 index 000000000..e65acb3c4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "gin", + "paramLongName": "graphInputPath", + "paramDescription": "the graph root path", + "paramRequired": true 
+ }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the output merged graph root path", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml new file mode 100644 index 000000000..3715d097d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml @@ -0,0 +1,271 @@ + + + + + graphInputPath + the graph root input path + + + outputPath + the graph root output path + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Fuse graph entities by ID + eu.dnetlib.dhp.oa.graph.fuse.FuseGraphResultsSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --graphInputPath${graphInputPath} + --outputPath${workingDir}/entities + + + + + + + + yarn + cluster + Merge datasets + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/dataset + --prodInputPath${prodInputGgraphPath}/dataset + --outputPath${graphOutputPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --priority${priority} + + + + + + + + yarn + cluster + Merge 
otherresearchproducts + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/otherresearchproduct + --prodInputPath${prodInputGgraphPath}/otherresearchproduct + --outputPath${graphOutputPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --priority${priority} + + + + + + + + yarn + cluster + Merge softwares + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/software + --prodInputPath${prodInputGgraphPath}/software + --outputPath${graphOutputPath}/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --priority${priority} + + + + + + + + yarn + cluster + Merge datasources + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/datasource + --prodInputPath${prodInputGgraphPath}/datasource + --outputPath${graphOutputPath}/datasource + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --priority${priority} + + + + + + + + yarn + cluster + Merge organizations + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/organization + --prodInputPath${prodInputGgraphPath}/organization + --outputPath${graphOutputPath}/organization + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization + --priority${priority} + + + + + + + + yarn + cluster + Merge projects + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + 
--conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/project + --prodInputPath${prodInputGgraphPath}/project + --outputPath${graphOutputPath}/project + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project + --priority${priority} + + + + + + + + yarn + cluster + Merge relations + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/relation + --prodInputPath${prodInputGgraphPath}/relation + --outputPath${graphOutputPath}/relation + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation + --priority${priority} + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml index 07a125fb6..604f515a5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml @@ -76,7 +76,7 @@ yarn cluster Merge publications - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -103,7 +103,7 @@ yarn cluster Merge datasets - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -130,7 +130,7 @@ yarn cluster Merge otherresearchproducts - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -157,7 +157,7 @@ yarn cluster Merge softwares - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -184,7 +184,7 @@ yarn cluster Merge datasources - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -211,7 +211,7 @@ yarn cluster Merge organizations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -238,7 +238,7 @@ yarn cluster Merge projects - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -265,7 +265,7 @@ yarn cluster Merge 
relations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java similarity index 90% rename from dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java rename to dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java index 28e8e5abc..0089811cf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java @@ -15,7 +15,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.Datasource; -public class MergeGraphSparkJobTest { +public class MergeGraphTableSparkJobTest { private ObjectMapper mapper; @@ -28,7 +28,7 @@ public class MergeGraphSparkJobTest { public void testMergeDatasources() throws IOException { assertEquals( "openaire-cris_1.1", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_cris.json"), d("datasource_UNKNOWN.json")) @@ -36,7 +36,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "openaire-cris_1.1", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_UNKNOWN.json"), d("datasource_cris.json")) @@ -44,7 +44,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "driver-openaire2.0", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_native.json"), d("datasource_driver-openaire2.0.json")) @@ -52,7 +52,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "driver-openaire2.0", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_driver-openaire2.0.json"), d("datasource_native.json")) @@ -60,7 +60,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "openaire4.0", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_notCompatible.json"), d("datasource_openaire4.0.json")) @@ -68,7 +68,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "notCompatible", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_notCompatible.json"), d("datasource_UNKNOWN.json")) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java new file mode 100644 index 000000000..705f1dddb --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java @@ -0,0 +1,99 @@ + +package eu.dnetlib.dhp.oa.graph.raw; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.lenient; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; 
+
+import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
+import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+@ExtendWith(MockitoExtension.class)
+public class GenerateEntitiesApplicationTest {
+
+	@Mock
+	private ISLookUpService isLookUpService;
+
+	@Mock
+	private VocabularyGroup vocs;
+
+	@BeforeEach
+	public void setUp() throws IOException, ISLookUpException {
+
+		lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
+		lenient()
+			.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
+			.thenReturn(synonyms());
+
+		vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
+	}
+
+	@Test
+	public void testMergeResult() throws IOException {
+		Result publication = getResult("oaf_record.xml", Publication.class);
+		Result dataset = getResult("odf_dataset.xml", Dataset.class);
+		Result software = getResult("odf_software.xml", Software.class);
+		Result orp = getResult("oaf_orp.xml", OtherResearchProduct.class);
+
+		verifyMerge(publication, dataset, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+		verifyMerge(dataset, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+
+		verifyMerge(publication, software, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+		verifyMerge(software, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+
+		verifyMerge(publication, orp, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+		verifyMerge(orp, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+
+		verifyMerge(dataset, software, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
+		verifyMerge(software, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
+
+		verifyMerge(dataset, orp, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
+		verifyMerge(orp, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
+
+		verifyMerge(software, orp, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
+		verifyMerge(orp, software, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
+	}
+
+	protected void verifyMerge(Result publication, Result dataset, Class<? extends Result> clazz,
+		String resultType) {
+		final Result merge = OafMapperUtils.mergeResults(publication, dataset);
+		assertTrue(clazz.isAssignableFrom(merge.getClass()));
+		assertEquals(resultType, merge.getResulttype().getClassid());
+	}
+
+	protected Result getResult(String xmlFileName, Class<? extends Result> clazz) throws IOException {
+		final String xml = IOUtils.toString(getClass().getResourceAsStream(xmlFileName));
+		return new OdfToOafMapper(vocs, false)
+			.processMdRecord(xml)
+			.stream()
+			.filter(s -> clazz.isAssignableFrom(s.getClass()))
+			.map(s -> (Result) s)
+			.findFirst()
+			.get();
+	}
+
+	private List<String> vocs() throws IOException {
+		return IOUtils
+			.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
+	}
+
+	private List<String> synonyms() throws IOException {
+		return IOUtils
+			.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
+	}
+
+}
From 2bed29eb09616e01ee1d26c082847d7298db9e98 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Fri, 13 Nov 2020 10:05:12 +0100
Subject: [PATCH 3/9]
WIP: added oozie workflow for grouping graph entities by id --- .../dhp/schema/oaf/OafMapperUtils.java | 297 +++++++++++++++++- .../dhp/schema/oaf/ResultTypeComparator.java | 49 ++- .../java/eu/dnetlib/dhp/utils/DHPUtils.java | 7 + .../oa/graph/clean/CleanGraphSparkJob.java | 10 +- .../dhp/oa/graph/clean/CleaningFunctions.java | 1 - .../groupbyid/DispatchEntitiesSparkJob.java | 64 ++-- .../GroupEntitiesAndRelationsSparkJob.java | 80 +++-- .../raw/AbstractMdRecordToOafMapper.java | 2 +- .../raw/GenerateEntitiesApplication.java | 24 +- .../raw/MigrateDbEntitiesApplication.java | 10 +- .../dhp/oa/graph/raw/OafToOafMapper.java | 6 +- .../dhp/oa/graph/raw/OdfToOafMapper.java | 6 +- .../dhp/oa/graph/raw/common/Vocabulary.java | 1 + .../oa/graph/raw/common/VocabularyGroup.java | 1 + .../dispatch_entities_bytype_parameters.json | 16 +- .../oa/graph/groupbyid/oozie_app/workflow.xml | 188 ++++++----- .../dhp/oa/graph/merge/oozie_app/workflow.xml | 4 +- .../raw/MigrateDbEntitiesApplicationTest.java | 8 +- .../dhp/oa/provision/XmlConverterJob.java | 15 +- 19 files changed, 578 insertions(+), 211 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java index 6ba918069..4a66f91dc 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java @@ -1,2 +1,297 @@ -package eu.dnetlib.dhp.schema.oaf;public class OafMapperUtils { + +package eu.dnetlib.dhp.schema.oaf; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.utils.DHPUtils; + +public class OafMapperUtils { + + public static Oaf merge(final Oaf o1, final Oaf o2) { + if (ModelSupport.isSubClass(o1, OafEntity.class)) { + if (ModelSupport.isSubClass(o1, Result.class)) { + + return mergeResults((Result) o1, (Result) o2); + } else if (ModelSupport.isSubClass(o1, Datasource.class)) { + ((Datasource) o1).mergeFrom((Datasource) o2); + } else if (ModelSupport.isSubClass(o1, Organization.class)) { + ((Organization) o1).mergeFrom((Organization) o2); + } else if (ModelSupport.isSubClass(o1, Project.class)) { + ((Project) o1).mergeFrom((Project) o2); + } else { + throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName()); + } + } else if (ModelSupport.isSubClass(o1, Relation.class)) { + ((Relation) o1).mergeFrom((Relation) o2); + } else { + throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName()); + } + return o1; + } + + public static Result mergeResults(Result r1, Result r2) { + if (new ResultTypeComparator().compare(r1, r2) < 0) { + r1.mergeFrom(r2); + return r1; + } else { + r2.mergeFrom(r1); + return r2; + } + } + + public static KeyValue keyValue(final String k, final String v) { + final KeyValue kv = new KeyValue(); + kv.setKey(k); + kv.setValue(v); + return kv; + } + + public static List listKeyValues(final String... 
s) {
+		if (s.length % 2 > 0) {
+			throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)");
+		}
+
+		final List<KeyValue> list = new ArrayList<>();
+		for (int i = 0; i < s.length; i += 2) {
+			list.add(keyValue(s[i], s[i + 1]));
+		}
+		return list;
+	}
+
+	public static <T> Field<T> field(final T value, final DataInfo info) {
+		if (value == null || StringUtils.isBlank(value.toString())) {
+			return null;
+		}
+
+		final Field<T> field = new Field<>();
+		field.setValue(value);
+		field.setDataInfo(info);
+		return field;
+	}
+
+	public static List<Field<String>> listFields(final DataInfo info, final String... values) {
+		return Arrays
+			.stream(values)
+			.map(v -> field(v, info))
+			.filter(Objects::nonNull)
+			.filter(distinctByKey(f -> f.getValue()))
+			.collect(Collectors.toList());
+	}
+
+	public static List<Field<String>> listFields(final DataInfo info, final List<String> values) {
+		return values
+			.stream()
+			.map(v -> field(v, info))
+			.filter(Objects::nonNull)
+			.filter(distinctByKey(f -> f.getValue()))
+			.collect(Collectors.toList());
+	}
+
+	public static Qualifier unknown(final String schemeid, final String schemename) {
+		return qualifier("UNKNOWN", "Unknown", schemeid, schemename);
+	}
+
+	public static Qualifier qualifier(
+		final String classid,
+		final String classname,
+		final String schemeid,
+		final String schemename) {
+		final Qualifier q = new Qualifier();
+		q.setClassid(classid);
+		q.setClassname(classname);
+		q.setSchemeid(schemeid);
+		q.setSchemename(schemename);
+		return q;
+	}
+
+	public static StructuredProperty structuredProperty(
+		final String value,
+		final String classid,
+		final String classname,
+		final String schemeid,
+		final String schemename,
+		final DataInfo dataInfo) {
+
+		return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
+	}
+
+	public static StructuredProperty structuredProperty(
+		final String value,
+		final Qualifier qualifier,
+		final DataInfo dataInfo) {
+		if (value == null) {
+			return null;
+		}
+		final StructuredProperty sp = new StructuredProperty();
+		sp.setValue(value);
+		sp.setQualifier(qualifier);
+		sp.setDataInfo(dataInfo);
+		return sp;
+	}
+
+	public static ExtraInfo extraInfo(
+		final String name,
+		final String value,
+		final String typology,
+		final String provenance,
+		final String trust) {
+		final ExtraInfo info = new ExtraInfo();
+		info.setName(name);
+		info.setValue(value);
+		info.setTypology(typology);
+		info.setProvenance(provenance);
+		info.setTrust(trust);
+		return info;
+	}
+
+	public static OAIProvenance oaiIProvenance(
+		final String identifier,
+		final String baseURL,
+		final String metadataNamespace,
+		final Boolean altered,
+		final String datestamp,
+		final String harvestDate) {
+
+		final OriginDescription desc = new OriginDescription();
+		desc.setIdentifier(identifier);
+		desc.setBaseURL(baseURL);
+		desc.setMetadataNamespace(metadataNamespace);
+		desc.setAltered(altered);
+		desc.setDatestamp(datestamp);
+		desc.setHarvestDate(harvestDate);
+
+		final OAIProvenance p = new OAIProvenance();
+		p.setOriginDescription(desc);
+
+		return p;
+	}
+
+	public static Journal journal(
+		final String name,
+		final String issnPrinted,
+		final String issnOnline,
+		final String issnLinking,
+		final DataInfo dataInfo) {
+		return journal(
+			name,
+			issnPrinted,
+			issnOnline,
+			issnLinking,
+			null,
+			null,
+			null,
+			null,
+			null,
+			null,
+			null,
+			dataInfo);
+	}
+
+	public static Journal journal(
+		final String name,
+		final String issnPrinted,
+		final String issnOnline,
+		final String issnLinking,
+		final String ep,
+		final String iss,
+		final
String sp, + final String vol, + final String edition, + final String conferenceplace, + final String conferencedate, + final DataInfo dataInfo) { + + if (StringUtils.isNotBlank(name) + || StringUtils.isNotBlank(issnPrinted) + || StringUtils.isNotBlank(issnOnline) + || StringUtils.isNotBlank(issnLinking)) { + final Journal j = new Journal(); + j.setName(name); + j.setIssnPrinted(issnPrinted); + j.setIssnOnline(issnOnline); + j.setIssnLinking(issnLinking); + j.setEp(ep); + j.setIss(iss); + j.setSp(sp); + j.setVol(vol); + j.setEdition(edition); + j.setConferenceplace(conferenceplace); + j.setConferencedate(conferencedate); + j.setDataInfo(dataInfo); + return j; + } else { + return null; + } + } + + public static DataInfo dataInfo( + final Boolean deletedbyinference, + final String inferenceprovenance, + final Boolean inferred, + final Boolean invisible, + final Qualifier provenanceaction, + final String trust) { + final DataInfo d = new DataInfo(); + d.setDeletedbyinference(deletedbyinference); + d.setInferenceprovenance(inferenceprovenance); + d.setInferred(inferred); + d.setInvisible(invisible); + d.setProvenanceaction(provenanceaction); + d.setTrust(trust); + return d; + } + + public static String createOpenaireId( + final int prefix, + final String originalId, + final boolean to_md5) { + if (StringUtils.isBlank(originalId)) { + return null; + } else if (to_md5) { + final String nsPrefix = StringUtils.substringBefore(originalId, "::"); + final String rest = StringUtils.substringAfter(originalId, "::"); + return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest)); + } else { + return String.format("%s|%s", prefix, originalId); + } + } + + public static String createOpenaireId( + final String type, + final String originalId, + final boolean to_md5) { + switch (type) { + case "datasource": + return createOpenaireId(10, originalId, to_md5); + case "organization": + return createOpenaireId(20, originalId, to_md5); + case "person": + return createOpenaireId(30, originalId, to_md5); + case "project": + return createOpenaireId(40, originalId, to_md5); + default: + return createOpenaireId(50, originalId, to_md5); + } + } + + public static String asString(final Object o) { + return o == null ? 
"" : o.toString(); + } + + public static Predicate distinctByKey( + final Function keyExtractor) { + final Map seen = new ConcurrentHashMap<>(); + return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; + } } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java index a2e64b83b..6c11d1a85 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java @@ -1,2 +1,49 @@ -package eu.dnetlib.dhp.schema.oaf;public class ResultTypeComparator { + +package eu.dnetlib.dhp.schema.oaf; + +import java.util.Comparator; + +import eu.dnetlib.dhp.schema.common.ModelConstants; + +public class ResultTypeComparator implements Comparator { + + @Override + public int compare(Result left, Result right) { + + if (left == null && right == null) + return 0; + if (left == null) + return 1; + if (right == null) + return -1; + + String lClass = left.getResulttype().getClassid(); + String rClass = right.getResulttype().getClassid(); + + if (lClass.equals(rClass)) + return 0; + + if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) + return 1; + + if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID)) + return 1; + + if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID)) + return 1; + + if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID)) + return 1; + + // Else (but unlikely), lexicographical ordering will do. 
+ return lClass.compareTo(rClass); + } } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index dfbaf3a6c..8872174a5 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -5,6 +5,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; +import java.util.List; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -15,9 +16,15 @@ import org.apache.commons.codec.binary.Hex; import com.jayway.jsonpath.JsonPath; import net.minidev.json.JSONArray; +import scala.collection.JavaConverters; +import scala.collection.Seq; public class DHPUtils { + public static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); + } + public static String md5(final String s) { try { final MessageDigest md = MessageDigest.getInstance("MD5"); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index e295b9503..714b35dac 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -3,13 +3,9 @@ package eu.dnetlib.dhp.oa.graph.clean; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.io.BufferedInputStream; -import java.util.Objects; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -23,11 +19,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper; -import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java index 56a4aaf5a..9f06ea056 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java @@ -11,7 +11,6 @@ import org.apache.commons.lang3.StringUtils; import com.clearspring.analytics.util.Lists; import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper; -import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java 
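The toSeq helper promoted into DHPUtils exists because Spark's Scala-side DataFrameReader.textFile(paths: String*) compiles down to a single Seq<String> parameter, which a plain java.util.List cannot satisfy from Java. A sketch of the call it enables, with hypothetical paths and an already-built SparkSession named spark:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.Dataset;

    List<String> paths = Arrays.asList("/tmp/graph/publication", "/tmp/graph/dataset");
    // One Dataset<String> spanning all listed inputs, without hand-rolling the Scala conversion.
    Dataset<String> lines = spark.read().textFile(DHPUtils.toSeq(paths));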
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java index 6b90e9971..1b4226411 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java @@ -1,11 +1,10 @@ -package eu.dnetlib.dhp.oa.graph.fuse; +package eu.dnetlib.dhp.oa.graph.groupbyid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Oaf; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -17,14 +16,22 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Optional; +import com.fasterxml.jackson.databind.ObjectMapper; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import scala.Tuple2; public class DispatchEntitiesSparkJob { private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils @@ -40,48 +47,51 @@ public class DispatchEntitiesSparkJob { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String sourcePath = parser.get("sourcePath"); - final String targetPath = parser.get("graphRawPath"); + final String entitiesPath = parser.get("entitiesPath"); + log.info("entitiesPath: {}", entitiesPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + runWithSparkSession( conf, isSparkSessionManaged, spark -> { - removeOutputDir(spark, targetPath); - ModelSupport.oafTypes - .values() - .forEach(clazz -> processEntity(spark, clazz, sourcePath, targetPath)); + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + dispatchOaf(spark, entityClazz, entitiesPath, outputPath); }); } - private static void processEntity( + private static void dispatchOaf( final SparkSession spark, final Class clazz, final String sourcePath, final String targetPath) { - final String type = clazz.getSimpleName().toLowerCase(); - log.info("Processing entities ({}) in file: {}", type, sourcePath); + log.info("Processing entities ({}) in file: {}", clazz.getName(), sourcePath); spark .read() .textFile(sourcePath) - .filter((FilterFunction) 
value -> isEntityType(value, type)) - .map( - (MapFunction) l -> StringUtils.substringAfter(l, "|"), - Encoders.STRING()) + .filter((FilterFunction) s -> isEntityType(s, clazz)) + .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) .write() .option("compression", "gzip") .mode(SaveMode.Overwrite) - .text(targetPath + "/" + type); + .text(targetPath); } - private static boolean isEntityType(final String line, final String type) { - return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type); + private static boolean isEntityType(final String s, final Class clazz) { + return StringUtils.substringBefore(s, "|").equals(clazz.getName()); } - private static void removeOutputDir(SparkSession spark, String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java index 7ab8646ec..1d887cecb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java @@ -12,9 +12,9 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.expressions.Aggregator; @@ -22,7 +22,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; @@ -33,19 +36,21 @@ import scala.Tuple2; /** * Groups the graph content by entity identifier to ensure ID uniqueness */ -public class GroupEntitiesSparkJob { +public class GroupEntitiesAndRelationsSparkJob { - private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(GroupEntitiesAndRelationsSparkJob.class); private final static String ID_JPATH = "$.id"; + private final static String SOURCE_JPATH = "$.source"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - GroupEntitiesSparkJob.class + GroupEntitiesAndRelationsSparkJob.class .getResourceAsStream( "/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); @@ -72,11 +77,11 @@ public class GroupEntitiesSparkJob { isSparkSessionManaged, spark -> { HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - groupEntities(spark, graphInputPath, outputPath); + groupEntitiesAndRelations(spark, graphInputPath, outputPath); }); } - private static void groupEntities( + private static void groupEntitiesAndRelations( SparkSession spark, 
String inputPath, String outputPath) { @@ -85,14 +90,19 @@ public class GroupEntitiesSparkJob { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); spark .read() - .textFile(toSeq(listEntityPaths(inputPath, sc))) - .map((MapFunction) s -> parseEntity(s), Encoders.kryo(Oaf.class)) + .textFile(toSeq(listPaths(inputPath, sc))) + .map((MapFunction) s -> parseOaf(s), Encoders.kryo(Oaf.class)) + .filter((FilterFunction) oaf -> StringUtils.isNotBlank(ModelSupport.idFn().apply(oaf))) .groupByKey((MapFunction) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) .agg(aggregator) - .map((MapFunction, Oaf>) Tuple2::_2, Encoders.kryo(Oaf.class)) + .map( + (MapFunction, String>) t -> t._2().getClass().getName() + + "|" + OBJECT_MAPPER.writeValueAsString(t._2()), + Encoders.STRING()) .write() + .option("compression", "gzip") .mode(SaveMode.Overwrite) - .save(outputPath); + .text(outputPath); } public static class GroupingAggregator extends Aggregator { @@ -136,51 +146,61 @@ public class GroupEntitiesSparkJob { } - private static Oaf parseEntity(String s) { - String prefix = StringUtils.substringBefore(jPath(ID_JPATH, s), "|"); - try { + private static Oaf parseOaf(String s) { + + DocumentContext dc = JsonPath + .parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)); + final String id = dc.read(ID_JPATH); + if (StringUtils.isNotBlank(id)) { + + String prefix = StringUtils.substringBefore(id, "|"); switch (prefix) { case "10": - return OBJECT_MAPPER.readValue(s, Datasource.class); + return parse(s, Datasource.class); case "20": - return OBJECT_MAPPER.readValue(s, Organization.class); + return parse(s, Organization.class); case "40": - return OBJECT_MAPPER.readValue(s, Project.class); + return parse(s, Project.class); case "50": - String resultType = jPath("$.resulttype.classid", s); + String resultType = dc.read("$.resulttype.classid"); switch (resultType) { case "publication": - return OBJECT_MAPPER.readValue(s, Publication.class); + return parse(s, Publication.class); case "dataset": - return OBJECT_MAPPER.readValue(s, eu.dnetlib.dhp.schema.oaf.Dataset.class); + return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class); case "software": - return OBJECT_MAPPER.readValue(s, Software.class); + return parse(s, Software.class); case "other": - return OBJECT_MAPPER.readValue(s, OtherResearchProduct.class); + return parse(s, OtherResearchProduct.class); default: throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType)); } default: throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix)); } + } else { + String source = dc.read(SOURCE_JPATH); + if (StringUtils.isNotBlank(source)) { + return parse(s, Relation.class); + } else { + throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s)); + } + } + } + + private static Oaf parse(String s, Class clazz) { + try { + return OBJECT_MAPPER.readValue(s, clazz); } catch (IOException e) { throw new RuntimeException(e); } } - private static List listEntityPaths(String inputPath, JavaSparkContext sc) { + private static List listPaths(String inputPath, JavaSparkContext sc) { return HdfsSupport .listFiles(inputPath, sc.hadoopConfiguration()) .stream() - .filter(p -> !p.contains("relation")) .collect(Collectors.toList()); } - private static String jPath(final String path, final String json) { - Object o = JsonPath.read(json, path); - if (o instanceof String) - return (String) o; - throw new IllegalStateException(String.format("could 
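The parseOaf dispatch above encodes the OpenAIRE identifier convention; a compact summary with illustrative, made-up records:

    // id prefix → concrete type, as routed by parseOaf:
    //   "10|..." → Datasource         "20|..." → Organization
    //   "40|..." → Project            "50|..." → result, refined by $.resulttype.classid
    //   no id, but a $.source field   → Relation
    Oaf pub = parseOaf("{\"id\":\"50|doi_________::abc\",\"resulttype\":{\"classid\":\"publication\"}}"); // → Publication
    Oaf rel = parseOaf("{\"source\":\"50|doi_________::abc\",\"target\":\"40|corda__h2020::xyz\"}");      // → Relation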
not extract '%s' from:\n%s", path, json)); - } - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 5b6ae72f1..da4b5e324 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -1,8 +1,8 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*; import java.util.*; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 99aa9c3f0..cfd190670 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -4,9 +4,11 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -18,7 +20,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,7 @@ public class GenerateEntitiesApplication { final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { - removeOutputDir(spark, targetPath); + HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration()); generateEntities(spark, vocs, sourcePaths, targetPath); }); } @@ -82,7 +83,7 @@ public class GenerateEntitiesApplication { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final List existingSourcePaths = Arrays .stream(sourcePaths.split(",")) - .filter(p -> exists(sc, p)) + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) .collect(Collectors.toList()); log.info("Generate entities from files:"); @@ -160,17 +161,4 @@ public class GenerateEntitiesApplication { } } - private static boolean exists(final JavaSparkContext context, final String pathToFile) { - try { - final FileSystem hdfs = FileSystem.get(context.hadoopConfiguration()); - final Path path = new Path(pathToFile); - return hdfs.exists(path); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - private static void removeOutputDir(final SparkSession spark, final String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 6365a1db9..f6f0e0a36 100644 --- 
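Both refactored jobs now delegate filesystem bookkeeping to the shared HdfsSupport helpers instead of private wrappers around FileSystem; the replacement pattern, exactly as used in the hunks above:

    // was: removeOutputDir(spark, targetPath)
    HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration());
    // was: exists(sc, sourcePath)
    boolean present = HdfsSupport.exists(sourcePath, sc.hadoopConfiguration());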
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -1,15 +1,6 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty; import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION; import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS; @@ -32,6 +23,7 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT; import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT; import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*; import java.io.Closeable; import java.io.IOException; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java index dea80fabd..e62bc0790 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java @@ -1,10 +1,10 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty; import java.util.ArrayList; import java.util.List; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java index 6fe7bb971..7124684d5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java @@ -1,10 +1,10 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; +import static 
eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty; import java.util.ArrayList; import java.util.Arrays; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java index 9bf198c8b..bfc4fd6f1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java @@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils; import com.google.common.collect.Maps; +import eu.dnetlib.dhp.schema.oaf.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.Qualifier; public class Vocabulary implements Serializable { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java index 334339d3b..32452bdc5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java @@ -7,6 +7,7 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import eu.dnetlib.dhp.schema.oaf.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json index 7d995f39a..a75ff3653 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json @@ -6,15 +6,21 @@ "paramRequired": false }, { - "paramName": "s", - "paramLongName": "sourcePath", - "paramDescription": "the source path", + "paramName": "ep", + "paramLongName": "entitiesPath", + "paramDescription": "the entities path", "paramRequired": true }, { "paramName": "g", - "paramLongName": "graphRawPath", - "paramDescription": "the path of the graph Raw in hdfs", + "paramLongName": "outputPath", + "paramDescription": "the output path to store the dispatched entities", + "paramRequired": true + }, + { + "paramName": "class", + "paramLongName": "graphTableClassName", + "paramDescription": "class name modelling the graph table", "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml index 3715d097d..70b2ea68e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + @@ -46,18 +46,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn 
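Given the renamed parameters in dispatch_entities_bytype_parameters.json, one dispatch run consumes the grouped dump and extracts a single graph table; a sketch of the invocation, with hypothetical paths:

    DispatchEntitiesSparkJob.main(new String[] {
        "--isSparkSessionManaged", "false",
        "--entitiesPath", "/tmp/workingDir/grouped_entities",
        "--outputPath", "/tmp/graph/publication",
        "--graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication"
    });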
cluster - Fuse graph entities by ID - eu.dnetlib.dhp.oa.graph.fuse.FuseGraphResultsSparkJob + group graph entities and relations + eu.dnetlib.dhp.oa.graph.groupbyid.GroupEntitiesAndRelationsSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -70,18 +70,29 @@ --conf spark.sql.shuffle.partitions=7680 --graphInputPath${graphInputPath} - --outputPath${workingDir}/entities + --outputPath${workingDir}/grouped_entities - + - + + + + + + + + + + + + yarn cluster - Merge datasets - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch publications + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -93,22 +104,45 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/dataset - --prodInputPath${prodInputGgraphPath}/dataset - --outputPath${graphOutputPath}/dataset + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + + + + + + + + yarn + cluster + Dispatch datasets + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --priority${priority} - + - + yarn cluster - Merge otherresearchproducts - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch softwares + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -120,49 +154,20 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/otherresearchproduct - --prodInputPath${prodInputGgraphPath}/otherresearchproduct - --outputPath${graphOutputPath}/otherresearchproduct - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --priority${priority} - - - - - - - - yarn - cluster - Merge softwares - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --betaInputPath${betaInputGgraphPath}/software - --prodInputPath${prodInputGgraphPath}/software - --outputPath${graphOutputPath}/software + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --priority${priority} - + - + yarn cluster - Merge datasources - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch 
otherresearchproducts + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -174,22 +179,45 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/datasource - --prodInputPath${prodInputGgraphPath}/datasource - --outputPath${graphOutputPath}/datasource + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + + + + + + + + yarn + cluster + Dispatch datasources + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/datasource --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --priority${priority} - + - + yarn cluster - Merge organizations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch organizations + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -201,22 +229,20 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/organization - --prodInputPath${prodInputGgraphPath}/organization - --outputPath${graphOutputPath}/organization + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/organization --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - --priority${priority} - + - + yarn cluster - Merge projects - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch project + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -228,22 +254,20 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/project - --prodInputPath${prodInputGgraphPath}/project - --outputPath${graphOutputPath}/project + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/project --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --priority${priority} - + - + yarn cluster - Merge relations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch relations + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -255,17 +279,15 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/relation - --prodInputPath${prodInputGgraphPath}/relation - --outputPath${graphOutputPath}/relation + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/relation --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation - --priority${priority} - + - + \ No newline at end of file diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml index 604f515a5..5af117d68 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml @@ -2,11 +2,11 @@ - betaInputGgraphPath + betaInputGraphPath the beta graph root path - prodInputGgraphPath + prodInputGraphPath the production graph root path diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index f663d6095..9cf75f208 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -27,14 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.*; @ExtendWith(MockitoExtension.class) public class MigrateDbEntitiesApplicationTest { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index d8eba31b6..b44ed7446 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -2,12 +2,11 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; @@ -28,13 +27,11 @@ import com.google.common.collect.Maps; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.*; +import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; -import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; -import scala.collection.JavaConverters; -import scala.collection.Seq; /** * XmlConverterJob converts the JoinedEntities as XML records @@ -43,8 +40,6 @@ public class XmlConverterJob { private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final String schemaLocation = 
"https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; public static void main(String[] args) throws Exception { @@ -129,10 +124,6 @@ public class XmlConverterJob { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - private static Seq toSeq(List list) { - return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); - } - private static Map prepareAccumulators(SparkContext sc) { Map accumulators = Maps.newHashMap(); accumulators From 528231a287be05c251709773c62ed8386382e8a9 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 13 Nov 2020 15:37:48 +0100 Subject: [PATCH 4/9] grouping graph entities by id turned out to be an easy extension for the already existing cleaning workflow --- .../oa/graph/clean/CleanGraphSparkJob.java | 14 +- .../GroupEntitiesAndRelationsSparkJob.java | 2 +- .../groupbyid/DispatchEntitiesSparkJob.java | 97 ------ .../dhp/oa/graph/clean/oozie_app/workflow.xml | 26 +- .../groupbyid/oozie_app/config-default.xml | 18 -- .../oa/graph/groupbyid/oozie_app/workflow.xml | 293 ------------------ 6 files changed, 35 insertions(+), 415 deletions(-) rename dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/{groupbyid => clean}/GroupEntitiesAndRelationsSparkJob.java (99%) delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index 714b35dac..8231dd77e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -6,7 +6,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.Optional; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -69,12 +71,12 @@ public class CleanGraphSparkJob { conf, isSparkSessionManaged, spark -> { - removeOutputDir(spark, outputPath); - fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath); + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + cleanGraphTable(spark, vocs, inputPath, entityClazz, outputPath); }); } - private static void fixGraphTable( + private static void cleanGraphTable( SparkSession spark, VocabularyGroup vocs, String inputPath, @@ -100,13 +102,15 @@ public class CleanGraphSparkJob { return spark .read() .textFile(inputEntityPath) + .filter((FilterFunction) s -> isEntityType(s, clazz)) + .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } - private static void removeOutputDir(SparkSession spark, String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + private static boolean isEntityType(final String s, 
final Class clazz) { + return StringUtils.substringBefore(s, "|").equals(clazz.getName()); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java similarity index 99% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java index 1d887cecb..9c80528e3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.oa.graph.groupbyid; +package eu.dnetlib.dhp.oa.graph.clean; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java deleted file mode 100644 index 1b4226411..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java +++ /dev/null @@ -1,97 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.groupbyid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import scala.Tuple2; - -public class DispatchEntitiesSparkJob { - - private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - MigrateMongoMdstoresApplication.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json"))); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String entitiesPath = parser.get("entitiesPath"); - log.info("entitiesPath: {}", entitiesPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - String graphTableClassName = parser.get("graphTableClassName"); - 
log.info("graphTableClassName: {}", graphTableClassName); - - Class entityClazz = (Class) Class.forName(graphTableClassName); - - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - dispatchOaf(spark, entityClazz, entitiesPath, outputPath); - }); - } - - private static void dispatchOaf( - final SparkSession spark, - final Class clazz, - final String sourcePath, - final String targetPath) { - - log.info("Processing entities ({}) in file: {}", clazz.getName(), sourcePath); - - spark - .read() - .textFile(sourcePath) - .filter((FilterFunction) s -> isEntityType(s, clazz)) - .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) - .write() - .option("compression", "gzip") - .mode(SaveMode.Overwrite) - .text(targetPath); - } - - private static boolean isEntityType(final String s, final Class clazz) { - return StringUtils.substringBefore(s, "|").equals(clazz.getName()); - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 7329df29a..8b6ca9de6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -50,12 +50,36 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + yarn + cluster + group graph entities and relations + eu.dnetlib.dhp.oa.graph.clean.GroupEntitiesAndRelationsSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --graphInputPath${graphInputPath} + --outputPath${workingDir}/grouped_entities + + + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml deleted file mode 100644 index 2e0ed9aee..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml deleted file mode 100644 index 70b2ea68e..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml +++ /dev/null @@ -1,293 +0,0 @@ - - - - - graphInputPath - the graph root input path - - - 
outputPath - the graph root output path - - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - oozieActionShareLibForSpark2 - oozie action sharelib for spark 2.* - - - spark2ExtraListeners - com.cloudera.spark.lineage.NavigatorAppListener - spark 2.* extra listeners classname - - - spark2SqlQueryExecutionListeners - com.cloudera.spark.lineage.NavigatorQueryListener - spark 2.* sql query execution listeners classname - - - spark2YarnHistoryServerAddress - spark 2.* yarn history server address - - - spark2EventLogDir - spark 2.* event log dir location - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - yarn - cluster - group graph entities and relations - eu.dnetlib.dhp.oa.graph.groupbyid.GroupEntitiesAndRelationsSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --graphInputPath${graphInputPath} - --outputPath${workingDir}/grouped_entities - - - - - - - - - - - - - - - - - - - yarn - cluster - Dispatch publications - eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --entitiesPath${workingDir}/grouped_entities - --outputPath${outputPath}/publication - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - - - - - - - - yarn - cluster - Dispatch datasets - eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --entitiesPath${workingDir}/grouped_entities - --outputPath${outputPath}/dataset - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - - - - - - - - yarn - cluster - Dispatch softwares - eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - 
- --entitiesPath${workingDir}/grouped_entities - --outputPath${outputPath}/software - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - - - - - - - - yarn - cluster - Dispatch otherresearchproducts - eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --entitiesPath${workingDir}/grouped_entities - --outputPath${outputPath}/otherresearchproduct - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - - - - - - - - yarn - cluster - Dispatch datasources - eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --entitiesPath${workingDir}/grouped_entities - --outputPath${outputPath}/datasource - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - - - - - - - - yarn - cluster - Dispatch organizations - eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --entitiesPath${workingDir}/grouped_entities - --outputPath${outputPath}/organization - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - - - - - - - - yarn - cluster - Dispatch project - eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --entitiesPath${workingDir}/grouped_entities - --outputPath${outputPath}/project - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - - - - - - - - yarn - cluster - Dispatch relations - eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf 
spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --entitiesPath${workingDir}/grouped_entities - --outputPath${outputPath}/relation - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation - - - - - - - - - \ No newline at end of file From 2facfefc190b47496fbc2806c3ff9cacf365d3af Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 13 Nov 2020 15:38:40 +0100 Subject: [PATCH 5/9] updated maven repository URL --- dhp-build/dhp-code-style/pom.xml | 4 ++-- pom.xml | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml index e60e8076e..77aa2aedb 100644 --- a/dhp-build/dhp-code-style/pom.xml +++ b/dhp-build/dhp-code-style/pom.xml @@ -15,12 +15,12 @@ dnet45-snapshots DNet45 Snapshots - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots + https://maven.d4science.org/nexus/content/repositories/dnet45-snapshots default dnet45-releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + https://maven.d4science.org/nexus/content/repositories/dnet45-releases diff --git a/pom.xml b/pom.xml index 03c69108d..e9b90a765 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ dnet45-releases D-Net 45 releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + https://maven.d4science.org/nexus/content/repositories/dnet45-releases default false @@ -639,12 +639,12 @@ dnet45-snapshots DNet45 Snapshots - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots + https://maven.d4science.org/nexus/content/repositories/dnet45-snapshots default dnet45-releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + https://maven.d4science.org/nexus/content/repositories/dnet45-releases From 5d4e34e26afc5346828a01ae8b5648f2fd908aef Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Sat, 14 Nov 2020 10:32:26 +0100 Subject: [PATCH 6/9] fixed typo in variable name --- .../dhp/oa/graph/merge/oozie_app/workflow.xml | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml index 5af117d68..86fb51042 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml @@ -88,8 +88,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/publication - --prodInputPath${prodInputGgraphPath}/publication + --betaInputPath${betaInputGraphPath}/publication + --prodInputPath${prodInputGraphPath}/publication --outputPath${graphOutputPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication --priority${priority} @@ -115,8 +115,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/dataset - --prodInputPath${prodInputGgraphPath}/dataset + --betaInputPath${betaInputGraphPath}/dataset + --prodInputPath${prodInputGraphPath}/dataset --outputPath${graphOutputPath}/dataset 
From 5d4e34e26afc5346828a01ae8b5648f2fd908aef Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Sat, 14 Nov 2020 10:32:26 +0100
Subject: [PATCH 6/9] fixed typo in variable name

---
 .../dhp/oa/graph/merge/oozie_app/workflow.xml | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
index 5af117d68..86fb51042 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
@@ -88,8 +88,8 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/publication</arg>
-            <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/publication</arg>
+            <arg>--betaInputPath</arg><arg>${betaInputGraphPath}/publication</arg>
+            <arg>--prodInputPath</arg><arg>${prodInputGraphPath}/publication</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
             <arg>--priority</arg><arg>${priority}</arg>
@@ -115,8 +115,8 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
-            <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
+            <arg>--betaInputPath</arg><arg>${betaInputGraphPath}/dataset</arg>
+            <arg>--prodInputPath</arg><arg>${prodInputGraphPath}/dataset</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
             <arg>--priority</arg><arg>${priority}</arg>
@@ -142,8 +142,8 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
-            <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
+            <arg>--betaInputPath</arg><arg>${betaInputGraphPath}/otherresearchproduct</arg>
+            <arg>--prodInputPath</arg><arg>${prodInputGraphPath}/otherresearchproduct</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
             <arg>--priority</arg><arg>${priority}</arg>
@@ -169,8 +169,8 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
-            <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
+            <arg>--betaInputPath</arg><arg>${betaInputGraphPath}/software</arg>
+            <arg>--prodInputPath</arg><arg>${prodInputGraphPath}/software</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
             <arg>--priority</arg><arg>${priority}</arg>
@@ -196,8 +196,8 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg>
-            <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg>
+            <arg>--betaInputPath</arg><arg>${betaInputGraphPath}/datasource</arg>
+            <arg>--prodInputPath</arg><arg>${prodInputGraphPath}/datasource</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
             <arg>--priority</arg><arg>${priority}</arg>
@@ -223,8 +223,8 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg>
-            <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg>
+            <arg>--betaInputPath</arg><arg>${betaInputGraphPath}/organization</arg>
+            <arg>--prodInputPath</arg><arg>${prodInputGraphPath}/organization</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
             <arg>--priority</arg><arg>${priority}</arg>
@@ -250,8 +250,8 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg>
-            <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg>
+            <arg>--betaInputPath</arg><arg>${betaInputGraphPath}/project</arg>
+            <arg>--prodInputPath</arg><arg>${prodInputGraphPath}/project</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
             <arg>--priority</arg><arg>${priority}</arg>
@@ -277,8 +277,8 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg>
-            <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg>
+            <arg>--betaInputPath</arg><arg>${betaInputGraphPath}/relation</arg>
+            <arg>--prodInputPath</arg><arg>${prodInputGraphPath}/relation</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
             <arg>--priority</arg><arg>${priority}</arg>
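
For context, the renamed betaInputGraphPath/prodInputGraphPath parameters feed MergeGraphTableSparkJob together with the --priority flag; a rough sketch of the selection rule such a merge presumably applies after a full outer join on the entity id (method names and the BETA/PROD convention are assumptions):

    import java.util.Optional;

    import scala.Tuple2;

    public class MergeGraphSketch {

        // pair: (beta record, prod record) from a full outer join;
        // a full outer join guarantees at least one side is present
        public static <T> T mergeWithPriority(final Tuple2<Optional<T>, Optional<T>> pair, final String priority) {
            final Optional<T> beta = pair._1();
            final Optional<T> prod = pair._2();
            if ("BETA".equalsIgnoreCase(priority)) {
                // beta wins when both graphs contain the entity
                return beta.orElseGet(prod::get);
            }
            // default: production wins
            return prod.orElseGet(beta::get);
        }
    }
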
From 331d62180060caee96f38431a65cae0960be7eca Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Sat, 14 Nov 2020 12:16:15 +0100
Subject: [PATCH 7/9] added test resource

---
 .../eu/dnetlib/dhp/oa/graph/raw/oaf_orp.xml   | 84 +++++++++++++++++++
 1 file changed, 84 insertions(+)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_orp.xml

diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_orp.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_orp.xml
new file mode 100644
index 000000000..6c83073de
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_orp.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<record xmlns:dc="http://purl.org/dc/elements/1.1/"
+        xmlns:dr="http://www.driver-repository.eu/namespace/dr"
+        xmlns:dri="http://www.driver-repository.eu/namespace/dri"
+        xmlns:oaf="http://namespace.openaire.eu/oaf">
+    <header>
+        <dri:objIdentifier>pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2</dri:objIdentifier>
+        <dri:recordIdentifier>10.3897/oneeco.2.e13718</dri:recordIdentifier>
+        <dri:dateOfCollection>2020-03-23T00:20:51.392Z</dri:dateOfCollection>
+        <dri:dateOfTransformation>2020-03-23T00:26:59.078Z</dri:dateOfTransformation>
+        <oaf:datasourceprefix>pensoft_____</oaf:datasourceprefix>
+    </header>
+    <metadata>
+        <dc:title>Ecosystem Service capacity is higher in areas of multiple designation types</dc:title>
+        <dc:creator>Nikolaidou,Charitini</dc:creator>
+        <dc:creator>Votsi,Nefta</dc:creator>
+        <dc:creator>Sgardelis,Steanos</dc:creator>
+        <dc:creator>Halley,John</dc:creator>
+        <dc:creator>Pantis,John</dc:creator>
+        <dc:creator>Tsiafouli,Maria</dc:creator>
+        <dc:date>2017</dc:date>
+        <dc:description>The implementation of the Ecosystem Service (ES) concept into practice might be a challenging task as it has to take into account previous “traditional” policies and approaches that have evaluated nature and biodiversity differently. Among them the Habitat (92/43/EC) and Bird Directives (79/409/EC), the Water Framework Directive (2000/60/EC), and the Noise Directive (2002/49/EC) have led to the evaluation/designation of areas in Europe with different criteria. In this study our goal was to understand how the ES capacity of an area is related to its designation and if areas with multiple designations have higher capacity in providing ES. We selected four catchments in Greece with a great variety of characteristics covering over 25% of the national territory. Inside the catchments we assessed the ES capacity (following the methodology of Burkhard et al. 2009) of areas designated as Natura 2000 sites, Quiet areas and Wetlands or Water bodies and found those areas that have multiple designations. Data were analyzed by GLM to reveal differences regarding the ES capacity among the different types of areas. We also investigated by PCA synergies and trade-offs among different kinds of ES and tested for correlations among landscape properties, such as elevation, aspect and slope and the ES potential. Our results show that areas with different types or multiple designations have a different capacity in providing ES. Areas of one designation type (Protected or Quiet Areas) had in general intermediate scores in most ES but scores were higher compared to areas with no designation, which displayed stronger capacity in provisioning services. Among Protected Areas and Quiet Areas the latter scored better in general. Areas that combined both designation types (Protected and Quiet Areas) showed the highest capacity in 13 out of 29 ES, that were mostly linked with natural and forest ecosystems. We found significant synergies among most regulating, supporting and cultural ES which in turn display trade-offs with provisioning services. The different ES are spatially related and display strong correlation with landscape properties, such as elevation and slope. We suggest that the designation status of an area can be used as an alternative tool for environmental policy, indicating the capacity for ES provision. Multiple designations of areas can be used as proxies for locating ES “hotspots”. This integration of “traditional” evaluation and designation and the “newer” ES concept forms a time- and cost-effective way to be adopted by stakeholders and policy-makers in order to start complying with new standards and demands for nature conservation and environmental management.</dc:description>
+        <dc:format>text/html</dc:format>
+        <dc:identifier>https://doi.org/10.3897/oneeco.2.e13718</dc:identifier>
+        <dc:identifier>https://oneecosystem.pensoft.net/article/13718/</dc:identifier>
+        <dc:language>eng</dc:language>
+        <dc:publisher>Pensoft Publishers</dc:publisher>
+        <dc:relation>info:eu-repo/semantics/altIdentifier/eissn/2367-8194</dc:relation>
+        <dc:relation>info:eu-repo/grantAgreement/EC/FP7/226852</dc:relation>
+        <dc:source>One Ecosystem 2: e13718</dc:source>
+        <dc:source>One Ecosystem 2: e13718</dc:source>
+        <dc:source>One Ecosystem 2: e13718</dc:source>
+        <dc:subject>Ecosystem Services hotspots</dc:subject>
+        <dc:subject>Natura 2000</dc:subject>
+        <dc:subject>Quiet Protected Areas</dc:subject>
+        <dc:subject>Biodiversity</dc:subject>
+        <dc:subject>Agriculture</dc:subject>
+        <dc:subject>Elevation</dc:subject>
+        <dc:subject>Slope</dc:subject>
+        <dc:subject>Ecosystem Service trade-offs and synergies</dc:subject>
+        <dc:subject>cultural services</dc:subject>
+        <dc:subject>provisioning services</dc:subject>
+        <dc:subject>regulating services</dc:subject>
+        <dc:subject>supporting services</dc:subject>
+        <dc:type>Research Article</dc:type>
+
+        <dr:CobjCategory>0020</dr:CobjCategory>
+        <oaf:dateAccepted>2017-01-01</oaf:dateAccepted>
+        <oaf:projectid>corda_______::226852</oaf:projectid>
+        <oaf:accessrights>OPEN</oaf:accessrights>
+
+        <oaf:identifier identifierType="doi">10.3897/oneeco.2.e13718</oaf:identifier>
+        <oaf:fulltext>https://oneecosystem.pensoft.net/article/13718/</oaf:fulltext>
+        <oaf:journal>One Ecosystem</oaf:journal>
+        <oaf:refereed>0001</oaf:refereed>
+    </metadata>
+    <about>
+        <provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance">
+            <originDescription>
+                <baseURL>http%3A%2F%2Fzookeys.pensoft.net%2Foai.php</baseURL>
+                <identifier>10.3897/oneeco.2.e13718</identifier>
+                <datestamp>2017-09-08</datestamp>
+                <metadataNamespace>http://www.openarchives.org/OAI/2.0/oai_dc/</metadataNamespace>
+            </originDescription>
+        </provenance>
+        <oaf:datainfo>
+            <oaf:inferred>false</oaf:inferred>
+            <oaf:deletedbyinference>false</oaf:deletedbyinference>
+            <oaf:trust>0.9</oaf:trust>
+        </oaf:datainfo>
+    </about>
+</record>
\ No newline at end of file

From 4de8c8b237dc8a93a9d4ed3fe7b8745bb4f567d3 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 16 Nov 2020 10:03:11 +0100
Subject: [PATCH 8/9] fixed workflow variable name

---
 .../dhp/oa/graph/clean/oozie_app/workflow.xml | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
index 8b6ca9de6..992d8c40e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
@@ -108,7 +108,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@@ -134,7 +134,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
            <arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@@ -160,7 +160,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@@ -186,7 +186,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@@ -212,7 +212,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@@ -238,7 +238,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@@ -264,7 +264,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@@ -290,7 +290,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
-            <arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>
+            <arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
             <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
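
Stepping back to the oaf_orp.xml resource introduced in PATCH 7: it is presumably consumed by the GenerateEntitiesApplicationTest created in PATCH 2. A hedged sketch of such a test follows, where the convertToListOaf entry point, its signature, and the null vocabulary argument are assumptions rather than the actual test code:

    import static org.junit.jupiter.api.Assertions.assertFalse;
    import static org.junit.jupiter.api.Assertions.assertTrue;

    import java.io.IOException;
    import java.util.List;

    import org.apache.commons.io.IOUtils;
    import org.junit.jupiter.api.Test;

    import eu.dnetlib.dhp.schema.oaf.Oaf;
    import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;

    public class OafOrpMappingSketchTest {

        @Test
        public void testOafOrpRecordMapsToOtherResearchProduct() throws IOException {
            // load the fixture added in PATCH 7
            final String xml = IOUtils.toString(getClass().getResourceAsStream("oaf_orp.xml"));

            // hypothetical entry point: map one OAF record to graph entities
            final List<Oaf> list = GenerateEntitiesApplication.convertToListOaf("", xml, null);

            assertFalse(list.isEmpty());
            // dr:CobjCategory 0020 marks the result as an Other Research Product
            assertTrue(list.get(0) instanceof OtherResearchProduct);
        }
    }
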
From 6ab1ce53c9e96fe719097588b003ad55609c3b8b Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 16 Nov 2020 10:09:17 +0100
Subject: [PATCH 9/9] fixed condition in result pid cleaning; cleanup

---
 .../dhp/oa/graph/clean/CleaningFunctions.java |  2 +-
 .../dispatch_entities_bytype_parameters.json  | 26 -------------------
 2 files changed, 1 insertion(+), 27 deletions(-)
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
index 9f06ea056..4bcce8037 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
@@ -114,7 +114,7 @@ public class CleaningFunctions {
 				.stream()
 				.filter(Objects::nonNull)
 				.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
-				.filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
+				.filter(sp -> !NONE.equalsIgnoreCase(sp.getValue().trim()))
 				.filter(sp -> Objects.nonNull(sp.getQualifier()))
 				.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
 				.map(sp -> {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json
deleted file mode 100644
index a75ff3653..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json
+++ /dev/null
@@ -1,26 +0,0 @@
-[
-  {
-    "paramName": "issm",
-    "paramLongName": "isSparkSessionManaged",
-    "paramDescription": "when true will stop SparkSession after job execution",
-    "paramRequired": false
-  },
-  {
-    "paramName": "ep",
-    "paramLongName": "entitiesPath",
-    "paramDescription": "the entities path",
-    "paramRequired": true
-  },
-  {
-    "paramName": "g",
-    "paramLongName": "outputPath",
-    "paramDescription": "the output path to store the dispatched entities",
-    "paramRequired": true
-  },
-  {
-    "paramName": "class",
-    "paramLongName": "graphTableClassName",
-    "paramDescription": "class name modelling the graph table",
-    "paramRequired": true
-  }
-]
\ No newline at end of file
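
The one-character fix in PATCH 9 is easy to miss: the old predicate kept only pids whose value was literally "none" and discarded every real pid, while the negated, trimmed check does the opposite. A self-contained illustration of the two predicates (the NONE constant and the sample values are stand-ins for the real StructuredProperty stream):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class PidFilterSketch {

        private static final String NONE = "none";

        public static void main(String[] args) {
            final List<String> pids = Arrays.asList("10.3897/oneeco.2.e13718", "none", " NONE ");

            // buggy: keeps only the garbage value (and misses untrimmed variants entirely)
            final List<String> buggy = pids.stream()
                .filter(v -> NONE.equalsIgnoreCase(v))
                .collect(Collectors.toList());

            // fixed: drops "none" in any casing/whitespace, keeps real pids
            final List<String> fixed = pids.stream()
                .filter(v -> !NONE.equalsIgnoreCase(v.trim()))
                .collect(Collectors.toList());

            System.out.println(buggy); // [none]
            System.out.println(fixed); // [10.3897/oneeco.2.e13718]
        }
    }
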