From 751125fdf95b9435f5ef86769a9d5b9623cfca14 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 23 Mar 2021 17:34:32 +0100 Subject: [PATCH] [Actionmanager] zero function considers empty entity.id as well as rel.source/rel.target --- .../promote/PromoteActionPayloadForGraphTableJob.java | 11 ++++++----- .../promote/PromoteActionPayloadFunctions.java | 1 - .../java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java index bab4377bd..0052026d4 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java @@ -5,12 +5,12 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass; import java.io.IOException; -import java.util.Objects; import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Function; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -194,7 +194,7 @@ public class PromoteActionPayloadForGraphTableJob { SerializableSupplier> mergeRowWithActionPayloadAndGetFn = MergeAndGet.functionFor(strategy); SerializableSupplier> mergeRowsAndGetFn = MergeAndGet.functionFor(strategy); SerializableSupplier zeroFn = zeroFn(rowClazz); - SerializableSupplier> isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSource; + SerializableSupplier> isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSourceAndTarget; Dataset joinedAndMerged = PromoteActionPayloadFunctions .joinGraphTableWithActionPayloadAndMerge( @@ -238,12 +238,13 @@ public class PromoteActionPayloadForGraphTableJob { } } - private static Function isNotZeroFnUsingIdOrSource() { + private static Function isNotZeroFnUsingIdOrSourceAndTarget() { return t -> { if (isSubClass(t, Relation.class)) { - return Objects.nonNull(((Relation) t).getSource()); + final Relation rel = (Relation) t; + return StringUtils.isNotBlank(rel.getSource()) && StringUtils.isNotBlank(rel.getTarget()); } - return Objects.nonNull(((OafEntity) t).getId()); + return StringUtils.isNotBlank(((OafEntity) t).getId()); }; } diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java index c0192cddb..d799c646b 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java @@ -111,7 +111,6 @@ public class PromoteActionPayloadFunctions { SerializableSupplier> isNotZeroFn, Class rowClazz) { TypedColumn aggregator = new TableAggregator<>(zeroFn, mergeAndGetFn, isNotZeroFn, rowClazz).toColumn(); - return rowDS .filter((FilterFunction) o -> isNotZeroFn.get().apply(o)) .groupByKey((MapFunction) x -> rowIdFn.get().apply(x), Encoders.STRING()) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index f2786dd9d..c86e31280 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -11,7 +11,6 @@ import java.io.IOException; import java.util.List; import java.util.Optional; -import eu.dnetlib.dhp.schema.oaf.utils.PidType; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; @@ -33,6 +32,7 @@ import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @ExtendWith(MockitoExtension.class)