diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java
index 9b99097ce..635402a72 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java
@@ -1,7 +1,8 @@
 
 package eu.dnetlib.dhp.oa.graph.raw;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.common.GraphSupport.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
 import java.util.Objects;
 import java.util.Optional;
@@ -9,29 +10,23 @@ import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.GraphFormat;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
 import scala.Tuple2;
 
 public class MergeClaimsApplication {
 
 	private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);
 
-	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
@@ -53,8 +48,14 @@ public class MergeClaimsApplication {
 		final String claimsGraphPath = parser.get("claimsGraphPath");
 		log.info("claimsGraphPath: {}", claimsGraphPath);
 
-		final String outputRawGaphPath = parser.get("outputRawGaphPath");
-		log.info("outputRawGaphPath: {}", outputRawGaphPath);
+		final String outputGraph = parser.get("outputGraph");
+		log.info("outputGraph: {}", outputGraph);
+
+		final GraphFormat format = Optional
+			.ofNullable(parser.get("format"))
+			.map(GraphFormat::valueOf)
+			.orElse(GraphFormat.JSON);
+		log.info("graphFormat: {}", format);
 
 		String graphTableClassName = parser.get("graphTableClassName");
 		log.info("graphTableClassName: {}", graphTableClassName);
@@ -65,7 +66,7 @@ public class MergeClaimsApplication {
 		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
 
-		runWithSparkSession(
+		runWithSparkHiveSession(
 			conf,
 			isSparkSessionManaged,
 			spark -> {
@@ -73,29 +74,25 @@ public class MergeClaimsApplication {
 				String rawPath = rawGraphPath + "/" + type;
 				String claimPath = claimsGraphPath + "/" + type;
-				String outPath = outputRawGaphPath + "/" + type;
 
-				removeOutputDir(spark, outPath);
-				mergeByType(spark, rawPath, claimPath, outPath, clazz);
+				deleteGraphTable(spark, clazz, outputGraph, format);
+				mergeByType(spark, rawPath, claimPath, outputGraph, format, clazz);
 			});
 	}
 
 	private static <T extends Oaf> void mergeByType(
-		SparkSession spark, String rawPath, String claimPath, String outPath, Class<T> clazz) {
-		Dataset<Tuple2<String, T>> raw = readFromPath(spark, rawPath, clazz)
+		SparkSession spark, String rawPath, String claimPath, String outputGraph, GraphFormat format, Class<T> clazz) {
+		Dataset<Tuple2<String, T>> raw = readGraphJSON(spark, rawPath, clazz)
 			.map(
 				(MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
 				Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
 
-		final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-		Dataset<Tuple2<String, T>> claim = jsc
-			.broadcast(readFromPath(spark, claimPath, clazz))
-			.getValue()
+		Dataset<Tuple2<String, T>> claim = readGraphJSON(spark, claimPath, clazz)
 			.map(
 				(MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
 				Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
 
-		raw
+		Dataset<T> merged = raw
 			.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
 			.map(
 				(MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> {
@@ -107,28 +104,9 @@ public class MergeClaimsApplication {
 						: opClaim.isPresent() ? opClaim.get()._2() : null;
 				},
 				Encoders.bean(clazz))
-			.filter(Objects::nonNull)
-			.map(
-				(MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value),
-				Encoders.STRING())
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.text(outPath);
+			.filter(Objects::nonNull);
+
+		saveGraphTable(merged, clazz, outputGraph, format);
 	}
 
-	private static <T extends Oaf> Dataset<T> readFromPath(
-		SparkSession spark, String path, Class<T> clazz) {
-		return spark
-			.read()
-			.textFile(path)
-			.map(
-				(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
-				Encoders.bean(clazz))
-			.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
-	}
-
-	private static void removeOutputDir(SparkSession spark, String path) {
-		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
-	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json
index 686fea643..e21f2dfb5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json
@@ -6,9 +6,15 @@
     "paramRequired": false
   },
   {
-    "paramName": "rgp",
-    "paramLongName": "rawGraphPath",
-    "paramDescription": "the raw graph path",
+    "paramName": "og",
+    "paramLongName": "outputGraph",
+    "paramDescription": "the output graph (db name | path)",
+    "paramRequired": true
+  },
+  {
+    "paramName": "gf",
+    "paramLongName": "graphFormat",
+    "paramDescription": "graph save format (json|parquet)",
     "paramRequired": true
   },
   {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
index d8b61b5ea..3eba1fe93 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
@@ -2,8 +2,13 @@
 
     <parameters>
        <property>
-            <name>graphOutputPath</name>
-            <description>the target path to store raw graph</description>
+            <name>outputGraph</name>
+            <description>the target graph name (or path)</description>
+        </property>
+        <property>
+            <name>graphFormat</name>
+            <value>HIVE</value>
+            <description>the graph data format</description>
         </property>
         <property>
             <name>reuseContent</name>
@@ -340,7 +345,38 @@
         <error to="Kill"/>
     </action>
 
-    <join name="wait_graphs" to="fork_merge_claims"/>
+    <join name="wait_graphs" to="should_reset_DB"/>
+
+    <decision name="should_reset_DB">
+        <switch>
+            <case to="fork_merge_claims">${wf:conf('graphFormat') eq 'JSON'}</case>
+            <case to="reset_DB">${wf:conf('graphFormat') eq 'HIVE'}</case>
+            <default to="reset_DB"/>
+        </switch>
+    </decision>
+
+    <action name="reset_DB">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>reset_DB</name>
+            <class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory ${sparkExecutorMemory}
+                --executor-cores ${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=7680
+            </spark-opts>
+            <arg>--dbName</arg><arg>${outputGraph}</arg>
+        </spark>
+        <ok to="fork_merge_claims"/>
+        <error to="Kill"/>
+    </action>
 
     <fork name="fork_merge_claims">
@@ -353,7 +389,6 @@
         <path start="merge_claims_organization"/>
         <path start="merge_claims_project"/>
     </fork>
-
     <action name="merge_claims_publication">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
@@ -373,7 +408,8 @@
            </spark-opts>
            <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
            <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
-            <arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
+            <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+            <arg>--graphFormat</arg><arg>${graphFormat}</arg>
            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
        </spark>
@@ -399,7 +435,8 @@
            </spark-opts>
            <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
            <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
-            <arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
+            <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+            <arg>--graphFormat</arg><arg>${graphFormat}</arg>
            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
        </spark>
@@ -425,7 +462,8 @@
            </spark-opts>
            <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
            <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
-            <arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
+            <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+            <arg>--graphFormat</arg><arg>${graphFormat}</arg>
            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
        </spark>
@@ -451,7 +489,8 @@
            </spark-opts>
            <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
            <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
-            <arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
+            <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+            <arg>--graphFormat</arg><arg>${graphFormat}</arg>
            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
        </spark>
@@ -477,7 +516,8 @@
            </spark-opts>
            <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
            <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
-            <arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
+            <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+            <arg>--graphFormat</arg><arg>${graphFormat}</arg>
            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
        </spark>
@@ -503,7 +543,8 @@
            </spark-opts>
            <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
            <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
-            <arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
+            <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+            <arg>--graphFormat</arg><arg>${graphFormat}</arg>
            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
        </spark>
@@ -529,7 +570,8 @@
            </spark-opts>
            <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
            <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
-            <arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
+            <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+            <arg>--graphFormat</arg><arg>${graphFormat}</arg>
            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
        </spark>
@@ -555,7 +597,8 @@
            </spark-opts>
            <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
            <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
-            <arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
+            <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+            <arg>--graphFormat</arg><arg>${graphFormat}</arg>
            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
        </spark>
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphFormat.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphFormat.java
new file mode 100644
index 000000000..865f4a590
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphFormat.java
@@ -0,0 +1,8 @@
+
+package eu.dnetlib.dhp.common;
+
+public enum GraphFormat {
+
+	JSON, HIVE
+
+}
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java
index 720a5802b..d27c3c01d 100644
--- a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java
@@ -1,12 +1,16 @@
 
 package eu.dnetlib.dhp.common;
 
-import org.apache.spark.sql.DataFrameWriter;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.SaveMode;
+import java.util.Objects;
+
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.Oaf;
 
@@ -14,22 +18,78 @@ public class GraphSupport {
 
 	private static final Logger log = LoggerFactory.getLogger(GraphSupport.class);
 
-	private static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
-		eu.dnetlib.dhp.common.SaveMode saveMode) {
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-		log.info("saving graph in {} mode to {}", outputGraph, saveMode.toString());
+	public static <T extends Oaf> void deleteGraphTable(SparkSession spark, Class<T> clazz, String outputGraph,
+		GraphFormat graphFormat) {
+		switch (graphFormat) {
+
+			case JSON:
+				String outPath = outputGraph + "/" + clazz.getSimpleName().toLowerCase();
+				removeOutputDir(spark, outPath);
+				break;
+			case HIVE:
+				String table = ModelSupport.tableIdentifier(outputGraph, clazz);
+				String sql = String.format("DROP TABLE IF EXISTS %s PURGE;", table);
+				log.info("running SQL: '{}'", sql);
+				spark.sql(sql);
+				break;
+		}
+	}
+
+	public static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
+		GraphFormat graphFormat) {
 
 		final DataFrameWriter<T> writer = dataset.write().mode(SaveMode.Overwrite);
 
-		switch (saveMode) {
+		switch (graphFormat) {
 			case JSON:
-				writer.option("compression", "gzip").json(outputGraph);
+				String type = clazz.getSimpleName().toLowerCase();
+				String outPath = outputGraph + "/" + type;
+
+				log.info("saving graph in {} mode to {}", outputGraph, graphFormat.toString());
+
+				writer.option("compression", "gzip").json(outPath);
 				break;
-			case PARQUET:
+			case HIVE:
 				final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz);
+
+				log.info("saving graph in {} mode to {}", outputGraph, graphFormat.toString());
+
 				writer.saveAsTable(db_table);
 				break;
 		}
+	}
 
+	public static <T extends Oaf> Dataset<T> readGraph(
+		SparkSession spark, String graph, Class<T> clazz, GraphFormat format) {
+
+		switch (format) {
+			case JSON:
+				return spark
+					.read()
+					.textFile(graph)
+					.map(
+						(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
+						Encoders.bean(clazz))
+					.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
+			case HIVE:
+				String table = ModelSupport.tableIdentifier(graph, clazz);
+				return spark.read().table(table).as(Encoders.bean(clazz));
+			default:
+				throw new IllegalStateException(String.format("format not managed: '%s'", format));
+		}
+	}
+
+	public static <T extends Oaf> Dataset<T> readGraphPARQUET(SparkSession spark, String graph, Class<T> clazz) {
+		return readGraph(spark, graph, clazz, GraphFormat.HIVE);
+	}
+
+	public static <T extends Oaf> Dataset<T> readGraphJSON(SparkSession spark, String graph, Class<T> clazz) {
+		return readGraph(spark, graph, clazz, GraphFormat.JSON);
+	}
+
+	private static void removeOutputDir(SparkSession spark, String path) {
+		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
 	}
 }
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/ResetHiveDbApplication.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/ResetHiveDbApplication.java
new file mode 100644
index 000000000..138b833f9
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/ResetHiveDbApplication.java
@@ -0,0 +1,50 @@
+
+package eu.dnetlib.dhp.common;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class ResetHiveDbApplication {
+
+	private static final Logger log = LoggerFactory.getLogger(ResetHiveDbApplication.class);
+
+	public static void main(final String[] args) throws Exception {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					ResetHiveDbApplication.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/common/reset_hive_db_parameters.json")));
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String dbName = Optional
+			.ofNullable(parser.get("dbName"))
+			.orElseThrow(() -> new IllegalArgumentException("missing DB name"));
+		log.info("dbName: {}", dbName);
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkHiveSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", dbName));
+				spark.sql(String.format("CREATE DATABASE %s", dbName));
+			});
+	}
+
+}
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java
deleted file mode 100644
index 54abdcfb6..000000000
--- a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java
+++ /dev/null
@@ -1,8 +0,0 @@
-
-package eu.dnetlib.dhp.common;
-
-public enum SaveMode {
-
-	JSON, PARQUET
-
-}
diff --git a/dhp-workflows/dhp-workflows-common/src/main/resources/eu/dnetlib/dhp/common/reset_hive_db_parameters.json b/dhp-workflows/dhp-workflows-common/src/main/resources/eu/dnetlib/dhp/common/reset_hive_db_parameters.json
new file mode 100644
index 000000000..afc531148
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/resources/eu/dnetlib/dhp/common/reset_hive_db_parameters.json
@@ -0,0 +1,14 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "db",
+    "paramLongName": "dbName",
+    "paramDescription": "the graph db name",
+    "paramRequired": true
+  }
+]
\ No newline at end of file