forked from D-Net/dnet-hadoop

Merge pull request 'StatsDB workflow to export actionsets about OA routes, diamond, and publicly-funded' (#355) from dimitris.pierrakos/dnet-hadoop:beta into beta

Reviewed-on: D-Net/dnet-hadoop#355
Commit: 0c3c9ea43d
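This merge adds a new dhp-stats-actionsets module: an Oozie/Spark workflow that reads the open-access indicators computed in the stats DB (gold, hybrid, bronze, green, diamond journal, publicly funded) and republishes them as an action set of Result updates. As a rough, illustrative sketch of the record shape the job writes, reusing the dhp schema classes exactly as the code below does (the identifier and flag values are made up):

// Illustrative sketch only, not part of the commit: one enriched Result wrapped into an
// AtomicAction, as writeActionSet() does further down.
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.OpenAccessColor;
import eu.dnetlib.dhp.schema.oaf.Result;

public class StatsActionSetRecordSketch {
	public static void main(String[] args) throws Exception {
		Result r = new Result();
		r.setId("50|doi_________::abc123");          // hypothetical OpenAIRE result id
		r.setOpenAccessColor(OpenAccessColor.gold);  // exclusive OA colour: gold, hybrid or bronze
		r.setIsGreen(true);                          // independent flags
		r.setIsInDiamondJournal(false);
		r.setPubliclyFunded(true);
		AtomicAction<Result> aa = new AtomicAction<>(Result.class, r);
		System.out.println(new ObjectMapper().writeValueAsString(aa));
	}
}

writeActionSet() pairs each such JSON value with the payload class name and stores the pairs in a gzipped SequenceFile at the configured output path.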
@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>eu.dnetlib.dhp</groupId>
		<artifactId>dhp-workflows</artifactId>
		<version>1.2.5-SNAPSHOT</version>
	</parent>
	<artifactId>dhp-stats-actionsets</artifactId>

	<dependencies>
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.11</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.11</artifactId>
		</dependency>

		<dependency>
			<groupId>eu.dnetlib.dhp</groupId>
			<artifactId>dhp-common</artifactId>
			<version>${project.version}</version>
		</dependency>

		<dependency>
			<groupId>net.sf.saxon</groupId>
			<artifactId>Saxon-HE</artifactId>
		</dependency>
		<dependency>
			<groupId>dom4j</groupId>
			<artifactId>dom4j</artifactId>
		</dependency>

		<dependency>
			<groupId>xml-apis</groupId>
			<artifactId>xml-apis</artifactId>
		</dependency>

		<dependency>
			<groupId>jaxen</groupId>
			<artifactId>jaxen</artifactId>
		</dependency>

		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml -->
		<dependency>
			<groupId>org.apache.poi</groupId>
			<artifactId>poi-ooxml</artifactId>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-compress</artifactId>
		</dependency>

		<dependency>
			<groupId>org.mongodb</groupId>
			<artifactId>mongo-java-driver</artifactId>
		</dependency>

	</dependencies>

</project>
@@ -0,0 +1,162 @@

package eu.dnetlib.dhp.actionmanager.stats_actionsets;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;

/**
 * Creates the atomic actions that enrich each result with the OA route (gold/hybrid/bronze/green),
 * diamond journal and publicly-funded flags computed in the stats DB.
 */
public class StatsAtomicActionsJob implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(StatsAtomicActionsJob.class);
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	public static <I extends Result> void main(String[] args) throws Exception {

		String jsonConfiguration = IOUtils
			.toString(
				StatsAtomicActionsJob.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);

		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);

		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		SparkConf conf = new SparkConf();
		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
		conf.set("spark.speculation", "false");
		conf.set("spark.hadoop.mapreduce.map.speculative", "false");
		conf.set("spark.hadoop.mapreduce.reduce.speculative", "false");

		final String dbname = parser.get("statsDB");

		final String workingPath = parser.get("workingPath");

		runWithSparkHiveSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				removeOutputDir(spark, outputPath);
				prepareResultEnhancement(dbname, spark, workingPath + "/resultEnhancements", "id");
				writeActionSet(spark, workingPath, outputPath);
			});
	}

	// materializes the OA indicators of the stats DB as gzipped JSON records in the working path
	private static void prepareResultEnhancement(String dbname, SparkSession spark, String workingPath,
		String resultAttributeName) {
		spark
			.sql(
				String
					.format(
						"select r.%s as id, is_gold, is_bronze_oa, is_hybrid, green_oa, in_diamond_journal, f.publicly_funded as publicly_funded "
							+
							"from %s.publication r " +
							"left outer join %s.indi_pub_bronze_oa b on r.id=b.id " +
							"left outer join %s.indi_pub_gold_oa g on r.id=g.id " +
							"left outer join %s.indi_pub_hybrid h on r.id=h.id " +
							"left outer join %s.indi_pub_green_oa gr on r.id=gr.id " +
							"left outer join %s.indi_pub_diamond d on b.id=d.id " +
							"left outer join %s.indi_pub_publicly_funded f on r.id=f.id ",
						resultAttributeName, dbname, dbname, dbname, dbname, dbname, dbname, dbname))
			.as(Encoders.bean(StatsResultEnhancementModel.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(workingPath);
	}

	// wraps each enhanced Result into an AtomicAction and stores the pairs as a gzipped SequenceFile
	public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {

		getResultEnhancements(spark, inputPath + "/resultEnhancements")
			.toJavaRDD()
			.map(p -> new AtomicAction(p.getClass(), p))
			.mapToPair(
				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
			.saveAsHadoopFile(
				outputPath,
				Text.class,
				Text.class,
				SequenceFileOutputFormat.class,
				GzipCodec.class);
	}

	// maps each stats DB record to a Result carrying the exclusive OA colour and the independent flags
	private static Dataset<Result> getResultEnhancements(SparkSession spark, String inputPath) {

		return readPath(spark, inputPath, StatsResultEnhancementModel.class)
			.map((MapFunction<StatsResultEnhancementModel, Result>) usm -> {
				Result r = new Result();
				r.setId("50|" + usm.getId());
				r.setIsInDiamondJournal(usm.isIn_diamond_journal());
				r.setIsGreen(usm.isGreen_oa());
				r.setPubliclyFunded(usm.isPublicly_funded());
				// the Boolean indicators may be null because of the left outer joins above
				if (Boolean.TRUE.equals(usm.isIs_gold()))
					r.setOpenAccessColor(OpenAccessColor.gold);
				else if (Boolean.TRUE.equals(usm.isIs_hybrid()))
					r.setOpenAccessColor(OpenAccessColor.hybrid);
				else if (Boolean.TRUE.equals(usm.isIs_bronze_oa()))
					r.setOpenAccessColor(OpenAccessColor.bronze);
				return r;
			}, Encoders.bean(Result.class));
	}

	private static void removeOutputDir(SparkSession spark, String path) {
		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
	}

	public static <R> Dataset<R> readPath(
		SparkSession spark, String inputPath, Class<R> clazz) {
		return spark
			.read()
			.textFile(inputPath)
			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
	}
}
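A note on the mapping in getResultEnhancements() above: the OA colour is exclusive and resolved in the order gold, then hybrid, then bronze, while green, diamond and publicly-funded remain independent flags. A short illustrative fragment under that reading, using the model bean defined in the next file (the identifier is made up):

// Illustrative fragment, not part of the commit: gold takes precedence over hybrid and bronze.
StatsResultEnhancementModel m = new StatsResultEnhancementModel();
m.setId("doi_________::abc123"); // hypothetical id; the job prefixes it with "50|"
m.setIs_gold(true);
m.setIs_hybrid(true);            // ignored: gold is checked first
m.setGreen_oa(true);             // carried over to Result.isGreen regardless of the colour
// getResultEnhancements() would map this record to a Result with
// openAccessColor == OpenAccessColor.gold and isGreen == true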
@@ -0,0 +1,76 @@

package eu.dnetlib.dhp.actionmanager.stats_actionsets;

import java.io.Serializable;

import eu.dnetlib.dhp.schema.oaf.*;

/**
 * @author dimitris.pierrakos
 * @Date 30/10/23
 */
public class StatsResultEnhancementModel implements Serializable {
	private String id;
	private Boolean is_gold;
	private Boolean is_bronze_oa;
	private Boolean is_hybrid;
	private boolean in_diamond_journal;
	private boolean green_oa;
	private boolean publicly_funded;

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public Boolean isIs_gold() {
		return is_gold;
	}

	public void setIs_gold(Boolean is_gold) {
		this.is_gold = is_gold;
	}

	public Boolean isIs_bronze_oa() {
		return is_bronze_oa;
	}

	public void setIs_bronze_oa(Boolean is_bronze_oa) {
		this.is_bronze_oa = is_bronze_oa;
	}

	public Boolean isIs_hybrid() {
		return is_hybrid;
	}

	public void setIs_hybrid(Boolean is_hybrid) {
		this.is_hybrid = is_hybrid;
	}

	public boolean isIn_diamond_journal() {
		return in_diamond_journal;
	}

	public void setIn_diamond_journal(boolean in_diamond_journal) {
		this.in_diamond_journal = in_diamond_journal;
	}

	public boolean isGreen_oa() {
		return green_oa;
	}

	public void setGreen_oa(boolean green_oa) {
		this.green_oa = green_oa;
	}

	public boolean isPublicly_funded() {
		return publicly_funded;
	}

	public void setPublicly_funded(boolean publicly_funded) {
		this.publicly_funded = publicly_funded;
	}
}
@@ -0,0 +1,32 @@
[
  {
    "paramName": "issm",
    "paramLongName": "isSparkSessionManaged",
    "paramDescription": "when true will stop SparkSession after job execution",
    "paramRequired": false
  },
  {
    "paramName": "hmu",
    "paramLongName": "hive_metastore_uris",
    "paramDescription": "the URI for the hive metastore",
    "paramRequired": true
  },
  {
    "paramName": "o",
    "paramLongName": "outputPath",
    "paramDescription": "the path of the new ActionSet",
    "paramRequired": true
  },
  {
    "paramName": "sdb",
    "paramLongName": "statsDB",
    "paramDescription": "the name of the stats db to be used",
    "paramRequired": true
  },
  {
    "paramName": "wp",
    "paramLongName": "workingPath",
    "paramDescription": "the working path where to store the intermediate result enhancements",
    "paramRequired": true
  }
]
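These parameter definitions drive the job's command-line parsing. A minimal sketch of how they are consumed, mirroring StatsAtomicActionsJob.main() above; the metastore URI, paths and DB name are hypothetical placeholders:

// Minimal sketch, not part of the commit: parses an argument vector against the
// definitions above, exactly as the job's main() does.
package eu.dnetlib.dhp.actionmanager.stats_actionsets;

import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class InputParameterSketch {
	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				StatsAtomicActionsJob.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/actionmanager/stats_actionsets/input_actionset_parameter.json"));
		ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser
			.parseArgument(new String[] {
				"--isSparkSessionManaged", "false",
				"--hive_metastore_uris", "thrift://example-metastore:9083", // hypothetical
				"--outputPath", "/tmp/stats-actionset",                     // hypothetical
				"--statsDB", "stats_db",                                    // hypothetical
				"--workingPath", "/tmp/stats-actionset-working"             // hypothetical
			});
		System.out.println(parser.get("outputPath")); // the long names are the lookup keys
	}
}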
@@ -0,0 +1,30 @@
<configuration>
	<property>
		<name>jobTracker</name>
		<value>yarnRM</value>
	</property>
	<property>
		<name>nameNode</name>
		<value>hdfs://nameservice1</value>
	</property>
	<property>
		<name>oozie.use.system.libpath</name>
		<value>true</value>
	</property>
	<property>
		<name>hiveMetastoreUris</name>
		<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
	</property>
	<property>
		<name>hiveJdbcUrl</name>
		<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
	</property>
	<property>
		<name>hiveDbName</name>
		<value>openaire</value>
	</property>
	<property>
		<name>oozie.launcher.mapreduce.user.classpath.first</name>
		<value>true</value>
	</property>
</configuration>
@@ -0,0 +1,97 @@
<workflow-app name="StatsActionSets" xmlns="uri:oozie:workflow:0.5">
	<parameters>
		<property>
			<name>outputPath</name>
			<description>the path where to store the actionset</description>
		</property>
		<property>
			<name>statsDB</name>
			<description>the name of the stats db to be used</description>
		</property>
		<property>
			<name>sparkDriverMemory</name>
			<description>memory for driver process</description>
		</property>
		<property>
			<name>sparkExecutorMemory</name>
			<description>memory for individual executor</description>
		</property>
		<property>
			<name>sparkExecutorCores</name>
			<description>number of cores used by single executor</description>
		</property>
		<property>
			<name>oozieActionShareLibForSpark2</name>
			<description>oozie action sharelib for spark 2.*</description>
		</property>
		<property>
			<name>spark2ExtraListeners</name>
			<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
			<description>spark 2.* extra listeners classname</description>
		</property>
		<property>
			<name>spark2SqlQueryExecutionListeners</name>
			<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
			<description>spark 2.* sql query execution listeners classname</description>
		</property>
		<property>
			<name>spark2YarnHistoryServerAddress</name>
			<description>spark 2.* yarn history server address</description>
		</property>
		<property>
			<name>spark2EventLogDir</name>
			<description>spark 2.* event log dir location</description>
		</property>
	</parameters>

	<global>
		<job-tracker>${jobTracker}</job-tracker>
		<name-node>${nameNode}</name-node>
		<configuration>
			<property>
				<name>mapreduce.job.queuename</name>
				<value>${queueName}</value>
			</property>
			<property>
				<name>oozie.launcher.mapred.job.queue.name</name>
				<value>${oozieLauncherQueueName}</value>
			</property>
			<property>
				<name>oozie.action.sharelib.for.spark</name>
				<value>${oozieActionShareLibForSpark2}</value>
			</property>
		</configuration>
	</global>
	<start to="atomicactionsStats"/>
	<kill name="Kill">
		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>

	<action name="atomicactionsStats">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>Produces the atomic actions with the stats OA route, diamond and publicly-funded flags for results</name>
			<class>eu.dnetlib.dhp.actionmanager.stats_actionsets.StatsAtomicActionsJob</class>
			<jar>dhp-stats-actionsets-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-memory=${sparkExecutorMemory}
				--executor-cores=${sparkExecutorCores}
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
			</spark-opts>
			<arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
			<arg>--outputPath</arg><arg>${outputPath}</arg>
			<arg>--statsDB</arg><arg>${statsDB}</arg>
			<arg>--workingPath</arg><arg>${workingDir}</arg>
		</spark>
		<ok to="End"/>
		<error to="Kill"/>
	</action>
	<end name="End"/>
</workflow-app>
@@ -0,0 +1,12 @@
# Set root logger level to INFO and its only appender to A1.
log4j.rootLogger=INFO, A1

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

log4j.logger.org.apache.spark=FATAL
log4j.logger.org.spark_project=FATAL