diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 62171ac611..f15d918c96 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -28,12 +28,17 @@ import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; @@ -84,11 +89,8 @@ public class GenerateEventsApplication { removeOutputDir(spark, eventsPath); // TODO REMOVE THIS - final Dataset projects = readPath(spark, graphPath + "/project", Project.class); - final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) - .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) - .cache(); - relatedEntities(projects, rels, RelatedProject.class) + + relatedProjects(spark, graphPath) .write() .mode(SaveMode.Overwrite) .json(eventsPath); @@ -144,7 +146,6 @@ public class GenerateEventsApplication { final String graphPath, final Class sourceClass) { - final Dataset projects = readPath(spark, graphPath + "/project", Project.class); // final Dataset datasets = readPath( // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); // final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); @@ -160,25 +161,78 @@ public class GenerateEventsApplication { .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)); // TODO UNCOMMENT THIS - final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); - // final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, - // RelatedSoftware.class)); - // final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, - // RelatedDataset.class)); - // final Dataset r4 = join(r3, rels, relatedEntities(publications, rels, - // RelatedPublication.class));; + final Dataset r1 = join(r0, rels, relatedProjects(spark, graphPath)); + // final Dataset r2 = join(r1, rels, relatedDataset(spark, graphPath)); + // final Dataset r3 = join(r2, rels, relatedPublications(spark, graphPath)); + // final Dataset r4 = join(r3, rels, relatedSoftwares(spark, graphPath)); - return r0; // TODO it should be r4 + return r1; // TODO it should be r4 } - private static Dataset relatedEntities(final Dataset targets, - final Dataset rels, - final Class clazz) { + private static Dataset relatedProjects(final SparkSession spark, final String graphPath) { + + final Dataset projects = readPath(spark, graphPath + "/project", Project.class); + + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)); + return rels - .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") + .joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner") .map( - t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), - Encoders.bean(clazz)); + t -> new RelatedProject( + t._1.getSource(), + t._1.getRelType(), + ConversionUtils.oafProjectToBrokerProject(t._2)), + Encoders.bean(RelatedProject.class)); + } + + private static Dataset relatedDataset(final SparkSession spark, final String graphPath) { + + final Dataset datasets = readPath( + spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class); + + return rels + .joinWith(datasets, datasets.col("id").equalTo(rels.col("target")), "inner") + .map( + t -> new RelatedDataset( + t._1.getSource(), + t._1.getRelType(), + ConversionUtils.oafDatasetToBrokerDataset(t._2)), + Encoders.bean(RelatedDataset.class)); + } + + private static Dataset relatedSoftwares(final SparkSession spark, final String graphPath) { + + final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); + + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class); + + return rels + .joinWith(softwares, softwares.col("id").equalTo(rels.col("target")), "inner") + .map( + t -> new RelatedSoftware( + t._1.getSource(), + t._1.getRelType(), + ConversionUtils.oafSoftwareToBrokerSoftware(t._2)), + Encoders.bean(RelatedSoftware.class)); + } + + private static Dataset relatedPublications(final SparkSession spark, final String graphPath) { + + final Dataset pubs = readPath(spark, graphPath + "/publication", Publication.class); + + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class); + + return rels + .joinWith(pubs, pubs.col("id").equalTo(rels.col("target")), "inner") + .map( + t -> new RelatedPublication( + t._1.getSource(), + t._1.getRelType(), + ConversionUtils.oafPublicationToBrokerPublication(t._2)), + Encoders.bean(RelatedPublication.class)); } private static Dataset join(final Dataset sources, diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java deleted file mode 100644 index c60d4f1417..0000000000 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java +++ /dev/null @@ -1,34 +0,0 @@ - -package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; - -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Software; - -public class RelatedEntityFactory { - - @SuppressWarnings("unchecked") - public static RT newRelatedEntity(final String sourceId, - final String relType, - final T target, - final Class clazz) { - - if (clazz == RelatedProject.class) { - return (RT) new RelatedProject(sourceId, relType, - ConversionUtils.oafProjectToBrokerProject((Project) target)); - } else if (clazz == RelatedSoftware.class) { - return (RT) new RelatedSoftware(sourceId, relType, - ConversionUtils.oafSoftwareToBrokerSoftware((Software) target)); - } else if (clazz == RelatedDataset.class) { - return (RT) new RelatedDataset(sourceId, relType, - ConversionUtils.oafDatasetToBrokerDataset((Dataset) target)); - } else if (clazz == RelatedPublication.class) { - return (RT) new RelatedPublication(sourceId, relType, - ConversionUtils.oafPublicationToBrokerPublication((Publication) target)); - } else { - return null; - } - } -}