diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java deleted file mode 100644 index db59920106..0000000000 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ /dev/null @@ -1,190 +0,0 @@ - -package eu.dnetlib.dhp.broker.oa; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.TypedColumn; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.broker.objects.OaBrokerMainEntity; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.broker.model.Event; -import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; -import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.EventFinder; -import eu.dnetlib.dhp.broker.oa.util.EventGroup; -import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; -import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; -import scala.Tuple2; - -public class GenerateEventsApplication { - - private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class); - - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); - parser.parseArgument(args); - - final Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String graphPath = parser.get("graphPath"); - log.info("graphPath: {}", graphPath); - - final String eventsPath = parser.get("eventsPath"); - log.info("eventsPath: {}", eventsPath); - - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); - - final String dedupConfigProfileId = parser.get("dedupConfProfile"); - log.info("dedupConfigProfileId: {}", dedupConfigProfileId); - - final SparkConf conf = new SparkConf(); - // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - // conf.registerKryoClasses(BrokerConstants.getModelClasses()); - - // TODO UNCOMMENT - // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId); - final DedupConfig dedupConfig = null; - - runWithSparkSession(conf, isSparkSessionManaged, spark -> { - - ClusterUtils.removeDir(spark, eventsPath); - - // TODO REMOVE THIS - - expandResultsWithRelations(spark, graphPath, Publication.class) - .write() - .mode(SaveMode.Overwrite) - .json(eventsPath); - - // TODO UNCOMMENT THIS - // spark - // .emptyDataset(Encoders.bean(Event.class)) - // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, Software.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig)) - // .write() - // .mode(SaveMode.Overwrite) - // .option("compression", "gzip") - // .json(eventsPath); - }); - - } - - private static Dataset generateEvents( - final SparkSession spark, - final String graphPath, - final Class sourceClass, - final DedupConfig dedupConfig) { - - final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass); - - final Dataset mergedRels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) - .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); - - final TypedColumn, ResultGroup> aggr = new ResultAggregator() - .toColumn(); - - return results - .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner") - .groupByKey( - (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) - .agg(aggr) - .map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class)) - .filter(rg -> rg.getData().size() > 1) - .map( - (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), - Encoders.bean(EventGroup.class)) - .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class)); - } - - private static Dataset expandResultsWithRelations( - final SparkSession spark, - final String graphPath, - final Class sourceClass) { - - // final Dataset datasets = readPath( - // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); - // final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); - // final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); - - final Dataset r0 = ClusterUtils - .readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class)); - - // TODO UNCOMMENT THIS - // final Dataset r1 = join(r0, relatedProjects(spark, graphPath)); - // final Dataset r2 = join(r1, relatedDataset(spark, graphPath)); - // final Dataset r3 = join(r2, relatedPublications(spark, graphPath)); - // final Dataset r4 = join(r3, relatedSoftwares(spark, graphPath)); - - return r0; // TODO it should be r4 - } - - private static Dataset join(final Dataset sources, - final Dataset typedRels) { - - final TypedColumn, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator() - .toColumn(); - - return sources - .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") - .groupByKey( - (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) - .agg(aggr) - .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); - - } - - private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception { - - final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); - - final String conf = isLookUpService - .getResourceProfileByQuery( - String - .format( - "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", - profId)); - - final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); - dedupConfig.getPace().initModel(); - dedupConfig.getPace().initTranslationMap(); - // dedupConfig.getWf().setConfigurationId("???"); - - return dedupConfig; - } - -} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java new file mode 100644 index 0000000000..3ea0086ff5 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -0,0 +1,106 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.EventFinder; +import eu.dnetlib.dhp.broker.oa.util.EventGroup; +import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; + +public class GenerateEventsJob { + + private static final Logger log = LoggerFactory.getLogger(GenerateEventsJob.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + GenerateEventsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + + final String dedupConfigProfileId = parser.get("dedupConfProfile"); + log.info("dedupConfigProfileId: {}", dedupConfigProfileId); + + final String eventsPath = workingPath + "/eventsPath"; + log.info("eventsPath: {}", eventsPath); + + final SparkConf conf = new SparkConf(); + + // TODO UNCOMMENT + // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId); + final DedupConfig dedupConfig = null; + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, eventsPath); + + final Dataset groups = ClusterUtils + .readPath(spark, graphPath + "/relation", ResultGroup.class); + + final Dataset events = groups + .map( + (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), + Encoders.bean(EventGroup.class)) + .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class)); + + events.write().mode(SaveMode.Overwrite).json(eventsPath); + + }); + + } + + private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception { + + final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); + + final String conf = isLookUpService + .getResourceProfileByQuery( + String + .format( + "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", + profId)); + + final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); + dedupConfig.getPace().initModel(); + dedupConfig.getPace().initTranslationMap(); + // dedupConfig.getWf().setConfigurationId("???"); + + return dedupConfig; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java new file mode 100644 index 0000000000..dac308f360 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java @@ -0,0 +1,94 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.TypedColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; +import scala.Tuple2; + +public class JoinEntitiesJob { + + private static final Logger log = LoggerFactory.getLogger(JoinEntitiesJob.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + JoinEntitiesJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String joinedEntitiesPath = workingPath + "/joinedEntities"; + log.info("joinedEntitiesPath: {}", joinedEntitiesPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, joinedEntitiesPath); + + final Dataset r0 = ClusterUtils + .readPath(spark, graphPath + "/simpleEntities", OaBrokerMainEntity.class); + + final Dataset r1 = join( + r0, ClusterUtils.readPath(spark, graphPath + "/relatedProjects", RelatedProject.class)); + final Dataset r2 = join( + r1, ClusterUtils.readPath(spark, graphPath + "/relatedDatasets", RelatedDataset.class)); + final Dataset r3 = join( + r2, ClusterUtils.readPath(spark, graphPath + "/relatedPublications", RelatedPublication.class)); + final Dataset r4 = join( + r3, ClusterUtils.readPath(spark, graphPath + "/relatedSoftwares", RelatedSoftware.class)); + + r4.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath); + + }); + + } + + private static Dataset join(final Dataset sources, + final Dataset typedRels) { + + final TypedColumn, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator() + .toColumn(); + + return sources + .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java new file mode 100644 index 0000000000..aa057eee8a --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java @@ -0,0 +1,88 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.TypedColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; +import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; +import eu.dnetlib.dhp.schema.oaf.Relation; +import scala.Tuple2; + +public class PrepareGroupsJob { + + private static final Logger log = LoggerFactory.getLogger(PrepareGroupsJob.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + JoinEntitiesJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String groupsPath = workingPath + "/groups"; + log.info("groupsPath: {}", groupsPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, groupsPath); + + final Dataset results = ClusterUtils + .readPath(spark, graphPath + "/joinedEntities", OaBrokerMainEntity.class); + + final Dataset mergedRels = ClusterUtils + .readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); + + final TypedColumn, ResultGroup> aggr = new ResultAggregator() + .toColumn(); + + final Dataset groups = results + .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner") + .groupByKey( + (MapFunction, String>) t -> t._2.getTarget(), + Encoders.STRING()) + .agg(aggr) + .map( + (MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class)) + .filter(rg -> rg.getData().size() > 1); + + groups + .write() + .mode(SaveMode.Overwrite) + .json(groupsPath); + + }); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java similarity index 82% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java index 4a10fbabf1..edf9b9a7e6 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedDatasets.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java @@ -19,16 +19,16 @@ import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; import eu.dnetlib.dhp.schema.oaf.Relation; -public class GenerateRelatedDatasets { +public class PrepareRelatedDatasetsJob { - private static final Logger log = LoggerFactory.getLogger(GenerateRelatedDatasets.class); + private static final Logger log = LoggerFactory.getLogger(PrepareRelatedDatasetsJob.class); public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - GenerateRelatedDatasets.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); + PrepareRelatedDatasetsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -40,7 +40,10 @@ public class GenerateRelatedDatasets { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String relsPath = parser.get("relsPath"); + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String relsPath = workingPath + "/relatedDatasets"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java similarity index 84% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java index 59ed388e7c..00957972ad 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedProjects.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java @@ -23,9 +23,9 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Relation; -public class GenerateRelatedProjects { +public class PrepareRelatedProjectsJob { - private static final Logger log = LoggerFactory.getLogger(GenerateRelatedProjects.class); + private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -33,8 +33,8 @@ public class GenerateRelatedProjects { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - GenerateRelatedProjects.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); + PrepareRelatedProjectsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -46,7 +46,10 @@ public class GenerateRelatedProjects { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String relsPath = parser.get("relsPath"); + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String relsPath = workingPath + "/relatedProjects"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java similarity index 83% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java index 0c20081dc8..945fd9ed77 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedPublications.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java @@ -22,9 +22,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; -public class GenerateRelatedPublications { +public class PrepareRelatedPublicationsJob { - private static final Logger log = LoggerFactory.getLogger(GenerateRelatedPublications.class); + private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -32,8 +32,8 @@ public class GenerateRelatedPublications { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - GenerateRelatedPublications.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); + PrepareRelatedPublicationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -45,7 +45,10 @@ public class GenerateRelatedPublications { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String relsPath = parser.get("relsPath"); + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String relsPath = workingPath + "/relatedPublications"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java similarity index 83% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java index b957888463..edb8dc1c3a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateRelatedSoftwares.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java @@ -22,9 +22,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Software; -public class GenerateRelatedSoftwares { +public class PrepareRelatedSoftwaresJob { - private static final Logger log = LoggerFactory.getLogger(GenerateRelatedSoftwares.class); + private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -32,8 +32,8 @@ public class GenerateRelatedSoftwares { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - GenerateRelatedSoftwares.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); + PrepareRelatedSoftwaresJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -45,7 +45,10 @@ public class GenerateRelatedSoftwares { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String relsPath = parser.get("relsPath"); + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String relsPath = workingPath + "/relatedSoftwares"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); @@ -53,6 +56,7 @@ public class GenerateRelatedSoftwares { runWithSparkSession(conf, isSparkSessionManaged, spark -> { ClusterUtils.removeDir(spark, relsPath); + final Dataset softwares = ClusterUtils.readPath(spark, graphPath + "/software", Software.class); final Dataset rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java similarity index 68% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java index 59485d5cf0..213003db28 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateSimpleEntitities.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java @@ -18,19 +18,21 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Software; -public class GenerateSimpleEntitities { +public class PrepareSimpleEntititiesJob { - private static final Logger log = LoggerFactory.getLogger(GenerateSimpleEntitities.class); + private static final Logger log = LoggerFactory.getLogger(PrepareSimpleEntititiesJob.class); public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - GenerateSimpleEntitities.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json"))); + PrepareSimpleEntititiesJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -42,7 +44,10 @@ public class GenerateSimpleEntitities { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String simpleEntitiesPath = parser.get("simpleEntitiesPath"); + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String simpleEntitiesPath = workingPath + "/simpleEntities"; log.info("simpleEntitiesPath: {}", simpleEntitiesPath); final SparkConf conf = new SparkConf(); @@ -51,27 +56,18 @@ public class GenerateSimpleEntitities { ClusterUtils.removeDir(spark, simpleEntitiesPath); - expandResultsWithRelations(spark, graphPath, Publication.class) + prepareSimpleEntities(spark, graphPath, Publication.class) + .union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class)) + .union(prepareSimpleEntities(spark, graphPath, Software.class)) + .union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class)) .write() .mode(SaveMode.Overwrite) .json(simpleEntitiesPath); - - // TODO UNCOMMENT THIS - // spark - // .emptyDataset(Encoders.bean(Event.class)) - // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, Software.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig)) - // .write() - // .mode(SaveMode.Overwrite) - // .option("compression", "gzip") - // .json(eventsPath); }); } - private static Dataset expandResultsWithRelations( + private static Dataset prepareSimpleEntities( final SparkSession spark, final String graphPath, final Class sourceClass) { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java index 8bcea5e6e3..15a1ddd880 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java @@ -14,6 +14,10 @@ public class ClusterUtils { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static void createDirIfMissing(final SparkSession spark, final String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + public static void removeDir(final SparkSession spark, final String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json similarity index 64% rename from dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json rename to dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json index 32fd1d8f32..adee1888a1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_relations.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json @@ -7,8 +7,8 @@ }, { "paramName": "o", - "paramLongName": "relsPath", - "paramDescription": "the path where the generated relations will be stored", + "paramLongName": "workingPath", + "paramDescription": "the path where the temporary data will be stored", "paramRequired": true } ] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index ea9aabcfce..9783fcab60 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -6,8 +6,8 @@ the path where the graph is stored - eventsOutputPath - the path where the the events will be stored + workingPath + the path where the the generated data will be stored isLookupUrl @@ -73,18 +73,34 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + + + + + + + + + + + + + + + + + yarn cluster - GenerateEvents - eu.dnetlib.dhp.broker.oa.GenerateEventsApplication + PrepareSimpleEntititiesJob + eu.dnetlib.dhp.broker.oa.PrepareSimpleEntititiesJob dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -97,14 +113,183 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --eventsPath${eventsOutputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + PrepareRelatedDatasetsJob + eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasetsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + PrepareRelatedProjectsJob + eu.dnetlib.dhp.broker.oa.PrepareRelatedProjectsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + PrepareRelatedPublicationsJob + eu.dnetlib.dhp.broker.oa.PrepareRelatedPublicationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + PrepareRelatedSoftwaresJob + eu.dnetlib.dhp.broker.oa.PrepareRelatedSoftwaresJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + + + yarn + cluster + JoinEntitiesJob + eu.dnetlib.dhp.broker.oa.JoinEntitiesJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + PrepareGroupsJob + eu.dnetlib.dhp.broker.oa.PrepareGroupsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + GenerateEventsJob + eu.dnetlib.dhp.broker.oa.GenerateEventsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} --isLookupUrl${isLookupUrl} --dedupConfProfile${dedupConfProfId} - diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_broker_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json similarity index 94% rename from dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_broker_events.json rename to dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json index 6ab6d9a2dd..d185bc73da 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_broker_events.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json @@ -7,7 +7,7 @@ }, { "paramName": "o", - "paramLongName": "eventsPath", + "paramLongName": "workingPath", "paramDescription": "the path where the generated events will be stored", "paramRequired": true }, diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json deleted file mode 100644 index 6f5e330f60..0000000000 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json +++ /dev/null @@ -1,14 +0,0 @@ -[ - { - "paramName": "g", - "paramLongName": "graphPath", - "paramDescription": "the path where there the graph is stored", - "paramRequired": true - }, - { - "paramName": "o", - "paramLongName": "simpleEntitiesPath", - "paramDescription": "the path where the generated simple entities (without relations) will be stored", - "paramRequired": true - } -]