actions promotion can optionally avoid grouping objects by id (configured via shouldGroupById parameter)

This commit is contained in:
Claudio Atzori 2020-12-07 13:45:18 +01:00
parent 943b961cf6
commit 21ddcf3a73
8 changed files with 66 additions and 15 deletions

View File

@ -68,6 +68,12 @@ public class PromoteActionPayloadForGraphTableJob {
MergeAndGet.Strategy strategy = MergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase()); MergeAndGet.Strategy strategy = MergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase());
logger.info("strategy: {}", strategy); logger.info("strategy: {}", strategy);
Boolean shouldGroupById = Optional
.ofNullable(parser.get("shouldGroupById"))
.map(Boolean::valueOf)
.orElse(true);
logger.info("shouldGroupById: {}", shouldGroupById);
Class<? extends Oaf> rowClazz = (Class<? extends Oaf>) Class.forName(graphTableClassName); Class<? extends Oaf> rowClazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
Class<? extends Oaf> actionPayloadClazz = (Class<? extends Oaf>) Class.forName(actionPayloadClassName); Class<? extends Oaf> actionPayloadClazz = (Class<? extends Oaf>) Class.forName(actionPayloadClassName);
@ -89,7 +95,8 @@ public class PromoteActionPayloadForGraphTableJob {
outputGraphTablePath, outputGraphTablePath,
strategy, strategy,
rowClazz, rowClazz,
actionPayloadClazz); actionPayloadClazz,
shouldGroupById);
}); });
} }
@ -109,18 +116,18 @@ public class PromoteActionPayloadForGraphTableJob {
} }
private static <G extends Oaf, A extends Oaf> void promoteActionPayloadForGraphTable( private static <G extends Oaf, A extends Oaf> void promoteActionPayloadForGraphTable(
SparkSession spark, SparkSession spark,
String inputGraphTablePath, String inputGraphTablePath,
String inputActionPayloadPath, String inputActionPayloadPath,
String outputGraphTablePath, String outputGraphTablePath,
MergeAndGet.Strategy strategy, MergeAndGet.Strategy strategy,
Class<G> rowClazz, Class<G> rowClazz,
Class<A> actionPayloadClazz) { Class<A> actionPayloadClazz, Boolean shouldGroupById) {
Dataset<G> rowDS = readGraphTable(spark, inputGraphTablePath, rowClazz); Dataset<G> rowDS = readGraphTable(spark, inputGraphTablePath, rowClazz);
Dataset<A> actionPayloadDS = readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz); Dataset<A> actionPayloadDS = readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz);
Dataset<G> result = promoteActionPayloadForGraphTable( Dataset<G> result = promoteActionPayloadForGraphTable(
rowDS, actionPayloadDS, strategy, rowClazz, actionPayloadClazz) rowDS, actionPayloadDS, strategy, rowClazz, actionPayloadClazz, shouldGroupById)
.map((MapFunction<G, G>) value -> value, Encoders.bean(rowClazz)); .map((MapFunction<G, G>) value -> value, Encoders.bean(rowClazz));
saveGraphTable(result, outputGraphTablePath); saveGraphTable(result, outputGraphTablePath);
@ -174,7 +181,8 @@ public class PromoteActionPayloadForGraphTableJob {
Dataset<A> actionPayloadDS, Dataset<A> actionPayloadDS,
MergeAndGet.Strategy strategy, MergeAndGet.Strategy strategy,
Class<G> rowClazz, Class<G> rowClazz,
Class<A> actionPayloadClazz) { Class<A> actionPayloadClazz,
Boolean shouldGroupById) {
logger logger
.info( .info(
"Promoting action payload for graph table: payload={}, table={}", "Promoting action payload for graph table: payload={}, table={}",
@ -198,9 +206,13 @@ public class PromoteActionPayloadForGraphTableJob {
rowClazz, rowClazz,
actionPayloadClazz); actionPayloadClazz);
return PromoteActionPayloadFunctions if (shouldGroupById) {
.groupGraphTableByIdAndMerge( return PromoteActionPayloadFunctions
joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz); .groupGraphTableByIdAndMerge(
joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz);
} else {
return joinedAndMerged;
}
} }
private static <T extends Oaf> SerializableSupplier<T> zeroFn(Class<T> clazz) { private static <T extends Oaf> SerializableSupplier<T> zeroFn(Class<T> clazz) {

View File

@ -40,5 +40,11 @@
"paramLongName": "mergeAndGetStrategy", "paramLongName": "mergeAndGetStrategy",
"paramDescription": "strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET", "paramDescription": "strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET",
"paramRequired": true "paramRequired": true
},
{
"paramName": "sgid",
"paramLongName": "shouldGroupById",
"paramDescription": "indicates whether the promotion operation should group objects in the graph by id or not",
"paramRequired": true
} }
] ]

