From 89c4dfbaf455d9621e76d0eebf2ca6ea7c718686 Mon Sep 17 00:00:00 2001 From: dimitrispie Date: Tue, 24 Oct 2023 09:48:23 +0300 Subject: [PATCH 1/4] StatsDB workflow to export actionsets about OA routes, diamond, and publicly-funded A new oozie workflow capable to read from the stats db to produce a new actionSet for updating results with: - green_oa ={true, false} - openAccesColor = {gold, hybrid, bronze} - in_diamond_journal={true, false} - publicly_funded={true, false} Inputs: - outputPath - statsDB --- dhp-workflows/dhp-stats-actionsets/pom.xml | 114 +++++++ .../dnetlib/dhp/actionmanager/Constants.java | 102 +++++++ .../StatsAtomicActionsJob.java | 286 ++++++++++++++++++ .../stats_actionsets/StatsDiamondOAModel.java | 29 ++ .../stats_actionsets/StatsGreenOAModel.java | 29 ++ .../stats_actionsets/StatsOAColourModel.java | 47 +++ .../StatsPubliclyFundedModel.java | 29 ++ .../input_actionset_parameter.json | 32 ++ .../oozie_app/config-default.xml | 30 ++ .../stats_actionsets/oozie_app/workflow.xml | 125 ++++++++ .../src/main/resources/log4j.properties | 12 + 11 files changed, 835 insertions(+) create mode 100644 dhp-workflows/dhp-stats-actionsets/pom.xml create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties diff --git a/dhp-workflows/dhp-stats-actionsets/pom.xml b/dhp-workflows/dhp-stats-actionsets/pom.xml new file mode 100644 index 000000000..670fd4a5e --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/pom.xml @@ -0,0 +1,114 @@ + + + 4.0.0 + + eu.dnetlib.dhp + dhp-workflows + 1.2.5-SNAPSHOT + + dhp-stats-actionsets + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + org.apache.httpcomponents + httpclient + + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + spark-sql_2.11 + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + + net.sf.saxon + Saxon-HE + + + dom4j + dom4j + + + + xml-apis + xml-apis + + + + jaxen + jaxen + + + + org.json + json + + + + + org.apache.poi + poi-ooxml + + + + + org.apache.commons + commons-compress + + + + org.mongodb + mongo-java-driver + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java 
b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java new file mode 100644 index 000000000..0df34364d --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -0,0 +1,102 @@ + +package eu.dnetlib.dhp.actionmanager; + +import java.util.Optional; + +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.schema.oaf.Subject; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; + +public class Constants { + + public static final String DOI = "doi"; + public static final String DOI_CLASSNAME = "Digital Object Identifier"; + + public static final String DEFAULT_DELIMITER = ","; + public static final String DEFAULT_FOS_DELIMITER = "\t"; + + public static final String UPDATE_DATA_INFO_TYPE = "update"; +// public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos"; + public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE"; +// public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip"; +// public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg"; +// public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts"; +// public static final String UPDATE_KEY_USAGE_COUNTS = "count"; + + public static final String UPDATE_MEASURE_STATS_MODEL_CLASS_ID = "measure:stats_model"; + public static final String UPDATE_KEY_STATS_MODEL = "stats_model"; + +// public static final String UPDATE_MEASURE_PUBLICLY_FUNDED_CLASS_ID = "measure:publicly_funded"; +// public static final String UPDATE_KEY_PUBLICLY_FUNDED = "publicly_funded"; + +// public static final String FOS_CLASS_ID = "FOS"; +// public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification"; +// +// public static final String SDG_CLASS_ID = "SDG"; +// public static final String SDG_CLASS_NAME = "Sustainable Development Goals"; + + public static final String NULL = "NULL"; + + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private Constants() { + } + + public static Boolean isSparkSessionManaged(ArgumentApplicationParser parser) { + return Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + } + + public static Dataset readPath( + SparkSession spark, String inputPath, Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); + } + + public static Subject getSubject(String sbj, String classid, String classname, + String diqualifierclassid) { + if (sbj == null || sbj.equals(NULL)) + return null; + Subject s = new Subject(); + s.setValue(sbj); + s + .setQualifier( + OafMapperUtils + .qualifier( + classid, + classname, + ModelConstants.DNET_SUBJECT_TYPOLOGIES, + ModelConstants.DNET_SUBJECT_TYPOLOGIES)); + s + .setDataInfo( + OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + diqualifierclassid, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "")); + + return s; + + } +} diff --git 
a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java new file mode 100644 index 000000000..f6e2cfe01 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java @@ -0,0 +1,286 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import static eu.dnetlib.dhp.actionmanager.Constants.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import scala.Tuple2; + +/** + * created the Atomic Action for each type of results + */ +public class StatsAtomicActionsJob implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(StatsAtomicActionsJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + StatsAtomicActionsJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + + final String dbname = parser.get("statsDB"); + + final String workingPath = parser.get("workingPath"); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareGreenData(dbname, spark, workingPath + "/greenOADB", "indi_pub_green_oa", "id"); + prepareDiamondData(dbname, spark, workingPath + "/diamondOADΒ", "indi_pub_diamond", "id"); + preparePubliclyFundedData( + dbname, spark, workingPath + "/publiclyFundedDΒ", "indi_funded_result_with_fundref", "id"); + prepareOAColourData(dbname, spark, workingPath + "/oacolourDB", "", "id"); + writeActionSet(spark, workingPath, outputPath); + }); + } + + private static void prepareGreenData(String dbname, SparkSession spark, String workingPath, String tableName, + 
String resultAttributeName) { + spark + .sql( + String + .format( + "select %s as id, green_oa as green_oa " + + "from %s.%s", + resultAttributeName, dbname, tableName)) + .as(Encoders.bean(StatsGreenOAModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + private static void prepareDiamondData(String dbname, SparkSession spark, String workingPath, String tableName, + String resultAttributeName) { + spark + .sql( + String + .format( + "select %s as id, in_diamond_journal as in_diamond_journal " + + "from %s.%s", + resultAttributeName, dbname, tableName)) + .as(Encoders.bean(StatsDiamondOAModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + private static void preparePubliclyFundedData(String dbname, SparkSession spark, String workingPath, + String tableName, + String resultAttributeName) { + spark + .sql( + String + .format( + "select %s as id, fundref as publicly_funded " + + "from %s.%s", + resultAttributeName, dbname, tableName)) + .as(Encoders.bean(StatsPubliclyFundedModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + private static void prepareOAColourData(String dbname, SparkSession spark, String workingPath, String tableName, + String resultAttributeName) { + spark + .sql( + String + .format( + "select b.%s as id, is_gold, is_bronze_oa, is_hybrid from %s.indi_pub_bronze_oa b " + + "left outer join %s.indi_pub_gold_oa g on g.id=b.id " + + "left outer join %s.indi_pub_hybrid h on b.id=h.id", + resultAttributeName, dbname, dbname, dbname)) + .as(Encoders.bean(StatsOAColourModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) { + + getFinalIndicatorsGreenResult(spark, inputPath + "/greenOADB") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .union( + getFinalIndicatorsDiamondResult(spark, inputPath + "/diamondOADΒ") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p))) + .union( + getFinalIndicatorsPubliclyFundedResult(spark, inputPath + "/publiclyFundedDΒ") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p))) + .union( + getFinalIndicatorsOAColourResult(spark, inputPath + "/oacolourDB") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p))) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + } + + public static Measure newMeasureInstance(String id) { + Measure m = new Measure(); + m.setId(id); + m.setUnit(new ArrayList<>()); + return m; + } + + private static Dataset getFinalIndicatorsGreenResult(SparkSession spark, String inputPath) { + + return readPath(spark, inputPath, StatsGreenOAModel.class) + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasure(usm.isGreen_oa(), "green_oa")); + return r; + }, Encoders.bean(Result.class)); + } + + private static Dataset getFinalIndicatorsDiamondResult(SparkSession spark, String inputPath) { + + return readPath(spark, inputPath, StatsDiamondOAModel.class) + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasure(usm.isIn_diamond_journal(), "in_diamond_journal")); + return r; + }, 
Encoders.bean(Result.class)); + } + + private static Dataset getFinalIndicatorsPubliclyFundedResult(SparkSession spark, String inputPath) { + + return readPath(spark, inputPath, StatsPubliclyFundedModel.class) + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasure(usm.isPublicly_funded(), "publicly_funded")); + return r; + }, Encoders.bean(Result.class)); + } + + private static Dataset getFinalIndicatorsOAColourResult(SparkSession spark, String inputPath) { + + return readPath(spark, inputPath, StatsOAColourModel.class) + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasureOAColour(usm.isIs_gold(), usm.isIs_bronze_oa(), usm.isIs_hybrid())); + return r; + }, Encoders.bean(Result.class)); + } + + private static List getMeasure(Boolean is_model_oa, String model_type) { + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_STATS_MODEL_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + + return Arrays + .asList( + OafMapperUtils + .newMeasureInstance(model_type, String.valueOf(is_model_oa), UPDATE_KEY_STATS_MODEL, dataInfo)); + } + + private static List getMeasureOAColour(Boolean is_gold, Boolean is_bronze_oa, Boolean is_hybrid) { + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_STATS_MODEL_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + + return Arrays + .asList( + OafMapperUtils + .newMeasureInstance("is_gold", String.valueOf(is_gold), UPDATE_KEY_STATS_MODEL, dataInfo), + OafMapperUtils + .newMeasureInstance("is_bronze_oa", String.valueOf(is_bronze_oa), UPDATE_KEY_STATS_MODEL, dataInfo), + OafMapperUtils + .newMeasureInstance("is_hybrid", String.valueOf(is_hybrid), UPDATE_KEY_STATS_MODEL, dataInfo)); + + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + public static Dataset readPath( + SparkSession spark, String inputPath, Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); + } + +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java new file mode 100644 index 000000000..340790fa8 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java @@ -0,0 +1,29 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import java.io.Serializable; + +/** + * @author dimitris.pierrakos + * @Date 30/10/23 + */ +public class StatsDiamondOAModel implements Serializable { + private String id; + private boolean in_diamond_journal; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public boolean isIn_diamond_journal() { + return in_diamond_journal; + } + + public void setIn_diamond_journal(boolean in_diamond_journal) { + this.in_diamond_journal = in_diamond_journal; + } +} diff --git 
a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java new file mode 100644 index 000000000..3d59bdf95 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java @@ -0,0 +1,29 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import java.io.Serializable; + +/** + * @author dimitris.pierrakos + * @Date 30/10/23 + */ +public class StatsGreenOAModel implements Serializable { + private String id; + private boolean green_oa; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public boolean isGreen_oa() { + return green_oa; + } + + public void setGreen_oa(boolean green_oa) { + this.green_oa = green_oa; + } +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java new file mode 100644 index 000000000..a83eb4f60 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java @@ -0,0 +1,47 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import java.io.Serializable; + +/** + * @author dimitris.pierrakos + * @Date 30/10/23 + */ +public class StatsOAColourModel implements Serializable { + private String id; + private boolean is_gold; + private boolean is_bronze_oa; + private boolean is_hybrid; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public boolean isIs_gold() { + return is_gold; + } + + public void setIs_gold(boolean is_gold) { + this.is_gold = is_gold; + } + + public boolean isIs_bronze_oa() { + return is_bronze_oa; + } + + public void setIs_bronze_oa(boolean is_bronze_oa) { + this.is_bronze_oa = is_bronze_oa; + } + + public boolean isIs_hybrid() { + return is_hybrid; + } + + public void setIs_hybrid(boolean is_hybrid) { + this.is_hybrid = is_hybrid; + } +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java new file mode 100644 index 000000000..6bdc78220 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java @@ -0,0 +1,29 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import java.io.Serializable; + +/** + * @author dimitris.pierrakos + * @Date 30/10/23 + */ +public class StatsPubliclyFundedModel implements Serializable { + private String id; + private boolean publicly_funded; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public boolean isPublicly_funded() { + return publicly_funded; + } + + public void setPublicly_funded(boolean publicly_funded) { + this.publicly_funded = publicly_funded; + } +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json 
b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json new file mode 100644 index 000000000..bb24d4c99 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "hmu", + "paramLongName": "hive_metastore_uris", + "paramDescription": "the URI for the hive metastore", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "the path of the new ActionSet", + "paramRequired": true + }, + { + "paramName": "sdb", + "paramLongName": "statsDB", + "paramDescription": "the name of the stats db to be used", + "paramRequired": true + }, + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the workingPath where to save the content of the usage_stats table", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml new file mode 100644 index 000000000..1aa4e1050 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml @@ -0,0 +1,125 @@ + + + + outputPath + the path where to store the actionset + + + statsDB + the name of the stats db to be used + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + 
yarn + cluster + Produces the atomic action with the usage stats count for results + + eu.dnetlib.dhp.actionmanager.stats_actionsets.SparkAtomicActionGreenOAJob + dhp-stats-actionsets-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --hive_metastore_uris${hiveMetastoreUris} + --outputPath${outputPath} + --statsDB${statsDB} + --workingPath${workingDir} + + + + + + + + yarn + cluster + Produces the atomic action with the stats green_oa for results + + eu.dnetlib.dhp.actionmanager.stats_actionsets.StatsAtomicActionsJob + dhp-stats-actionsets-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --hive_metastore_uris${hiveMetastoreUris} + --outputPath${outputPath} + --statsDB${statsDB} + --workingPath${workingDir} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties b/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties new file mode 100644 index 000000000..81458d1f7 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties @@ -0,0 +1,12 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=INFO, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. 
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.logger.org.apache.spark=FATAL +log4j.logger.org.spark_project=FATAL From d524e3086630f20e129ae339072e0198b16c7392 Mon Sep 17 00:00:00 2001 From: dimitrispie Date: Tue, 14 Nov 2023 09:46:52 +0200 Subject: [PATCH 2/4] Changes to actionsets Resolve comments from https://code-repo.d4science.org/D-Net/dnet-hadoop/pulls/355 --- dhp-workflows/dhp-stats-actionsets/pom.xml | 39 ---- .../dnetlib/dhp/actionmanager/Constants.java | 102 --------- .../StatsAtomicActionsJob.java | 206 ++++-------------- .../stats_actionsets/StatsDiamondOAModel.java | 29 --- .../stats_actionsets/StatsGreenOAModel.java | 29 --- .../stats_actionsets/StatsOAColourModel.java | 47 ---- .../StatsPubliclyFundedModel.java | 29 --- .../StatsResultEnhancementModel.java | 76 +++++++ .../stats_actionsets/oozie_app/workflow.xml | 28 --- 9 files changed, 116 insertions(+), 469 deletions(-) delete mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java delete mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java delete mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java delete mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java delete mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java create mode 100644 dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsResultEnhancementModel.java diff --git a/dhp-workflows/dhp-stats-actionsets/pom.xml b/dhp-workflows/dhp-stats-actionsets/pom.xml index 670fd4a5e..3daa8f995 100644 --- a/dhp-workflows/dhp-stats-actionsets/pom.xml +++ b/dhp-workflows/dhp-stats-actionsets/pom.xml @@ -8,46 +8,7 @@ dhp-stats-actionsets - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - org.apache.httpcomponents httpclient diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java deleted file mode 100644 index 0df34364d..000000000 --- a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java +++ /dev/null @@ -1,102 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager; - -import java.util.Optional; - -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import eu.dnetlib.dhp.schema.oaf.Subject; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; - -public class Constants { - - public static final String DOI = "doi"; - public static final String DOI_CLASSNAME = "Digital Object Identifier"; - - public static final String DEFAULT_DELIMITER = ","; - public static final String DEFAULT_FOS_DELIMITER = "\t"; - - public static final String UPDATE_DATA_INFO_TYPE = "update"; -// public static final String UPDATE_SUBJECT_FOS_CLASS_ID = 
"subject:fos"; - public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE"; -// public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip"; -// public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg"; -// public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts"; -// public static final String UPDATE_KEY_USAGE_COUNTS = "count"; - - public static final String UPDATE_MEASURE_STATS_MODEL_CLASS_ID = "measure:stats_model"; - public static final String UPDATE_KEY_STATS_MODEL = "stats_model"; - -// public static final String UPDATE_MEASURE_PUBLICLY_FUNDED_CLASS_ID = "measure:publicly_funded"; -// public static final String UPDATE_KEY_PUBLICLY_FUNDED = "publicly_funded"; - -// public static final String FOS_CLASS_ID = "FOS"; -// public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification"; -// -// public static final String SDG_CLASS_ID = "SDG"; -// public static final String SDG_CLASS_NAME = "Sustainable Development Goals"; - - public static final String NULL = "NULL"; - - public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private Constants() { - } - - public static Boolean isSparkSessionManaged(ArgumentApplicationParser parser) { - return Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - } - - public static Dataset readPath( - SparkSession spark, String inputPath, Class clazz) { - return spark - .read() - .textFile(inputPath) - .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); - } - - public static Subject getSubject(String sbj, String classid, String classname, - String diqualifierclassid) { - if (sbj == null || sbj.equals(NULL)) - return null; - Subject s = new Subject(); - s.setValue(sbj); - s - .setQualifier( - OafMapperUtils - .qualifier( - classid, - classname, - ModelConstants.DNET_SUBJECT_TYPOLOGIES, - ModelConstants.DNET_SUBJECT_TYPOLOGIES)); - s - .setDataInfo( - OafMapperUtils - .dataInfo( - false, - UPDATE_DATA_INFO_TYPE, - true, - false, - OafMapperUtils - .qualifier( - diqualifierclassid, - UPDATE_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - "")); - - return s; - - } -} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java index f6e2cfe01..e984d68fc 100644 --- a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java @@ -1,7 +1,6 @@ package eu.dnetlib.dhp.actionmanager.stats_actionsets; -import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.io.Serializable; @@ -12,14 +11,22 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import 
org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +72,9 @@ public class StatsAtomicActionsJob implements Serializable { SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + conf.set("spark.speculation", "false"); + conf.set("spark.hadoop.mapreduce.map.speculative", "false"); + conf.set("spark.hadoop.mapreduce.reduce.speculative", "false"); final String dbname = parser.get("statsDB"); @@ -75,75 +85,26 @@ public class StatsAtomicActionsJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareGreenData(dbname, spark, workingPath + "/greenOADB", "indi_pub_green_oa", "id"); - prepareDiamondData(dbname, spark, workingPath + "/diamondOADΒ", "indi_pub_diamond", "id"); - preparePubliclyFundedData( - dbname, spark, workingPath + "/publiclyFundedDΒ", "indi_funded_result_with_fundref", "id"); - prepareOAColourData(dbname, spark, workingPath + "/oacolourDB", "", "id"); + prepareResultEnhancement(dbname, spark, workingPath + "/resultEnhancements", "id"); writeActionSet(spark, workingPath, outputPath); }); } - private static void prepareGreenData(String dbname, SparkSession spark, String workingPath, String tableName, + private static void prepareResultEnhancement(String dbname, SparkSession spark, String workingPath, String resultAttributeName) { spark .sql( String .format( - "select %s as id, green_oa as green_oa " + - "from %s.%s", - resultAttributeName, dbname, tableName)) - .as(Encoders.bean(StatsGreenOAModel.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingPath); - } - - private static void prepareDiamondData(String dbname, SparkSession spark, String workingPath, String tableName, - String resultAttributeName) { - spark - .sql( - String - .format( - "select %s as id, in_diamond_journal as in_diamond_journal " + - "from %s.%s", - resultAttributeName, dbname, tableName)) - .as(Encoders.bean(StatsDiamondOAModel.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingPath); - } - - private static void preparePubliclyFundedData(String dbname, SparkSession spark, String workingPath, - String tableName, - String resultAttributeName) { - spark - .sql( - String - .format( - "select %s as id, fundref as publicly_funded " + - "from %s.%s", - resultAttributeName, dbname, tableName)) - .as(Encoders.bean(StatsPubliclyFundedModel.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingPath); - } - - private static void prepareOAColourData(String dbname, SparkSession spark, String workingPath, String tableName, - String resultAttributeName) { - spark - .sql( - String - .format( - "select b.%s as id, is_gold, is_bronze_oa, is_hybrid from %s.indi_pub_bronze_oa b " + + "select b.%s as id, is_gold, is_bronze_oa, is_hybrid,green_oa, in_diamond_journal,f.fundref as publicly_funded " + + "from %s.indi_pub_bronze_oa b " + "left outer join %s.indi_pub_gold_oa g on g.id=b.id " + - "left outer join %s.indi_pub_hybrid h on b.id=h.id", - resultAttributeName, dbname, dbname, dbname)) - 
.as(Encoders.bean(StatsOAColourModel.class)) + "left outer join %s.indi_pub_hybrid h on b.id=h.id " + + "left outer join %s.indi_pub_green_oa gr on b.id=gr.id " + + "left outer join %s.indi_pub_diamond d on b.id=d.id " + + "left outer join %s.indi_funded_result_with_fundref f on b.id=f.id ", + resultAttributeName, dbname, dbname, dbname, dbname, dbname, dbname)) + .as(Encoders.bean(StatsResultEnhancementModel.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") @@ -152,125 +113,39 @@ public class StatsAtomicActionsJob implements Serializable { public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) { - getFinalIndicatorsGreenResult(spark, inputPath + "/greenOADB") + getResultEnhancements(spark, inputPath + "/resultEnhancements") .toJavaRDD() .map(p -> new AtomicAction(p.getClass(), p)) - .union( - getFinalIndicatorsDiamondResult(spark, inputPath + "/diamondOADΒ") - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p))) - .union( - getFinalIndicatorsPubliclyFundedResult(spark, inputPath + "/publiclyFundedDΒ") - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p))) - .union( - getFinalIndicatorsOAColourResult(spark, inputPath + "/oacolourDB") - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p))) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + .saveAsHadoopFile( + outputPath, + Text.class, + Text.class, + SequenceFileOutputFormat.class, + GzipCodec.class); } - public static Measure newMeasureInstance(String id) { - Measure m = new Measure(); - m.setId(id); - m.setUnit(new ArrayList<>()); - return m; - } + private static Dataset getResultEnhancements(SparkSession spark, String inputPath) { - private static Dataset getFinalIndicatorsGreenResult(SparkSession spark, String inputPath) { - - return readPath(spark, inputPath, StatsGreenOAModel.class) - .map((MapFunction) usm -> { + return readPath(spark, inputPath, StatsResultEnhancementModel.class) + .map((MapFunction) usm -> { Result r = new Result(); r.setId("50|" + usm.getId()); - r.setMeasures(getMeasure(usm.isGreen_oa(), "green_oa")); + r.setIsInDiamondJournal(usm.isIn_diamond_journal()); + r.setIsGreen(usm.isGreen_oa()); + r.setPubliclyFunded(usm.isPublicly_funded()); + if (usm.isIs_bronze_oa()) + r.setOpenAccessColor(OpenAccessColor.bronze); + else if (usm.isIs_gold()) + r.setOpenAccessColor(OpenAccessColor.bronze); + else if (usm.isIs_gold()) + r.setOpenAccessColor(OpenAccessColor.gold); return r; }, Encoders.bean(Result.class)); } - private static Dataset getFinalIndicatorsDiamondResult(SparkSession spark, String inputPath) { - - return readPath(spark, inputPath, StatsDiamondOAModel.class) - .map((MapFunction) usm -> { - Result r = new Result(); - r.setId("50|" + usm.getId()); - r.setMeasures(getMeasure(usm.isIn_diamond_journal(), "in_diamond_journal")); - return r; - }, Encoders.bean(Result.class)); - } - - private static Dataset getFinalIndicatorsPubliclyFundedResult(SparkSession spark, String inputPath) { - - return readPath(spark, inputPath, StatsPubliclyFundedModel.class) - .map((MapFunction) usm -> { - Result r = new Result(); - r.setId("50|" + usm.getId()); - r.setMeasures(getMeasure(usm.isPublicly_funded(), "publicly_funded")); - return r; - }, Encoders.bean(Result.class)); - } - - private static Dataset getFinalIndicatorsOAColourResult(SparkSession spark, String inputPath) { - - return 
readPath(spark, inputPath, StatsOAColourModel.class) - .map((MapFunction) usm -> { - Result r = new Result(); - r.setId("50|" + usm.getId()); - r.setMeasures(getMeasureOAColour(usm.isIs_gold(), usm.isIs_bronze_oa(), usm.isIs_hybrid())); - return r; - }, Encoders.bean(Result.class)); - } - - private static List getMeasure(Boolean is_model_oa, String model_type) { - DataInfo dataInfo = OafMapperUtils - .dataInfo( - false, - UPDATE_DATA_INFO_TYPE, - true, - false, - OafMapperUtils - .qualifier( - UPDATE_MEASURE_STATS_MODEL_CLASS_ID, - UPDATE_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - ""); - - return Arrays - .asList( - OafMapperUtils - .newMeasureInstance(model_type, String.valueOf(is_model_oa), UPDATE_KEY_STATS_MODEL, dataInfo)); - } - - private static List getMeasureOAColour(Boolean is_gold, Boolean is_bronze_oa, Boolean is_hybrid) { - DataInfo dataInfo = OafMapperUtils - .dataInfo( - false, - UPDATE_DATA_INFO_TYPE, - true, - false, - OafMapperUtils - .qualifier( - UPDATE_MEASURE_STATS_MODEL_CLASS_ID, - UPDATE_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - ""); - - return Arrays - .asList( - OafMapperUtils - .newMeasureInstance("is_gold", String.valueOf(is_gold), UPDATE_KEY_STATS_MODEL, dataInfo), - OafMapperUtils - .newMeasureInstance("is_bronze_oa", String.valueOf(is_bronze_oa), UPDATE_KEY_STATS_MODEL, dataInfo), - OafMapperUtils - .newMeasureInstance("is_hybrid", String.valueOf(is_hybrid), UPDATE_KEY_STATS_MODEL, dataInfo)); - - } - private static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } @@ -282,5 +157,4 @@ public class StatsAtomicActionsJob implements Serializable { .textFile(inputPath) .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } - } diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java deleted file mode 100644 index 340790fa8..000000000 --- a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsDiamondOAModel.java +++ /dev/null @@ -1,29 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.stats_actionsets; - -import java.io.Serializable; - -/** - * @author dimitris.pierrakos - * @Date 30/10/23 - */ -public class StatsDiamondOAModel implements Serializable { - private String id; - private boolean in_diamond_journal; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public boolean isIn_diamond_journal() { - return in_diamond_journal; - } - - public void setIn_diamond_journal(boolean in_diamond_journal) { - this.in_diamond_journal = in_diamond_journal; - } -} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java deleted file mode 100644 index 3d59bdf95..000000000 --- a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsGreenOAModel.java +++ /dev/null @@ -1,29 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.stats_actionsets; - -import java.io.Serializable; - -/** - * @author dimitris.pierrakos - * @Date 30/10/23 - */ 
-public class StatsGreenOAModel implements Serializable { - private String id; - private boolean green_oa; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public boolean isGreen_oa() { - return green_oa; - } - - public void setGreen_oa(boolean green_oa) { - this.green_oa = green_oa; - } -} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java deleted file mode 100644 index a83eb4f60..000000000 --- a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsOAColourModel.java +++ /dev/null @@ -1,47 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.stats_actionsets; - -import java.io.Serializable; - -/** - * @author dimitris.pierrakos - * @Date 30/10/23 - */ -public class StatsOAColourModel implements Serializable { - private String id; - private boolean is_gold; - private boolean is_bronze_oa; - private boolean is_hybrid; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public boolean isIs_gold() { - return is_gold; - } - - public void setIs_gold(boolean is_gold) { - this.is_gold = is_gold; - } - - public boolean isIs_bronze_oa() { - return is_bronze_oa; - } - - public void setIs_bronze_oa(boolean is_bronze_oa) { - this.is_bronze_oa = is_bronze_oa; - } - - public boolean isIs_hybrid() { - return is_hybrid; - } - - public void setIs_hybrid(boolean is_hybrid) { - this.is_hybrid = is_hybrid; - } -} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java deleted file mode 100644 index 6bdc78220..000000000 --- a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsPubliclyFundedModel.java +++ /dev/null @@ -1,29 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.stats_actionsets; - -import java.io.Serializable; - -/** - * @author dimitris.pierrakos - * @Date 30/10/23 - */ -public class StatsPubliclyFundedModel implements Serializable { - private String id; - private boolean publicly_funded; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public boolean isPublicly_funded() { - return publicly_funded; - } - - public void setPublicly_funded(boolean publicly_funded) { - this.publicly_funded = publicly_funded; - } -} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsResultEnhancementModel.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsResultEnhancementModel.java new file mode 100644 index 000000000..f794c5591 --- /dev/null +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsResultEnhancementModel.java @@ -0,0 +1,76 @@ + +package eu.dnetlib.dhp.actionmanager.stats_actionsets; + +import java.io.Serializable; + +import eu.dnetlib.dhp.schema.oaf.*; + +/** + * @author dimitris.pierrakos + * @Date 30/10/23 + */ +public class StatsResultEnhancementModel implements Serializable { + private String id; + private Boolean is_gold; + private Boolean is_bronze_oa; + private Boolean is_hybrid; + 
private boolean in_diamond_journal; + private boolean green_oa; + private boolean publicly_funded; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Boolean isIs_gold() { + return is_gold; + } + + public void setIs_gold(Boolean is_gold) { + this.is_gold = is_gold; + } + + public Boolean isIs_bronze_oa() { + return is_bronze_oa; + } + + public void setIs_bronze_oa(Boolean is_bronze_oa) { + this.is_bronze_oa = is_bronze_oa; + } + + public Boolean isIs_hybrid() { + return is_hybrid; + } + + public void setIs_hybrid(Boolean is_hybrid) { + this.is_hybrid = is_hybrid; + } + + public boolean isIn_diamond_journal() { + return in_diamond_journal; + } + + public void setIn_diamond_journal(boolean in_diamond_journal) { + this.in_diamond_journal = in_diamond_journal; + } + + public boolean isGreen_oa() { + return green_oa; + } + + public void setGreen_oa(boolean green_oa) { + this.green_oa = green_oa; + } + + public boolean isPublicly_funded() { + return publicly_funded; + } + + public void setPublicly_funded(boolean publicly_funded) { + this.publicly_funded = publicly_funded; + } +} diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml index 1aa4e1050..79fd3f56e 100644 --- a/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml @@ -68,39 +68,11 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - yarn - cluster - Produces the atomic action with the usage stats count for results - - eu.dnetlib.dhp.actionmanager.stats_actionsets.SparkAtomicActionGreenOAJob - dhp-stats-actionsets-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - --hive_metastore_uris${hiveMetastoreUris} - --outputPath${outputPath} - --statsDB${statsDB} - --workingPath${workingDir} - - - - - yarn cluster Produces the atomic action with the stats green_oa for results - eu.dnetlib.dhp.actionmanager.stats_actionsets.StatsAtomicActionsJob dhp-stats-actionsets-${projectVersion}.jar From 359e81b7a60d987db509b7c53121863380c64aff Mon Sep 17 00:00:00 2001 From: dimitrispie Date: Thu, 23 Nov 2023 10:48:55 +0200 Subject: [PATCH 3/4] Update StatsAtomicActionsJob.java Bug fix for duplicate bronze checks --- .../actionmanager/stats_actionsets/StatsAtomicActionsJob.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java index e984d68fc..a64a63c24 100644 --- a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java +++ 
b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java @@ -138,10 +138,10 @@ public class StatsAtomicActionsJob implements Serializable { r.setPubliclyFunded(usm.isPublicly_funded()); if (usm.isIs_bronze_oa()) r.setOpenAccessColor(OpenAccessColor.bronze); - else if (usm.isIs_gold()) - r.setOpenAccessColor(OpenAccessColor.bronze); else if (usm.isIs_gold()) r.setOpenAccessColor(OpenAccessColor.gold); + else if (usm.isIs_hybrid()) + r.setOpenAccessColor(OpenAccessColor.hybrid); return r; }, Encoders.bean(Result.class)); } From 48430a32a63cc80b32bce14104b9f11037960adf Mon Sep 17 00:00:00 2001 From: dimitrispie Date: Fri, 1 Dec 2023 11:35:01 +0200 Subject: [PATCH 4/4] Update StatsAtomicActionsJob.java Added indi_funded_result_with_fundref indicator --- .../StatsAtomicActionsJob.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java index a64a63c24..0c3c2f3c0 100644 --- a/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java +++ b/dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java @@ -96,14 +96,16 @@ public class StatsAtomicActionsJob implements Serializable { .sql( String .format( - "select b.%s as id, is_gold, is_bronze_oa, is_hybrid,green_oa, in_diamond_journal,f.fundref as publicly_funded " - + "from %s.indi_pub_bronze_oa b " + - "left outer join %s.indi_pub_gold_oa g on g.id=b.id " + - "left outer join %s.indi_pub_hybrid h on b.id=h.id " + - "left outer join %s.indi_pub_green_oa gr on b.id=gr.id " + + "select r.%s as id, is_gold, is_bronze_oa, is_hybrid,green_oa, in_diamond_journal,f.publicly_funded as publicly_funded " + + + "from %s.publication r " + + "left outer join %s.indi_pub_bronze_oa b on r.id=b.id " + + "left outer join %s.indi_pub_gold_oa g on r.id=g.id " + + "left outer join %s.indi_pub_hybrid h on r.id=h.id " + + "left outer join %s.indi_pub_green_oa gr on r.id=gr.id " + "left outer join %s.indi_pub_diamond d on b.id=d.id " + - "left outer join %s.indi_funded_result_with_fundref f on b.id=f.id ", - resultAttributeName, dbname, dbname, dbname, dbname, dbname, dbname)) + "left outer join %s.indi_pub_publicly_funded f on r.id=f.id ", + resultAttributeName, dbname, dbname, dbname, dbname, dbname, dbname, dbname)) .as(Encoders.bean(StatsResultEnhancementModel.class)) .write() .mode(SaveMode.Overwrite) @@ -136,12 +138,12 @@ public class StatsAtomicActionsJob implements Serializable { r.setIsInDiamondJournal(usm.isIn_diamond_journal()); r.setIsGreen(usm.isGreen_oa()); r.setPubliclyFunded(usm.isPublicly_funded()); - if (usm.isIs_bronze_oa()) - r.setOpenAccessColor(OpenAccessColor.bronze); - else if (usm.isIs_gold()) + if (usm.isIs_gold()) r.setOpenAccessColor(OpenAccessColor.gold); else if (usm.isIs_hybrid()) r.setOpenAccessColor(OpenAccessColor.hybrid); + else if (usm.isIs_bronze_oa()) + r.setOpenAccessColor(OpenAccessColor.bronze); return r; }, Encoders.bean(Result.class)); }
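For reference, the net effect of PATCH 2/4 through PATCH 4/4 on the open-access colour mapping can be summarised in a short standalone sketch. This is not part of the patches: the Flags and Colour types below are simplified stand-ins for the dhp-schemas StatsResultEnhancementModel bean and OpenAccessColor enum, used only to illustrate the precedence that the final getResultEnhancements applies (gold, then hybrid, then bronze) alongside the plain boolean enhancements and the "50|" result-id prefix.

// Sketch only — simplified stand-in types, not the dhp-schemas classes.
import java.io.Serializable;

public class EnhancementMappingSketch {

	// stand-in for eu.dnetlib.dhp.schema.oaf.OpenAccessColor
	enum Colour {
		gold, hybrid, bronze
	}

	// stand-in for the flags read from the stats DB (StatsResultEnhancementModel)
	static class Flags implements Serializable {
		boolean isGold, isHybrid, isBronze;
		boolean greenOa, inDiamondJournal, publiclyFunded;
		String id;
	}

	// gold takes precedence over hybrid, hybrid over bronze; no flag set means no colour
	static Colour resolveColour(Flags f) {
		if (f.isGold)
			return Colour.gold;
		if (f.isHybrid)
			return Colour.hybrid;
		if (f.isBronze)
			return Colour.bronze;
		return null;
	}

	// result identifiers are prefixed with the "50|" entity type, as in the job
	static String resultId(Flags f) {
		return "50|" + f.id;
	}
}

A record flagged both gold and bronze therefore resolves to gold. The original PATCH 2/4 code checked isIs_gold twice, so a gold-only record was labelled bronze; PATCH 3/4 removed the duplicate branch and PATCH 4/4 reordered the checks so that gold and hybrid take precedence over bronze, which is the order shown above.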
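writeActionSet serialises each enhanced Result as an AtomicAction and stores the pairs as a gzip-compressed Hadoop SequenceFile, keyed by the payload's canonical class name with the JSON-serialised AtomicAction as value. A minimal sketch of one way to inspect that output with Spark follows; it is an illustration under assumptions only — the ReadActionSetSketch class and the inspection approach are not part of this module, and the production action manager consumes the files through its own machinery.

import org.apache.hadoop.io.Text;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class ReadActionSetSketch {

	public static void main(String[] args) {
		// args[0]: the ${outputPath} written by StatsAtomicActionsJob (assumption)
		String actionSetPath = args[0];

		SparkSession spark = SparkSession.builder().appName("readStatsActionSet").getOrCreate();
		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		// keys hold the payload class name (e.g. eu.dnetlib.dhp.schema.oaf.Result),
		// values hold the JSON-serialised AtomicAction; compression is handled transparently
		JavaPairRDD<Text, Text> actions = sc.sequenceFile(actionSetPath, Text.class, Text.class);

		// Hadoop Writables are reused by the reader, so copy them to Strings before collecting
		actions
			.mapToPair(t -> new Tuple2<>(t._1().toString(), t._2().toString()))
			.take(5)
			.forEach(t -> System.out.println(t._1() + " -> " + t._2()));

		spark.stop();
	}
}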