StatsDB workflow to export actionsets about OA routes, diamond, and publicly-funded #355

Merged
claudio.atzori merged 4 commits from dimitris.pierrakos/dnet-hadoop:beta into beta 2023-12-01 15:03:58 +01:00
9 changed files with 116 additions and 469 deletions
Showing only changes of commit d524e30866 - Show all commits

View File

@ -8,46 +8,7 @@
</parent>
<artifactId>dhp-stats-actionsets</artifactId>
<!-- <build>-->
<!-- <plugins>-->
<!-- <plugin>-->
<!-- <groupId>net.alchim31.maven</groupId>-->
<!-- <artifactId>scala-maven-plugin</artifactId>-->
<!-- <version>${net.alchim31.maven.version}</version>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <id>scala-compile-first</id>-->
<!-- <phase>initialize</phase>-->
<!-- <goals>-->
<!-- <goal>add-source</goal>-->
<!-- <goal>compile</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- <execution>-->
<!-- <id>scala-test-compile</id>-->
<!-- <phase>process-test-resources</phase>-->
<!-- <goals>-->
<!-- <goal>testCompile</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- <execution>-->
<!-- <id>scala-doc</id>-->
<!-- <phase>process-resources</phase> &lt;!&ndash; or wherever &ndash;&gt;-->
<!-- <goals>-->
<!-- <goal>doc</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- <configuration>-->
<!-- <scalaVersion>${scala.version}</scalaVersion>-->
<!-- </configuration>-->
<!-- </plugin>-->
<!-- </plugins>-->
<!-- </build>-->
<dependencies>
dimitris.pierrakos marked this conversation as resolved
Review

If this block is commented out and it is not needed, then please remove it

If this block is commented out and it is not needed, then please remove it
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>

View File

@ -1,102 +0,0 @@
package eu.dnetlib.dhp.actionmanager;
import java.util.Optional;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.Subject;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
/**
 * Constant values and static helpers shared by the action-manager Spark jobs.
 *
 * <p>Not instantiable: every member is static.
 */
public class Constants {

    public static final String DOI = "doi";
    public static final String DOI_CLASSNAME = "Digital Object Identifier";

    // Default delimiters: a comma, and a tab for FOS input.
    public static final String DEFAULT_DELIMITER = ",";
    public static final String DEFAULT_FOS_DELIMITER = "\t";

    // Values used when building the DataInfo attached to produced elements.
    public static final String UPDATE_DATA_INFO_TYPE = "update";
    public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
    public static final String UPDATE_MEASURE_STATS_MODEL_CLASS_ID = "measure:stats_model";
    public static final String UPDATE_KEY_STATS_MODEL = "stats_model";

    // Literal marker treated like a missing value by getSubject.
    public static final String NULL = "NULL";

    // Single shared Jackson mapper used to parse the JSON lines read by readPath.
    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // Constants currently disabled in this module:
    // public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos";
    // public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
    // public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
    // public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts";
    // public static final String UPDATE_KEY_USAGE_COUNTS = "count";
    // public static final String UPDATE_MEASURE_PUBLICLY_FUNDED_CLASS_ID = "measure:publicly_funded";
    // public static final String UPDATE_KEY_PUBLICLY_FUNDED = "publicly_funded";
    // public static final String FOS_CLASS_ID = "FOS";
    // public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
    //
    // public static final String SDG_CLASS_ID = "SDG";
    // public static final String SDG_CLASS_NAME = "Sustainable Development Goals";

    private Constants() {
    }

    /**
     * Reads the {@code isSparkSessionManaged} argument from the parser.
     *
     * @param parser the parsed job arguments
     * @return the parsed boolean value, or {@code Boolean.TRUE} when the argument is absent
     */
    public static Boolean isSparkSessionManaged(ArgumentApplicationParser parser) {
        final String flag = parser.get("isSparkSessionManaged");
        return flag == null ? Boolean.TRUE : Boolean.valueOf(flag);
    }

    /**
     * Loads a text file and parses each line as JSON into an instance of {@code clazz}.
     *
     * @param spark     the active Spark session
     * @param inputPath the location of the text file(s) to read
     * @param clazz     the bean class each line is deserialized into
     * @param <R>       the bean type
     * @return a dataset of {@code clazz} instances encoded with a bean encoder
     */
    public static <R> Dataset<R> readPath(
        SparkSession spark, String inputPath, Class<R> clazz) {
        final MapFunction<String, R> parseLine = line -> OBJECT_MAPPER.readValue(line, clazz);
        return spark
            .read()
            .textFile(inputPath)
            .map(parseLine, Encoders.bean(clazz));
    }

