From 89c4dfbaf455d9621e76d0eebf2ca6ea7c718686 Mon Sep 17 00:00:00 2001 From: dimitrispie Date: Tue, 24 Oct 2023 09:48:23 +0300 Subject: [PATCH] StatsDB workflow to export actionsets about OA routes, diamond, and publicly-funded A new oozie workflow capable to read from the stats db to produce a new actionSet for updating results with: - green_oa ={true, false} - openAccesColor = {gold, hybrid, bronze} - in_diamond_journal={true, false} - publicly_funded={true, false} Inputs: - outputPath - statsDB --- dhp-workflows/dhp-stats-actionsets/pom.xml | 114 +++++++ .../dnetlib/dhp/actionmanager/Constants.java | 102 +++++++ .../StatsAtomicActionsJob.java | 286 ++++++++++++++++++ .../stats_actionsets/StatsDiamondOAModel.java | 29 ++ .../stats_actionsets/StatsGreenOAModel.java | 29 ++ .../stats_actionsets/StatsOAColourModel.java | 47 +++ .../StatsPubliclyFundedModel.java | 29 ++ .../input_actionset_parameter.json | 32 ++ .../oozie_app/config-default.xml | 30 ++ .../stats_actionsets/oozie_app/workflow.xml | 125 ++++++++ .../src/main/resources/log4j.properties | 12 + 11 files changed, 835 insertions(+) create mode 100644 dhp-workflows/dhp-stats-actionsets/pom.xml create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties diff --git a/dhp-workflows/dhp-stats-actionsets/pom.xml b/dhp-workflows/dhp-stats-actionsets/pom.xml new file mode 100644 index 000000000..670fd4a5e --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/pom.xml @@ -0,0 +1,114 @@ + + + 4.0.0 + + eu.dnetlib.dhp + dhp-workflows + 1.2.5-SNAPSHOT + + dhp-stats-actionsets + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + org.apache.httpcomponents + httpclient + + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + spark-sql_2.11 + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + + net.sf.saxon + Saxon-HE + + + dom4j + dom4j + + + + xml-apis + xml-apis + + + + jaxen + jaxen + + + + org.json + json + + + + + org.apache.poi + poi-ooxml + + + + + org.apache.commons + commons-compress + + + + org.mongodb + mongo-java-driver + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java new file mode 100644 index 000000000..0df34364d --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -0,0 +1,102 @@ + +package eu.dnetlib.dhp.actionmanager; + +import java.util.Optional; + +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.schema.oaf.Subject; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; + +public class Constants { + + public static final String DOI = "doi"; + public static final String DOI_CLASSNAME = "Digital Object Identifier"; + + public static final String DEFAULT_DELIMITER = ","; + public static final String DEFAULT_FOS_DELIMITER = "\t"; + + public static final String UPDATE_DATA_INFO_TYPE = "update"; +// public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos"; + public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE"; +// public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip"; +// public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg"; +// public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts"; +// public static final String UPDATE_KEY_USAGE_COUNTS = "count"; + + public static final String UPDATE_MEASURE_STATS_MODEL_CLASS_ID = "measure:stats_model"; + public static final String UPDATE_KEY_STATS_MODEL = "stats_model"; + +// public static final String UPDATE_MEASURE_PUBLICLY_FUNDED_CLASS_ID = "measure:publicly_funded"; +// public static final String UPDATE_KEY_PUBLICLY_FUNDED = "publicly_funded"; + +// public static final String FOS_CLASS_ID = "FOS"; +// public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification"; +// +// public static final String SDG_CLASS_ID = "SDG"; +// public static final String SDG_CLASS_NAME = "Sustainable Development Goals"; + + public static final String NULL = "NULL"; + + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private Constants() { + } + + public static Boolean isSparkSessionManaged(ArgumentApplicationParser parser) { + return Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + } + + public static Dataset readPath( + SparkSession spark, String inputPath, Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); + } + + public static Subject getSubject(String sbj, String classid, String classname, + String diqualifierclassid) { + if (sbj == null || sbj.equals(NULL)) + return null; + Subject s = new Subject(); + s.setValue(sbj); + s + .setQualifier( + OafMapperUtils + .qualifier( + classid, + classname, + ModelConstants.DNET_SUBJECT_TYPOLOGIES, + ModelConstants.DNET_SUBJECT_TYPOLOGIES)); + s + .setDataInfo( + OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + diqualifierclassid, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "")); + + return s; + + } +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java new file mode 100644 index 000000000..f6e2cfe01 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java @@ -0,0 +1,286 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import static eu.dnetlib.dhp.actionmanager.Constants.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import scala.Tuple2; + +/** + * created the Atomic Action for each type of results + */ +public class StatsAtomicActionsJob implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(StatsAtomicActionsJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + StatsAtomicActionsJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + + final String dbname = parser.get("statsDB"); + + final String workingPath = parser.get("workingPath"); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareGreenData(dbname, spark, workingPath + "/greenOADB", "indi_pub_green_oa", "id"); + prepareDiamondData(dbname, spark, workingPath + "/diamondOADΒ", "indi_pub_diamond", "id"); + preparePubliclyFundedData( + dbname, spark, workingPath + "/publiclyFundedDΒ", "indi_funded_result_with_fundref", "id"); + prepareOAColourData(dbname, spark, workingPath + "/oacolourDB", "", "id"); + writeActionSet(spark, workingPath, outputPath); + }); + } + + private static void prepareGreenData(String dbname, SparkSession spark, String workingPath, String tableName, + String resultAttributeName) { + spark + .sql( + String + .format( + "select %s as id, green_oa as green_oa " + + "from %s.%s", + resultAttributeName, dbname, tableName)) + .as(Encoders.bean(StatsGreenOAModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + private static void prepareDiamondData(String dbname, SparkSession spark, String workingPath, String tableName, + String resultAttributeName) { + spark + .sql( + String + .format( + "select %s as id, in_diamond_journal as in_diamond_journal " + + "from %s.%s", + resultAttributeName, dbname, tableName)) + .as(Encoders.bean(StatsDiamondOAModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + private static void preparePubliclyFundedData(String dbname, SparkSession spark, String workingPath, + String tableName, + String resultAttributeName) { + spark + .sql( + String + .format( + "select %s as id, fundref as publicly_funded " + + "from %s.%s", + resultAttributeName, dbname, tableName)) + .as(Encoders.bean(StatsPubliclyFundedModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + private static void prepareOAColourData(String dbname, SparkSession spark, String workingPath, String tableName, + String resultAttributeName) { + spark + .sql( + String + .format( + "select b.%s as id, is_gold, is_bronze_oa, is_hybrid from %s.indi_pub_bronze_oa b " + + "left outer join %s.indi_pub_gold_oa g on g.id=b.id " + + "left outer join %s.indi_pub_hybrid h on b.id=h.id", + resultAttributeName, dbname, dbname, dbname)) + .as(Encoders.bean(StatsOAColourModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) { + + getFinalIndicatorsGreenResult(spark, inputPath + "/greenOADB") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .union( + getFinalIndicatorsDiamondResult(spark, inputPath + "/diamondOADΒ") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p))) + .union( + getFinalIndicatorsPubliclyFundedResult(spark, inputPath + "/publiclyFundedDΒ") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p))) + .union( + getFinalIndicatorsOAColourResult(spark, inputPath + "/oacolourDB") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p))) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + } + + public static Measure newMeasureInstance(String id) { + Measure m = new Measure(); + m.setId(id); + m.setUnit(new ArrayList<>()); + return m; + } + + private static Dataset getFinalIndicatorsGreenResult(SparkSession spark, String inputPath) { + + return readPath(spark, inputPath, StatsGreenOAModel.class) + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasure(usm.isGreen_oa(), "green_oa")); + return r; + }, Encoders.bean(Result.class)); + } + + private static Dataset getFinalIndicatorsDiamondResult(SparkSession spark, String inputPath) { + + return readPath(spark, inputPath, StatsDiamondOAModel.class) + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasure(usm.isIn_diamond_journal(), "in_diamond_journal")); + return r; + }, Encoders.bean(Result.class)); + } + + private static Dataset getFinalIndicatorsPubliclyFundedResult(SparkSession spark, String inputPath) { + + return readPath(spark, inputPath, StatsPubliclyFundedModel.class) + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasure(usm.isPublicly_funded(), "publicly_funded")); + return r; + }, Encoders.bean(Result.class)); + } + + private static Dataset getFinalIndicatorsOAColourResult(SparkSession spark, String inputPath) { + + return readPath(spark, inputPath, StatsOAColourModel.class) + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasureOAColour(usm.isIs_gold(), usm.isIs_bronze_oa(), usm.isIs_hybrid())); + return r; + }, Encoders.bean(Result.class)); + } + + private static List getMeasure(Boolean is_model_oa, String model_type) { + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_STATS_MODEL_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + + return Arrays + .asList( + OafMapperUtils + .newMeasureInstance(model_type, String.valueOf(is_model_oa), UPDATE_KEY_STATS_MODEL, dataInfo)); + } + + private static List getMeasureOAColour(Boolean is_gold, Boolean is_bronze_oa, Boolean is_hybrid) { + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_STATS_MODEL_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + + return Arrays + .asList( + OafMapperUtils + .newMeasureInstance("is_gold", String.valueOf(is_gold), UPDATE_KEY_STATS_MODEL, dataInfo), + OafMapperUtils + .newMeasureInstance("is_bronze_oa", String.valueOf(is_bronze_oa), UPDATE_KEY_STATS_MODEL, dataInfo), + OafMapperUtils + .newMeasureInstance("is_hybrid", String.valueOf(is_hybrid), UPDATE_KEY_STATS_MODEL, dataInfo)); + + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + public static Dataset readPath( + SparkSession spark, String inputPath, Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); + } + +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java new file mode 100644 index 000000000..340790fa8 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java @@ -0,0 +1,29 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import java.io.Serializable; + +/** + * @author dimitris.pierrakos + * @Date 30/10/23 + */ +public class StatsDiamondOAModel implements Serializable { + private String id; + private boolean in_diamond_journal; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public boolean isIn_diamond_journal() { + return in_diamond_journal; + } + + public void setIn_diamond_journal(boolean in_diamond_journal) { + this.in_diamond_journal = in_diamond_journal; + } +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java new file mode 100644 index 000000000..3d59bdf95 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java @@ -0,0 +1,29 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import java.io.Serializable; + +/** + * @author dimitris.pierrakos + * @Date 30/10/23 + */ +public class StatsGreenOAModel implements Serializable { + private String id; + private boolean green_oa; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public boolean isGreen_oa() { + return green_oa; + } + + public void setGreen_oa(boolean green_oa) { + this.green_oa = green_oa; + } +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java new file mode 100644 index 000000000..a83eb4f60 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java @@ -0,0 +1,47 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import java.io.Serializable; + +/** + * @author dimitris.pierrakos + * @Date 30/10/23 + */ +public class StatsOAColourModel implements Serializable { + private String id; + private boolean is_gold; + private boolean is_bronze_oa; + private boolean is_hybrid; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public boolean isIs_gold() { + return is_gold; + } + + public void setIs_gold(boolean is_gold) { + this.is_gold = is_gold; + } + + public boolean isIs_bronze_oa() { + return is_bronze_oa; + } + + public void setIs_bronze_oa(boolean is_bronze_oa) { + this.is_bronze_oa = is_bronze_oa; + } + + public boolean isIs_hybrid() { + return is_hybrid; + } + + public void setIs_hybrid(boolean is_hybrid) { + this.is_hybrid = is_hybrid; + } +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java new file mode 100644 index 000000000..6bdc78220 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java @@ -0,0 +1,29 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import java.io.Serializable; + +/** + * @author dimitris.pierrakos + * @Date 30/10/23 + */ +public class StatsPubliclyFundedModel implements Serializable { + private String id; + private boolean publicly_funded; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public boolean isPublicly_funded() { + return publicly_funded; + } + + public void setPublicly_funded(boolean publicly_funded) { + this.publicly_funded = publicly_funded; + } +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json new file mode 100644 index 000000000..bb24d4c99 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "hmu", + "paramLongName": "hive_metastore_uris", + "paramDescription": "the URI for the hive metastore", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "the path of the new ActionSet", + "paramRequired": true + }, + { + "paramName": "sdb", + "paramLongName": "statsDB", + "paramDescription": "the name of the stats db to be used", + "paramRequired": true + }, + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the workingPath where to save the content of the usage_stats table", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml new file mode 100644 index 000000000..1aa4e1050 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml @@ -0,0 +1,125 @@ + + + + outputPath + the path where to store the actionset + + + statsDB + the name of the stats db to be used + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Produces the atomic action with the usage stats count for results + + eu.dnetlib.dhp.actionmanager.stats_actionsets.SparkAtomicActionGreenOAJob + dhp-stats-actionsets-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --hive_metastore_uris${hiveMetastoreUris} + --outputPath${outputPath} + --statsDB${statsDB} + --workingPath${workingDir} + + + + + + + + yarn + cluster + Produces the atomic action with the stats green_oa for results + + eu.dnetlib.dhp.actionmanager.stats_actionsets.StatsAtomicActionsJob + dhp-stats-actionsets-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --hive_metastore_uris${hiveMetastoreUris} + --outputPath${outputPath} + --statsDB${statsDB} + --workingPath${workingDir} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties b/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties new file mode 100644 index 000000000..81458d1f7 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties @@ -0,0 +1,12 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=INFO, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.logger.org.apache.spark=FATAL +log4j.logger.org.spark_project=FATAL