From 009dcf6aea063ff3ebd16967b08791a6b5e1c812 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 5 Feb 2024 16:43:40 +0200 Subject: [PATCH 1/2] [actiosets] introduced support for the PromoteAction strategy --- .../actionmanager/promote/PromoteAction.java | 39 +++++++++++++++++++ .../PromoteActionPayloadForGraphTableJob.java | 29 ++++++++++---- .../PromoteActionPayloadFunctions.java | 4 +- ...load_for_graph_table_input_parameters.json | 6 +++ .../wf/dataset/oozie_app/workflow.xml | 2 + .../wf/datasource/oozie_app/workflow.xml | 1 + .../wf/organization/oozie_app/workflow.xml | 1 + .../oozie_app/workflow.xml | 2 + .../wf/project/oozie_app/workflow.xml | 1 + .../wf/publication/oozie_app/workflow.xml | 2 + .../wf/relation/oozie_app/workflow.xml | 1 + .../wf/software/oozie_app/workflow.xml | 2 + .../PromoteActionPayloadFunctionsTest.java | 4 +- 13 files changed, 84 insertions(+), 10 deletions(-) create mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteAction.java diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteAction.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteAction.java new file mode 100644 index 000000000..163a8708e --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteAction.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024. + * SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package eu.dnetlib.dhp.actionmanager.promote; + +/** Encodes the Actionset promotion strategies */ +public class PromoteAction { + + /** The supported actionset promotion strategies + * + * ENRICH: promotes only records in the actionset matching another record in the + * graph and enriches them applying the given MergeAndGet strategy + * UPSERT: promotes all the records in an actionset, matching records are updated + * using the given MergeAndGet strategy, the non-matching record as inserted as they are. + */ + public enum Strategy { + ENRICH, UPSERT + } + + /** + * Returns the string representation of the join type implementing the given PromoteAction. + * + * @param strategy the strategy to be used to promote the Actionset contents + * @return the join type used to implement the promotion strategy + */ + public static String joinTypeForStrategy(PromoteAction.Strategy strategy) { + switch (strategy) { + case ENRICH: + return "join"; + case UPSERT: + return "full_outer"; + default: + throw new IllegalStateException("unsupported PromoteAction: " + strategy.toString()); + } + } +} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java index 7b024bea8..56cbda4d6 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java @@ -67,8 +67,9 @@ public class PromoteActionPayloadForGraphTableJob { String outputGraphTablePath = parser.get("outputGraphTablePath"); logger.info("outputGraphTablePath: {}", outputGraphTablePath); - MergeAndGet.Strategy strategy = MergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase()); - logger.info("strategy: {}", strategy); + MergeAndGet.Strategy mergeAndGetStrategy = MergeAndGet.Strategy + .valueOf(parser.get("mergeAndGetStrategy").toUpperCase()); + logger.info("mergeAndGetStrategy: {}", mergeAndGetStrategy); Boolean shouldGroupById = Optional .ofNullable(parser.get("shouldGroupById")) @@ -76,6 +77,12 @@ public class PromoteActionPayloadForGraphTableJob { .orElse(true); logger.info("shouldGroupById: {}", shouldGroupById); + PromoteAction.Strategy promoteActionStrategy = Optional + .ofNullable(parser.get("promoteActionStrategy")) + .map(PromoteAction.Strategy::valueOf) + .orElse(PromoteAction.Strategy.UPSERT); + logger.info("promoteActionStrategy: {}", promoteActionStrategy); + @SuppressWarnings("unchecked") Class rowClazz = (Class) Class.forName(graphTableClassName); @SuppressWarnings("unchecked") @@ -97,7 +104,8 @@ public class PromoteActionPayloadForGraphTableJob { inputGraphTablePath, inputActionPayloadPath, outputGraphTablePath, - strategy, + mergeAndGetStrategy, + promoteActionStrategy, rowClazz, actionPayloadClazz, shouldGroupById); @@ -124,14 +132,16 @@ public class PromoteActionPayloadForGraphTableJob { String inputGraphTablePath, String inputActionPayloadPath, String outputGraphTablePath, - MergeAndGet.Strategy strategy, + MergeAndGet.Strategy mergeAndGetStrategy, + PromoteAction.Strategy promoteActionStrategy, Class rowClazz, Class actionPayloadClazz, Boolean shouldGroupById) { Dataset rowDS = readGraphTable(spark, inputGraphTablePath, rowClazz); Dataset actionPayloadDS = readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz); Dataset result = promoteActionPayloadForGraphTable( - rowDS, actionPayloadDS, strategy, rowClazz, actionPayloadClazz, shouldGroupById) + rowDS, actionPayloadDS, mergeAndGetStrategy, promoteActionStrategy, rowClazz, actionPayloadClazz, + shouldGroupById) .map((MapFunction) value -> value, Encoders.bean(rowClazz)); saveGraphTable(result, outputGraphTablePath); @@ -183,7 +193,8 @@ public class PromoteActionPayloadForGraphTableJob { private static Dataset promoteActionPayloadForGraphTable( Dataset rowDS, Dataset actionPayloadDS, - MergeAndGet.Strategy strategy, + MergeAndGet.Strategy mergeAndGetStrategy, + PromoteAction.Strategy promoteActionStrategy, Class rowClazz, Class actionPayloadClazz, Boolean shouldGroupById) { @@ -195,8 +206,9 @@ public class PromoteActionPayloadForGraphTableJob { SerializableSupplier> rowIdFn = ModelSupport::idFn; SerializableSupplier> actionPayloadIdFn = ModelSupport::idFn; - SerializableSupplier> mergeRowWithActionPayloadAndGetFn = MergeAndGet.functionFor(strategy); - SerializableSupplier> mergeRowsAndGetFn = MergeAndGet.functionFor(strategy); + SerializableSupplier> mergeRowWithActionPayloadAndGetFn = MergeAndGet + .functionFor(mergeAndGetStrategy); + SerializableSupplier> mergeRowsAndGetFn = MergeAndGet.functionFor(mergeAndGetStrategy); SerializableSupplier zeroFn = zeroFn(rowClazz); SerializableSupplier> isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSourceAndTarget; @@ -207,6 +219,7 @@ public class PromoteActionPayloadForGraphTableJob { rowIdFn, actionPayloadIdFn, mergeRowWithActionPayloadAndGetFn, + promoteActionStrategy, rowClazz, actionPayloadClazz); diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java index d799c646b..f0b094240 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java @@ -34,6 +34,7 @@ public class PromoteActionPayloadFunctions { * @param rowIdFn Function used to get the id of graph table row * @param actionPayloadIdFn Function used to get id of action payload instance * @param mergeAndGetFn Function used to merge graph table row and action payload instance + * @param promoteActionStrategy the Actionset promotion strategy * @param rowClazz Class of graph table * @param actionPayloadClazz Class of action payload * @param Type of graph table row @@ -46,6 +47,7 @@ public class PromoteActionPayloadFunctions { SerializableSupplier> rowIdFn, SerializableSupplier> actionPayloadIdFn, SerializableSupplier> mergeAndGetFn, + PromoteAction.Strategy promoteActionStrategy, Class rowClazz, Class actionPayloadClazz) { if (!isSubClass(rowClazz, actionPayloadClazz)) { @@ -61,7 +63,7 @@ public class PromoteActionPayloadFunctions { .joinWith( actionPayloadWithIdDS, rowWithIdDS.col("_1").equalTo(actionPayloadWithIdDS.col("_1")), - "full_outer") + PromoteAction.joinTypeForStrategy(promoteActionStrategy)) .map( (MapFunction, Tuple2>, G>) value -> { Optional rowOpt = Optional.ofNullable(value._1()).map(Tuple2::_2); diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json index 00c9404ef..81a7c77d7 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json @@ -41,6 +41,12 @@ "paramDescription": "strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET", "paramRequired": true }, + { + "paramName": "pas", + "paramLongName": "promoteActionStrategy", + "paramDescription": "strategy for promoting the actionset contents into the graph tables, ENRICH or UPSERT (default)", + "paramRequired": false + }, { "paramName": "sgid", "paramLongName": "shouldGroupById", diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml index 4f374a75a..5401b45ca 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml @@ -115,6 +115,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Dataset --outputGraphTablePath${workingDir}/dataset --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} --shouldGroupById${shouldGroupById} @@ -167,6 +168,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result --outputGraphTablePath${outputGraphRootPath}/dataset --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} --shouldGroupById${shouldGroupById} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml index c85ba4ac1..f9bd66ae3 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml @@ -106,6 +106,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Datasource --outputGraphTablePath${outputGraphRootPath}/datasource --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml index 412cad70b..ebfdeee31 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml @@ -106,6 +106,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Organization --outputGraphTablePath${outputGraphRootPath}/organization --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml index 7bac760e2..02399ed9b 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml @@ -114,6 +114,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputGraphTablePath${workingDir}/otherresearchproduct --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} --shouldGroupById${shouldGroupById} @@ -166,6 +167,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result --outputGraphTablePath${outputGraphRootPath}/otherresearchproduct --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} --shouldGroupById${shouldGroupById} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml index daf48e9d7..57c2357b4 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml @@ -106,6 +106,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Project --outputGraphTablePath${outputGraphRootPath}/project --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml index b76dc82f1..92b114776 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml @@ -115,6 +115,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Publication --outputGraphTablePath${workingDir}/publication --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} --shouldGroupById${shouldGroupById} @@ -167,6 +168,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result --outputGraphTablePath${outputGraphRootPath}/publication --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} --shouldGroupById${shouldGroupById} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml index d3086dbdc..e9e5f0b45 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml @@ -107,6 +107,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Relation --outputGraphTablePath${outputGraphRootPath}/relation --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml index b5673b18f..1d36ddf94 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml @@ -114,6 +114,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Software --outputGraphTablePath${workingDir}/software --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} --shouldGroupById${shouldGroupById} @@ -166,6 +167,7 @@ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result --outputGraphTablePath${outputGraphRootPath}/software --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} --shouldGroupById${shouldGroupById} diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctionsTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctionsTest.java index cbc1bfaba..777e2fa1c 100644 --- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctionsTest.java +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctionsTest.java @@ -54,7 +54,7 @@ public class PromoteActionPayloadFunctionsTest { RuntimeException.class, () -> PromoteActionPayloadFunctions .joinGraphTableWithActionPayloadAndMerge( - null, null, null, null, null, OafImplSubSub.class, OafImpl.class)); + null, null, null, null, null, null, OafImplSubSub.class, OafImpl.class)); } @Test @@ -104,6 +104,7 @@ public class PromoteActionPayloadFunctionsTest { rowIdFn, actionPayloadIdFn, mergeAndGetFn, + PromoteAction.Strategy.UPSERT, OafImplSubSub.class, OafImplSubSub.class) .collectAsList(); @@ -183,6 +184,7 @@ public class PromoteActionPayloadFunctionsTest { rowIdFn, actionPayloadIdFn, mergeAndGetFn, + PromoteAction.Strategy.UPSERT, OafImplSubSub.class, OafImplSub.class) .collectAsList(); From fd17c1f17c4470be0e45dfdf3c13255087ce8e59 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 5 Feb 2024 16:55:36 +0200 Subject: [PATCH 2/2] [actiosets] fixed join type --- .../eu/dnetlib/dhp/actionmanager/promote/PromoteAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteAction.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteAction.java index 163a8708e..8fb9c8c95 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteAction.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteAction.java @@ -29,7 +29,7 @@ public class PromoteAction { public static String joinTypeForStrategy(PromoteAction.Strategy strategy) { switch (strategy) { case ENRICH: - return "join"; + return "left_outer"; case UPSERT: return "full_outer"; default: