forked from D-Net/dnet-hadoop
[Actionmanager] zero function considers empty entity.id as well as rel.source/rel.target
This commit is contained in:
parent
1e423fdc07
commit
751125fdf9
|
@ -5,12 +5,12 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
|
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
|
@ -194,7 +194,7 @@ public class PromoteActionPayloadForGraphTableJob {
|
||||||
SerializableSupplier<BiFunction<G, A, G>> mergeRowWithActionPayloadAndGetFn = MergeAndGet.functionFor(strategy);
|
SerializableSupplier<BiFunction<G, A, G>> mergeRowWithActionPayloadAndGetFn = MergeAndGet.functionFor(strategy);
|
||||||
SerializableSupplier<BiFunction<G, G, G>> mergeRowsAndGetFn = MergeAndGet.functionFor(strategy);
|
SerializableSupplier<BiFunction<G, G, G>> mergeRowsAndGetFn = MergeAndGet.functionFor(strategy);
|
||||||
SerializableSupplier<G> zeroFn = zeroFn(rowClazz);
|
SerializableSupplier<G> zeroFn = zeroFn(rowClazz);
|
||||||
SerializableSupplier<Function<G, Boolean>> isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSource;
|
SerializableSupplier<Function<G, Boolean>> isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSourceAndTarget;
|
||||||
|
|
||||||
Dataset<G> joinedAndMerged = PromoteActionPayloadFunctions
|
Dataset<G> joinedAndMerged = PromoteActionPayloadFunctions
|
||||||
.joinGraphTableWithActionPayloadAndMerge(
|
.joinGraphTableWithActionPayloadAndMerge(
|
||||||
|
@ -238,12 +238,13 @@ public class PromoteActionPayloadForGraphTableJob {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends Oaf> Function<T, Boolean> isNotZeroFnUsingIdOrSource() {
|
private static <T extends Oaf> Function<T, Boolean> isNotZeroFnUsingIdOrSourceAndTarget() {
|
||||||
return t -> {
|
return t -> {
|
||||||
if (isSubClass(t, Relation.class)) {
|
if (isSubClass(t, Relation.class)) {
|
||||||
return Objects.nonNull(((Relation) t).getSource());
|
final Relation rel = (Relation) t;
|
||||||
|
return StringUtils.isNotBlank(rel.getSource()) && StringUtils.isNotBlank(rel.getTarget());
|
||||||
}
|
}
|
||||||
return Objects.nonNull(((OafEntity) t).getId());
|
return StringUtils.isNotBlank(((OafEntity) t).getId());
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -111,7 +111,6 @@ public class PromoteActionPayloadFunctions {
|
||||||
SerializableSupplier<Function<G, Boolean>> isNotZeroFn,
|
SerializableSupplier<Function<G, Boolean>> isNotZeroFn,
|
||||||
Class<G> rowClazz) {
|
Class<G> rowClazz) {
|
||||||
TypedColumn<G, G> aggregator = new TableAggregator<>(zeroFn, mergeAndGetFn, isNotZeroFn, rowClazz).toColumn();
|
TypedColumn<G, G> aggregator = new TableAggregator<>(zeroFn, mergeAndGetFn, isNotZeroFn, rowClazz).toColumn();
|
||||||
|
|
||||||
return rowDS
|
return rowDS
|
||||||
.filter((FilterFunction<G>) o -> isNotZeroFn.get().apply(o))
|
.filter((FilterFunction<G>) o -> isNotZeroFn.get().apply(o))
|
||||||
.groupByKey((MapFunction<G, String>) x -> rowIdFn.get().apply(x), Encoders.STRING())
|
.groupByKey((MapFunction<G, String>) x -> rowIdFn.get().apply(x), Encoders.STRING())
|
||||||
|
|
|
@ -11,7 +11,6 @@ import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
@ -33,6 +32,7 @@ import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
|
Loading…
Reference in New Issue