WIP promote job functions updated

This commit is contained in:
Przemysław Jacewicz 2020-03-13 12:36:42 +01:00 committed by przemek
parent 8d9b3c5de2
commit d0c9b0cdd6
2 changed files with 25 additions and 45 deletions

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.actionmanager;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
@ -21,20 +20,12 @@ import java.util.function.Function;
public class PromoteActionSetFromHDFSFunctions {
public static <T extends Oaf> Dataset<T> joinOafEntityWithActionPayloadAndMerge(Dataset<T> oafDS,
Dataset<String> actionPayloadDS,
Dataset<T> actionPayloadOafDS,
SerializableSupplier<Function<T, String>> oafIdFn,
SerializableSupplier<BiFunction<String, Class<T>, T>> actionPayloadToOafFn,
SerializableSupplier<BiFunction<T, T, T>> mergeAndGetFn,
Class<T> clazz) {
Dataset<Tuple2<String, T>> oafWithIdDS = oafDS
.map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(oafIdFn.get().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
Dataset<Tuple2<String, T>> actionPayloadWithIdDS = actionPayloadDS
.map((MapFunction<String, T>) value -> actionPayloadToOafFn.get().apply(value, clazz), Encoders.kryo(clazz))
.filter((FilterFunction<T>) Objects::nonNull)
.map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(oafIdFn.get().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
Dataset<Tuple2<String, T>> oafWithIdDS = mapToTupleWithId(oafDS, oafIdFn, clazz);
Dataset<Tuple2<String, T>> actionPayloadWithIdDS = mapToTupleWithId(actionPayloadOafDS, oafIdFn, clazz);
return oafWithIdDS
.joinWith(actionPayloadWithIdDS, oafWithIdDS.col("_1").equalTo(actionPayloadWithIdDS.col("_1")), "left_outer")
@ -48,6 +39,14 @@ public class PromoteActionSetFromHDFSFunctions {
}, Encoders.kryo(clazz));
}
private static <T extends Oaf> Dataset<Tuple2<String, T>> mapToTupleWithId(Dataset<T> ds,
SerializableSupplier<Function<T, String>> idFn,
Class<T> clazz) {
return ds
.map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(idFn.get().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
}
public static <T extends Oaf> Dataset<T> groupOafByIdAndMerge(Dataset<T> oafDS,
SerializableSupplier<Function<T, String>> oafIdFn,
SerializableSupplier<BiFunction<T, T, T>> mergeAndGetFn,

View File

@ -1,7 +1,5 @@
package eu.dnetlib.dhp.actionmanager;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
@ -11,7 +9,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
@ -50,38 +47,24 @@ public class PromoteActionSetFromHDFSFunctionsTest {
);
Dataset<OafImpl> oafDS = spark.createDataset(oafData, Encoders.bean(OafImpl.class));
List<String> actionPayloadData = Arrays.asList(
createActionPayload(id1),
createActionPayload(id2), createActionPayload(id2),
createActionPayload(id3), createActionPayload(id3), createActionPayload(id3)
List<OafImpl> actionPayloadData = Arrays.asList(
createOafImpl(id1),
createOafImpl(id2), createOafImpl(id2),
createOafImpl(id3), createOafImpl(id3), createOafImpl(id3)
);
Dataset<String> actionPayloadDS = spark.createDataset(actionPayloadData, Encoders.STRING());
Dataset<OafImpl> actionPayloadDS = spark.createDataset(actionPayloadData, Encoders.bean(OafImpl.class));
SerializableSupplier<Function<OafImpl, String>> oafIdFn = () -> OafImpl::getId;
SerializableSupplier<BiFunction<String, Class<OafImpl>, OafImpl>> actionPayloadToOafFn = () -> (s, clazz) -> {
try {
JsonNode jsonNode = new ObjectMapper().readTree(s);
String id = jsonNode.at("/id").asText();
return createOafImpl(id);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
SerializableSupplier<BiFunction<OafImpl, OafImpl, OafImpl>> mergeAndGetFn = () -> (x, y) -> {
x.mergeFrom(y);
return x;
};
SerializableSupplier<BiFunction<OafImpl, OafImpl, OafImpl>> mergeAndGetFn = () -> OafImpl::mergeAngGet;
// when
List<OafImpl> results = PromoteActionSetFromHDFSFunctions
.joinOafEntityWithActionPayloadAndMerge(oafDS,
actionPayloadDS,
oafIdFn,
actionPayloadToOafFn,
mergeAndGetFn,
OafImpl.class)
.collectAsList();
// System.out.println(results.stream().map(x -> String.format("%s:%d", x.getId(), x.merged)).collect(Collectors.joining(",")));
// then
assertEquals(7, results.size());
@ -95,6 +78,8 @@ public class PromoteActionSetFromHDFSFunctionsTest {
case "id4":
assertEquals(1, result.merged);
break;
default:
throw new RuntimeException();
}
});
}
@ -112,10 +97,7 @@ public class PromoteActionSetFromHDFSFunctionsTest {
);
Dataset<OafImpl> oafDS = spark.createDataset(oafData, Encoders.bean(OafImpl.class));
SerializableSupplier<Function<OafImpl, String>> idFn = () -> OafImpl::getId;
SerializableSupplier<BiFunction<OafImpl, OafImpl, OafImpl>> mergeAndGetFn = () -> (x, y) -> {
x.mergeFrom(y);
return x;
};
SerializableSupplier<BiFunction<OafImpl, OafImpl, OafImpl>> mergeAndGetFn = () -> OafImpl::mergeAngGet;
// when
List<OafImpl> results = PromoteActionSetFromHDFSFunctions
@ -124,7 +106,6 @@ public class PromoteActionSetFromHDFSFunctionsTest {
mergeAndGetFn,
OafImpl.class)
.collectAsList();
// System.out.println(results.stream().map(x -> String.format("%s:%d", x.getId(), x.merged)).collect(Collectors.joining(",")));
// then
assertEquals(3, results.size());
@ -139,6 +120,8 @@ public class PromoteActionSetFromHDFSFunctionsTest {
case "id3":
assertEquals(3, result.merged);
break;
default:
throw new RuntimeException();
}
});
}
@ -147,8 +130,9 @@ public class PromoteActionSetFromHDFSFunctionsTest {
private String id;
private int merged = 1;
public void mergeFrom(Oaf e) {
merged += ((OafImpl) e).merged;
public OafImpl mergeAngGet(OafImpl e) {
merged += e.merged;
return this;
}
public String getId() {
@ -174,7 +158,4 @@ public class PromoteActionSetFromHDFSFunctionsTest {
return x;
}
private static String createActionPayload(String id) {
return String.format("{\"id\":\"%s\"}", id);
}
}