    /**
     * Builds a {@link Subject} carrying the given value, a qualifier in the subject typologies
     * scheme and an "update" data info whose provenance qualifier uses {@code diqualifierclassid}.
     *
     * @param sbj                the subject value
     * @param classid            the class id of the subject qualifier
     * @param classname          the class name of the subject qualifier
     * @param diqualifierclassid the class id of the provenance qualifier set in the data info
     * @return the subject, or {@code null} when {@code sbj} is {@code null} or equals {@link #NULL}
     */
    public static Subject getSubject(String sbj, String classid, String classname,
        String diqualifierclassid) {
        if (sbj == null || NULL.equals(sbj)) {
            return null;
        }

        final Subject subject = new Subject();
        subject.setValue(sbj);
        subject.setQualifier(
            OafMapperUtils.qualifier(
                classid,
                classname,
                ModelConstants.DNET_SUBJECT_TYPOLOGIES,
                ModelConstants.DNET_SUBJECT_TYPOLOGIES));
        subject.setDataInfo(
            OafMapperUtils.dataInfo(
                false,
                UPDATE_DATA_INFO_TYPE,
                true,
                false,
                OafMapperUtils.qualifier(
                    diqualifierclassid,
                    UPDATE_CLASS_NAME,
                    ModelConstants.DNET_PROVENANCE_ACTIONS,
                    ModelConstants.DNET_PROVENANCE_ACTIONS),
                ""));
        return subject;
    }
}

View File