View File

@ -24,6 +24,10 @@
<name>mergeAndGetStrategy</name> <name>mergeAndGetStrategy</name>
<description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description> <description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description>
</property> </property>
<property>
<name>shouldGroupById</name>
<description>indicates whether the promotion operation should group objects in the graph by id or not</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -111,6 +115,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputGraphTablePath</arg><arg>${workingDir}/dataset</arg> <arg>--outputGraphTablePath</arg><arg>${workingDir}/dataset</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="DecisionPromoteResultActionPayloadForDatasetTable"/> <ok to="DecisionPromoteResultActionPayloadForDatasetTable"/>
<error to="Kill"/> <error to="Kill"/>
@ -162,6 +167,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/dataset</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/dataset</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -56,6 +56,11 @@
<name>mergeAndGetStrategy</name> <name>mergeAndGetStrategy</name>
<description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description> <description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description>
</property> </property>
<property>
<name>shouldGroupById</name>
<value>false</value>
<description>indicates whether the promotion operation should group objects in the graph by id or not</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>

View File

@ -24,6 +24,10 @@
<name>mergeAndGetStrategy</name> <name>mergeAndGetStrategy</name>
<description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description> <description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description>
</property> </property>
<property>
<name>shouldGroupById</name>
<description>indicates whether the promotion operation should group objects in the graph by id or not</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -110,6 +114,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputGraphTablePath</arg><arg>${workingDir}/otherresearchproduct</arg> <arg>--outputGraphTablePath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="DecisionPromoteResultActionPayloadForOtherResearchProductTable"/> <ok to="DecisionPromoteResultActionPayloadForOtherResearchProductTable"/>
<error to="Kill"/> <error to="Kill"/>
@ -161,6 +166,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/otherresearchproduct</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/otherresearchproduct</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -24,6 +24,10 @@
<name>mergeAndGetStrategy</name> <name>mergeAndGetStrategy</name>
<description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description> <description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description>
</property> </property>
<property>
<name>shouldGroupById</name>
<description>indicates whether the promotion operation should group objects in the graph by id or not</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -111,6 +115,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputGraphTablePath</arg><arg>${workingDir}/publication</arg> <arg>--outputGraphTablePath</arg><arg>${workingDir}/publication</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="DecisionPromoteResultActionPayloadForPublicationTable"/> <ok to="DecisionPromoteResultActionPayloadForPublicationTable"/>
<error to="Kill"/> <error to="Kill"/>
@ -162,6 +167,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/publication</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/publication</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -24,6 +24,10 @@
<name>mergeAndGetStrategy</name> <name>mergeAndGetStrategy</name>
<description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description> <description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description>
</property> </property>
<property>
<name>shouldGroupById</name>
<description>indicates whether the promotion operation should group objects in the graph by id or not</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -110,6 +114,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputGraphTablePath</arg><arg>${workingDir}/software</arg> <arg>--outputGraphTablePath</arg><arg>${workingDir}/software</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="DecisionPromoteResultActionPayloadForSoftwareTable"/> <ok to="DecisionPromoteResultActionPayloadForSoftwareTable"/>
<error to="Kill"/> <error to="Kill"/>
@ -161,6 +166,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/software</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/software</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -101,7 +101,9 @@ public class PromoteActionPayloadForGraphTableJobTest {
"-outputGraphTablePath", "-outputGraphTablePath",
"", "",
"-mergeAndGetStrategy", "-mergeAndGetStrategy",
MergeAndGet.Strategy.SELECT_NEWER_AND_GET.name() MergeAndGet.Strategy.SELECT_NEWER_AND_GET.name(),
"--shouldGroupById",
"true"
})); }));
// then // then
@ -141,7 +143,9 @@ public class PromoteActionPayloadForGraphTableJobTest {
"-outputGraphTablePath", "-outputGraphTablePath",
outputGraphTableDir.toString(), outputGraphTableDir.toString(),
"-mergeAndGetStrategy", "-mergeAndGetStrategy",
strategy.name() strategy.name(),
"--shouldGroupById",
"true"
}); });
// then // then