WIP: promote job functions implementation snapshot

This commit is contained in:
Przemysław Jacewicz 2020-03-11 17:02:57 +01:00
parent cc63cdc9e6
commit 2e996d610f
1 changed files with 115 additions and 26 deletions

View File

@ -1,45 +1,134 @@
package eu.dnetlib.dhp.actionmanager; package eu.dnetlib.dhp.actionmanager;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2; import scala.Tuple2;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function;
public class PromoteActionSetFromHDFSFunctions { public class PromoteActionSetFromHDFSFunctions {
public static <T extends OafEntity> Dataset<T> groupEntitiesByIdAndMerge(Dataset<T> entityDS, public static <T extends Oaf> Dataset<T> joinOafEntityWithActionPayloadAndMerge(Dataset<T> oafDS,
Dataset<String> actionPayloadDS,
SerializableSupplier<Function<T, String>> oafIdFn,
SerializableSupplier<BiFunction<String, Class<T>, T>> actionPayloadToOafFn,
SerializableSupplier<BiFunction<T, T, T>> mergeAndGetFn,
Class<T> clazz) { Class<T> clazz) {
return entityDS Dataset<Tuple2<String, T>> oafWithIdDS = oafDS
.groupByKey((MapFunction<T, String>) OafEntity::getId, Encoders.STRING()) .map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(oafIdFn.get().apply(value), value),
.reduceGroups((ReduceFunction<T>) (x1, x2) -> { Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
x1.mergeFrom(x2);
return x1; Dataset<Tuple2<String, T>> actionPayloadWithIdDS = actionPayloadDS
}) .map((MapFunction<String, T>) value -> actionPayloadToOafFn.get().apply(value, clazz), Encoders.kryo(clazz))
.map((MapFunction<Tuple2<String, T>, T>) pair -> pair._2, Encoders.bean(clazz)); .filter((FilterFunction<T>) Objects::nonNull)
.map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(oafIdFn.get().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
return oafWithIdDS
.joinWith(actionPayloadWithIdDS, oafWithIdDS.col("_1").equalTo(actionPayloadWithIdDS.col("_1")), "left_outer")
.map((MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> {
T left = value._1()._2();
return Optional
.ofNullable(value._2())
.map(Tuple2::_2)
.map(x -> mergeAndGetFn.get().apply(left, x))
.orElse(left);
}, Encoders.kryo(clazz));
} }
public static <T extends OafEntity, S> Dataset<T> joinEntitiesWithActionPayloadAndMerge(Dataset<T> entityDS, public static <T extends Oaf> Dataset<T> groupOafByIdAndMerge(Dataset<T> oafDS,
Dataset<S> actionPayloadDS, SerializableSupplier<Function<T, String>> oafIdFn,
BiFunction<Dataset<T>, Dataset<S>, Column> entityToActionPayloadJoinExpr, SerializableSupplier<BiFunction<T, T, T>> mergeAndGetFn,
BiFunction<S, Class<T>, T> actionPayloadToEntityFn,
Class<T> clazz) { Class<T> clazz) {
return entityDS return oafDS
.joinWith(actionPayloadDS, entityToActionPayloadJoinExpr.apply(entityDS, actionPayloadDS), "left_outer") .groupByKey((MapFunction<T, String>) x -> oafIdFn.get().apply(x), Encoders.STRING())
.map((MapFunction<Tuple2<T, S>, T>) pair -> Optional .reduceGroups((ReduceFunction<T>) (v1, v2) -> mergeAndGetFn.get().apply(v1, v2))
.ofNullable(pair._2()) .map((MapFunction<Tuple2<String, T>, T>) Tuple2::_2, Encoders.kryo(clazz));
.map(x -> {
T entity = actionPayloadToEntityFn.apply(x, clazz);
pair._1().mergeFrom(entity);
return pair._1();
})
.orElse(pair._1()), Encoders.bean(clazz));
} }
public static <T extends Oaf> Dataset<T> groupOafByIdAndMergeUsingAggregator(Dataset<T> oafDS,
SerializableSupplier<T> zeroFn,
SerializableSupplier<Function<T, String>> idFn,
Class<T> clazz) {
TypedColumn<T, T> aggregator = new OafAggregator<>(zeroFn, clazz).toColumn();
return oafDS
.groupByKey((MapFunction<T, String>) x -> idFn.get().apply(x), Encoders.STRING())
.agg(aggregator)
.map((MapFunction<Tuple2<String, T>, T>) Tuple2::_2, Encoders.kryo(clazz));
}
public static class OafAggregator<T extends Oaf> extends Aggregator<T, T, T> {
private SerializableSupplier<T> zero;
private Class<T> clazz;
public OafAggregator(SerializableSupplier<T> zero, Class<T> clazz) {
this.zero = zero;
this.clazz = clazz;
}
@Override
public T zero() {
return zero.get();
}
@Override
public T reduce(T b, T a) {
return mergeFrom(b, a);
}
@Override
public T merge(T b1, T b2) {
return mergeFrom(b1, b2);
}
private T mergeFrom(T left, T right) {
if (isNonNull(left)) {
if (left instanceof Relation) {
((Relation) left).mergeFrom((Relation) right);
return left;
}
((OafEntity) left).mergeFrom((OafEntity) right);
return left;
}
if (right instanceof Relation) {
((Relation) right).mergeFrom((Relation) left);
return right;
}
((OafEntity) right).mergeFrom((OafEntity) left);
return right;
}
private Boolean isNonNull(T a) {
return Objects.nonNull(a.getLastupdatetimestamp());
}
@Override
public T finish(T reduction) {
return reduction;
}
@Override
public Encoder<T> bufferEncoder() {
return Encoders.kryo(clazz);
}
@Override
public Encoder<T> outputEncoder() {
return Encoders.kryo(clazz);
}
}
} }