diff --git a/dhp-workflows/dhp-stats-actionsets/pom.xml b/dhp-workflows/dhp-stats-actionsets/pom.xml
new file mode 100644
index 0000000000..3daa8f9959
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>eu.dnetlib.dhp</groupId>
+		<artifactId>dhp-workflows</artifactId>
+		<version>1.2.5-SNAPSHOT</version>
+	</parent>
+	<artifactId>dhp-stats-actionsets</artifactId>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpclient</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-core_2.11</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-sql_2.11</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>net.sf.saxon</groupId>
+			<artifactId>Saxon-HE</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>dom4j</groupId>
+			<artifactId>dom4j</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>xml-apis</groupId>
+			<artifactId>xml-apis</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>jaxen</groupId>
+			<artifactId>jaxen</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.json</groupId>
+			<artifactId>json</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.poi</groupId>
+			<artifactId>poi-ooxml</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-compress</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.mongodb</groupId>
+			<artifactId>mongo-java-driver</artifactId>
+		</dependency>
+
+	</dependencies>
+</project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java
new file mode 100644
index 0000000000..0c3c2f3c0a
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java
@@ -0,0 +1,162 @@
+
+package eu.dnetlib.dhp.actionmanager.stats_actionsets;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.*;
+import scala.Tuple2;
+
+/**
+ * Creates the atomic actions for each type of result.
+ */
+public class StatsAtomicActionsJob implements Serializable {
+
+ private static final Logger log = LoggerFactory.getLogger(StatsAtomicActionsJob.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ StatsAtomicActionsJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String outputPath = parser.get("outputPath");
+ log.info("outputPath {}: ", outputPath);
+
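+ // speculative execution is disabled so that no duplicate task attempts
+ // write overlapping output files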
+ SparkConf conf = new SparkConf();
+ conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+ conf.set("spark.speculation", "false");
+ conf.set("spark.hadoop.mapreduce.map.speculative", "false");
+ conf.set("spark.hadoop.mapreduce.reduce.speculative", "false");
+
+ final String dbname = parser.get("statsDB");
+
+ final String workingPath = parser.get("workingPath");
+
+ runWithSparkHiveSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ prepareResultEnhancement(dbname, spark, workingPath + "/resultEnhancements", "id");
+ writeActionSet(spark, workingPath, outputPath);
+ });
+ }
+
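+ /**
+ * Left-joins the publications in the stats DB with the per-indicator tables
+ * (bronze, gold, hybrid, green, diamond, publicly funded) and materializes
+ * one flat record per publication as gzipped JSON under the working path.
+ */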
+ private static void prepareResultEnhancement(String dbname, SparkSession spark, String workingPath,
+ String resultAttributeName) {
+ spark
+ .sql(
+ String
+ .format(
+ "select r.%s as id, is_gold, is_bronze_oa, is_hybrid,green_oa, in_diamond_journal,f.publicly_funded as publicly_funded "
+ +
+ "from %s.publication r " +
+ "left outer join %s.indi_pub_bronze_oa b on r.id=b.id " +
+ "left outer join %s.indi_pub_gold_oa g on r.id=g.id " +
+ "left outer join %s.indi_pub_hybrid h on r.id=h.id " +
+ "left outer join %s.indi_pub_green_oa gr on r.id=gr.id " +
+ "left outer join %s.indi_pub_diamond d on b.id=d.id " +
+ "left outer join %s.indi_pub_publicly_funded f on r.id=f.id ",
+ resultAttributeName, dbname, dbname, dbname, dbname, dbname, dbname, dbname))
+ .as(Encoders.bean(StatsResultEnhancementModel.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(workingPath);
+ }
+
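+ /**
+ * Wraps every enhanced Result into an AtomicAction and stores the action set
+ * as a gzip-compressed SequenceFile of (canonical class name, JSON payload)
+ * Text pairs.
+ */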
+ public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
+
+ getResultEnhancements(spark, inputPath + "/resultEnhancements")
+ .toJavaRDD()
+ .map(p -> new AtomicAction(p.getClass(), p))
+ .mapToPair(
+ aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+ new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+ .saveAsHadoopFile(
+ outputPath,
+ Text.class,
+ Text.class,
+ SequenceFileOutputFormat.class,
+ GzipCodec.class);
+ }
+
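+ /**
+ * Maps each flat enhancement record onto a Result carrying the "50|" id prefix;
+ * the open-access colour is assigned with precedence gold > hybrid > bronze.
+ */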
+ private static Dataset<Result> getResultEnhancements(SparkSession spark, String inputPath) {
+
+ return readPath(spark, inputPath, StatsResultEnhancementModel.class)
+ .map((MapFunction<StatsResultEnhancementModel, Result>) usm -> {
+ Result r = new Result();
+ r.setId("50|" + usm.getId());
+ r.setIsInDiamondJournal(usm.isIn_diamond_journal());
+ r.setIsGreen(usm.isGreen_oa());
+ r.setPubliclyFunded(usm.isPublicly_funded());
+ if (usm.isIs_gold())
+ r.setOpenAccessColor(OpenAccessColor.gold);
+ else if (usm.isIs_hybrid())
+ r.setOpenAccessColor(OpenAccessColor.hybrid);
+ else if (usm.isIs_bronze_oa())
+ r.setOpenAccessColor(OpenAccessColor.bronze);
+ return r;
+ }, Encoders.bean(Result.class));
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
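+ /**
+ * Reads a text file of JSON records, deserializing each line into the given
+ * bean class.
+ */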
+ public static <R> Dataset<R> readPath(
+ SparkSession spark, String inputPath, Class<R> clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
+}
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsResultEnhancementModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsResultEnhancementModel.java
new file mode 100644
index 0000000000..f794c55916
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsResultEnhancementModel.java
@@ -0,0 +1,76 @@
+
+package eu.dnetlib.dhp.actionmanager.stats_actionsets;
+
+import java.io.Serializable;
+
+import eu.dnetlib.dhp.schema.oaf.*;
+
+/**
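+ * Flat bean mirroring the columns selected by the result-enhancement query in
+ * StatsAtomicActionsJob.
+ *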
+ * @author dimitris.pierrakos
+ * @Date 30/10/23
+ */
+public class StatsResultEnhancementModel implements Serializable {
+ private String id;
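+ // boxed Booleans: the left outer joins in the preparation query may yield
+ // NULL for the colour flags when no indicator row exists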
+ private Boolean is_gold;
+ private Boolean is_bronze_oa;
+ private Boolean is_hybrid;
+ private boolean in_diamond_journal;
+ private boolean green_oa;
+ private boolean publicly_funded;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Boolean isIs_gold() {
+ return is_gold;
+ }
+
+ public void setIs_gold(Boolean is_gold) {
+ this.is_gold = is_gold;
+ }
+
+ public Boolean isIs_bronze_oa() {
+ return is_bronze_oa;
+ }
+
+ public void setIs_bronze_oa(Boolean is_bronze_oa) {
+ this.is_bronze_oa = is_bronze_oa;
+ }
+
+ public Boolean isIs_hybrid() {
+ return is_hybrid;
+ }
+
+ public void setIs_hybrid(Boolean is_hybrid) {
+ this.is_hybrid = is_hybrid;
+ }
+
+ public boolean isIn_diamond_journal() {
+ return in_diamond_journal;
+ }
+
+ public void setIn_diamond_journal(boolean in_diamond_journal) {
+ this.in_diamond_journal = in_diamond_journal;
+ }
+
+ public boolean isGreen_oa() {
+ return green_oa;
+ }
+
+ public void setGreen_oa(boolean green_oa) {
+ this.green_oa = green_oa;
+ }
+
+ public boolean isPublicly_funded() {
+ return publicly_funded;
+ }
+
+ public void setPublicly_funded(boolean publicly_funded) {
+ this.publicly_funded = publicly_funded;
+ }
+}
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json
new file mode 100644
index 0000000000..bb24d4c99d
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "hmu",
+ "paramLongName": "hive_metastore_uris",
+ "paramDescription": "the URI for the hive metastore",
+ "paramRequired": true
+ },
+ {
+ "paramName": "o",
+ "paramLongName": "outputPath",
+ "paramDescription": "the path of the new ActionSet",
+ "paramRequired": true
+ },
+ {
+ "paramName": "sdb",
+ "paramLongName": "statsDB",
+ "paramDescription": "the name of the stats db to be used",
+ "paramRequired": true
+ },
+ {
+ "paramName": "wp",
+ "paramLongName": "workingPath",
+ "paramDescription": "the workingPath where to save the content of the usage_stats table",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml
new file mode 100644
index 0000000000..d262cb6e05
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml
@@ -0,0 +1,30 @@
+<configuration>
+	<property>
+		<name>jobTracker</name>
+		<value>yarnRM</value>
+	</property>
+	<property>
+		<name>nameNode</name>
+		<value>hdfs://nameservice1</value>
+	</property>
+	<property>
+		<name>oozie.use.system.libpath</name>
+		<value>true</value>
+	</property>
+	<property>
+		<name>hiveMetastoreUris</name>
+		<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+	</property>
+	<property>
+		<name>hiveJdbcUrl</name>
+		<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
+	</property>
+	<property>
+		<name>hiveDbName</name>
+		<value>openaire</value>
+	</property>
+	<property>
+		<name>oozie.launcher.mapreduce.user.classpath.first</name>
+		<value>true</value>
+	</property>
+</configuration>
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml
new file mode 100644
index 0000000000..79fd3f56e2
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml
@@ -0,0 +1,97 @@
+<workflow-app name="StatsActionSets" xmlns="uri:oozie:workflow:0.5">
+	<parameters>
+		<property>
+			<name>outputPath</name>
+			<description>the path where to store the actionset</description>
+		</property>
+		<property>
+			<name>statsDB</name>
+			<description>the name of the stats db to be used</description>
+		</property>
+		<property>
+			<name>sparkDriverMemory</name>
+			<description>memory for driver process</description>
+		</property>
+		<property>
+			<name>sparkExecutorMemory</name>
+			<description>memory for individual executor</description>
+		</property>
+		<property>
+			<name>sparkExecutorCores</name>
+			<description>number of cores used by single executor</description>
+		</property>
+		<property>
+			<name>oozieActionShareLibForSpark2</name>
+			<description>oozie action sharelib for spark 2.*</description>
+		</property>
+		<property>
+			<name>spark2ExtraListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+			<description>spark 2.* extra listeners classname</description>
+		</property>
+		<property>
+			<name>spark2SqlQueryExecutionListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+			<description>spark 2.* sql query execution listeners classname</description>
+		</property>
+		<property>
+			<name>spark2YarnHistoryServerAddress</name>
+			<description>spark 2.* yarn history server address</description>
+		</property>
+		<property>
+			<name>spark2EventLogDir</name>
+			<description>spark 2.* event log dir location</description>
+		</property>
+	</parameters>
+
+	<global>
+		<job-tracker>${jobTracker}</job-tracker>
+		<name-node>${nameNode}</name-node>
+		<configuration>
+			<property>
+				<name>mapreduce.job.queuename</name>
+				<value>${queueName}</value>
+			</property>
+			<property>
+				<name>oozie.launcher.mapred.job.queue.name</name>
+				<value>${oozieLauncherQueueName}</value>
+			</property>
+			<property>
+				<name>oozie.action.sharelib.for.spark</name>
+				<value>${oozieActionShareLibForSpark2}</value>
+			</property>
+		</configuration>
+	</global>
+
+	<start to="atomicactions"/>
+
+	<kill name="Kill">
+		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+	</kill>
+
+	<action name="atomicactions">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>Produces the atomic action with the stats green_oa for results</name>
+			<class>eu.dnetlib.dhp.actionmanager.stats_actionsets.StatsAtomicActionsJob</class>
+			<jar>dhp-stats-actionsets-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-memory=${sparkExecutorMemory}
+				--executor-cores=${sparkExecutorCores}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+			</spark-opts>
+			<arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
+			<arg>--outputPath</arg><arg>${outputPath}</arg>
+			<arg>--statsDB</arg><arg>${statsDB}</arg>
+			<arg>--workingPath</arg><arg>${workingDir}</arg>
+		</spark>
+		<ok to="End"/>
+		<error to="Kill"/>
+	</action>
+
+	<end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties b/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties
new file mode 100644
index 0000000000..81458d1f7e
--- /dev/null
+++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties
@@ -0,0 +1,12 @@
+# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.apache.spark=FATAL
+log4j.logger.org.spark_project=FATAL