|
|
|
@ -69,9 +69,9 @@ public class PromoteActionPayloadForGraphTableJob {
|
|
|
|
|
logger.info("strategy: {}", strategy);
|
|
|
|
|
|
|
|
|
|
Boolean shouldGroupById = Optional
|
|
|
|
|
.ofNullable(parser.get("shouldGroupById"))
|
|
|
|
|
.map(Boolean::valueOf)
|
|
|
|
|
.orElse(true);
|
|
|
|
|
.ofNullable(parser.get("shouldGroupById"))
|
|
|
|
|
.map(Boolean::valueOf)
|
|
|
|
|
.orElse(true);
|
|
|
|
|
logger.info("shouldGroupById: {}", shouldGroupById);
|
|
|
|
|
|
|
|
|
|
Class<? extends Oaf> rowClazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
|
|
|
|
@ -116,13 +116,13 @@ public class PromoteActionPayloadForGraphTableJob {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static <G extends Oaf, A extends Oaf> void promoteActionPayloadForGraphTable(
|
|
|
|
|
SparkSession spark,
|
|
|
|
|
String inputGraphTablePath,
|
|
|
|
|
String inputActionPayloadPath,
|
|
|
|
|
String outputGraphTablePath,
|
|
|
|
|
MergeAndGet.Strategy strategy,
|
|
|
|
|
Class<G> rowClazz,
|
|
|
|
|
Class<A> actionPayloadClazz, Boolean shouldGroupById) {
|
|
|
|
|
SparkSession spark,
|
|
|
|
|
String inputGraphTablePath,
|
|
|
|
|
String inputActionPayloadPath,
|
|
|
|
|
String outputGraphTablePath,
|
|
|
|
|
MergeAndGet.Strategy strategy,
|
|
|
|
|
Class<G> rowClazz,
|
|
|
|
|
Class<A> actionPayloadClazz, Boolean shouldGroupById) {
|
|
|
|
|
Dataset<G> rowDS = readGraphTable(spark, inputGraphTablePath, rowClazz);
|
|
|
|
|
Dataset<A> actionPayloadDS = readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz);
|
|
|
|
|
|
|
|
|
@ -208,8 +208,8 @@ public class PromoteActionPayloadForGraphTableJob {
|
|
|
|
|
|
|
|
|
|
if (shouldGroupById) {
|
|
|
|
|
return PromoteActionPayloadFunctions
|
|
|
|
|
.groupGraphTableByIdAndMerge(
|
|
|
|
|
joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz);
|
|
|
|
|
.groupGraphTableByIdAndMerge(
|
|
|
|
|
joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz);
|
|
|
|
|
} else {
|
|
|
|
|
return joinedAndMerged;
|
|
|
|
|
}
|
|
|
|
|