Merge pull request 'Support for the PromoteAction strategy' (#389) from promote_actions_join_type into beta

Reviewed-on: D-Net/dnet-hadoop#389
This commit is contained in:
Claudio Atzori 2024-02-08 15:08:05 +01:00
commit e6bdee86d1
13 changed files with 84 additions and 10 deletions

View File

@ -0,0 +1,39 @@
/*
* Copyright (c) 2024.
* SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package eu.dnetlib.dhp.actionmanager.promote;
/** Encodes the Actionset promotion strategies */
public class PromoteAction {
/** The supported actionset promotion strategies
*
* ENRICH: promotes only records in the actionset matching another record in the
* graph and enriches them applying the given MergeAndGet strategy
* UPSERT: promotes all the records in an actionset, matching records are updated
* using the given MergeAndGet strategy, the non-matching record as inserted as they are.
*/
public enum Strategy {
ENRICH, UPSERT
}
/**
* Returns the string representation of the join type implementing the given PromoteAction.
*
* @param strategy the strategy to be used to promote the Actionset contents
* @return the join type used to implement the promotion strategy
*/
public static String joinTypeForStrategy(PromoteAction.Strategy strategy) {
switch (strategy) {
case ENRICH:
return "left_outer";
case UPSERT:
return "full_outer";
default:
throw new IllegalStateException("unsupported PromoteAction: " + strategy.toString());
}
}
}

View File

@ -67,8 +67,9 @@ public class PromoteActionPayloadForGraphTableJob {
String outputGraphTablePath = parser.get("outputGraphTablePath"); String outputGraphTablePath = parser.get("outputGraphTablePath");
logger.info("outputGraphTablePath: {}", outputGraphTablePath); logger.info("outputGraphTablePath: {}", outputGraphTablePath);
MergeAndGet.Strategy strategy = MergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase()); MergeAndGet.Strategy mergeAndGetStrategy = MergeAndGet.Strategy
logger.info("strategy: {}", strategy); .valueOf(parser.get("mergeAndGetStrategy").toUpperCase());
logger.info("mergeAndGetStrategy: {}", mergeAndGetStrategy);
Boolean shouldGroupById = Optional Boolean shouldGroupById = Optional
.ofNullable(parser.get("shouldGroupById")) .ofNullable(parser.get("shouldGroupById"))
@ -76,6 +77,12 @@ public class PromoteActionPayloadForGraphTableJob {
.orElse(true); .orElse(true);
logger.info("shouldGroupById: {}", shouldGroupById); logger.info("shouldGroupById: {}", shouldGroupById);
PromoteAction.Strategy promoteActionStrategy = Optional
.ofNullable(parser.get("promoteActionStrategy"))
.map(PromoteAction.Strategy::valueOf)
.orElse(PromoteAction.Strategy.UPSERT);
logger.info("promoteActionStrategy: {}", promoteActionStrategy);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Class<? extends Oaf> rowClazz = (Class<? extends Oaf>) Class.forName(graphTableClassName); Class<? extends Oaf> rowClazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -97,7 +104,8 @@ public class PromoteActionPayloadForGraphTableJob {
inputGraphTablePath, inputGraphTablePath,
inputActionPayloadPath, inputActionPayloadPath,
outputGraphTablePath, outputGraphTablePath,
strategy, mergeAndGetStrategy,
promoteActionStrategy,
rowClazz, rowClazz,
actionPayloadClazz, actionPayloadClazz,
shouldGroupById); shouldGroupById);
@ -124,14 +132,16 @@ public class PromoteActionPayloadForGraphTableJob {
String inputGraphTablePath, String inputGraphTablePath,
String inputActionPayloadPath, String inputActionPayloadPath,
String outputGraphTablePath, String outputGraphTablePath,
MergeAndGet.Strategy strategy, MergeAndGet.Strategy mergeAndGetStrategy,
PromoteAction.Strategy promoteActionStrategy,
Class<G> rowClazz, Class<G> rowClazz,
Class<A> actionPayloadClazz, Boolean shouldGroupById) { Class<A> actionPayloadClazz, Boolean shouldGroupById) {
Dataset<G> rowDS = readGraphTable(spark, inputGraphTablePath, rowClazz); Dataset<G> rowDS = readGraphTable(spark, inputGraphTablePath, rowClazz);
Dataset<A> actionPayloadDS = readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz); Dataset<A> actionPayloadDS = readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz);
Dataset<G> result = promoteActionPayloadForGraphTable( Dataset<G> result = promoteActionPayloadForGraphTable(
rowDS, actionPayloadDS, strategy, rowClazz, actionPayloadClazz, shouldGroupById) rowDS, actionPayloadDS, mergeAndGetStrategy, promoteActionStrategy, rowClazz, actionPayloadClazz,
shouldGroupById)
.map((MapFunction<G, G>) value -> value, Encoders.bean(rowClazz)); .map((MapFunction<G, G>) value -> value, Encoders.bean(rowClazz));
saveGraphTable(result, outputGraphTablePath); saveGraphTable(result, outputGraphTablePath);
@ -183,7 +193,8 @@ public class PromoteActionPayloadForGraphTableJob {
private static <G extends Oaf, A extends Oaf> Dataset<G> promoteActionPayloadForGraphTable( private static <G extends Oaf, A extends Oaf> Dataset<G> promoteActionPayloadForGraphTable(
Dataset<G> rowDS, Dataset<G> rowDS,
Dataset<A> actionPayloadDS, Dataset<A> actionPayloadDS,
MergeAndGet.Strategy strategy, MergeAndGet.Strategy mergeAndGetStrategy,
PromoteAction.Strategy promoteActionStrategy,
Class<G> rowClazz, Class<G> rowClazz,
Class<A> actionPayloadClazz, Class<A> actionPayloadClazz,
Boolean shouldGroupById) { Boolean shouldGroupById) {
@ -195,8 +206,9 @@ public class PromoteActionPayloadForGraphTableJob {
SerializableSupplier<Function<G, String>> rowIdFn = ModelSupport::idFn; SerializableSupplier<Function<G, String>> rowIdFn = ModelSupport::idFn;
SerializableSupplier<Function<A, String>> actionPayloadIdFn = ModelSupport::idFn; SerializableSupplier<Function<A, String>> actionPayloadIdFn = ModelSupport::idFn;
SerializableSupplier<BiFunction<G, A, G>> mergeRowWithActionPayloadAndGetFn = MergeAndGet.functionFor(strategy); SerializableSupplier<BiFunction<G, A, G>> mergeRowWithActionPayloadAndGetFn = MergeAndGet
SerializableSupplier<BiFunction<G, G, G>> mergeRowsAndGetFn = MergeAndGet.functionFor(strategy); .functionFor(mergeAndGetStrategy);
SerializableSupplier<BiFunction<G, G, G>> mergeRowsAndGetFn = MergeAndGet.functionFor(mergeAndGetStrategy);
SerializableSupplier<G> zeroFn = zeroFn(rowClazz); SerializableSupplier<G> zeroFn = zeroFn(rowClazz);
SerializableSupplier<Function<G, Boolean>> isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSourceAndTarget; SerializableSupplier<Function<G, Boolean>> isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSourceAndTarget;
@ -207,6 +219,7 @@ public class PromoteActionPayloadForGraphTableJob {
rowIdFn, rowIdFn,
actionPayloadIdFn, actionPayloadIdFn,
mergeRowWithActionPayloadAndGetFn, mergeRowWithActionPayloadAndGetFn,
promoteActionStrategy,
rowClazz, rowClazz,
actionPayloadClazz); actionPayloadClazz);

View File

@ -34,6 +34,7 @@ public class PromoteActionPayloadFunctions {
* @param rowIdFn Function used to get the id of graph table row * @param rowIdFn Function used to get the id of graph table row
* @param actionPayloadIdFn Function used to get id of action payload instance * @param actionPayloadIdFn Function used to get id of action payload instance
* @param mergeAndGetFn Function used to merge graph table row and action payload instance * @param mergeAndGetFn Function used to merge graph table row and action payload instance
* @param promoteActionStrategy the Actionset promotion strategy
* @param rowClazz Class of graph table * @param rowClazz Class of graph table
* @param actionPayloadClazz Class of action payload * @param actionPayloadClazz Class of action payload
* @param <G> Type of graph table row * @param <G> Type of graph table row
@ -46,6 +47,7 @@ public class PromoteActionPayloadFunctions {
SerializableSupplier<Function<G, String>> rowIdFn, SerializableSupplier<Function<G, String>> rowIdFn,
SerializableSupplier<Function<A, String>> actionPayloadIdFn, SerializableSupplier<Function<A, String>> actionPayloadIdFn,
SerializableSupplier<BiFunction<G, A, G>> mergeAndGetFn, SerializableSupplier<BiFunction<G, A, G>> mergeAndGetFn,
PromoteAction.Strategy promoteActionStrategy,
Class<G> rowClazz, Class<G> rowClazz,
Class<A> actionPayloadClazz) { Class<A> actionPayloadClazz) {
if (!isSubClass(rowClazz, actionPayloadClazz)) { if (!isSubClass(rowClazz, actionPayloadClazz)) {
@ -61,7 +63,7 @@ public class PromoteActionPayloadFunctions {
.joinWith( .joinWith(
actionPayloadWithIdDS, actionPayloadWithIdDS,
rowWithIdDS.col("_1").equalTo(actionPayloadWithIdDS.col("_1")), rowWithIdDS.col("_1").equalTo(actionPayloadWithIdDS.col("_1")),
"full_outer") PromoteAction.joinTypeForStrategy(promoteActionStrategy))
.map( .map(
(MapFunction<Tuple2<Tuple2<String, G>, Tuple2<String, A>>, G>) value -> { (MapFunction<Tuple2<Tuple2<String, G>, Tuple2<String, A>>, G>) value -> {
Optional<G> rowOpt = Optional.ofNullable(value._1()).map(Tuple2::_2); Optional<G> rowOpt = Optional.ofNullable(value._1()).map(Tuple2::_2);

View File

@ -41,6 +41,12 @@
"paramDescription": "strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET", "paramDescription": "strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "pas",
"paramLongName": "promoteActionStrategy",
"paramDescription": "strategy for promoting the actionset contents into the graph tables, ENRICH or UPSERT (default)",
"paramRequired": false
},
{ {
"paramName": "sgid", "paramName": "sgid",
"paramLongName": "shouldGroupById", "paramLongName": "shouldGroupById",

View File

@ -115,6 +115,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputGraphTablePath</arg><arg>${workingDir}/dataset</arg> <arg>--outputGraphTablePath</arg><arg>${workingDir}/dataset</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg> <arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="DecisionPromoteResultActionPayloadForDatasetTable"/> <ok to="DecisionPromoteResultActionPayloadForDatasetTable"/>
@ -167,6 +168,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/dataset</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/dataset</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg> <arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>

View File

@ -106,6 +106,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/datasource</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/datasource</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -106,6 +106,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/organization</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/organization</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -114,6 +114,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputGraphTablePath</arg><arg>${workingDir}/otherresearchproduct</arg> <arg>--outputGraphTablePath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg> <arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="DecisionPromoteResultActionPayloadForOtherResearchProductTable"/> <ok to="DecisionPromoteResultActionPayloadForOtherResearchProductTable"/>
@ -166,6 +167,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/otherresearchproduct</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/otherresearchproduct</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg> <arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>

View File

@ -106,6 +106,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/project</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/project</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -115,6 +115,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputGraphTablePath</arg><arg>${workingDir}/publication</arg> <arg>--outputGraphTablePath</arg><arg>${workingDir}/publication</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg> <arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="DecisionPromoteResultActionPayloadForPublicationTable"/> <ok to="DecisionPromoteResultActionPayloadForPublicationTable"/>
@ -167,6 +168,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/publication</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/publication</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg> <arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>

View File

@ -107,6 +107,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/relation</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/relation</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -114,6 +114,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputGraphTablePath</arg><arg>${workingDir}/software</arg> <arg>--outputGraphTablePath</arg><arg>${workingDir}/software</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg> <arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="DecisionPromoteResultActionPayloadForSoftwareTable"/> <ok to="DecisionPromoteResultActionPayloadForSoftwareTable"/>
@ -166,6 +167,7 @@
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg> <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/software</arg> <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/software</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg> <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg> <arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>

View File

@ -54,7 +54,7 @@ public class PromoteActionPayloadFunctionsTest {
RuntimeException.class, RuntimeException.class,
() -> PromoteActionPayloadFunctions () -> PromoteActionPayloadFunctions
.joinGraphTableWithActionPayloadAndMerge( .joinGraphTableWithActionPayloadAndMerge(
null, null, null, null, null, OafImplSubSub.class, OafImpl.class)); null, null, null, null, null, null, OafImplSubSub.class, OafImpl.class));
} }
@Test @Test
@ -104,6 +104,7 @@ public class PromoteActionPayloadFunctionsTest {
rowIdFn, rowIdFn,
actionPayloadIdFn, actionPayloadIdFn,
mergeAndGetFn, mergeAndGetFn,
PromoteAction.Strategy.UPSERT,
OafImplSubSub.class, OafImplSubSub.class,
OafImplSubSub.class) OafImplSubSub.class)
.collectAsList(); .collectAsList();
@ -183,6 +184,7 @@ public class PromoteActionPayloadFunctionsTest {
rowIdFn, rowIdFn,
actionPayloadIdFn, actionPayloadIdFn,
mergeAndGetFn, mergeAndGetFn,
PromoteAction.Strategy.UPSERT,
OafImplSubSub.class, OafImplSubSub.class,
OafImplSub.class) OafImplSub.class)
.collectAsList(); .collectAsList();