diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index e60e8076e..77aa2aedb 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -15,12 +15,12 @@
dnet45-snapshots
DNet45 Snapshots
- http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots
+ https://maven.d4science.org/nexus/content/repositories/dnet45-snapshots
default
dnet45-releases
- http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases
+ https://maven.d4science.org/nexus/content/repositories/dnet45-releases
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
similarity index 85%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java
rename to dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
index 84b29e3d4..4a66f91dc 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
+package eu.dnetlib.dhp.schema.oaf;
import java.util.ArrayList;
import java.util.Arrays;
@@ -13,19 +13,43 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.Journal;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
-import eu.dnetlib.dhp.schema.oaf.OriginDescription;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.utils.DHPUtils;
public class OafMapperUtils {
+ public static Oaf merge(final Oaf o1, final Oaf o2) {
+ if (ModelSupport.isSubClass(o1, OafEntity.class)) {
+ if (ModelSupport.isSubClass(o1, Result.class)) {
+
+ return mergeResults((Result) o1, (Result) o2);
+ } else if (ModelSupport.isSubClass(o1, Datasource.class)) {
+ ((Datasource) o1).mergeFrom((Datasource) o2);
+ } else if (ModelSupport.isSubClass(o1, Organization.class)) {
+ ((Organization) o1).mergeFrom((Organization) o2);
+ } else if (ModelSupport.isSubClass(o1, Project.class)) {
+ ((Project) o1).mergeFrom((Project) o2);
+ } else {
+ throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName());
+ }
+ } else if (ModelSupport.isSubClass(o1, Relation.class)) {
+ ((Relation) o1).mergeFrom((Relation) o2);
+ } else {
+ throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
+ }
+ return o1;
+ }
+
+ public static Result mergeResults(Result r1, Result r2) {
+ if (new ResultTypeComparator().compare(r1, r2) < 0) {
+ r1.mergeFrom(r2);
+ return r1;
+ } else {
+ r2.mergeFrom(r1);
+ return r2;
+ }
+ }
+
public static KeyValue keyValue(final String k, final String v) {
final KeyValue kv = new KeyValue();
kv.setKey(k);
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java
new file mode 100644
index 000000000..6c11d1a85
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java
@@ -0,0 +1,49 @@
+
+package eu.dnetlib.dhp.schema.oaf;
+
+import java.util.Comparator;
+
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+
+public class ResultTypeComparator implements Comparator {
+
+ @Override
+ public int compare(Result left, Result right) {
+
+ if (left == null && right == null)
+ return 0;
+ if (left == null)
+ return 1;
+ if (right == null)
+ return -1;
+
+ String lClass = left.getResulttype().getClassid();
+ String rClass = right.getResulttype().getClassid();
+
+ if (lClass.equals(rClass))
+ return 0;
+
+ if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
+ return -1;
+ if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
+ return 1;
+
+ if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
+ return -1;
+ if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
+ return 1;
+
+ if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
+ return -1;
+ if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
+ return 1;
+
+ if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
+ return -1;
+ if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
+ return 1;
+
+ // Else (but unlikely), lexicographical ordering will do.
+ return lClass.compareTo(rClass);
+ }
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
index dfbaf3a6c..8872174a5 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
@@ -5,6 +5,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
+import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -15,9 +16,15 @@ import org.apache.commons.codec.binary.Hex;
import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
public class DHPUtils {
+ public static Seq toSeq(List list) {
+ return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
+ }
+
public static String md5(final String s) {
try {
final MessageDigest md = MessageDigest.getInstance("MD5");
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
index e295b9503..8231dd77e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
@@ -3,14 +3,12 @@ package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import java.io.BufferedInputStream;
-import java.util.Objects;
import java.util.Optional;
-import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -23,11 +21,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
-import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -75,12 +71,12 @@ public class CleanGraphSparkJob {
conf,
isSparkSessionManaged,
spark -> {
- removeOutputDir(spark, outputPath);
- fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
+ HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+ cleanGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
});
}
- private static void fixGraphTable(
+ private static void cleanGraphTable(
SparkSession spark,
VocabularyGroup vocs,
String inputPath,
@@ -106,13 +102,15 @@ public class CleanGraphSparkJob {
return spark
.read()
.textFile(inputEntityPath)
+ .filter((FilterFunction) s -> isEntityType(s, clazz))
+ .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
.map(
(MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz));
}
- private static void removeOutputDir(SparkSession spark, String path) {
- HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ private static boolean isEntityType(final String s, final Class clazz) {
+ return StringUtils.substringBefore(s, "|").equals(clazz.getName());
}
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
index 56a4aaf5a..4bcce8037 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
@@ -11,7 +11,6 @@ import org.apache.commons.lang3.StringUtils;
import com.clearspring.analytics.util.Lists;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
-import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
@@ -115,7 +114,7 @@ public class CleaningFunctions {
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
- .filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
+ .filter(sp -> !NONE.equalsIgnoreCase(sp.getValue().trim()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.map(sp -> {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java
new file mode 100644
index 000000000..9c80528e3
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java
@@ -0,0 +1,206 @@
+
+package eu.dnetlib.dhp.oa.graph.clean;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.Aggregator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import scala.Tuple2;
+
+/**
+ * Groups the graph content by entity identifier to ensure ID uniqueness
+ */
+public class GroupEntitiesAndRelationsSparkJob {
+
+ private static final Logger log = LoggerFactory.getLogger(GroupEntitiesAndRelationsSparkJob.class);
+
+ private final static String ID_JPATH = "$.id";
+
+ private final static String SOURCE_JPATH = "$.source";
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ GroupEntitiesAndRelationsSparkJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json"));
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String graphInputPath = parser.get("graphInputPath");
+ log.info("graphInputPath: {}", graphInputPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+ groupEntitiesAndRelations(spark, graphInputPath, outputPath);
+ });
+ }
+
+ private static void groupEntitiesAndRelations(
+ SparkSession spark,
+ String inputPath,
+ String outputPath) {
+
+ TypedColumn aggregator = new GroupingAggregator().toColumn();
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ spark
+ .read()
+ .textFile(toSeq(listPaths(inputPath, sc)))
+ .map((MapFunction) s -> parseOaf(s), Encoders.kryo(Oaf.class))
+ .filter((FilterFunction) oaf -> StringUtils.isNotBlank(ModelSupport.idFn().apply(oaf)))
+ .groupByKey((MapFunction) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
+ .agg(aggregator)
+ .map(
+ (MapFunction, String>) t -> t._2().getClass().getName() +
+ "|" + OBJECT_MAPPER.writeValueAsString(t._2()),
+ Encoders.STRING())
+ .write()
+ .option("compression", "gzip")
+ .mode(SaveMode.Overwrite)
+ .text(outputPath);
+ }
+
+ public static class GroupingAggregator extends Aggregator {
+
+ @Override
+ public Oaf zero() {
+ return null;
+ }
+
+ @Override
+ public Oaf reduce(Oaf b, Oaf a) {
+ return mergeAndGet(b, a);
+ }
+
+ private Oaf mergeAndGet(Oaf b, Oaf a) {
+ if (Objects.nonNull(a) && Objects.nonNull(b)) {
+ return OafMapperUtils.merge(b, a);
+ }
+ return Objects.isNull(a) ? b : a;
+ }
+
+ @Override
+ public Oaf merge(Oaf b, Oaf a) {
+ return mergeAndGet(b, a);
+ }
+
+ @Override
+ public Oaf finish(Oaf j) {
+ return j;
+ }
+
+ @Override
+ public Encoder bufferEncoder() {
+ return Encoders.kryo(Oaf.class);
+ }
+
+ @Override
+ public Encoder outputEncoder() {
+ return Encoders.kryo(Oaf.class);
+ }
+
+ }
+
+ private static Oaf parseOaf(String s) {
+
+ DocumentContext dc = JsonPath
+ .parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
+ final String id = dc.read(ID_JPATH);
+ if (StringUtils.isNotBlank(id)) {
+
+ String prefix = StringUtils.substringBefore(id, "|");
+ switch (prefix) {
+ case "10":
+ return parse(s, Datasource.class);
+ case "20":
+ return parse(s, Organization.class);
+ case "40":
+ return parse(s, Project.class);
+ case "50":
+ String resultType = dc.read("$.resulttype.classid");
+ switch (resultType) {
+ case "publication":
+ return parse(s, Publication.class);
+ case "dataset":
+ return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
+ case "software":
+ return parse(s, Software.class);
+ case "other":
+ return parse(s, OtherResearchProduct.class);
+ default:
+ throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
+ }
+ default:
+ throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
+ }
+ } else {
+ String source = dc.read(SOURCE_JPATH);
+ if (StringUtils.isNotBlank(source)) {
+ return parse(s, Relation.class);
+ } else {
+ throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
+ }
+ }
+ }
+
+ private static Oaf parse(String s, Class clazz) {
+ try {
+ return OBJECT_MAPPER.readValue(s, clazz);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static List listPaths(String inputPath, JavaSparkContext sc) {
+ return HdfsSupport
+ .listFiles(inputPath, sc.hadoopConfiguration())
+ .stream()
+ .collect(Collectors.toList());
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
similarity index 98%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
index 037683604..e53f4ca30 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
@@ -33,9 +33,9 @@ import scala.Tuple2;
* are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined
* by eu.dnetlib.dhp.schema.common.ModelSupport#idFn()
*/
-public class MergeGraphSparkJob {
+public class MergeGraphTableSparkJob {
- private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
+ private static final Logger log = LoggerFactory.getLogger(MergeGraphTableSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
index 5b6ae72f1..da4b5e324 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
@@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*;
import java.util.*;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
index 3568dc52a..cfd190670 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
@@ -29,16 +29,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Datasource;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Organization;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
-import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Software;
+import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
@@ -78,7 +69,7 @@ public class GenerateEntitiesApplication {
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
- removeOutputDir(spark, targetPath);
+ HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration());
generateEntities(spark, vocs, sourcePaths, targetPath);
});
}
@@ -92,7 +83,7 @@ public class GenerateEntitiesApplication {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final List existingSourcePaths = Arrays
.stream(sourcePaths.split(","))
- .filter(p -> exists(sc, p))
+ .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
.collect(Collectors.toList());
log.info("Generate entities from files:");
@@ -113,7 +104,7 @@ public class GenerateEntitiesApplication {
inputRdd
.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
- .reduceByKey((o1, o2) -> merge(o1, o2))
+ .reduceByKey((o1, o2) -> OafMapperUtils.merge(o1, o2))
.map(Tuple2::_2)
.map(
oaf -> oaf.getClass().getSimpleName().toLowerCase()
@@ -122,17 +113,6 @@ public class GenerateEntitiesApplication {
.saveAsTextFile(targetPath, GzipCodec.class);
}
- private static Oaf merge(final Oaf o1, final Oaf o2) {
- if (ModelSupport.isSubClass(o1, OafEntity.class)) {
- ((OafEntity) o1).mergeFrom((OafEntity) o2);
- } else if (ModelSupport.isSubClass(o1, Relation.class)) {
- ((Relation) o1).mergeFrom((Relation) o2);
- } else {
- throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
- }
- return o1;
- }
-
private static List convertToListOaf(
final String id,
final String s,
@@ -181,17 +161,4 @@ public class GenerateEntitiesApplication {
}
}
- private static boolean exists(final JavaSparkContext context, final String pathToFile) {
- try {
- final FileSystem hdfs = FileSystem.get(context.hadoopConfiguration());
- final Path path = new Path(pathToFile);
- return hdfs.exists(path);
- } catch (final IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static void removeOutputDir(final SparkSession spark, final String path) {
- HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
- }
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
index 6365a1db9..b6210013c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
@@ -1,15 +1,6 @@
package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
@@ -32,6 +23,7 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*;
import java.io.Closeable;
import java.io.IOException;
@@ -527,9 +519,12 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
final String inferenceprovenance = rs.getString("inferenceprovenance");
final Boolean inferred = rs.getBoolean("inferred");
- final String trust = rs.getString("trust");
+
+ final double trust = rs.getDouble("trust");
+
return dataInfo(
- deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust);
+ deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION,
+ String.format("%.3f", trust));
}
private Qualifier prepareQualifierSplitting(final String s) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
index dea80fabd..e62bc0790 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
@@ -1,10 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
import java.util.ArrayList;
import java.util.List;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
index 6fe7bb971..7124684d5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
@@ -1,10 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
import java.util.ArrayList;
import java.util.Arrays;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java
index 9bf198c8b..bfc4fd6f1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java
@@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class Vocabulary implements Serializable {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java
index 334339d3b..32452bdc5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java
@@ -7,6 +7,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
index 7329df29a..992d8c40e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
@@ -50,12 +50,36 @@
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+ yarn
+ cluster
+ group graph entities and relations
+ eu.dnetlib.dhp.oa.graph.clean.GroupEntitiesAndRelationsSparkJob
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=7680
+
+ --graphInputPath${graphInputPath}
+ --outputPath${workingDir}/grouped_entities
+
+
+
+
+
@@ -84,7 +108,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --inputPath${graphInputPath}/publication
+ --inputPath${workingDir}/grouped_entities
--outputPath${graphOutputPath}/publication
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
--isLookupUrl${isLookupUrl}
@@ -110,7 +134,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --inputPath${graphInputPath}/dataset
+ --inputPath${workingDir}/grouped_entities
--outputPath${graphOutputPath}/dataset
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
--isLookupUrl${isLookupUrl}
@@ -136,7 +160,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --inputPath${graphInputPath}/otherresearchproduct
+ --inputPath${workingDir}/grouped_entities
--outputPath${graphOutputPath}/otherresearchproduct
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
--isLookupUrl${isLookupUrl}
@@ -162,7 +186,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --inputPath${graphInputPath}/software
+ --inputPath${workingDir}/grouped_entities
--outputPath${graphOutputPath}/software
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software
--isLookupUrl${isLookupUrl}
@@ -188,7 +212,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --inputPath${graphInputPath}/datasource
+ --inputPath${workingDir}/grouped_entities
--outputPath${graphOutputPath}/datasource
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource
--isLookupUrl${isLookupUrl}
@@ -214,7 +238,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --inputPath${graphInputPath}/organization
+ --inputPath${workingDir}/grouped_entities
--outputPath${graphOutputPath}/organization
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization
--isLookupUrl${isLookupUrl}
@@ -240,7 +264,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --inputPath${graphInputPath}/project
+ --inputPath${workingDir}/grouped_entities
--outputPath${graphOutputPath}/project
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project
--isLookupUrl${isLookupUrl}
@@ -266,7 +290,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --inputPath${graphInputPath}/relation
+ --inputPath${workingDir}/grouped_entities
--outputPath${graphOutputPath}/relation
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation
--isLookupUrl${isLookupUrl}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json
new file mode 100644
index 000000000..e65acb3c4
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json
@@ -0,0 +1,20 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "gin",
+ "paramLongName": "graphInputPath",
+ "paramDescription": "the graph root path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "out",
+ "paramLongName": "outputPath",
+ "paramDescription": "the output merged graph root path",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
index 07a125fb6..86fb51042 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
@@ -2,11 +2,11 @@
- betaInputGgraphPath
+ betaInputGraphPath
the beta graph root path
- prodInputGgraphPath
+ prodInputGraphPath
the production graph root path
@@ -76,7 +76,7 @@
yarn
cluster
Merge publications
- eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob
+ eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob
dhp-graph-mapper-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
@@ -88,8 +88,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --betaInputPath${betaInputGgraphPath}/publication
- --prodInputPath${prodInputGgraphPath}/publication
+ --betaInputPath${betaInputGraphPath}/publication
+ --prodInputPath${prodInputGraphPath}/publication
--outputPath${graphOutputPath}/publication
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
--priority${priority}
@@ -103,7 +103,7 @@
yarn
cluster
Merge datasets
- eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob
+ eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob
dhp-graph-mapper-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
@@ -115,8 +115,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --betaInputPath${betaInputGgraphPath}/dataset
- --prodInputPath${prodInputGgraphPath}/dataset
+ --betaInputPath${betaInputGraphPath}/dataset
+ --prodInputPath${prodInputGraphPath}/dataset
--outputPath${graphOutputPath}/dataset
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
--priority${priority}
@@ -130,7 +130,7 @@
yarn
cluster
Merge otherresearchproducts
- eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob
+ eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob
dhp-graph-mapper-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
@@ -142,8 +142,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --betaInputPath${betaInputGgraphPath}/otherresearchproduct
- --prodInputPath${prodInputGgraphPath}/otherresearchproduct
+ --betaInputPath${betaInputGraphPath}/otherresearchproduct
+ --prodInputPath${prodInputGraphPath}/otherresearchproduct
--outputPath${graphOutputPath}/otherresearchproduct
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
--priority${priority}
@@ -157,7 +157,7 @@
yarn
cluster
Merge softwares
- eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob
+ eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob
dhp-graph-mapper-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
@@ -169,8 +169,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --betaInputPath${betaInputGgraphPath}/software
- --prodInputPath${prodInputGgraphPath}/software
+ --betaInputPath${betaInputGraphPath}/software
+ --prodInputPath${prodInputGraphPath}/software
--outputPath${graphOutputPath}/software
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software
--priority${priority}
@@ -184,7 +184,7 @@
yarn
cluster
Merge datasources
- eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob
+ eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob
dhp-graph-mapper-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
@@ -196,8 +196,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --betaInputPath${betaInputGgraphPath}/datasource
- --prodInputPath${prodInputGgraphPath}/datasource
+ --betaInputPath${betaInputGraphPath}/datasource
+ --prodInputPath${prodInputGraphPath}/datasource
--outputPath${graphOutputPath}/datasource
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource
--priority${priority}
@@ -211,7 +211,7 @@
yarn
cluster
Merge organizations
- eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob
+ eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob
dhp-graph-mapper-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
@@ -223,8 +223,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --betaInputPath${betaInputGgraphPath}/organization
- --prodInputPath${prodInputGgraphPath}/organization
+ --betaInputPath${betaInputGraphPath}/organization
+ --prodInputPath${prodInputGraphPath}/organization
--outputPath${graphOutputPath}/organization
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization
--priority${priority}
@@ -238,7 +238,7 @@
yarn
cluster
Merge projects
- eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob
+ eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob
dhp-graph-mapper-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
@@ -250,8 +250,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --betaInputPath${betaInputGgraphPath}/project
- --prodInputPath${prodInputGgraphPath}/project
+ --betaInputPath${betaInputGraphPath}/project
+ --prodInputPath${prodInputGraphPath}/project
--outputPath${graphOutputPath}/project
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project
--priority${priority}
@@ -265,7 +265,7 @@
yarn
cluster
Merge relations
- eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob
+ eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob
dhp-graph-mapper-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
@@ -277,8 +277,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- --betaInputPath${betaInputGgraphPath}/relation
- --prodInputPath${prodInputGgraphPath}/relation
+ --betaInputPath${betaInputGraphPath}/relation
+ --prodInputPath${prodInputGraphPath}/relation
--outputPath${graphOutputPath}/relation
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation
--priority${priority}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java
similarity index 90%
rename from dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java
rename to dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java
index 28e8e5abc..0089811cf 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java
@@ -15,7 +15,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Datasource;
-public class MergeGraphSparkJobTest {
+public class MergeGraphTableSparkJobTest {
private ObjectMapper mapper;
@@ -28,7 +28,7 @@ public class MergeGraphSparkJobTest {
public void testMergeDatasources() throws IOException {
assertEquals(
"openaire-cris_1.1",
- MergeGraphSparkJob
+ MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_cris.json"),
d("datasource_UNKNOWN.json"))
@@ -36,7 +36,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"openaire-cris_1.1",
- MergeGraphSparkJob
+ MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_UNKNOWN.json"),
d("datasource_cris.json"))
@@ -44,7 +44,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"driver-openaire2.0",
- MergeGraphSparkJob
+ MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_native.json"),
d("datasource_driver-openaire2.0.json"))
@@ -52,7 +52,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"driver-openaire2.0",
- MergeGraphSparkJob
+ MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_driver-openaire2.0.json"),
d("datasource_native.json"))
@@ -60,7 +60,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"openaire4.0",
- MergeGraphSparkJob
+ MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_notCompatible.json"),
d("datasource_openaire4.0.json"))
@@ -68,7 +68,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"notCompatible",
- MergeGraphSparkJob
+ MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_notCompatible.json"),
d("datasource_UNKNOWN.json"))
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java
new file mode 100644
index 000000000..705f1dddb
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java
@@ -0,0 +1,99 @@
+
+package eu.dnetlib.dhp.oa.graph.raw;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.lenient;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
+import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+@ExtendWith(MockitoExtension.class)
+public class GenerateEntitiesApplicationTest {
+
+ @Mock
+ private ISLookUpService isLookUpService;
+
+ @Mock
+ private VocabularyGroup vocs;
+
+ @BeforeEach
+ public void setUp() throws IOException, ISLookUpException {
+
+ lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
+ lenient()
+ .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
+ .thenReturn(synonyms());
+
+ vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
+ }
+
+ @Test
+ public void testMergeResult() throws IOException {
+ Result publication = getResult("oaf_record.xml", Publication.class);
+ Result dataset = getResult("odf_dataset.xml", Dataset.class);
+ Result software = getResult("odf_software.xml", Software.class);
+ Result orp = getResult("oaf_orp.xml", OtherResearchProduct.class);
+
+ verifyMerge(publication, dataset, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+ verifyMerge(dataset, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+
+ verifyMerge(publication, software, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+ verifyMerge(software, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+
+ verifyMerge(publication, orp, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+ verifyMerge(orp, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
+
+ verifyMerge(dataset, software, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
+ verifyMerge(software, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
+
+ verifyMerge(dataset, orp, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
+ verifyMerge(orp, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
+
+ verifyMerge(software, orp, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
+ verifyMerge(orp, software, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
+ }
+
+ protected void verifyMerge(Result publication, Result dataset, Class clazz,
+ String resultType) {
+ final Result merge = OafMapperUtils.mergeResults(publication, dataset);
+ assertTrue(clazz.isAssignableFrom(merge.getClass()));
+ assertEquals(resultType, merge.getResulttype().getClassid());
+ }
+
+ protected Result getResult(String xmlFileName, Class clazz) throws IOException {
+ final String xml = IOUtils.toString(getClass().getResourceAsStream(xmlFileName));
+ return new OdfToOafMapper(vocs, false)
+ .processMdRecord(xml)
+ .stream()
+ .filter(s -> clazz.isAssignableFrom(s.getClass()))
+ .map(s -> (Result) s)
+ .findFirst()
+ .get();
+ }
+
+ private List vocs() throws IOException {
+ return IOUtils
+ .readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
+ }
+
+ private List synonyms() throws IOException {
+ return IOUtils
+ .readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
index f663d6095..9cf75f208 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
@@ -27,14 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
-import eu.dnetlib.dhp.schema.oaf.Datasource;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Organization;
-import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.*;
@ExtendWith(MockitoExtension.class)
public class MigrateDbEntitiesApplicationTest {
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json
index 2baf7c8f1..06b0d483b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json
@@ -31,8 +31,8 @@
},
{
"field": "trust",
- "type": "string",
- "value": "0.9"
+ "type": "double",
+ "value": 0.9
},
{
"field": "inferenceprovenance",
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json
index 8f8aed3a0..befa722e1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json
@@ -114,8 +114,8 @@
},
{
"field": "trust",
- "type": "string",
- "value": "0.9"
+ "type": "double",
+ "value": 0.9
},
{
"field": "inferenceprovenance",
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_orp.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_orp.xml
new file mode 100644
index 000000000..6c83073de
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_orp.xml
@@ -0,0 +1,84 @@
+
+
+
+ pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2
+ 10.3897/oneeco.2.e13718
+
+
+
+
+
+ 2020-03-23T00:20:51.392Z
+ 2020-03-23T00:26:59.078Z
+ pensoft_____
+
+
+ Ecosystem Service capacity is higher in areas of multiple designation types
+ Nikolaidou,Charitini
+ Votsi,Nefta
+ Sgardelis,Steanos
+ Halley,John
+ Pantis,John
+ Tsiafouli,Maria
+ 2017
+ The implementation of the Ecosystem Service (ES) concept into practice might be a challenging task as it has to take into account previous “traditional” policies and approaches that have evaluated nature and biodiversity differently. Among them the Habitat (92/43/EC) and Bird Directives (79/409/EC), the Water Framework Directive (2000/60/EC), and the Noise Directive (2002/49/EC) have led to the evaluation/designation of areas in Europe with different criteria. In this study our goal was to understand how the ES capacity of an area is related to its designation and if areas with multiple designations have higher capacity in providing ES. We selected four catchments in Greece with a great variety of characteristics covering over 25% of the national territory. Inside the catchments we assessed the ES capacity (following the methodology of Burkhard et al. 2009) of areas designated as Natura 2000 sites, Quiet areas and Wetlands or Water bodies and found those areas that have multiple designations. Data were analyzed by GLM to reveal differences regarding the ES capacity among the different types of areas. We also investigated by PCA synergies and trade-offs among different kinds of ES and tested for correlations among landscape properties, such as elevation, aspect and slope and the ES potential. Our results show that areas with different types or multiple designations have a different capacity in providing ES. Areas of one designation type (Protected or Quiet Areas) had in general intermediate scores in most ES but scores were higher compared to areas with no designation, which displayed stronger capacity in provisioning services. Among Protected Areas and Quiet Areas the latter scored better in general. Areas that combined both designation types (Protected and Quiet Areas) showed the highest capacity in 13 out of 29 ES, that were mostly linked with natural and forest ecosystems. We found significant synergies among most regulating, supporting and cultural ES which in turn display trade-offs with provisioning services. The different ES are spatially related and display strong correlation with landscape properties, such as elevation and slope. We suggest that the designation status of an area can be used as an alternative tool for environmental policy, indicating the capacity for ES provision. Multiple designations of areas can be used as proxies for locating ES “hotspots”. This integration of “traditional” evaluation and designation and the “newer” ES concept forms a time- and cost-effective way to be adopted by stakeholders and policy-makers in order to start complying with new standards and demands for nature conservation and environmental management.
+ text/html
+ https://doi.org/10.3897/oneeco.2.e13718
+ https://oneecosystem.pensoft.net/article/13718/
+ eng
+ Pensoft Publishers
+ info:eu-repo/semantics/altIdentifier/eissn/2367-8194
+ info:eu-repo/grantAgreement/EC/FP7/226852
+ One Ecosystem 2: e13718
+ One Ecosystem 2: e13718
+ One Ecosystem 2: e13718
+ Ecosystem Services hotspots
+ Natura 2000
+ Quiet Protected Areas
+ Biodiversity
+ Agriculture
+ Elevation
+ Slope
+ Ecosystem Service trade-offs and synergies
+ cultural services
+ provisioning services
+ regulating services
+ supporting services
+ Research Article
+
+ 0020
+ 2017-01-01
+ corda_______::226852
+ OPEN
+
+
+ 10.3897/oneeco.2.e13718
+ https://oneecosystem.pensoft.net/article/13718/
+ One Ecosystem
+ 0001
+
+
+
+
+ http%3A%2F%2Fzookeys.pensoft.net%2Foai.php
+ 10.3897/oneeco.2.e13718
+ 2017-09-08
+ http://www.openarchives.org/OAI/2.0/oai_dc/
+
+
+
+ false
+ false
+ 0.9
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json
index 38657a1e1..811a9079f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json
@@ -96,8 +96,8 @@
},
{
"field": "trust",
- "type": "string",
- "value": "0.9"
+ "type": "double",
+ "value": 0.9
},
{
"field": "inferenceprovenance",
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json
index 4311086e7..a3305926d 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json
@@ -41,8 +41,8 @@
},
{
"field": "trust",
- "type": "string",
- "value": "0.9"
+ "type": "double",
+ "value": 0.9
},
{
"field": "inferenceprovenance",
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json
index a25215ca3..818bf3e58 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json
@@ -86,8 +86,8 @@
},
{
"field": "trust",
- "type": "string",
- "value": "0.9"
+ "type": "double",
+ "value": 0.9
},
{
"field": "inferenceprovenance",
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java
index d8eba31b6..b44ed7446 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java
@@ -2,12 +2,11 @@
package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
@@ -28,13 +27,11 @@ import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.provision.model.*;
+import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
+import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
-import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
/**
* XmlConverterJob converts the JoinedEntities as XML records
@@ -43,8 +40,6 @@ public class XmlConverterJob {
private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
public static void main(String[] args) throws Exception {
@@ -129,10 +124,6 @@ public class XmlConverterJob {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
- private static Seq toSeq(List list) {
- return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
- }
-
private static Map prepareAccumulators(SparkContext sc) {
Map accumulators = Maps.newHashMap();
accumulators
diff --git a/pom.xml b/pom.xml
index 3629e2f1b..5d324c488 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@
dnet45-releases
D-Net 45 releases
- http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases
+ https://maven.d4science.org/nexus/content/repositories/dnet45-releases
default
false
@@ -651,12 +651,12 @@
dnet45-snapshots
DNet45 Snapshots
- http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots
+ https://maven.d4science.org/nexus/content/repositories/dnet45-snapshots
default
dnet45-releases
- http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases
+ https://maven.d4science.org/nexus/content/repositories/dnet45-releases