diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index 2a89a26fd..012ff89a3 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build-assembly-resources
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index 5be114e3c..256017e2c 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build-properties-maven-plugin
diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index 515ed35ce..e60e8076e 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp-code-style
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
jar
diff --git a/dhp-build/pom.xml b/dhp-build/pom.xml
index d2dcbc36e..12b999b9c 100644
--- a/dhp-build/pom.xml
+++ b/dhp-build/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build
pom
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 0e7652dd3..0819a8bd2 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
../
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 56fb8ead2..2e5652b43 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
../
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index a3c1610db..c5905e45b 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -1,6 +1,10 @@
package eu.dnetlib.dhp.schema.common;
+import java.security.Key;
+
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class ModelConstants {
@@ -95,6 +99,9 @@ public class ModelConstants {
SYSIMPORT_CROSSWALK_ENTITYREGISTRY, SYSIMPORT_CROSSWALK_ENTITYREGISTRY,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS);
+ public static final KeyValue UNKNOWN_REPOSITORY = keyValue(
+ "10|openaire____::55045bd2a65019fd8e6741a755395c8c", "Unknown Repository");
+
private static Qualifier qualifier(
final String classid,
final String classname,
@@ -107,4 +114,12 @@ public class ModelConstants {
q.setSchemename(schemename);
return q;
}
+
+ private static KeyValue keyValue(String key, String value) {
+ KeyValue kv = new KeyValue();
+ kv.setKey(key);
+ kv.setValue(value);
+ kv.setDataInfo(new DataInfo());
+ return kv;
+ }
}
diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index b50c6705b..0b4d25700 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-actionmanager
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
index 17bfc4af3..5fa9e6723 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
+import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
@@ -20,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
@@ -134,24 +136,39 @@ public class PromoteActionPayloadForGraphTableJob {
.map(
(MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz),
Encoders.bean(rowClazz));
-
- /*
- * return spark .read() .parquet(path) .as(Encoders.bean(rowClazz));
- */
}
private static Dataset readActionPayload(
SparkSession spark, String path, Class actionPayloadClazz) {
logger.info("Reading action payload from path: {}", path);
+
return spark
.read()
.parquet(path)
+ .map((MapFunction) value -> extractPayload(value), Encoders.STRING())
.map(
- (MapFunction) value -> OBJECT_MAPPER
- .readValue(value. getAs("payload"), actionPayloadClazz),
+ (MapFunction) value -> decodePayload(actionPayloadClazz, value),
Encoders.bean(actionPayloadClazz));
}
+ private static String extractPayload(Row value) {
+ try {
+ return value. getAs("payload");
+ } catch (IllegalArgumentException | ClassCastException e) {
+ logger.error("cannot extract payload from action: {}", value.toString());
+ throw e;
+ }
+ }
+
+ private static A decodePayload(Class actionPayloadClazz, String payload) throws IOException {
+ try {
+ return OBJECT_MAPPER.readValue(payload, actionPayloadClazz);
+ } catch (UnrecognizedPropertyException e) {
+ logger.error("error decoding payload: {}", payload);
+ throw e;
+ }
+ }
+
private static Dataset promoteActionPayloadForGraphTable(
Dataset rowDS,
Dataset actionPayloadDS,
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index c04910a58..a1bc1c483 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-aggregation
diff --git a/dhp-workflows/dhp-blacklist/pom.xml b/dhp-workflows/dhp-blacklist/pom.xml
index 04d334cd7..9c25f7b29 100644
--- a/dhp-workflows/dhp-blacklist/pom.xml
+++ b/dhp-workflows/dhp-blacklist/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index f943ac93a..424015a3c 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
@@ -53,7 +53,7 @@
eu.dnetlib
dnet-openaire-broker-common
- [3.0.2,4.0.0)
+ [3.0.3,4.0.0)
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index bf4f62d24..6e38f7448 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -11,7 +11,7 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
public class EventFactory {
@@ -49,8 +49,8 @@ public class EventFactory {
private static Map createMapFromResult(final UpdateInfo> updateInfo) {
final Map map = new HashMap<>();
- final OpenaireBrokerResult source = updateInfo.getSource();
- final OpenaireBrokerResult target = updateInfo.getTarget();
+ final OaBrokerMainEntity source = updateInfo.getSource();
+ final OaBrokerMainEntity target = updateInfo.getTarget();
map.put("target_datasource_id", target.getCollectedFromId());
map.put("target_datasource_name", target.getCollectedFromName());
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
deleted file mode 100644
index 62171ac61..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ /dev/null
@@ -1,229 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.util.Optional;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.TypedColumn;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.broker.model.Event;
-import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.EventFinder;
-import eu.dnetlib.dhp.broker.oa.util.EventGroup;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.config.DedupConfig;
-import scala.Tuple2;
-
-public class GenerateEventsApplication {
-
- private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- public static void main(final String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- GenerateEventsApplication.class
- .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
- parser.parseArgument(args);
-
- final Boolean isSparkSessionManaged = Optional
- .ofNullable(parser.get("isSparkSessionManaged"))
- .map(Boolean::valueOf)
- .orElse(Boolean.TRUE);
- log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
- final String graphPath = parser.get("graphPath");
- log.info("graphPath: {}", graphPath);
-
- final String eventsPath = parser.get("eventsPath");
- log.info("eventsPath: {}", eventsPath);
-
- final String isLookupUrl = parser.get("isLookupUrl");
- log.info("isLookupUrl: {}", isLookupUrl);
-
- final String dedupConfigProfileId = parser.get("dedupConfProfile");
- log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
-
- final SparkConf conf = new SparkConf();
- // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- // conf.registerKryoClasses(BrokerConstants.getModelClasses());
-
- // TODO UNCOMMENT
- // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
- final DedupConfig dedupConfig = null;
-
- runWithSparkSession(conf, isSparkSessionManaged, spark -> {
-
- removeOutputDir(spark, eventsPath);
-
- // TODO REMOVE THIS
- final Dataset projects = readPath(spark, graphPath + "/project", Project.class);
- final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
- .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
- .cache();
- relatedEntities(projects, rels, RelatedProject.class)
- .write()
- .mode(SaveMode.Overwrite)
- .json(eventsPath);
-
- // TODO UNCOMMENT THIS
- // spark
- // .emptyDataset(Encoders.bean(Event.class))
- // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
- // .write()
- // .mode(SaveMode.Overwrite)
- // .option("compression", "gzip")
- // .json(eventsPath);
- });
-
- }
-
- private static void removeOutputDir(final SparkSession spark, final String path) {
- HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
- }
-
- private static Dataset generateEvents(
- final SparkSession spark,
- final String graphPath,
- final Class sourceClass,
- final DedupConfig dedupConfig) {
-
- final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass);
-
- final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
- .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
-
- final TypedColumn, ResultGroup> aggr = new ResultAggregator()
- .toColumn();
-
- return results
- .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
- .groupByKey(
- (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING())
- .agg(aggr)
- .map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
- .filter(rg -> rg.getData().size() > 1)
- .map(
- (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig),
- Encoders.bean(EventGroup.class))
- .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
- }
-
- private static Dataset expandResultsWithRelations(
- final SparkSession spark,
- final String graphPath,
- final Class sourceClass) {
-
- final Dataset projects = readPath(spark, graphPath + "/project", Project.class);
- // final Dataset datasets = readPath(
- // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
- // final Dataset softwares = readPath(spark, graphPath + "/software", Software.class);
- // final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class);
-
- final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
- .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
- .cache();
-
- final Dataset r0 = readPath(
- spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
- .filter(r -> r.getDataInfo().getDeletedbyinference())
- .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
-
- // TODO UNCOMMENT THIS
- final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
- // final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels,
- // RelatedSoftware.class));
- // final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels,
- // RelatedDataset.class));
- // final Dataset r4 = join(r3, rels, relatedEntities(publications, rels,
- // RelatedPublication.class));;
-
- return r0; // TODO it should be r4
- }
-
- private static Dataset