diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index 2a89a26fd3..012ff89a36 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build-assembly-resources
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index 5be114e3c6..256017e2c7 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build-properties-maven-plugin
diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index 515ed35cee..e60e8076ef 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp-code-style
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
jar
diff --git a/dhp-build/pom.xml b/dhp-build/pom.xml
index d2dcbc36e6..12b999b9c8 100644
--- a/dhp-build/pom.xml
+++ b/dhp-build/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build
pom
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 0e7652dd3e..0819a8bd20 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
../
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 56fb8ead2a..2e5652b430 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
../
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index a3c1610db9..c5905e45b3 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -1,6 +1,10 @@
package eu.dnetlib.dhp.schema.common;
+import java.security.Key;
+
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class ModelConstants {
@@ -95,6 +99,9 @@ public class ModelConstants {
SYSIMPORT_CROSSWALK_ENTITYREGISTRY, SYSIMPORT_CROSSWALK_ENTITYREGISTRY,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS);
+ public static final KeyValue UNKNOWN_REPOSITORY = keyValue(
+ "10|openaire____::55045bd2a65019fd8e6741a755395c8c", "Unknown Repository");
+
private static Qualifier qualifier(
final String classid,
final String classname,
@@ -107,4 +114,12 @@ public class ModelConstants {
q.setSchemename(schemename);
return q;
}
+
+ private static KeyValue keyValue(String key, String value) {
+ KeyValue kv = new KeyValue();
+ kv.setKey(key);
+ kv.setValue(value);
+ kv.setDataInfo(new DataInfo());
+ return kv;
+ }
}
diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index b50c6705b2..0b4d25700b 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-actionmanager
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
index 17bfc4af36..5fa9e67235 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
+import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
@@ -20,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
@@ -134,24 +136,39 @@ public class PromoteActionPayloadForGraphTableJob {
.map(
(MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz),
Encoders.bean(rowClazz));
-
- /*
- * return spark .read() .parquet(path) .as(Encoders.bean(rowClazz));
- */
}
private static Dataset readActionPayload(
SparkSession spark, String path, Class actionPayloadClazz) {
logger.info("Reading action payload from path: {}", path);
+
return spark
.read()
.parquet(path)
+ .map((MapFunction) value -> extractPayload(value), Encoders.STRING())
.map(
- (MapFunction) value -> OBJECT_MAPPER
- .readValue(value. getAs("payload"), actionPayloadClazz),
+ (MapFunction) value -> decodePayload(actionPayloadClazz, value),
Encoders.bean(actionPayloadClazz));
}
+ private static String extractPayload(Row value) {
+ try {
+ return value. getAs("payload");
+ } catch (IllegalArgumentException | ClassCastException e) {
+ logger.error("cannot extract payload from action: {}", value.toString());
+ throw e;
+ }
+ }
+
+ private static A decodePayload(Class actionPayloadClazz, String payload) throws IOException {
+ try {
+ return OBJECT_MAPPER.readValue(payload, actionPayloadClazz);
+ } catch (UnrecognizedPropertyException e) {
+ logger.error("error decoding payload: {}", payload);
+ throw e;
+ }
+ }
+
private static Dataset promoteActionPayloadForGraphTable(
Dataset rowDS,
Dataset actionPayloadDS,
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index c04910a583..a1bc1c483d 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-aggregation
diff --git a/dhp-workflows/dhp-blacklist/pom.xml b/dhp-workflows/dhp-blacklist/pom.xml
index 04d334cd75..9c25f7b293 100644
--- a/dhp-workflows/dhp-blacklist/pom.xml
+++ b/dhp-workflows/dhp-blacklist/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index cd3257991f..424015a3c6 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
@@ -53,7 +53,7 @@
eu.dnetlib
dnet-openaire-broker-common
- [3.0.1,4.0.0)
+ [3.0.3,4.0.0)
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
index f94d286e4f..18950d98e5 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
@@ -2,7 +2,6 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
-import java.util.Map;
public class Event implements Serializable {
@@ -25,7 +24,7 @@ public class Event implements Serializable {
private boolean instantMessage;
- private Map map;
+ private MappedFields map;
public Event() {
}
@@ -33,7 +32,7 @@ public class Event implements Serializable {
public Event(final String producerId, final String eventId, final String topic, final String payload,
final Long creationDate, final Long expiryDate,
final boolean instantMessage,
- final Map map) {
+ final MappedFields map) {
this.producerId = producerId;
this.eventId = eventId;
this.topic = topic;
@@ -100,11 +99,11 @@ public class Event implements Serializable {
this.instantMessage = instantMessage;
}
- public Map getMap() {
+ public MappedFields getMap() {
return this.map;
}
- public void setMap(final Map map) {
+ public void setMap(final MappedFields map) {
this.map = map;
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index bf4f62d243..315a054d38 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -3,15 +3,14 @@ package eu.dnetlib.dhp.broker.model;
import java.text.ParseException;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
public class EventFactory {
@@ -30,10 +29,10 @@ public class EventFactory {
final Event res = new Event();
- final Map map = createMapFromResult(updateInfo);
+ final MappedFields map = createMapFromResult(updateInfo);
final String eventId = calculateEventId(
- updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId(), updateInfo.getHighlightValueAsString());
+ updateInfo.getTopicPath(), updateInfo.getTarget().getOpenaireId(), updateInfo.getHighlightValueAsString());
res.setEventId(eventId);
res.setProducerId(PRODUCER_ID);
@@ -46,35 +45,35 @@ public class EventFactory {
return res;
}
- private static Map createMapFromResult(final UpdateInfo> updateInfo) {
- final Map map = new HashMap<>();
+ private static MappedFields createMapFromResult(final UpdateInfo> updateInfo) {
+ final MappedFields map = new MappedFields();
- final OpenaireBrokerResult source = updateInfo.getSource();
- final OpenaireBrokerResult target = updateInfo.getTarget();
+ final OaBrokerMainEntity source = updateInfo.getSource();
+ final OaBrokerMainEntity target = updateInfo.getTarget();
- map.put("target_datasource_id", target.getCollectedFromId());
- map.put("target_datasource_name", target.getCollectedFromName());
+ map.setTargetDatasourceId(target.getCollectedFromId());
+ map.setTargetDatasourceName(target.getCollectedFromName());
- map.put("target_publication_id", target.getOriginalId());
+ map.setTargetResultId(target.getOpenaireId());
final List titles = target.getTitles();
if (titles.size() > 0) {
- map.put("target_publication_title", titles.get(0));
+ map.setTargetResultTitle(titles.get(0));
}
final long date = parseDateTolong(target.getPublicationdate());
if (date > 0) {
- map.put("target_dateofacceptance", date);
+ map.setTargetDateofacceptance(date);
}
- map.put("target_publication_subject_list", target.getSubjects());
- map.put("target_publication_author_list", target.getCreators());
+ map.setTargetSubjects(target.getSubjects().stream().map(s -> s.getValue()).collect(Collectors.toList()));
+ map.setTargetAuthors(target.getCreators().stream().map(a -> a.getFullname()).collect(Collectors.toList()));
// PROVENANCE INFO
- map.put("trust", updateInfo.getTrust());
- map.put("provenance_datasource_id", source.getCollectedFromId());
- map.put("provenance_datasource_name", source.getCollectedFromName());
- map.put("provenance_publication_id_list", source.getOriginalId());
+ map.setTrust(updateInfo.getTrust());
+ map.setProvenanceDatasourceId(source.getCollectedFromId());
+ map.setProvenanceDatasourceName(source.getCollectedFromName());
+ map.setProvenanceResultId(source.getOpenaireId());
return map;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java
new file mode 100644
index 0000000000..4b0ed171b1
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java
@@ -0,0 +1,114 @@
+
+package eu.dnetlib.dhp.broker.model;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MappedFields implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -7999704113195802008L;
+
+ private String targetDatasourceId;
+ private String targetDatasourceName;
+ private String targetResultId;
+ private String targetResultTitle;
+ private long targetDateofacceptance;
+ private List targetSubjects;
+ private List targetAuthors;
+ private float trust;
+ private String provenanceDatasourceId;
+ private String provenanceDatasourceName;
+ private String provenanceResultId;
+
+ public String getTargetDatasourceId() {
+ return targetDatasourceId;
+ }
+
+ public void setTargetDatasourceId(final String targetDatasourceId) {
+ this.targetDatasourceId = targetDatasourceId;
+ }
+
+ public String getTargetDatasourceName() {
+ return targetDatasourceName;
+ }
+
+ public void setTargetDatasourceName(final String targetDatasourceName) {
+ this.targetDatasourceName = targetDatasourceName;
+ }
+
+ public String getTargetResultId() {
+ return targetResultId;
+ }
+
+ public void setTargetResultId(final String targetResultId) {
+ this.targetResultId = targetResultId;
+ }
+
+ public String getTargetResultTitle() {
+ return targetResultTitle;
+ }
+
+ public void setTargetResultTitle(final String targetResultTitle) {
+ this.targetResultTitle = targetResultTitle;
+ }
+
+ public long getTargetDateofacceptance() {
+ return targetDateofacceptance;
+ }
+
+ public void setTargetDateofacceptance(final long targetDateofacceptance) {
+ this.targetDateofacceptance = targetDateofacceptance;
+ }
+
+ public List getTargetSubjects() {
+ return targetSubjects;
+ }
+
+ public void setTargetSubjects(final List targetSubjects) {
+ this.targetSubjects = targetSubjects;
+ }
+
+ public List getTargetAuthors() {
+ return targetAuthors;
+ }
+
+ public void setTargetAuthors(final List targetAuthors) {
+ this.targetAuthors = targetAuthors;
+ }
+
+ public float getTrust() {
+ return trust;
+ }
+
+ public void setTrust(final float trust) {
+ this.trust = trust;
+ }
+
+ public String getProvenanceDatasourceId() {
+ return provenanceDatasourceId;
+ }
+
+ public void setProvenanceDatasourceId(final String provenanceDatasourceId) {
+ this.provenanceDatasourceId = provenanceDatasourceId;
+ }
+
+ public String getProvenanceDatasourceName() {
+ return provenanceDatasourceName;
+ }
+
+ public void setProvenanceDatasourceName(final String provenanceDatasourceName) {
+ this.provenanceDatasourceName = provenanceDatasourceName;
+ }
+
+ public String getProvenanceResultId() {
+ return provenanceResultId;
+ }
+
+ public void setProvenanceResultId(final String provenanceResultId) {
+ this.provenanceResultId = provenanceResultId;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
deleted file mode 100644
index 3357710f09..0000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ /dev/null
@@ -1,228 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.util.Optional;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.TypedColumn;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.broker.model.Event;
-import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.EventFinder;
-import eu.dnetlib.dhp.broker.oa.util.EventGroup;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Software;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.config.DedupConfig;
-import scala.Tuple2;
-
-public class GenerateEventsApplication {
-
- private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- public static void main(final String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- GenerateEventsApplication.class
- .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
- parser.parseArgument(args);
-
- final Boolean isSparkSessionManaged = Optional
- .ofNullable(parser.get("isSparkSessionManaged"))
- .map(Boolean::valueOf)
- .orElse(Boolean.TRUE);
- log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
- final String graphPath = parser.get("graphPath");
- log.info("graphPath: {}", graphPath);
-
- final String eventsPath = parser.get("eventsPath");
- log.info("eventsPath: {}", eventsPath);
-
- final String isLookupUrl = parser.get("isLookupUrl");
- log.info("isLookupUrl: {}", isLookupUrl);
-
- final String dedupConfigProfileId = parser.get("dedupConfProfile");
- log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
-
- final SparkConf conf = new SparkConf();
- // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- // conf.registerKryoClasses(BrokerConstants.getModelClasses());
-
- // TODO UNCOMMENT
- // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
- final DedupConfig dedupConfig = null;
-
- runWithSparkSession(conf, isSparkSessionManaged, spark -> {
-
- removeOutputDir(spark, eventsPath);
-
- // TODO REMOVE THIS
- expandResultsWithRelations(spark, graphPath, Publication.class)
- .write()
- .mode(SaveMode.Overwrite)
- .json(eventsPath);
-
- // TODO UNCOMMENT THIS
- // spark
- // .emptyDataset(Encoders.bean(Event.class))
- // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
- // .write()
- // .mode(SaveMode.Overwrite)
- // .option("compression", "gzip")
- // .json(eventsPath);
- });
-
- }
-
- private static void removeOutputDir(final SparkSession spark, final String path) {
- HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
- }
-
- private static Dataset generateEvents(
- final SparkSession spark,
- final String graphPath,
- final Class sourceClass,
- final DedupConfig dedupConfig) {
-
- final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass);
-
- final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
- .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
-
- final TypedColumn, ResultGroup> aggr = new ResultAggregator()
- .toColumn();
-
- return results
- .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
- .groupByKey(
- (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING())
- .agg(aggr)
- .map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
- .filter(ResultGroup::isValid)
- .map(
- (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig),
- Encoders.bean(EventGroup.class))
- .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
- }
-
- private static Dataset expandResultsWithRelations(
- final SparkSession spark,
- final String graphPath,
- final Class sourceClass) {
-
- final Dataset projects = readPath(spark, graphPath + "/project", Project.class);
- final Dataset datasets = readPath(
- spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
- final Dataset softwares = readPath(spark, graphPath + "/software", Software.class);
- final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class);
-
- final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
- .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
- .cache();
-
- final Dataset r0 = readPath(
- spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
- .filter(r -> r.getDataInfo().getDeletedbyinference())
- .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
-
- // TODO UNCOMMENT THIS
- // final Dataset r1 = join(r0, rels, relatedEntities(projects, rels,
- // RelatedProject.class));
- // final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels,
- // RelatedSoftware.class));
- // final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels,
- // RelatedDataset.class));
- // final Dataset r4 = join(r3, rels, relatedEntities(publications, rels,
- // RelatedPublication.class));;
-
- return r0; // TODO it should be r4
- }
-
- private static Dataset