StatsDB workflow to export actionsets about OA routes, diamond, and publicly-funded #355

Merged
claudio.atzori merged 4 commits from dimitris.pierrakos/dnet-hadoop:beta into beta 2023-12-01 15:03:58 +01:00
7 changed files with 484 additions and 0 deletions

dhp-workflows/dhp-stats-actionsets/pom.xml

@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.2.5-SNAPSHOT</version>
</parent>
<artifactId>dhp-stats-actionsets</artifactId>
<dependencies>
dimitris.pierrakos marked this conversation as resolved
Review

If this block is commented out and it is not needed, then please remove it.
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
</dependency>
</dependencies>
</project>

dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsAtomicActionsJob.java

@@ -0,0 +1,162 @@
package eu.dnetlib.dhp.actionmanager.stats_actionsets;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
/**
* Creates the atomic actions for each type of result.
*/
public class StatsAtomicActionsJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(StatsAtomicActionsJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
StatsAtomicActionsJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
conf.set("spark.speculation", "false");
conf.set("spark.hadoop.mapreduce.map.speculative", "false");
conf.set("spark.hadoop.mapreduce.reduce.speculative", "false");
final String dbname = parser.get("statsDB");
final String workingPath = parser.get("workingPath");
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareResultEnhancement(dbname, spark, workingPath + "/resultEnhancements", "id");
writeActionSet(spark, workingPath, outputPath);
});
}
private static void prepareResultEnhancement(String dbname, SparkSession spark, String workingPath,
String resultAttributeName) {
spark
.sql(
String
.format(
"select r.%s as id, is_gold, is_bronze_oa, is_hybrid,green_oa, in_diamond_journal,f.publicly_funded as publicly_funded "
+
"from %s.publication r " +
"left outer join %s.indi_pub_bronze_oa b on r.id=b.id " +
"left outer join %s.indi_pub_gold_oa g on r.id=g.id " +
"left outer join %s.indi_pub_hybrid h on r.id=h.id " +
"left outer join %s.indi_pub_green_oa gr on r.id=gr.id " +
"left outer join %s.indi_pub_diamond d on b.id=d.id " +
"left outer join %s.indi_pub_publicly_funded f on r.id=f.id ",
resultAttributeName, dbname, dbname, dbname, dbname, dbname, dbname, dbname))
.as(Encoders.bean(StatsResultEnhancementModel.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath);
}
public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
getResultEnhancements(spark, inputPath + "/resultEnhancements")
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(
outputPath,
Text.class,
Text.class,
SequenceFileOutputFormat.class,
GzipCodec.class);
}
private static Dataset<Result> getResultEnhancements(SparkSession spark, String inputPath) {
return readPath(spark, inputPath, StatsResultEnhancementModel.class)
.map((MapFunction<StatsResultEnhancementModel, Result>) usm -> {
Result r = new Result();
r.setId("50|" + usm.getId());
r.setIsInDiamondJournal(usm.isIn_diamond_journal());
r.setIsGreen(usm.isGreen_oa());
r.setPubliclyFunded(usm.isPublicly_funded());
if (usm.isIs_gold())
r.setOpenAccessColor(OpenAccessColor.gold);
else if (usm.isIs_hybrid())
dimitris.pierrakos marked this conversation as resolved Outdated

Please check the chain of conditions: it seems `usm.isIs_gold()` is checked twice, leading to two different then-blocks, one setting the open access color to bronze, the other to gold, which seems counterintuitive. I assume this might not be what you were expecting to write.
r.setOpenAccessColor(OpenAccessColor.hybrid);
else if (usm.isIs_bronze_oa())
r.setOpenAccessColor(OpenAccessColor.bronze);
return r;
}, Encoders.bean(Result.class));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
dimitris.pierrakos marked this conversation as resolved
Review

Gitea warns me about the character `Β` used in the string `"/diamondOADΒ"`:

Β [U+0392] is confusable with B [U+0042]

Any chance that it is a typo from a different keyboard setting?
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}
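A side note on the resolved thread above: the merged getResultEnhancements applies a gold > hybrid > bronze precedence when assigning the open access color. As a standalone illustration of that precedence (a hypothetical helper, not part of this PR), also guarding against the null Booleans that the left outer joins in prepareResultEnhancement can produce:

public class OpenAccessColorPrecedence {
	// Gold wins over hybrid, hybrid over bronze; Boolean.TRUE.equals(...)
	// avoids the NPE that auto-unboxing a null Boolean would throw.
	static String color(Boolean gold, Boolean hybrid, Boolean bronze) {
		if (Boolean.TRUE.equals(gold))
			return "gold";
		if (Boolean.TRUE.equals(hybrid))
			return "hybrid";
		if (Boolean.TRUE.equals(bronze))
			return "bronze";
		return null; // no OA colour applies
	}

	public static void main(String[] args) {
		System.out.println(color(true, true, false)); // gold
		System.out.println(color(null, true, null)); // hybrid
		System.out.println(color(false, null, true)); // bronze
	}
}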
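On the Unicode warning in the other thread: a quick way to spot confusables is to dump the code points of the suspicious literal. A minimal sketch (the string is the one quoted in the review; the class name is illustrative):

public class ConfusableCheck {
	public static void main(String[] args) {
		String s = "/diamondOADΒ"; // last character is U+0392 (Greek capital beta)
		// Print every code point with its official Unicode name, so U+0392
		// stands out from the Latin B (U+0042) it visually mimics.
		s.codePoints()
			.forEach(cp -> System.out.printf("U+%04X %s%n", cp, Character.getName(cp)));
	}
}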

dhp-workflows/dhp-stats-actionsets/src/main/java/eu/dnetlib/dhp/actionmanager/stats_actionsets/StatsResultEnhancementModel.java

@@ -0,0 +1,76 @@
package eu.dnetlib.dhp.actionmanager.stats_actionsets;
import java.io.Serializable;
/**
* @author dimitris.pierrakos
* @Date 30/10/23
*/
public class StatsResultEnhancementModel implements Serializable {
private String id;
private Boolean is_gold;
private Boolean is_bronze_oa;
private Boolean is_hybrid;
private boolean in_diamond_journal;
private boolean green_oa;
private boolean publicly_funded;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Boolean isIs_gold() {
return is_gold;
}
public void setIs_gold(Boolean is_gold) {
this.is_gold = is_gold;
}
public Boolean isIs_bronze_oa() {
return is_bronze_oa;
}
public void setIs_bronze_oa(Boolean is_bronze_oa) {
this.is_bronze_oa = is_bronze_oa;
}
public Boolean isIs_hybrid() {
return is_hybrid;
}
public void setIs_hybrid(Boolean is_hybrid) {
this.is_hybrid = is_hybrid;
}
public boolean isIn_diamond_journal() {
return in_diamond_journal;
}
public void setIn_diamond_journal(boolean in_diamond_journal) {
this.in_diamond_journal = in_diamond_journal;
}
public boolean isGreen_oa() {
return green_oa;
}
public void setGreen_oa(boolean green_oa) {
this.green_oa = green_oa;
}
public boolean isPublicly_funded() {
return publicly_funded;
}
public void setPublicly_funded(boolean publicly_funded) {
this.publicly_funded = publicly_funded;
}
}

dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json

@@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "hmu",
"paramLongName": "hive_metastore_uris",
"paramDescription": "the URI for the hive metastore",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "sdb",
"paramLongName": "statsDB",
"paramDescription": "the name of the stats db to be used",
"paramRequired": true
},
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the workingPath where to save the content of the usage_stats table",
"paramRequired": true
}
]
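For reference, these definitions drive the ArgumentApplicationParser call in StatsAtomicActionsJob.main: each paramLongName becomes a --flag on the command line. A hedged sketch of a matching invocation (class name and all values are placeholders):

public class StatsActionsetLauncherExample {
	public static void main(String[] args) throws Exception {
		// Flags mirror the paramLongName entries defined above.
		StatsAtomicActionsJob.main(new String[] {
			"--isSparkSessionManaged", "false",
			"--hive_metastore_uris", "thrift://example-metastore:9083",
			"--outputPath", "/tmp/stats-actionset",
			"--statsDB", "openaire_stats",
			"--workingPath", "/tmp/stats-working"
		});
	}
}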

dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/config-default.xml

@@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

dhp-workflows/dhp-stats-actionsets/src/main/resources/eu/dnetlib/dhp/actionmanager/stats_actionsets/oozie_app/workflow.xml

@@ -0,0 +1,97 @@
<workflow-app name="StatsActionSets" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>outputPath</name>
<description>the path where to store the actionset</description>
</property>
<property>
<name>statsDB</name>
<description>the name of the stats db to be used</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="atomicactionsStats"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="atomicactionsStats">
dimitris.pierrakos marked this conversation as resolved
Review

If I understand correctly from the Spark action name, this action would produce an actionset containing the usage stats metrics (views / downloads). However, an Oozie workflow responsible for exporting such information is already available under

dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml

Why would we need to duplicate it? Can you elaborate?

Furthermore, I see the `<class>` tag points to `eu.dnetlib.dhp.actionmanager.stats_actionsets.SparkAtomicActionGreenOAJob`, which does not exist in the classpath.
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the stats green_oa for results</name>
<class>eu.dnetlib.dhp.actionmanager.stats_actionsets.StatsAtomicActionsJob</class>
<jar>dhp-stats-actionsets-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--statsDB</arg><arg>${statsDB}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

dhp-workflows/dhp-stats-actionsets/src/main/resources/log4j.properties

@@ -0,0 +1,12 @@
# Set root logger level to INFO and its only appender to A1.
log4j.rootLogger=INFO, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.logger.org.apache.spark=FATAL
log4j.logger.org.spark_project=FATAL