diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java index 5fa9e67235..be775f3587 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java @@ -68,6 +68,12 @@ public class PromoteActionPayloadForGraphTableJob { MergeAndGet.Strategy strategy = MergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase()); logger.info("strategy: {}", strategy); + Boolean shouldGroupById = Optional + .ofNullable(parser.get("shouldGroupById")) + .map(Boolean::valueOf) + .orElse(true); + logger.info("shouldGroupById: {}", shouldGroupById); + Class rowClazz = (Class) Class.forName(graphTableClassName); Class actionPayloadClazz = (Class) Class.forName(actionPayloadClassName); @@ -89,7 +95,8 @@ public class PromoteActionPayloadForGraphTableJob { outputGraphTablePath, strategy, rowClazz, - actionPayloadClazz); + actionPayloadClazz, + shouldGroupById); }); } @@ -109,18 +116,18 @@ public class PromoteActionPayloadForGraphTableJob { } private static void promoteActionPayloadForGraphTable( - SparkSession spark, - String inputGraphTablePath, - String inputActionPayloadPath, - String outputGraphTablePath, - MergeAndGet.Strategy strategy, - Class rowClazz, - Class actionPayloadClazz) { + SparkSession spark, + String inputGraphTablePath, + String inputActionPayloadPath, + String outputGraphTablePath, + MergeAndGet.Strategy strategy, + Class rowClazz, + Class actionPayloadClazz, Boolean shouldGroupById) { Dataset rowDS = readGraphTable(spark, inputGraphTablePath, rowClazz); Dataset actionPayloadDS = readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz); Dataset result = promoteActionPayloadForGraphTable( - rowDS, actionPayloadDS, strategy, rowClazz, actionPayloadClazz) + rowDS, actionPayloadDS, strategy, rowClazz, actionPayloadClazz, shouldGroupById) .map((MapFunction) value -> value, Encoders.bean(rowClazz)); saveGraphTable(result, outputGraphTablePath); @@ -174,7 +181,8 @@ public class PromoteActionPayloadForGraphTableJob { Dataset actionPayloadDS, MergeAndGet.Strategy strategy, Class rowClazz, - Class actionPayloadClazz) { + Class actionPayloadClazz, + Boolean shouldGroupById) { logger .info( "Promoting action payload for graph table: payload={}, table={}", @@ -198,9 +206,13 @@ public class PromoteActionPayloadForGraphTableJob { rowClazz, actionPayloadClazz); - return PromoteActionPayloadFunctions - .groupGraphTableByIdAndMerge( - joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz); + if (shouldGroupById) { + return PromoteActionPayloadFunctions + .groupGraphTableByIdAndMerge( + joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz); + } else { + return joinedAndMerged; + } } private static SerializableSupplier zeroFn(Class clazz) { diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json index e111f156e5..f5eae8a302 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json @@ -40,5 +40,11 @@ "paramLongName": "mergeAndGetStrategy", "paramDescription": "strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET", "paramRequired": true + }, + { + "paramName": "sgid", + "paramLongName": "shouldGroupById", + "paramDescription": "indicates whether the promotion operation should group objects in the graph by id or not", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml index f95349935e..4dc250c292 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml @@ -24,6 +24,10 @@ mergeAndGetStrategy strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET + + shouldGroupById + indicates whether the promotion operation should group objects in the graph by id or not + sparkDriverMemory memory for driver process @@ -111,6 +115,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Dataset --outputGraphTablePath${workingDir}/dataset --mergeAndGetStrategy${mergeAndGetStrategy} + --shouldGroupById${shouldGroupById} @@ -162,6 +167,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result --outputGraphTablePath${outputGraphRootPath}/dataset --mergeAndGetStrategy${mergeAndGetStrategy} + --shouldGroupById${shouldGroupById} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml index 25afc34c99..393f04e89c 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml @@ -56,6 +56,11 @@ mergeAndGetStrategy strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET + + shouldGroupById + false + indicates whether the promotion operation should group objects in the graph by id or not + sparkDriverMemory memory for driver process diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml index 0deb1b945c..7bac760e2c 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml @@ -24,6 +24,10 @@ mergeAndGetStrategy strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET + + shouldGroupById + indicates whether the promotion operation should group objects in the graph by id or not + sparkDriverMemory memory for driver process @@ -110,6 +114,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputGraphTablePath${workingDir}/otherresearchproduct --mergeAndGetStrategy${mergeAndGetStrategy} + --shouldGroupById${shouldGroupById} @@ -161,6 +166,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result --outputGraphTablePath${outputGraphRootPath}/otherresearchproduct --mergeAndGetStrategy${mergeAndGetStrategy} + --shouldGroupById${shouldGroupById} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml index 70400a123c..2450bdad74 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml @@ -24,6 +24,10 @@ mergeAndGetStrategy strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET + + shouldGroupById + indicates whether the promotion operation should group objects in the graph by id or not + sparkDriverMemory memory for driver process @@ -111,6 +115,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Publication --outputGraphTablePath${workingDir}/publication --mergeAndGetStrategy${mergeAndGetStrategy} + --shouldGroupById${shouldGroupById} @@ -162,6 +167,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result --outputGraphTablePath${outputGraphRootPath}/publication --mergeAndGetStrategy${mergeAndGetStrategy} + --shouldGroupById${shouldGroupById} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml index 396e277217..b5673b18f4 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml @@ -24,6 +24,10 @@ mergeAndGetStrategy strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET + + shouldGroupById + indicates whether the promotion operation should group objects in the graph by id or not + sparkDriverMemory memory for driver process @@ -110,6 +114,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Software --outputGraphTablePath${workingDir}/software --mergeAndGetStrategy${mergeAndGetStrategy} + --shouldGroupById${shouldGroupById} @@ -161,6 +166,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result --outputGraphTablePath${outputGraphRootPath}/software --mergeAndGetStrategy${mergeAndGetStrategy} + --shouldGroupById${shouldGroupById} diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java index 129daadcc8..79ab55e072 100644 --- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java @@ -101,7 +101,9 @@ public class PromoteActionPayloadForGraphTableJobTest { "-outputGraphTablePath", "", "-mergeAndGetStrategy", - MergeAndGet.Strategy.SELECT_NEWER_AND_GET.name() + MergeAndGet.Strategy.SELECT_NEWER_AND_GET.name(), + "--shouldGroupById", + "true" })); // then @@ -141,7 +143,9 @@ public class PromoteActionPayloadForGraphTableJobTest { "-outputGraphTablePath", outputGraphTableDir.toString(), "-mergeAndGetStrategy", - strategy.name() + strategy.name(), + "--shouldGroupById", + "true" }); // then