) s -> OBJECT_MAPPER.readValue(s, clazz), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
new file mode 100644
index 0000000000..b723de9554
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
@@ -0,0 +1,162 @@
+
+package eu.dnetlib.dhp.oa.graph.merge;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import scala.Tuple2;
+
+/**
+ * Combines the content from two aggregator graph tables of the same type, entities (or relationships) with the same ids
+ * are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined
+ * by eu.dnetlib.dhp.schema.common.ModelSupport#idFn()
+ */
+public class MergeGraphSparkJob {
+
+ private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ CleanGraphSparkJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json"));
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ String priority = Optional
+ .ofNullable(parser.get("priority"))
+ .orElse(PRIORITY_DEFAULT);
+ log.info("priority: {}", priority);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String betaInputPath = parser.get("betaInputPath");
+ log.info("betaInputPath: {}", betaInputPath);
+
+ String prodInputPath = parser.get("prodInputPath");
+ log.info("prodInputPath: {}", prodInputPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ String graphTableClassName = parser.get("graphTableClassName");
+ log.info("graphTableClassName: {}", graphTableClassName);
+
+ Class extends OafEntity> entityClazz = (Class extends OafEntity>) Class.forName(graphTableClassName);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
+ });
+ }
+
+ private static void mergeGraphTable(
+ SparkSession spark,
+ String priority,
+ String betaInputPath,
+ String prodInputPath,
+ Class
p_clazz,
+ Class b_clazz,
+ String outputPath) {
+
+ Dataset> beta = readTableFromPath(spark, betaInputPath, b_clazz);
+ Dataset> prod = readTableFromPath(spark, prodInputPath, p_clazz);
+
+ prod
+ .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
+ .map((MapFunction, Tuple2>, P>) value -> {
+ Optional p = Optional.ofNullable(value._1()).map(Tuple2::_2);
+ Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2);
+ switch (priority) {
+ default:
+ case "BETA":
+ return mergeWithPriorityToBETA(p, b);
+ case "PROD":
+ return mergeWithPriorityToPROD(p, b);
+ }
+ }, Encoders.bean(p_clazz))
+ .filter((FilterFunction
) Objects::nonNull)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(outputPath);
+ }
+
+ private static
P mergeWithPriorityToPROD(Optional
p, Optional b) {
+ if (b.isPresent() & !p.isPresent()) {
+ return (P) b.get();
+ }
+ if (p.isPresent()) {
+ return p.get();
+ }
+ return null;
+ }
+
+ private static
P mergeWithPriorityToBETA(Optional
p, Optional b) {
+ if (p.isPresent() & !b.isPresent()) {
+ return p.get();
+ }
+ if (b.isPresent()) {
+ return (P) b.get();
+ }
+ return null;
+ }
+
+ private static Dataset> readTableFromPath(
+ SparkSession spark, String inputEntityPath, Class clazz) {
+
+ log.info("Reading Graph table from: {}", inputEntityPath);
+ return spark
+ .read()
+ .textFile(inputEntityPath)
+ .map(
+ (MapFunction>) value -> {
+ final T t = OBJECT_MAPPER.readValue(value, clazz);
+ final String id = ModelSupport.idFn().apply(t);
+ return new Tuple2<>(id, t);
+ },
+ Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
index fc77950d01..5159fa9bb6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
@@ -1,36 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.keyValue;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.oaiIProvenance;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_ACCESS_MODES;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PID_TYPES;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.NOT_AVAILABLE;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.REPOSITORY_PROVENANCE_ACTIONS;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.UNKNOWN;
+import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
@@ -40,24 +14,8 @@ import org.dom4j.Node;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.LicenseComparator;
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.Context;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.GeoLocation;
-import eu.dnetlib.dhp.schema.oaf.Instance;
-import eu.dnetlib.dhp.schema.oaf.Journal;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Software;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
public abstract class AbstractMdRecordToOafMapper {
@@ -99,7 +57,6 @@ public abstract class AbstractMdRecordToOafMapper {
final Document doc = DocumentHelper
.parseText(xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3));
- final String type = doc.valueOf("//dr:CobjCategory/@type");
final KeyValue collectedFrom = getProvenanceDatasource(
doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name");
@@ -118,12 +75,39 @@ public abstract class AbstractMdRecordToOafMapper {
final DataInfo info = prepareDataInfo(doc, invisible);
final long lastUpdateTimestamp = new Date().getTime();
- return createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+ final List instances = prepareInstances(doc, info, collectedFrom, hostedBy);
+
+ final String type = getResultType(doc, instances);
+
+ return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
+ protected String getResultType(final Document doc, final List instances) {
+ String type = doc.valueOf("//dr:CobjCategory/@type");
+
+ if (StringUtils.isBlank(type) & vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
+ String instanceType = instances
+ .stream()
+ .map(i -> i.getInstancetype().getClassid())
+ .findFirst()
+ .map(s -> UNKNOWN.equalsIgnoreCase(s) ? "0000" : s)
+ .orElse("0000"); // Unknown
+ return Optional
+ .ofNullable(vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
+ .map(q -> q.getClassid())
+ .orElse("0000");
+ /*
+ * .orElseThrow( () -> new IllegalArgumentException( String.format("'%s' not mapped in %s", instanceType,
+ * DNET_RESULT_TYPOLOGIES)));
+ */
+ }
+
+ return type;
+ }
+
private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {
final String dsId = doc.valueOf(xpathId);
final String dsName = doc.valueOf(xpathName);
@@ -138,8 +122,8 @@ public abstract class AbstractMdRecordToOafMapper {
protected List createOafs(
final Document doc,
final String type,
+ final List instances,
final KeyValue collectedFrom,
- final KeyValue hostedBy,
final DataInfo info,
final long lastUpdateTimestamp) {
@@ -148,14 +132,14 @@ public abstract class AbstractMdRecordToOafMapper {
switch (type.toLowerCase()) {
case "publication":
final Publication p = new Publication();
- populateResultFields(p, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+ populateResultFields(p, doc, instances, collectedFrom, info, lastUpdateTimestamp);
p.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
p.setJournal(prepareJournal(doc, info));
oafs.add(p);
break;
case "dataset":
final Dataset d = new Dataset();
- populateResultFields(d, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+ populateResultFields(d, doc, instances, collectedFrom, info, lastUpdateTimestamp);
d.setResulttype(DATASET_DEFAULT_RESULTTYPE);
d.setStoragedate(prepareDatasetStorageDate(doc, info));
d.setDevice(prepareDatasetDevice(doc, info));
@@ -168,7 +152,7 @@ public abstract class AbstractMdRecordToOafMapper {
break;
case "software":
final Software s = new Software();
- populateResultFields(s, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+ populateResultFields(s, doc, instances, collectedFrom, info, lastUpdateTimestamp);
s.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
s.setLicense(prepareSoftwareLicenses(doc, info));
@@ -180,7 +164,7 @@ public abstract class AbstractMdRecordToOafMapper {
case "otherresearchproducts":
default:
final OtherResearchProduct o = new OtherResearchProduct();
- populateResultFields(o, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+ populateResultFields(o, doc, instances, collectedFrom, info, lastUpdateTimestamp);
o.setResulttype(ORP_DEFAULT_RESULTTYPE);
o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
@@ -259,14 +243,16 @@ public abstract class AbstractMdRecordToOafMapper {
private void populateResultFields(
final Result r,
final Document doc,
+ final List instances,
final KeyValue collectedFrom,
- final KeyValue hostedBy,
final DataInfo info,
final long lastUpdateTimestamp) {
r.setDataInfo(info);
r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false));
- r.setOriginalId(Arrays.asList(doc.valueOf("//dri:objIdentifier")));
+
+ r.setOriginalId(Arrays.asList(findOriginalId(doc)));
+
r.setCollectedfrom(Arrays.asList(collectedFrom));
r.setPid(prepareResultPids(doc, info));
r.setDateofcollection(doc.valueOf("//dr:dateOfCollection"));
@@ -291,7 +277,7 @@ public abstract class AbstractMdRecordToOafMapper {
r.setCoverage(prepareCoverages(doc, info));
r.setContext(prepareContexts(doc, info));
r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES
- final List instances = prepareInstances(doc, info, collectedFrom, hostedBy);
+
r.setInstance(instances);
r.setBestaccessright(getBestAccessRights(instances));
}
@@ -378,6 +364,10 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract Field prepareDatasetStorageDate(Document doc, DataInfo info);
+ public static Qualifier createBestAccessRights(final List instanceList) {
+ return getBestAccessRights(instanceList);
+ }
+
protected static Qualifier getBestAccessRights(final List instanceList) {
if (instanceList != null) {
final Optional min = instanceList
@@ -425,6 +415,18 @@ public abstract class AbstractMdRecordToOafMapper {
return null;
}
+ private String findOriginalId(final Document doc) {
+ final Node n = doc.selectSingleNode("//*[local-name()='provenance']/*[local-name()='originDescription']");
+ if (n != null) {
+ final String id = n.valueOf("./*[local-name()='identifier']");
+ if (StringUtils.isNotBlank(id)) {
+ return id;
+ }
+ }
+ return doc.valueOf("//*[local-name()='header']/*[local-name()='identifier']");
+
+ }
+
protected Qualifier prepareQualifier(final Node node, final String xpath, final String schemeId) {
return prepareQualifier(node.valueOf(xpath).trim(), schemeId);
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java
index 8ede407731..63db13b8fb 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java
@@ -4,7 +4,11 @@ package eu.dnetlib.dhp.oa.graph.raw.common;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -57,6 +61,7 @@ public class OafMapperUtils {
.stream(values)
.map(v -> field(v, info))
.filter(Objects::nonNull)
+ .filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList());
}
@@ -65,6 +70,7 @@ public class OafMapperUtils {
.stream()
.map(v -> field(v, info))
.filter(Objects::nonNull)
+ .filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList());
}
@@ -237,4 +243,10 @@ public class OafMapperUtils {
public static String asString(final Object o) {
return o == null ? "" : o.toString();
}
+
+ public static Predicate distinctByKey(
+ final Function super T, ?> keyExtractor) {
+ final Map