@ -1,7 +1,6 @@
package eu.dnetlib.dhp.actionmanager.stats_actionsets;
import static eu.dnetlib.dhp.actionmanager.Constants.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
@ -12,14 +11,22 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -65,6 +72,9 @@ public class StatsAtomicActionsJob implements Serializable {
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
conf.set("spark.speculation", "false");
conf.set("spark.hadoop.mapreduce.map.speculative", "false");
conf.set("spark.hadoop.mapreduce.reduce.speculative", "false");
final String dbname = parser.get("statsDB");
@ -75,75 +85,26 @@ public class StatsAtomicActionsJob implements Serializable {
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareGreenData(dbname, spark, workingPath + "/greenOADB", "indi_pub_green_oa", "id");
prepareDiamondData(dbname, spark, workingPath + "/diamondOADΒ", "indi_pub_diamond", "id");
preparePubliclyFundedData(
dbname, spark, workingPath + "/publiclyFundedDΒ", "indi_funded_result_with_fundref", "id");
prepareOAColourData(dbname, spark, workingPath + "/oacolourDB", "", "id");
prepareResultEnhancement(dbname, spark, workingPath + "/resultEnhancements", "id");
writeActionSet(spark, workingPath, outputPath);
});
}
private static void prepareGreenData(String dbname, SparkSession spark, String workingPath, String tableName,
private static void prepareResultEnhancement(String dbname, SparkSession spark, String workingPath,
String resultAttributeName) {
spark
.sql(
String
.format(
"select %s as id, green_oa as green_oa " +
"from %s.%s",
resultAttributeName, dbname, tableName))
.as(Encoders.bean(StatsGreenOAModel.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath);
}
/**
 * Selects the id and the {@code in_diamond_journal} flag from {@code dbname.tableName} and
 * stores the rows as gzip-compressed JSON under {@code workingPath}, overwriting existing data.
 *
 * @param dbname              the database holding the table
 * @param spark               the active Spark session
 * @param workingPath         the output location of the JSON files
 * @param tableName           the table to read from
 * @param resultAttributeName the column selected as {@code id}
 */
private static void prepareDiamondData(String dbname, SparkSession spark, String workingPath, String tableName,
    String resultAttributeName) {
    final String query = String
        .format(
            "select %s as id, in_diamond_journal as in_diamond_journal from %s.%s",
            resultAttributeName, dbname, tableName);

    final Dataset<StatsDiamondOAModel> rows = spark
        .sql(query)
        .as(Encoders.bean(StatsDiamondOAModel.class));

    rows
        .write()
        .mode(SaveMode.Overwrite)
        .option("compression", "gzip")
        .json(workingPath);
}
/**
 * Selects the id and the {@code fundref} column (exposed as {@code publicly_funded}) from
 * {@code dbname.tableName} and stores the rows as gzip-compressed JSON under
 * {@code workingPath}, overwriting existing data.
 *
 * @param dbname              the database holding the table
 * @param spark               the active Spark session
 * @param workingPath         the output location of the JSON files
 * @param tableName           the table to read from
 * @param resultAttributeName the column selected as {@code id}
 */
private static void preparePubliclyFundedData(String dbname, SparkSession spark, String workingPath,
    String tableName,
    String resultAttributeName) {
    final String query = String
        .format(
            "select %s as id, fundref as publicly_funded from %s.%s",
            resultAttributeName, dbname, tableName);

    final Dataset<StatsPubliclyFundedModel> rows = spark
        .sql(query)
        .as(Encoders.bean(StatsPubliclyFundedModel.class));

    rows
        .write()
        .mode(SaveMode.Overwrite)
        .option("compression", "gzip")
        .json(workingPath);
}
private static void prepareOAColourData(String dbname, SparkSession spark, String workingPath, String tableName,
String resultAttributeName) {
spark
.sql(
String
.format(
"select b.%s as id, is_gold, is_bronze_oa, is_hybrid from %s.indi_pub_bronze_oa b " +
"select b.%s as id, is_gold, is_bronze_oa, is_hybrid,green_oa, in_diamond_journal,f.fundref as publicly_funded "
+ "from %s.indi_pub_bronze_oa b " +
"left outer join %s.indi_pub_gold_oa g on g.id=b.id " +
"left outer join %s.indi_pub_hybrid h on b.id=h.id",
resultAttributeName, dbname, dbname, dbname))
.as(Encoders.bean(StatsOAColourModel.class))
"left outer join %s.indi_pub_hybrid h on b.id=h.id " +
"left outer join %s.indi_pub_green_oa gr on b.id=gr.id " +
"left outer join %s.indi_pub_diamond d on b.id=d.id " +
"left outer join %s.indi_funded_result_with_fundref f on b.id=f.id ",
resultAttributeName, dbname, dbname, dbname, dbname, dbname, dbname))
.as(Encoders.bean(StatsResultEnhancementModel.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
@ -152,125 +113,39 @@ public class StatsAtomicActionsJob implements Serializable {
public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
getFinalIndicatorsGreenResult(spark, inputPath + "/greenOADB")
getResultEnhancements(spark, inputPath + "/resultEnhancements")
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.union(
getFinalIndicatorsDiamondResult(spark, inputPath + "/diamondOADΒ")
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p)))
.union(
getFinalIndicatorsPubliclyFundedResult(spark, inputPath + "/publiclyFundedDΒ")
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p)))
.union(
getFinalIndicatorsOAColourResult(spark, inputPath + "/oacolourDB")
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p)))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
.saveAsHadoopFile(
outputPath,
Text.class,
Text.class,
SequenceFileOutputFormat.class,
GzipCodec.class);
}
/**
 * Creates a {@link Measure} with the given id and an empty, mutable unit list.
 *
 * @param id the measure identifier
 * @return the new measure
 */
