WIP: materialize graph as Hive DB, aggregator graph

Claudio Atzori 2020-07-29 19:25:11 +02:00
parent ee0b2191f8
commit 9e594cf4c2
8 changed files with 227 additions and 76 deletions

View File

@@ -1,7 +1,8 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.common.GraphSupport.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Objects;
import java.util.Optional;
@@ -9,29 +10,23 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.GraphFormat;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import scala.Tuple2;
public class MergeClaimsApplication {
private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
@@ -53,8 +48,14 @@ public class MergeClaimsApplication {
final String claimsGraphPath = parser.get("claimsGraphPath");
log.info("claimsGraphPath: {}", claimsGraphPath);
final String outputRawGaphPath = parser.get("outputRawGaphPath");
log.info("outputRawGaphPath: {}", outputRawGaphPath);
final String outputGraph = parser.get("outputGraph");
log.info("outputGraph: {}", outputGraph);
final GraphFormat format = Optional
.ofNullable(parser.get("format"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.JSON);
log.info("graphFormat: {}", format);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
@@ -65,7 +66,7 @@ public class MergeClaimsApplication {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
@@ -73,29 +74,25 @@ public class MergeClaimsApplication {
String rawPath = rawGraphPath + "/" + type;
String claimPath = claimsGraphPath + "/" + type;
String outPath = outputRawGaphPath + "/" + type;
removeOutputDir(spark, outPath);
mergeByType(spark, rawPath, claimPath, outPath, clazz);
deleteGraphTable(spark, clazz, outputGraph, format);
mergeByType(spark, rawPath, claimPath, outputGraph, format, clazz);
});
}
private static <T extends Oaf> void mergeByType(
SparkSession spark, String rawPath, String claimPath, String outPath, Class<T> clazz) {
Dataset<Tuple2<String, T>> raw = readFromPath(spark, rawPath, clazz)
SparkSession spark, String rawPath, String claimPath, String outputGraph, GraphFormat format, Class<T> clazz) {
Dataset<Tuple2<String, T>> raw = readGraphJSON(spark, rawPath, clazz)
.map(
(MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Dataset<Tuple2<String, T>> claim = jsc
.broadcast(readFromPath(spark, claimPath, clazz))
.getValue()
Dataset<Tuple2<String, T>> claim = readGraphJSON(spark, claimPath, clazz)
.map(
(MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
raw
Dataset<T> merged = raw
.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
.map(
(MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> {
@@ -107,28 +104,9 @@ public class MergeClaimsApplication {
: opClaim.isPresent() ? opClaim.get()._2() : null;
},
Encoders.bean(clazz))
.filter(Objects::nonNull)
.map(
(MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value),
Encoders.STRING())
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outPath);
.filter(Objects::nonNull);
saveGraphTable(merged, clazz, outputGraph, format);
}
private static <T extends Oaf> Dataset<T> readFromPath(
SparkSession spark, String path, Class<T> clazz) {
return spark
.read()
.textFile(path)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}
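The merge rule applied to each joined pair keeps the raw record when present and falls back to the claim otherwise; a standalone sketch of that rule, assuming the same imports as the class above (the helper name is illustrative, not part of this commit):

private static <T extends Oaf> T mergeRawAndClaim(Tuple2<String, T> rawTuple, Tuple2<String, T> claimTuple) {
	// prefer the raw record, fall back to the claim; null results are filtered out by the caller
	final Optional<Tuple2<String, T>> opRaw = Optional.ofNullable(rawTuple);
	final Optional<Tuple2<String, T>> opClaim = Optional.ofNullable(claimTuple);
	return opRaw.isPresent() ? opRaw.get()._2() : opClaim.isPresent() ? opClaim.get()._2() : null;
}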

View File

@@ -6,9 +6,15 @@
"paramRequired": false
},
{
"paramName": "rgp",
"paramLongName": "rawGraphPath",
"paramDescription": "the raw graph path",
"paramName": "og",
"paramLongName": "outputGraph",
"paramDescription": "the output graph (db name | path)",
"paramRequired": true
},
{
"paramName": "gf",
"paramLongName": "graphFormat",
"paramDescription": "graph save format (json|parquet)",
"paramRequired": true
},
{

View File

@@ -2,8 +2,13 @@
<parameters>
<property>
<name>graphOutputPath</name>
<description>the target path to store raw graph</description>
<name>outputGraph</name>
<description>the target graph name (or path)</description>
</property>
<property>
<name>graphFormat</name>
<value>HIVE</value>
<description>the graph data format</description>
</property>
<property>
<name>reuseContent</name>
@@ -340,7 +345,38 @@
<error to="Kill"/>
</action>
<join name="wait_graphs" to="fork_merge_claims"/>
<join name="wait_graphs" to="should_drop_db"/>
<decision name="should_drop_db">
<switch>
<case to="fork_merge_claims">${wf:conf('graphFormat') eq 'JSON'}</case>
<case to="reset_DB">${wf:conf('graphFormat') eq 'HIVE'}</case>
<default to="reset_DB"/>
</switch>
</decision>
<action name="reset_DB">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>reset_DB</name>
<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--dbName</arg><arg>${outputGraph}</arg>
</spark>
<ok to="fork_merge_claims"/>
<error to="Kill"/>
</action>
<fork name="fork_merge_claims">
<path start="merge_claims_publication"/>
@@ -353,7 +389,6 @@
<path start="merge_claims_relation"/>
</fork>
<action name="merge_claims_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@@ -373,7 +408,8 @@
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_merge"/>
@@ -399,7 +435,8 @@
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_merge"/>
@@ -425,7 +462,8 @@
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
</spark>
<ok to="wait_merge"/>
@@ -451,7 +489,8 @@
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_merge"/>
@@ -477,7 +516,8 @@
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_merge"/>
@@ -503,7 +543,8 @@
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
</spark>
<ok to="wait_merge"/>
@@ -529,7 +570,8 @@
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
</spark>
<ok to="wait_merge"/>
@@ -555,7 +597,8 @@
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
</spark>
<ok to="wait_merge"/>

View File

@@ -0,0 +1,8 @@
package eu.dnetlib.dhp.common;
public enum GraphFormat {
JSON, HIVE
}

View File

@@ -1,12 +1,16 @@
package eu.dnetlib.dhp.common;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import java.util.Objects;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
@@ -14,22 +18,78 @@ public class GraphSupport {
private static final Logger log = LoggerFactory.getLogger(GraphSupport.class);
private static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
eu.dnetlib.dhp.common.SaveMode saveMode) {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
log.info("saving graph in {} mode to {}", outputGraph, saveMode.toString());
public static <T extends Oaf> void deleteGraphTable(SparkSession spark, Class<T> clazz, String outputGraph,
GraphFormat graphFormat) {
switch (graphFormat) {
case JSON:
String outPath = outputGraph + "/" + clazz.getSimpleName().toLowerCase();
removeOutputDir(spark, outPath);
break;
case HIVE:
String table = ModelSupport.tableIdentifier(outputGraph, clazz);
String sql = String.format("DROP TABLE IF EXISTS %s PURGE;", table);
log.info("running SQL: '{}'", sql);
spark.sql(sql);
break;
}
}
public static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
GraphFormat graphFormat) {
final DataFrameWriter<T> writer = dataset.write().mode(SaveMode.Overwrite);
switch (saveMode) {
switch (graphFormat) {
case JSON:
writer.option("compression", "gzip").json(outputGraph);
String type = clazz.getSimpleName().toLowerCase();
String outPath = outputGraph + "/" + type;
log.info("saving graph in {} mode to {}", outputGraph, graphFormat.toString());
writer.option("compression", "gzip").json(outPath);
break;
case PARQUET:
case HIVE:
final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz);
log.info("saving graph in {} mode to {}", outputGraph, graphFormat.toString());
writer.saveAsTable(db_table);
break;
}
}
public static <T extends Oaf> Dataset<T> readGraph(
SparkSession spark, String graph, Class<T> clazz, GraphFormat format) {
switch (format) {
case JSON:
return spark
.read()
.textFile(graph)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
case HIVE:
String table = ModelSupport.tableIdentifier(graph, clazz);
return spark.read().table(table).as(Encoders.bean(clazz));
default:
throw new IllegalStateException(String.format("format not managed: '%s'", format));
}
}
public static <T extends Oaf> Dataset<T> readGraphPARQUET(SparkSession spark, String graph, Class<T> clazz) {
return readGraph(spark, graph, clazz, GraphFormat.HIVE);
}
public static <T extends Oaf> Dataset<T> readGraphJSON(SparkSession spark, String graph, Class<T> clazz) {
return readGraph(spark, graph, clazz, GraphFormat.JSON);
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}
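A possible call sequence against these helpers, reading a JSON table and materializing it as a Hive table (the SparkSession, path and DB name are placeholders):

// e.g. inside runWithSparkHiveSession(conf, isSparkSessionManaged, spark -> { ... })
Dataset<Publication> publications = GraphSupport.readGraph(spark, "/tmp/graph_raw/publication", Publication.class, GraphFormat.JSON);
GraphSupport.deleteGraphTable(spark, Publication.class, "graph_db", GraphFormat.HIVE);
GraphSupport.saveGraphTable(publications, Publication.class, "graph_db", GraphFormat.HIVE);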

View File

@@ -0,0 +1,50 @@
package eu.dnetlib.dhp.common;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class ResetHiveDbApplication {
private static final Logger log = LoggerFactory.getLogger(ResetHiveDbApplication.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
ResetHiveDbApplication.class
.getResourceAsStream(
"/eu/dnetlib/dhp/common/reset_hive_db_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String dbName = Optional
.ofNullable(parser.get("dbName"))
.orElseThrow(() -> new IllegalArgumentException("missing DB name"));
log.info("dbName: {}", dbName);
SparkConf conf = new SparkConf();
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", dbName));
spark.sql(String.format("CREATE DATABASE %s", dbName));
});
}
}
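runWithSparkHiveSession itself is not part of this diff; assuming it mirrors runWithSparkSession but enables Hive support, its shape would be roughly:

public static void runWithSparkHiveSession(SparkConf conf, Boolean isSparkSessionManaged, Consumer<SparkSession> fn) {
	// assumption: build (or reuse) a Hive-enabled session, run the job body,
	// and stop the session only when its lifecycle is managed here
	SparkSession spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
	try {
		fn.accept(spark);
	} finally {
		if (Boolean.TRUE.equals(isSparkSessionManaged)) {
			spark.stop();
		}
	}
}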

View File

@@ -1,8 +0,0 @@
package eu.dnetlib.dhp.common;
public enum SaveMode {
JSON, PARQUET
}

View File

@@ -0,0 +1,14 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "db",
"paramLongName": "dbName",
"paramDescription": "the graph db name",
"paramRequired": true
}
]
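With those parameters, a local invocation of the reset job could look like this (the DB name is a placeholder; the long option names match the parameter file above):

// hypothetical invocation; in production the same arguments are passed by the reset_DB Oozie action
ResetHiveDbApplication.main(new String[] {
	"--isSparkSessionManaged", "true",
	"--dbName", "graph_db"
});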