diff --git a/dhp-workflows/dhp-stats-actionsets/pom.xml b/dhp-workflows/dhp-stats-actionsets/pom.xml
new file mode 100644
index 000000000..670fd4a5e
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/pom.xml
@@ -0,0 +1,114 @@
+
+
+ 4.0.0
+
+ eu.dnetlib.dhp
+ dhp-workflows
+ 1.2.5-SNAPSHOT
+
+ dhp-stats-actionsets
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+
+ org.apache.spark
+ spark-core_2.11
+
+
+ org.apache.spark
+ spark-sql_2.11
+
+
+
+ eu.dnetlib.dhp
+ dhp-common
+ ${project.version}
+
+
+
+ net.sf.saxon
+ Saxon-HE
+
+
+ dom4j
+ dom4j
+
+
+
+ xml-apis
+ xml-apis
+
+
+
+ jaxen
+ jaxen
+
+
+
+ org.json
+ json
+
+
+
+
+ org.apache.poi
+ poi-ooxml
+
+
+
+
+ org.apache.commons
+ commons-compress
+
+
+
+ org.mongodb
+ mongo-java-driver
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
new file mode 100644
index 000000000..0df34364d
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
@@ -0,0 +1,102 @@
+
+package eu.dnetlib.dhp.actionmanager;
+
+import java.util.Optional;
+
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.oaf.Subject;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+
+public class Constants {
+
+ public static final String DOI = "doi";
+ public static final String DOI_CLASSNAME = "Digital Object Identifier";
+
+ public static final String DEFAULT_DELIMITER = ",";
+ public static final String DEFAULT_FOS_DELIMITER = "\t";
+
+ public static final String UPDATE_DATA_INFO_TYPE = "update";
+// public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos";
+ public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
+// public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
+// public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
+// public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts";
+// public static final String UPDATE_KEY_USAGE_COUNTS = "count";
+
+ public static final String UPDATE_MEASURE_STATS_MODEL_CLASS_ID = "measure:stats_model";
+ public static final String UPDATE_KEY_STATS_MODEL = "stats_model";
+
+// public static final String UPDATE_MEASURE_PUBLICLY_FUNDED_CLASS_ID = "measure:publicly_funded";
+// public static final String UPDATE_KEY_PUBLICLY_FUNDED = "publicly_funded";
+
+// public static final String FOS_CLASS_ID = "FOS";
+// public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
+//
+// public static final String SDG_CLASS_ID = "SDG";
+// public static final String SDG_CLASS_NAME = "Sustainable Development Goals";
+
+ public static final String NULL = "NULL";
+
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private Constants() {
+ }
+
+ public static Boolean isSparkSessionManaged(ArgumentApplicationParser parser) {
+ return Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ }
+
+ public static Dataset readPath(
+ SparkSession spark, String inputPath, Class clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
+
+ public static Subject getSubject(String sbj, String classid, String classname,
+ String diqualifierclassid) {
+ if (sbj == null || sbj.equals(NULL))
+ return null;
+ Subject s = new Subject();
+ s.setValue(sbj);
+ s
+ .setQualifier(
+ OafMapperUtils
+ .qualifier(
+ classid,
+ classname,
+ ModelConstants.DNET_SUBJECT_TYPOLOGIES,
+ ModelConstants.DNET_SUBJECT_TYPOLOGIES));
+ s
+ .setDataInfo(
+ OafMapperUtils
+ .dataInfo(
+ false,
+ UPDATE_DATA_INFO_TYPE,
+ true,
+ false,
+ OafMapperUtils
+ .qualifier(
+ diqualifierclassid,
+ UPDATE_CLASS_NAME,
+ ModelConstants.DNET_PROVENANCE_ACTIONS,
+ ModelConstants.DNET_PROVENANCE_ACTIONS),
+ ""));
+
+ return s;
+
+ }
+}
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java
new file mode 100644
index 000000000..f6e2cfe01
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java
@@ -0,0 +1,286 @@
+
+package eu.dnetlib.dhp.actionmanager.stats_actionsets;
+
+import static eu.dnetlib.dhp.actionmanager.Constants.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import scala.Tuple2;
+
+/**
+ * created the Atomic Action for each type of results
+ */
+/**
+ * Creates the atomic actions carrying stats-model measures (green OA, diamond
+ * OA, publicly funded, OA colour) for results.
+ *
+ * <p>The job first materializes each indicator table from the stats database
+ * as JSON under workingPath, then converts every record into a {@link Result}
+ * holding the corresponding measures and serializes the union as an action
+ * set (SequenceFile of AtomicAction).</p>
+ */
+public class StatsAtomicActionsJob implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(StatsAtomicActionsJob.class);
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	// Sub-directories of workingPath, shared between the prepare* writers and
+	// writeActionSet so the two sides can never diverge. NOTE: the original
+	// inline literals spelled two of these with a Greek capital Beta (U+0392,
+	// "diamondOADΒ"/"publiclyFundedDΒ") instead of a Latin 'B'; it only worked
+	// because the typo was repeated on both sides. Normalized to Latin here.
+	private static final String GREEN_OA_PATH = "/greenOADB";
+	private static final String DIAMOND_OA_PATH = "/diamondOADB";
+	private static final String PUBLICLY_FUNDED_PATH = "/publiclyFundedDB";
+	private static final String OA_COLOUR_PATH = "/oacolourDB";
+
+	public static void main(String[] args) throws Exception {
+
+		String jsonConfiguration = IOUtils
+			.toString(
+				StatsAtomicActionsJob.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String outputPath = parser.get("outputPath");
+		// fixed: placeholder was malformed as "outputPath {}: "
+		log.info("outputPath: {}", outputPath);
+
+		SparkConf conf = new SparkConf();
+		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+
+		final String dbname = parser.get("statsDB");
+		final String workingPath = parser.get("workingPath");
+
+		runWithSparkHiveSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				removeOutputDir(spark, outputPath);
+				prepareGreenData(dbname, spark, workingPath + GREEN_OA_PATH, "indi_pub_green_oa", "id");
+				prepareDiamondData(dbname, spark, workingPath + DIAMOND_OA_PATH, "indi_pub_diamond", "id");
+				preparePubliclyFundedData(
+					dbname, spark, workingPath + PUBLICLY_FUNDED_PATH, "indi_funded_result_with_fundref", "id");
+				// tableName is unused by prepareOAColourData (tables are fixed
+				// in its query), hence the empty string
+				prepareOAColourData(dbname, spark, workingPath + OA_COLOUR_PATH, "", "id");
+				writeActionSet(spark, workingPath, outputPath);
+			});
+	}
+
+	/**
+	 * Dumps {id, green_oa} rows from the given indicator table as gzipped JSON.
+	 */
+	private static void prepareGreenData(String dbname, SparkSession spark, String workingPath, String tableName,
+		String resultAttributeName) {
+		spark
+			.sql(
+				String
+					.format(
+						"select %s as id, green_oa as green_oa " +
+							"from %s.%s",
+						resultAttributeName, dbname, tableName))
+			.as(Encoders.bean(StatsGreenOAModel.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingPath);
+	}
+
+	/**
+	 * Dumps {id, in_diamond_journal} rows from the given indicator table as gzipped JSON.
+	 */
+	private static void prepareDiamondData(String dbname, SparkSession spark, String workingPath, String tableName,
+		String resultAttributeName) {
+		spark
+			.sql(
+				String
+					.format(
+						"select %s as id, in_diamond_journal as in_diamond_journal " +
+							"from %s.%s",
+						resultAttributeName, dbname, tableName))
+			.as(Encoders.bean(StatsDiamondOAModel.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingPath);
+	}
+
+	/**
+	 * Dumps {id, publicly_funded} rows (fundref flag) from the given indicator
+	 * table as gzipped JSON.
+	 */
+	private static void preparePubliclyFundedData(String dbname, SparkSession spark, String workingPath,
+		String tableName,
+		String resultAttributeName) {
+		spark
+			.sql(
+				String
+					.format(
+						"select %s as id, fundref as publicly_funded " +
+							"from %s.%s",
+						resultAttributeName, dbname, tableName))
+			.as(Encoders.bean(StatsPubliclyFundedModel.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingPath);
+	}
+
+	/**
+	 * Dumps {id, is_gold, is_bronze_oa, is_hybrid} rows, joining the bronze,
+	 * gold and hybrid indicator tables on id, as gzipped JSON.
+	 *
+	 * @param tableName unused: the indicator tables are fixed in the query;
+	 *                  kept for signature symmetry with the other prepare* methods
+	 */
+	private static void prepareOAColourData(String dbname, SparkSession spark, String workingPath, String tableName,
+		String resultAttributeName) {
+		spark
+			.sql(
+				String
+					.format(
+						"select b.%s as id, is_gold, is_bronze_oa, is_hybrid from %s.indi_pub_bronze_oa b " +
+							"left outer join %s.indi_pub_gold_oa g on g.id=b.id " +
+							"left outer join %s.indi_pub_hybrid h on b.id=h.id",
+						resultAttributeName, dbname, dbname, dbname))
+			.as(Encoders.bean(StatsOAColourModel.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingPath);
+	}
+
+	/**
+	 * Reads the four intermediate dumps back, wraps each Result in an
+	 * AtomicAction and saves their union as a SequenceFile of
+	 * (class canonical name, JSON-serialized action) pairs.
+	 */
+	public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
+
+		getFinalIndicatorsGreenResult(spark, inputPath + GREEN_OA_PATH)
+			.toJavaRDD()
+			.map(p -> new AtomicAction<>(Result.class, p))
+			.union(
+				getFinalIndicatorsDiamondResult(spark, inputPath + DIAMOND_OA_PATH)
+					.toJavaRDD()
+					.map(p -> new AtomicAction<>(Result.class, p)))
+			.union(
+				getFinalIndicatorsPubliclyFundedResult(spark, inputPath + PUBLICLY_FUNDED_PATH)
+					.toJavaRDD()
+					.map(p -> new AtomicAction<>(Result.class, p)))
+			.union(
+				getFinalIndicatorsOAColourResult(spark, inputPath + OA_COLOUR_PATH)
+					.toJavaRDD()
+					.map(p -> new AtomicAction<>(Result.class, p)))
+			.mapToPair(
+				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+	}
+
+	/** Creates an empty Measure with the given id and a mutable unit list. */
+	public static Measure newMeasureInstance(String id) {
+		Measure m = new Measure();
+		m.setId(id);
+		m.setUnit(new ArrayList<>());
+		return m;
+	}
+
+	/** Maps the green-OA dump to Results carrying a green_oa measure. */
+	private static Dataset<Result> getFinalIndicatorsGreenResult(SparkSession spark, String inputPath) {
+
+		return readPath(spark, inputPath, StatsGreenOAModel.class)
+			.map((MapFunction<StatsGreenOAModel, Result>) usm -> {
+				Result r = new Result();
+				// "50|" is the result entity-type prefix expected downstream
+				r.setId("50|" + usm.getId());
+				r.setMeasures(getMeasure(usm.isGreen_oa(), "green_oa"));
+				return r;
+			}, Encoders.bean(Result.class));
+	}
+
+	/** Maps the diamond-OA dump to Results carrying an in_diamond_journal measure. */
+	private static Dataset<Result> getFinalIndicatorsDiamondResult(SparkSession spark, String inputPath) {
+
+		return readPath(spark, inputPath, StatsDiamondOAModel.class)
+			.map((MapFunction<StatsDiamondOAModel, Result>) usm -> {
+				Result r = new Result();
+				r.setId("50|" + usm.getId());
+				r.setMeasures(getMeasure(usm.isIn_diamond_journal(), "in_diamond_journal"));
+				return r;
+			}, Encoders.bean(Result.class));
+	}
+
+	/** Maps the publicly-funded dump to Results carrying a publicly_funded measure. */
+	private static Dataset<Result> getFinalIndicatorsPubliclyFundedResult(SparkSession spark, String inputPath) {
+
+		return readPath(spark, inputPath, StatsPubliclyFundedModel.class)
+			.map((MapFunction<StatsPubliclyFundedModel, Result>) usm -> {
+				Result r = new Result();
+				r.setId("50|" + usm.getId());
+				r.setMeasures(getMeasure(usm.isPublicly_funded(), "publicly_funded"));
+				return r;
+			}, Encoders.bean(Result.class));
+	}
+
+	/** Maps the OA-colour dump to Results carrying gold/bronze/hybrid measures. */
+	private static Dataset<Result> getFinalIndicatorsOAColourResult(SparkSession spark, String inputPath) {
+
+		return readPath(spark, inputPath, StatsOAColourModel.class)
+			.map((MapFunction<StatsOAColourModel, Result>) usm -> {
+				Result r = new Result();
+				r.setId("50|" + usm.getId());
+				r.setMeasures(getMeasureOAColour(usm.isIs_gold(), usm.isIs_bronze_oa(), usm.isIs_hybrid()));
+				return r;
+			}, Encoders.bean(Result.class));
+	}
+
+	/** Builds the shared "update / Inferred by OpenAIRE" stats-model dataInfo. */
+	private static DataInfo statsModelDataInfo() {
+		return OafMapperUtils
+			.dataInfo(
+				false,
+				UPDATE_DATA_INFO_TYPE,
+				true,
+				false,
+				OafMapperUtils
+					.qualifier(
+						UPDATE_MEASURE_STATS_MODEL_CLASS_ID,
+						UPDATE_CLASS_NAME,
+						ModelConstants.DNET_PROVENANCE_ACTIONS,
+						ModelConstants.DNET_PROVENANCE_ACTIONS),
+				"");
+	}
+
+	/**
+	 * Builds the single-measure list for a boolean indicator.
+	 *
+	 * @param is_model_oa the indicator value, rendered as "true"/"false"
+	 * @param model_type  the measure id (e.g. "green_oa")
+	 */
+	private static List<Measure> getMeasure(Boolean is_model_oa, String model_type) {
+		DataInfo dataInfo = statsModelDataInfo();
+
+		return Arrays
+			.asList(
+				OafMapperUtils
+					.newMeasureInstance(model_type, String.valueOf(is_model_oa), UPDATE_KEY_STATS_MODEL, dataInfo));
+	}
+
+	/**
+	 * Builds the three OA-colour measures (is_gold, is_bronze_oa, is_hybrid).
+	 */
+	private static List<Measure> getMeasureOAColour(Boolean is_gold, Boolean is_bronze_oa, Boolean is_hybrid) {
+		DataInfo dataInfo = statsModelDataInfo();
+
+		return Arrays
+			.asList(
+				OafMapperUtils
+					.newMeasureInstance("is_gold", String.valueOf(is_gold), UPDATE_KEY_STATS_MODEL, dataInfo),
+				OafMapperUtils
+					.newMeasureInstance("is_bronze_oa", String.valueOf(is_bronze_oa), UPDATE_KEY_STATS_MODEL, dataInfo),
+				OafMapperUtils
+					.newMeasureInstance("is_hybrid", String.valueOf(is_hybrid), UPDATE_KEY_STATS_MODEL, dataInfo));
+	}
+
+	/** Recursively deletes the given HDFS path if it exists. */
+	private static void removeOutputDir(SparkSession spark, String path) {
+		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+	}
+
+	/** Reads a directory of one-JSON-object-per-line files into a typed Dataset. */
+	public static <R> Dataset<R> readPath(
+		SparkSession spark, String inputPath, Class<R> clazz) {
+		return spark
+			.read()
+			.textFile(inputPath)
+			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+	}
+
+}
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java
new file mode 100644
index 000000000..340790fa8
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java
@@ -0,0 +1,29 @@
+
+package eu.dnetlib.dhp.actionmanager.stats_actionsets;
+
+import java.io.Serializable;
+
+/**
+ * @author dimitris.pierrakos
+ * @Date 30/10/23
+ */
+public class StatsDiamondOAModel implements Serializable {
+ private String id;
+ private boolean in_diamond_journal;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public boolean isIn_diamond_journal() {
+ return in_diamond_journal;
+ }
+
+ public void setIn_diamond_journal(boolean in_diamond_journal) {
+ this.in_diamond_journal = in_diamond_journal;
+ }
+}
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java
new file mode 100644
index 000000000..3d59bdf95
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java
@@ -0,0 +1,29 @@
+
+package eu.dnetlib.dhp.actionmanager.stats_actionsets;
+
+import java.io.Serializable;
+
+/**
+ * @author dimitris.pierrakos
+ * @Date 30/10/23
+ */
+public class StatsGreenOAModel implements Serializable {
+ private String id;
+ private boolean green_oa;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public boolean isGreen_oa() {
+ return green_oa;
+ }
+
+ public void setGreen_oa(boolean green_oa) {
+ this.green_oa = green_oa;
+ }
+}
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java
new file mode 100644
index 000000000..a83eb4f60
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java
@@ -0,0 +1,47 @@
+
+package eu.dnetlib.dhp.actionmanager.stats_actionsets;
+
+import java.io.Serializable;
+
+/**
+ * @author dimitris.pierrakos
+ * @Date 30/10/23
+ */
+public class StatsOAColourModel implements Serializable {
+ private String id;
+ private boolean is_gold;
+ private boolean is_bronze_oa;
+ private boolean is_hybrid;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public boolean isIs_gold() {
+ return is_gold;
+ }
+
+ public void setIs_gold(boolean is_gold) {
+ this.is_gold = is_gold;
+ }
+
+ public boolean isIs_bronze_oa() {
+ return is_bronze_oa;
+ }
+
+ public void setIs_bronze_oa(boolean is_bronze_oa) {
+ this.is_bronze_oa = is_bronze_oa;
+ }
+
+ public boolean isIs_hybrid() {
+ return is_hybrid;
+ }
+
+ public void setIs_hybrid(boolean is_hybrid) {
+ this.is_hybrid = is_hybrid;
+ }
+}
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java
new file mode 100644
index 000000000..6bdc78220
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java
@@ -0,0 +1,29 @@
+
+package eu.dnetlib.dhp.actionmanager.stats_actionsets;
+
+import java.io.Serializable;
+
+/**
+ * @author dimitris.pierrakos
+ * @Date 30/10/23
+ */
+public class StatsPubliclyFundedModel implements Serializable {
+ private String id;
+ private boolean publicly_funded;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public boolean isPublicly_funded() {
+ return publicly_funded;
+ }
+
+ public void setPublicly_funded(boolean publicly_funded) {
+ this.publicly_funded = publicly_funded;
+ }
+}
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json
new file mode 100644
index 000000000..bb24d4c99
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "hmu",
+ "paramLongName": "hive_metastore_uris",
+ "paramDescription": "the URI for the hive metastore",
+ "paramRequired": true
+ },
+ {
+ "paramName": "o",
+ "paramLongName": "outputPath",
+ "paramDescription": "the path of the new ActionSet",
+ "paramRequired": true
+ },
+ {
+ "paramName": "sdb",
+ "paramLongName": "statsDB",
+ "paramDescription": "the name of the stats db to be used",
+ "paramRequired": true
+ },
+ {
+ "paramName": "wp",
+ "paramLongName": "workingPath",
+ "paramDescription": "the workingPath where to save the content of the usage_stats table",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml
new file mode 100644
index 000000000..d262cb6e0
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml
@@ -0,0 +1,30 @@
+
+
+ jobTracker
+ yarnRM
+
+
+ nameNode
+ hdfs://nameservice1
+
+
+ oozie.use.system.libpath
+ true
+
+
+ hiveMetastoreUris
+ thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+
+
+ hiveJdbcUrl
+ jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000
+
+
+ hiveDbName
+ openaire
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml
new file mode 100644
index 000000000..1aa4e1050
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml
@@ -0,0 +1,125 @@
+
+
+
+ outputPath
+ the path where to store the actionset
+
+
+ statsDB
+ the name of the stats db to be used
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ mapreduce.job.queuename
+ ${queueName}
+
+
+ oozie.launcher.mapred.job.queue.name
+ ${oozieLauncherQueueName}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ yarn
+ cluster
+ Produces the atomic action with the usage stats count for results
+
+ eu.dnetlib.dhp.actionmanager.stats_actionsets.StatsAtomicActionsJob
+ dhp-stats-actionsets-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+ --hive_metastore_uris${hiveMetastoreUris}
+ --outputPath${outputPath}
+ --statsDB${statsDB}
+ --workingPath${workingDir}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Produces the atomic action with the stats green_oa for results
+
+ eu.dnetlib.dhp.actionmanager.stats_actionsets.StatsAtomicActionsJob
+ dhp-stats-actionsets-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+ --hive_metastore_uris${hiveMetastoreUris}
+ --outputPath${outputPath}
+ --statsDB${statsDB}
+ --workingPath${workingDir}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties b/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties
new file mode 100644
index 000000000..81458d1f7
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties
@@ -0,0 +1,12 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.apache.spark=FATAL
+log4j.logger.org.spark_project=FATAL