From 4822747313cf4fb6d771e8e3c11d561d6dc0643d Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 19 Jun 2020 13:53:56 +0200 Subject: [PATCH] some fixes --- .../broker/oa/GenerateEventsApplication.java | 24 ++-- .../dhp/broker/oa/util/ConversionUtils.java | 105 +++++++++--------- .../aggregators/simple/ResultAggregator.java | 6 +- .../util/aggregators/simple/ResultGroup.java | 15 +-- .../aggregators/withRels/RelatedDataset.java | 21 +++- .../aggregators/withRels/RelatedProject.java | 21 +++- .../withRels/RelatedPublication.java | 21 +++- .../aggregators/withRels/RelatedSoftware.java | 21 +++- 8 files changed, 143 insertions(+), 91 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index ae313813d..62171ac61 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -29,8 +29,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.utils.ISLookupClientFactory; @@ -83,9 +84,11 @@ public class GenerateEventsApplication { removeOutputDir(spark, eventsPath); // TODO REMOVE THIS - readPath(spark, graphPath + "/publication", Publication.class) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)) + final Dataset projects = readPath(spark, graphPath + "/project", Project.class); + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) + .cache(); + relatedEntities(projects, rels, RelatedProject.class) .write() .mode(SaveMode.Overwrite) .json(eventsPath); @@ -129,7 +132,7 @@ public class GenerateEventsApplication { (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class)) - .filter(ResultGroup::isValid) + .filter(rg -> rg.getData().size() > 1) .map( (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.bean(EventGroup.class)) @@ -141,15 +144,15 @@ public class GenerateEventsApplication { final String graphPath, final Class sourceClass) { - // final Dataset projects = readPath(spark, graphPath + "/project", Project.class); + final Dataset projects = readPath(spark, graphPath + "/project", Project.class); // final Dataset datasets = readPath( // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); // final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); // final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); - // final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) - // .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) - // .cache(); + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) + .cache(); final Dataset r0 = readPath( spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) @@ -157,8 +160,7 @@ public class GenerateEventsApplication { .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)); // TODO UNCOMMENT THIS - // final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, - // RelatedProject.class)); + final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); // final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, // RelatedSoftware.class)); // final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index d8f9dffbe..730d06519 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -7,7 +7,6 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.DocumentHelper; @@ -59,10 +58,6 @@ public class ConversionUtils { return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null; } - public static final Pair oafSubjectToPair(final StructuredProperty sp) { - return sp != null ? Pair.of(classId(sp.getQualifier()), sp.getValue()) : null; - } - public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { if (d == null) { return null; @@ -123,55 +118,6 @@ public class ConversionUtils { return res; } - private static List structPropTypedList(final List list) { - if (list == null) { - return new ArrayList<>(); - } - - return list - .stream() - .map(ConversionUtils::oafStructPropToBrokerTypedValue) - .collect(Collectors.toList()); - } - - private static List mappedList(final List list, final Function func) { - if (list == null) { - return new ArrayList<>(); - } - - return list - .stream() - .map(func::apply) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - } - - private static List flatMappedList(final List list, final Function> func) { - if (list == null) { - return new ArrayList<>(); - } - - return list - .stream() - .map(func::apply) - .flatMap(List::stream) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - } - - private static T mappedFirst(final List list, final Function func) { - if (list == null) { - return null; - } - - return list - .stream() - .map(func::apply) - .filter(Objects::nonNull) - .findFirst() - .orElse(null); - } - private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { if (author == null) { return null; @@ -300,4 +246,55 @@ public class ConversionUtils { .collect(Collectors.toList()) : new ArrayList<>(); } + + private static List structPropTypedList(final List list) { + if (list == null) { + return new ArrayList<>(); + } + + return list + .stream() + .map(ConversionUtils::oafStructPropToBrokerTypedValue) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static List mappedList(final List list, final Function func) { + if (list == null) { + return new ArrayList<>(); + } + + return list + .stream() + .map(func::apply) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static List flatMappedList(final List list, final Function> func) { + if (list == null) { + return new ArrayList<>(); + } + + return list + .stream() + .map(func::apply) + .flatMap(List::stream) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static T mappedFirst(final List list, final Function func) { + if (list == null) { + return null; + } + + return list + .stream() + .map(func::apply) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java index 747482198..a46fde445 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java @@ -23,12 +23,14 @@ public class ResultAggregator extends Aggregator t) { - return group.addElement(t._1); + group.getData().add(t._1); + return group; } @Override public ResultGroup merge(final ResultGroup g1, final ResultGroup g2) { - return g1.addGroup(g2); + g1.getData().addAll(g2.getData()); + return g1; } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java index 4308224a5..3f9dbe8af 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java @@ -14,23 +14,14 @@ public class ResultGroup implements Serializable { */ private static final long serialVersionUID = -3360828477088669296L; - private final List data = new ArrayList<>(); + private List data = new ArrayList<>(); public List getData() { return data; } - public ResultGroup addElement(final OpenaireBrokerResult elem) { - data.add(elem); - return this; + public void setData(final List data) { + this.data = data; } - public ResultGroup addGroup(final ResultGroup group) { - data.addAll(group.getData()); - return this; - } - - public boolean isValid() { - return data.size() > 1; - } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java index fcf1b89b1..6a5fb258c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java @@ -11,9 +11,12 @@ public class RelatedDataset implements Serializable { * */ private static final long serialVersionUID = 774487705184038324L; - private final String source; - private final String relType; - private final Dataset relDataset; + private String source; + private String relType; + private Dataset relDataset; + + public RelatedDataset() { + } public RelatedDataset(final String source, final String relType, final Dataset relDataset) { this.source = source; @@ -25,12 +28,24 @@ public class RelatedDataset implements Serializable { return source; } + public void setSource(final String source) { + this.source = source; + } + public String getRelType() { return relType; } + public void setRelType(final String relType) { + this.relType = relType; + } + public Dataset getRelDataset() { return relDataset; } + public void setRelDataset(final Dataset relDataset) { + this.relDataset = relDataset; + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java index 233041c09..fafec1e19 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java @@ -12,9 +12,12 @@ public class RelatedProject implements Serializable { */ private static final long serialVersionUID = 4941437626549329870L; - private final String source; - private final String relType; - private final Project relProject; + private String source; + private String relType; + private Project relProject; + + public RelatedProject() { + } public RelatedProject(final String source, final String relType, final Project relProject) { this.source = source; @@ -26,12 +29,24 @@ public class RelatedProject implements Serializable { return source; } + public void setSource(final String source) { + this.source = source; + } + public String getRelType() { return relType; } + public void setRelType(final String relType) { + this.relType = relType; + } + public Project getRelProject() { return relProject; } + public void setRelProject(final Project relProject) { + this.relProject = relProject; + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java index 80b92462d..8a31ddf7e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java @@ -12,9 +12,12 @@ public class RelatedPublication implements Serializable { */ private static final long serialVersionUID = 9021609640411395128L; - private final String source; - private final String relType; - private final Publication relPublication; + private String source; + private String relType; + private Publication relPublication; + + public RelatedPublication() { + } public RelatedPublication(final String source, final String relType, final Publication relPublication) { this.source = source; @@ -26,12 +29,24 @@ public class RelatedPublication implements Serializable { return source; } + public void setSource(final String source) { + this.source = source; + } + public String getRelType() { return relType; } + public void setRelType(final String relType) { + this.relType = relType; + } + public Publication getRelPublication() { return relPublication; } + public void setRelPublication(final Publication relPublication) { + this.relPublication = relPublication; + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java index 13f1f4290..319387469 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java @@ -11,9 +11,12 @@ public class RelatedSoftware implements Serializable { * */ private static final long serialVersionUID = 7573383356943300157L; - private final String source; - private final String relType; - private final Software relSoftware; + private String source; + private String relType; + private Software relSoftware; + + public RelatedSoftware() { + } public RelatedSoftware(final String source, final String relType, final Software relSoftware) { this.source = source; @@ -25,12 +28,24 @@ public class RelatedSoftware implements Serializable { return source; } + public void setSource(final String source) { + this.source = source; + } + public String getRelType() { return relType; } + public void setRelType(final String relType) { + this.relType = relType; + } + public Software getRelSoftware() { return relSoftware; } + public void setRelSoftware(final Software relSoftware) { + this.relSoftware = relSoftware; + } + }