public static Measure newMeasureInstance(String id) {
    final Measure measure = new Measure();
    measure.setId(id);
    measure.setUnit(new ArrayList<>());
    return measure;
}
private static Dataset<Result> getResultEnhancements(SparkSession spark, String inputPath) {
private static Dataset<Result> getFinalIndicatorsGreenResult(SparkSession spark, String inputPath) {
return readPath(spark, inputPath, StatsGreenOAModel.class)
.map((MapFunction<StatsGreenOAModel, Result>) usm -> {
return readPath(spark, inputPath, StatsResultEnhancementModel.class)
.map((MapFunction<StatsResultEnhancementModel, Result>) usm -> {
Result r = new Result();
r.setId("50|" + usm.getId());
r.setMeasures(getMeasure(usm.isGreen_oa(), "green_oa"));
r.setIsInDiamondJournal(usm.isIn_diamond_journal());
r.setIsGreen(usm.isGreen_oa());
r.setPubliclyFunded(usm.isPublicly_funded());
if (usm.isIs_bronze_oa())
r.setOpenAccessColor(OpenAccessColor.bronze);
else if (usm.isIs_gold())
r.setOpenAccessColor(OpenAccessColor.bronze);
else if (usm.isIs_gold())
dimitris.pierrakos marked this conversation as resolved Outdated

Please check the chain of conditions: it seems usm.isIs_gold() is checked twice, leading to two different then-blocks, one setting the OpenAccess color to bronze and the other to gold, which seems counterintuitive. I assume this is not what you intended to write.

Please check the chain of the conditions, it seems the `usm.isIs_gold()` is checked twice, leading to different then blocks, one sets the OpenAccess color to bronze, the other to gold, which seems counter intuitive. I assume it might be not what you were expecting to write.
r.setOpenAccessColor(OpenAccessColor.gold);
return r;
}, Encoders.bean(Result.class));
}
/**
 * Reads the diamond-journal rows stored under {@code inputPath} and turns each one into a
 * {@link Result} whose id is the row id prefixed with the literal {@code "50|"} and whose
 * measures hold the {@code in_diamond_journal} flag.
 *
 * @param spark     the active Spark session
 * @param inputPath the location of the JSON rows
 * @return the dataset of results
 */
private static Dataset<Result> getFinalIndicatorsDiamondResult(SparkSession spark, String inputPath) {
    final MapFunction<StatsDiamondOAModel, Result> toResult = model -> {
        final Result result = new Result();
        result.setId("50|" + model.getId());
        result.setMeasures(getMeasure(model.isIn_diamond_journal(), "in_diamond_journal"));
        return result;
    };

    return readPath(spark, inputPath, StatsDiamondOAModel.class)
        .map(toResult, Encoders.bean(Result.class));
}
/**
 * Reads the publicly-funded rows stored under {@code inputPath} and turns each one into a
 * {@link Result} whose id is the row id prefixed with the literal {@code "50|"} and whose
 * measures hold the {@code publicly_funded} flag.
 *
 * @param spark     the active Spark session
 * @param inputPath the location of the JSON rows
 * @return the dataset of results
 */
private static Dataset<Result> getFinalIndicatorsPubliclyFundedResult(SparkSession spark, String inputPath) {
    final MapFunction<StatsPubliclyFundedModel, Result> toResult = model -> {
        final Result result = new Result();
        result.setId("50|" + model.getId());
        result.setMeasures(getMeasure(model.isPublicly_funded(), "publicly_funded"));
        return result;
    };

    return readPath(spark, inputPath, StatsPubliclyFundedModel.class)
        .map(toResult, Encoders.bean(Result.class));
}
/**
 * Reads the OA colour rows stored under {@code inputPath} and turns each one into a
 * {@link Result} whose id is the row id prefixed with the literal {@code "50|"} and whose
 * measures hold the gold, bronze and hybrid flags.
 *
 * @param spark     the active Spark session
 * @param inputPath the location of the JSON rows
 * @return the dataset of results
 */
private static Dataset<Result> getFinalIndicatorsOAColourResult(SparkSession spark, String inputPath) {
    final MapFunction<StatsOAColourModel, Result> toResult = model -> {
        final Result result = new Result();
        result.setId("50|" + model.getId());
        result
            .setMeasures(
                getMeasureOAColour(model.isIs_gold(), model.isIs_bronze_oa(), model.isIs_hybrid()));
        return result;
    };

    return readPath(spark, inputPath, StatsOAColourModel.class)
        .map(toResult, Encoders.bean(Result.class));
}
/**
 * Wraps a single boolean indicator into a one-element measure list.
 *
 * <p>The flag is rendered with {@code String.valueOf}, so a {@code null} flag becomes the
 * string {@code "null"}.
 *
 * @param is_model_oa the indicator value
 * @param model_type  the measure id
 * @return a list holding the one measure built for the indicator
 */
private static List<Measure> getMeasure(Boolean is_model_oa, String model_type) {
    final DataInfo provenance = OafMapperUtils
        .dataInfo(
            false,
            UPDATE_DATA_INFO_TYPE,
            true,
            false,
            OafMapperUtils
                .qualifier(
                    UPDATE_MEASURE_STATS_MODEL_CLASS_ID,
                    UPDATE_CLASS_NAME,
                    ModelConstants.DNET_PROVENANCE_ACTIONS,
                    ModelConstants.DNET_PROVENANCE_ACTIONS),
            "");

    final Measure indicator = OafMapperUtils
        .newMeasureInstance(model_type, String.valueOf(is_model_oa), UPDATE_KEY_STATS_MODEL, provenance);

    return Arrays.asList(indicator);
}
/**
 * Builds the three OA colour measures ({@code is_gold}, {@code is_bronze_oa},
 * {@code is_hybrid}), in that order, all sharing the same data info.
 *
 * <p>Each flag is rendered with {@code String.valueOf}, so a {@code null} flag becomes the
 * string {@code "null"}.
 *
 * @param is_gold      the gold indicator
 * @param is_bronze_oa the bronze indicator
 * @param is_hybrid    the hybrid indicator
 * @return a list holding the three measures
 */
private static List<Measure> getMeasureOAColour(Boolean is_gold, Boolean is_bronze_oa, Boolean is_hybrid) {
    final DataInfo provenance = OafMapperUtils
        .dataInfo(
            false,
            UPDATE_DATA_INFO_TYPE,
            true,
            false,
            OafMapperUtils
                .qualifier(
                    UPDATE_MEASURE_STATS_MODEL_CLASS_ID,
                    UPDATE_CLASS_NAME,
                    ModelConstants.DNET_PROVENANCE_ACTIONS,
                    ModelConstants.DNET_PROVENANCE_ACTIONS),
            "");

    final Measure gold = OafMapperUtils
        .newMeasureInstance("is_gold", String.valueOf(is_gold), UPDATE_KEY_STATS_MODEL, provenance);
    final Measure bronze = OafMapperUtils
        .newMeasureInstance("is_bronze_oa", String.valueOf(is_bronze_oa), UPDATE_KEY_STATS_MODEL, provenance);
    final Measure hybrid = OafMapperUtils
        .newMeasureInstance("is_hybrid", String.valueOf(is_hybrid), UPDATE_KEY_STATS_MODEL, provenance);

    return Arrays.asList(gold, bronze, hybrid);
}
/**
 * Removes {@code path} by delegating to {@code HdfsSupport.remove}, passing the Hadoop
 * configuration taken from the session's Spark context.
 *
 * @param spark the active Spark session
 * @param path  the location to remove
 */
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
@ -282,5 +157,4 @@ public class StatsAtomicActionsJob implements Serializable {
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
dimitris.pierrakos marked this conversation as resolved
Review

Gitea warns me about the character B used in the string "/diamondOADΒ"

B [U+0392] is confusable with B [U+0042]

Any chance that it is a typo from a different keyboard setting?

Gitea warns me about the character `B` used in the string `"/diamondOADΒ"` ``` B [U+0392] is confusable with B [U+0042] ``` Any chance that it is a typo from a different keyboard setting?
}

View File

@ -1,29 +0,0 @@
package eu.dnetlib.dhp.actionmanager.stats_actionsets;
import java.io.Serializable;
/**
 * Serializable bean pairing a result id with its {@code in_diamond_journal} flag.
 *
 * <p>Accessor names follow the snake_case property names.
 *
 * @author dimitris.pierrakos (30/10/23)
 */
public class StatsDiamondOAModel implements Serializable {

    private String id;
    private boolean in_diamond_journal;

    /** @return the result id, or {@code null} if never set */
    public String getId() {
        return this.id;
    }

    /** @param id the result id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the {@code in_diamond_journal} flag ({@code false} if never set) */
    public boolean isIn_diamond_journal() {
        return this.in_diamond_journal;
    }

    /** @param in_diamond_journal the new value of the flag */
    public void setIn_diamond_journal(boolean in_diamond_journal) {
        this.in_diamond_journal = in_diamond_journal;
    }
}

View File

@ -1,29 +0,0 @@
package eu.dnetlib.dhp.actionmanager.stats_actionsets;
import java.io.Serializable;
/**
 * Serializable bean pairing a result id with its {@code green_oa} flag.
 *
 * <p>Accessor names follow the snake_case property names.
 *
 * @author dimitris.pierrakos (30/10/23)
 */
public class StatsGreenOAModel implements Serializable {

    private String id;
    private boolean green_oa;

    /** @return the result id, or {@code null} if never set */
    public String getId() {
        return this.id;
    }

    /** @param id the result id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the {@code green_oa} flag ({@code false} if never set) */
    public boolean isGreen_oa() {
        return this.green_oa;
    }

    /** @param green_oa the new value of the flag */
    public void setGreen_oa(boolean green_oa) {
        this.green_oa = green_oa;
    }
}

View File

@ -1,47 +0,0 @@
package eu.dnetlib.dhp.actionmanager.stats_actionsets;
import java.io.Serializable;
/**
 * Serializable bean pairing a result id with its gold, bronze and hybrid flags.
 *
 * <p>Accessor names follow the snake_case property names.
 *
 * @author dimitris.pierrakos (30/10/23)
 */
public class StatsOAColourModel implements Serializable {

    private String id;
    private boolean is_gold;
    private boolean is_bronze_oa;
    private boolean is_hybrid;

    /** @return the result id, or {@code null} if never set */
    public String getId() {
        return this.id;
    }

    /** @param id the result id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the {@code is_gold} flag ({@code false} if never set) */
    public boolean isIs_gold() {
        return this.is_gold;
    }

    /** @param is_gold the new value of the flag */
    public void setIs_gold(boolean is_gold) {
        this.is_gold = is_gold;
    }

    /** @return the {@code is_bronze_oa} flag ({@code false} if never set) */
    public boolean isIs_bronze_oa() {
        return this.is_bronze_oa;
    }

    /** @param is_bronze_oa the new value of the flag */
    public void setIs_bronze_oa(boolean is_bronze_oa) {
        this.is_bronze_oa = is_bronze_oa;
    }

    /** @return the {@code is_hybrid} flag ({@code false} if never set) */
    public boolean isIs_hybrid() {
        return this.is_hybrid;
    }

    /** @param is_hybrid the new value of the flag */
    public void setIs_hybrid(boolean is_hybrid) {
        this.is_hybrid = is_hybrid;
    }
}

View File

@ -1,29 +0,0 @@
package eu.dnetlib.dhp.actionmanager.stats_actionsets;
import java.io.Serializable;
/**
 * Serializable bean pairing a result id with its {@code publicly_funded} flag.
 *
 * <p>Accessor names follow the snake_case property names.
 *
 * @author dimitris.pierrakos (30/10/23)
 */
public class StatsPubliclyFundedModel implements Serializable {

    private String id;
    private boolean publicly_funded;

    /** @return the result id, or {@code null} if never set */
    public String getId() {
        return this.id;
    }

    /** @param id the result id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the {@code publicly_funded} flag ({@code false} if never set) */
    public boolean isPublicly_funded() {
        return this.publicly_funded;
    }

    /** @param publicly_funded the new value of the flag */
    public void setPublicly_funded(boolean publicly_funded) {
        this.publicly_funded = publicly_funded;
    }
}

View File

@ -0,0 +1,76 @@
package eu.dnetlib.dhp.actionmanager.stats_actionsets;
import java.io.Serializable;
import eu.dnetlib.dhp.schema.oaf.*;
/**
 * Serializable bean holding, for one result id, the open-access indicators: gold, bronze,
 * hybrid, diamond journal, green and publicly funded.
 *
 * <p>The gold, bronze and hybrid flags are nullable {@link Boolean} values; the other three
 * are primitive {@code boolean}s and default to {@code false}.
 *
 * <p>JavaBeans introspection accepts the {@code is} prefix only for accessors returning
 * primitive {@code boolean}. The nullable flags therefore also expose {@code get*} accessors,
 * so that they are visible as readable properties to bean-based tooling. The {@code is*}
 * accessors are kept for existing callers; unboxing their result throws a
 * {@link NullPointerException} when the flag was never set.
 *
 * @author dimitris.pierrakos (30/10/23)
 */
public class StatsResultEnhancementModel implements Serializable {

    private String id;
    private Boolean is_gold;
    private Boolean is_bronze_oa;
    private Boolean is_hybrid;
    private boolean in_diamond_journal;
    private boolean green_oa;
    private boolean publicly_funded;

    /** @return the result id, or {@code null} if never set */
    public String getId() {
        return id;
    }

    /** @param id the result id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the gold flag, or {@code null} if never set */
    public Boolean isIs_gold() {
        return is_gold;
    }

    /** @return the gold flag, or {@code null} if never set (JavaBeans-visible accessor) */
    public Boolean getIs_gold() {
        return is_gold;
    }

    /** @param is_gold the gold flag, may be {@code null} */
    public void setIs_gold(Boolean is_gold) {
        this.is_gold = is_gold;
    }

    /** @return the bronze flag, or {@code null} if never set */
    public Boolean isIs_bronze_oa() {
        return is_bronze_oa;
    }

    /** @return the bronze flag, or {@code null} if never set (JavaBeans-visible accessor) */
    public Boolean getIs_bronze_oa() {
        return is_bronze_oa;
    }

    /** @param is_bronze_oa the bronze flag, may be {@code null} */
    public void setIs_bronze_oa(Boolean is_bronze_oa) {
        this.is_bronze_oa = is_bronze_oa;
    }

    /** @return the hybrid flag, or {@code null} if never set */
    public Boolean isIs_hybrid() {
        return is_hybrid;
    }

    /** @return the hybrid flag, or {@code null} if never set (JavaBeans-visible accessor) */
    public Boolean getIs_hybrid() {
        return is_hybrid;
    }

    /** @param is_hybrid the hybrid flag, may be {@code null} */
    public void setIs_hybrid(Boolean is_hybrid) {
        this.is_hybrid = is_hybrid;
    }

    /** @return the diamond-journal flag ({@code false} if never set) */
    public boolean isIn_diamond_journal() {
        return in_diamond_journal;
    }

    /** @param in_diamond_journal the diamond-journal flag */
    public void setIn_diamond_journal(boolean in_diamond_journal) {
        this.in_diamond_journal = in_diamond_journal;
    }

    /** @return the green flag ({@code false} if never set) */
    public boolean isGreen_oa() {
        return green_oa;
    }

    /** @param green_oa the green flag */
    public void setGreen_oa(boolean green_oa) {
        this.green_oa = green_oa;
    }

    /** @return the publicly-funded flag ({@code false} if never set) */
    public boolean isPublicly_funded() {
        return publicly_funded;
    }

    /** @param publicly_funded the publicly-funded flag */
    public void setPublicly_funded(boolean publicly_funded) {
        this.publicly_funded = publicly_funded;
    }
}

View File

@ -68,39 +68,11 @@
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="atomicactions">
<!-- Spark action run on yarn in cluster mode; transitions to End on success and to Kill on error. -->
<!-- NOTE(review): the class element below names SparkAtomicActionGreenOAJob; the review of this change reports that this class does not exist in the classpath. Confirm the intended job class. -->
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the usage stats count for results</name>
<!-- <class>eu.dnetlib.dhp.actionmanager.stats_actionsets.SparkAtomicActionUsageJob</class>-->
<class>eu.dnetlib.dhp.actionmanager.stats_actionsets.SparkAtomicActionGreenOAJob</class>
<jar>dhp-stats-actionsets-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--statsDB</arg><arg>${statsDB}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="atomicactionsStats">
dimitris.pierrakos marked this conversation as resolved
Review

If I understand correctly from the spark action name, this action would produce an actionset containing the usage stats metrics (views / downloads). However, an oozie workflow responsible for exporting such information is already available under

dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml

Why would we need to duplicate it? Can you elaborate?

Furthermore, I see the <class> tag points to eu.dnetlib.dhp.actionmanager.stats_actionsets.SparkAtomicActionGreenOAJob, which does not exist in the classpath.

If I understand correctly from the spark action name, this action would produce an actionset containing the usage stats metrics (views / downloads). However, an oozie workflow responsible for exporting such information is already available under ``` dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml ``` Why would we need to duplicate it? Can you elaborate? Furthermore, I see the <class> tag points to `eu.dnetlib.dhp.actionmanager.stats_actionsets.SparkAtomicActionGreenOAJob`, which does not exist in the classpath.
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the stats green_oa for results</name>
<!-- <class>eu.dnetlib.dhp.actionmanager.stats_actionsets.SparkAtomicActionUsageJob</class>-->
<class>eu.dnetlib.dhp.actionmanager.stats_actionsets.StatsAtomicActionsJob</class>
<jar>dhp-stats-actionsets-${projectVersion}.jar</jar>
<spark-opts>