+        JavaRDD<Row> rdd = sc
+ .sequenceFile(path, Text.class, Text.class)
+ .map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
+
+ return spark.createDataFrame(rdd, KV_SCHEMA)
+ .withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA))
+ .select(expr("atomic_action.*"));
+ }
+
+    private static void saveActions(Dataset<Row> actionDS,
+                                    String path) {
+ logger.info("Saving actions to path: {}", path);
+ actionDS
+ .write()
+ .partitionBy("clazz")
+ .mode(SaveMode.Append)
+ .parquet(path);
+ }
+
+ public ISClient getIsClient() {
+ return isClient;
+ }
+
+ public void setIsClient(ISClient isClient) {
+ this.isClient = isClient;
+ }
+}
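Editor's note: the snippet above turns a Hadoop SequenceFile of Text key/value pairs into a DataFrame using two schemas defined earlier in the class, outside this excerpt. Below is a minimal sketch of their likely shape, inferred from how "value", "clazz", and "payload" are used elsewhere in this patch; treat the exact field layout as an assumption.

```java
import static org.apache.spark.sql.types.DataTypes.*;

import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

class ActionSetSchemas {
    // assumed shape: the SequenceFile key/value pair read above
    static final StructType KV_SCHEMA = createStructType(new StructField[]{
            createStructField("key", StringType, false),
            createStructField("value", StringType, false)
    });

    // assumed shape: the atomic action JSON carried in the "value" column;
    // "clazz" drives partitionBy in saveActions, "payload" is consumed by the promote job
    static final StructType ATOMIC_ACTION_SCHEMA = createStructType(new StructField[]{
            createStructField("clazz", StringType, false),
            createStructField("payload", StringType, false)
    });
}
```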
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java
new file mode 100644
index 000000000..19b2104bc
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java
@@ -0,0 +1,74 @@
+package eu.dnetlib.dhp.actionmanager.promote;
+
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+import java.util.function.BiFunction;
+
+import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
+
+/**
+ * OAF model merging support.
+ */
+public class MergeAndGet {
+
+ private MergeAndGet() {
+ }
+
+    /**
+     * Strategy for merging OAF model objects.
+     *
+     * MERGE_FROM_AND_GET: use the OAF 'mergeFrom' method
+     * SELECT_NEWER_AND_GET: use the last update timestamp to return the newer instance
+     */
+ public enum Strategy {
+ MERGE_FROM_AND_GET, SELECT_NEWER_AND_GET
+ }
+
+    /**
+     * Returns a function for merging OAF model objects.
+     *
+     * @param strategy Strategy to be used to merge objects
+     * @param <G> Graph table type
+     * @param <A> Action payload type
+     * @return BiFunction to be used to merge OAF objects
+     */
+    public static <G extends Oaf, A extends Oaf> SerializableSupplier<BiFunction<G, A, G>> functionFor(Strategy strategy) {
+ switch (strategy) {
+ case MERGE_FROM_AND_GET:
+ return () -> MergeAndGet::mergeFromAndGet;
+ case SELECT_NEWER_AND_GET:
+ return () -> MergeAndGet::selectNewerAndGet;
+ }
+        throw new RuntimeException("unknown merge strategy: " + strategy);
+ }
+
+    private static <G extends Oaf, A extends Oaf> G mergeFromAndGet(G x, A y) {
+ if (isSubClass(x, Relation.class) && isSubClass(y, Relation.class)) {
+ ((Relation) x).mergeFrom((Relation) y);
+ return x;
+ } else if (isSubClass(x, OafEntity.class) && isSubClass(y, OafEntity.class) && isSubClass(x, y)) {
+ ((OafEntity) x).mergeFrom((OafEntity) y);
+ return x;
+ }
+ throw new RuntimeException(String.format("MERGE_FROM_AND_GET incompatible types: %s, %s",
+ x.getClass().getCanonicalName(), y.getClass().getCanonicalName()));
+ }
+
+    private static <G extends Oaf, A extends Oaf> G selectNewerAndGet(G x, A y) {
+ if (x.getClass().equals(y.getClass()) && x.getLastupdatetimestamp() > y.getLastupdatetimestamp()) {
+ return x;
+ } else if (x.getClass().equals(y.getClass()) && x.getLastupdatetimestamp() < y.getLastupdatetimestamp()) {
+ return (G) y;
+ } else if (isSubClass(x, y) && x.getLastupdatetimestamp() > y.getLastupdatetimestamp()) {
+ return x;
+ } else if (isSubClass(x, y) && x.getLastupdatetimestamp() < y.getLastupdatetimestamp()) {
+ throw new RuntimeException(String.format("SELECT_NEWER_AND_GET cannot return right type when it is not the same as left type: %s, %s",
+ x.getClass().getCanonicalName(), y.getClass().getCanonicalName()));
+ }
+ throw new RuntimeException(String.format("SELECT_NEWER_AND_GET cannot be used when left is not subtype of right: %s, %s",
+ x.getClass().getCanonicalName(), y.getClass().getCanonicalName()));
+ }
+}
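Editor's note: a small usage sketch contrasting the two strategies. The `Publication` instances and timestamps are illustrative only, and it assumes `Oaf` exposes `setLastupdatetimestamp(Long)` as elsewhere in the dhp schema module.

```java
import eu.dnetlib.dhp.actionmanager.promote.MergeAndGet;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.schema.oaf.Publication;

import java.util.function.BiFunction;

class MergeAndGetSketch {
    public static void main(String[] args) {
        Publication graphRow = new Publication();      // existing graph table row
        graphRow.setLastupdatetimestamp(1L);
        Publication actionPayload = new Publication(); // incoming action payload
        actionPayload.setLastupdatetimestamp(2L);

        // MERGE_FROM_AND_GET: folds the payload into the row via OafEntity.mergeFrom
        SerializableSupplier<BiFunction<Publication, Publication, Publication>> merge =
                MergeAndGet.functionFor(MergeAndGet.Strategy.MERGE_FROM_AND_GET);
        Publication merged = merge.get().apply(graphRow, actionPayload);

        // SELECT_NEWER_AND_GET: keeps the instance with the later lastupdatetimestamp,
        // so here the payload wins
        SerializableSupplier<BiFunction<Publication, Publication, Publication>> newer =
                MergeAndGet.functionFor(MergeAndGet.Strategy.SELECT_NEWER_AND_GET);
        Publication latest = newer.get().apply(graphRow, actionPayload);
    }
}
```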
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
new file mode 100644
index 000000000..20b75842c
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
@@ -0,0 +1,254 @@
+package eu.dnetlib.dhp.actionmanager.promote;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+/**
+ * Applies a given action payload file to a graph table of a compatible type.
+ */
+public class PromoteActionPayloadForGraphTableJob {
+ private static final Logger logger = LoggerFactory.getLogger(PromoteActionPayloadForGraphTableJob.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+ String jsonConfiguration = IOUtils.toString(
+ PromoteActionPayloadForGraphTableJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json"));
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String inputGraphTablePath = parser.get("inputGraphTablePath");
+ logger.info("inputGraphTablePath: {}", inputGraphTablePath);
+
+ String graphTableClassName = parser.get("graphTableClassName");
+ logger.info("graphTableClassName: {}", graphTableClassName);
+
+ String inputActionPayloadPath = parser.get("inputActionPayloadPath");
+ logger.info("inputActionPayloadPath: {}", inputActionPayloadPath);
+
+ String actionPayloadClassName = parser.get("actionPayloadClassName");
+ logger.info("actionPayloadClassName: {}", actionPayloadClassName);
+
+ String outputGraphTablePath = parser.get("outputGraphTablePath");
+ logger.info("outputGraphTablePath: {}", outputGraphTablePath);
+
+ MergeAndGet.Strategy strategy = MergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase());
+ logger.info("strategy: {}", strategy);
+
+        Class<? extends Oaf> rowClazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
+        Class<? extends Oaf> actionPayloadClazz = (Class<? extends Oaf>) Class.forName(actionPayloadClassName);
+
+ throwIfGraphTableClassIsNotSubClassOfActionPayloadClass(rowClazz, actionPayloadClazz);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+ runWithSparkSession(conf, isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputGraphTablePath);
+ promoteActionPayloadForGraphTable(spark,
+ inputGraphTablePath,
+ inputActionPayloadPath,
+ outputGraphTablePath,
+ strategy,
+ rowClazz,
+ actionPayloadClazz);
+ });
+ }
+
+    private static void throwIfGraphTableClassIsNotSubClassOfActionPayloadClass(Class<? extends Oaf> rowClazz,
+                                                                                Class<? extends Oaf> actionPayloadClazz) {
+ if (!isSubClass(rowClazz, actionPayloadClazz)) {
+ String msg = String.format("graph table class is not a subclass of action payload class: graph=%s, action=%s",
+ rowClazz.getCanonicalName(), actionPayloadClazz.getCanonicalName());
+ throw new RuntimeException(msg);
+ }
+ }
+
+ private static void removeOutputDir(SparkSession spark,
+ String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+    private static <G extends Oaf, A extends Oaf> void promoteActionPayloadForGraphTable(SparkSession spark,
+                                                                                         String inputGraphTablePath,
+                                                                                         String inputActionPayloadPath,
+                                                                                         String outputGraphTablePath,
+                                                                                         MergeAndGet.Strategy strategy,
+                                                                                         Class<G> rowClazz,
+                                                                                         Class<A> actionPayloadClazz) {
+        Dataset<G> rowDS = readGraphTable(spark, inputGraphTablePath, rowClazz);
+        Dataset<A> actionPayloadDS = readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz);
+
+        Dataset<G> result = promoteActionPayloadForGraphTable(rowDS, actionPayloadDS, strategy, rowClazz, actionPayloadClazz)
+                .map((MapFunction<G, G>) value -> value, Encoders.bean(rowClazz));
+
+ saveGraphTable(result, outputGraphTablePath);
+ }
+
+    private static <G extends Oaf> Dataset<G> readGraphTable(SparkSession spark,
+                                                             String path,
+                                                             Class<G> rowClazz) {
+        logger.info("Reading graph table from path: {}", path);
+
+        return spark.read()
+                .textFile(path)
+                .map((MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz));
+
+ /*
+ return spark
+ .read()
+ .parquet(path)
+ .as(Encoders.bean(rowClazz));
+ */
+ }
+
+    private static <A extends Oaf> Dataset<A> readActionPayload(SparkSession spark,
+                                                                String path,
+                                                                Class<A> actionPayloadClazz) {
+        logger.info("Reading action payload from path: {}", path);
+        return spark
+                .read()
+                .parquet(path)
+                .map((MapFunction<Row, A>) value -> OBJECT_MAPPER.readValue(value.<String>getAs("payload"),
+                        actionPayloadClazz), Encoders.bean(actionPayloadClazz));
+ }
+
+    private static <G extends Oaf, A extends Oaf> Dataset<G> promoteActionPayloadForGraphTable(Dataset<G> rowDS,
+                                                                                               Dataset<A> actionPayloadDS,
+                                                                                               MergeAndGet.Strategy strategy,
+                                                                                               Class<G> rowClazz,
+                                                                                               Class<A> actionPayloadClazz) {
+        logger.info("Promoting action payload for graph table: payload={}, table={}", actionPayloadClazz.getSimpleName(), rowClazz.getSimpleName());
+
+        SerializableSupplier<Function<G, String>> rowIdFn = PromoteActionPayloadForGraphTableJob::idFn;
+        SerializableSupplier<Function<A, String>> actionPayloadIdFn = PromoteActionPayloadForGraphTableJob::idFn;
+        SerializableSupplier<BiFunction<G, A, G>> mergeRowWithActionPayloadAndGetFn = MergeAndGet.functionFor(strategy);
+        SerializableSupplier<BiFunction<G, G, G>> mergeRowsAndGetFn = MergeAndGet.functionFor(strategy);
+        SerializableSupplier<G> zeroFn = zeroFn(rowClazz);
+        SerializableSupplier<Function<G, Boolean>> isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSource;
+
+        Dataset<G> joinedAndMerged = PromoteActionPayloadFunctions
+ .joinGraphTableWithActionPayloadAndMerge(
+ rowDS,
+ actionPayloadDS,
+ rowIdFn,
+ actionPayloadIdFn,
+ mergeRowWithActionPayloadAndGetFn,
+ rowClazz,
+ actionPayloadClazz);
+
+ return PromoteActionPayloadFunctions
+ .groupGraphTableByIdAndMerge(
+ joinedAndMerged,
+ rowIdFn,
+ mergeRowsAndGetFn,
+ zeroFn,
+ isNotZeroFn,
+ rowClazz);
+ }
+
+    private static <T extends Oaf> Function<T, String> idFn() {
+ return x -> {
+ if (isSubClass(x, Relation.class)) {
+ return idFnForRelation(x);
+ }
+ return idFnForOafEntity(x);
+ };
+ }
+
+    private static <T extends Oaf> String idFnForRelation(T t) {
+ Relation r = (Relation) t;
+ return Optional.ofNullable(r.getSource())
+ .map(source -> Optional.ofNullable(r.getTarget())
+ .map(target -> Optional.ofNullable(r.getRelType())
+ .map(relType -> Optional.ofNullable(r.getSubRelType())
+ .map(subRelType -> Optional.ofNullable(r.getRelClass())
+ .map(relClass -> String.join(source, target, relType, subRelType, relClass))
+ .orElse(String.join(source, target, relType, subRelType))
+ )
+ .orElse(String.join(source, target, relType))
+ )
+ .orElse(String.join(source, target))
+ )
+ .orElse(source)
+ )
+ .orElse(null);
+ }
+
+    private static <T extends Oaf> String idFnForOafEntity(T t) {
+ return ((OafEntity) t).getId();
+ }
+
+    private static <T extends Oaf> SerializableSupplier<T> zeroFn(Class<T> clazz) {
+ switch (clazz.getCanonicalName()) {
+ case "eu.dnetlib.dhp.schema.oaf.Dataset":
+ return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Dataset());
+ case "eu.dnetlib.dhp.schema.oaf.Datasource":
+ return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Datasource());
+ case "eu.dnetlib.dhp.schema.oaf.Organization":
+ return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Organization());
+ case "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct":
+ return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.OtherResearchProduct());
+ case "eu.dnetlib.dhp.schema.oaf.Project":
+ return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Project());
+ case "eu.dnetlib.dhp.schema.oaf.Publication":
+ return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Publication());
+ case "eu.dnetlib.dhp.schema.oaf.Relation":
+ return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Relation());
+ case "eu.dnetlib.dhp.schema.oaf.Software":
+ return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Software());
+ default:
+ throw new RuntimeException("unknown class: " + clazz.getCanonicalName());
+ }
+ }
+
+    private static <T extends Oaf> Function<T, Boolean> isNotZeroFnUsingIdOrSource() {
+ return t -> {
+ if (isSubClass(t, Relation.class)) {
+ return Objects.nonNull(((Relation) t).getSource());
+ }
+ return Objects.nonNull(((OafEntity) t).getId());
+ };
+ }
+
+    private static <G extends Oaf> void saveGraphTable(Dataset<G> result,
+                                                       String path) {
+ logger.info("Saving graph table to path: {}", path);
+ result
+ .toJSON()
+ .write()
+ .option("compression", "gzip")
+ .text(path);
+ }
+}
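Editor's note: `readActionPayload` above expects parquet rows with a string `payload` column carrying the OAF JSON, as produced by the partition job's `saveActions`. A minimal fixture for a local smoke test; the JSON literal and path are illustrative.

```java
import java.util.Collections;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

class ActionPayloadFixture {
    static void write(SparkSession spark, String path) {
        // single "payload" column of JSON strings, read back via value.getAs("payload")
        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("payload", DataTypes.StringType, false)
        });
        Row row = RowFactory.create("{\"id\":\"pub1\",\"lastupdatetimestamp\":1}");
        spark.createDataFrame(Collections.singletonList(row), schema)
                .write()
                .parquet(path);
    }
}
```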
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java
new file mode 100644
index 000000000..fda86cb19
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java
@@ -0,0 +1,170 @@
+package eu.dnetlib.dhp.actionmanager.promote;
+
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.expressions.Aggregator;
+import scala.Tuple2;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
+
+/**
+ * Promote action payload functions.
+ */
+public class PromoteActionPayloadFunctions {
+
+ private PromoteActionPayloadFunctions() {
+ }
+
+    /**
+     * Joins dataset representing graph table with dataset representing action payload using supplied functions.
+     *
+     * @param rowDS Dataset representing graph table
+     * @param actionPayloadDS Dataset representing action payload
+     * @param rowIdFn Function used to get the id of graph table row
+     * @param actionPayloadIdFn Function used to get id of action payload instance
+     * @param mergeAndGetFn Function used to merge graph table row and action payload instance
+     * @param rowClazz Class of graph table
+     * @param actionPayloadClazz Class of action payload
+     * @param <G> Type of graph table row
+     * @param <A> Type of action payload instance
+     * @return Dataset of merged graph table rows and action payload instances
+     */
+    public static <G extends Oaf, A extends Oaf> Dataset<G> joinGraphTableWithActionPayloadAndMerge(Dataset<G> rowDS,
+                                                                                                    Dataset<A> actionPayloadDS,
+                                                                                                    SerializableSupplier<Function<G, String>> rowIdFn,
+                                                                                                    SerializableSupplier<Function<A, String>> actionPayloadIdFn,
+                                                                                                    SerializableSupplier<BiFunction<G, A, G>> mergeAndGetFn,
+                                                                                                    Class<G> rowClazz,
+                                                                                                    Class<A> actionPayloadClazz) {
+ if (!isSubClass(rowClazz, actionPayloadClazz)) {
+ throw new RuntimeException("action payload type must be the same or be a super type of table row type");
+ }
+
+        Dataset<Tuple2<String, G>> rowWithIdDS = mapToTupleWithId(rowDS, rowIdFn, rowClazz);
+        Dataset<Tuple2<String, A>> actionPayloadWithIdDS = mapToTupleWithId(actionPayloadDS, actionPayloadIdFn, actionPayloadClazz);
+
+ return rowWithIdDS
+ .joinWith(actionPayloadWithIdDS, rowWithIdDS.col("_1").equalTo(actionPayloadWithIdDS.col("_1")), "full_outer")
+                .map((MapFunction<Tuple2<Tuple2<String, G>, Tuple2<String, A>>, G>) value -> {
+                    Optional<G> rowOpt = Optional.ofNullable(value._1()).map(Tuple2::_2);
+                    Optional<A> actionPayloadOpt = Optional.ofNullable(value._2()).map(Tuple2::_2);
+ return rowOpt
+ .map(row -> actionPayloadOpt
+ .map(actionPayload -> mergeAndGetFn.get().apply(row, actionPayload))
+ .orElse(row))
+ .orElseGet(() -> actionPayloadOpt
+ .filter(actionPayload -> actionPayload.getClass().equals(rowClazz))
+ .map(rowClazz::cast)
+ .orElse(null));
+ }, Encoders.kryo(rowClazz))
+                .filter((FilterFunction<G>) Objects::nonNull);
+ }
+
+    private static <T extends Oaf> Dataset<Tuple2<String, T>> mapToTupleWithId(Dataset<T> ds,
+                                                                               SerializableSupplier<Function<T, String>> idFn,
+                                                                               Class<T> clazz) {
+        return ds
+                .map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(idFn.get().apply(value), value),
+                        Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+ }
+
+    /**
+     * Groups graph table by id and aggregates using supplied functions.
+     *
+     * @param rowDS Dataset representing graph table
+     * @param rowIdFn Function used to get the id of graph table row
+     * @param mergeAndGetFn Function used to merge graph table rows
+     * @param zeroFn Function to create a zero/empty instance of graph table row
+     * @param isNotZeroFn Function to check if graph table row is not zero/empty
+     * @param rowClazz Class of graph table
+     * @param <G> Type of graph table row
+     * @return Dataset of aggregated graph table rows
+     */
+    public static <G extends Oaf> Dataset<G> groupGraphTableByIdAndMerge(Dataset<G> rowDS,
+                                                                         SerializableSupplier<Function<G, String>> rowIdFn,
+                                                                         SerializableSupplier<BiFunction<G, G, G>> mergeAndGetFn,
+                                                                         SerializableSupplier<G> zeroFn,
+                                                                         SerializableSupplier<Function<G, Boolean>> isNotZeroFn,
+                                                                         Class<G> rowClazz) {
+        TypedColumn<G, G> aggregator = new TableAggregator<>(zeroFn, mergeAndGetFn, isNotZeroFn, rowClazz).toColumn();
+        return rowDS
+                .groupByKey((MapFunction<G, String>) x -> rowIdFn.get().apply(x), Encoders.STRING())
+                .agg(aggregator)
+                .map((MapFunction<Tuple2<String, G>, G>) Tuple2::_2, Encoders.kryo(rowClazz));
+ }
+
+    /**
+     * Aggregator to be used for aggregating graph table rows during grouping.
+     *
+     * @param <G> Type of graph table row
+     */
+    public static class TableAggregator<G extends Oaf> extends Aggregator<G, G, G> {
+        private SerializableSupplier<G> zeroFn;
+        private SerializableSupplier<BiFunction<G, G, G>> mergeAndGetFn;
+        private SerializableSupplier<Function<G, Boolean>> isNotZeroFn;
+        private Class<G> rowClazz;
+
+        public TableAggregator(SerializableSupplier<G> zeroFn,
+                               SerializableSupplier<BiFunction<G, G, G>> mergeAndGetFn,
+                               SerializableSupplier<Function<G, Boolean>> isNotZeroFn,
+                               Class<G> rowClazz) {
+ this.zeroFn = zeroFn;
+ this.mergeAndGetFn = mergeAndGetFn;
+ this.isNotZeroFn = isNotZeroFn;
+ this.rowClazz = rowClazz;
+ }
+
+ @Override
+ public G zero() {
+ return zeroFn.get();
+ }
+
+ @Override
+ public G reduce(G b, G a) {
+ return zeroSafeMergeAndGet(b, a);
+ }
+
+ @Override
+ public G merge(G b1, G b2) {
+ return zeroSafeMergeAndGet(b1, b2);
+ }
+
+ private G zeroSafeMergeAndGet(G left, G right) {
+            Function<G, Boolean> isNotZero = isNotZeroFn.get();
+ if (isNotZero.apply(left) && isNotZero.apply(right)) {
+ return mergeAndGetFn.get().apply(left, right);
+ } else if (isNotZero.apply(left) && !isNotZero.apply(right)) {
+ return left;
+ } else if (!isNotZero.apply(left) && isNotZero.apply(right)) {
+ return right;
+ }
+ throw new RuntimeException("internal aggregation error: left and right objects are zero");
+ }
+
+ @Override
+ public G finish(G reduction) {
+ return reduction;
+ }
+
+ @Override
+        public Encoder<G> bufferEncoder() {
+ return Encoders.kryo(rowClazz);
+ }
+
+ @Override
+        public Encoder<G> outputEncoder() {
+ return Encoders.kryo(rowClazz);
+ }
+ }
+}
\ No newline at end of file
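Editor's note: a test-style sketch wiring the two functions above together on a toy `Publication` dataset, mirroring how `PromoteActionPayloadForGraphTableJob` supplies the id/merge/zero functions. Names, data, and the local Spark setup are illustrative only.

```java
import eu.dnetlib.dhp.actionmanager.promote.MergeAndGet;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadFunctions;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.schema.oaf.Publication;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

class PromoteFlowSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

        Publication row = new Publication();
        row.setId("pub1");                     // existing graph table row
        Publication payload = new Publication();
        payload.setId("pub1");                 // action payload for the same id

        Dataset<Publication> tableDS = spark.createDataset(Arrays.asList(row), Encoders.bean(Publication.class));
        Dataset<Publication> payloadDS = spark.createDataset(Arrays.asList(payload), Encoders.bean(Publication.class));

        SerializableSupplier<Function<Publication, String>> idFn = () -> Publication::getId;
        SerializableSupplier<BiFunction<Publication, Publication, Publication>> mergeFn =
                MergeAndGet.functionFor(MergeAndGet.Strategy.MERGE_FROM_AND_GET);

        // step 1: full outer join by id, merging each payload into its matching row
        Dataset<Publication> joined = PromoteActionPayloadFunctions
                .joinGraphTableWithActionPayloadAndMerge(tableDS, payloadDS,
                        idFn, idFn, mergeFn, Publication.class, Publication.class);

        // step 2: group by id and reduce with the zero-safe aggregator
        Dataset<Publication> promoted = PromoteActionPayloadFunctions
                .groupGraphTableByIdAndMerge(joined, idFn, mergeFn,
                        Publication::new, () -> p -> Objects.nonNull(p.getId()), Publication.class);

        System.out.println("promoted rows: " + promoted.count());
        spark.stop();
    }
}
```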
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/partition/partition_action_sets_by_payload_type_input_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/partition/partition_action_sets_by_payload_type_input_parameters.json
new file mode 100644
index 000000000..ad58fe754
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/partition/partition_action_sets_by_payload_type_input_parameters.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "iasi",
+ "paramLongName": "inputActionSetIds",
+ "paramDescription": "comma separated list of action set ids to partition by payload type",
+ "paramRequired": true
+ },
+ {
+ "paramName": "op",
+ "paramLongName": "outputPath",
+ "paramDescription": "root output location for partitioned action sets",
+ "paramRequired": true
+ },
+ {
+ "paramName": "is",
+ "paramLongName": "isLookupUrl",
+ "paramDescription": "URL of the isLookUp Service",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
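Editor's note: the main workflow later in this patch invokes the partition job with exactly these long-form flag/value pairs; collected here as a plain argument array for reference. The action set ids and the isLookUp URL are placeholders.

```java
String[] partitionJobArgs = new String[]{
        "--isSparkSessionManaged", "false",
        "--inputActionSetIds", "actionset-1,actionset-2",
        "--outputPath", "/working_dir/action_payload_by_type",
        "--isLookupUrl", "http://is-lookup-host:8280/is/services/isLookUp"  // placeholder
};
```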
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json
new file mode 100644
index 000000000..e111f156e
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json
@@ -0,0 +1,44 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "igtp",
+ "paramLongName": "inputGraphTablePath",
+ "paramDescription": "location of graph table to promote",
+ "paramRequired": true
+ },
+ {
+ "paramName": "gtcn",
+ "paramLongName": "graphTableClassName",
+ "paramDescription": "canonical name of graph table row class",
+ "paramRequired": true
+ },
+ {
+ "paramName": "iapp",
+ "paramLongName": "inputActionPayloadPath",
+ "paramDescription": "location of action payload to promote",
+ "paramRequired": true
+ },
+ {
+ "paramName": "apcn",
+ "paramLongName": "actionPayloadClassName",
+ "paramDescription": "canonical name of action payload class",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ogtp",
+ "paramLongName": "outputGraphTablePath",
+ "paramDescription": "location of promoted graph table",
+ "paramRequired": true
+ },
+ {
+ "paramName": "mags",
+ "paramLongName": "mergeAndGetStrategy",
+ "paramDescription": "strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
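Editor's note: the matching promote-job invocation, mirroring the argument pairs the per-table workflows pass to `PromoteActionPayloadForGraphTableJob`. Paths are placeholders, and running this sketch locally assumes a Spark master is configured for the session.

```java
class PromoteJobLauncherSketch {
    public static void main(String[] args) throws Exception {
        PromoteActionPayloadForGraphTableJob.main(new String[]{
                "--isSparkSessionManaged", "true",
                "--inputGraphTablePath", "/graph/dataset",
                "--graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Dataset",
                "--inputActionPayloadPath", "/actions/clazz=eu.dnetlib.dhp.schema.oaf.Dataset",
                "--actionPayloadClassName", "eu.dnetlib.dhp.schema.oaf.Dataset",
                "--outputGraphTablePath", "/graph_out/dataset",
                "--mergeAndGetStrategy", "MERGE_FROM_AND_GET"
        });
    }
}
```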
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml
new file mode 100644
index 000000000..f95349935
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml
@@ -0,0 +1,184 @@
+
+
+
+ activePromoteDatasetActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Dataset payload
+
+
+ activePromoteResultActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Result payload
+
+
+ inputGraphRootPath
+ root location of input materialized graph
+
+
+ inputActionPayloadRootPath
+ root location of action payloads to promote
+
+
+ outputGraphRootPath
+ root location for output materialized graph
+
+
+ mergeAndGetStrategy
+ strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ ${(activePromoteDatasetActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'dataset')) eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Dataset')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromoteDatasetActionPayloadForDatasetTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=2560
+
+ --inputGraphTablePath${inputGraphRootPath}/dataset
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Dataset
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Dataset
+ --outputGraphTablePath${workingDir}/dataset
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${inputGraphRootPath}/dataset
+ ${workingDir}/dataset
+
+
+
+
+
+
+
+
+ ${(activePromoteResultActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Result')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromoteResultActionPayloadForDatasetTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=2560
+
+ --inputGraphTablePath${workingDir}/dataset
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Result
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result
+ --outputGraphTablePath${outputGraphRootPath}/dataset
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${workingDir}/dataset
+ ${outputGraphRootPath}/dataset
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml
new file mode 100644
index 000000000..c85ba4ac1
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml
@@ -0,0 +1,128 @@
+
+
+
+ activePromoteDatasourceActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Datasource payload
+
+
+ inputGraphRootPath
+ root location of input materialized graph
+
+
+ inputActionPayloadRootPath
+ root location of action payloads to promote
+
+
+ outputGraphRootPath
+ root location for output materialized graph
+
+
+ mergeAndGetStrategy
+ strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ ${(activePromoteDatasourceActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'datasource')) eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Datasource')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromoteDatasourceActionPayloadForDatasourceTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --inputGraphTablePath${inputGraphRootPath}/datasource
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Datasource
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Datasource
+ --outputGraphTablePath${outputGraphRootPath}/datasource
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${inputGraphRootPath}/datasource
+ ${outputGraphRootPath}/datasource
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/import.txt b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/import.txt
new file mode 100644
index 000000000..dd8f5e14e
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/import.txt
@@ -0,0 +1,9 @@
+## This is a classpath-based import file (this header is required)
+promote_action_payload_for_dataset_table classpath eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app
+promote_action_payload_for_datasource_table classpath eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app
+promote_action_payload_for_organization_table classpath eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app
+promote_action_payload_for_otherresearchproduct_table classpath eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app
+promote_action_payload_for_project_table classpath eu/dnetlib/dhp/actionmanager/wf/project/oozie_app
+promote_action_payload_for_publication_table classpath eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app
+promote_action_payload_for_relation_table classpath eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app
+promote_action_payload_for_software_table classpath eu/dnetlib/dhp/actionmanager/wf/software/oozie_app
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml
new file mode 100644
index 000000000..25afc34c9
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml
@@ -0,0 +1,270 @@
+
+
+
+ activePromoteDatasetActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Dataset payload
+
+
+ activePromoteDatasourceActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Datasource payload
+
+
+ activePromoteOrganizationActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Organization payload
+
+
+ activePromoteOtherResearchProductActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.OtherResearchProduct payload
+
+
+ activePromoteProjectActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Project payload
+
+
+ activePromotePublicationActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Publication payload
+
+
+ activePromoteRelationActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Relation payload
+
+
+ activePromoteResultActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Result payload
+
+
+ activePromoteSoftwareActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Software payload
+
+
+ inputGraphRootPath
+ root location of input materialized graph
+
+
+ isLookupUrl
+ URL of the ISLookupService
+
+
+ inputActionSetIds
+ comma separated list of action set ids to promote
+
+
+ outputGraphRootPath
+ root location for output materialized graph
+
+
+ mergeAndGetStrategy
+ strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ yarn-cluster
+ cluster
+ PartitionActionSetsByPayloadType
+ eu.dnetlib.dhp.actionmanager.partition.PartitionActionSetsByPayloadTypeJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --inputActionSetIds${inputActionSetIds}
+ --outputPath${workingDir}/action_payload_by_type
+ --isLookupUrl${isLookupUrl}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${wf:appPath()}/promote_action_payload_for_dataset_table
+
+
+
+ inputActionPayloadRootPath
+ ${workingDir}/action_payload_by_type
+
+
+
+
+
+
+
+
+
+ ${wf:appPath()}/promote_action_payload_for_datasource_table
+
+
+
+ inputActionPayloadRootPath
+ ${workingDir}/action_payload_by_type
+
+
+
+
+
+
+
+
+
+ ${wf:appPath()}/promote_action_payload_for_organization_table
+
+
+
+ inputActionPayloadRootPath
+ ${workingDir}/action_payload_by_type
+
+
+
+
+
+
+
+
+
+ ${wf:appPath()}/promote_action_payload_for_otherresearchproduct_table
+
+
+
+ inputActionPayloadRootPath
+ ${workingDir}/action_payload_by_type
+
+
+
+
+
+
+
+
+
+ ${wf:appPath()}/promote_action_payload_for_project_table
+
+
+
+ inputActionPayloadRootPath
+ ${workingDir}/action_payload_by_type
+
+
+
+
+
+
+
+
+
+ ${wf:appPath()}/promote_action_payload_for_publication_table
+
+
+
+ inputActionPayloadRootPath
+ ${workingDir}/action_payload_by_type
+
+
+
+
+
+
+
+
+
+ ${wf:appPath()}/promote_action_payload_for_relation_table
+
+
+
+ inputActionPayloadRootPath
+ ${workingDir}/action_payload_by_type
+
+
+
+
+
+
+
+
+
+ ${wf:appPath()}/promote_action_payload_for_software_table
+
+
+
+ inputActionPayloadRootPath
+ ${workingDir}/action_payload_by_type
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml
new file mode 100644
index 000000000..412cad70b
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml
@@ -0,0 +1,128 @@
+
+
+
+ activePromoteOrganizationActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Organization payload
+
+
+ inputGraphRootPath
+ root location of input materialized graph
+
+
+ inputActionPayloadRootPath
+ root location of action payloads to promote
+
+
+ outputGraphRootPath
+ root location for output materialized graph
+
+
+ mergeAndGetStrategy
+ strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ ${(activePromoteOrganizationActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'organization')) eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Organization')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromoteOrganizationActionPayloadForOrganizationTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --inputGraphTablePath${inputGraphRootPath}/organization
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Organization
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Organization
+ --outputGraphTablePath${outputGraphRootPath}/organization
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${inputGraphRootPath}/organization
+ ${outputGraphRootPath}/organization
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml
new file mode 100644
index 000000000..0deb1b945
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml
@@ -0,0 +1,183 @@
+
+
+
+ activePromoteOtherResearchProductActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.OtherResearchProduct payload
+
+
+ activePromoteResultActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Result payload
+
+
+ inputGraphRootPath
+ root location of input materialized graph
+
+
+ inputActionPayloadRootPath
+ root location of action payloads to promote
+
+
+ outputGraphRootPath
+ root location for output materialized graph
+
+
+ mergeAndGetStrategy
+ strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ ${(activePromoteOtherResearchProductActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'otherresearchproduct')) eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.OtherResearchProduct')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromoteOtherResearchProductActionPayloadForOtherResearchProductTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --inputGraphTablePath${inputGraphRootPath}/otherresearchproduct
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+ --outputGraphTablePath${workingDir}/otherresearchproduct
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${inputGraphRootPath}/otherresearchproduct
+ ${workingDir}/otherresearchproduct
+
+
+
+
+
+
+
+
+ ${(activePromoteResultActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Result')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromoteResultActionPayloadForOtherResearchProductTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=2560
+
+ --inputGraphTablePath${workingDir}/otherresearchproduct
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Result
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result
+ --outputGraphTablePath${outputGraphRootPath}/otherresearchproduct
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${workingDir}/otherresearchproduct
+ ${outputGraphRootPath}/otherresearchproduct
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml
new file mode 100644
index 000000000..daf48e9d7
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml
@@ -0,0 +1,128 @@
+
+
+
+ activePromoteProjectActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Project payload
+
+
+ inputGraphRootPath
+ root location of input materialized graph
+
+
+ inputActionPayloadRootPath
+ root location of action payloads to promote
+
+
+ outputGraphRootPath
+ root location for output materialized graph
+
+
+ mergeAndGetStrategy
+ strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ ${(activePromoteProjectActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'project')) eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Project')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromoteProjectActionPayloadForProjectTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --inputGraphTablePath${inputGraphRootPath}/project
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Project
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Project
+ --outputGraphTablePath${outputGraphRootPath}/project
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${inputGraphRootPath}/project
+ ${outputGraphRootPath}/project
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml
new file mode 100644
index 000000000..70400a123
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml
@@ -0,0 +1,184 @@
+
+
+
+ activePromotePublicationActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Publication payload
+
+
+ activePromoteResultActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Result payload
+
+
+ inputGraphRootPath
+ root location of input materialized graph
+
+
+ inputActionPayloadRootPath
+ root location of action payloads to promote
+
+
+ outputGraphRootPath
+ root location for output materialized graph
+
+
+ mergeAndGetStrategy
+ strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ ${(activePromotePublicationActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'publication')) eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Publication')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromotePublicationActionPayloadForPublicationTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=2560
+
+ --inputGraphTablePath${inputGraphRootPath}/publication
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Publication
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Publication
+ --outputGraphTablePath${workingDir}/publication
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${inputGraphRootPath}/publication
+ ${workingDir}/publication
+
+
+
+
+
+
+
+
+ ${(activePromoteResultActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Result')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromoteResultActionPayloadForPublicationTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=2560
+
+ --inputGraphTablePath${workingDir}/publication
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Result
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Result
+ --outputGraphTablePath${outputGraphRootPath}/publication
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${workingDir}/publication
+ ${outputGraphRootPath}/publication
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml
new file mode 100644
index 000000000..a7dce8f2f
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml
@@ -0,0 +1,129 @@
+
+
+
+ activePromoteRelationActionPayload
+ when true will promote actions with eu.dnetlib.dhp.schema.oaf.Relation payload
+
+
+ inputGraphRootPath
+ root location of input materialized graph
+
+
+ inputActionPayloadRootPath
+ root location of action payloads to promote
+
+
+ outputGraphRootPath
+ root location for output materialized graph
+
+
+ mergeAndGetStrategy
+ strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ ${(activePromoteRelationActionPayload eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'relation')) eq "true") and
+ (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Relation')) eq "true")}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ PromoteRelationActionPayloadForRelationTable
+ eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob
+ dhp-actionmanager-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=2560
+
+ --inputGraphTablePath${inputGraphRootPath}/relation
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation
+ --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Relation
+ --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Relation
+ --outputGraphTablePath${outputGraphRootPath}/relation
+ --mergeAndGetStrategy${mergeAndGetStrategy}
+
+
+
+
+
+
+
+
+
+
+ -pb
+ ${inputGraphRootPath}/relation
+ ${outputGraphRootPath}/relation
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml
new file mode 100644
index 000000000..396e27721
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml
@@ -0,0 +1,183 @@
+<workflow-app name="promote_action_payload_for_software_table" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>activePromoteSoftwareActionPayload</name>
+            <description>when true will promote actions with eu.dnetlib.dhp.schema.oaf.Software payload</description>
+        </property>
+        <property>
+            <name>activePromoteResultActionPayload</name>
+            <description>when true will promote actions with eu.dnetlib.dhp.schema.oaf.Result payload</description>
+        </property>
+        <property>
+            <name>inputGraphRootPath</name>
+            <description>root location of input materialized graph</description>
+        </property>
+        <property>
+            <name>inputActionPayloadRootPath</name>
+            <description>root location of action payloads to promote</description>
+        </property>
+        <property>
+            <name>outputGraphRootPath</name>
+            <description>root location for output materialized graph</description>
+        </property>
+        <property>
+            <name>mergeAndGetStrategy</name>
+            <description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="DecisionPromoteSoftwareActionPayload"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <decision name="DecisionPromoteSoftwareActionPayload">
+        <switch>
+            <case to="PromoteSoftwareActionPayloadForSoftwareTable">
+                ${(activePromoteSoftwareActionPayload eq "true") and
+                (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'software')) eq "true") and
+                (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Software')) eq "true")}
+            </case>
+            <default to="SkipPromoteSoftwareActionPayloadForSoftwareTable"/>
+        </switch>
+    </decision>
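+    <!-- Phase 1: merge eu.dnetlib.dhp.schema.oaf.Software payloads into the software table,
+         writing the intermediate result under ${workingDir}/software. -->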
+
+    <action name="PromoteSoftwareActionPayloadForSoftwareTable">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>PromoteSoftwareActionPayloadForSoftwareTable</name>
+            <class>eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob</class>
+            <jar>dhp-actionmanager-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/software</arg>
+            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
+            <arg>--inputActionPayloadPath</arg><arg>${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Software</arg>
+            <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
+            <arg>--outputGraphTablePath</arg><arg>${workingDir}/software</arg>
+            <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
+        </spark>
+        <ok to="DecisionPromoteResultActionPayload"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="SkipPromoteSoftwareActionPayloadForSoftwareTable">
+        <distcp xmlns="uri:oozie:distcp-action:0.2">
+            <arg>-pb</arg>
+            <arg>${inputGraphRootPath}/software</arg>
+            <arg>${workingDir}/software</arg>
+        </distcp>
+        <ok to="DecisionPromoteResultActionPayload"/>
+        <error to="Kill"/>
+    </action>
+
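+    <!-- Phase 2: merge the more generic eu.dnetlib.dhp.schema.oaf.Result payloads on top of
+         the intermediate software table and write the final table to the output graph. -->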
+    <decision name="DecisionPromoteResultActionPayload">
+        <switch>
+            <case to="PromoteResultActionPayloadForSoftwareTable">
+                ${(activePromoteResultActionPayload eq "true") and
+                (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Result')) eq "true")}
+            </case>
+            <default to="SkipPromoteResultActionPayloadForSoftwareTable"/>
+        </switch>
+    </decision>
+
+    <action name="PromoteResultActionPayloadForSoftwareTable">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>PromoteResultActionPayloadForSoftwareTable</name>
+            <class>eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob</class>
+            <jar>dhp-actionmanager-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=2560
+            </spark-opts>
+            <arg>--inputGraphTablePath</arg><arg>${workingDir}/software</arg>
+            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
+            <arg>--inputActionPayloadPath</arg><arg>${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Result</arg>
+            <arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Result</arg>
+            <arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/software</arg>
+            <arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="SkipPromoteResultActionPayloadForSoftwareTable">
+        <distcp xmlns="uri:oozie:distcp-action:0.2">
+            <arg>-pb</arg>
+            <arg>${workingDir}/software</arg>
+            <arg>${outputGraphRootPath}/software</arg>
+        </distcp>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java
new file mode 100644
index 000000000..bd5dc9a5d
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java
@@ -0,0 +1,221 @@
+package eu.dnetlib.dhp.actionmanager.partition;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import eu.dnetlib.dhp.actionmanager.ISClient;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.*;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import scala.Tuple2;
+import scala.collection.mutable.Seq;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.common.ThrowingSupport.rethrowAsRuntimeException;
+import static org.apache.spark.sql.functions.*;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static scala.collection.JavaConversions.mutableSeqAsJavaList;
+
+@ExtendWith(MockitoExtension.class)
+public class PartitionActionSetsByPayloadTypeJobTest {
+ private static final ClassLoader cl = PartitionActionSetsByPayloadTypeJobTest.class.getClassLoader();
+
+ private static Configuration configuration;
+ private static SparkSession spark;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
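+ // expected shape of a JSON-serialized atomic action: the payload class name plus the payload itself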
+ private static final StructType ATOMIC_ACTION_SCHEMA = StructType$.MODULE$.apply(
+ Arrays.asList(
+ StructField$.MODULE$.apply("clazz", DataTypes.StringType, false, Metadata.empty()),
+ StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())
+ ));
+
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ configuration = Job.getInstance().getConfiguration();
+ SparkConf conf = new SparkConf();
+ conf.setAppName(PartitionActionSetsByPayloadTypeJobTest.class.getSimpleName());
+ conf.setMaster("local");
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ spark = SparkSession.builder().config(conf).getOrCreate();
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ spark.stop();
+ }
+
+ @DisplayName("Job")
+ @Nested
+ class Main {
+
+ @Mock
+ private ISClient isClient;
+
+ @Test
+ public void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception {
+ // given
+ Path inputActionSetsBaseDir = workingDir.resolve("input").resolve("action_sets");
+ Path outputDir = workingDir.resolve("output");
+
+ Map<String, List<String>> oafsByClassName = createActionSets(inputActionSetsBaseDir);
+
+ List<String> inputActionSetsPaths = resolveInputActionSetPaths(inputActionSetsBaseDir);
+
+ // when
+ Mockito.when(isClient.getLatestRawsetPaths(Mockito.anyString())).thenReturn(inputActionSetsPaths);
+
+ PartitionActionSetsByPayloadTypeJob job = new PartitionActionSetsByPayloadTypeJob();
+ job.setIsClient(isClient);
+ job.run(
+ Boolean.FALSE,
+ "", // it can be empty we're mocking the response from isClient to resolve the paths
+ outputDir.toString()
+ );
+
+ // then
+ assertTrue(Files.exists(outputDir));
+
+ assertForOafType(outputDir, oafsByClassName, eu.dnetlib.dhp.schema.oaf.Dataset.class);
+ assertForOafType(outputDir, oafsByClassName, Datasource.class);
+ assertForOafType(outputDir, oafsByClassName, Organization.class);
+ assertForOafType(outputDir, oafsByClassName, OtherResearchProduct.class);
+ assertForOafType(outputDir, oafsByClassName, Project.class);
+ assertForOafType(outputDir, oafsByClassName, Publication.class);
+ assertForOafType(outputDir, oafsByClassName, Result.class);
+ assertForOafType(outputDir, oafsByClassName, Relation.class);
+ assertForOafType(outputDir, oafsByClassName, Software.class);
+ }
+ }
+
+ private List<String> resolveInputActionSetPaths(Path inputActionSetsBaseDir) throws IOException {
+ Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
+ return Files
+ .list(inputActionSetJsonDumpsDir)
+ .map(path -> {
+ String inputActionSetId = path.getFileName().toString();
+ return inputActionSetsBaseDir.resolve(inputActionSetId).toString();
+ })
+ .collect(Collectors.toCollection(ArrayList::new));
+ }
+
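+ // reads each JSON dump from the test resources, writes it out as sequence-file job input,
+ // and collects the payloads grouped by class name so the job output can be asserted against them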
+ private static Map<String, List<String>> createActionSets(Path inputActionSetsDir) throws IOException {
+ Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
+
+ Map<String, List<String>> oafsByType = new HashMap<>();
+ Files
+ .list(inputActionSetJsonDumpsDir)
+ .forEach(inputActionSetJsonDumpFile -> {
+ String inputActionSetId = inputActionSetJsonDumpFile.getFileName().toString();
+ Path inputActionSetDir = inputActionSetsDir.resolve(inputActionSetId);
+
+ Dataset<String> actionDS = readActionsFromJsonDump(inputActionSetJsonDumpFile.toString())
+ .cache();
+
+ writeActionsAsJobInput(actionDS, inputActionSetId, inputActionSetDir.toString());
+
+ Map<String, List<String>> actionSetOafsByType = actionDS
+ .withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA))
+ .select(expr("atomic_action.*"))
+ .groupBy(col("clazz"))
+ .agg(collect_list(col("payload")).as("payload_list"))
+ .collectAsList()
+ .stream()
+ .map(row -> new AbstractMap.SimpleEntry<>(row.<String>getAs("clazz"),
+ mutableSeqAsJavaList(row.<Seq<String>>getAs("payload_list"))))
+ .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+
+ actionSetOafsByType.keySet()
+ .forEach(x -> {
+ if (oafsByType.containsKey(x)) {
+ List<String> collected = new ArrayList<>();
+ collected.addAll(oafsByType.get(x));
+ collected.addAll(actionSetOafsByType.get(x));
+ oafsByType.put(x, collected);
+ } else {
+ oafsByType.put(x, actionSetOafsByType.get(x));
+ }
+ });
+ });
+
+ return oafsByType;
+ }
+
+ private static Path getInputActionSetJsonDumpsDir() {
+ return Paths
+ .get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))
+ .getFile());
+ }
+
+ private static Dataset<String> readActionsFromJsonDump(String path) {
+ return spark
+ .read()
+ .textFile(path);
+ }
+
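+ // mirrors the production input layout: a Text/Text sequence file keyed by the action set id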
+ private static void writeActionsAsJobInput(Dataset<String> actionDS,
+ String inputActionSetId,
+ String path) {
+ actionDS
+ .javaRDD()
+ .mapToPair(json -> new Tuple2<>(new Text(inputActionSetId), new Text(json)))
+ .saveAsNewAPIHadoopFile(path,
+ Text.class,
+ Text.class,
+ SequenceFileOutputFormat.class,
+ configuration);
+ }
+
+ private static <T extends Oaf> void assertForOafType(Path outputDir, Map<String, List<String>> oafsByClassName, Class<T> clazz) {
+ Path outputDatasetDir = outputDir.resolve(String.format("clazz=%s", clazz.getCanonicalName()));
+ assertTrue(Files.exists(outputDatasetDir));
+
+ List<T> actuals = readActionPayloadFromJobOutput(outputDatasetDir.toString(), clazz).collectAsList();
+ actuals.sort(Comparator.comparingInt(Object::hashCode));
+
+ List<T> expecteds = oafsByClassName.get(clazz.getCanonicalName()).stream()
+ .map(json -> mapToOaf(json, clazz))
+ .sorted(Comparator.comparingInt(Object::hashCode))
+ .collect(Collectors.toList());
+
+ assertIterableEquals(expecteds, actuals);
+ }
+
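+ // reads the partitioned parquet output back and maps the JSON 'payload' column onto OAF beans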
+ private static <T extends Oaf> Dataset<T> readActionPayloadFromJobOutput(String path,
+ Class<T> clazz) {
+ return spark
+ .read()
+ .parquet(path)
+ .map((MapFunction<Row, T>) value -> OBJECT_MAPPER.readValue(value.<String>getAs("payload"), clazz),
+ Encoders.bean(clazz));
+ }
+
+ private static <T extends Oaf> T mapToOaf(String json, Class<T> clazz) {
+ return rethrowAsRuntimeException(
+ () -> OBJECT_MAPPER.readValue(json, clazz),
+ String.format("failed to map json to class: json=%s, class=%s", json, clazz.getCanonicalName())
+ );
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGetTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGetTest.java
new file mode 100644
index 000000000..154e0a331
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGetTest.java
@@ -0,0 +1,268 @@
+package eu.dnetlib.dhp.actionmanager.promote;
+
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.BiFunction;
+
+import static eu.dnetlib.dhp.actionmanager.promote.MergeAndGet.Strategy;
+import static eu.dnetlib.dhp.actionmanager.promote.MergeAndGet.functionFor;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+public class MergeAndGetTest {
+
+ @Nested
+ class MergeFromAndGetStrategy {
+
+ @Test
+ public void shouldThrowForOafAndOaf() {
+ // given
+ Oaf a = mock(Oaf.class);
+ Oaf b = mock(Oaf.class);
+
+ // when
+ SerializableSupplier<BiFunction<Oaf, Oaf, Oaf>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldThrowForOafAndRelation() {
+ // given
+ Oaf a = mock(Oaf.class);
+ Relation b = mock(Relation.class);
+
+ // when
+ SerializableSupplier<BiFunction<Oaf, Relation, Oaf>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldThrowForOafAndOafEntity() {
+ // given
+ Oaf a = mock(Oaf.class);
+ OafEntity b = mock(OafEntity.class);
+
+ // when
+ SerializableSupplier<BiFunction<Oaf, OafEntity, Oaf>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldThrowForRelationAndOaf() {
+ // given
+ Relation a = mock(Relation.class);
+ Oaf b = mock(Oaf.class);
+
+ // when
+ SerializableSupplier<BiFunction<Relation, Oaf, Relation>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldThrowForRelationAndOafEntity() {
+ // given
+ Relation a = mock(Relation.class);
+ OafEntity b = mock(OafEntity.class);
+
+ // when
+ SerializableSupplier<BiFunction<Relation, OafEntity, Relation>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldBehaveProperlyForRelationAndRelation() {
+ // given
+ Relation a = mock(Relation.class);
+ Relation b = mock(Relation.class);
+
+ // when
+ SerializableSupplier<BiFunction<Relation, Relation, Relation>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ Oaf x = fn.get().apply(a, b);
+ assertTrue(Relation.class.isAssignableFrom(x.getClass()));
+ verify(a).mergeFrom(b);
+ assertEquals(a, x);
+ }
+
+ @Test
+ public void shouldThrowForOafEntityAndOaf() {
+ // given
+ OafEntity a = mock(OafEntity.class);
+ Oaf b = mock(Oaf.class);
+
+ // when
+ SerializableSupplier<BiFunction<OafEntity, Oaf, OafEntity>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldThrowForOafEntityAndRelation() {
+ // given
+ OafEntity a = mock(OafEntity.class);
+ Relation b = mock(Relation.class);
+
+ // when
+ SerializableSupplier<BiFunction<OafEntity, Relation, OafEntity>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldThrowForOafEntityAndOafEntityButNotSubclasses() {
+ // given
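+ // two sibling OafEntity subclasses: neither is a subclass of the other,
+ // so MERGE_FROM_AND_GET must refuse to merge them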
+ class OafEntitySub1 extends OafEntity {
+ }
+ class OafEntitySub2 extends OafEntity {
+ }
+
+ OafEntitySub1 a = mock(OafEntitySub1.class);
+ OafEntitySub2 b = mock(OafEntitySub2.class);
+
+ // when
+ SerializableSupplier<BiFunction<OafEntity, OafEntity, OafEntity>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldBehaveProperlyForOafEntityAndOafEntity() {
+ // given
+ OafEntity a = mock(OafEntity.class);
+ OafEntity b = mock(OafEntity.class);
+
+ // when
+ SerializableSupplier<BiFunction<OafEntity, OafEntity, OafEntity>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
+
+ // then
+ Oaf x = fn.get().apply(a, b);
+ assertTrue(OafEntity.class.isAssignableFrom(x.getClass()));
+ verify(a).mergeFrom(b);
+ assertEquals(a, x);
+ }
+ }
+
+ @Nested
+ class SelectNewerAndGetStrategy {
+
+ @Test
+ public void shouldThrowForOafEntityAndRelation() {
+ // given
+ OafEntity a = mock(OafEntity.class);
+ Relation b = mock(Relation.class);
+
+ // when
+ SerializableSupplier<BiFunction<OafEntity, Relation, OafEntity>> fn = functionFor(Strategy.SELECT_NEWER_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldThrowForRelationAndOafEntity() {
+ // given
+ Relation a = mock(Relation.class);
+ OafEntity b = mock(OafEntity.class);
+
+ // when
+ SerializableSupplier<BiFunction<Relation, OafEntity, Relation>> fn = functionFor(Strategy.SELECT_NEWER_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldThrowForOafEntityAndResult() {
+ // given
+ OafEntity a = mock(OafEntity.class);
+ Result b = mock(Result.class);
+
+ // when
+ SerializableSupplier<BiFunction<OafEntity, Result, OafEntity>> fn = functionFor(Strategy.SELECT_NEWER_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldThrowWhenSuperTypeIsNewerForResultAndOafEntity() {
+ // given
+ // real types must be used because subclass-superclass resolution does not work for mocks
+ Dataset a = new Dataset();
+ a.setLastupdatetimestamp(1L);
+ Result b = new Result();
+ b.setLastupdatetimestamp(2L);
+
+ // when
+ SerializableSupplier<BiFunction<Oaf, Oaf, Oaf>> fn = functionFor(Strategy.SELECT_NEWER_AND_GET);
+
+ // then
+ assertThrows(RuntimeException.class, () ->
+ fn.get().apply(a, b));
+ }
+
+ @Test
+ public void shouldReturnRightForOafEntityAndOafEntity() {
+ // given
+ OafEntity a = mock(OafEntity.class);
+ when(a.getLastupdatetimestamp()).thenReturn(1L);
+ OafEntity b = mock(OafEntity.class);
+ when(b.getLastupdatetimestamp()).thenReturn(2L);
+
+ // when
+ SerializableSupplier<BiFunction<OafEntity, OafEntity, OafEntity>> fn =
+ functionFor(Strategy.SELECT_NEWER_AND_GET);
+
+ // then
+ Oaf x = fn.get().apply(a, b);
+ assertTrue(OafEntity.class.isAssignableFrom(x.getClass()));
+ assertEquals(b, x);
+ }
+
+ @Test
+ public void shouldReturnLeftForOafEntityAndOafEntity() {
+ // given
+ OafEntity a = mock(OafEntity.class);
+ when(a.getLastupdatetimestamp()).thenReturn(2L);
+ OafEntity b = mock(OafEntity.class);
+ when(b.getLastupdatetimestamp()).thenReturn(1L);
+
+ // when
+ SerializableSupplier<BiFunction<OafEntity, OafEntity, OafEntity>> fn =
+ functionFor(Strategy.SELECT_NEWER_AND_GET);
+
+ // then
+ Oaf x = fn.get().apply(a, b);
+ assertTrue(OafEntity.class.isAssignableFrom(x.getClass()));
+ assertEquals(a, x);
+ }
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java
new file mode 100644
index 000000000..6f53fbec2
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.java
@@ -0,0 +1,239 @@
+package eu.dnetlib.dhp.actionmanager.promote;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+public class PromoteActionPayloadForGraphTableJobTest {
+ private static final ClassLoader cl = PromoteActionPayloadForGraphTableJobTest.class.getClassLoader();
+
+ private static SparkSession spark;
+
+ private Path workingDir;
+ private Path inputDir;
+ private Path inputGraphRootDir;
+ private Path inputActionPayloadRootDir;
+ private Path outputDir;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @BeforeAll
+ public static void beforeAll() {
+ SparkConf conf = new SparkConf();
+ conf.setAppName(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName());
+ conf.setMaster("local");
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+ spark = SparkSession.builder().config(conf).getOrCreate();
+ }
+
+ @BeforeEach
+ public void beforeEach() throws IOException {
+ workingDir = Files.createTempDirectory(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName());
+ inputDir = workingDir.resolve("input");
+ inputGraphRootDir = inputDir.resolve("graph");
+ inputActionPayloadRootDir = inputDir.resolve("action_payload");
+ outputDir = workingDir.resolve("output");
+ }
+
+ @AfterEach
+ public void afterEach() throws IOException {
+ FileUtils.deleteDirectory(workingDir.toFile());
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ spark.stop();
+ }
+
+ @DisplayName("Job")
+ @Nested
+ class Main {
+
+ @Test
+ public void shouldThrowWhenGraphTableClassIsNotASubClassOfActionPayloadClass() {
+ // given
+ Class<Relation> rowClazz = Relation.class;
+ Class<OafEntity> actionPayloadClazz = OafEntity.class;
+
+ // when
+ RuntimeException exception = assertThrows(RuntimeException.class, () ->
+ PromoteActionPayloadForGraphTableJob.main(new String[]{
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-inputGraphTablePath", "",
+ "-graphTableClassName", rowClazz.getCanonicalName(),
+ "-inputActionPayloadPath", "",
+ "-actionPayloadClassName", actionPayloadClazz.getCanonicalName(),
+ "-outputGraphTablePath", "",
+ "-mergeAndGetStrategy", MergeAndGet.Strategy.SELECT_NEWER_AND_GET.name()
+ }));
+
+ // then
+ String msg = String.format("graph table class is not a subclass of action payload class: graph=%s, action=%s",
+ rowClazz.getCanonicalName(), actionPayloadClazz.getCanonicalName());
+ assertTrue(exception.getMessage().contains(msg));
+ }
+
+ @ParameterizedTest(name = "strategy: {0}, graph table: {1}, action payload: {2}")
+ @MethodSource("eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest#promoteJobTestParams")
+ public void shouldPromoteActionPayloadForGraphTable(MergeAndGet.Strategy strategy,
+ Class<? extends Oaf> rowClazz,
+ Class<? extends Oaf> actionPayloadClazz) throws Exception {
+ // given
+ Path inputGraphTableDir = createGraphTable(inputGraphRootDir, rowClazz);
+ Path inputActionPayloadDir = createActionPayload(inputActionPayloadRootDir, rowClazz, actionPayloadClazz);
+ Path outputGraphTableDir = outputDir.resolve("graph").resolve(rowClazz.getSimpleName().toLowerCase());
+
+ // when
+ PromoteActionPayloadForGraphTableJob.main(new String[]{
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-inputGraphTablePath", inputGraphTableDir.toString(),
+ "-graphTableClassName", rowClazz.getCanonicalName(),
+ "-inputActionPayloadPath", inputActionPayloadDir.toString(),
+ "-actionPayloadClassName", actionPayloadClazz.getCanonicalName(),
+ "-outputGraphTablePath", outputGraphTableDir.toString(),
+ "-mergeAndGetStrategy", strategy.name()
+ });
+
+ // then
+ assertTrue(Files.exists(outputGraphTableDir));
+
+ List<? extends Oaf> actualOutputRows = readGraphTableFromJobOutput(outputGraphTableDir.toString(), rowClazz)
+ .collectAsList()
+ .stream()
+ .sorted(Comparator.comparingInt(Object::hashCode))
+ .collect(Collectors.toList());
+ String expectedOutputGraphTableJsonDumpPath = resultFileLocation(strategy, rowClazz, actionPayloadClazz);
+ Path expectedOutputGraphTableJsonDumpFile = Paths
+ .get(Objects.requireNonNull(cl.getResource(expectedOutputGraphTableJsonDumpPath)).getFile());
+ List<? extends Oaf> expectedOutputRows = readGraphTableFromJsonDump(expectedOutputGraphTableJsonDumpFile.toString(), rowClazz)
+ .collectAsList()
+ .stream()
+ .sorted(Comparator.comparingInt(Object::hashCode))
+ .collect(Collectors.toList());
+ assertIterableEquals(expectedOutputRows, actualOutputRows);
+ }
+ }
+
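+ // each graph table is promoted with a payload of its own type and, for result subtypes,
+ // also with the more generic eu.dnetlib.dhp.schema.oaf.Result payload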
+ public static Stream<Arguments> promoteJobTestParams() {
+ return Stream.of(
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, eu.dnetlib.dhp.schema.oaf.Dataset.class, eu.dnetlib.dhp.schema.oaf.Dataset.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, eu.dnetlib.dhp.schema.oaf.Dataset.class, eu.dnetlib.dhp.schema.oaf.Result.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, Datasource.class, Datasource.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, Organization.class, Organization.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, OtherResearchProduct.class, OtherResearchProduct.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, OtherResearchProduct.class, Result.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, Project.class, Project.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, Publication.class, Publication.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, Publication.class, Result.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, Relation.class, Relation.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, Software.class, Software.class),
+ arguments(MergeAndGet.Strategy.MERGE_FROM_AND_GET, Software.class, Result.class)
+ );
+ }
+
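+ // helpers below materialize the JSON dumps from test resources as graph table / action payload job input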
+ private static <G extends Oaf> Path createGraphTable(Path inputGraphRootDir,
+ Class<G> rowClazz) {
+ String inputGraphTableJsonDumpPath = inputGraphTableJsonDumpLocation(rowClazz);
+ Path inputGraphTableJsonDumpFile = Paths
+ .get(Objects.requireNonNull(cl.getResource(inputGraphTableJsonDumpPath)).getFile());
+ Dataset<G> rowDS = readGraphTableFromJsonDump(inputGraphTableJsonDumpFile.toString(), rowClazz);
+ String inputGraphTableName = rowClazz.getSimpleName().toLowerCase();
+ Path inputGraphTableDir = inputGraphRootDir.resolve(inputGraphTableName);
+ writeGraphTableAsJobInput(rowDS, inputGraphTableDir.toString());
+ return inputGraphTableDir;
+ }
+
+ private static String inputGraphTableJsonDumpLocation(Class<? extends Oaf> rowClazz) {
+ return String.format(
+ "%s/%s.json", "eu/dnetlib/dhp/actionmanager/promote/input/graph", rowClazz.getSimpleName().toLowerCase());
+ }
+
+ private static <G extends Oaf> Dataset<G> readGraphTableFromJsonDump(String path,
+ Class<G> rowClazz) {
+ return spark
+ .read()
+ .textFile(path)
+ .map((MapFunction<String, G>) json -> OBJECT_MAPPER.readValue(json, rowClazz), Encoders.bean(rowClazz));
+ }
+
+ private static <G extends Oaf> void writeGraphTableAsJobInput(Dataset<G> rowDS,
+ String path) {
+ rowDS
+ .write()
+ .option("compression", "gzip")
+ .json(path);
+ }
+
+ private static <G extends Oaf, A extends Oaf> Path createActionPayload(Path inputActionPayloadRootDir,
+ Class<G> rowClazz,
+ Class<A> actionPayloadClazz) {
+ String inputActionPayloadJsonDumpPath = inputActionPayloadJsonDumpLocation(rowClazz, actionPayloadClazz);
+ Path inputActionPayloadJsonDumpFile = Paths
+ .get(Objects.requireNonNull(cl.getResource(inputActionPayloadJsonDumpPath)).getFile());
+ Dataset