diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index 9b03536dd..327c33d6f 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -6,7 +6,7 @@
	<parent>
		<groupId>eu.dnetlib.dhp</groupId>
		<artifactId>dhp-build</artifactId>
-		<version>1.1.7-SNAPSHOT</version>
+		<version>1.2.1-SNAPSHOT</version>
	</parent>
	<artifactId>dhp-build-assembly-resources</artifactId>
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index 4d40edd99..873046e08 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -6,7 +6,7 @@
	<parent>
		<groupId>eu.dnetlib.dhp</groupId>
		<artifactId>dhp-build</artifactId>
-		<version>1.1.7-SNAPSHOT</version>
+		<version>1.2.1-SNAPSHOT</version>
	</parent>
	<artifactId>dhp-build-properties-maven-plugin</artifactId>
diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index 5e896e7a5..8099a72e4 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -1,13 +1,11 @@
-
+
	<modelVersion>4.0.0</modelVersion>
	<groupId>eu.dnetlib.dhp</groupId>
	<artifactId>dhp-code-style</artifactId>
-	<version>1.1.7-SNAPSHOT</version>
+	<version>1.2.1-SNAPSHOT</version>
	<packaging>jar</packaging>
diff --git a/dhp-build/pom.xml b/dhp-build/pom.xml
index 041641fcf..a700a2918 100644
--- a/dhp-build/pom.xml
+++ b/dhp-build/pom.xml
@@ -4,7 +4,7 @@
	<parent>
		<groupId>eu.dnetlib.dhp</groupId>
		<artifactId>dhp</artifactId>
-		<version>1.1.7-SNAPSHOT</version>
+		<version>1.2.1-SNAPSHOT</version>
	</parent>
	<artifactId>dhp-build</artifactId>
	<packaging>pom</packaging>
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 51af8d954..c1d6e1b5b 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -5,7 +5,7 @@
	<parent>
		<groupId>eu.dnetlib.dhp</groupId>
		<artifactId>dhp</artifactId>
-		<version>1.1.7-SNAPSHOT</version>
+		<version>1.2.1-SNAPSHOT</version>
		<relativePath>../</relativePath>
	</parent>
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 4a123ceda..fe5d0c431 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
	<parent>
		<groupId>eu.dnetlib.dhp</groupId>
		<artifactId>dhp</artifactId>
-		<version>1.1.7-SNAPSHOT</version>
+		<version>1.2.1-SNAPSHOT</version>
		<relativePath>../</relativePath>
	</parent>
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index c6bfff12d..accc06d12 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -6,36 +6,86 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class ModelConstants {
public static final String DNET_RESULT_TYPOLOGIES = "dnet:result_typologies";
+ public static final String DNET_PUBLICATION_RESOURCE = "dnet:publication_resource";
+ public static final String DNET_ACCESS_MODES = "dnet:access_modes";
+ public static final String DNET_LANGUAGES = "dnet:languages";
+ public static final String DNET_PID_TYPES = "dnet:pid_types";
+ public static final String DNET_DATA_CITE_DATE = "dnet:dataCite_date";
+ public static final String DNET_DATA_CITE_RESOURCE = "dnet:dataCite_resource";
+ public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
+
+ public static final String SYSIMPORT_CROSSWALK_REPOSITORY = "sysimport:crosswalk:repository";
+ public static final String SYSIMPORT_CROSSWALK_ENTITYREGISTRY = "sysimport:crosswalk:entityregistry";
+ public static final String USER_CLAIM = "user:claim";
public static final String DATASET_RESULTTYPE_CLASSID = "dataset";
public static final String PUBLICATION_RESULTTYPE_CLASSID = "publication";
public static final String SOFTWARE_RESULTTYPE_CLASSID = "software";
public static final String ORP_RESULTTYPE_CLASSID = "other";
- public static Qualifier PUBLICATION_DEFAULT_RESULTTYPE = new Qualifier();
- public static Qualifier DATASET_DEFAULT_RESULTTYPE = new Qualifier();
- public static Qualifier SOFTWARE_DEFAULT_RESULTTYPE = new Qualifier();
- public static Qualifier ORP_DEFAULT_RESULTTYPE = new Qualifier();
+ public static final String RESULT_RESULT = "resultResult";
+ public static final String PUBLICATION_DATASET = "publicationDataset";
+ public static final String IS_RELATED_TO = "isRelatedTo";
+ public static final String SUPPLEMENT = "supplement";
+ public static final String IS_SUPPLEMENT_TO = "isSupplementTo";
+ public static final String IS_SUPPLEMENTED_BY = "isSupplementedBy";
+ public static final String PART = "part";
+ public static final String IS_PART_OF = "IsPartOf";
+ public static final String HAS_PARTS = "HasParts";
+ public static final String RELATIONSHIP = "relationship";
- static {
- PUBLICATION_DEFAULT_RESULTTYPE.setClassid(PUBLICATION_RESULTTYPE_CLASSID);
- PUBLICATION_DEFAULT_RESULTTYPE.setClassname(PUBLICATION_RESULTTYPE_CLASSID);
- PUBLICATION_DEFAULT_RESULTTYPE.setSchemeid(DNET_RESULT_TYPOLOGIES);
- PUBLICATION_DEFAULT_RESULTTYPE.setSchemename(DNET_RESULT_TYPOLOGIES);
+ public static final String RESULT_PROJECT = "resultProject";
+ public static final String OUTCOME = "outcome";
+ public static final String IS_PRODUCED_BY = "isProducedBy";
+ public static final String PRODUCES = "produces";
- DATASET_DEFAULT_RESULTTYPE.setClassid(DATASET_RESULTTYPE_CLASSID);
- DATASET_DEFAULT_RESULTTYPE.setClassname(DATASET_RESULTTYPE_CLASSID);
- DATASET_DEFAULT_RESULTTYPE.setSchemeid(DNET_RESULT_TYPOLOGIES);
- DATASET_DEFAULT_RESULTTYPE.setSchemename(DNET_RESULT_TYPOLOGIES);
+ public static final String DATASOURCE_ORGANIZATION = "datasourceOrganization";
+ public static final String PROVISION = "provision";
+ public static final String IS_PROVIDED_BY = "isProvidedBy";
+ public static final String PROVIDES = "provides";
- SOFTWARE_DEFAULT_RESULTTYPE.setClassid(SOFTWARE_RESULTTYPE_CLASSID);
- SOFTWARE_DEFAULT_RESULTTYPE.setClassname(SOFTWARE_RESULTTYPE_CLASSID);
- SOFTWARE_DEFAULT_RESULTTYPE.setSchemeid(DNET_RESULT_TYPOLOGIES);
- SOFTWARE_DEFAULT_RESULTTYPE.setSchemename(DNET_RESULT_TYPOLOGIES);
+ public static final String PROJECT_ORGANIZATION = "projectOrganization";
+ public static final String PARTICIPATION = "participation";
+ public static final String HAS_PARTICIPANT = "hasParticipant";
+ public static final String IS_PARTICIPANT = "isParticipant";
- ORP_DEFAULT_RESULTTYPE.setClassid(ORP_RESULTTYPE_CLASSID);
- ORP_DEFAULT_RESULTTYPE.setClassname(ORP_RESULTTYPE_CLASSID);
- ORP_DEFAULT_RESULTTYPE.setSchemeid(DNET_RESULT_TYPOLOGIES);
- ORP_DEFAULT_RESULTTYPE.setSchemename(DNET_RESULT_TYPOLOGIES);
+ public static final String UNKNOWN = "UNKNOWN";
+ public static final String NOT_AVAILABLE = "not available";
+
+ public static final Qualifier PUBLICATION_DEFAULT_RESULTTYPE = qualifier(
+ PUBLICATION_RESULTTYPE_CLASSID, PUBLICATION_RESULTTYPE_CLASSID,
+ DNET_RESULT_TYPOLOGIES, DNET_RESULT_TYPOLOGIES);
+
+ public static final Qualifier DATASET_DEFAULT_RESULTTYPE = qualifier(
+ DATASET_RESULTTYPE_CLASSID, DATASET_RESULTTYPE_CLASSID,
+ DNET_RESULT_TYPOLOGIES, DNET_RESULT_TYPOLOGIES);
+
+ public static final Qualifier SOFTWARE_DEFAULT_RESULTTYPE = qualifier(
+ SOFTWARE_RESULTTYPE_CLASSID, SOFTWARE_RESULTTYPE_CLASSID,
+ DNET_RESULT_TYPOLOGIES, DNET_RESULT_TYPOLOGIES);
+
+ public static final Qualifier ORP_DEFAULT_RESULTTYPE = qualifier(
+ ORP_RESULTTYPE_CLASSID, ORP_RESULTTYPE_CLASSID,
+ DNET_RESULT_TYPOLOGIES, DNET_RESULT_TYPOLOGIES);
+
+ public static final Qualifier REPOSITORY_PROVENANCE_ACTIONS = qualifier(
+ SYSIMPORT_CROSSWALK_REPOSITORY, SYSIMPORT_CROSSWALK_REPOSITORY,
+ DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS);
+
+ public static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION = qualifier(
+ SYSIMPORT_CROSSWALK_ENTITYREGISTRY, SYSIMPORT_CROSSWALK_ENTITYREGISTRY,
+ DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS);
+
+ private static Qualifier qualifier(
+ final String classid,
+ final String classname,
+ final String schemeid,
+ final String schemename) {
+ final Qualifier q = new Qualifier();
+ q.setClassid(classid);
+ q.setClassname(classname);
+ q.setSchemeid(schemeid);
+ q.setSchemename(schemename);
+ return q;
}
}
diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index 22ca7504d..22a81f7da 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -1,11 +1,10 @@
-
+
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>eu.dnetlib.dhp</groupId>
		<artifactId>dhp-workflows</artifactId>
-		<version>1.1.7-SNAPSHOT</version>
+		<version>1.2.1-SNAPSHOT</version>
	</parent>
	<artifactId>dhp-actionmanager</artifactId>
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java
index 89cb63fab..77be7652e 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java
@@ -84,8 +84,11 @@ public class MigrateActionSet {
final List<Path> sourcePaths = getSourcePaths(sourceNN, isLookUp);
log
.info(
- "paths to process:\n{}",
- sourcePaths.stream().map(p -> p.toString()).collect(Collectors.joining("\n")));
+ "paths to process:\n{}", sourcePaths
+ .stream()
+ .map(p -> p.toString())
+ .collect(Collectors.joining("\n")));
+
for (Path source : sourcePaths) {
if (!sourceFS.exists(source)) {
@@ -119,9 +122,8 @@ public class MigrateActionSet {
}
}
- props
- .setProperty(
- TARGET_PATHS, targetPaths.stream().map(p -> p.toString()).collect(Collectors.joining(",")));
+ final String targetPathsCsv = targetPaths.stream().map(p -> p.toString()).collect(Collectors.joining(","));
+ props.setProperty(TARGET_PATHS, targetPathsCsv);
File file = new File(System.getProperty("oozie.action.output.properties"));
try (OutputStream os = new FileOutputStream(file)) {
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
index 456113c43..90d573ac0 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
@@ -1,12 +1,10 @@
package eu.dnetlib.dhp.actionmanager.migration;
-import static eu.dnetlib.data.proto.KindProtos.Kind.entity;
-import static eu.dnetlib.data.proto.KindProtos.Kind.relation;
-import static eu.dnetlib.data.proto.TypeProtos.*;
-import static eu.dnetlib.data.proto.TypeProtos.Type.*;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -21,10 +19,6 @@ import eu.dnetlib.dhp.schema.oaf.*;
public class ProtoConverter implements Serializable {
- public static final String UNKNOWN = "UNKNOWN";
- public static final String NOT_AVAILABLE = "not available";
- public static final String DNET_ACCESS_MODES = "dnet:access_modes";
-
public static Oaf convert(OafProtos.Oaf oaf) {
try {
switch (oaf.getKind()) {
@@ -64,6 +58,7 @@ public class ProtoConverter implements Serializable {
case result:
final Result r = convertResult(oaf);
r.setInstance(convertInstances(oaf));
+ r.setExternalReference(convertExternalRefs(oaf));
return r;
case project:
return convertProject(oaf);
@@ -94,13 +89,44 @@ public class ProtoConverter implements Serializable {
i.setHostedby(mapKV(ri.getHostedby()));
i.setInstancetype(mapQualifier(ri.getInstancetype()));
i.setLicense(mapStringField(ri.getLicense()));
- i.setUrl(ri.getUrlList());
+ i
+ .setUrl(
+ ri.getUrlList() != null ? ri
+ .getUrlList()
+ .stream()
+ .distinct()
+ .collect(Collectors.toCollection(ArrayList::new)) : null);
i.setRefereed(mapStringField(ri.getRefereed()));
i.setProcessingchargeamount(mapStringField(ri.getProcessingchargeamount()));
i.setProcessingchargecurrency(mapStringField(ri.getProcessingchargecurrency()));
return i;
}
+ private static List<ExternalReference> convertExternalRefs(OafProtos.Oaf oaf) {
+ ResultProtos.Result r = oaf.getEntity().getResult();
+ if (r.getExternalReferenceCount() > 0) {
+ return r
+ .getExternalReferenceList()
+ .stream()
+ .map(e -> convertExtRef(e))
+ .collect(Collectors.toList());
+ }
+ return Lists.newArrayList();
+ }
+
+ private static ExternalReference convertExtRef(ResultProtos.Result.ExternalReference e) {
+ ExternalReference ex = new ExternalReference();
+ ex.setUrl(e.getUrl());
+ ex.setSitename(e.getSitename());
+ ex.setRefidentifier(e.getRefidentifier());
+ ex.setQuery(e.getQuery());
+ ex.setQualifier(mapQualifier(e.getQualifier()));
+ ex.setLabel(e.getLabel());
+ ex.setDescription(e.getDescription());
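+ // NOTE: assumes a mapDataInfo helper alongside mapQualifier/mapStringField; the original line self-assigned ex.getDataInfo(), a no-op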
+ ex.setDataInfo(mapDataInfo(e.getDataInfo()));
+ return ex;
+ }
+
private static Organization convertOrganization(OafProtos.Oaf oaf) {
final OrganizationProtos.Organization.Metadata m = oaf.getEntity().getOrganization().getMetadata();
final Organization org = setOaf(new Organization(), oaf);
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 3e7b1a375..1c5465c14 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -4,7 +4,7 @@
	<parent>
		<groupId>eu.dnetlib.dhp</groupId>
		<artifactId>dhp-workflows</artifactId>
-		<version>1.1.7-SNAPSHOT</version>
+		<version>1.2.1-SNAPSHOT</version>
	</parent>
	<artifactId>dhp-aggregation</artifactId>
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
index 9811fb707..861ae5201 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -1,17 +1,21 @@
package eu.dnetlib.dhp.collection;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import org.apache.commons.cli.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -23,6 +27,8 @@ import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,6 +41,8 @@ import eu.dnetlib.message.MessageType;
public class GenerateNativeStoreSparkJob {
+ private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);
+
public static MetadataRecord parseRecord(
final String input,
final String xpath,
@@ -78,84 +86,90 @@ public class GenerateNativeStoreSparkJob {
final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class);
final long dateOfCollection = new Long(parser.get("dateOfCollection"));
- final SparkSession spark = SparkSession
- .builder()
- .appName("GenerateNativeStoreSparkJob")
- .master(parser.get("master"))
- .getOrCreate();
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final Map<String, String> ongoingMap = new HashMap<>();
final Map<String, String> reportMap = new HashMap<>();
final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ SparkConf conf = new SparkConf();
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
- final JavaPairRDD<IntWritable, Text> inputRDD = sc
- .sequenceFile(parser.get("input"), IntWritable.class, Text.class);
+ final JavaPairRDD<IntWritable, Text> inputRDD = sc
+ .sequenceFile(parser.get("input"), IntWritable.class, Text.class);
- final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
+ final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
+ final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
- final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
+ final MessageManager manager = new MessageManager(
+ parser.get("rabbitHost"),
+ parser.get("rabbitUser"),
+ parser.get("rabbitPassword"),
+ false,
+ false,
+ null);
- final MessageManager manager = new MessageManager(
- parser.get("rabbitHost"),
- parser.get("rabbitUser"),
- parser.get("rabbitPassword"),
- false,
- false,
- null);
+ final JavaRDD<MetadataRecord> mappeRDD = inputRDD
+ .map(
+ item -> parseRecord(
+ item._2().toString(),
+ parser.get("xpath"),
+ parser.get("encoding"),
+ provenance,
+ dateOfCollection,
+ totalItems,
+ invalidRecords))
+ .filter(Objects::nonNull)
+ .distinct();
- final JavaRDD<MetadataRecord> mappeRDD = inputRDD
- .map(
- item -> parseRecord(
- item._2().toString(),
- parser.get("xpath"),
- parser.get("encoding"),
- provenance,
- dateOfCollection,
- totalItems,
- invalidRecords))
- .filter(Objects::nonNull)
- .distinct();
+ ongoingMap.put("ongoing", "0");
+ if (!test) {
+ manager
+ .sendMessage(
+ new Message(
+ parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
+ parser.get("rabbitOngoingQueue"),
+ true,
+ false);
+ }
- ongoingMap.put("ongoing", "0");
- if (!test) {
- manager
- .sendMessage(
- new Message(
- parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
- parser.get("rabbitOngoingQueue"),
- true,
- false);
- }
+ final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
+ final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
+ final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
+ mdStoreRecords.add(mdstore.count());
+ ongoingMap.put("ongoing", "" + totalItems.value());
+ if (!test) {
+ manager
+ .sendMessage(
+ new Message(
+ parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
+ parser.get("rabbitOngoingQueue"),
+ true,
+ false);
+ }
+ mdstore.write().format("parquet").save(parser.get("output"));
+ reportMap.put("inputItem", "" + totalItems.value());
+ reportMap.put("invalidRecords", "" + invalidRecords.value());
+ reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
+ if (!test) {
+ manager
+ .sendMessage(
+ new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
+ parser.get("rabbitReportQueue"),
+ true,
+ false);
+ manager.close();
+ }
+ });
- final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
- final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
- final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
- mdStoreRecords.add(mdstore.count());
- ongoingMap.put("ongoing", "" + totalItems.value());
- if (!test) {
- manager
- .sendMessage(
- new Message(
- parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
- parser.get("rabbitOngoingQueue"),
- true,
- false);
- }
- mdstore.write().format("parquet").save(parser.get("output"));
- reportMap.put("inputItem", "" + totalItems.value());
- reportMap.put("invalidRecords", "" + invalidRecords.value());
- reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
- if (!test) {
- manager
- .sendMessage(
- new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
- parser.get("rabbitReportQueue"),
- true,
- false);
- manager.close();
- }
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index 5f39717d0..8737d36ef 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -1,13 +1,17 @@
package eu.dnetlib.dhp.transformation;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import org.apache.commons.cli.*;
import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
@@ -17,8 +21,11 @@ import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper;
@@ -29,6 +36,8 @@ import eu.dnetlib.message.MessageType;
public class TransformSparkJobNode {
+ private static final Logger log = LoggerFactory.getLogger(TransformSparkJobNode.class);
+
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -40,12 +49,18 @@ public class TransformSparkJobNode {
parser.parseArgument(args);
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
final String inputPath = parser.get("input");
final String outputPath = parser.get("output");
final String workflowId = parser.get("workflowId");
final String trasformationRule = extractXSLTFromTR(
Objects.requireNonNull(DHPUtils.decompressString(parser.get("transformationRule"))));
- final String master = parser.get("master");
+
final String rabbitUser = parser.get("rabbitUser");
final String rabbitPassword = parser.get("rabbitPassword");
final String rabbitHost = parser.get("rabbitHost");
@@ -53,46 +68,48 @@ public class TransformSparkJobNode {
final long dateOfCollection = new Long(parser.get("dateOfCollection"));
final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
- final SparkSession spark = SparkSession
- .builder()
- .appName("TransformStoreSparkJob")
- .master(master)
- .getOrCreate();
+ SparkConf conf = new SparkConf();
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
+ final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
+ final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
+ final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
+ final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
+ final Map<String, Vocabulary> vocabularies = new HashMap<>();
+ vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
+ final TransformFunction transformFunction = new TransformFunction(
+ totalItems,
+ errorItems,
+ transformedItems,
+ trasformationRule,
+ dateOfCollection,
+ vocabularies);
+ mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
+ if (rabbitHost != null) {
+ System.out.println("SEND FINAL REPORT");
+ final Map<String, String> reportMap = new HashMap<>();
+ reportMap.put("inputItem", "" + totalItems.value());
+ reportMap.put("invalidRecords", "" + errorItems.value());
+ reportMap.put("mdStoreSize", "" + transformedItems.value());
+ System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap));
+ if (!test) {
+ final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false,
+ false,
+ null);
+ manager
+ .sendMessage(
+ new Message(workflowId, "Transform", MessageType.REPORT, reportMap),
+ rabbitReportQueue,
+ true,
+ false);
+ manager.close();
+ }
+ }
+ });
- final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
- final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
- final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
- final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
- final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
- final Map<String, Vocabulary> vocabularies = new HashMap<>();
- vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
- final TransformFunction transformFunction = new TransformFunction(
- totalItems,
- errorItems,
- transformedItems,
- trasformationRule,
- dateOfCollection,
- vocabularies);
- mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
- if (rabbitHost != null) {
- System.out.println("SEND FINAL REPORT");
- final Map<String, String> reportMap = new HashMap<>();
- reportMap.put("inputItem", "" + totalItems.value());
- reportMap.put("invalidRecords", "" + errorItems.value());
- reportMap.put("mdStoreSize", "" + transformedItems.value());
- System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap));
- if (!test) {
- final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false,
- null);
- manager
- .sendMessage(
- new Message(workflowId, "Transform", MessageType.REPORT, reportMap),
- rabbitReportQueue,
- true,
- false);
- manager.close();
- }
- }
}
private static String extractXSLTFromTR(final String tr) throws DocumentException {
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json
index 4b4925f27..4a6aec5ee 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json
@@ -1,16 +1,86 @@
[
- {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
- {"paramName":"e", "paramLongName":"encoding", "paramDescription": "the encoding of the input record should be JSON or XML", "paramRequired": true},
- {"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true},
- {"paramName":"p", "paramLongName":"provenance", "paramDescription": "the infos about the provenance of the collected records", "paramRequired": true},
- {"paramName":"x", "paramLongName":"xpath", "paramDescription": "the xpath to identify the record ifentifier", "paramRequired": true},
- {"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
- {"paramName":"o", "paramLongName":"output", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true},
- {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
- {"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
- {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
- {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
- {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
- {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true},
- {"paramName":"t", "paramLongName":"isTest", "paramDescription": "the name of the report queue", "paramRequired": false}
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "e",
+ "paramLongName": "encoding",
+ "paramDescription": "the encoding of the input record should be JSON or XML",
+ "paramRequired": true
+ },
+ {
+ "paramName": "d",
+ "paramLongName": "dateOfCollection",
+ "paramDescription": "the date when the record has been stored",
+ "paramRequired": true
+ },
+ {
+ "paramName": "p",
+ "paramLongName": "provenance",
+ "paramDescription": "the infos about the provenance of the collected records",
+ "paramRequired": true
+ },
+ {
+ "paramName": "x",
+ "paramLongName": "xpath",
+ "paramDescription": "the xpath to identify the record identifier",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "input",
+ "paramDescription": "the path of the sequencial file to read",
+ "paramRequired": true
+ },
+ {
+ "paramName": "o",
+ "paramLongName": "output",
+ "paramDescription": "the path of the result DataFrame on HDFS",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ru",
+ "paramLongName": "rabbitUser",
+ "paramDescription": "the user to connect with RabbitMq for messaging",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rp",
+ "paramLongName": "rabbitPassword",
+ "paramDescription": "the password to connect with RabbitMq for messaging",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rh",
+ "paramLongName": "rabbitHost",
+ "paramDescription": "the host of the RabbitMq server",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ro",
+ "paramLongName": "rabbitOngoingQueue",
+ "paramDescription": "the name of the ongoing queue",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rr",
+ "paramLongName": "rabbitReportQueue",
+ "paramDescription": "the name of the report queue",
+ "paramRequired": true
+ },
+ {
+ "paramName": "w",
+ "paramLongName": "workflowId",
+ "paramDescription": "the identifier of the dnet Workflow",
+ "paramRequired": true
+ },
+ {
+ "paramName": "t",
+ "paramLongName": "isTest",
+ "paramDescription": "the name of the report queue",
+ "paramRequired": false
+ }
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json
index 3af21f53f..4bb5fd56a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json
@@ -1,16 +1,74 @@
[
- {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
- {"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true},
- {"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
- {"paramName":"o", "paramLongName":"output", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true},
- {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true},
- {"paramName":"tr", "paramLongName":"transformationRule","paramDescription": "the transformation Rule to apply to the input MDStore", "paramRequired": true},
- {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
- {"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
- {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
- {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
- {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
- {"paramName":"t", "paramLongName":"isTest", "paramDescription": "the name of the report queue", "paramRequired": false}
-
-
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "d",
+ "paramLongName": "dateOfCollection",
+ "paramDescription": "the date when the record has been stored",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "input",
+ "paramDescription": "the path of the sequencial file to read",
+ "paramRequired": true
+ },
+ {
+ "paramName": "o",
+ "paramLongName": "output",
+ "paramDescription": "the path of the result DataFrame on HDFS",
+ "paramRequired": true
+ },
+ {
+ "paramName": "w",
+ "paramLongName": "workflowId",
+ "paramDescription": "the identifier of the dnet Workflow",
+ "paramRequired": true
+ },
+ {
+ "paramName": "tr",
+ "paramLongName": "transformationRule",
+ "paramDescription": "the transformation Rule to apply to the input MDStore",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ru",
+ "paramLongName": "rabbitUser",
+ "paramDescription": "the user to connect with RabbitMq for messaging",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rp",
+ "paramLongName": "rabbitPassword",
+ "paramDescription": "the password to connect with RabbitMq for messaging",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rh",
+ "paramLongName": "rabbitHost",
+ "paramDescription": "the host of the RabbitMq server",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ro",
+ "paramLongName": "rabbitOngoingQueue",
+ "paramDescription": "the name of the ongoing queue",
+ "paramRequired": true
+ },
+ {
+ "paramName": "rr",
+ "paramLongName": "rabbitReportQueue",
+ "paramDescription": "the name of the report queue",
+ "paramRequired": true
+ },
+ {
+ "paramName": "t",
+ "paramLongName": "isTest",
+ "paramDescription": "the name of the report queue",
+ "paramRequired": false
+ }
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
index 44364b30a..c3b05f5c9 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
@@ -9,65 +9,60 @@ import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
public class CollectionJobTest {
- private Path testDir;
+ private static SparkSession spark;
- @BeforeEach
- public void setup() throws IOException {
- testDir = Files.createTempDirectory("dhp-collection");
+ @BeforeAll
+ public static void beforeAll() {
+ SparkConf conf = new SparkConf();
+ conf.setAppName(CollectionJobTest.class.getSimpleName());
+ conf.setMaster("local");
+ spark = SparkSession.builder().config(conf).getOrCreate();
}
- @AfterEach
- public void teadDown() throws IOException {
- FileUtils.deleteDirectory(testDir.toFile());
+ @AfterAll
+ public static void afterAll() {
+ spark.stop();
}
@Test
- public void tesCollection() throws Exception {
+ public void tesCollection(@TempDir Path testDir) throws Exception {
final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
+ Assertions.assertNotNull(new ObjectMapper().writeValueAsString(provenance));
+
GenerateNativeStoreSparkJob
.main(
new String[] {
- "-mt",
- "local",
- "-w",
- "wid",
- "-e",
- "XML",
- "-d",
- "" + System.currentTimeMillis(),
- "-p",
- new ObjectMapper().writeValueAsString(provenance),
- "-x",
- "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
- "-i",
- this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(),
- "-o",
- testDir.toString() + "/store",
- "-t",
- "true",
- "-ru",
- "",
- "-rp",
- "",
- "-rh",
- "",
- "-ro",
- "",
- "-rr",
- ""
+ "issm", "true",
+ "-w", "wid",
+ "-e", "XML",
+ "-d", "" + System.currentTimeMillis(),
+ "-p", new ObjectMapper().writeValueAsString(provenance),
+ "-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
+ "-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(),
+ "-o", testDir.toString() + "/store",
+ "-t", "true",
+ "-ru", "",
+ "-rp", "",
+ "-rh", "",
+ "-ro", "",
+ "-rr", ""
});
- System.out.println(new ObjectMapper().writeValueAsString(provenance));
+
+ // TODO introduce useful assertions
+
}
@Test
@@ -85,9 +80,8 @@ public class CollectionJobTest {
null,
null);
- assert record != null;
- System.out.println(record.getId());
- System.out.println(record.getOriginalId());
+ assertNotNull(record.getId());
+ assertNotNull(record.getOriginalId());
}
@Test
@@ -112,10 +106,12 @@ public class CollectionJobTest {
System.currentTimeMillis(),
null,
null);
- assert record != null;
+
record.setBody("ciao");
- assert record1 != null;
record1.setBody("mondo");
+
+ assertNotNull(record);
+ assertNotNull(record1);
assertEquals(record, record1);
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index 01c9e3103..98c8cf66c 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -12,10 +12,14 @@ import java.util.Map;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -23,6 +27,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import eu.dnetlib.dhp.collection.CollectionJobTest;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.functions.Cleaner;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
@@ -33,6 +38,21 @@ import net.sf.saxon.s9api.*;
@ExtendWith(MockitoExtension.class)
public class TransformationJobTest {
+ private static SparkSession spark;
+
+ @BeforeAll
+ public static void beforeAll() {
+ SparkConf conf = new SparkConf();
+ conf.setAppName(CollectionJobTest.class.getSimpleName());
+ conf.setMaster("local");
+ spark = SparkSession.builder().config(conf).getOrCreate();
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ spark.stop();
+ }
+
@Mock
private LongAccumulator accumulator;
@@ -78,31 +98,21 @@ public class TransformationJobTest {
TransformSparkJobNode
.main(
new String[] {
- "-mt",
- "local",
- "-i",
- mdstore_input,
- "-o",
- mdstore_output,
- "-d",
- "1",
- "-w",
- "1",
- "-tr",
- xslt,
- "-t",
- "true",
- "-ru",
- "",
- "-rp",
- "",
- "-rh",
- "",
- "-ro",
- "",
- "-rr",
- ""
+ "-issm", "true",
+ "-i", mdstore_input,
+ "-o", mdstore_output,
+ "-d", "1",
+ "-w", "1",
+ "-tr", xslt,
+ "-t", "true",
+ "-ru", "",
+ "-rp", "",
+ "-rh", "",
+ "-ro", "",
+ "-rr", ""
});
+
+ // TODO introduce useful assertions
}
@Test
diff --git a/dhp-workflows/dhp-broker-events/README.md b/dhp-workflows/dhp-broker-events/README.md
new file mode 100644
index 000000000..bee6e9995
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/README.md
@@ -0,0 +1,3 @@
+# dhp-broker-events
+dhp-broker-events is a DNET module responsible
+for the production of events for the OpenAIRE Broker Service.
diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
new file mode 100644
index 000000000..8b7ec3851
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>dhp-workflows</artifactId>
+		<groupId>eu.dnetlib.dhp</groupId>
+		<version>1.2.1-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>dhp-broker-events</artifactId>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>commons-io</groupId>
+			<artifactId>commons-io</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-core_2.11</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-sql_2.11</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-hive_2.11</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-schemas</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.jayway.jsonpath</groupId>
+			<artifactId>json-path</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>dom4j</groupId>
+			<artifactId>dom4j</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>jaxen</groupId>
+			<artifactId>jaxen</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>eu.dnetlib</groupId>
+			<artifactId>dnet-openaire-broker-common</artifactId>
+			<version>[1.0.0,2.0.0)</version>
+		</dependency>
+
+	</dependencies>
+
+</project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
new file mode 100644
index 000000000..0512a3813
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
@@ -0,0 +1,104 @@
+
+package eu.dnetlib.dhp.broker.model;
+
+import java.util.Map;
+
+public class Event {
+
+ private String eventId;
+
+ private String producerId;
+
+ private String topic;
+
+ private String payload;
+
+ private Long creationDate;
+
+ private Long expiryDate;
+
+ private boolean instantMessage;
+
+ private Map<String, Object> map;
+
+ public Event() {
+ }
+
+ public Event(final String producerId, final String eventId, final String topic, final String payload,
+ final Long creationDate, final Long expiryDate,
+ final boolean instantMessage,
+ final Map<String, Object> map) {
+ this.producerId = producerId;
+ this.eventId = eventId;
+ this.topic = topic;
+ this.payload = payload;
+ this.creationDate = creationDate;
+ this.expiryDate = expiryDate;
+ this.instantMessage = instantMessage;
+ this.map = map;
+ }
+
+ public String getProducerId() {
+ return this.producerId;
+ }
+
+ public void setProducerId(final String producerId) {
+ this.producerId = producerId;
+ }
+
+ public String getEventId() {
+ return this.eventId;
+ }
+
+ public void setEventId(final String eventId) {
+ this.eventId = eventId;
+ }
+
+ public String getTopic() {
+ return this.topic;
+ }
+
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ public String getPayload() {
+ return this.payload;
+ }
+
+ public void setPayload(final String payload) {
+ this.payload = payload;
+ }
+
+ public Long getCreationDate() {
+ return this.creationDate;
+ }
+
+ public void setCreationDate(final Long creationDate) {
+ this.creationDate = creationDate;
+ }
+
+ public Long getExpiryDate() {
+ return this.expiryDate;
+ }
+
+ public void setExpiryDate(final Long expiryDate) {
+ this.expiryDate = expiryDate;
+ }
+
+ public boolean isInstantMessage() {
+ return this.instantMessage;
+ }
+
+ public void setInstantMessage(final boolean instantMessage) {
+ this.instantMessage = instantMessage;
+ }
+
+ public Map<String, Object> getMap() {
+ return this.map;
+ }
+
+ public void setMap(final Map<String, Object> map) {
+ this.map = map;
+ }
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
new file mode 100644
index 000000000..0694556b2
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -0,0 +1,140 @@
+
+package eu.dnetlib.dhp.broker.model;
+
+import java.text.ParseException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateUtils;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Author;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+
+public class EventFactory {
+
+ private final static String PRODUCER_ID = "OpenAIRE";
+
+ private static final int TTH_DAYS = 365;
+
+ private final static String[] DATE_PATTERNS = {
+ "yyyy-MM-dd"
+ };
+
+ public static Event newBrokerEvent(final Result source, final Result target, final UpdateInfo<?> updateInfo) {
+
+ final long now = new Date().getTime();
+
+ final Event res = new Event();
+
+ final Map<String, Object> map = createMapFromResult(target, source, updateInfo);
+
+ final String payload = createPayload(target, updateInfo);
+
+ final String eventId = calculateEventId(
+ updateInfo.getTopic(), target.getOriginalId().get(0), updateInfo.getHighlightValueAsString());
+
+ res.setEventId(eventId);
+ res.setProducerId(PRODUCER_ID);
+ res.setPayload(payload);
+ res.setMap(map);
+ res.setTopic(updateInfo.getTopic());
+ res.setCreationDate(now);
+ res.setExpiryDate(calculateExpiryDate(now));
+ res.setInstantMessage(false);
+ return res;
+ }
+
+ private static String createPayload(final Result result, final UpdateInfo<?> updateInfo) {
+ final OpenAireEventPayload payload = new OpenAireEventPayload();
+ // TODO
+
+ updateInfo.compileHighlight(payload);
+
+ return payload.toJSON();
+ }
+
+ private static Map<String, Object> createMapFromResult(final Result oaf, final Result source,
+ final UpdateInfo<?> updateInfo) {
+ final Map<String, Object> map = new HashMap<>();
+
+ final List<KeyValue> collectedFrom = oaf.getCollectedfrom();
+ if (collectedFrom.size() == 1) {
+ map.put("target_datasource_id", collectedFrom.get(0).getKey());
+ map.put("target_datasource_name", collectedFrom.get(0).getValue());
+ }
+
+ final List<String> ids = oaf.getOriginalId();
+ if (ids.size() > 0) {
+ map.put("target_publication_id", ids.get(0));
+ }
+
+ final List<StructuredProperty> titles = oaf.getTitle();
+ if (titles.size() > 0) {
+ map.put("target_publication_title", titles.get(0));
+ }
+
+ final long date = parseDateTolong(oaf.getDateofacceptance().getValue());
+ if (date > 0) {
+ map.put("target_dateofacceptance", date);
+ }
+
+ final List<StructuredProperty> subjects = oaf.getSubject();
+ if (subjects.size() > 0) {
+ map
+ .put(
+ "target_publication_subject_list",
+ subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
+ }
+
+ final List<Author> authors = oaf.getAuthor();
+ if (authors.size() > 0) {
+ map
+ .put(
+ "target_publication_author_list",
+ authors.stream().map(Author::getFullname).collect(Collectors.toList()));
+ }
+
+ // PROVENANCE INFO
+ map.put("trust", updateInfo.getTrust());
+ final List<KeyValue> sourceCollectedFrom = source.getCollectedfrom();
+ if (sourceCollectedFrom.size() == 1) {
+ map.put("provenance_datasource_id", sourceCollectedFrom.get(0).getKey());
+ map.put("provenance_datasource_name", sourceCollectedFrom.get(0).getValue());
+ }
+ map.put("provenance_publication_id_list", source.getOriginalId());
+
+ return map;
+ }
+
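+ // derives a deterministic event id from short md5 prefixes of the topic, the publication id and the highlight value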
+ private static String calculateEventId(final String topic, final String publicationId, final String value) {
+ return "event-"
+ + DigestUtils.md5Hex(topic).substring(0, 6) + "-"
+ + DigestUtils.md5Hex(publicationId).substring(0, 8) + "-"
+ + DigestUtils.md5Hex(value).substring(0, 8);
+ }
+
+ private static long calculateExpiryDate(final long now) {
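+ // the 24L literal forces long arithmetic: 365 * 24 * 60 * 60 * 1000 overflows int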
+ return now + TTH_DAYS * 24L * 60 * 60 * 1000;
+ }
+
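+ // best-effort date parsing: -1 signals a blank or unparsable value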
+ private static long parseDateTolong(final String date) {
+ if (StringUtils.isBlank(date)) {
+ return -1;
+ }
+ try {
+ return DateUtils.parseDate(date, DATE_PATTERNS).getTime();
+ } catch (final ParseException e) {
+ return -1;
+ }
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
new file mode 100644
index 000000000..54d4ef36a
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -0,0 +1,112 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.model.EventFactory;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAbstract;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAuthorOrcid;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMissingOpenAccess;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPid;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMissingProject;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPublicationDate;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMissingSubject;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMoreOpenAccess;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMorePid;
+import eu.dnetlib.dhp.broker.oa.util.EnrichMoreSubject;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class GenerateEventsApplication {
+
+ private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ GenerateEventsApplication.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String eventsPath = parser.get("eventsPath");
+ log.info("eventsPath: {}", eventsPath);
+
+ final SparkConf conf = new SparkConf();
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+ removeOutputDir(spark, eventsPath);
+ generateEvents(spark, graphPath, eventsPath);
+ });
+
+ }
+
+ private static void removeOutputDir(final SparkSession spark, final String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ private static void generateEvents(final SparkSession spark, final String graphPath, final String eventsPath) {
+ // TODO
+ }
+
+ private List<Event> generateEvents(final Result... children) {
+ final List<Event> list = new ArrayList<>();
+
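+ // pairwise scan: each result is compared against every other result in the group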
+ for (final Result source : children) {
+ for (final Result target : children) {
+ if (source != target) {
+ list
+ .addAll(
+ findUpdates(source, target)
+ .stream()
+ .map(info -> EventFactory.newBrokerEvent(source, target, info))
+ .collect(Collectors.toList()));
+ }
+ }
+ }
+
+ return list;
+ }
+
+ private List<UpdateInfo<?>> findUpdates(final Result source, final Result target) {
+ final List<UpdateInfo<?>> list = new ArrayList<>();
+ list.addAll(EnrichMissingAbstract.findUpdates(source, target));
+ list.addAll(EnrichMissingAuthorOrcid.findUpdates(source, target));
+ list.addAll(EnrichMissingOpenAccess.findUpdates(source, target));
+ list.addAll(EnrichMissingPid.findUpdates(source, target));
+ list.addAll(EnrichMissingProject.findUpdates(source, target));
+ list.addAll(EnrichMissingPublicationDate.findUpdates(source, target));
+ list.addAll(EnrichMissingSubject.findUpdates(source, target));
+ list.addAll(EnrichMoreOpenAccess.findUpdates(source, target));
+ list.addAll(EnrichMorePid.findUpdates(source, target));
+ list.addAll(EnrichMoreSubject.findUpdates(source, target));
+ return list;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java
new file mode 100644
index 000000000..493d1f97c
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java
@@ -0,0 +1,31 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingAbstract extends UpdateInfo<String> {
+
+ public static List<EnrichMissingAbstract> findUpdates(final Result source, final Result target) {
+ // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
+ return Arrays.asList();
+ }
+
+ private EnrichMissingAbstract(final String highlightValue, final float trust) {
+ super("ENRICH/MISSING/ABSTRACT", highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ payload.getHighlight().getAbstracts().add(getHighlightValue());
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java
new file mode 100644
index 000000000..6899c62a3
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java
@@ -0,0 +1,31 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingAuthorOrcid extends UpdateInfo<String> {
+
+ public static List<EnrichMissingAuthorOrcid> findUpdates(final Result source, final Result target) {
+ // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
+ return Arrays.asList();
+ }
+
+ private EnrichMissingAuthorOrcid(final String highlightValue, final float trust) {
+ super("ENRICH/MISSING/AUTHOR/ORCID", highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ // TODO
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java
new file mode 100644
index 000000000..9464130f3
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java
@@ -0,0 +1,32 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.Instance;
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingOpenAccess extends UpdateInfo<Instance> {
+
+ public static List<EnrichMissingOpenAccess> findUpdates(final Result source, final Result target) {
+ // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
+ return Arrays.asList();
+ }
+
+ private EnrichMissingOpenAccess(final Instance highlightValue, final float trust) {
+ super("ENRICH/MISSING/OPENACCESS_VERSION", highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ payload.getHighlight().getInstances().add(getHighlightValue());
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue().getUrl();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java
new file mode 100644
index 000000000..293d4993f
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java
@@ -0,0 +1,32 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.broker.objects.Pid;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingPid extends UpdateInfo<Pid> {
+
+ public static List<EnrichMissingPid> findUpdates(final Result source, final Result target) {
+ // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
+ return Arrays.asList();
+ }
+
+ private EnrichMissingPid(final Pid highlightValue, final float trust) {
+ super("ENRICH/MISSING/PID", highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ payload.getHighlight().getPids().add(getHighlightValue());
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue().getType() + "::" + getHighlightValue().getValue();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java
new file mode 100644
index 000000000..a22c179a2
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java
@@ -0,0 +1,33 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.broker.objects.Project;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingProject extends UpdateInfo<Project> {
+
+ public static List<EnrichMissingProject> findUpdates(final Result source, final Result target) {
+ // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
+ return Arrays.asList();
+ }
+
+ private EnrichMissingProject(final Project highlightValue, final float trust) {
+ super("ENRICH/MISSING/PROJECT", highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ payload.getHighlight().getProjects().add(getHighlightValue());
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue().getFunder() + "::" + getHighlightValue().getFundingProgram()
+ + getHighlightValue().getCode();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java
new file mode 100644
index 000000000..869dca264
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java
@@ -0,0 +1,31 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingPublicationDate extends UpdateInfo<String> {
+
+ public static List<EnrichMissingPublicationDate> findUpdates(final Result source, final Result target) {
+ // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
+ return Arrays.asList();
+ }
+
+ private EnrichMissingPublicationDate(final String highlightValue, final float trust) {
+ super("ENRICH/MISSING/PUBLICATION_DATE", highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ payload.getHighlight().setPublicationdate(getHighlightValue());
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java
new file mode 100644
index 000000000..a2ed5d043
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java
@@ -0,0 +1,36 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingSubject extends UpdateInfo<String> {
+
+ public static List<EnrichMissingSubject> findUpdates(final Result source, final Result target) {
+ // MESHEUROPMC
+ // ARXIV
+ // JEL
+ // DDC
+ // ACM
+
+ return Arrays.asList();
+ }
+
+ private EnrichMissingSubject(final String subjectClassification, final String highlightValue, final float trust) {
+ super("ENRICH/MISSING/SUBJECT/" + subjectClassification, highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ payload.getHighlight().getSubjects().add(getHighlightValue());
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java
new file mode 100644
index 000000000..4f1e88d3d
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java
@@ -0,0 +1,32 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.Instance;
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMoreOpenAccess extends UpdateInfo<Instance> {
+
+ public static List<EnrichMoreOpenAccess> findUpdates(final Result source, final Result target) {
+ // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
+ return Arrays.asList();
+ }
+
+ private EnrichMoreOpenAccess(final Instance highlightValue, final float trust) {
+ super("ENRICH/MORE/OPENACCESS_VERSION", highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ payload.getHighlight().getInstances().add(getHighlightValue());
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue().getUrl();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java
new file mode 100644
index 000000000..ecf2cf310
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java
@@ -0,0 +1,32 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.broker.objects.Pid;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMorePid extends UpdateInfo<Pid> {
+
+ public static List<EnrichMorePid> findUpdates(final Result source, final Result target) {
+ // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
+ return Arrays.asList();
+ }
+
+ private EnrichMorePid(final Pid highlightValue, final float trust) {
+ super("ENRICH/MORE/PID", highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ payload.getHighlight().getPids().add(getHighlightValue());
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue().getType() + "::" + getHighlightValue().getValue();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java
new file mode 100644
index 000000000..f29b86292
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java
@@ -0,0 +1,36 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMoreSubject extends UpdateInfo<String> {
+
+ public static List<EnrichMoreSubject> findUpdates(final Result source, final Result target) {
+ // MESHEUROPMC
+ // ARXIV
+ // JEL
+ // DDC
+ // ACM
+
+ return Arrays.asList();
+ }
+
+ private EnrichMoreSubject(final String subjectClassification, final String highlightValue, final float trust) {
+ super("ENRICH/MORE/SUBJECT/" + subjectClassification, highlightValue, trust);
+ }
+
+ @Override
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ payload.getHighlight().getSubjects().add(getHighlightValue());
+ }
+
+ @Override
+ public String getHighlightValueAsString() {
+ return getHighlightValue();
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
new file mode 100644
index 000000000..f7b6b69e9
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -0,0 +1,36 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import eu.dnetlib.broker.objects.OpenAireEventPayload;
+
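+// Base event type for the broker: couples a topic path (e.g. ENRICH/MISSING/PID) with a typed highlight value and a trust score.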
+public abstract class UpdateInfo<T> {
+
+ private final String topic;
+
+ private final T highlightValue;
+
+ private final float trust;
+
+ protected UpdateInfo(final String topic, final T highlightValue, final float trust) {
+ this.topic = topic;
+ this.highlightValue = highlightValue;
+ this.trust = trust;
+ }
+
+ public T getHighlightValue() {
+ return highlightValue;
+ }
+
+ public float getTrust() {
+ return trust;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public abstract void compileHighlight(OpenAireEventPayload payload);
+
+ public abstract String getHighlightValueAsString();
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml
index e7f2a926f..fcc356ac0 100644
--- a/dhp-workflows/dhp-dedup-openaire/pom.xml
+++ b/dhp-workflows/dhp-dedup-openaire/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
4.0.0
dhp-dedup-openaire
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index a44650823..c0503d991 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -137,10 +137,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
}
private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
+
+ String entityType = dedupConf.getWf().getEntityType();
+
Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setRelClass(relClass);
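+ // derive relType by doubling the entity type, e.g. "result" becomes "resultResult" (see ModelConstants.RESULT_RESULT)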
+ r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
r.setSubRelType("dedup");
DataInfo info = new DataInfo();
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 2d18c9a61..516808511 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -86,7 +86,8 @@ public class SparkPropagateRelation extends AbstractSparkAction {
mergedIds,
FieldType.TARGET,
getFixRelFn(FieldType.TARGET))
- .filter(SparkPropagateRelation::containsDedup);
+ .filter(SparkPropagateRelation::containsDedup)
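+ // remapping source/target ids to their dedup roots can yield identical relations; distinct() keeps a single copy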
+ .distinct();
Dataset<Relation> updated = processDataset(
processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn()),
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
index 2451947a1..298a248e3 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
@@ -75,12 +75,20 @@
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
yarn
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 990ac04c0..8dd00be97 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -18,6 +18,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -29,6 +30,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@@ -420,7 +423,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
- assertEquals(5022, relations);
+ assertEquals(4975, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark
@@ -450,6 +453,25 @@ public class SparkDedupTest implements Serializable {
assertEquals(updated, deletedbyinference);
}
+ @Test
+ @Order(6)
+ public void testRelations() throws Exception {
+ testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
+ testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
+ }
+
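+ // reads a JSON dump of relations and verifies the expected total and distinct row counts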
+ private void testUniqueness(String path, int expected_total, int expected_unique) {
+ Dataset<Relation> rel = spark
+ .read()
+ .textFile(getClass().getResource(path).getPath())
+ .map(
+ (MapFunction<String, Relation>) s -> new ObjectMapper().readValue(s, Relation.class),
+ Encoders.bean(Relation.class));
+
+ assertEquals(expected_total, rel.count());
+ assertEquals(expected_unique, rel.distinct().count());
+ }
+
@AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/test/relation_1.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/test/relation_1.json
new file mode 100644
index 000000000..c0cf8b695
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/test/relation_1.json
@@ -0,0 +1,12 @@
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|doajarticles::40c7b1dfa18c3693d374dafd21ef852f","subRelType":"provision","target":"10|doajarticles::618df40624078491acfd93ca3ff6921c"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|doajarticles::0b4e756a73338f60b84de98d080f6422","subRelType":"provision","target":"10|doajarticles::6d01e689db13b6977b411f4170b6143b"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|doajarticles::fe2f7c9d350b9c5aa658ec384d761e33","subRelType":"provision","target":"10|doajarticles::9b8a956b0703854ba79e52ddf7dc552e"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|doajarticles::a116734108ba011ef715b012f095e3f5","subRelType":"provision","target":"10|doajarticles::c5de04b1a35da2cc4468e299bc9ffa16"}
+{"collectedfrom":[{"key":"10|openaire____::47ce9e9f4fad46e732cff06419ecaabb","value":"OpenDOAR"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|opendoar____::8b83abbbcad5496fe43cda88d0045aa4","subRelType":"provision","target":"10|opendoar____::6855456e2fe46a9d49d3d3af4f57443d"}
+{"collectedfrom":[{"key":"10|openaire____::47ce9e9f4fad46e732cff06419ecaabb","value":"OpenDOAR"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|opendoar____::88034de0247d9d36e22783e9319c5ba3","subRelType":"provision","target":"10|opendoar____::c17028c9b6e0c5deaad29665d582284a"}
+{"collectedfrom":[{"key":"10|openaire____::47ce9e9f4fad46e732cff06419ecaabb","value":"OpenDOAR"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|opendoar____::dfb21c796f33e9acf505cc960a3d8d2c","subRelType":"provision","target":"10|opendoar____::dfa037a53e121ecc9e0926800c3e814e"}
+{"collectedfrom":[{"key":"10|openaire____::21f8a223b9925c2f87c404096080b046","value":"Registry of Research Data Repository"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|re3data_____::b526b1aa1562038881a31be59896985f","subRelType":"provision","target":"10|re3data_____::2e457773b62df3534cc04441bf406a70"}
+{"collectedfrom":[{"key":"10|openaire____::21f8a223b9925c2f87c404096080b046","value":"Registry of Research Data Repository"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|re3data_____::6b306183bc051b5aaa5376f2fab6e6e5","subRelType":"provision","target":"10|re3data_____::6371ff9ee1ec7073416cb83c868b10a3"}
+{"collectedfrom":[{"key":"10|openaire____::21f8a223b9925c2f87c404096080b046","value":"Registry of Research Data Repository"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|re3data_____::0f697c2543a43bc0da793bf78ecd4996","subRelType":"provision","target":"10|re3data_____::770ef1f8eb03f174c0add746523c6f28"}
+{"collectedfrom":[{"key":"10|openaire____::21f8a223b9925c2f87c404096080b046","value":"Registry of Research Data Repository"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|re3data_____::0f697c2543a43bc0da793bf78ecd4996","subRelType":"provision","target":"10|re3data_____::770ef1f8eb03f174c0add746523c6f28"}
+{"collectedfrom":[{"key":"10|openaire____::21f8a223b9925c2f87c404096080b046","value":"Registry of Research Data Repository"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1588608946167,"relClass":"provides","relType":"datasourceOrganization","source":"20|re3data_____::0f697c2543a43bc0da793bf78ecd4996","subRelType":"provision","target":"10|re3data_____::770ef1f8eb03f174c0add746523c6f28"}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/test/relation_2.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/test/relation_2.json
new file mode 100644
index 000000000..00db9715b
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/test/relation_2.json
@@ -0,0 +1,10 @@
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681628"}
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681628"}
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681628"}
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681628"}
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681628"}
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681628"}
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681628"}
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681628"}
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681628"}
+{"collectedfrom":null,"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"decisiontree-dedup-test","inferred":true,"invisible":false,"provenanceaction":{"classid":"sysimport:dedup","classname":"sysimport:dedup","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":null},"lastupdatetimestamp":null,"relClass":"isMergedIn","relType":"resultResult","source":"50|dedup_wf_001::498c4e6cfff198831b488a6c62221241","subRelType":"dedup","target":"50|doiboost____::8e5e14d80d0f2ebe6a6a55d972681629"}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/pom.xml b/dhp-workflows/dhp-dedup-scholexplorer/pom.xml
index e87811cd5..dff376c2d 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/pom.xml
+++ b/dhp-workflows/dhp-dedup-scholexplorer/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-distcp/pom.xml b/dhp-workflows/dhp-distcp/pom.xml
index d013dd1d9..c13bec8e6 100644
--- a/dhp-workflows/dhp-distcp/pom.xml
+++ b/dhp-workflows/dhp-distcp/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml
index d25446bbc..62968c410 100644
--- a/dhp-workflows/dhp-graph-mapper/pom.xml
+++ b/dhp-workflows/dhp-graph-mapper/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
index 82f5cbfd0..fd12716b4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
@@ -10,6 +10,7 @@ import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.oaiIProvenance;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.util.ArrayList;
import java.util.Arrays;
@@ -24,7 +25,6 @@ import org.dom4j.DocumentFactory;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
-import eu.dnetlib.dhp.oa.graph.raw.common.MigrationConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
@@ -48,6 +48,21 @@ public abstract class AbstractMdRecordToOafMapper {
protected final Map<String, String> code2name;
+ protected static final String DATACITE_SCHEMA_KERNEL_4 = "http://datacite.org/schema/kernel-4";
+ protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3";
+
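+ // namespace bindings are identical for every record, so they are initialized once here instead of on each processMdRecord call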
+ protected static final Map<String, String> nsContext = new HashMap<>();
+
+ static {
+ nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr");
+ nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri");
+ nsContext.put("oaf", "http://namespace.openaire.eu/oaf");
+ nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/");
+ nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance");
+ nsContext.put("dc", "http://purl.org/dc/elements/1.1/");
+ nsContext.put("datacite", DATACITE_SCHEMA_KERNEL_3);
+ }
+
protected static final Qualifier MAIN_TITLE_QUALIFIER = qualifier(
"main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
@@ -57,31 +72,27 @@ public abstract class AbstractMdRecordToOafMapper {
public List<Oaf> processMdRecord(final String xml) {
try {
- final Map<String, String> nsContext = new HashMap<>();
- nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr");
- nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri");
- nsContext.put("oaf", "http://namespace.openaire.eu/oaf");
- nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/");
- nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance");
- nsContext.put("dc", "http://purl.org/dc/elements/1.1/");
- nsContext.put("datacite", "http://datacite.org/schema/kernel-3");
DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
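+ // normalize DataCite kernel-4 payloads to kernel-3, so a single set of XPath expressions covers both schema versions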
final Document doc = DocumentHelper
.parseText(
- xml
- .replaceAll(
- "http://datacite.org/schema/kernel-4", "http://datacite.org/schema/kernel-3"));
+ xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3));
final String type = doc.valueOf("//dr:CobjCategory/@type");
- final KeyValue collectedFrom = keyValue(
- createOpenaireId(10, doc.valueOf("//oaf:collectedFrom/@id"), true),
- doc.valueOf("//oaf:collectedFrom/@name"));
+ final KeyValue collectedFrom = getProvenanceDatasource(
+ doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name");
+
+ if (collectedFrom == null) {
+ return null;
+ }
+
final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id"))
? collectedFrom
- : keyValue(
- createOpenaireId(10, doc.valueOf("//oaf:hostedBy/@id"), true),
- doc.valueOf("//oaf:hostedBy/@name"));
+ : getProvenanceDatasource(doc, "//oaf:hostedBy/@id", "//oaf:hostedBy/@name");
+
+ if (hostedBy == null) {
+ return null;
+ }
final DataInfo info = prepareDataInfo(doc);
final long lastUpdateTimestamp = new Date().getTime();
@@ -92,6 +103,19 @@ public abstract class AbstractMdRecordToOafMapper {
}
}
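+ // returns null when the datasource id or name is blank, causing processMdRecord to skip the whole record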
+ private KeyValue getProvenanceDatasource(Document doc, String xpathId, String xpathName) {
+ final String dsId = doc.valueOf(xpathId);
+ final String dsName = doc.valueOf(xpathName);
+
+ if (StringUtils.isBlank(dsId) || StringUtils.isBlank(dsName)) {
+ return null;
+ }
+
+ return keyValue(
+ createOpenaireId(10, dsId, true),
+ dsName);
+ }
+
protected List<Oaf> createOafs(
final Document doc,
final String type,
@@ -107,14 +131,14 @@ public abstract class AbstractMdRecordToOafMapper {
case "publication":
final Publication p = new Publication();
populateResultFields(p, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
- p.setResulttype(MigrationConstants.PUBLICATION_RESULTTYPE_QUALIFIER);
+ p.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
p.setJournal(prepareJournal(doc, info));
oafs.add(p);
break;
case "dataset":
final Dataset d = new Dataset();
populateResultFields(d, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
- d.setResulttype(MigrationConstants.DATASET_RESULTTYPE_QUALIFIER);
+ d.setResulttype(DATASET_DEFAULT_RESULTTYPE);
d.setStoragedate(prepareDatasetStorageDate(doc, info));
d.setDevice(prepareDatasetDevice(doc, info));
d.setSize(prepareDatasetSize(doc, info));
@@ -127,7 +151,7 @@ public abstract class AbstractMdRecordToOafMapper {
case "software":
final Software s = new Software();
populateResultFields(s, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
- s.setResulttype(MigrationConstants.SOFTWARE_RESULTTYPE_QUALIFIER);
+ s.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
s.setLicense(prepareSoftwareLicenses(doc, info));
s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
@@ -138,7 +162,7 @@ public abstract class AbstractMdRecordToOafMapper {
default:
final OtherResearchProduct o = new OtherResearchProduct();
populateResultFields(o, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
- o.setResulttype(MigrationConstants.OTHER_RESULTTYPE_QUALIFIER);
+ o.setResulttype(ORP_DEFAULT_RESULTTYPE);
o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
o.setTool(prepareOtherResearchProductTools(doc, info));
@@ -171,33 +195,36 @@ public abstract class AbstractMdRecordToOafMapper {
if (StringUtils.isNotBlank(originalId)) {
final String projectId = createOpenaireId(40, originalId, true);
- final Relation r1 = new Relation();
- r1.setRelType("resultProject");
- r1.setSubRelType("outcome");
- r1.setRelClass("isProducedBy");
- r1.setSource(docId);
- r1.setTarget(projectId);
- r1.setCollectedfrom(Arrays.asList(collectedFrom));
- r1.setDataInfo(info);
- r1.setLastupdatetimestamp(lastUpdateTimestamp);
- res.add(r1);
-
- final Relation r2 = new Relation();
- r2.setRelType("resultProject");
- r2.setSubRelType("outcome");
- r2.setRelClass("produces");
- r2.setSource(projectId);
- r2.setTarget(docId);
- r2.setCollectedfrom(Arrays.asList(collectedFrom));
- r2.setDataInfo(info);
- r2.setLastupdatetimestamp(lastUpdateTimestamp);
- res.add(r2);
+ res
+ .add(
+ getRelation(
+ docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, collectedFrom, info,
+ lastUpdateTimestamp));
+ res
+ .add(
+ getRelation(
+ projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, collectedFrom, info,
+ lastUpdateTimestamp));
}
}
return res;
}
+ protected Relation getRelation(String source, String target, String relType, String subRelType, String relClass,
+ KeyValue collectedFrom, DataInfo info, long lastUpdateTimestamp) {
+ final Relation rel = new Relation();
+ rel.setRelType(relType);
+ rel.setSubRelType(subRelType);
+ rel.setRelClass(relClass);
+ rel.setSource(source);
+ rel.setTarget(target);
+ rel.setCollectedfrom(Arrays.asList(collectedFrom));
+ rel.setDataInfo(info);
+ rel.setLastupdatetimestamp(lastUpdateTimestamp);
+ return rel;
+ }
+
protected abstract List<Oaf> addOtherResultRels(
final Document doc,
final KeyValue collectedFrom,
@@ -423,7 +450,7 @@ public abstract class AbstractMdRecordToOafMapper {
if (n == null) {
return dataInfo(
- false, null, false, false, MigrationConstants.REPOSITORY_PROVENANCE_ACTIONS, "0.9");
+ false, null, false, false, REPOSITORY_PROVENANCE_ACTIONS, "0.9");
}
final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
index ccc9f8a89..503e4c504 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
@@ -95,6 +95,7 @@ public class GenerateEntitiesApplication {
.sequenceFile(sp, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.map(k -> convertToListOaf(k._1(), k._2(), code2name))
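+ // convertToListOaf now returns null for records without a valid collectedFrom/hostedBy datasource; drop them here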
+ .filter(Objects::nonNull)
.flatMap(list -> list.iterator()));
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
index 58339fdc5..e96c41066 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
@@ -10,6 +10,7 @@ import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.io.Closeable;
import java.io.IOException;
@@ -31,7 +32,6 @@ import org.apache.commons.logging.LogFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
-import eu.dnetlib.dhp.oa.graph.raw.common.MigrationConstants;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
@@ -51,19 +51,23 @@ import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
- implements Closeable {
+ implements Closeable {
private static final Log log = LogFactory.getLog(MigrateDbEntitiesApplication.class);
+ public static final String SOURCE_TYPE = "source_type";
+ public static final String TARGET_TYPE = "target_type";
+
private final DbClient dbClient;
private final long lastUpdateTimestamp;
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(MigrateDbEntitiesApplication.class
- .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json")));
+ IOUtils
+ .toString(
+ MigrateDbEntitiesApplication.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json")));
parser.parseArgument(args);
@@ -76,7 +80,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser,
- dbPassword)) {
+ dbPassword)) {
if (processClaims) {
log.info("Processing claims...");
smdbe.execute("queryClaims.sql", smdbe::processClaims);
@@ -107,15 +111,15 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
}
public MigrateDbEntitiesApplication(
- final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword)
- throws Exception {
+ final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword)
+ throws Exception {
super(hdfsPath);
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
this.lastUpdateTimestamp = new Date().getTime();
}
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
- throws Exception {
+ throws Exception {
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));
@@ -134,7 +138,10 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
ds.setId(createOpenaireId(10, rs.getString("datasourceid"), true));
ds.setOriginalId(Arrays.asList(rs.getString("datasourceid")));
ds
- .setCollectedfrom(listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname")));
+ .setCollectedfrom(
+ listKeyValues(
+ createOpenaireId(10, rs.getString("collectedfromid"), true),
+ rs.getString("collectedfromname")));
ds.setPid(new ArrayList<>());
ds.setDateofcollection(asString(rs.getDate("dateofcollection")));
ds.setDateoftransformation(null); // Value not returned by the SQL query
@@ -175,7 +182,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
ds.setCertificates(field(rs.getString("certificates"), info));
ds.setPolicies(new ArrayList<>()); // The sql query returns an empty array
ds
- .setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal
+ .setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal
ds.setDataInfo(info);
ds.setLastupdatetimestamp(lastUpdateTimestamp);
@@ -195,7 +202,10 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
p.setId(createOpenaireId(40, rs.getString("projectid"), true));
p.setOriginalId(Arrays.asList(rs.getString("projectid")));
p
- .setCollectedfrom(listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname")));
+ .setCollectedfrom(
+ listKeyValues(
+ createOpenaireId(10, rs.getString("collectedfromid"), true),
+ rs.getString("collectedfromname")));
p.setPid(new ArrayList<>());
p.setDateofcollection(asString(rs.getDate("dateofcollection")));
p.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
@@ -212,7 +222,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
p.setDuration(field(Integer.toString(rs.getInt("duration")), info));
p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info));
p
- .setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info));
+ .setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info));
p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info));
p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info));
@@ -249,7 +259,10 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
o.setId(createOpenaireId(20, rs.getString("organizationid"), true));
o.setOriginalId(Arrays.asList(rs.getString("organizationid")));
o
- .setCollectedfrom(listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname")));
+ .setCollectedfrom(
+ listKeyValues(
+ createOpenaireId(10, rs.getString("collectedfromid"), true),
+ rs.getString("collectedfromname")));
o.setPid(new ArrayList<>());
o.setDateofcollection(asString(rs.getDate("dateofcollection")));
o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
@@ -264,12 +277,14 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info));
o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info));
o
- .setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info));
+ .setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info));
o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info));
o
- .setEcinternationalorganizationeurinterests(field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info));
+ .setEcinternationalorganizationeurinterests(
+ field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info));
o
- .setEcinternationalorganization(field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info));
+ .setEcinternationalorganization(
+ field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info));
o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info));
o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info));
o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info));
@@ -288,12 +303,13 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("organization"), true);
final String dsId = createOpenaireId(10, rs.getString("datasource"), true);
- final List<KeyValue> collectedFrom = listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
+ final List<KeyValue> collectedFrom = listKeyValues(
+ createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
- r1.setRelType("datasourceOrganization");
- r1.setSubRelType("provision");
- r1.setRelClass("isProvidedBy");
+ r1.setRelType(DATASOURCE_ORGANIZATION);
+ r1.setSubRelType(PROVISION);
+ r1.setRelClass(IS_PROVIDED_BY);
r1.setSource(dsId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
@@ -301,9 +317,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
- r2.setRelType("datasourceOrganization");
- r2.setSubRelType("provision");
- r2.setRelClass("provides");
+ r2.setRelType(DATASOURCE_ORGANIZATION);
+ r2.setSubRelType(PROVISION);
+ r2.setRelClass(PROVIDES);
r2.setSource(orgId);
r2.setTarget(dsId);
r2.setCollectedfrom(collectedFrom);
@@ -321,12 +337,13 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
final DataInfo info = prepareDataInfo(rs);
final String orgId = createOpenaireId(20, rs.getString("resporganization"), true);
final String projectId = createOpenaireId(40, rs.getString("project"), true);
- final List<KeyValue> collectedFrom = listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
+ final List<KeyValue> collectedFrom = listKeyValues(
+ createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
- r1.setRelType("projectOrganization");
- r1.setSubRelType("participation");
- r1.setRelClass("hasParticipant");
+ r1.setRelType(PROJECT_ORGANIZATION);
+ r1.setSubRelType(PARTICIPATION);
+ r1.setRelClass(HAS_PARTICIPANT);
r1.setSource(projectId);
r1.setTarget(orgId);
r1.setCollectedfrom(collectedFrom);
@@ -334,9 +351,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
- r2.setRelType("projectOrganization");
- r2.setSubRelType("participation");
- r2.setRelClass("isParticipant");
+ r2.setRelType(PROJECT_ORGANIZATION);
+ r2.setSubRelType(PARTICIPATION);
+ r2.setRelClass(IS_PARTICIPANT);
r2.setSource(orgId);
r2.setTarget(projectId);
r2.setCollectedfrom(collectedFrom);
@@ -351,28 +368,30 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
public List<Oaf> processClaims(final ResultSet rs) {
- final DataInfo info =
- dataInfo(false, null, false, false, qualifier("user:claim", "user:claim", "dnet:provenanceActions", "dnet:provenanceActions"), "0.9");
+ final DataInfo info = dataInfo(
+ false, null, false, false,
+ qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9");
- final List<KeyValue> collectedFrom = listKeyValues(createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
+ final List<KeyValue> collectedFrom = listKeyValues(
+ createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
try {
- if (rs.getString("source_type").equals("context")) {
+ if (rs.getString(SOURCE_TYPE).equals("context")) {
final Result r;
- if (rs.getString("target_type").equals("dataset")) {
+ if (rs.getString(TARGET_TYPE).equals("dataset")) {
r = new Dataset();
- r.setResulttype(MigrationConstants.DATASET_RESULTTYPE_QUALIFIER);
- } else if (rs.getString("target_type").equals("software")) {
+ r.setResulttype(DATASET_DEFAULT_RESULTTYPE);
+ } else if (rs.getString(TARGET_TYPE).equals("software")) {
r = new Software();
- r.setResulttype(MigrationConstants.SOFTWARE_RESULTTYPE_QUALIFIER);
- } else if (rs.getString("target_type").equals("other")) {
+ r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
+ } else if (rs.getString(TARGET_TYPE).equals("other")) {
r = new OtherResearchProduct();
- r.setResulttype(MigrationConstants.OTHER_RESULTTYPE_QUALIFIER);
+ r.setResulttype(ORP_DEFAULT_RESULTTYPE);
} else {
r = new Publication();
- r.setResulttype(MigrationConstants.PUBLICATION_RESULTTYPE_QUALIFIER);
+ r.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
}
r.setId(createOpenaireId(50, rs.getString("target_id"), false));
r.setLastupdatetimestamp(lastUpdateTimestamp);
@@ -382,32 +401,32 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
return Arrays.asList(r);
} else {
- final String sourceId = createOpenaireId(rs.getString("source_type"), rs.getString("source_id"), false);
- final String targetId = createOpenaireId(rs.getString("target_type"), rs.getString("target_id"), false);
+ final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false);
+ final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false);
final Relation r1 = new Relation();
final Relation r2 = new Relation();
- if (rs.getString("source_type").equals("project")) {
+ if (rs.getString(SOURCE_TYPE).equals("project")) {
r1.setCollectedfrom(collectedFrom);
- r1.setRelType("resultProject");
- r1.setSubRelType("outcome");
- r1.setRelClass("produces");
+ r1.setRelType(RESULT_PROJECT);
+ r1.setSubRelType(OUTCOME);
+ r1.setRelClass(PRODUCES);
r2.setCollectedfrom(collectedFrom);
- r2.setRelType("resultProject");
- r2.setSubRelType("outcome");
- r2.setRelClass("isProducedBy");
+ r2.setRelType(RESULT_PROJECT);
+ r2.setSubRelType(OUTCOME);
+ r2.setRelClass(IS_PRODUCED_BY);
} else {
r1.setCollectedfrom(collectedFrom);
- r1.setRelType("resultResult");
- r1.setSubRelType("relationship");
- r1.setRelClass("isRelatedTo");
+ r1.setRelType(RESULT_RESULT);
+ r1.setSubRelType(RELATIONSHIP);
+ r1.setRelClass(IS_RELATED_TO);
r2.setCollectedfrom(collectedFrom);
- r2.setRelType("resultResult");
- r2.setSubRelType("relationship");
- r2.setRelClass("isRelatedTo");
+ r2.setRelType(RESULT_RESULT);
+ r2.setSubRelType(RELATIONSHIP);
+ r2.setRelClass(IS_RELATED_TO);
}
r1.setSource(sourceId);
@@ -440,11 +459,14 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
final String inferenceprovenance = rs.getString("inferenceprovenance");
final Boolean inferred = rs.getBoolean("inferred");
final String trust = rs.getString("trust");
- return dataInfo(deletedbyinference, inferenceprovenance, inferred, false, MigrationConstants.ENTITYREGISTRY_PROVENANCE_ACTION, trust);
+ return dataInfo(
+ deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust);
}
private Qualifier prepareQualifierSplitting(final String s) {
- if (StringUtils.isBlank(s)) { return null; }
+ if (StringUtils.isBlank(s)) {
+ return null;
+ }
final String[] arr = s.split("@@@");
return arr.length == 4 ? qualifier(arr[0], arr[1], arr[2], arr[3]) : null;
}
@@ -458,19 +480,23 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
}
private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) {
- if (StringUtils.isBlank(s)) { return null; }
+ if (StringUtils.isBlank(s)) {
+ return null;
+ }
final String[] parts = s.split("###");
if (parts.length == 2) {
final String value = parts[0];
final String[] arr = parts[1].split("@@@");
- if (arr.length == 4) { return structuredProperty(value, arr[0], arr[1], arr[2], arr[3], dataInfo); }
+ if (arr.length == 4) {
+ return structuredProperty(value, arr[0], arr[1], arr[2], arr[3], dataInfo);
+ }
}
return null;
}
private List<StructuredProperty> prepareListOfStructProps(
- final Array array,
- final DataInfo dataInfo) throws SQLException {
+ final Array array,
+ final DataInfo dataInfo) throws SQLException {
final List<StructuredProperty> res = new ArrayList<>();
if (array != null) {
for (final String s : (String[]) array.getArray()) {
@@ -489,8 +515,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
final String[] arr = sj.split("@@@");
if (arr.length == 3) {
final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0].trim() : null;
- final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1].trim() : null;;
- final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null;;
+ final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1].trim() : null;
+ final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null;
+
if (issn != null || eissn != null || lissn != null) {
return journal(name, issn, eissn, lissn, null, null, null, null, null, null, null, info);
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
index 286656149..891fee57e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
@@ -3,27 +3,19 @@ package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.Node;
+import com.google.common.collect.Lists;
+
import eu.dnetlib.dhp.oa.graph.raw.common.PacePerson;
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.GeoLocation;
-import eu.dnetlib.dhp.schema.oaf.Instance;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.oaf.*;
public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@@ -52,7 +44,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected Qualifier prepareLanguages(final Document doc) {
- return prepareQualifier(doc, "//dc:language", "dnet:languages", "dnet:languages");
+ return prepareQualifier(doc, "//dc:language", DNET_LANGUAGES, DNET_LANGUAGES);
}
@Override
@@ -96,38 +88,43 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
final DataInfo info,
final KeyValue collectedfrom,
final KeyValue hostedby) {
- final List<Instance> res = new ArrayList<>();
- for (final Object o : doc.selectNodes("//dc:identifier")) {
- final String url = ((Node) o).getText().trim();
- if (url.startsWith("http")) {
- final Instance instance = new Instance();
- instance.setUrl(Arrays.asList(url));
- instance
- .setInstancetype(
- prepareQualifier(
- doc,
- "//dr:CobjCategory",
- "dnet:publication_resource",
- "dnet:publication_resource"));
- instance.setCollectedfrom(collectedfrom);
- instance.setHostedby(hostedby);
- instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
- instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation"));
- instance
- .setAccessright(
- prepareQualifier(doc, "//oaf:accessrights", "dnet:access_modes", "dnet:access_modes"));
- instance.setLicense(field(doc.valueOf("//oaf:license"), info));
- instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info));
- instance
- .setProcessingchargeamount(
- field(doc.valueOf("//oaf:processingchargeamount"), info));
- instance
- .setProcessingchargecurrency(
- field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
- res.add(instance);
- }
- }
- return res;
+
+ final Instance instance = new Instance();
+ instance
+ .setInstancetype(
+ prepareQualifier(
+ doc,
+ "//dr:CobjCategory",
+ DNET_PUBLICATION_RESOURCE,
+ DNET_PUBLICATION_RESOURCE));
+ instance.setCollectedfrom(collectedfrom);
+ instance.setHostedby(hostedby);
+ instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
+ instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation"));
+ instance
+ .setAccessright(
+ prepareQualifier(doc, "//oaf:accessrights", DNET_ACCESS_MODES, DNET_ACCESS_MODES));
+ instance.setLicense(field(doc.valueOf("//oaf:license"), info));
+ instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info));
+ instance
+ .setProcessingchargeamount(
+ field(doc.valueOf("//oaf:processingchargeamount"), info));
+ instance
+ .setProcessingchargecurrency(
+ field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
+
+ List<Node> nodes = Lists.newArrayList(doc.selectNodes("//dc:identifier"));
+ instance
+ .setUrl(
+ nodes
+ .stream()
+ .filter(n -> StringUtils.isNotBlank(n.getText()))
+ .map(n -> n.getText().trim())
+ .filter(u -> u.startsWith("http"))
+ .distinct()
+ .collect(Collectors.toCollection(ArrayList::new)));
+
+ return Lists.newArrayList(instance);
}
@Override
@@ -241,27 +238,16 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
final String otherId = createOpenaireId(50, originalId, false);
- final Relation r1 = new Relation();
- r1.setRelType("resultResult");
- r1.setSubRelType("publicationDataset");
- r1.setRelClass("isRelatedTo");
- r1.setSource(docId);
- r1.setTarget(otherId);
- r1.setCollectedfrom(Arrays.asList(collectedFrom));
- r1.setDataInfo(info);
- r1.setLastupdatetimestamp(lastUpdateTimestamp);
- res.add(r1);
-
- final Relation r2 = new Relation();
- r2.setRelType("resultResult");
- r2.setSubRelType("publicationDataset");
- r2.setRelClass("isRelatedTo");
- r2.setSource(otherId);
- r2.setTarget(docId);
- r2.setCollectedfrom(Arrays.asList(collectedFrom));
- r2.setDataInfo(info);
- r2.setLastupdatetimestamp(lastUpdateTimestamp);
- res.add(r2);
+ res
+ .add(
+ getRelation(
+ docId, otherId, RESULT_RESULT, PUBLICATION_DATASET, IS_RELATED_TO, collectedFrom, info,
+ lastUpdateTimestamp));
+ res
+ .add(
+ getRelation(
+ otherId, docId, RESULT_RESULT, PUBLICATION_DATASET, IS_RELATED_TO, collectedFrom, info,
+ lastUpdateTimestamp));
}
}
return res;
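
The prepareInstances rewrite above also changes behaviour, not just formatting: the old loop emitted one Instance per identifier starting with "http", while the new code builds a single Instance whose url list is trimmed, filtered to values starting with "http", and de-duplicated. A minimal sketch of that pipeline, assuming dom4j Node inputs as in the mapper:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.dom4j.Node;

public class UrlPipelineSketch {

	static List<String> collectUrls(final List<Node> nodes) {
		return nodes
			.stream()
			.filter(n -> StringUtils.isNotBlank(n.getText())) // skip empty dc:identifier values
			.map(n -> n.getText().trim())
			.filter(u -> u.startsWith("http")) // keep only resolvable URLs
			.distinct() // the single Instance carries all URLs, without duplicates
			.collect(Collectors.toCollection(ArrayList::new));
	}
}
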
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
index 93b0eb29c..04984d008 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
@@ -4,16 +4,15 @@ package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.Node;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
@@ -27,6 +26,8 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
+ public static final String HTTP_DX_DOI_PREFIX = "http://dx.doi.org/";
+
public OdfToOafMapper(final Map<String, String> code2name) {
super(code2name);
}
@@ -62,7 +63,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
structuredProperty(
((Node) o).getText(),
prepareQualifier(
- (Node) o, "./@nameIdentifierScheme", "dnet:pid_types", "dnet:pid_types"),
+ (Node) o, "./@nameIdentifierScheme", DNET_PID_TYPES, DNET_PID_TYPES),
info));
}
return res;
@@ -76,18 +77,19 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
final KeyValue hostedby) {
final Instance instance = new Instance();
+ final Set<String> url = new HashSet<>();
instance.setUrl(new ArrayList<>());
instance
.setInstancetype(
prepareQualifier(
- doc, "//dr:CobjCategory", "dnet:publication_resource", "dnet:publication_resource"));
+ doc, "//dr:CobjCategory", DNET_PUBLICATION_RESOURCE, DNET_PUBLICATION_RESOURCE));
instance.setCollectedfrom(collectedfrom);
instance.setHostedby(hostedby);
instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation"));
instance
.setAccessright(
- prepareQualifier(doc, "//oaf:accessrights", "dnet:access_modes", "dnet:access_modes"));
+ prepareQualifier(doc, "//oaf:accessrights", DNET_ACCESS_MODES, DNET_ACCESS_MODES));
instance.setLicense(field(doc.valueOf("//oaf:license"), info));
instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info));
instance.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info));
@@ -96,17 +98,18 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
for (final Object o : doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='URL']")) {
- instance.getUrl().add(((Node) o).getText().trim());
+ url.add(((Node) o).getText().trim());
}
for (final Object o : doc.selectNodes("//datacite:identifier[@identifierType='URL']")) {
- instance.getUrl().add(((Node) o).getText().trim());
+ url.add(((Node) o).getText().trim());
}
for (final Object o : doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='DOI']")) {
- instance.getUrl().add("http://dx.doi.org/" + ((Node) o).getText().trim());
+ url.add(HTTP_DX_DOI_PREFIX + ((Node) o).getText().trim());
}
for (final Object o : doc.selectNodes("//datacite:identifier[@identifierType='DOI']")) {
- instance.getUrl().add("http://dx.doi.org/" + ((Node) o).getText().trim());
+ url.add(HTTP_DX_DOI_PREFIX + ((Node) o).getText().trim());
}
+ instance.getUrl().addAll(url);
return Arrays.asList(instance);
}
@@ -131,8 +134,8 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
((Node) o).getText(),
"UNKNOWN",
"UNKNOWN",
- "dnet:dataCite_date",
- "dnet:dataCite_date",
+ DNET_DATA_CITE_DATE,
+ DNET_DATA_CITE_DATE,
info));
}
}
@@ -171,7 +174,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected Qualifier prepareLanguages(final Document doc) {
- return prepareQualifier(doc, "//datacite:language", "dnet:languages", "dnet:languages");
+ return prepareQualifier(doc, "//datacite:language", DNET_LANGUAGES, DNET_LANGUAGES);
}
@Override
@@ -292,36 +295,29 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
final String otherId = createOpenaireId(50, originalId, false);
final String type = ((Node) o).valueOf("@relationType");
- if (type.equals("IsSupplementTo")) {
+ if (type.equalsIgnoreCase("IsSupplementTo")) {
res
.add(
- prepareOtherResultRel(
- collectedFrom,
- info,
- lastUpdateTimestamp,
- docId,
- otherId,
- "supplement",
- "isSupplementTo"));
+ getRelation(
+ docId, otherId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENT_TO, collectedFrom, info,
+ lastUpdateTimestamp));
res
.add(
- prepareOtherResultRel(
- collectedFrom,
- info,
- lastUpdateTimestamp,
- otherId,
- docId,
- "supplement",
- "isSupplementedBy"));
+ getRelation(
+ otherId, docId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENTED_BY, collectedFrom, info,
+ lastUpdateTimestamp));
} else if (type.equals("IsPartOf")) {
+
res
.add(
- prepareOtherResultRel(
- collectedFrom, info, lastUpdateTimestamp, docId, otherId, "part", "IsPartOf"));
+ getRelation(
+ docId, otherId, RESULT_RESULT, PART, IS_PART_OF, collectedFrom, info,
+ lastUpdateTimestamp));
res
.add(
- prepareOtherResultRel(
- collectedFrom, info, lastUpdateTimestamp, otherId, docId, "part", "HasParts"));
+ getRelation(
+ otherId, docId, RESULT_RESULT, PART, HAS_PARTS, collectedFrom, info,
+ lastUpdateTimestamp));
} else {
}
}
@@ -329,32 +325,12 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
return res;
}
- private Relation prepareOtherResultRel(
- final KeyValue collectedFrom,
- final DataInfo info,
- final long lastUpdateTimestamp,
- final String source,
- final String target,
- final String subRelType,
- final String relClass) {
- final Relation r = new Relation();
- r.setRelType("resultResult");
- r.setSubRelType(subRelType);
- r.setRelClass(relClass);
- r.setSource(source);
- r.setTarget(target);
- r.setCollectedfrom(Arrays.asList(collectedFrom));
- r.setDataInfo(info);
- r.setLastupdatetimestamp(lastUpdateTimestamp);
- return r;
- }
-
@Override
protected Qualifier prepareResourceType(final Document doc, final DataInfo info) {
return prepareQualifier(
doc,
"//*[local-name() = 'resource']//*[local-name() = 'resourceType']",
- "dnet:dataCite_resource",
- "dnet:dataCite_resource");
+ DNET_DATA_CITE_RESOURCE,
+ DNET_DATA_CITE_RESOURCE);
}
}
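
Both mappers now build relations through getRelation, replacing the removed prepareOtherResultRel and the inline Relation blocks in OafToOafMapper. The helper itself lives in AbstractMdRecordToOafMapper and is not part of this diff, so the sketch below infers its signature from the call sites; treat it as an approximation, not the actual method:

import java.util.Arrays;

import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class RelationFactorySketch {

	static Relation getRelation(final String source, final String target, final String relType,
		final String subRelType, final String relClass, final KeyValue collectedFrom,
		final DataInfo info, final long lastUpdateTimestamp) {
		final Relation r = new Relation();
		r.setRelType(relType); // e.g. RESULT_RESULT
		r.setSubRelType(subRelType); // e.g. SUPPLEMENT, PART, PUBLICATION_DATASET
		r.setRelClass(relClass); // e.g. IS_SUPPLEMENT_TO, IS_PART_OF
		r.setSource(source);
		r.setTarget(target);
		r.setCollectedfrom(Arrays.asList(collectedFrom));
		r.setDataInfo(info);
		r.setLastupdatetimestamp(lastUpdateTimestamp);
		return r;
	}
}

Inverse pairs (e.g. IS_SUPPLEMENT_TO / IS_SUPPLEMENTED_BY) come from calling it twice with source and target swapped.
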
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrationConstants.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrationConstants.java
deleted file mode 100644
index 15bff9565..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrationConstants.java
+++ /dev/null
@@ -1,27 +0,0 @@
-
-package eu.dnetlib.dhp.oa.graph.raw.common;
-
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
-
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-
-public class MigrationConstants {
-
- public static final Qualifier PUBLICATION_RESULTTYPE_QUALIFIER = qualifier(
- "publication", "publication", "dnet:result_typologies", "dnet:result_typologies");
- public static final Qualifier DATASET_RESULTTYPE_QUALIFIER = qualifier(
- "dataset", "dataset",
- "dnet:result_typologies", "dnet:result_typologies");
- public static final Qualifier SOFTWARE_RESULTTYPE_QUALIFIER = qualifier(
- "software", "software",
- "dnet:result_typologies", "dnet:result_typologies");
- public static final Qualifier OTHER_RESULTTYPE_QUALIFIER = qualifier(
- "other", "other",
- "dnet:result_typologies", "dnet:result_typologies");
- public static final Qualifier REPOSITORY_PROVENANCE_ACTIONS = qualifier(
- "sysimport:crosswalk:repository", "sysimport:crosswalk:repository",
- "dnet:provenanceActions", "dnet:provenanceActions");
- public static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION = qualifier(
- "sysimport:crosswalk:entityregistry", "sysimport:crosswalk:entityregistry",
- "dnet:provenanceActions", "dnet:provenanceActions");
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
index 9f91380ab..fa015499c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
@@ -115,11 +115,11 @@
eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication
- -p${contentPath}/db_claims
- -pgurl${postgresURL}
- -pguser${postgresUser}
- -pgpasswd${postgresPassword}
- -aclaims
+ --hdfsPath${contentPath}/db_claims
+ --postgresUrl${postgresURL}
+ --postgresUser${postgresUser}
+ --postgresPassword${postgresPassword}
+ --actionclaims
@@ -165,10 +165,10 @@
eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication
- -p${contentPath}/db_records
- -pgurl${postgresURL}
- -pguser${postgresUser}
- -pgpasswd${postgresPassword}
+ --hdfsPath${contentPath}/db_records
+ --postgresUrl${postgresURL}
+ --postgresUser${postgresUser}
+ --postgresPassword${postgresPassword}
@@ -180,12 +180,12 @@
eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication
- -p${contentPath}/odf_records
- -mongourl${mongoURL}
- -mongodb${mongoDb}
- -fODF
- -lstore
- -icleaned
+ --hdfsPath${contentPath}/odf_records
+ --mongoBaseUrl${mongoURL}
+ --mongoDb${mongoDb}
+ --mdFormatODF
+ --mdLayoutstore
+ --mdInterpretationcleaned
@@ -197,12 +197,12 @@
eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication
- -p${contentPath}/oaf_records
- -mongourl${mongoURL}
- -mongodb${mongoDb}
- -fOAF
- -lstore
- -icleaned
+ --hdfsPath${contentPath}/oaf_records
+ --mongoBaseUrl${mongoURL}
+ --mongoDb${mongoDb}
+ --mdFormatOAF
+ --mdLayoutstore
+ --mdInterpretationcleaned
@@ -231,11 +231,11 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- -s${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims
- -t${workingDir}/entities_claim
- -pgurl${postgresURL}
- -pguser${postgresUser}
- -pgpasswd${postgresPassword}
+ --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims
+ --targetPath${workingDir}/entities_claim
+ --postgresUrl${postgresURL}
+ --postgresUser${postgresUser}
+ --postgresPassword${postgresPassword}
@@ -257,8 +257,8 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- -s${workingDir}/entities_claim
- -g${workingDir}/graph_claims
+ --sourcePath${workingDir}/entities_claim
+ --graphRawPath${workingDir}/graph_claims
@@ -280,11 +280,11 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- -s${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records
- -t${workingDir}/entities
- -pgurl${postgresURL}
- -pguser${postgresUser}
- -pgpasswd${postgresPassword}
+ --sourcePaths${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records
+ --targetPath${workingDir}/entities
+ --postgresUrl${postgresURL}
+ --postgresUser${postgresUser}
+ --postgresPassword${postgresPassword}
@@ -307,8 +307,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
- -s${workingDir}/entities
- -g${workingDir}/graph_raw
+ --sourcePath${workingDir}/entities
+ --graphRawPath${workingDir}/graph_raw
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
index 0730f3a1f..05b85a561 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
@@ -1,8 +1,8 @@
-
+
- migrationPathStep1
- the base path to store hdfs file
+ contentPath
+ the path used to store (or reuse) content from the aggregator
postgresURL
@@ -16,6 +16,7 @@
postgresPassword
the postgres password
+
sparkDriverMemory
memory for driver process
@@ -28,31 +29,81 @@
sparkExecutorCores
number of cores used by single executor
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
-
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ mapreduce.job.queuename
+ ${queueName}
+
+
+ oozie.launcher.mapred.job.queue.name
+ ${oozieLauncherQueueName}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
-
+
+
+
+
+
+ eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication
+ --hdfsPath${contentPath}/db_records
+ --postgresUrl${postgresURL}
+ --postgresUser${postgresUser}
+ --postgresPassword${postgresPassword}
+
+
-
+
- ${jobTracker}
- ${nameNode}
- eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication
- -p${migrationPathStep1}/db_records
- -pgurl${postgresURL}
- -pguser${postgresUser}
- -pgpasswd${postgresPassword}
+
+
+
+ eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication
+ --hdfsPath${contentPath}/db_claims
+ --postgresUrl${postgresURL}
+ --postgresUser${postgresUser}
+ --postgresPassword${postgresPassword}
+ --actionclaims
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
index 03604f431..e0ee03660 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
index baac163d2..39699b3b6 100644
--- a/dhp-workflows/dhp-graph-provision/pom.xml
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index 923f6de69..298ac7589 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -405,6 +405,9 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.speculation=false
+ --conf spark.hadoop.mapreduce.map.speculative=false
+ --conf spark.hadoop.mapreduce.reduce.speculative=false
--inputPath${workingDir}/xml
--isLookupUrl ${isLookupUrl}
diff --git a/dhp-workflows/dhp-stats-update/pom.xml b/dhp-workflows/dhp-stats-update/pom.xml
index 0f5e18082..06408937b 100644
--- a/dhp-workflows/dhp-stats-update/pom.xml
+++ b/dhp-workflows/dhp-stats-update/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
4.0.0
dhp-stats-update
diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step9_6.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step9_6.sql
index d4ca2e10e..461f48bfc 100644
--- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step9_6.sql
+++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step9_6.sql
@@ -1,2 +1,2 @@
DROP TABLE IF EXISTS ${stats_db_name}.datasource_languages;
-CREATE TABLE ${stats_db_name}.datasource_languages AS SELECT substr(d.id, 4) as id, langs.languages as language from openaire.datasource d LATERAL VIEW explode(d.odlanguages.value) langs as languages;
+CREATE TABLE ${stats_db_name}.datasource_languages AS SELECT substr(d.id, 4) as id, langs.languages as language from ${openaire_db_name}.datasource d LATERAL VIEW explode(d.odlanguages.value) langs as languages;
diff --git a/dhp-workflows/dhp-worfklow-profiles/pom.xml b/dhp-workflows/dhp-worfklow-profiles/pom.xml
index bad72a9ef..5f99cdc8d 100644
--- a/dhp-workflows/dhp-worfklow-profiles/pom.xml
+++ b/dhp-workflows/dhp-worfklow-profiles/pom.xml
@@ -1,11 +1,9 @@
-
+
dhp-workflows
eu.dnetlib.dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index 1645129b1..4b3a96aa8 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
../
@@ -26,6 +26,7 @@
dhp-dedup-scholexplorer
dhp-graph-provision-scholexplorer
dhp-stats-update
+ dhp-broker-events
diff --git a/pom.xml b/pom.xml
index 483873219..419de3540 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
eu.dnetlib.dhp
dhp
- 1.1.7-SNAPSHOT
+ 1.2.1-SNAPSHOT
pom