diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index f943ac93aa..8d7d3b88c5 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -53,7 +53,7 @@
eu.dnetlib
dnet-openaire-broker-common
- [3.0.2,4.0.0)
+ [3.0.3,4.0.0)
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index bf4f62d243..6e38f74489 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -11,7 +11,7 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
public class EventFactory {
@@ -49,8 +49,8 @@ public class EventFactory {
private static Map createMapFromResult(final UpdateInfo> updateInfo) {
final Map map = new HashMap<>();
- final OpenaireBrokerResult source = updateInfo.getSource();
- final OpenaireBrokerResult target = updateInfo.getTarget();
+ final OaBrokerMainEntity source = updateInfo.getSource();
+ final OaBrokerMainEntity target = updateInfo.getTarget();
map.put("target_datasource_id", target.getCollectedFromId());
map.put("target_datasource_name", target.getCollectedFromName());
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index f15d918c96..db59920106 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -18,27 +18,20 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
@@ -48,8 +41,6 @@ public class GenerateEventsApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
@@ -86,11 +77,11 @@ public class GenerateEventsApplication {
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
- removeOutputDir(spark, eventsPath);
+ ClusterUtils.removeDir(spark, eventsPath);
// TODO REMOVE THIS
- relatedProjects(spark, graphPath)
+ expandResultsWithRelations(spark, graphPath, Publication.class)
.write()
.mode(SaveMode.Overwrite)
.json(eventsPath);
@@ -110,28 +101,25 @@ public class GenerateEventsApplication {
}
- private static void removeOutputDir(final SparkSession spark, final String path) {
- HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
- }
-
private static Dataset generateEvents(
final SparkSession spark,
final String graphPath,
final Class sourceClass,
final DedupConfig dedupConfig) {
- final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass);
+ final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass);
- final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
+ final Dataset mergedRels = ClusterUtils
+ .readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
- final TypedColumn, ResultGroup> aggr = new ResultAggregator()
+ final TypedColumn, ResultGroup> aggr = new ResultAggregator()
.toColumn();
return results
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
.groupByKey(
- (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING())
+ (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr)
.map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
.filter(rg -> rg.getData().size() > 1)
@@ -141,7 +129,7 @@ public class GenerateEventsApplication {
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
}
- private static Dataset expandResultsWithRelations(
+ private static Dataset expandResultsWithRelations(
final SparkSession spark,
final String graphPath,
final Class sourceClass) {
@@ -151,116 +139,35 @@ public class GenerateEventsApplication {
// final Dataset softwares = readPath(spark, graphPath + "/software", Software.class);
// final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class);
- final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
- .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
- .cache();
-
- final Dataset r0 = readPath(
- spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
- .filter(r -> r.getDataInfo().getDeletedbyinference())
- .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
+ final Dataset r0 = ClusterUtils
+ .readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
+ .filter(r -> r.getDataInfo().getDeletedbyinference())
+ .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class));
// TODO UNCOMMENT THIS
- final Dataset r1 = join(r0, rels, relatedProjects(spark, graphPath));
- // final Dataset r2 = join(r1, rels, relatedDataset(spark, graphPath));
- // final Dataset r3 = join(r2, rels, relatedPublications(spark, graphPath));
- // final Dataset r4 = join(r3, rels, relatedSoftwares(spark, graphPath));
+ // final Dataset r1 = join(r0, relatedProjects(spark, graphPath));
+ // final Dataset r2 = join(r1, relatedDataset(spark, graphPath));
+ // final Dataset r3 = join(r2, relatedPublications(spark, graphPath));
+ // final Dataset r4 = join(r3, relatedSoftwares(spark, graphPath));
- return r1; // TODO it should be r4
+ return r0; // TODO it should be r4
}
- private static Dataset relatedProjects(final SparkSession spark, final String graphPath) {
-
- final Dataset projects = readPath(spark, graphPath + "/project", Project.class);
-
- final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
- .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT));
-
- return rels
- .joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner")
- .map(
- t -> new RelatedProject(
- t._1.getSource(),
- t._1.getRelType(),
- ConversionUtils.oafProjectToBrokerProject(t._2)),
- Encoders.bean(RelatedProject.class));
- }
-
- private static Dataset relatedDataset(final SparkSession spark, final String graphPath) {
-
- final Dataset datasets = readPath(
- spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
-
- final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class);
-
- return rels
- .joinWith(datasets, datasets.col("id").equalTo(rels.col("target")), "inner")
- .map(
- t -> new RelatedDataset(
- t._1.getSource(),
- t._1.getRelType(),
- ConversionUtils.oafDatasetToBrokerDataset(t._2)),
- Encoders.bean(RelatedDataset.class));
- }
-
- private static Dataset relatedSoftwares(final SparkSession spark, final String graphPath) {
-
- final Dataset softwares = readPath(spark, graphPath + "/software", Software.class);
-
- final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class);
-
- return rels
- .joinWith(softwares, softwares.col("id").equalTo(rels.col("target")), "inner")
- .map(
- t -> new RelatedSoftware(
- t._1.getSource(),
- t._1.getRelType(),
- ConversionUtils.oafSoftwareToBrokerSoftware(t._2)),
- Encoders.bean(RelatedSoftware.class));
- }
-
- private static Dataset relatedPublications(final SparkSession spark, final String graphPath) {
-
- final Dataset pubs = readPath(spark, graphPath + "/publication", Publication.class);
-
- final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class);
-
- return rels
- .joinWith(pubs, pubs.col("id").equalTo(rels.col("target")), "inner")
- .map(
- t -> new RelatedPublication(
- t._1.getSource(),
- t._1.getRelType(),
- ConversionUtils.oafPublicationToBrokerPublication(t._2)),
- Encoders.bean(RelatedPublication.class));
- }
-
- private static Dataset join(final Dataset sources,
- final Dataset rels,
+ private static Dataset join(final Dataset sources,
final Dataset typedRels) {
- final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator()
+ final TypedColumn, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator()
.toColumn();
return sources
- .joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")
+ .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey(
- (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
+ (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
- .map(t -> t._2, Encoders.bean(OpenaireBrokerResult.class));
+ .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
}
- public static Dataset readPath(
- final SparkSession spark,
- final String inputPath,
- final Class clazz) {
- return spark
- .read()
- .textFile(inputPath)
- .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
- }
-
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java
new file mode 100644
index 0000000000..4a10fbabf1
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java
@@ -0,0 +1,73 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class GenerateRelatedDatasets {
+
+ private static final Logger log = LoggerFactory.getLogger(GenerateRelatedDatasets.class);
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ GenerateRelatedDatasets.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String relsPath = parser.get("relsPath");
+ log.info("relsPath: {}", relsPath);
+
+ final SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, relsPath);
+
+ final Dataset datasets = ClusterUtils
+ .readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
+
+ final Dataset rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
+
+ rels
+ .joinWith(datasets, datasets.col("id").equalTo(rels.col("target")), "inner")
+ .map(
+ t -> new RelatedDataset(
+ t._1.getSource(),
+ t._1.getRelType(),
+ ConversionUtils.oafDatasetToBrokerDataset(t._2)),
+ Encoders.bean(RelatedDataset.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .json(relsPath);
+
+ });
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java
new file mode 100644
index 0000000000..59ed388e7c
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java
@@ -0,0 +1,79 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class GenerateRelatedProjects {
+
+ private static final Logger log = LoggerFactory.getLogger(GenerateRelatedProjects.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ GenerateRelatedProjects.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String relsPath = parser.get("relsPath");
+ log.info("relsPath: {}", relsPath);
+
+ final SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, relsPath);
+
+ final Dataset projects = ClusterUtils.readPath(spark, graphPath + "/project", Project.class);
+
+ final Dataset rels = ClusterUtils
+ .readPath(spark, graphPath + "/relation", Relation.class)
+ .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT));
+
+ rels
+ .joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner")
+ .map(
+ t -> new RelatedProject(
+ t._1.getSource(),
+ t._1.getRelType(),
+ ConversionUtils.oafProjectToBrokerProject(t._2)),
+ Encoders.bean(RelatedProject.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .json(relsPath);
+ });
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java
new file mode 100644
index 0000000000..0c20081dc8
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java
@@ -0,0 +1,78 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class GenerateRelatedPublications {
+
+ private static final Logger log = LoggerFactory.getLogger(GenerateRelatedPublications.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ GenerateRelatedPublications.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String relsPath = parser.get("relsPath");
+ log.info("relsPath: {}", relsPath);
+
+ final SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, relsPath);
+
+ final Dataset pubs = ClusterUtils
+ .readPath(spark, graphPath + "/publication", Publication.class);
+
+ final Dataset rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
+
+ rels
+ .joinWith(pubs, pubs.col("id").equalTo(rels.col("target")), "inner")
+ .map(
+ t -> new RelatedPublication(
+ t._1.getSource(),
+ t._1.getRelType(),
+ ConversionUtils.oafPublicationToBrokerPublication(t._2)),
+ Encoders.bean(RelatedPublication.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .json(relsPath);
+
+ });
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java
new file mode 100644
index 0000000000..b957888463
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java
@@ -0,0 +1,76 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Software;
+
+public class GenerateRelatedSoftwares {
+
+ private static final Logger log = LoggerFactory.getLogger(GenerateRelatedSoftwares.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ GenerateRelatedSoftwares.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String relsPath = parser.get("relsPath");
+ log.info("relsPath: {}", relsPath);
+
+ final SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, relsPath);
+ final Dataset softwares = ClusterUtils.readPath(spark, graphPath + "/software", Software.class);
+
+ final Dataset rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
+
+ rels
+ .joinWith(softwares, softwares.col("id").equalTo(rels.col("target")), "inner")
+ .map(
+ t -> new RelatedSoftware(
+ t._1.getSource(),
+ t._1.getRelType(),
+ ConversionUtils.oafSoftwareToBrokerSoftware(t._2)),
+ Encoders.bean(RelatedSoftware.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .json(relsPath);
+
+ });
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java
new file mode 100644
index 0000000000..59485d5cf0
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java
@@ -0,0 +1,85 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class GenerateSimpleEntitities {
+
+ private static final Logger log = LoggerFactory.getLogger(GenerateSimpleEntitities.class);
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ GenerateSimpleEntitities.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String simpleEntitiesPath = parser.get("simpleEntitiesPath");
+ log.info("simpleEntitiesPath: {}", simpleEntitiesPath);
+
+ final SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, simpleEntitiesPath);
+
+ expandResultsWithRelations(spark, graphPath, Publication.class)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .json(simpleEntitiesPath);
+
+ // TODO UNCOMMENT THIS
+ // spark
+ // .emptyDataset(Encoders.bean(Event.class))
+ // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
+ // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
+ // .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
+ // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
+ // .write()
+ // .mode(SaveMode.Overwrite)
+ // .option("compression", "gzip")
+ // .json(eventsPath);
+ });
+
+ }
+
+ private static Dataset expandResultsWithRelations(
+ final SparkSession spark,
+ final String graphPath,
+ final Class sourceClass) {
+
+ return ClusterUtils
+ .readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
+ .filter(r -> r.getDataInfo().getDeletedbyinference())
+ .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class));
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
index 13aeefb2fc..9aa6f53847 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
@@ -12,7 +12,7 @@ import java.util.function.Function;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.pace.config.DedupConfig;
@@ -21,11 +21,11 @@ public abstract class UpdateMatcher {
private final boolean multipleUpdate;
private final Function topicFunction;
- private final BiConsumer compileHighlightFunction;
+ private final BiConsumer compileHighlightFunction;
private final Function highlightToStringFunction;
public UpdateMatcher(final boolean multipleUpdate, final Function topicFunction,
- final BiConsumer compileHighlightFunction,
+ final BiConsumer compileHighlightFunction,
final Function highlightToStringFunction) {
this.multipleUpdate = multipleUpdate;
this.topicFunction = topicFunction;
@@ -33,13 +33,13 @@ public abstract class UpdateMatcher {
this.highlightToStringFunction = highlightToStringFunction;
}
- public Collection> searchUpdatesForRecord(final OpenaireBrokerResult res,
- final Collection others,
+ public Collection> searchUpdatesForRecord(final OaBrokerMainEntity res,
+ final Collection others,
final DedupConfig dedupConfig) {
final Map> infoMap = new HashMap<>();
- for (final OpenaireBrokerResult source : others) {
+ for (final OaBrokerMainEntity source : others) {
if (source != res) {
for (final T hl : findDifferences(source, res)) {
final Topic topic = getTopicFunction().apply(hl);
@@ -68,7 +68,7 @@ public abstract class UpdateMatcher {
}
}
- protected abstract List findDifferences(OpenaireBrokerResult source, OpenaireBrokerResult target);
+ protected abstract List findDifferences(OaBrokerMainEntity source, OaBrokerMainEntity target);
protected static boolean isMissing(final List list) {
return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0));
@@ -86,7 +86,7 @@ public abstract class UpdateMatcher {
return topicFunction;
}
- public BiConsumer getCompileHighlightFunction() {
+ public BiConsumer getCompileHighlightFunction() {
return compileHighlightFunction;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java
index 7a58f986ba..c197734a3b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java
@@ -5,13 +5,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.Dataset;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public abstract class AbstractEnrichMissingDataset
- extends UpdateMatcher {
+public abstract class AbstractEnrichMissingDataset extends UpdateMatcher {
public AbstractEnrichMissingDataset(final Topic topic) {
super(true,
@@ -23,14 +22,14 @@ public abstract class AbstractEnrichMissingDataset
protected abstract boolean filterByType(String relType);
@Override
- protected final List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected final List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final Set existingDatasets = target
.getDatasets()
.stream()
.filter(rel -> filterByType(rel.getRelType()))
- .map(Dataset::getOriginalId)
+ .map(OaBrokerRelatedDataset::getOriginalId)
.collect(Collectors.toSet());
return source
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
index fa5fde7251..49c546bbaf 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
@@ -4,12 +4,12 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
import java.util.ArrayList;
import java.util.List;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.Project;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public class EnrichMissingProject extends UpdateMatcher {
+public class EnrichMissingProject extends UpdateMatcher {
public EnrichMissingProject() {
super(true,
@@ -19,7 +19,7 @@ public class EnrichMissingProject extends UpdateMatcher {
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) {
if (target.getProjects().isEmpty()) {
return source.getProjects();
} else {
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
index ca63aeb49f..6954a3fb5e 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
@@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.Project;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public class EnrichMoreProject extends UpdateMatcher {
+public class EnrichMoreProject extends UpdateMatcher {
public EnrichMoreProject() {
super(true,
@@ -19,13 +19,13 @@ public class EnrichMoreProject extends UpdateMatcher {
prj -> projectAsString(prj));
}
- private static String projectAsString(final Project prj) {
+ private static String projectAsString(final OaBrokerProject prj) {
return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode();
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final Set existingProjects = target
.getProjects()
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
index 3008639493..ad6d8263b8 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
@@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.Publication;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public abstract class AbstractEnrichMissingPublication extends UpdateMatcher {
+public abstract class AbstractEnrichMissingPublication extends UpdateMatcher {
public AbstractEnrichMissingPublication(final Topic topic) {
super(true,
@@ -23,15 +23,15 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher findDifferences(
- final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected final List findDifferences(
+ final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final Set existingPublications = target
.getPublications()
.stream()
.filter(rel -> filterByType(rel.getRelType()))
- .map(Publication::getOriginalId)
+ .map(OaBrokerRelatedPublication::getOriginalId)
.collect(Collectors.toSet());
return source
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java
index 76ae061e67..452caa5033 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java
@@ -4,12 +4,13 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
import java.util.ArrayList;
import java.util.List;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingSoftware
- extends UpdateMatcher {
+ extends UpdateMatcher {
public EnrichMissingSoftware() {
super(true,
@@ -19,9 +20,9 @@ public class EnrichMissingSoftware
}
@Override
- protected List findDifferences(
- final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(
+ final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
if (target.getSoftwares().isEmpty()) {
return source.getSoftwares();
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java
index ebd421b8e7..aaffe12498 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java
@@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.Software;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public class EnrichMoreSoftware extends UpdateMatcher {
+public class EnrichMoreSoftware extends UpdateMatcher {
public EnrichMoreSoftware() {
super(true,
@@ -20,14 +20,14 @@ public class EnrichMoreSoftware extends UpdateMatcher {
}
@Override
- protected List findDifferences(
- final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(
+ final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final Set existingSoftwares = source
.getSoftwares()
.stream()
- .map(Software::getName)
+ .map(OaBrokerRelatedSoftware::getName)
.collect(Collectors.toSet());
return target
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java
index b2cbbce2c5..73462bae8f 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java
@@ -5,7 +5,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
@@ -19,7 +19,7 @@ public class EnrichMissingAbstract extends UpdateMatcher {
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) {
if (isMissing(target.getAbstracts()) && !isMissing(source.getAbstracts())) {
return Arrays.asList(source.getAbstracts().get(0));
} else {
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java
index c4b96e67bf..2a01188a9d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java
@@ -7,12 +7,12 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
-import eu.dnetlib.broker.objects.Author;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerAuthor;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public class EnrichMissingAuthorOrcid extends UpdateMatcher {
+public class EnrichMissingAuthorOrcid extends UpdateMatcher {
public EnrichMissingAuthorOrcid() {
super(true,
@@ -22,13 +22,13 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher {
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final Set existingOrcids = target
.getCreators()
.stream()
- .map(Author::getOrcid)
+ .map(OaBrokerAuthor::getOrcid)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toSet());
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
index e870cf1faa..4873829570 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
@@ -5,28 +5,28 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.Instance;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerInstance;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
-public class EnrichMissingOpenAccess extends UpdateMatcher {
+public class EnrichMissingOpenAccess extends UpdateMatcher {
public EnrichMissingOpenAccess() {
super(true,
i -> Topic.ENRICH_MISSING_OA_VERSION,
(p, i) -> p.getInstances().add(i),
- Instance::getUrl);
+ OaBrokerInstance::getUrl);
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final long count = target
.getInstances()
.stream()
- .map(Instance::getLicense)
+ .map(OaBrokerInstance::getLicense)
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
.count();
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java
index cc72d9fa9a..ee1617b1e0 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java
@@ -5,12 +5,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.TypedValue;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public class EnrichMissingPid extends UpdateMatcher {
+public class EnrichMissingPid extends UpdateMatcher {
public EnrichMissingPid() {
super(true,
@@ -20,8 +20,8 @@ public class EnrichMissingPid extends UpdateMatcher {
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final long count = target.getPids().size();
if (count > 0) {
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java
index ed8c26b5ab..2c0533fa35 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java
@@ -5,7 +5,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
@@ -19,8 +19,8 @@ public class EnrichMissingPublicationDate extends UpdateMatcher {
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
if (isMissing(target.getPublicationdate()) && !isMissing(source.getPublicationdate())) {
return Arrays.asList(source.getPublicationdate());
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java
index 07b1fa41a9..9ab9fce488 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java
@@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.TypedValue;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public class EnrichMissingSubject extends UpdateMatcher {
+public class EnrichMissingSubject extends UpdateMatcher {
public EnrichMissingSubject() {
super(true,
@@ -20,8 +20,8 @@ public class EnrichMissingSubject extends UpdateMatcher {
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final Set existingSubject = target
.getSubjects()
.stream()
@@ -35,7 +35,7 @@ public class EnrichMissingSubject extends UpdateMatcher {
.collect(Collectors.toList());
}
- private static String subjectAsString(final TypedValue s) {
+ private static String subjectAsString(final OaBrokerTypedValue s) {
return s.getType() + "::" + s.getValue();
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
index bfef3ee4f6..e90a8f2012 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
@@ -5,24 +5,24 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.Instance;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerInstance;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
-public class EnrichMoreOpenAccess extends UpdateMatcher {
+public class EnrichMoreOpenAccess extends UpdateMatcher {
public EnrichMoreOpenAccess() {
super(true,
i -> Topic.ENRICH_MORE_OA_VERSION,
(p, i) -> p.getInstances().add(i),
- Instance::getUrl);
+ OaBrokerInstance::getUrl);
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final Set urls = target
.getInstances()
.stream()
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java
index d1f2e60225..43b4f0628d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java
@@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.TypedValue;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public class EnrichMorePid extends UpdateMatcher {
+public class EnrichMorePid extends UpdateMatcher {
public EnrichMorePid() {
super(true,
@@ -20,8 +20,8 @@ public class EnrichMorePid extends UpdateMatcher {
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final Set existingPids = target
.getPids()
.stream()
@@ -35,7 +35,7 @@ public class EnrichMorePid extends UpdateMatcher {
.collect(Collectors.toList());
}
- private static String pidAsString(final TypedValue pid) {
+ private static String pidAsString(final OaBrokerTypedValue pid) {
return pid.getType() + "::" + pid.getValue();
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java
index 39225e8ab8..04fb494ef9 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java
@@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.TypedValue;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-public class EnrichMoreSubject extends UpdateMatcher {
+public class EnrichMoreSubject extends UpdateMatcher {
public EnrichMoreSubject() {
super(true,
@@ -20,8 +20,8 @@ public class EnrichMoreSubject extends UpdateMatcher {
}
@Override
- protected List findDifferences(final OpenaireBrokerResult source,
- final OpenaireBrokerResult target) {
+ protected List findDifferences(final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target) {
final Set existingSubjects = target
.getSubjects()
.stream()
@@ -35,7 +35,7 @@ public class EnrichMoreSubject extends UpdateMatcher {
.collect(Collectors.toList());
}
- private static String subjectAsString(final TypedValue s) {
+ private static String subjectAsString(final OaBrokerTypedValue s) {
return s.getType() + "::" + s.getValue();
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
new file mode 100644
index 0000000000..8bcea5e6e3
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
@@ -0,0 +1,31 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.common.HdfsSupport;
+
+public class ClusterUtils {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void removeDir(final SparkSession spark, final String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ public static Dataset readPath(
+ final SparkSession spark,
+ final String inputPath,
+ final Class clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
index 730d065199..b61d5e7cc7 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
@@ -15,8 +15,16 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.TypedValue;
+import eu.dnetlib.broker.objects.OaBrokerAuthor;
+import eu.dnetlib.broker.objects.OaBrokerExternalReference;
+import eu.dnetlib.broker.objects.OaBrokerInstance;
+import eu.dnetlib.broker.objects.OaBrokerJournal;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerProject;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
+import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
+import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
+import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.ExternalReference;
@@ -35,13 +43,13 @@ public class ConversionUtils {
private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class);
- public static List oafInstanceToBrokerInstances(final Instance i) {
+ public static List oafInstanceToBrokerInstances(final Instance i) {
if (i == null) {
return new ArrayList<>();
}
return mappedList(i.getUrl(), url -> {
- final eu.dnetlib.broker.objects.Instance res = new eu.dnetlib.broker.objects.Instance();
+ final OaBrokerInstance res = new OaBrokerInstance();
res.setUrl(url);
res.setInstancetype(classId(i.getInstancetype()));
res.setLicense(BrokerConstants.OPEN_ACCESS);
@@ -50,20 +58,21 @@ public class ConversionUtils {
});
}
- public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) {
+ public static OaBrokerTypedValue oafPidToBrokerPid(final StructuredProperty sp) {
return oafStructPropToBrokerTypedValue(sp);
}
- public static TypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) {
- return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null;
+ public static OaBrokerTypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) {
+ return sp != null ? new OaBrokerTypedValue(classId(sp.getQualifier()), sp.getValue()) : null;
}
- public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
+ public static final OaBrokerRelatedDataset oafDatasetToBrokerDataset(final Dataset d) {
if (d == null) {
return null;
}
- final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset();
+ final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset();
+ res.setOpenaireId(d.getId());
res.setOriginalId(first(d.getOriginalId()));
res.setTitle(structPropValue(d.getTitle()));
res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
@@ -72,12 +81,13 @@ public class ConversionUtils {
return res;
}
- public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) {
+ public static OaBrokerRelatedPublication oafPublicationToBrokerPublication(final Publication p) {
if (p == null) {
return null;
}
- final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication();
+ final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication();
+ res.setOpenaireId(p.getId());
res.setOriginalId(first(p.getOriginalId()));
res.setTitle(structPropValue(p.getTitle()));
res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
@@ -87,12 +97,12 @@ public class ConversionUtils {
return res;
}
- public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) {
+ public static final OaBrokerMainEntity oafResultToBrokerResult(final Result result) {
if (result == null) {
return null;
}
- final OpenaireBrokerResult res = new OpenaireBrokerResult();
+ final OaBrokerMainEntity res = new OaBrokerMainEntity();
res.setOpenaireId(result.getId());
res.setOriginalId(first(result.getOriginalId()));
@@ -118,7 +128,7 @@ public class ConversionUtils {
return res;
}
- private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
+ private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) {
if (author == null) {
return null;
}
@@ -135,15 +145,15 @@ public class ConversionUtils {
.findFirst()
.orElse(null) : null;
- return new eu.dnetlib.broker.objects.Author(author.getFullname(), pids);
+ return new OaBrokerAuthor(author.getFullname(), pids);
}
- private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) {
+ private static OaBrokerJournal oafJournalToBrokerJournal(final Journal journal) {
if (journal == null) {
return null;
}
- final eu.dnetlib.broker.objects.Journal res = new eu.dnetlib.broker.objects.Journal();
+ final OaBrokerJournal res = new OaBrokerJournal();
res.setName(journal.getName());
res.setIssn(journal.getIssnPrinted());
res.setEissn(journal.getIssnOnline());
@@ -152,12 +162,12 @@ public class ConversionUtils {
return res;
}
- private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
+ private static OaBrokerExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
if (ref == null) {
return null;
}
- final eu.dnetlib.broker.objects.ExternalReference res = new eu.dnetlib.broker.objects.ExternalReference();
+ final OaBrokerExternalReference res = new OaBrokerExternalReference();
res.setRefidentifier(ref.getRefidentifier());
res.setSitename(ref.getSitename());
res.setType(classId(ref.getQualifier()));
@@ -165,12 +175,13 @@ public class ConversionUtils {
return res;
}
- public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
+ public static final OaBrokerProject oafProjectToBrokerProject(final Project p) {
if (p == null) {
return null;
}
- final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project();
+ final OaBrokerProject res = new OaBrokerProject();
+ res.setOpenaireId(p.getId());
res.setTitle(fieldValue(p.getTitle()));
res.setAcronym(fieldValue(p.getAcronym()));
res.setCode(fieldValue(p.getCode()));
@@ -190,12 +201,13 @@ public class ConversionUtils {
return res;
}
- public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) {
+ public static final OaBrokerRelatedSoftware oafSoftwareToBrokerSoftware(final Software sw) {
if (sw == null) {
return null;
}
- final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software();
+ final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware();
+ res.setOpenaireId(sw.getId());
res.setName(structPropValue(sw.getTitle()));
res.setDescription(fieldValue(sw.getDescription()));
res.setRepository(fieldValue(sw.getCodeRepositoryUrl()));
@@ -247,7 +259,7 @@ public class ConversionUtils {
: new ArrayList<>();
}
- private static List structPropTypedList(final List list) {
+ private static List structPropTypedList(final List list) {
if (list == null) {
return new ArrayList<>();
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
index 4c20ac5cae..7451e58910 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
@@ -4,7 +4,7 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.ArrayList;
import java.util.List;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
@@ -68,7 +68,7 @@ public class EventFinder {
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
final List> list = new ArrayList<>();
- for (final OpenaireBrokerResult target : results.getData()) {
+ for (final OaBrokerMainEntity target : results.getData()) {
for (final UpdateMatcher> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig));
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index 2c4bda53d6..25d0d2bcad 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -9,10 +9,10 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.broker.objects.Instance;
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
-import eu.dnetlib.broker.objects.Provenance;
+import eu.dnetlib.broker.objects.OaBrokerEventPayload;
+import eu.dnetlib.broker.objects.OaBrokerInstance;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerProvenance;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
@@ -25,11 +25,11 @@ public final class UpdateInfo {
private final T highlightValue;
- private final OpenaireBrokerResult source;
+ private final OaBrokerMainEntity source;
- private final OpenaireBrokerResult target;
+ private final OaBrokerMainEntity target;
- private final BiConsumer compileHighlight;
+ private final BiConsumer compileHighlight;
private final Function highlightToString;
@@ -37,9 +37,9 @@ public final class UpdateInfo {
private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class);
- public UpdateInfo(final Topic topic, final T highlightValue, final OpenaireBrokerResult source,
- final OpenaireBrokerResult target,
- final BiConsumer compileHighlight,
+ public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
+ final OaBrokerMainEntity target,
+ final BiConsumer compileHighlight,
final Function highlightToString,
final DedupConfig dedupConfig) {
this.topic = topic;
@@ -55,17 +55,17 @@ public final class UpdateInfo {
return highlightValue;
}
- public OpenaireBrokerResult getSource() {
+ public OaBrokerMainEntity getSource() {
return source;
}
- public OpenaireBrokerResult getTarget() {
+ public OaBrokerMainEntity getTarget() {
return target;
}
private float calculateTrust(final DedupConfig dedupConfig,
- final OpenaireBrokerResult r1,
- final OpenaireBrokerResult r2) {
+ final OaBrokerMainEntity r1,
+ final OaBrokerMainEntity r2) {
if (dedupConfig == null) {
return BrokerConstants.MIN_TRUST;
@@ -104,11 +104,11 @@ public final class UpdateInfo {
return highlightToString.apply(getHighlightValue());
}
- public OpenAireEventPayload asBrokerPayload() {
+ public OaBrokerEventPayload asBrokerPayload() {
compileHighlight.accept(target, getHighlightValue());
- final OpenaireBrokerResult hl = new OpenaireBrokerResult();
+ final OaBrokerMainEntity hl = new OaBrokerMainEntity();
compileHighlight.accept(hl, getHighlightValue());
final String provId = getSource().getOriginalId();
@@ -117,14 +117,14 @@ public final class UpdateInfo {
final String provUrl = getSource()
.getInstances()
.stream()
- .map(Instance::getUrl)
+ .map(OaBrokerInstance::getUrl)
.findFirst()
.orElse(null);
;
- final Provenance provenance = new Provenance(provId, provRepo, provUrl);
+ final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provUrl);
- final OpenAireEventPayload res = new OpenAireEventPayload();
+ final OaBrokerEventPayload res = new OaBrokerEventPayload();
res.setResult(target);
res.setHighlight(hl);
res.setTrust(trust);
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java
index a46fde445d..ee1c8963e8 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java
@@ -5,11 +5,11 @@ import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
-public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> {
+public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> {
/**
*
@@ -22,7 +22,7 @@ public class ResultAggregator extends Aggregator t) {
+ public ResultGroup reduce(final ResultGroup group, final Tuple2 t) {
group.getData().add(t._1);
return group;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java
index 3f9dbe8af9..e718e0f1c4 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java
@@ -5,7 +5,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
public class ResultGroup implements Serializable {
@@ -14,13 +14,13 @@ public class ResultGroup implements Serializable {
*/
private static final long serialVersionUID = -3360828477088669296L;
- private List data = new ArrayList<>();
+ private List data = new ArrayList<>();
- public List getData() {
+ public List getData() {
return data;
}
- public void setData(final List data) {
+ public void setData(final List data) {
this.data = data;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java
similarity index 59%
rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java
rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java
index e72dcb9880..6a2d9b06dc 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java
@@ -5,11 +5,11 @@ import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
-import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
-public class OpenaireBrokerResultAggregator
- extends Aggregator, OpenaireBrokerResult, OpenaireBrokerResult> {
+public class OaBrokerMainEntityAggregator
+ extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> {
/**
*
@@ -17,17 +17,17 @@ public class OpenaireBrokerResultAggregator
private static final long serialVersionUID = -3687878788861013488L;
@Override
- public OpenaireBrokerResult zero() {
- return new OpenaireBrokerResult();
+ public OaBrokerMainEntity zero() {
+ return new OaBrokerMainEntity();
}
@Override
- public OpenaireBrokerResult finish(final OpenaireBrokerResult g) {
+ public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
return g;
}
@Override
- public OpenaireBrokerResult reduce(final OpenaireBrokerResult g, final Tuple2 t) {
+ public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) {
if (g.getOriginalId() == null) {
return t._1;
} else if (t._2 instanceof RelatedSoftware) {
@@ -38,13 +38,15 @@ public class OpenaireBrokerResultAggregator
g.getPublications().add(((RelatedPublication) t._2).getRelPublication());
} else if (t._2 instanceof RelatedProject) {
g.getProjects().add(((RelatedProject) t._2).getRelProject());
+ } else {
+ throw new RuntimeException("Invalid Object: " + t._2.getClass());
}
return g;
}
@Override
- public OpenaireBrokerResult merge(final OpenaireBrokerResult g1, final OpenaireBrokerResult g2) {
+ public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (g1.getOriginalId() != null) {
g1.getSoftwares().addAll(g2.getSoftwares());
g1.getDatasets().addAll(g2.getDatasets());
@@ -57,13 +59,13 @@ public class OpenaireBrokerResultAggregator
}
@Override
- public Encoder bufferEncoder() {
- return Encoders.bean(OpenaireBrokerResult.class);
+ public Encoder bufferEncoder() {
+ return Encoders.bean(OaBrokerMainEntity.class);
}
@Override
- public Encoder outputEncoder() {
- return Encoders.bean(OpenaireBrokerResult.class);
+ public Encoder outputEncoder() {
+ return Encoders.bean(OaBrokerMainEntity.class);
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java
index 6a5fb258cc..daf75ea2ef 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java
@@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable;
-import eu.dnetlib.broker.objects.Dataset;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
public class RelatedDataset implements Serializable {
@@ -13,12 +13,12 @@ public class RelatedDataset implements Serializable {
private static final long serialVersionUID = 774487705184038324L;
private String source;
private String relType;
- private Dataset relDataset;
+ private OaBrokerRelatedDataset relDataset;
public RelatedDataset() {
}
- public RelatedDataset(final String source, final String relType, final Dataset relDataset) {
+ public RelatedDataset(final String source, final String relType, final OaBrokerRelatedDataset relDataset) {
this.source = source;
this.relType = relType;
this.relDataset = relDataset;
@@ -40,11 +40,11 @@ public class RelatedDataset implements Serializable {
this.relType = relType;
}
- public Dataset getRelDataset() {
+ public OaBrokerRelatedDataset getRelDataset() {
return relDataset;
}
- public void setRelDataset(final Dataset relDataset) {
+ public void setRelDataset(final OaBrokerRelatedDataset relDataset) {
this.relDataset = relDataset;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java
index fafec1e19c..4116c8c77b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java
@@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable;
-import eu.dnetlib.broker.objects.Project;
+import eu.dnetlib.broker.objects.OaBrokerProject;
public class RelatedProject implements Serializable {
@@ -14,12 +14,12 @@ public class RelatedProject implements Serializable {
private String source;
private String relType;
- private Project relProject;
+ private OaBrokerProject relProject;
public RelatedProject() {
}
- public RelatedProject(final String source, final String relType, final Project relProject) {
+ public RelatedProject(final String source, final String relType, final OaBrokerProject relProject) {
this.source = source;
this.relType = relType;
this.relProject = relProject;
@@ -41,11 +41,11 @@ public class RelatedProject implements Serializable {
this.relType = relType;
}
- public Project getRelProject() {
+ public OaBrokerProject getRelProject() {
return relProject;
}
- public void setRelProject(final Project relProject) {
+ public void setRelProject(final OaBrokerProject relProject) {
this.relProject = relProject;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java
index 8a31ddf7e2..9e222a9523 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java
@@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable;
-import eu.dnetlib.broker.objects.Publication;
+import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
public class RelatedPublication implements Serializable {
@@ -14,12 +14,13 @@ public class RelatedPublication implements Serializable {
private String source;
private String relType;
- private Publication relPublication;
+ private OaBrokerRelatedPublication relPublication;
public RelatedPublication() {
}
- public RelatedPublication(final String source, final String relType, final Publication relPublication) {
+ public RelatedPublication(final String source, final String relType,
+ final OaBrokerRelatedPublication relPublication) {
this.source = source;
this.relType = relType;
this.relPublication = relPublication;
@@ -41,11 +42,11 @@ public class RelatedPublication implements Serializable {
this.relType = relType;
}
- public Publication getRelPublication() {
+ public OaBrokerRelatedPublication getRelPublication() {
return relPublication;
}
- public void setRelPublication(final Publication relPublication) {
+ public void setRelPublication(final OaBrokerRelatedPublication relPublication) {
this.relPublication = relPublication;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java
index 3193874696..2f3b8668c2 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java
@@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable;
-import eu.dnetlib.broker.objects.Software;
+import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
public class RelatedSoftware implements Serializable {
@@ -13,12 +13,12 @@ public class RelatedSoftware implements Serializable {
private static final long serialVersionUID = 7573383356943300157L;
private String source;
private String relType;
- private Software relSoftware;
+ private OaBrokerRelatedSoftware relSoftware;
public RelatedSoftware() {
}
- public RelatedSoftware(final String source, final String relType, final Software relSoftware) {
+ public RelatedSoftware(final String source, final String relType, final OaBrokerRelatedSoftware relSoftware) {
this.source = source;
this.relType = relType;
this.relSoftware = relSoftware;
@@ -40,11 +40,11 @@ public class RelatedSoftware implements Serializable {
this.relType = relType;
}
- public Software getRelSoftware() {
+ public OaBrokerRelatedSoftware getRelSoftware() {
return relSoftware;
}
- public void setRelSoftware(final Software relSoftware) {
+ public void setRelSoftware(final OaBrokerRelatedSoftware relSoftware) {
this.relSoftware = relSoftware;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json
new file mode 100644
index 0000000000..32fd1d8f32
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json
@@ -0,0 +1,14 @@
+[
+ {
+ "paramName": "g",
+ "paramLongName": "graphPath",
+ "paramDescription": "the path where there the graph is stored",
+ "paramRequired": true
+ },
+ {
+ "paramName": "o",
+ "paramLongName": "relsPath",
+ "paramDescription": "the path where the generated relations will be stored",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json
new file mode 100644
index 0000000000..6f5e330f60
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json
@@ -0,0 +1,14 @@
+[
+ {
+ "paramName": "g",
+ "paramLongName": "graphPath",
+ "paramDescription": "the path where there the graph is stored",
+ "paramRequired": true
+ },
+ {
+ "paramName": "o",
+ "paramLongName": "simpleEntitiesPath",
+ "paramDescription": "the path where the generated simple entities (without relations) will be stored",
+ "paramRequired": true
+ }
+]