From 16c7a184359a66adc89d2431e2a653653d091ddc Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 22 Jun 2020 08:51:31 +0200 Subject: [PATCH] refactoring --- dhp-workflows/dhp-broker-events/pom.xml | 2 +- .../dhp/broker/model/EventFactory.java | 6 +- .../broker/oa/GenerateEventsApplication.java | 143 +++--------------- .../broker/oa/GenerateRelatedDatasets.java | 73 +++++++++ .../broker/oa/GenerateRelatedProjects.java | 79 ++++++++++ .../oa/GenerateRelatedPublications.java | 78 ++++++++++ .../broker/oa/GenerateRelatedSoftwares.java | 76 ++++++++++ .../broker/oa/GenerateSimpleEntitities.java | 85 +++++++++++ .../dhp/broker/oa/matchers/UpdateMatcher.java | 16 +- .../AbstractEnrichMissingDataset.java | 13 +- .../relatedProjects/EnrichMissingProject.java | 8 +- .../relatedProjects/EnrichMoreProject.java | 12 +- .../AbstractEnrichMissingPublication.java | 14 +- .../EnrichMissingSoftware.java | 11 +- .../relatedSoftware/EnrichMoreSoftware.java | 14 +- .../simple/EnrichMissingAbstract.java | 4 +- .../simple/EnrichMissingAuthorOrcid.java | 12 +- .../simple/EnrichMissingOpenAccess.java | 14 +- .../oa/matchers/simple/EnrichMissingPid.java | 10 +- .../simple/EnrichMissingPublicationDate.java | 6 +- .../matchers/simple/EnrichMissingSubject.java | 12 +- .../matchers/simple/EnrichMoreOpenAccess.java | 12 +- .../oa/matchers/simple/EnrichMorePid.java | 12 +- .../oa/matchers/simple/EnrichMoreSubject.java | 12 +- .../dhp/broker/oa/util/ClusterUtils.java | 31 ++++ .../dhp/broker/oa/util/ConversionUtils.java | 60 +++++--- .../dhp/broker/oa/util/EventFinder.java | 4 +- .../dhp/broker/oa/util/UpdateInfo.java | 38 ++--- .../aggregators/simple/ResultAggregator.java | 6 +- .../util/aggregators/simple/ResultGroup.java | 8 +- ...java => OaBrokerMainEntityAggregator.java} | 26 ++-- .../aggregators/withRels/RelatedDataset.java | 10 +- .../aggregators/withRels/RelatedProject.java | 10 +- .../withRels/RelatedPublication.java | 11 +- .../aggregators/withRels/RelatedSoftware.java | 10 +- .../dhp/broker/oa/generate_relations.json | 14 ++ .../broker/oa/generate_simple_entities.json | 14 ++ 37 files changed, 669 insertions(+), 297 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java rename dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/{OpenaireBrokerResultAggregator.java => OaBrokerMainEntityAggregator.java} (59%) create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index f943ac93a..8d7d3b88c 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -53,7 +53,7 @@ eu.dnetlib dnet-openaire-broker-common - [3.0.2,4.0.0) + [3.0.3,4.0.0) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index bf4f62d24..6e38f7448 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -11,7 +11,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; public class EventFactory { @@ -49,8 +49,8 @@ public class EventFactory { private static Map createMapFromResult(final UpdateInfo updateInfo) { final Map map = new HashMap<>(); - final OpenaireBrokerResult source = updateInfo.getSource(); - final OpenaireBrokerResult target = updateInfo.getTarget(); + final OaBrokerMainEntity source = updateInfo.getSource(); + final OaBrokerMainEntity target = updateInfo.getTarget(); map.put("target_datasource_id", target.getCollectedFromId()); map.put("target_datasource_name", target.getCollectedFromName()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index f15d918c9..db5992010 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -18,27 +18,20 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.EventFinder; import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; @@ -48,8 +41,6 @@ public class GenerateEventsApplication { private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils @@ -86,11 +77,11 @@ public class GenerateEventsApplication { runWithSparkSession(conf, isSparkSessionManaged, spark -> { - removeOutputDir(spark, eventsPath); + ClusterUtils.removeDir(spark, eventsPath); // TODO REMOVE THIS - relatedProjects(spark, graphPath) + expandResultsWithRelations(spark, graphPath, Publication.class) .write() .mode(SaveMode.Overwrite) .json(eventsPath); @@ -110,28 +101,25 @@ public class GenerateEventsApplication { } - private static void removeOutputDir(final SparkSession spark, final String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } - private static Dataset generateEvents( final SparkSession spark, final String graphPath, final Class sourceClass, final DedupConfig dedupConfig) { - final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass); + final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass); - final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class) + final Dataset mergedRels = ClusterUtils + .readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); - final TypedColumn, ResultGroup> aggr = new ResultAggregator() + final TypedColumn, ResultGroup> aggr = new ResultAggregator() .toColumn(); return results .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner") .groupByKey( - (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class)) .filter(rg -> rg.getData().size() > 1) @@ -141,7 +129,7 @@ public class GenerateEventsApplication { .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class)); } - private static Dataset expandResultsWithRelations( + private static Dataset expandResultsWithRelations( final SparkSession spark, final String graphPath, final Class sourceClass) { @@ -151,116 +139,35 @@ public class GenerateEventsApplication { // final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); // final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); - final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) - .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) - .cache(); - - final Dataset r0 = readPath( - spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)); + final Dataset r0 = ClusterUtils + .readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class)); // TODO UNCOMMENT THIS - final Dataset r1 = join(r0, rels, relatedProjects(spark, graphPath)); - // final Dataset r2 = join(r1, rels, relatedDataset(spark, graphPath)); - // final Dataset r3 = join(r2, rels, relatedPublications(spark, graphPath)); - // final Dataset r4 = join(r3, rels, relatedSoftwares(spark, graphPath)); + // final Dataset r1 = join(r0, relatedProjects(spark, graphPath)); + // final Dataset r2 = join(r1, relatedDataset(spark, graphPath)); + // final Dataset r3 = join(r2, relatedPublications(spark, graphPath)); + // final Dataset r4 = join(r3, relatedSoftwares(spark, graphPath)); - return r1; // TODO it should be r4 + return r0; // TODO it should be r4 } - private static Dataset relatedProjects(final SparkSession spark, final String graphPath) { - - final Dataset projects = readPath(spark, graphPath + "/project", Project.class); - - final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) - .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)); - - return rels - .joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner") - .map( - t -> new RelatedProject( - t._1.getSource(), - t._1.getRelType(), - ConversionUtils.oafProjectToBrokerProject(t._2)), - Encoders.bean(RelatedProject.class)); - } - - private static Dataset relatedDataset(final SparkSession spark, final String graphPath) { - - final Dataset datasets = readPath( - spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); - - final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class); - - return rels - .joinWith(datasets, datasets.col("id").equalTo(rels.col("target")), "inner") - .map( - t -> new RelatedDataset( - t._1.getSource(), - t._1.getRelType(), - ConversionUtils.oafDatasetToBrokerDataset(t._2)), - Encoders.bean(RelatedDataset.class)); - } - - private static Dataset relatedSoftwares(final SparkSession spark, final String graphPath) { - - final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); - - final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class); - - return rels - .joinWith(softwares, softwares.col("id").equalTo(rels.col("target")), "inner") - .map( - t -> new RelatedSoftware( - t._1.getSource(), - t._1.getRelType(), - ConversionUtils.oafSoftwareToBrokerSoftware(t._2)), - Encoders.bean(RelatedSoftware.class)); - } - - private static Dataset relatedPublications(final SparkSession spark, final String graphPath) { - - final Dataset pubs = readPath(spark, graphPath + "/publication", Publication.class); - - final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class); - - return rels - .joinWith(pubs, pubs.col("id").equalTo(rels.col("target")), "inner") - .map( - t -> new RelatedPublication( - t._1.getSource(), - t._1.getRelType(), - ConversionUtils.oafPublicationToBrokerPublication(t._2)), - Encoders.bean(RelatedPublication.class)); - } - - private static Dataset join(final Dataset sources, - final Dataset rels, + private static Dataset join(final Dataset sources, final Dataset typedRels) { - final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator() + final TypedColumn, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator() .toColumn(); return sources - .joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer") + .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .groupByKey( - (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) + (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.bean(OpenaireBrokerResult.class)); + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); } - public static Dataset readPath( - final SparkSession spark, - final String inputPath, - final Class clazz) { - return spark - .read() - .textFile(inputPath) - .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); - } - private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception { final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java new file mode 100644 index 000000000..4a10fbabf --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java @@ -0,0 +1,73 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class GenerateRelatedDatasets { + + private static final Logger log = LoggerFactory.getLogger(GenerateRelatedDatasets.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + GenerateRelatedDatasets.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String relsPath = parser.get("relsPath"); + log.info("relsPath: {}", relsPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, relsPath); + + final Dataset datasets = ClusterUtils + .readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + + final Dataset rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class); + + rels + .joinWith(datasets, datasets.col("id").equalTo(rels.col("target")), "inner") + .map( + t -> new RelatedDataset( + t._1.getSource(), + t._1.getRelType(), + ConversionUtils.oafDatasetToBrokerDataset(t._2)), + Encoders.bean(RelatedDataset.class)) + .write() + .mode(SaveMode.Overwrite) + .json(relsPath); + + }); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java new file mode 100644 index 000000000..59ed388e7 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java @@ -0,0 +1,79 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class GenerateRelatedProjects { + + private static final Logger log = LoggerFactory.getLogger(GenerateRelatedProjects.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + GenerateRelatedProjects.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String relsPath = parser.get("relsPath"); + log.info("relsPath: {}", relsPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, relsPath); + + final Dataset projects = ClusterUtils.readPath(spark, graphPath + "/project", Project.class); + + final Dataset rels = ClusterUtils + .readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)); + + rels + .joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner") + .map( + t -> new RelatedProject( + t._1.getSource(), + t._1.getRelType(), + ConversionUtils.oafProjectToBrokerProject(t._2)), + Encoders.bean(RelatedProject.class)) + .write() + .mode(SaveMode.Overwrite) + .json(relsPath); + }); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java new file mode 100644 index 000000000..0c20081dc --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java @@ -0,0 +1,78 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class GenerateRelatedPublications { + + private static final Logger log = LoggerFactory.getLogger(GenerateRelatedPublications.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + GenerateRelatedPublications.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String relsPath = parser.get("relsPath"); + log.info("relsPath: {}", relsPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, relsPath); + + final Dataset pubs = ClusterUtils + .readPath(spark, graphPath + "/publication", Publication.class); + + final Dataset rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class); + + rels + .joinWith(pubs, pubs.col("id").equalTo(rels.col("target")), "inner") + .map( + t -> new RelatedPublication( + t._1.getSource(), + t._1.getRelType(), + ConversionUtils.oafPublicationToBrokerPublication(t._2)), + Encoders.bean(RelatedPublication.class)) + .write() + .mode(SaveMode.Overwrite) + .json(relsPath); + + }); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java new file mode 100644 index 000000000..b95788846 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java @@ -0,0 +1,76 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Software; + +public class GenerateRelatedSoftwares { + + private static final Logger log = LoggerFactory.getLogger(GenerateRelatedSoftwares.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + GenerateRelatedSoftwares.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String relsPath = parser.get("relsPath"); + log.info("relsPath: {}", relsPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, relsPath); + final Dataset softwares = ClusterUtils.readPath(spark, graphPath + "/software", Software.class); + + final Dataset rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class); + + rels + .joinWith(softwares, softwares.col("id").equalTo(rels.col("target")), "inner") + .map( + t -> new RelatedSoftware( + t._1.getSource(), + t._1.getRelType(), + ConversionUtils.oafSoftwareToBrokerSoftware(t._2)), + Encoders.bean(RelatedSoftware.class)) + .write() + .mode(SaveMode.Overwrite) + .json(relsPath); + + }); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java new file mode 100644 index 000000000..59485d5cf --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java @@ -0,0 +1,85 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class GenerateSimpleEntitities { + + private static final Logger log = LoggerFactory.getLogger(GenerateSimpleEntitities.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + GenerateSimpleEntitities.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String simpleEntitiesPath = parser.get("simpleEntitiesPath"); + log.info("simpleEntitiesPath: {}", simpleEntitiesPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, simpleEntitiesPath); + + expandResultsWithRelations(spark, graphPath, Publication.class) + .write() + .mode(SaveMode.Overwrite) + .json(simpleEntitiesPath); + + // TODO UNCOMMENT THIS + // spark + // .emptyDataset(Encoders.bean(Event.class)) + // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig)) + // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig)) + // .union(generateEvents(spark, graphPath, Software.class, dedupConfig)) + // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig)) + // .write() + // .mode(SaveMode.Overwrite) + // .option("compression", "gzip") + // .json(eventsPath); + }); + + } + + private static Dataset expandResultsWithRelations( + final SparkSession spark, + final String graphPath, + final Class sourceClass) { + + return ClusterUtils + .readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class)); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index 13aeefb2f..9aa6f5384 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -12,7 +12,7 @@ import java.util.function.Function; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.pace.config.DedupConfig; @@ -21,11 +21,11 @@ public abstract class UpdateMatcher { private final boolean multipleUpdate; private final Function topicFunction; - private final BiConsumer compileHighlightFunction; + private final BiConsumer compileHighlightFunction; private final Function highlightToStringFunction; public UpdateMatcher(final boolean multipleUpdate, final Function topicFunction, - final BiConsumer compileHighlightFunction, + final BiConsumer compileHighlightFunction, final Function highlightToStringFunction) { this.multipleUpdate = multipleUpdate; this.topicFunction = topicFunction; @@ -33,13 +33,13 @@ public abstract class UpdateMatcher { this.highlightToStringFunction = highlightToStringFunction; } - public Collection> searchUpdatesForRecord(final OpenaireBrokerResult res, - final Collection others, + public Collection> searchUpdatesForRecord(final OaBrokerMainEntity res, + final Collection others, final DedupConfig dedupConfig) { final Map> infoMap = new HashMap<>(); - for (final OpenaireBrokerResult source : others) { + for (final OaBrokerMainEntity source : others) { if (source != res) { for (final T hl : findDifferences(source, res)) { final Topic topic = getTopicFunction().apply(hl); @@ -68,7 +68,7 @@ public abstract class UpdateMatcher { } } - protected abstract List findDifferences(OpenaireBrokerResult source, OpenaireBrokerResult target); + protected abstract List findDifferences(OaBrokerMainEntity source, OaBrokerMainEntity target); protected static boolean isMissing(final List list) { return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0)); @@ -86,7 +86,7 @@ public abstract class UpdateMatcher { return topicFunction; } - public BiConsumer getCompileHighlightFunction() { + public BiConsumer getCompileHighlightFunction() { return compileHighlightFunction; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java index 7a58f986b..c197734a3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java @@ -5,13 +5,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.Dataset; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public abstract class AbstractEnrichMissingDataset - extends UpdateMatcher { +public abstract class AbstractEnrichMissingDataset extends UpdateMatcher { public AbstractEnrichMissingDataset(final Topic topic) { super(true, @@ -23,14 +22,14 @@ public abstract class AbstractEnrichMissingDataset protected abstract boolean filterByType(String relType); @Override - protected final List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected final List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final Set existingDatasets = target .getDatasets() .stream() .filter(rel -> filterByType(rel.getRelType())) - .map(Dataset::getOriginalId) + .map(OaBrokerRelatedDataset::getOriginalId) .collect(Collectors.toSet()); return source diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java index fa5fde725..49c546bba 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java @@ -4,12 +4,12 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects; import java.util.ArrayList; import java.util.List; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.Project; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerProject; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public class EnrichMissingProject extends UpdateMatcher { +public class EnrichMissingProject extends UpdateMatcher { public EnrichMissingProject() { super(true, @@ -19,7 +19,7 @@ public class EnrichMissingProject extends UpdateMatcher { } @Override - protected List findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { if (target.getProjects().isEmpty()) { return source.getProjects(); } else { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java index ca63aeb49..6954a3fb5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java @@ -5,12 +5,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.Project; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerProject; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public class EnrichMoreProject extends UpdateMatcher { +public class EnrichMoreProject extends UpdateMatcher { public EnrichMoreProject() { super(true, @@ -19,13 +19,13 @@ public class EnrichMoreProject extends UpdateMatcher { prj -> projectAsString(prj)); } - private static String projectAsString(final Project prj) { + private static String projectAsString(final OaBrokerProject prj) { return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode(); } @Override - protected List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final Set existingProjects = target .getProjects() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java index 300863949..ad6d8263b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java @@ -5,12 +5,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.Publication; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public abstract class AbstractEnrichMissingPublication extends UpdateMatcher { +public abstract class AbstractEnrichMissingPublication extends UpdateMatcher { public AbstractEnrichMissingPublication(final Topic topic) { super(true, @@ -23,15 +23,15 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher findDifferences( - final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected final List findDifferences( + final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final Set existingPublications = target .getPublications() .stream() .filter(rel -> filterByType(rel.getRelType())) - .map(Publication::getOriginalId) + .map(OaBrokerRelatedPublication::getOriginalId) .collect(Collectors.toSet()); return source diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java index 76ae061e6..452caa503 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java @@ -4,12 +4,13 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware; import java.util.ArrayList; import java.util.List; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMissingSoftware - extends UpdateMatcher { + extends UpdateMatcher { public EnrichMissingSoftware() { super(true, @@ -19,9 +20,9 @@ public class EnrichMissingSoftware } @Override - protected List findDifferences( - final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences( + final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { if (target.getSoftwares().isEmpty()) { return source.getSoftwares(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java index ebd421b8e..aaffe1249 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java @@ -5,12 +5,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.Software; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public class EnrichMoreSoftware extends UpdateMatcher { +public class EnrichMoreSoftware extends UpdateMatcher { public EnrichMoreSoftware() { super(true, @@ -20,14 +20,14 @@ public class EnrichMoreSoftware extends UpdateMatcher { } @Override - protected List findDifferences( - final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences( + final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final Set existingSoftwares = source .getSoftwares() .stream() - .map(Software::getName) + .map(OaBrokerRelatedSoftware::getName) .collect(Collectors.toSet()); return target diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java index b2cbbce2c..73462bae8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java @@ -5,7 +5,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; @@ -19,7 +19,7 @@ public class EnrichMissingAbstract extends UpdateMatcher { } @Override - protected List findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { if (isMissing(target.getAbstracts()) && !isMissing(source.getAbstracts())) { return Arrays.asList(source.getAbstracts().get(0)); } else { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java index c4b96e67b..2a01188a9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java @@ -7,12 +7,12 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import eu.dnetlib.broker.objects.Author; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerAuthor; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public class EnrichMissingAuthorOrcid extends UpdateMatcher { +public class EnrichMissingAuthorOrcid extends UpdateMatcher { public EnrichMissingAuthorOrcid() { super(true, @@ -22,13 +22,13 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher { } @Override - protected List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final Set existingOrcids = target .getCreators() .stream() - .map(Author::getOrcid) + .map(OaBrokerAuthor::getOrcid) .filter(StringUtils::isNotBlank) .collect(Collectors.toSet()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java index e870cf1fa..487382957 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java @@ -5,28 +5,28 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.Instance; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerInstance; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; -public class EnrichMissingOpenAccess extends UpdateMatcher { +public class EnrichMissingOpenAccess extends UpdateMatcher { public EnrichMissingOpenAccess() { super(true, i -> Topic.ENRICH_MISSING_OA_VERSION, (p, i) -> p.getInstances().add(i), - Instance::getUrl); + OaBrokerInstance::getUrl); } @Override - protected List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final long count = target .getInstances() .stream() - .map(Instance::getLicense) + .map(OaBrokerInstance::getLicense) .filter(right -> right.equals(BrokerConstants.OPEN_ACCESS)) .count(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java index cc72d9fa9..ee1617b1e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java @@ -5,12 +5,12 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.TypedValue; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public class EnrichMissingPid extends UpdateMatcher { +public class EnrichMissingPid extends UpdateMatcher { public EnrichMissingPid() { super(true, @@ -20,8 +20,8 @@ public class EnrichMissingPid extends UpdateMatcher { } @Override - protected List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final long count = target.getPids().size(); if (count > 0) { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java index ed8c26b5a..2c0533fa3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java @@ -5,7 +5,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; @@ -19,8 +19,8 @@ public class EnrichMissingPublicationDate extends UpdateMatcher { } @Override - protected List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { if (isMissing(target.getPublicationdate()) && !isMissing(source.getPublicationdate())) { return Arrays.asList(source.getPublicationdate()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java index 07b1fa41a..9ab9fce48 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java @@ -5,12 +5,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.TypedValue; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public class EnrichMissingSubject extends UpdateMatcher { +public class EnrichMissingSubject extends UpdateMatcher { public EnrichMissingSubject() { super(true, @@ -20,8 +20,8 @@ public class EnrichMissingSubject extends UpdateMatcher { } @Override - protected List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final Set existingSubject = target .getSubjects() .stream() @@ -35,7 +35,7 @@ public class EnrichMissingSubject extends UpdateMatcher { .collect(Collectors.toList()); } - private static String subjectAsString(final TypedValue s) { + private static String subjectAsString(final OaBrokerTypedValue s) { return s.getType() + "::" + s.getValue(); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java index bfef3ee4f..e90a8f201 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java @@ -5,24 +5,24 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.Instance; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerInstance; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; -public class EnrichMoreOpenAccess extends UpdateMatcher { +public class EnrichMoreOpenAccess extends UpdateMatcher { public EnrichMoreOpenAccess() { super(true, i -> Topic.ENRICH_MORE_OA_VERSION, (p, i) -> p.getInstances().add(i), - Instance::getUrl); + OaBrokerInstance::getUrl); } @Override - protected List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final Set urls = target .getInstances() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java index d1f2e6022..43b4f0628 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java @@ -5,12 +5,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.TypedValue; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public class EnrichMorePid extends UpdateMatcher { +public class EnrichMorePid extends UpdateMatcher { public EnrichMorePid() { super(true, @@ -20,8 +20,8 @@ public class EnrichMorePid extends UpdateMatcher { } @Override - protected List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final Set existingPids = target .getPids() .stream() @@ -35,7 +35,7 @@ public class EnrichMorePid extends UpdateMatcher { .collect(Collectors.toList()); } - private static String pidAsString(final TypedValue pid) { + private static String pidAsString(final OaBrokerTypedValue pid) { return pid.getType() + "::" + pid.getValue(); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java index 39225e8ab..04fb494ef 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java @@ -5,12 +5,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.TypedValue; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -public class EnrichMoreSubject extends UpdateMatcher { +public class EnrichMoreSubject extends UpdateMatcher { public EnrichMoreSubject() { super(true, @@ -20,8 +20,8 @@ public class EnrichMoreSubject extends UpdateMatcher { } @Override - protected List findDifferences(final OpenaireBrokerResult source, - final OpenaireBrokerResult target) { + protected List findDifferences(final OaBrokerMainEntity source, + final OaBrokerMainEntity target) { final Set existingSubjects = target .getSubjects() .stream() @@ -35,7 +35,7 @@ public class EnrichMoreSubject extends UpdateMatcher { .collect(Collectors.toList()); } - private static String subjectAsString(final TypedValue s) { + private static String subjectAsString(final OaBrokerTypedValue s) { return s.getType() + "::" + s.getValue(); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java new file mode 100644 index 000000000..8bcea5e6e --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java @@ -0,0 +1,31 @@ + +package eu.dnetlib.dhp.broker.oa.util; + +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.common.HdfsSupport; + +public class ClusterUtils { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void removeDir(final SparkSession spark, final String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + public static Dataset readPath( + final SparkSession spark, + final String inputPath, + final Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 730d06519..b61d5e7cc 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -15,8 +15,16 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Function; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.TypedValue; +import eu.dnetlib.broker.objects.OaBrokerAuthor; +import eu.dnetlib.broker.objects.OaBrokerExternalReference; +import eu.dnetlib.broker.objects.OaBrokerInstance; +import eu.dnetlib.broker.objects.OaBrokerJournal; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerProject; +import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; +import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; +import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; +import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.ExternalReference; @@ -35,13 +43,13 @@ public class ConversionUtils { private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class); - public static List oafInstanceToBrokerInstances(final Instance i) { + public static List oafInstanceToBrokerInstances(final Instance i) { if (i == null) { return new ArrayList<>(); } return mappedList(i.getUrl(), url -> { - final eu.dnetlib.broker.objects.Instance res = new eu.dnetlib.broker.objects.Instance(); + final OaBrokerInstance res = new OaBrokerInstance(); res.setUrl(url); res.setInstancetype(classId(i.getInstancetype())); res.setLicense(BrokerConstants.OPEN_ACCESS); @@ -50,20 +58,21 @@ public class ConversionUtils { }); } - public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) { + public static OaBrokerTypedValue oafPidToBrokerPid(final StructuredProperty sp) { return oafStructPropToBrokerTypedValue(sp); } - public static TypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) { - return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null; + public static OaBrokerTypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) { + return sp != null ? new OaBrokerTypedValue(classId(sp.getQualifier()), sp.getValue()) : null; } - public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { + public static final OaBrokerRelatedDataset oafDatasetToBrokerDataset(final Dataset d) { if (d == null) { return null; } - final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset(); + final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset(); + res.setOpenaireId(d.getId()); res.setOriginalId(first(d.getOriginalId())); res.setTitle(structPropValue(d.getTitle())); res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid)); @@ -72,12 +81,13 @@ public class ConversionUtils { return res; } - public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) { + public static OaBrokerRelatedPublication oafPublicationToBrokerPublication(final Publication p) { if (p == null) { return null; } - final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication(); + final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication(); + res.setOpenaireId(p.getId()); res.setOriginalId(first(p.getOriginalId())); res.setTitle(structPropValue(p.getTitle())); res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid)); @@ -87,12 +97,12 @@ public class ConversionUtils { return res; } - public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) { + public static final OaBrokerMainEntity oafResultToBrokerResult(final Result result) { if (result == null) { return null; } - final OpenaireBrokerResult res = new OpenaireBrokerResult(); + final OaBrokerMainEntity res = new OaBrokerMainEntity(); res.setOpenaireId(result.getId()); res.setOriginalId(first(result.getOriginalId())); @@ -118,7 +128,7 @@ public class ConversionUtils { return res; } - private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { + private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) { if (author == null) { return null; } @@ -135,15 +145,15 @@ public class ConversionUtils { .findFirst() .orElse(null) : null; - return new eu.dnetlib.broker.objects.Author(author.getFullname(), pids); + return new OaBrokerAuthor(author.getFullname(), pids); } - private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) { + private static OaBrokerJournal oafJournalToBrokerJournal(final Journal journal) { if (journal == null) { return null; } - final eu.dnetlib.broker.objects.Journal res = new eu.dnetlib.broker.objects.Journal(); + final OaBrokerJournal res = new OaBrokerJournal(); res.setName(journal.getName()); res.setIssn(journal.getIssnPrinted()); res.setEissn(journal.getIssnOnline()); @@ -152,12 +162,12 @@ public class ConversionUtils { return res; } - private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) { + private static OaBrokerExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) { if (ref == null) { return null; } - final eu.dnetlib.broker.objects.ExternalReference res = new eu.dnetlib.broker.objects.ExternalReference(); + final OaBrokerExternalReference res = new OaBrokerExternalReference(); res.setRefidentifier(ref.getRefidentifier()); res.setSitename(ref.getSitename()); res.setType(classId(ref.getQualifier())); @@ -165,12 +175,13 @@ public class ConversionUtils { return res; } - public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { + public static final OaBrokerProject oafProjectToBrokerProject(final Project p) { if (p == null) { return null; } - final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project(); + final OaBrokerProject res = new OaBrokerProject(); + res.setOpenaireId(p.getId()); res.setTitle(fieldValue(p.getTitle())); res.setAcronym(fieldValue(p.getAcronym())); res.setCode(fieldValue(p.getCode())); @@ -190,12 +201,13 @@ public class ConversionUtils { return res; } - public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) { + public static final OaBrokerRelatedSoftware oafSoftwareToBrokerSoftware(final Software sw) { if (sw == null) { return null; } - final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software(); + final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware(); + res.setOpenaireId(sw.getId()); res.setName(structPropValue(sw.getTitle())); res.setDescription(fieldValue(sw.getDescription())); res.setRepository(fieldValue(sw.getCodeRepositoryUrl())); @@ -247,7 +259,7 @@ public class ConversionUtils { : new ArrayList<>(); } - private static List structPropTypedList(final List list) { + private static List structPropTypedList(final List list) { if (list == null) { return new ArrayList<>(); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index 4c20ac5ca..7451e5891 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -4,7 +4,7 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.ArrayList; import java.util.List; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy; @@ -68,7 +68,7 @@ public class EventFinder { public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) { final List> list = new ArrayList<>(); - for (final OpenaireBrokerResult target : results.getData()) { + for (final OaBrokerMainEntity target : results.getData()) { for (final UpdateMatcher matcher : matchers) { list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig)); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index 2c4bda53d..25d0d2bca 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -9,10 +9,10 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.broker.objects.Instance; -import eu.dnetlib.broker.objects.OpenAireEventPayload; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; -import eu.dnetlib.broker.objects.Provenance; +import eu.dnetlib.broker.objects.OaBrokerEventPayload; +import eu.dnetlib.broker.objects.OaBrokerInstance; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerProvenance; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.MapDocument; @@ -25,11 +25,11 @@ public final class UpdateInfo { private final T highlightValue; - private final OpenaireBrokerResult source; + private final OaBrokerMainEntity source; - private final OpenaireBrokerResult target; + private final OaBrokerMainEntity target; - private final BiConsumer compileHighlight; + private final BiConsumer compileHighlight; private final Function highlightToString; @@ -37,9 +37,9 @@ public final class UpdateInfo { private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class); - public UpdateInfo(final Topic topic, final T highlightValue, final OpenaireBrokerResult source, - final OpenaireBrokerResult target, - final BiConsumer compileHighlight, + public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source, + final OaBrokerMainEntity target, + final BiConsumer compileHighlight, final Function highlightToString, final DedupConfig dedupConfig) { this.topic = topic; @@ -55,17 +55,17 @@ public final class UpdateInfo { return highlightValue; } - public OpenaireBrokerResult getSource() { + public OaBrokerMainEntity getSource() { return source; } - public OpenaireBrokerResult getTarget() { + public OaBrokerMainEntity getTarget() { return target; } private float calculateTrust(final DedupConfig dedupConfig, - final OpenaireBrokerResult r1, - final OpenaireBrokerResult r2) { + final OaBrokerMainEntity r1, + final OaBrokerMainEntity r2) { if (dedupConfig == null) { return BrokerConstants.MIN_TRUST; @@ -104,11 +104,11 @@ public final class UpdateInfo { return highlightToString.apply(getHighlightValue()); } - public OpenAireEventPayload asBrokerPayload() { + public OaBrokerEventPayload asBrokerPayload() { compileHighlight.accept(target, getHighlightValue()); - final OpenaireBrokerResult hl = new OpenaireBrokerResult(); + final OaBrokerMainEntity hl = new OaBrokerMainEntity(); compileHighlight.accept(hl, getHighlightValue()); final String provId = getSource().getOriginalId(); @@ -117,14 +117,14 @@ public final class UpdateInfo { final String provUrl = getSource() .getInstances() .stream() - .map(Instance::getUrl) + .map(OaBrokerInstance::getUrl) .findFirst() .orElse(null); ; - final Provenance provenance = new Provenance(provId, provRepo, provUrl); + final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provUrl); - final OpenAireEventPayload res = new OpenAireEventPayload(); + final OaBrokerEventPayload res = new OaBrokerEventPayload(); res.setResult(target); res.setHighlight(hl); res.setTrust(trust); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java index a46fde445..ee1c8963e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java @@ -5,11 +5,11 @@ import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.schema.oaf.Relation; import scala.Tuple2; -public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> { +public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> { /** * @@ -22,7 +22,7 @@ public class ResultAggregator extends Aggregator t) { + public ResultGroup reduce(final ResultGroup group, final Tuple2 t) { group.getData().add(t._1); return group; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java index 3f9dbe8af..e718e0f1c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java @@ -5,7 +5,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; public class ResultGroup implements Serializable { @@ -14,13 +14,13 @@ public class ResultGroup implements Serializable { */ private static final long serialVersionUID = -3360828477088669296L; - private List data = new ArrayList<>(); + private List data = new ArrayList<>(); - public List getData() { + public List getData() { return data; } - public void setData(final List data) { + public void setData(final List data) { this.data = data; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java similarity index 59% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java index e72dcb988..6a2d9b06d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java @@ -5,11 +5,11 @@ import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; -import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; import scala.Tuple2; -public class OpenaireBrokerResultAggregator - extends Aggregator, OpenaireBrokerResult, OpenaireBrokerResult> { +public class OaBrokerMainEntityAggregator + extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> { /** * @@ -17,17 +17,17 @@ public class OpenaireBrokerResultAggregator private static final long serialVersionUID = -3687878788861013488L; @Override - public OpenaireBrokerResult zero() { - return new OpenaireBrokerResult(); + public OaBrokerMainEntity zero() { + return new OaBrokerMainEntity(); } @Override - public OpenaireBrokerResult finish(final OpenaireBrokerResult g) { + public OaBrokerMainEntity finish(final OaBrokerMainEntity g) { return g; } @Override - public OpenaireBrokerResult reduce(final OpenaireBrokerResult g, final Tuple2 t) { + public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { if (g.getOriginalId() == null) { return t._1; } else if (t._2 instanceof RelatedSoftware) { @@ -38,13 +38,15 @@ public class OpenaireBrokerResultAggregator g.getPublications().add(((RelatedPublication) t._2).getRelPublication()); } else if (t._2 instanceof RelatedProject) { g.getProjects().add(((RelatedProject) t._2).getRelProject()); + } else { + throw new RuntimeException("Invalid Object: " + t._2.getClass()); } return g; } @Override - public OpenaireBrokerResult merge(final OpenaireBrokerResult g1, final OpenaireBrokerResult g2) { + public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { if (g1.getOriginalId() != null) { g1.getSoftwares().addAll(g2.getSoftwares()); g1.getDatasets().addAll(g2.getDatasets()); @@ -57,13 +59,13 @@ public class OpenaireBrokerResultAggregator } @Override - public Encoder bufferEncoder() { - return Encoders.bean(OpenaireBrokerResult.class); + public Encoder bufferEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); } @Override - public Encoder outputEncoder() { - return Encoders.bean(OpenaireBrokerResult.class); + public Encoder outputEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java index 6a5fb258c..daf75ea2e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java @@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; import java.io.Serializable; -import eu.dnetlib.broker.objects.Dataset; +import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; public class RelatedDataset implements Serializable { @@ -13,12 +13,12 @@ public class RelatedDataset implements Serializable { private static final long serialVersionUID = 774487705184038324L; private String source; private String relType; - private Dataset relDataset; + private OaBrokerRelatedDataset relDataset; public RelatedDataset() { } - public RelatedDataset(final String source, final String relType, final Dataset relDataset) { + public RelatedDataset(final String source, final String relType, final OaBrokerRelatedDataset relDataset) { this.source = source; this.relType = relType; this.relDataset = relDataset; @@ -40,11 +40,11 @@ public class RelatedDataset implements Serializable { this.relType = relType; } - public Dataset getRelDataset() { + public OaBrokerRelatedDataset getRelDataset() { return relDataset; } - public void setRelDataset(final Dataset relDataset) { + public void setRelDataset(final OaBrokerRelatedDataset relDataset) { this.relDataset = relDataset; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java index fafec1e19..4116c8c77 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java @@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; import java.io.Serializable; -import eu.dnetlib.broker.objects.Project; +import eu.dnetlib.broker.objects.OaBrokerProject; public class RelatedProject implements Serializable { @@ -14,12 +14,12 @@ public class RelatedProject implements Serializable { private String source; private String relType; - private Project relProject; + private OaBrokerProject relProject; public RelatedProject() { } - public RelatedProject(final String source, final String relType, final Project relProject) { + public RelatedProject(final String source, final String relType, final OaBrokerProject relProject) { this.source = source; this.relType = relType; this.relProject = relProject; @@ -41,11 +41,11 @@ public class RelatedProject implements Serializable { this.relType = relType; } - public Project getRelProject() { + public OaBrokerProject getRelProject() { return relProject; } - public void setRelProject(final Project relProject) { + public void setRelProject(final OaBrokerProject relProject) { this.relProject = relProject; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java index 8a31ddf7e..9e222a952 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java @@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; import java.io.Serializable; -import eu.dnetlib.broker.objects.Publication; +import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; public class RelatedPublication implements Serializable { @@ -14,12 +14,13 @@ public class RelatedPublication implements Serializable { private String source; private String relType; - private Publication relPublication; + private OaBrokerRelatedPublication relPublication; public RelatedPublication() { } - public RelatedPublication(final String source, final String relType, final Publication relPublication) { + public RelatedPublication(final String source, final String relType, + final OaBrokerRelatedPublication relPublication) { this.source = source; this.relType = relType; this.relPublication = relPublication; @@ -41,11 +42,11 @@ public class RelatedPublication implements Serializable { this.relType = relType; } - public Publication getRelPublication() { + public OaBrokerRelatedPublication getRelPublication() { return relPublication; } - public void setRelPublication(final Publication relPublication) { + public void setRelPublication(final OaBrokerRelatedPublication relPublication) { this.relPublication = relPublication; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java index 319387469..2f3b8668c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java @@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; import java.io.Serializable; -import eu.dnetlib.broker.objects.Software; +import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; public class RelatedSoftware implements Serializable { @@ -13,12 +13,12 @@ public class RelatedSoftware implements Serializable { private static final long serialVersionUID = 7573383356943300157L; private String source; private String relType; - private Software relSoftware; + private OaBrokerRelatedSoftware relSoftware; public RelatedSoftware() { } - public RelatedSoftware(final String source, final String relType, final Software relSoftware) { + public RelatedSoftware(final String source, final String relType, final OaBrokerRelatedSoftware relSoftware) { this.source = source; this.relType = relType; this.relSoftware = relSoftware; @@ -40,11 +40,11 @@ public class RelatedSoftware implements Serializable { this.relType = relType; } - public Software getRelSoftware() { + public OaBrokerRelatedSoftware getRelSoftware() { return relSoftware; } - public void setRelSoftware(final Software relSoftware) { + public void setRelSoftware(final OaBrokerRelatedSoftware relSoftware) { this.relSoftware = relSoftware; } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json new file mode 100644 index 000000000..32fd1d8f3 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json @@ -0,0 +1,14 @@ +[ + { + "paramName": "g", + "paramLongName": "graphPath", + "paramDescription": "the path where there the graph is stored", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "relsPath", + "paramDescription": "the path where the generated relations will be stored", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json new file mode 100644 index 000000000..6f5e330f6 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json @@ -0,0 +1,14 @@ +[ + { + "paramName": "g", + "paramLongName": "graphPath", + "paramDescription": "the path where there the graph is stored", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "simpleEntitiesPath", + "paramDescription": "the path where the generated simple entities (without relations) will be stored", + "paramRequired": true + } +]