diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index 2a89a26fd3..012ff89a36 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build-assembly-resources
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index 5be114e3c6..256017e2c7 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build-properties-maven-plugin
diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index 515ed35cee..e60e8076ef 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp-code-style
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
jar
diff --git a/dhp-build/pom.xml b/dhp-build/pom.xml
index d2dcbc36e6..12b999b9c8 100644
--- a/dhp-build/pom.xml
+++ b/dhp-build/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build
pom
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 0e7652dd3e..0819a8bd20 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
../
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 56fb8ead2a..2e5652b430 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
../
diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index b50c6705b2..0b4d25700b 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-actionmanager
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index c04910a583..a1bc1c483d 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-aggregation
diff --git a/dhp-workflows/dhp-blacklist/pom.xml b/dhp-workflows/dhp-blacklist/pom.xml
index 04d334cd75..9c25f7b293 100644
--- a/dhp-workflows/dhp-blacklist/pom.xml
+++ b/dhp-workflows/dhp-blacklist/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index 8d7d3b88c5..424015a3c6 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
deleted file mode 100644
index db59920106..0000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ /dev/null
@@ -1,190 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.util.Optional;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.TypedColumn;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.broker.objects.OaBrokerMainEntity;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.broker.model.Event;
-import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
-import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.EventFinder;
-import eu.dnetlib.dhp.broker.oa.util.EventGroup;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.config.DedupConfig;
-import scala.Tuple2;
-
-public class GenerateEventsApplication {
-
- private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
-
- public static void main(final String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- GenerateEventsApplication.class
- .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
- parser.parseArgument(args);
-
- final Boolean isSparkSessionManaged = Optional
- .ofNullable(parser.get("isSparkSessionManaged"))
- .map(Boolean::valueOf)
- .orElse(Boolean.TRUE);
- log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
- final String graphPath = parser.get("graphPath");
- log.info("graphPath: {}", graphPath);
-
- final String eventsPath = parser.get("eventsPath");
- log.info("eventsPath: {}", eventsPath);
-
- final String isLookupUrl = parser.get("isLookupUrl");
- log.info("isLookupUrl: {}", isLookupUrl);
-
- final String dedupConfigProfileId = parser.get("dedupConfProfile");
- log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
-
- final SparkConf conf = new SparkConf();
- // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- // conf.registerKryoClasses(BrokerConstants.getModelClasses());
-
- // TODO UNCOMMENT
- // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
- final DedupConfig dedupConfig = null;
-
- runWithSparkSession(conf, isSparkSessionManaged, spark -> {
-
- ClusterUtils.removeDir(spark, eventsPath);
-
- // TODO REMOVE THIS
-
- expandResultsWithRelations(spark, graphPath, Publication.class)
- .write()
- .mode(SaveMode.Overwrite)
- .json(eventsPath);
-
- // TODO UNCOMMENT THIS
- // spark
- // .emptyDataset(Encoders.bean(Event.class))
- // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
- // .write()
- // .mode(SaveMode.Overwrite)
- // .option("compression", "gzip")
- // .json(eventsPath);
- });
-
- }
-
- private static Dataset generateEvents(
- final SparkSession spark,
- final String graphPath,
- final Class sourceClass,
- final DedupConfig dedupConfig) {
-
- final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass);
-
- final Dataset mergedRels = ClusterUtils
- .readPath(spark, graphPath + "/relation", Relation.class)
- .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
-
- final TypedColumn, ResultGroup> aggr = new ResultAggregator()
- .toColumn();
-
- return results
- .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
- .groupByKey(
- (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING())
- .agg(aggr)
- .map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
- .filter(rg -> rg.getData().size() > 1)
- .map(
- (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig),
- Encoders.bean(EventGroup.class))
- .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
- }
-
- private static Dataset expandResultsWithRelations(
- final SparkSession spark,
- final String graphPath,
- final Class sourceClass) {
-
- // final Dataset datasets = readPath(
- // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
- // final Dataset softwares = readPath(spark, graphPath + "/software", Software.class);
- // final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class);
-
- final Dataset r0 = ClusterUtils
- .readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
- .filter(r -> r.getDataInfo().getDeletedbyinference())
- .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class));
-
- // TODO UNCOMMENT THIS
- // final Dataset r1 = join(r0, relatedProjects(spark, graphPath));
- // final Dataset r2 = join(r1, relatedDataset(spark, graphPath));
- // final Dataset r3 = join(r2, relatedPublications(spark, graphPath));
- // final Dataset r4 = join(r3, relatedSoftwares(spark, graphPath));
-
- return r0; // TODO it should be r4
- }
-
- private static Dataset join(final Dataset sources,
- final Dataset typedRels) {
-
- final TypedColumn, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator()
- .toColumn();
-
- return sources
- .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
- .groupByKey(
- (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
- .agg(aggr)
- .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
-
- }
-
- private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
-
- final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
-
- final String conf = isLookUpService
- .getResourceProfileByQuery(
- String
- .format(
- "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
- profId));
-
- final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
- dedupConfig.getPace().initModel();
- dedupConfig.getPace().initTranslationMap();
- // dedupConfig.getWf().setConfigurationId("???");
-
- return dedupConfig;
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
new file mode 100644
index 0000000000..3ea0086ff5
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
@@ -0,0 +1,106 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.EventFinder;
+import eu.dnetlib.dhp.broker.oa.util.EventGroup;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
+
+public class GenerateEventsJob {
+
+ private static final Logger log = LoggerFactory.getLogger(GenerateEventsJob.class);
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ GenerateEventsJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: {}", isLookupUrl);
+
+ final String dedupConfigProfileId = parser.get("dedupConfProfile");
+ log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
+
+ final String eventsPath = workingPath + "/eventsPath";
+ log.info("eventsPath: {}", eventsPath);
+
+ final SparkConf conf = new SparkConf();
+
+ // TODO UNCOMMENT
+ // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+ final DedupConfig dedupConfig = null;
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, eventsPath);
+
+ final Dataset groups = ClusterUtils
+ .readPath(spark, graphPath + "/relation", ResultGroup.class);
+
+ final Dataset events = groups
+ .map(
+ (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig),
+ Encoders.bean(EventGroup.class))
+ .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
+
+ events.write().mode(SaveMode.Overwrite).json(eventsPath);
+
+ });
+
+ }
+
+ private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
+
+ final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
+
+ final String conf = isLookUpService
+ .getResourceProfileByQuery(
+ String
+ .format(
+ "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
+ profId));
+
+ final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
+ dedupConfig.getPace().initModel();
+ dedupConfig.getPace().initTranslationMap();
+ // dedupConfig.getWf().setConfigurationId("???");
+
+ return dedupConfig;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java
new file mode 100644
index 0000000000..dac308f360
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java
@@ -0,0 +1,94 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.TypedColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
+import scala.Tuple2;
+
+public class JoinEntitiesJob {
+
+ private static final Logger log = LoggerFactory.getLogger(JoinEntitiesJob.class);
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ JoinEntitiesJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String joinedEntitiesPath = workingPath + "/joinedEntities";
+ log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
+
+ final SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, joinedEntitiesPath);
+
+ final Dataset r0 = ClusterUtils
+ .readPath(spark, graphPath + "/simpleEntities", OaBrokerMainEntity.class);
+
+ final Dataset r1 = join(
+ r0, ClusterUtils.readPath(spark, graphPath + "/relatedProjects", RelatedProject.class));
+ final Dataset r2 = join(
+ r1, ClusterUtils.readPath(spark, graphPath + "/relatedDatasets", RelatedDataset.class));
+ final Dataset r3 = join(
+ r2, ClusterUtils.readPath(spark, graphPath + "/relatedPublications", RelatedPublication.class));
+ final Dataset r4 = join(
+ r3, ClusterUtils.readPath(spark, graphPath + "/relatedSoftwares", RelatedSoftware.class));
+
+ r4.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath);
+
+ });
+
+ }
+
+ private static Dataset join(final Dataset sources,
+ final Dataset typedRels) {
+
+ final TypedColumn, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator()
+ .toColumn();
+
+ return sources
+ .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
+ .groupByKey(
+ (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
+ .agg(aggr)
+ .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
new file mode 100644
index 0000000000..aa057eee8a
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
@@ -0,0 +1,88 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.TypedColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
+
+public class PrepareGroupsJob {
+
+ private static final Logger log = LoggerFactory.getLogger(PrepareGroupsJob.class);
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ JoinEntitiesJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String groupsPath = workingPath + "/groups";
+ log.info("groupsPath: {}", groupsPath);
+
+ final SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, groupsPath);
+
+ final Dataset results = ClusterUtils
+ .readPath(spark, graphPath + "/joinedEntities", OaBrokerMainEntity.class);
+
+ final Dataset mergedRels = ClusterUtils
+ .readPath(spark, graphPath + "/relation", Relation.class)
+ .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
+
+ final TypedColumn, ResultGroup> aggr = new ResultAggregator()
+ .toColumn();
+
+ final Dataset groups = results
+ .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
+ .groupByKey(
+ (MapFunction, String>) t -> t._2.getTarget(),
+ Encoders.STRING())
+ .agg(aggr)
+ .map(
+ (MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
+ .filter(rg -> rg.getData().size() > 1);
+
+ groups
+ .write()
+ .mode(SaveMode.Overwrite)
+ .json(groupsPath);
+
+ });
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
similarity index 82%
rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java
rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
index 4a10fbabf1..edf9b9a7e6 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
@@ -19,16 +19,16 @@ import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
import eu.dnetlib.dhp.schema.oaf.Relation;
-public class GenerateRelatedDatasets {
+public class PrepareRelatedDatasetsJob {
- private static final Logger log = LoggerFactory.getLogger(GenerateRelatedDatasets.class);
+ private static final Logger log = LoggerFactory.getLogger(PrepareRelatedDatasetsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
- GenerateRelatedDatasets.class
- .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
+ PrepareRelatedDatasetsJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@@ -40,7 +40,10 @@ public class GenerateRelatedDatasets {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
- final String relsPath = parser.get("relsPath");
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String relsPath = workingPath + "/relatedDatasets";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
similarity index 84%
rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java
rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
index 59ed388e7c..00957972ad 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
@@ -23,9 +23,9 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
-public class GenerateRelatedProjects {
+public class PrepareRelatedProjectsJob {
- private static final Logger log = LoggerFactory.getLogger(GenerateRelatedProjects.class);
+ private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -33,8 +33,8 @@ public class GenerateRelatedProjects {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
- GenerateRelatedProjects.class
- .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
+ PrepareRelatedProjectsJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@@ -46,7 +46,10 @@ public class GenerateRelatedProjects {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
- final String relsPath = parser.get("relsPath");
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String relsPath = workingPath + "/relatedProjects";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
similarity index 83%
rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java
rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
index 0c20081dc8..945fd9ed77 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
@@ -22,9 +22,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
-public class GenerateRelatedPublications {
+public class PrepareRelatedPublicationsJob {
- private static final Logger log = LoggerFactory.getLogger(GenerateRelatedPublications.class);
+ private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -32,8 +32,8 @@ public class GenerateRelatedPublications {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
- GenerateRelatedPublications.class
- .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
+ PrepareRelatedPublicationsJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@@ -45,7 +45,10 @@ public class GenerateRelatedPublications {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
- final String relsPath = parser.get("relsPath");
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String relsPath = workingPath + "/relatedPublications";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
similarity index 83%
rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java
rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
index b957888463..edb8dc1c3a 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
@@ -22,9 +22,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
-public class GenerateRelatedSoftwares {
+public class PrepareRelatedSoftwaresJob {
- private static final Logger log = LoggerFactory.getLogger(GenerateRelatedSoftwares.class);
+ private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -32,8 +32,8 @@ public class GenerateRelatedSoftwares {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
- GenerateRelatedSoftwares.class
- .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
+ PrepareRelatedSoftwaresJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@@ -45,7 +45,10 @@ public class GenerateRelatedSoftwares {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
- final String relsPath = parser.get("relsPath");
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String relsPath = workingPath + "/relatedSoftwares";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
@@ -53,6 +56,7 @@ public class GenerateRelatedSoftwares {
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, relsPath);
+
final Dataset softwares = ClusterUtils.readPath(spark, graphPath + "/software", Software.class);
final Dataset rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
similarity index 68%
rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java
rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
index 59485d5cf0..213003db28 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
@@ -18,19 +18,21 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.Software;
-public class GenerateSimpleEntitities {
+public class PrepareSimpleEntititiesJob {
- private static final Logger log = LoggerFactory.getLogger(GenerateSimpleEntitities.class);
+ private static final Logger log = LoggerFactory.getLogger(PrepareSimpleEntititiesJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
- GenerateSimpleEntitities.class
- .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json")));
+ PrepareSimpleEntititiesJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@@ -42,7 +44,10 @@ public class GenerateSimpleEntitities {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
- final String simpleEntitiesPath = parser.get("simpleEntitiesPath");
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String simpleEntitiesPath = workingPath + "/simpleEntities";
log.info("simpleEntitiesPath: {}", simpleEntitiesPath);
final SparkConf conf = new SparkConf();
@@ -51,27 +56,18 @@ public class GenerateSimpleEntitities {
ClusterUtils.removeDir(spark, simpleEntitiesPath);
- expandResultsWithRelations(spark, graphPath, Publication.class)
+ prepareSimpleEntities(spark, graphPath, Publication.class)
+ .union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
+ .union(prepareSimpleEntities(spark, graphPath, Software.class))
+ .union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class))
.write()
.mode(SaveMode.Overwrite)
.json(simpleEntitiesPath);
-
- // TODO UNCOMMENT THIS
- // spark
- // .emptyDataset(Encoders.bean(Event.class))
- // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
- // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
- // .write()
- // .mode(SaveMode.Overwrite)
- // .option("compression", "gzip")
- // .json(eventsPath);
});
}
- private static Dataset expandResultsWithRelations(
+ private static Dataset prepareSimpleEntities(
final SparkSession spark,
final String graphPath,
final Class sourceClass) {
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
index 8bcea5e6e3..15a1ddd880 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
@@ -14,6 +14,10 @@ public class ClusterUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ public static void createDirIfMissing(final SparkSession spark, final String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
public static void removeDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json
similarity index 64%
rename from dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json
rename to dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json
index 32fd1d8f32..adee1888a1 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json
@@ -7,8 +7,8 @@
},
{
"paramName": "o",
- "paramLongName": "relsPath",
- "paramDescription": "the path where the generated relations will be stored",
+ "paramLongName": "workingPath",
+ "paramDescription": "the path where the temporary data will be stored",
"paramRequired": true
}
]
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index ea9aabcfce..9783fcab60 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -6,8 +6,8 @@
the path where the graph is stored
- eventsOutputPath
- the path where the the events will be stored
+ workingPath
+ the path where the the generated data will be stored
isLookupUrl
@@ -73,18 +73,34 @@
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
yarn
cluster
- GenerateEvents
- eu.dnetlib.dhp.broker.oa.GenerateEventsApplication
+ PrepareSimpleEntititiesJob
+ eu.dnetlib.dhp.broker.oa.PrepareSimpleEntititiesJob
dhp-broker-events-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
@@ -97,14 +113,183 @@
--conf spark.sql.shuffle.partitions=3840
--graphPath${graphInputPath}
- --eventsPath${eventsOutputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareRelatedDatasetsJob
+ eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasetsJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareRelatedProjectsJob
+ eu.dnetlib.dhp.broker.oa.PrepareRelatedProjectsJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareRelatedPublicationsJob
+ eu.dnetlib.dhp.broker.oa.PrepareRelatedPublicationsJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareRelatedSoftwaresJob
+ eu.dnetlib.dhp.broker.oa.PrepareRelatedSoftwaresJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ JoinEntitiesJob
+ eu.dnetlib.dhp.broker.oa.JoinEntitiesJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareGroupsJob
+ eu.dnetlib.dhp.broker.oa.PrepareGroupsJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ GenerateEventsJob
+ eu.dnetlib.dhp.broker.oa.GenerateEventsJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
--isLookupUrl${isLookupUrl}
--dedupConfProfile${dedupConfProfId}
-
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_broker_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
similarity index 94%
rename from dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_broker_events.json
rename to dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
index 6ab6d9a2dd..d185bc73da 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_broker_events.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
@@ -7,7 +7,7 @@
},
{
"paramName": "o",
- "paramLongName": "eventsPath",
+ "paramLongName": "workingPath",
"paramDescription": "the path where the generated events will be stored",
"paramRequired": true
},
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json
deleted file mode 100644
index 6f5e330f60..0000000000
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json
+++ /dev/null
@@ -1,14 +0,0 @@
-[
- {
- "paramName": "g",
- "paramLongName": "graphPath",
- "paramDescription": "the path where there the graph is stored",
- "paramRequired": true
- },
- {
- "paramName": "o",
- "paramLongName": "simpleEntitiesPath",
- "paramDescription": "the path where the generated simple entities (without relations) will be stored",
- "paramRequired": true
- }
-]
diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml
index 1f5f2620ea..03ddbcf4c8 100644
--- a/dhp-workflows/dhp-dedup-openaire/pom.xml
+++ b/dhp-workflows/dhp-dedup-openaire/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
dhp-dedup-openaire
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/pom.xml b/dhp-workflows/dhp-dedup-scholexplorer/pom.xml
index e9e11b417e..aa4070b010 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/pom.xml
+++ b/dhp-workflows/dhp-dedup-scholexplorer/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-distcp/pom.xml b/dhp-workflows/dhp-distcp/pom.xml
index 5707ddfc54..8c10538c01 100644
--- a/dhp-workflows/dhp-distcp/pom.xml
+++ b/dhp-workflows/dhp-distcp/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-doiboost/pom.xml b/dhp-workflows/dhp-doiboost/pom.xml
index 39bb81ec13..3299c1496c 100644
--- a/dhp-workflows/dhp-doiboost/pom.xml
+++ b/dhp-workflows/dhp-doiboost/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-enrichment/pom.xml b/dhp-workflows/dhp-enrichment/pom.xml
index e71a72f3e3..d0ab77cc55 100644
--- a/dhp-workflows/dhp-enrichment/pom.xml
+++ b/dhp-workflows/dhp-enrichment/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml
index fd088cd2a2..a5bf61b5ab 100644
--- a/dhp-workflows/dhp-graph-mapper/pom.xml
+++ b/dhp-workflows/dhp-graph-mapper/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
index 2466ca8e21..05ca7d4cec 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
index b0aec1e5da..fa19647730 100644
--- a/dhp-workflows/dhp-graph-provision/pom.xml
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-stats-update/pom.xml b/dhp-workflows/dhp-stats-update/pom.xml
index 397bd8d08b..52f35ff07a 100644
--- a/dhp-workflows/dhp-stats-update/pom.xml
+++ b/dhp-workflows/dhp-stats-update/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
dhp-stats-update
diff --git a/dhp-workflows/dhp-worfklow-profiles/pom.xml b/dhp-workflows/dhp-worfklow-profiles/pom.xml
index e03362034b..34996a0218 100644
--- a/dhp-workflows/dhp-worfklow-profiles/pom.xml
+++ b/dhp-workflows/dhp-worfklow-profiles/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/provision.xml b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_construction.xml
similarity index 99%
rename from dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/provision.xml
rename to dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_construction.xml
index 28cbde70de..819b3e12d1 100644
--- a/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/provision.xml
+++ b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_construction.xml
@@ -7,7 +7,7 @@
- Data Provision [OCEAN]
+ Graph Construction [OCEAN]
Data Provision
30
diff --git a/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_to_hiveDB.xml b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_to_hiveDB.xml
new file mode 100644
index 0000000000..0ace12ea38
--- /dev/null
+++ b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_to_hiveDB.xml
@@ -0,0 +1,73 @@
+
+
+
+ Graph to HiveDB [OCEAN]
+ Data Provision
+ 30
+
+
+ Set the path containing the AGGREGATOR graph
+
+ inputPath
+
+
+
+
+
+
+
+ Set the target path to store the RAW graph
+
+ hiveDbName
+
+
+
+
+
+
+
+
+ wait configurations
+
+
+
+
+
+
+ create the AGGREGATOR graph
+
+ executeOozieJob
+ IIS
+
+ {
+ 'inputPath' : 'inputPath',
+ 'hiveDbName' : 'hiveDbName'
+ }
+
+
+ {
+ 'oozie.wf.application.path' : '/lib/dnet/oa/graph/hive/oozie_app'
+ }
+
+ build-report
+
+
+
+
+
+
+
+
+ wf_20200615_163630_609
+ 2020-06-15T17:08:00+00:00
+ SUCCESS
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/update_solr.xml b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/update_solr.xml
new file mode 100644
index 0000000000..8a7738bcfa
--- /dev/null
+++ b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/update_solr.xml
@@ -0,0 +1,98 @@
+
+
+
+ Update Solr [OCEAN]
+ Data Provision
+ 30
+
+
+ Set the path containing the AGGREGATOR graph
+
+ inputGraphRootPath
+
+
+
+
+
+
+
+ Set the target path to store the RAW graph
+
+ format
+ TMF
+
+
+
+
+
+
+ Set the lookup address
+
+ isLookupUrl
+ http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl
+
+
+
+
+
+
+
+ wait configurations
+
+
+
+
+
+
+ create the AGGREGATOR graph
+
+ executeOozieJob
+ IIS
+
+ {
+ 'inputGraphRootPath' : 'inputGraphRootPath',
+ 'isLookupUrl' : 'isLookupUrl',
+ 'format' : 'format'
+ }
+
+
+ {
+ 'oozie.wf.application.path' : '/lib/dnet/oa/provision/oozie_app',
+ 'maxRelations' : '100',
+ 'relPartitions' : '3000',
+ 'batchSize' : '2000',
+ 'relationFilter' : 'isAuthorInstitutionOf,produces,hasAmongTopNSimilarDocuments',
+ 'otherDsTypeId' : 'scholarcomminfra,infospace,pubsrepository::mock,entityregistry,entityregistry::projects,entityregistry::repositories,websource',
+ 'resumeFrom' : 'prepare_relations',
+ 'sparkDriverMemoryForJoining' : '3G',
+ 'sparkExecutorMemoryForJoining' : '7G',
+ 'sparkExecutorCoresForJoining' : '4',
+ 'sparkDriverMemoryForIndexing' : '2G',
+ 'sparkExecutorMemoryForIndexing' : '2G',
+ 'sparkExecutorCoresForIndexing' : '64',
+ 'sparkNetworkTimeout' : '600',
+ 'workingDir' : '/tmp/beta_provision/working_dir/update_solr'
+ }
+
+ build-report
+
+
+
+
+
+
+
+
+ wf_20200615_163630_609
+ 2020-06-15T17:08:00+00:00
+ SUCCESS
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/update_stats.xml b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/update_stats.xml
new file mode 100644
index 0000000000..a91b6302e1
--- /dev/null
+++ b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/update_stats.xml
@@ -0,0 +1,74 @@
+
+
+
+ Update Stats [OCEAN]
+ Data Provision
+ 30
+
+
+ Set the path containing the AGGREGATOR graph
+
+ openaire_db_name
+
+
+
+
+
+
+
+ Set the target path to store the RAW graph
+
+ stats_db_name
+
+
+
+
+
+
+
+
+ wait configurations
+
+
+
+
+
+
+ create the AGGREGATOR graph
+
+ executeOozieJob
+ IIS
+
+ {
+ 'openaire_db_name' : 'openaire_db_name',
+ 'stats_db_name' : 'stats_db_name'
+ }
+
+
+ {
+ 'oozie.wf.application.path' : '/lib/dnet/oa/graph/stats/oozie_app',
+ 'hive_timeout' : '3000'
+ }
+
+ build-report
+
+
+
+
+
+
+
+
+ wf_20200615_163630_609
+ 2020-06-15T17:08:00+00:00
+ SUCCESS
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index 8d8d57c848..9fbc6d714b 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
../
diff --git a/pom.xml b/pom.xml
index 06e2b7aafa..89b7e8829c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
eu.dnetlib.dhp
dhp
- 1.2.3-SNAPSHOT
+ 1.2.4-SNAPSHOT
pom