Support for the PromoteAction strategy [master] #391
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright (c) 2024.
|
||||
* SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
package eu.dnetlib.dhp.actionmanager.promote;
|
||||
|
||||
/** Encodes the Actionset promotion strategies */
|
||||
public class PromoteAction {
|
||||
|
||||
/** The supported actionset promotion strategies
|
||||
*
|
||||
* ENRICH: promotes only records in the actionset matching another record in the
|
||||
* graph and enriches them applying the given MergeAndGet strategy
|
||||
* UPSERT: promotes all the records in an actionset, matching records are updated
|
||||
* using the given MergeAndGet strategy, the non-matching record as inserted as they are.
|
||||
*/
|
||||
public enum Strategy {
|
||||
ENRICH, UPSERT
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the string representation of the join type implementing the given PromoteAction.
|
||||
*
|
||||
* @param strategy the strategy to be used to promote the Actionset contents
|
||||
* @return the join type used to implement the promotion strategy
|
||||
*/
|
||||
public static String joinTypeForStrategy(PromoteAction.Strategy strategy) {
|
||||
switch (strategy) {
|
||||
case ENRICH:
|
||||
return "left_outer";
|
||||
case UPSERT:
|
||||
return "full_outer";
|
||||
default:
|
||||
throw new IllegalStateException("unsupported PromoteAction: " + strategy.toString());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -67,8 +67,9 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
String outputGraphTablePath = parser.get("outputGraphTablePath");
|
||||
logger.info("outputGraphTablePath: {}", outputGraphTablePath);
|
||||
|
||||
MergeAndGet.Strategy strategy = MergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase());
|
||||
logger.info("strategy: {}", strategy);
|
||||
MergeAndGet.Strategy mergeAndGetStrategy = MergeAndGet.Strategy
|
||||
.valueOf(parser.get("mergeAndGetStrategy").toUpperCase());
|
||||
logger.info("mergeAndGetStrategy: {}", mergeAndGetStrategy);
|
||||
|
||||
Boolean shouldGroupById = Optional
|
||||
.ofNullable(parser.get("shouldGroupById"))
|
||||
|
@ -76,6 +77,12 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
.orElse(true);
|
||||
logger.info("shouldGroupById: {}", shouldGroupById);
|
||||
|
||||
PromoteAction.Strategy promoteActionStrategy = Optional
|
||||
.ofNullable(parser.get("promoteActionStrategy"))
|
||||
.map(PromoteAction.Strategy::valueOf)
|
||||
.orElse(PromoteAction.Strategy.UPSERT);
|
||||
logger.info("promoteActionStrategy: {}", promoteActionStrategy);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Class<? extends Oaf> rowClazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -97,7 +104,8 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
inputGraphTablePath,
|
||||
inputActionPayloadPath,
|
||||
outputGraphTablePath,
|
||||
strategy,
|
||||
mergeAndGetStrategy,
|
||||
promoteActionStrategy,
|
||||
rowClazz,
|
||||
actionPayloadClazz,
|
||||
shouldGroupById);
|
||||
|
@ -124,14 +132,16 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
String inputGraphTablePath,
|
||||
String inputActionPayloadPath,
|
||||
String outputGraphTablePath,
|
||||
MergeAndGet.Strategy strategy,
|
||||
MergeAndGet.Strategy mergeAndGetStrategy,
|
||||
PromoteAction.Strategy promoteActionStrategy,
|
||||
Class<G> rowClazz,
|
||||
Class<A> actionPayloadClazz, Boolean shouldGroupById) {
|
||||
Dataset<G> rowDS = readGraphTable(spark, inputGraphTablePath, rowClazz);
|
||||
Dataset<A> actionPayloadDS = readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz);
|
||||
|
||||
Dataset<G> result = promoteActionPayloadForGraphTable(
|
||||
rowDS, actionPayloadDS, strategy, rowClazz, actionPayloadClazz, shouldGroupById)
|
||||
rowDS, actionPayloadDS, mergeAndGetStrategy, promoteActionStrategy, rowClazz, actionPayloadClazz,
|
||||
shouldGroupById)
|
||||
.map((MapFunction<G, G>) value -> value, Encoders.bean(rowClazz));
|
||||
|
||||
saveGraphTable(result, outputGraphTablePath);
|
||||
|
@ -183,7 +193,8 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
private static <G extends Oaf, A extends Oaf> Dataset<G> promoteActionPayloadForGraphTable(
|
||||
Dataset<G> rowDS,
|
||||
Dataset<A> actionPayloadDS,
|
||||
MergeAndGet.Strategy strategy,
|
||||
MergeAndGet.Strategy mergeAndGetStrategy,
|
||||
PromoteAction.Strategy promoteActionStrategy,
|
||||
Class<G> rowClazz,
|
||||
Class<A> actionPayloadClazz,
|
||||
Boolean shouldGroupById) {
|
||||
|
@ -195,8 +206,9 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
|
||||
SerializableSupplier<Function<G, String>> rowIdFn = ModelSupport::idFn;
|
||||
SerializableSupplier<Function<A, String>> actionPayloadIdFn = ModelSupport::idFn;
|
||||
SerializableSupplier<BiFunction<G, A, G>> mergeRowWithActionPayloadAndGetFn = MergeAndGet.functionFor(strategy);
|
||||
SerializableSupplier<BiFunction<G, G, G>> mergeRowsAndGetFn = MergeAndGet.functionFor(strategy);
|
||||
SerializableSupplier<BiFunction<G, A, G>> mergeRowWithActionPayloadAndGetFn = MergeAndGet
|
||||
.functionFor(mergeAndGetStrategy);
|
||||
SerializableSupplier<BiFunction<G, G, G>> mergeRowsAndGetFn = MergeAndGet.functionFor(mergeAndGetStrategy);
|
||||
SerializableSupplier<G> zeroFn = zeroFn(rowClazz);
|
||||
SerializableSupplier<Function<G, Boolean>> isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSourceAndTarget;
|
||||
|
||||
|
@ -207,6 +219,7 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
rowIdFn,
|
||||
actionPayloadIdFn,
|
||||
mergeRowWithActionPayloadAndGetFn,
|
||||
promoteActionStrategy,
|
||||
rowClazz,
|
||||
actionPayloadClazz);
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ public class PromoteActionPayloadFunctions {
|
|||
* @param rowIdFn Function used to get the id of graph table row
|
||||
* @param actionPayloadIdFn Function used to get id of action payload instance
|
||||
* @param mergeAndGetFn Function used to merge graph table row and action payload instance
|
||||
* @param promoteActionStrategy the Actionset promotion strategy
|
||||
* @param rowClazz Class of graph table
|
||||
* @param actionPayloadClazz Class of action payload
|
||||
* @param <G> Type of graph table row
|
||||
|
@ -46,6 +47,7 @@ public class PromoteActionPayloadFunctions {
|
|||
SerializableSupplier<Function<G, String>> rowIdFn,
|
||||
SerializableSupplier<Function<A, String>> actionPayloadIdFn,
|
||||
SerializableSupplier<BiFunction<G, A, G>> mergeAndGetFn,
|
||||
PromoteAction.Strategy promoteActionStrategy,
|
||||
Class<G> rowClazz,
|
||||
Class<A> actionPayloadClazz) {
|
||||
if (!isSubClass(rowClazz, actionPayloadClazz)) {
|
||||
|
@ -61,7 +63,7 @@ public class PromoteActionPayloadFunctions {
|
|||
.joinWith(
|
||||
actionPayloadWithIdDS,
|
||||
rowWithIdDS.col("_1").equalTo(actionPayloadWithIdDS.col("_1")),
|
||||
"full_outer")
|
||||
PromoteAction.joinTypeForStrategy(promoteActionStrategy))
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple2<String, G>, Tuple2<String, A>>, G>) value -> {
|
||||
Optional<G> rowOpt = Optional.ofNullable(value._1()).map(Tuple2::_2);
|
||||
|
|
|
@ -41,6 +41,12 @@
|
|||
"paramDescription": "strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pas",
|
||||
"paramLongName": "promoteActionStrategy",
|
||||
"paramDescription": "strategy for promoting the actionset contents into the graph tables, ENRICH or UPSERT (default)",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "sgid",
|
||||
"paramLongName": "shouldGroupById",
|
||||
|
|
|
@ -115,6 +115,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${workingDir}/dataset</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
|
||||
</spark>
|
||||
<ok to="DecisionPromoteResultActionPayloadForDatasetTable"/>
|
||||
|
@ -167,6 +168,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/dataset</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
|
|
|
@ -106,6 +106,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/datasource</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -106,6 +106,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/organization</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -114,6 +114,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${workingDir}/otherresearchproduct</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
|
||||
</spark>
|
||||
<ok to="DecisionPromoteResultActionPayloadForOtherResearchProductTable"/>
|
||||
|
@ -166,6 +167,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/otherresearchproduct</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
|
|
|
@ -106,6 +106,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/project</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -115,6 +115,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${workingDir}/publication</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
|
||||
</spark>
|
||||
<ok to="DecisionPromoteResultActionPayloadForPublicationTable"/>
|
||||
|
@ -167,6 +168,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/publication</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
|
|
|
@ -107,6 +107,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/relation</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -114,6 +114,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${workingDir}/software</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
|
||||
</spark>
|
||||
<ok to="DecisionPromoteResultActionPayloadForSoftwareTable"/>
|
||||
|
@ -166,6 +167,7 @@
|
|||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/software</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
<arg>--shouldGroupById</arg><arg>${shouldGroupById}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
|
|
|
@ -54,7 +54,7 @@ public class PromoteActionPayloadFunctionsTest {
|
|||
RuntimeException.class,
|
||||
() -> PromoteActionPayloadFunctions
|
||||
.joinGraphTableWithActionPayloadAndMerge(
|
||||
null, null, null, null, null, OafImplSubSub.class, OafImpl.class));
|
||||
null, null, null, null, null, null, OafImplSubSub.class, OafImpl.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -104,6 +104,7 @@ public class PromoteActionPayloadFunctionsTest {
|
|||
rowIdFn,
|
||||
actionPayloadIdFn,
|
||||
mergeAndGetFn,
|
||||
PromoteAction.Strategy.UPSERT,
|
||||
OafImplSubSub.class,
|
||||
OafImplSubSub.class)
|
||||
.collectAsList();
|
||||
|
@ -183,6 +184,7 @@ public class PromoteActionPayloadFunctionsTest {
|
|||
rowIdFn,
|
||||
actionPayloadIdFn,
|
||||
mergeAndGetFn,
|
||||
PromoteAction.Strategy.UPSERT,
|
||||
OafImplSubSub.class,
|
||||
OafImplSub.class)
|
||||
.collectAsList();
|
||||
|
|
Loading…
Reference in New Issue