diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index eddc042c6b..f943ac93aa 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -24,11 +24,7 @@
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.11</artifactId>
</dependency>
- <dependency>
- 	<groupId>org.apache.spark</groupId>
- 	<artifactId>spark-hive_2.11</artifactId>
- 	<scope>test</scope>
- </dependency>
+
<dependency>
	<groupId>eu.dnetlib.dhp</groupId>
@@ -45,10 +41,6 @@
	<artifactId>dnet-pace-core</artifactId>
</dependency>
- <dependency>
- 	<groupId>com.jayway.jsonpath</groupId>
- 	<artifactId>json-path</artifactId>
- </dependency>
<dependency>
	<groupId>dom4j</groupId>
	<artifactId>dom4j</artifactId>
</dependency>
@@ -61,7 +53,7 @@
<dependency>
	<groupId>eu.dnetlib</groupId>
	<artifactId>dnet-openaire-broker-common</artifactId>
- 	<version>[3.0.1,4.0.0)</version>
+ 	<version>[3.0.2,4.0.0)</version>
</dependency>
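
Maven resolves the range [3.0.2,4.0.0) to the newest deployed dnet-openaire-broker-common release that is at least 3.0.2 and strictly below 4.0.0; raising the lower bound forces the build onto the 3.0.2 API, which the Java changes below presumably depend on.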
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index c1f12f43c5..62171ac611 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -31,12 +31,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAg
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
@@ -75,25 +72,38 @@ public class GenerateEventsApplication {
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
final SparkConf conf = new SparkConf();
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf.registerKryoClasses(BrokerConstants.getModelClasses());
+ // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ // conf.registerKryoClasses(BrokerConstants.getModelClasses());
- final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+ // TODO UNCOMMENT
+ // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+ final DedupConfig dedupConfig = null;
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, eventsPath);
- spark
- .emptyDataset(Encoders.kryo(Event.class))
- .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
- .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
- .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
- .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
+ // TODO REMOVE THIS
+ final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
+ final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
+ .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
+ .cache();
+ relatedEntities(projects, rels, RelatedProject.class)
.write()
.mode(SaveMode.Overwrite)
- .option("compression", "gzip")
.json(eventsPath);
+
+ // TODO UNCOMMENT THIS
+ // spark
+ // .emptyDataset(Encoders.bean(Event.class))
+ // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
+ // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
+ // .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
+ // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
+ // .write()
+ // .mode(SaveMode.Overwrite)
+ // .option("compression", "gzip")
+ // .json(eventsPath);
});
}
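
Both this hunk and the next swap Encoders.kryo for Encoders.bean. A minimal, self-contained sketch of the difference (EncoderDemo and its Payload bean are hypothetical stand-ins, not code from this repository): a bean encoder derives a real column schema from getters and setters, which is what makes the .json(eventsPath) output above legible, while a kryo encoder packs each object into a single opaque binary column.

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

// Illustrative only -- not part of this patch.
public class EncoderDemo {

	// Encoders.bean requires a public no-arg constructor plus getters/setters;
	// Event, ResultGroup and EventGroup must follow the same convention.
	public static class Payload {
		private String topic;

		public String getTopic() {
			return topic;
		}

		public void setTopic(final String topic) {
			this.topic = topic;
		}
	}

	public static void main(final String[] args) {
		final SparkSession spark = SparkSession.builder().master("local").appName("demo").getOrCreate();

		// Bean encoder: the schema is derived from the bean properties, so the
		// dataset has named columns and .write().json(...) emits readable JSON.
		final Dataset<Payload> asBean = spark.createDataset(Arrays.asList(new Payload()), Encoders.bean(Payload.class));
		asBean.printSchema(); // root |-- topic: string

		// Kryo encoder: each object is serialized into one binary column;
		// compact, but opaque to Spark SQL and useless for JSON output.
		final Dataset<Payload> asKryo = spark.createDataset(Arrays.asList(new Payload()), Encoders.kryo(Payload.class));
		asKryo.printSchema(); // root |-- value: binary

		spark.stop();
	}
}
```

This is also why the Kryo registration on the SparkConf could be commented out above: bean-encoded rows are stored in Spark's internal row format rather than as Kryo-serialized objects.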
@@ -117,45 +127,48 @@ public class GenerateEventsApplication {
.toColumn();
return results
- .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
+ .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
.groupByKey(
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr)
- .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
- .filter(ResultGroup::isValid)
+ .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
+ .filter(rg -> rg.getData().size() > 1)
.map(
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
- Encoders.kryo(EventGroup.class))
- .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
+ Encoders.bean(EventGroup.class))
+ .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
}
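
The aggr column consumed by .agg(aggr) above is a typed Aggregator converted with toColumn(). A reduced sketch of that pattern (CountAggregator is a hypothetical stand-in; the real aggregator folds the joined results into a ResultGroup):

```java
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

// Aggregator<IN, BUF, OUT> turned into a TypedColumn for groupByKey(...).agg(...).
public class CountAggregator extends Aggregator<String, Long, Long> {

	@Override
	public Long zero() {
		return 0L; // neutral element of the aggregation buffer
	}

	@Override
	public Long reduce(final Long buffer, final String value) {
		return buffer + 1; // fold one input row into the buffer
	}

	@Override
	public Long merge(final Long b1, final Long b2) {
		return b1 + b2; // combine partial buffers from different partitions
	}

	@Override
	public Long finish(final Long buffer) {
		return buffer; // buffer is already the final result
	}

	@Override
	public Encoder<Long> bufferEncoder() {
		return Encoders.LONG();
	}

	@Override
	public Encoder<Long> outputEncoder() {
		return Encoders.LONG();
	}

	// Same shape as the aggr.toColumn() call in the hunk above.
	public static TypedColumn<String, Long> column() {
		return new CountAggregator().toColumn();
	}
}
```

Used as ds.groupByKey((MapFunction<String, String>) s -> s, Encoders.STRING()).agg(CountAggregator.column()), which mirrors the grouping flow above.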
private static <SRC extends Result> Dataset<OpenaireBrokerResult> expandResultsWithRelations(
final SparkSession spark,
final String graphPath,
final Class<SRC> sourceClass) {
+
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
- final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
- spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
- final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
- final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
+ // final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
+ // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
+ // final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
+ // final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.cache();
final Dataset<OpenaireBrokerResult> r0 = readPath(
- spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
+ spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
.filter(r -> r.getDataInfo().getDeletedbyinference())
- .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
+ .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
+ // TODO UNCOMMENT THIS
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
- final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
- final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
- final Dataset<OpenaireBrokerResult> r4 = join(
- r3, rels, relatedEntities(publications, rels, RelatedProject.class));
- ;
+ // final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels,
+ // RelatedSoftware.class));
+ // final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels,
+ // RelatedDataset.class));
+ // final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels,
+ // RelatedPublication.class));;
- return r4;
+ return r0; // TODO it should be r4
}
private static Dataset