From 9a847b45577d3eceb4abab00b81a2546cc133fa1 Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Thu, 18 Jun 2020 13:14:10 +0200
Subject: [PATCH 1/7] some wf fixing

---
 dhp-workflows/dhp-broker-events/pom.xml       | 10 +--
 .../broker/oa/GenerateEventsApplication.java  | 63 ++++++++-----------
 .../dhp/broker/oa/util/UpdateInfo.java        |  8 ++-
 .../oa/generate_all/oozie_app/workflow.xml    | 26 +++++---
 4 files changed, 53 insertions(+), 54 deletions(-)

diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index eddc042c6..cd3257991 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -24,11 +24,7 @@
 			org.apache.spark
 			spark-sql_2.11
-
-			org.apache.spark
-			spark-hive_2.11
-			test
-
+
 			eu.dnetlib.dhp
@@ -45,10 +41,6 @@
 			dnet-pace-core
-
-			com.jayway.jsonpath
-			json-path
-
 			dom4j
 			dom4j

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index c1f12f43c..980b01fe1 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -31,7 +31,6 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAg
 import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
 import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
 import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
 import eu.dnetlib.dhp.schema.oaf.Project;
 import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -51,9 +50,8 @@ public class GenerateEventsApplication {
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
-				.toString(
-					GenerateEventsApplication.class
-						.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
+				.toString(GenerateEventsApplication.class
+					.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
 		parser.parseArgument(args);

 		final Boolean isSparkSessionManaged = Optional
@@ -78,18 +76,21 @@
 		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 		conf.registerKryoClasses(BrokerConstants.getModelClasses());

-		final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+		// TODO UNCOMMENT
+		// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+		final DedupConfig dedupConfig = null;

 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {

 			removeOutputDir(spark, eventsPath);

+			// TODO UNCOMMENT
 			spark
 				.emptyDataset(Encoders.kryo(Event.class))
 				.union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
-				.union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
-				.union(generateEvents(spark, graphPath, Software.class, dedupConfig))
-				.union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
+				// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
+				// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
+				// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
 				.write()
 				.mode(SaveMode.Overwrite)
 				.option("compression", "gzip")
 				.json(eventsPath);
@@ -117,15 +118,12 @@ public class GenerateEventsApplication {
 			.toColumn();

 		return results
-			.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
-			.groupByKey(
-				(MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING())
+			.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
+			.groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING())
 			.agg(aggr)
 			.map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
 			.filter(ResultGroup::isValid)
-			.map(
-				(MapFunction) g -> EventFinder.generateEvents(g, dedupConfig),
-				Encoders.kryo(EventGroup.class))
+			.map((MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class))
 			.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
 	}
@@ -133,9 +131,9 @@ public class GenerateEventsApplication {
 		final SparkSession spark,
 		final String graphPath,
 		final Class sourceClass) {
+
 		final Dataset projects = readPath(spark, graphPath + "/project", Project.class);
-		final Dataset datasets = readPath(
-			spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
+		final Dataset datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
 		final Dataset softwares = readPath(spark, graphPath + "/software", Software.class);
 		final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class);
@@ -143,17 +141,14 @@ public class GenerateEventsApplication {
 			.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
 			.cache();

-		final Dataset r0 = readPath(
-			spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
-			.filter(r -> r.getDataInfo().getDeletedbyinference())
-			.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
+		final Dataset r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
+			.filter(r -> r.getDataInfo().getDeletedbyinference())
+			.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));

 		final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
 		final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
 		final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
-		final Dataset r4 = join(
-			r3, rels, relatedEntities(publications, rels, RelatedProject.class));
-		;
+		final Dataset r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));;

 		return r4;
 	}
@@ -163,9 +158,7 @@ public class GenerateEventsApplication {
 		final Class clazz) {
 		return rels
 			.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
-			.map(
-				t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
-				Encoders.kryo(clazz));
+			.map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz));
 	}

 	private static Dataset join(final Dataset sources,
@@ -173,15 +166,13 @@ public class GenerateEventsApplication {
 		final Dataset typedRels) {

 		final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator()
-			.toColumn();
-		;
+			.toColumn();;

-		return sources
-			.joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer")
-			.groupByKey(
-				(MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
+		return sources.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")
+			.groupByKey((MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
 			.agg(aggr)
 			.map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
+
 	}

 	public static Dataset readPath(
@@ -195,14 +186,12 @@ public class GenerateEventsApplication {
 	}

 	private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
+
 		final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);

 		final String conf = isLookUpService
-			.getResourceProfileByQuery(
-				String
-					.format(
-						"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
-						profId));
+			.getResourceProfileByQuery(String
+				.format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId));

 		final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
 		dedupConfig.getPace().initModel();

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index 82d017864..fca9cf89e 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -63,8 +63,14 @@ public final class UpdateInfo {
 		return target;
 	}

-	private float calculateTrust(final DedupConfig dedupConfig, final OpenaireBrokerResult r1,
+	private float calculateTrust(final DedupConfig dedupConfig,
+		final OpenaireBrokerResult r1,
 		final OpenaireBrokerResult r2) {
+
+		if (dedupConfig == null) {
+			return BrokerConstants.MIN_TRUST;
+		}
+
 		try {
 			final ObjectMapper objectMapper = new ObjectMapper();
 			final MapDocument doc1 = MapDocumentUtil

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index da573ae9c..ea9aabcfc 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -78,21 +78,33 @@
 		Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
+
-
-
-
-
-			eu.dnetlib.dhp.broker.oa.GenerateEventsApplication
+
+			yarn
+			cluster
+			GenerateEvents
+			eu.dnetlib.dhp.broker.oa.GenerateEventsApplication
+			dhp-broker-events-${projectVersion}.jar
+
+				--executor-cores=${sparkExecutorCores} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840
+			--graphPath${graphInputPath} --eventsPath${eventsOutputPath} --isLookupUrl${isLookupUrl} --dedupConfProfile${dedupConfProfId}
+
+

From e659b02e6bca84954cdda331f19c9ed1c9503c7a Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Thu, 18 Jun 2020 13:15:13 +0200
Subject: [PATCH 2/7] some wf fixing

---
 .../broker/oa/GenerateEventsApplication.java | 46
+++++++++++++------ 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 980b01fe1..ae72e83ce 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -50,8 +50,9 @@ public class GenerateEventsApplication { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); + .toString( + GenerateEventsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -119,11 +120,14 @@ public class GenerateEventsApplication { return results .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner") - .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + .groupByKey( + (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .filter(ResultGroup::isValid) - .map((MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class)) + .map( + (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), + Encoders.kryo(EventGroup.class)) .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } @@ -133,7 +137,8 @@ public class GenerateEventsApplication { final Class sourceClass) { final Dataset projects = readPath(spark, graphPath + "/project", Project.class); - final Dataset datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + final Dataset datasets = readPath( + spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); @@ -141,14 +146,17 @@ public class GenerateEventsApplication { .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .cache(); - final Dataset r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); + final Dataset r0 = readPath( + spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); - final Dataset r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));; + final Dataset r4 = join( + r3, rels, relatedEntities(publications, rels, RelatedProject.class)); + ; return r4; } @@ -158,7 +166,9 @@ 
public class GenerateEventsApplication { final Class clazz) { return rels .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") - .map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz)); + .map( + t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), + Encoders.kryo(clazz)); } private static Dataset join(final Dataset sources, @@ -166,10 +176,13 @@ public class GenerateEventsApplication { final Dataset typedRels) { final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator() - .toColumn();; + .toColumn(); + ; - return sources.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer") - .groupByKey((MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) + return sources + .joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer") + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); @@ -190,8 +203,11 @@ public class GenerateEventsApplication { final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final String conf = isLookUpService - .getResourceProfileByQuery(String - .format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId)); + .getResourceProfileByQuery( + String + .format( + "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", + profId)); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); dedupConfig.getPace().initModel(); From 61634fbfe0ada4067e7c40de5933d384e001daa1 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 18 Jun 2020 14:09:58 +0200 Subject: [PATCH 3/7] removed kryo encoding --- .../broker/oa/GenerateEventsApplication.java | 49 ++++++++++++------- .../aggregators/simple/ResultAggregator.java | 4 +- .../OpenaireBrokerResultAggregator.java | 4 +- 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index ae72e83ce..0673e7810 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -28,8 +28,11 @@ import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; @@ -74,8 +77,8 @@ public class GenerateEventsApplication { log.info("dedupConfigProfileId: {}", 
dedupConfigProfileId); final SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(BrokerConstants.getModelClasses()); + // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + // conf.registerKryoClasses(BrokerConstants.getModelClasses()); // TODO UNCOMMENT // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId); @@ -85,17 +88,24 @@ public class GenerateEventsApplication { removeOutputDir(spark, eventsPath); - // TODO UNCOMMENT - spark - .emptyDataset(Encoders.kryo(Event.class)) - .union(generateEvents(spark, graphPath, Publication.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, Software.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig)) + // TODO REMOVE THIS + expandResultsWithRelations(spark, graphPath, Publication.class) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(eventsPath); + + // TODO UNCOMMENT THIS + // spark + // .emptyDataset(Encoders.bean(Event.class)) + // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig)) + // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig)) + // .union(generateEvents(spark, graphPath, Software.class, dedupConfig)) + // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig)) + // .write() + // .mode(SaveMode.Overwrite) + // .option("compression", "gzip") + // .json(eventsPath); }); } @@ -123,12 +133,12 @@ public class GenerateEventsApplication { .groupByKey( (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) - .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) + .map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class)) .filter(ResultGroup::isValid) .map( (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), - Encoders.kryo(EventGroup.class)) - .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); + Encoders.bean(EventGroup.class)) + .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class)); } private static Dataset expandResultsWithRelations( @@ -147,15 +157,16 @@ public class GenerateEventsApplication { .cache(); final Dataset r0 = readPath( - spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) + spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); + .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)); final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); - final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); - final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); + final Dataset r2 = join( + r1, rels, relatedEntities(softwares, rels, RelatedSoftware.class)); + final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedDataset.class)); final Dataset r4 = join( - r3, rels, relatedEntities(publications, rels, RelatedProject.class)); + r3, rels, relatedEntities(publications, rels, RelatedPublication.class)); ; return r4; @@ -168,7 +179,7 @@ public class 
GenerateEventsApplication { .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") .map( t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), - Encoders.kryo(clazz)); + Encoders.bean(clazz)); } private static Dataset join(final Dataset sources, @@ -184,7 +195,7 @@ public class GenerateEventsApplication { .groupByKey( (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); + .map(t -> t._2, Encoders.bean(OpenaireBrokerResult.class)); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java index dabe2bb4d..747482198 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java @@ -38,13 +38,13 @@ public class ResultAggregator extends Aggregator bufferEncoder() { - return Encoders.kryo(ResultGroup.class); + return Encoders.bean(ResultGroup.class); } @Override public Encoder outputEncoder() { - return Encoders.kryo(ResultGroup.class); + return Encoders.bean(ResultGroup.class); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java index b44fbe367..e72dcb988 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java @@ -58,12 +58,12 @@ public class OpenaireBrokerResultAggregator @Override public Encoder bufferEncoder() { - return Encoders.kryo(OpenaireBrokerResult.class); + return Encoders.bean(OpenaireBrokerResult.class); } @Override public Encoder outputEncoder() { - return Encoders.kryo(OpenaireBrokerResult.class); + return Encoders.bean(OpenaireBrokerResult.class); } } From 52f62d5d8c0492abc2a8549dc84bf176bd0d06b3 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 18 Jun 2020 14:49:13 +0200 Subject: [PATCH 4/7] events --- .../broker/oa/GenerateEventsApplication.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 0673e7810..3357710f0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -28,11 +28,7 @@ import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; import 
eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; @@ -92,7 +88,6 @@ public class GenerateEventsApplication { expandResultsWithRelations(spark, graphPath, Publication.class) .write() .mode(SaveMode.Overwrite) - .option("compression", "gzip") .json(eventsPath); // TODO UNCOMMENT THIS @@ -161,15 +156,17 @@ public class GenerateEventsApplication { .filter(r -> r.getDataInfo().getDeletedbyinference()) .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)); - final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); - final Dataset r2 = join( - r1, rels, relatedEntities(softwares, rels, RelatedSoftware.class)); - final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedDataset.class)); - final Dataset r4 = join( - r3, rels, relatedEntities(publications, rels, RelatedPublication.class)); - ; + // TODO UNCOMMENT THIS + // final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, + // RelatedProject.class)); + // final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, + // RelatedSoftware.class)); + // final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, + // RelatedDataset.class)); + // final Dataset r4 = join(r3, rels, relatedEntities(publications, rels, + // RelatedPublication.class));; - return r4; + return r0; // TODO it should be r4 } private static Dataset relatedEntities(final Dataset targets, From d0ac7514b225d08a89159fa59e2ad3a79bacaf42 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 18 Jun 2020 19:37:25 +0200 Subject: [PATCH 5/7] cleaning workflow to include cleaning of default values --- .../oa/graph/clean/CleanGraphSparkJob.java | 77 +++++++++++++++++++ .../dhp/oa/graph/clean/CleaningRuleMap.java | 34 ++++---- .../oa/graph/raw/common/VocabularyGroup.java | 6 +- .../oa/graph/clean/CleaningFunctionTest.java | 7 ++ .../eu/dnetlib/dhp/oa/graph/clean/result.json | 6 ++ 5 files changed, 114 insertions(+), 16 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index b2c7152d5..c90898814 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -3,9 +3,13 @@ package eu.dnetlib.dhp.oa.graph.clean; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.BufferedInputStream; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -19,7 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import 
eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -84,12 +90,83 @@ public class CleanGraphSparkJob { readTableFromPath(spark, inputPath, clazz) .map((MapFunction) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz)) + .map((MapFunction) value -> fixDefaults(value), Encoders.bean(clazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } + private static T fixDefaults(T value) { + if (value instanceof Datasource) { + // nothing to clean here + } else if (value instanceof Project) { + // nothing to clean here + } else if (value instanceof Organization) { + Organization o = (Organization) value; + if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) { + o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE)); + } + } else if (value instanceof Relation) { + // nothing to clean here + } else if (value instanceof Result) { + + Result r = (Result) value; + if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) { + r + .setLanguage( + qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES)); + } + if (Objects.nonNull(r.getSubject())) { + r + .setSubject( + r + .getSubject() + .stream() + .filter(Objects::nonNull) + .filter(sp -> StringUtils.isNotBlank(sp.getValue())) + .filter(sp -> Objects.nonNull(sp.getQualifier())) + .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) + .collect(Collectors.toList())); + } + if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) { + r + .setResourcetype( + qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE)); + } + if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { + r + .setBestaccessright( + qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); + } + if (Objects.nonNull(r.getInstance())) { + for (Instance i : r.getInstance()) { + if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) { + i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); + } + } + } + + if (value instanceof Publication) { + + } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { + + } else if (value instanceof OtherResearchProduct) { + + } else if (value instanceof Software) { + + } + } + + return value; + } + + private static Qualifier qualifier(String classid, String classname, String scheme) { + return OafMapperUtils + .qualifier( + classid, classname, scheme, scheme); + } + private static Dataset readTableFromPath( SparkSession spark, String inputEntityPath, Class clazz) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java index 8006f7300..d2d4e118f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java @@ -4,10 +4,13 @@ package eu.dnetlib.dhp.oa.graph.clean; import java.io.Serializable; import java.util.HashMap; +import org.apache.commons.lang3.StringUtils; + import 
eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Country; import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class CleaningRuleMap extends HashMap> implements Serializable { @@ -18,23 +21,24 @@ public class CleaningRuleMap extends HashMap */ public static CleaningRuleMap create(VocabularyGroup vocabularies) { CleaningRuleMap mapping = new CleaningRuleMap(); - mapping.put(Qualifier.class, o -> { - Qualifier q = (Qualifier) o; - if (vocabularies.vocabularyExists(q.getSchemeid())) { - Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid()); - q.setClassid(newValue.getClassid()); - q.setClassname(newValue.getClassname()); + mapping.put(Qualifier.class, o -> cleanQualifier(vocabularies, (Qualifier) o)); + mapping.put(Country.class, o -> { + final Country c = (Country) o; + if (StringUtils.isBlank(c.getSchemeid())) { + c.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE); + c.setSchemename(ModelConstants.DNET_COUNTRY_TYPE); } - }); - mapping.put(StructuredProperty.class, o -> { - StructuredProperty sp = (StructuredProperty) o; - // TODO implement a policy - /* - * if (StringUtils.isBlank(sp.getValue())) { sp.setValue(null); sp.setQualifier(null); sp.setDataInfo(null); - * } - */ + cleanQualifier(vocabularies, c); }); return mapping; } + private static void cleanQualifier(VocabularyGroup vocabularies, Q q) { + if (vocabularies.vocabularyExists(q.getSchemeid())) { + Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid()); + q.setClassid(newValue.getClassid()); + q.setClassname(newValue.getClassname()); + } + } + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java index d9ff62596..334339d3b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java @@ -122,7 +122,11 @@ public class VocabularyGroup implements Serializable { } public boolean vocabularyExists(final String vocId) { - return vocs.containsKey(vocId.toLowerCase()); + return Optional + .ofNullable(vocId) + .map(String::toLowerCase) + .map(id -> vocs.containsKey(id)) + .orElse(false); } private void addSynonyms(final String vocId, final String termId, final String syn) { diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java index 1b21ce2d3..4783aa81f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -56,6 +57,9 @@ public class CleaningFunctionTest { String json = 
IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json")); Publication p_in = MAPPER.readValue(json, Publication.class); + assertTrue(p_in instanceof Result); + assertTrue(p_in instanceof Publication); + Publication p_out = OafCleaner.apply(p_in, mapping); assertNotNull(p_out); @@ -63,6 +67,9 @@ public class CleaningFunctionTest { assertEquals("und", p_out.getLanguage().getClassid()); assertEquals("Undetermined", p_out.getLanguage().getClassname()); + assertEquals("DE", p_out.getCountry().get(0).getClassid()); + assertEquals("Germany", p_out.getCountry().get(0).getClassname()); + assertEquals("0018", p_out.getInstance().get(0).getInstancetype().getClassid()); assertEquals("Annotation", p_out.getInstance().get(0).getInstancetype().getClassname()); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json index b63a12f61..2c1d5017d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json @@ -202,6 +202,12 @@ "contributor": [ ], "country": [ + { + "classid": "DE", + "classname": "DE", + "schemeid": "dnet:countries", + "schemename": "dnet:countries" + } ], "coverage": [ ], From 834f139e6e89490972343e0436c291c237520735 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 19 Jun 2020 12:33:29 +0200 Subject: [PATCH 6/7] fixed some NPE --- dhp-workflows/dhp-broker-events/pom.xml | 2 +- .../broker/oa/GenerateEventsApplication.java | 23 +- .../dhp/broker/oa/util/ConversionUtils.java | 284 +++++++++++------- .../dhp/broker/oa/util/UpdateInfo.java | 14 +- 4 files changed, 199 insertions(+), 124 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index cd3257991..f943ac93a 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -53,7 +53,7 @@ eu.dnetlib dnet-openaire-broker-common - [3.0.1,4.0.0) + [3.0.2,4.0.0) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 3357710f0..ae313813d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -30,11 +30,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; @@ -85,7 +83,9 @@ public class GenerateEventsApplication { removeOutputDir(spark, eventsPath); // TODO REMOVE THIS - expandResultsWithRelations(spark, graphPath, Publication.class) + readPath(spark, graphPath + "/publication", Publication.class) + 
.filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)) .write() .mode(SaveMode.Overwrite) .json(eventsPath); @@ -141,15 +141,15 @@ public class GenerateEventsApplication { final String graphPath, final Class sourceClass) { - final Dataset projects = readPath(spark, graphPath + "/project", Project.class); - final Dataset datasets = readPath( - spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); - final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); - final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); + // final Dataset projects = readPath(spark, graphPath + "/project", Project.class); + // final Dataset datasets = readPath( + // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + // final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); + // final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); - final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) - .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) - .cache(); + // final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) + // .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) + // .cache(); final Dataset r0 = readPath( spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) @@ -185,7 +185,6 @@ public class GenerateEventsApplication { final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator() .toColumn(); - ; return sources .joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer") diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index d04ef45a0..d8f9dffbe 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -13,6 +14,8 @@ import org.dom4j.DocumentHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; + import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.dhp.schema.oaf.Author; @@ -24,6 +27,7 @@ import eu.dnetlib.dhp.schema.oaf.Journal; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; @@ -33,133 +37,186 @@ public class ConversionUtils { private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class); public static List oafInstanceToBrokerInstances(final Instance i) { - return i.getUrl().stream().map(url -> { - return new eu.dnetlib.broker.objects.Instance() - .setUrl(url) - .setInstancetype(i.getInstancetype().getClassid()) - .setLicense(BrokerConstants.OPEN_ACCESS) - 
.setHostedby(i.getHostedby().getValue()); - }).collect(Collectors.toList()); + if (i == null) { + return new ArrayList<>(); + } + + return mappedList(i.getUrl(), url -> { + final eu.dnetlib.broker.objects.Instance res = new eu.dnetlib.broker.objects.Instance(); + res.setUrl(url); + res.setInstancetype(classId(i.getInstancetype())); + res.setLicense(BrokerConstants.OPEN_ACCESS); + res.setHostedby(kvValue(i.getHostedby())); + return res; + }); } public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) { - return sp != null ? new TypedValue() - .setValue(sp.getValue()) - .setType(sp.getQualifier().getClassid()) : null; + return oafStructPropToBrokerTypedValue(sp); + } + + public static TypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) { + return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null; } public static final Pair oafSubjectToPair(final StructuredProperty sp) { - return sp != null ? Pair.of(sp.getQualifier().getClassid(), sp.getValue()) : null; + return sp != null ? Pair.of(classId(sp.getQualifier()), sp.getValue()) : null; } public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { - return d != null ? new eu.dnetlib.broker.objects.Dataset() - .setOriginalId(d.getOriginalId().get(0)) - .setTitle(structPropValue(d.getTitle())) - .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances( - d - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) - .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) - : null; + if (d == null) { + return null; + } + + final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset(); + res.setOriginalId(first(d.getOriginalId())); + res.setTitle(structPropValue(d.getTitle())); + res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid)); + res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); + res.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue)); + return res; } public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) { - return p != null ? new eu.dnetlib.broker.objects.Publication() - .setOriginalId(p.getOriginalId().get(0)) - .setTitle(structPropValue(p.getTitle())) - .setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances( - p - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) - .setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) - : null; + if (p == null) { + return null; + } + + final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication(); + res.setOriginalId(first(p.getOriginalId())); + res.setTitle(structPropValue(p.getTitle())); + res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid)); + res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); + res.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue)); + + return res; } public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) { + if (result == null) { + return null; + } - return result != null ? 
new OpenaireBrokerResult() - .setOpenaireId(result.getId()) - .setOriginalId(result.getOriginalId().get(0)) - .setTypology(result.getResulttype().getClassid()) - .setTitles(structPropList(result.getTitle())) - .setAbstracts(fieldList(result.getDescription())) - .setLanguage(result.getLanguage().getClassid()) - .setSubjects(structPropTypedList(result.getSubject())) - .setCreators( - result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) - .setPublicationdate(result.getDateofacceptance().getValue()) - .setPublisher(fieldValue(result.getPublisher())) - .setEmbargoenddate(fieldValue(result.getEmbargoenddate())) - .setContributor(fieldList(result.getContributor())) + final OpenaireBrokerResult res = new OpenaireBrokerResult(); + + res.setOpenaireId(result.getId()); + res.setOriginalId(first(result.getOriginalId())); + res.setTypology(classId(result.getResulttype())); + res.setTitles(structPropList(result.getTitle())); + res.setAbstracts(fieldList(result.getDescription())); + res.setLanguage(classId(result.getLanguage())); + res.setSubjects(structPropTypedList(result.getSubject())); + res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor)); + res.setPublicationdate(fieldValue(result.getDateofacceptance())); + res.setPublisher(fieldValue(result.getPublisher())); + res.setEmbargoenddate(fieldValue(result.getEmbargoenddate())); + res.setContributor(fieldList(result.getContributor())); + res .setJournal( - result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) - .setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null)) - .setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) - .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances( - result - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) - .setExternalReferences( - result - .getExternalReference() - .stream() - .map(ConversionUtils::oafExtRefToBrokerExtRef) - .collect(Collectors.toList())) - : null; + result instanceof Publication ? 
oafJournalToBrokerJournal(((Publication) result).getJournal()) : null); + res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey)); + res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue)); + res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid)); + res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); + res.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef)); + + return res; } private static List structPropTypedList(final List list) { + if (list == null) { + return new ArrayList<>(); + } + return list .stream() - .map( - p -> new TypedValue() - .setValue(p.getValue()) - .setType(p.getQualifier().getClassid())) + .map(ConversionUtils::oafStructPropToBrokerTypedValue) .collect(Collectors.toList()); } + private static List mappedList(final List list, final Function func) { + if (list == null) { + return new ArrayList<>(); + } + + return list + .stream() + .map(func::apply) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static List flatMappedList(final List list, final Function> func) { + if (list == null) { + return new ArrayList<>(); + } + + return list + .stream() + .map(func::apply) + .flatMap(List::stream) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static T mappedFirst(final List list, final Function func) { + if (list == null) { + return null; + } + + return list + .stream() + .map(func::apply) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); + } + private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { - return author != null ? new eu.dnetlib.broker.objects.Author() - .setFullname(author.getFullname()) - .setOrcid( - author - .getPid() - .stream() - .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) - .map(pid -> pid.getValue()) - .findFirst() - .orElse(null)) - : null; + if (author == null) { + return null; + } + + final String pids = author.getPid() != null ? author + .getPid() + .stream() + .filter(pid -> pid != null) + .filter(pid -> pid.getQualifier() != null) + .filter(pid -> pid.getQualifier().getClassid() != null) + .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) + .map(pid -> pid.getValue()) + .filter(StringUtils::isNotBlank) + .findFirst() + .orElse(null) : null; + + return new eu.dnetlib.broker.objects.Author(author.getFullname(), pids); } private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) { - return journal != null ? new eu.dnetlib.broker.objects.Journal() - .setName(journal.getName()) - .setIssn(journal.getIssnPrinted()) - .setEissn(journal.getIssnOnline()) - .setLissn(journal.getIssnLinking()) : null; + if (journal == null) { + return null; + } + + final eu.dnetlib.broker.objects.Journal res = new eu.dnetlib.broker.objects.Journal(); + res.setName(journal.getName()); + res.setIssn(journal.getIssnPrinted()); + res.setEissn(journal.getIssnOnline()); + res.setLissn(journal.getIssnLinking()); + + return res; } private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) { - return ref != null ? 
new eu.dnetlib.broker.objects.ExternalReference() - .setRefidentifier(ref.getRefidentifier()) - .setSitename(ref.getSitename()) - .setType(ref.getQualifier().getClassid()) - .setUrl(ref.getUrl()) - : null; + if (ref == null) { + return null; + } + + final eu.dnetlib.broker.objects.ExternalReference res = new eu.dnetlib.broker.objects.ExternalReference(); + res.setRefidentifier(ref.getRefidentifier()); + res.setSitename(ref.getSitename()); + res.setType(classId(ref.getQualifier())); + res.setUrl(ref.getUrl()); + return res; } public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { @@ -167,10 +224,10 @@ public class ConversionUtils { return null; } - final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project() - .setTitle(fieldValue(p.getTitle())) - .setAcronym(fieldValue(p.getAcronym())) - .setCode(fieldValue(p.getCode())); + final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project(); + res.setTitle(fieldValue(p.getTitle())); + res.setAcronym(fieldValue(p.getAcronym())); + res.setCode(fieldValue(p.getCode())); final String ftree = fieldValue(p.getFundingtree()); if (StringUtils.isNotBlank(ftree)) { @@ -188,12 +245,25 @@ public class ConversionUtils { } public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) { - return sw != null ? new eu.dnetlib.broker.objects.Software() - .setName(structPropValue(sw.getTitle())) - .setDescription(fieldValue(sw.getDescription())) - .setRepository(fieldValue(sw.getCodeRepositoryUrl())) - .setLandingPage(fieldValue(sw.getDocumentationUrl())) - : null; + if (sw == null) { + return null; + } + + final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software(); + res.setName(structPropValue(sw.getTitle())); + res.setDescription(fieldValue(sw.getDescription())); + res.setRepository(fieldValue(sw.getCodeRepositoryUrl())); + res.setLandingPage(fieldValue(sw.getDocumentationUrl())); + + return res; + } + + private static String first(final List list) { + return list != null && list.size() > 0 ? list.get(0) : null; + } + + private static String kvValue(final KeyValue kv) { + return kv != null ? kv.getValue() : null; } private static String fieldValue(final Field f) { @@ -205,6 +275,10 @@ public class ConversionUtils { : null; } + private static String classId(final Qualifier q) { + return q != null ? q.getClassid() : null; + } + private static String structPropValue(final List props) { return props != null ? 
props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index fca9cf89e..2c4bda53d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -122,13 +122,15 @@ public final class UpdateInfo { .orElse(null); ; - final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl); + final Provenance provenance = new Provenance(provId, provRepo, provUrl); - return new OpenAireEventPayload() - .setPublication(target) - .setHighlight(hl) - .setTrust(trust) - .setProvenance(provenance); + final OpenAireEventPayload res = new OpenAireEventPayload(); + res.setResult(target); + res.setHighlight(hl); + res.setTrust(trust); + res.setProvenance(provenance); + + return res; } } From 4822747313cf4fb6d771e8e3c11d561d6dc0643d Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 19 Jun 2020 13:53:56 +0200 Subject: [PATCH 7/7] some fixes --- .../broker/oa/GenerateEventsApplication.java | 24 ++-- .../dhp/broker/oa/util/ConversionUtils.java | 105 +++++++++--------- .../aggregators/simple/ResultAggregator.java | 6 +- .../util/aggregators/simple/ResultGroup.java | 15 +-- .../aggregators/withRels/RelatedDataset.java | 21 +++- .../aggregators/withRels/RelatedProject.java | 21 +++- .../withRels/RelatedPublication.java | 21 +++- .../aggregators/withRels/RelatedSoftware.java | 21 +++- 8 files changed, 143 insertions(+), 91 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index ae313813d..62171ac61 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -29,8 +29,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.utils.ISLookupClientFactory; @@ -83,9 +84,11 @@ public class GenerateEventsApplication { removeOutputDir(spark, eventsPath); // TODO REMOVE THIS - readPath(spark, graphPath + "/publication", Publication.class) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)) + final Dataset projects = readPath(spark, graphPath + "/project", Project.class); + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) + .cache(); + relatedEntities(projects, rels, RelatedProject.class) 
.write() .mode(SaveMode.Overwrite) .json(eventsPath); @@ -129,7 +132,7 @@ public class GenerateEventsApplication { (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class)) - .filter(ResultGroup::isValid) + .filter(rg -> rg.getData().size() > 1) .map( (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.bean(EventGroup.class)) @@ -141,15 +144,15 @@ public class GenerateEventsApplication { final String graphPath, final Class sourceClass) { - // final Dataset projects = readPath(spark, graphPath + "/project", Project.class); + final Dataset projects = readPath(spark, graphPath + "/project", Project.class); // final Dataset datasets = readPath( // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); // final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); // final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); - // final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) - // .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) - // .cache(); + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) + .cache(); final Dataset r0 = readPath( spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) @@ -157,8 +160,7 @@ public class GenerateEventsApplication { .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)); // TODO UNCOMMENT THIS - // final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, - // RelatedProject.class)); + final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); // final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, // RelatedSoftware.class)); // final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index d8f9dffbe..730d06519 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -7,7 +7,6 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.DocumentHelper; @@ -59,10 +58,6 @@ public class ConversionUtils { return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null; } - public static final Pair oafSubjectToPair(final StructuredProperty sp) { - return sp != null ? 
Pair.of(classId(sp.getQualifier()), sp.getValue()) : null; - } - public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { if (d == null) { return null; @@ -123,55 +118,6 @@ public class ConversionUtils { return res; } - private static List structPropTypedList(final List list) { - if (list == null) { - return new ArrayList<>(); - } - - return list - .stream() - .map(ConversionUtils::oafStructPropToBrokerTypedValue) - .collect(Collectors.toList()); - } - - private static List mappedList(final List list, final Function func) { - if (list == null) { - return new ArrayList<>(); - } - - return list - .stream() - .map(func::apply) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - } - - private static List flatMappedList(final List list, final Function> func) { - if (list == null) { - return new ArrayList<>(); - } - - return list - .stream() - .map(func::apply) - .flatMap(List::stream) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - } - - private static T mappedFirst(final List list, final Function func) { - if (list == null) { - return null; - } - - return list - .stream() - .map(func::apply) - .filter(Objects::nonNull) - .findFirst() - .orElse(null); - } - private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { if (author == null) { return null; @@ -300,4 +246,55 @@ public class ConversionUtils { .collect(Collectors.toList()) : new ArrayList<>(); } + + private static List structPropTypedList(final List list) { + if (list == null) { + return new ArrayList<>(); + } + + return list + .stream() + .map(ConversionUtils::oafStructPropToBrokerTypedValue) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static List mappedList(final List list, final Function func) { + if (list == null) { + return new ArrayList<>(); + } + + return list + .stream() + .map(func::apply) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static List flatMappedList(final List list, final Function> func) { + if (list == null) { + return new ArrayList<>(); + } + + return list + .stream() + .map(func::apply) + .flatMap(List::stream) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static T mappedFirst(final List list, final Function func) { + if (list == null) { + return null; + } + + return list + .stream() + .map(func::apply) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java index 747482198..a46fde445 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java @@ -23,12 +23,14 @@ public class ResultAggregator extends Aggregator t) { - return group.addElement(t._1); + group.getData().add(t._1); + return group; } @Override public ResultGroup merge(final ResultGroup g1, final ResultGroup g2) { - return g1.addGroup(g2); + g1.getData().addAll(g2.getData()); + return g1; } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java 
b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java index 4308224a5..3f9dbe8af 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java @@ -14,23 +14,14 @@ public class ResultGroup implements Serializable { */ private static final long serialVersionUID = -3360828477088669296L; - private final List data = new ArrayList<>(); + private List data = new ArrayList<>(); public List getData() { return data; } - public ResultGroup addElement(final OpenaireBrokerResult elem) { - data.add(elem); - return this; + public void setData(final List data) { + this.data = data; } - public ResultGroup addGroup(final ResultGroup group) { - data.addAll(group.getData()); - return this; - } - - public boolean isValid() { - return data.size() > 1; - } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java index fcf1b89b1..6a5fb258c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java @@ -11,9 +11,12 @@ public class RelatedDataset implements Serializable { * */ private static final long serialVersionUID = 774487705184038324L; - private final String source; - private final String relType; - private final Dataset relDataset; + private String source; + private String relType; + private Dataset relDataset; + + public RelatedDataset() { + } public RelatedDataset(final String source, final String relType, final Dataset relDataset) { this.source = source; @@ -25,12 +28,24 @@ public class RelatedDataset implements Serializable { return source; } + public void setSource(final String source) { + this.source = source; + } + public String getRelType() { return relType; } + public void setRelType(final String relType) { + this.relType = relType; + } + public Dataset getRelDataset() { return relDataset; } + public void setRelDataset(final Dataset relDataset) { + this.relDataset = relDataset; + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java index 233041c09..fafec1e19 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java @@ -12,9 +12,12 @@ public class RelatedProject implements Serializable { */ private static final long serialVersionUID = 4941437626549329870L; - private final String source; - private final String relType; - private final Project relProject; + private String source; + private String relType; + private Project relProject; + + public RelatedProject() { + } public RelatedProject(final String source, final String relType, final Project relProject) { this.source = source; @@ -26,12 +29,24 @@ public class RelatedProject implements Serializable { return source; } + public void 
setSource(final String source) { + this.source = source; + } + public String getRelType() { return relType; } + public void setRelType(final String relType) { + this.relType = relType; + } + public Project getRelProject() { return relProject; } + public void setRelProject(final Project relProject) { + this.relProject = relProject; + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java index 80b92462d..8a31ddf7e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java @@ -12,9 +12,12 @@ public class RelatedPublication implements Serializable { */ private static final long serialVersionUID = 9021609640411395128L; - private final String source; - private final String relType; - private final Publication relPublication; + private String source; + private String relType; + private Publication relPublication; + + public RelatedPublication() { + } public RelatedPublication(final String source, final String relType, final Publication relPublication) { this.source = source; @@ -26,12 +29,24 @@ public class RelatedPublication implements Serializable { return source; } + public void setSource(final String source) { + this.source = source; + } + public String getRelType() { return relType; } + public void setRelType(final String relType) { + this.relType = relType; + } + public Publication getRelPublication() { return relPublication; } + public void setRelPublication(final Publication relPublication) { + this.relPublication = relPublication; + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java index 13f1f4290..319387469 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java @@ -11,9 +11,12 @@ public class RelatedSoftware implements Serializable { * */ private static final long serialVersionUID = 7573383356943300157L; - private final String source; - private final String relType; - private final Software relSoftware; + private String source; + private String relType; + private Software relSoftware; + + public RelatedSoftware() { + } public RelatedSoftware(final String source, final String relType, final Software relSoftware) { this.source = source; @@ -25,12 +28,24 @@ public class RelatedSoftware implements Serializable { return source; } + public void setSource(final String source) { + this.source = source; + } + public String getRelType() { return relType; } + public void setRelType(final String relType) { + this.relType = relType; + } + public Software getRelSoftware() { return relSoftware; } + public void setRelSoftware(final Software relSoftware) { + this.relSoftware = relSoftware; + } + }
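
Note on the RelatedDataset, RelatedProject, RelatedPublication and RelatedSoftware hunks: they all make the same change, replacing final fields and constructor-only initialisation with a public no-argument constructor plus plain getters and setters. That is the shape Spark's bean encoder needs, since Encoders.bean instantiates the class reflectively and fills it through its setters. Below is a minimal, self-contained sketch of that requirement; RelatedThing, BeanEncoderSketch and the local SparkSession are illustrative stand-ins, not classes from these patches.

    import java.io.Serializable;
    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class BeanEncoderSketch {

        // Bean-style POJO: public no-arg constructor plus a getter and setter per field.
        public static class RelatedThing implements Serializable {

            private static final long serialVersionUID = 1L;

            private String source;
            private String relType;

            public RelatedThing() {
            }

            public RelatedThing(final String source, final String relType) {
                this.source = source;
                this.relType = relType;
            }

            public String getSource() {
                return source;
            }

            public void setSource(final String source) {
                this.source = source;
            }

            public String getRelType() {
                return relType;
            }

            public void setRelType(final String relType) {
                this.relType = relType;
            }
        }

        public static void main(final String[] args) {
            final SparkSession spark = SparkSession
                .builder()
                .master("local[*]")
                .appName("bean-encoder-sketch")
                .getOrCreate();

            // Encoders.bean works only because RelatedThing follows the JavaBean conventions:
            // the encoder instantiates it reflectively and populates it through the setters.
            final Dataset<RelatedThing> ds = spark
                .createDataset(
                    Arrays.asList(new RelatedThing("result-id-1", "isProducedBy")),
                    Encoders.bean(RelatedThing.class));

            ds.printSchema();
            ds.show(false);

            spark.stop();
        }
    }

Unlike Encoders.kryo, which stores each object as a single opaque binary column and so tolerates immutable classes, Encoders.bean derives a real column schema from the getters, which is presumably why these relation wrappers (and ResultGroup above) had to become mutable beans.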
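
With ResultGroup stripped down to a bean, ResultAggregator now appends to group.getData() directly, and the former ResultGroup::isValid check is expressed at the call site as .filter(rg -> rg.getData().size() > 1). The sketch below wires an equivalent Aggregator into the same groupByKey/agg/map/filter chain over plain string pairs; Group, GroupAggregator and GroupingSketch are illustrative names under those assumptions, not the patch's classes.

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.expressions.Aggregator;

    import scala.Tuple2;

    public class GroupingSketch {

        // Bean-style buffer playing the role of ResultGroup.
        public static class Group implements Serializable {
            private static final long serialVersionUID = 1L;
            private List<String> data = new ArrayList<>();
            public List<String> getData() { return data; }
            public void setData(final List<String> data) { this.data = data; }
        }

        // Aggregator that mutates and returns the buffer, like the new ResultAggregator.
        public static class GroupAggregator extends Aggregator<Tuple2<String, String>, Group, Group> {
            private static final long serialVersionUID = 1L;

            @Override
            public Group zero() { return new Group(); }

            @Override
            public Group reduce(final Group g, final Tuple2<String, String> t) {
                g.getData().add(t._2);
                return g;
            }

            @Override
            public Group merge(final Group g1, final Group g2) {
                g1.getData().addAll(g2.getData());
                return g1;
            }

            @Override
            public Group finish(final Group g) { return g; }

            @Override
            public Encoder<Group> bufferEncoder() { return Encoders.bean(Group.class); }

            @Override
            public Encoder<Group> outputEncoder() { return Encoders.bean(Group.class); }
        }

        public static void main(final String[] args) {
            final SparkSession spark = SparkSession
                .builder()
                .master("local[*]")
                .appName("grouping-sketch")
                .getOrCreate();

            final Dataset<Tuple2<String, String>> pairs = spark.createDataset(
                Arrays.asList(new Tuple2<>("k1", "a"), new Tuple2<>("k1", "b"), new Tuple2<>("k2", "c")),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

            pairs
                .groupByKey((MapFunction<Tuple2<String, String>, String>) t -> t._1, Encoders.STRING())
                .agg(new GroupAggregator().toColumn())
                .map((MapFunction<Tuple2<String, Group>, Group>) t -> t._2, Encoders.bean(Group.class))
                // the old ResultGroup.isValid() test, now a plain filter on the group size:
                .filter(g -> g.getData().size() > 1)
                .show(false);

            spark.stop();
        }
    }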
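
The helpers consolidated at the bottom of ConversionUtils (structPropTypedList, mappedList, flatMappedList, mappedFirst) all follow one defensive contract: a null input list yields an empty list (or null for the "first" variant), and null mapping results are dropped instead of propagated. A compact sketch of that contract with plain strings follows; the class name and the parse function are illustrative, not project code.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class NullSafeMappingSketch {

        // Map a possibly-null list, dropping null results: never returns null.
        static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
            if (list == null) {
                return new ArrayList<>();
            }
            return list
                .stream()
                .map(func)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        }

        // Map a possibly-null list and return the first non-null result, or null.
        static <F, T> T mappedFirst(final List<F> list, final Function<F, T> func) {
            if (list == null) {
                return null;
            }
            return list
                .stream()
                .map(func)
                .filter(Objects::nonNull)
                .findFirst()
                .orElse(null);
        }

        public static void main(final String[] args) {
            // Parse what can be parsed, silently skipping the rest.
            final Function<String, Integer> parse = s -> {
                try {
                    return Integer.valueOf(s);
                } catch (final NumberFormatException e) {
                    return null;
                }
            };

            final List<String> values = Arrays.asList("10", "x", "20");

            System.out.println(mappedList(values, parse)); // [10, 20]
            System.out.println(mappedFirst(null, parse));  // null
            System.out.println(mappedList(null, parse));   // []
        }
    }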
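
The temporary block in GenerateEventsApplication and the restored r1 join both go through relatedEntities(...), which joins the cached relation dataset with the target entity dataset on target == id and wraps each match as a typed related entity keyed by the relation source. A minimal sketch of that joinWith shape is below; Rel, Entity and RelatedEntitiesSketch are simplified stand-ins for the project's Relation, Project and wrapper types, not the actual classes.

    import java.io.Serializable;
    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    import scala.Tuple2;

    public class RelatedEntitiesSketch {

        public static class Rel implements Serializable {
            private static final long serialVersionUID = 1L;
            private String source;
            private String target;
            private String relType;
            public Rel() { }
            public Rel(final String source, final String target, final String relType) {
                this.source = source;
                this.target = target;
                this.relType = relType;
            }
            public String getSource() { return source; }
            public void setSource(final String source) { this.source = source; }
            public String getTarget() { return target; }
            public void setTarget(final String target) { this.target = target; }
            public String getRelType() { return relType; }
            public void setRelType(final String relType) { this.relType = relType; }
        }

        public static class Entity implements Serializable {
            private static final long serialVersionUID = 1L;
            private String id;
            public Entity() { }
            public Entity(final String id) { this.id = id; }
            public String getId() { return id; }
            public void setId(final String id) { this.id = id; }
        }

        public static void main(final String[] args) {
            final SparkSession spark = SparkSession
                .builder()
                .master("local[*]")
                .appName("related-entities-sketch")
                .getOrCreate();

            final Dataset<Rel> rels = spark.createDataset(
                Arrays.asList(new Rel("result-1", "project-1", "isProducedBy")), Encoders.bean(Rel.class));
            final Dataset<Entity> targets = spark.createDataset(
                Arrays.asList(new Entity("project-1")), Encoders.bean(Entity.class));

            // Same join shape as relatedEntities(targets, rels, clazz):
            // keep (relation, entity) pairs where relation.target == entity.id.
            final Dataset<Tuple2<Rel, Entity>> joined = rels
                .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner");

            joined.show(false);
            spark.stop();
        }
    }

From each (relation, entity) pair, the relation source and relType plus the joined entity are exactly the three values the new RelatedProject/RelatedDataset/RelatedPublication/RelatedSoftware constructors take.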