WIP: materialize graph as Hive DB, mergeAggregatorGraphs

Claudio Atzori 2020-08-04 12:26:09 +02:00
parent 0da1d2c0c9
commit 771bf8bcc4
3 changed files with 130 additions and 90 deletions

eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java

@@ -1,11 +1,15 @@
package eu.dnetlib.dhp.oa.graph.merge;
import static eu.dnetlib.dhp.common.GraphSupport.deleteGraphTable;
import static eu.dnetlib.dhp.common.GraphSupport.saveGraphTable;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional;
import eu.dnetlib.dhp.common.GraphFormat;
import eu.dnetlib.dhp.common.GraphSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@@ -20,7 +24,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
@@ -35,8 +38,6 @@ public class MergeGraphSparkJob {
private static final Logger log = LoggerFactory.getLogger(MergeGraphSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
public static void main(String[] args) throws Exception {
@@ -60,19 +61,31 @@ public class MergeGraphSparkJob {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String betaInputPath = parser.get("betaInputPath");
log.info("betaInputPath: {}", betaInputPath);
String betaInputGraph = parser.get("betaInputGraph");
log.info("betaInputGraph: {}", betaInputGraph);
String prodInputPath = parser.get("prodInputPath");
log.info("prodInputPath: {}", prodInputPath);
String prodInputGraph = parser.get("prodInputGraph");
log.info("prodInputGraph: {}", prodInputGraph);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String outputGraph = parser.get("outputGraph");
log.info("outputGraph: {}", outputGraph);
GraphFormat inputGraphFormat = Optional
.ofNullable(parser.get("inputGraphFormat"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.DEFAULT);
log.info("inputGraphFormat: {}", inputGraphFormat);
GraphFormat outputGraphFormat = Optional
.ofNullable(parser.get("outputGraphFormat"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.DEFAULT);
log.info("outputGraphFormat: {}", outputGraphFormat);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@@ -82,41 +95,39 @@ public class MergeGraphSparkJob {
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
deleteGraphTable(spark, clazz, outputGraph, outputGraphFormat);
mergeGraphTable(spark, priority, betaInputGraph, prodInputGraph, clazz, clazz, outputGraph, inputGraphFormat, outputGraphFormat);
});
}
private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
SparkSession spark,
String priority,
String betaInputPath,
String prodInputPath,
Class<P> p_clazz,
Class<B> b_clazz,
String outputPath) {
SparkSession spark,
String priority,
String betaInputGraph,
String prodInputGraph,
Class<P> p_clazz,
Class<B> b_clazz,
String outputGraph,
GraphFormat inputGraphFormat,
GraphFormat outputGraphFormat) {
Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
Dataset<Tuple2<String, B>> beta = readGraph(spark, betaInputGraph, b_clazz, inputGraphFormat);
Dataset<Tuple2<String, P>> prod = readGraph(spark, prodInputGraph, p_clazz, inputGraphFormat);
prod
.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
switch (priority) {
default:
case "BETA":
return mergeWithPriorityToBETA(p, b);
case "PROD":
return mergeWithPriorityToPROD(p, b);
}
}, Encoders.bean(p_clazz))
.filter((FilterFunction<P>) Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
Dataset<P> merged = prod
.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
switch (priority) {
default:
case "BETA":
return mergeWithPriorityToBETA(p, b);
case "PROD":
return mergeWithPriorityToPROD(p, b);
}
}, Encoders.bean(p_clazz))
.filter((FilterFunction<P>) Objects::nonNull);
saveGraphTable(merged, p_clazz, outputGraph, outputGraphFormat);
}
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
@@ -139,24 +150,15 @@ public class MergeGraphSparkJob {
return null;
}
private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
SparkSession spark, String inputEntityPath, Class<T> clazz) {
private static <T extends Oaf> Dataset<Tuple2<String, T>> readGraph(
SparkSession spark, String inputGraph, Class<T> clazz, GraphFormat inputGraphFormat) {
log.info("Reading Graph table from: {}", inputEntityPath);
return spark
.read()
.textFile(inputEntityPath)
.map(
(MapFunction<String, Tuple2<String, T>>) value -> {
final T t = OBJECT_MAPPER.readValue(value, clazz);
log.info("Reading Graph table from: {}", inputGraph);
return GraphSupport.readGraph(spark, inputGraph, clazz, inputGraphFormat)
.map((MapFunction<T, Tuple2<String, T>>) t -> {
final String id = ModelSupport.idFn().apply(t);
return new Tuple2<>(id, t);
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
}
}
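
The GraphFormat enum and the GraphSupport helpers the job now delegates to are not part of this diff. Below is a minimal sketch of what they could look like, inferred only from the call sites above (readGraph, saveGraphTable, deleteGraphTable) and from the HIVE values used in the workflow below; the table naming convention and the JSON default are assumptions, not confirmed by this commit.

// Both types live in eu.dnetlib.dhp.common; shown together here for brevity.

// GraphFormat.java (sketch)
public enum GraphFormat {
	JSON, HIVE;

	// assumption: plain JSON text dumps remain the default
	public static final GraphFormat DEFAULT = JSON;
}

// GraphSupport.java (sketch)
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.schema.oaf.Oaf;

public class GraphSupport {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	// Derives the table name from the OAF model class, e.g. Publication -> publication.
	private static <T extends Oaf> String tableName(Class<T> clazz) {
		return clazz.getSimpleName().toLowerCase();
	}

	// Reads one graph table, either from gzipped JSON text files under <graph>/<table>
	// or from the Hive table <graph>.<table>.
	public static <T extends Oaf> Dataset<T> readGraph(
		SparkSession spark, String graph, Class<T> clazz, GraphFormat format) {
		switch (format) {
			case JSON:
				return spark
					.read()
					.textFile(graph + "/" + tableName(clazz))
					.map(
						(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
						Encoders.bean(clazz));
			case HIVE:
			default:
				return spark.table(graph + "." + tableName(clazz)).as(Encoders.bean(clazz));
		}
	}

	// Saves one graph table, overwriting any previous content.
	public static <T extends Oaf> void saveGraphTable(
		Dataset<T> dataset, Class<T> clazz, String graph, GraphFormat format) {
		switch (format) {
			case JSON:
				dataset
					.write()
					.mode(SaveMode.Overwrite)
					.option("compression", "gzip")
					.json(graph + "/" + tableName(clazz));
				break;
			case HIVE:
			default:
				dataset
					.write()
					.mode(SaveMode.Overwrite)
					.saveAsTable(graph + "." + tableName(clazz));
				break;
		}
	}

	// Clears the output before writing; replaces the removed removeOutputDir helper.
	public static <T extends Oaf> void deleteGraphTable(
		SparkSession spark, Class<T> clazz, String graph, GraphFormat format) {
		switch (format) {
			case JSON:
				HdfsSupport.remove(graph + "/" + tableName(clazz), spark.sparkContext().hadoopConfiguration());
				break;
			case HIVE:
			default:
				spark.sql(String.format("DROP TABLE IF EXISTS %s.%s PURGE", graph, tableName(clazz)));
				break;
		}
	}
}

Keeping the format switch inside GraphSupport is what would let every table job in the workflow read and write either plain JSON dumps or Hive tables without changing its merge logic.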

workflow.xml (graph merge Oozie workflow)

@@ -2,21 +2,31 @@
<parameters>
<property>
<name>betaInputGgraphPath</name>
<description>the beta graph root path</description>
<name>betaInputGraph</name>
<description>the beta graph name (or path)</description>
</property>
<property>
<name>prodInputGgraphPath</name>
<description>the production graph root path</description>
<name>prodInputGraph</name>
<description>the production graph name (or path)</description>
</property>
<property>
<name>graphOutputPath</name>
<description>the output merged graph root path</description>
<name>outputGraph</name>
<description>the merged graph name (or path)</description>
</property>
<property>
<name>priority</name>
<description>decides which infrastructure's content wins in case of an ID clash</description>
</property>
<property>
<name>inputGraphFormat</name>
<value>HIVE</value>
<description>the input graph data format</description>
</property>
<property>
<name>outputGraphFormat</name>
<value>HIVE</value>
<description>the output graph data format</description>
</property>
<property>
<name>sparkDriverMemory</name>
@@ -88,9 +98,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/publication</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
@@ -115,9 +127,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
@@ -142,9 +156,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
@@ -169,9 +185,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
@@ -196,9 +214,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
@@ -223,9 +243,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
@@ -250,9 +272,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
@@ -277,9 +301,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
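
With inputGraphFormat and outputGraphFormat both defaulting to HIVE, the three graph parameters name Hive databases rather than HDFS root paths, which is why the per-entity suffixes (/publication, /dataset, ...) disappear from the arguments: each action resolves its own table via --graphTableClassName. A hypothetical direct invocation of the publication step under these assumptions; the database names below are placeholders, not actual infrastructure names.

import eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob;

public class MergePublicationsExample {

	public static void main(String[] args) throws Exception {
		// Mirrors the publication <spark> action above, with HIVE input and output;
		// dnet_beta, dnet_prod and dnet_merged are made-up database names.
		MergeGraphSparkJob
			.main(new String[] {
				"--isSparkSessionManaged", "false",
				"--betaInputGraph", "dnet_beta",
				"--prodInputGraph", "dnet_prod",
				"--outputGraph", "dnet_merged",
				"--inputGraphFormat", "HIVE",
				"--outputGraphFormat", "HIVE",
				"--graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication",
				"--priority", "BETA"
			});
	}
}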

merge job input parameters (JSON)

@@ -7,20 +7,32 @@
},
{
"paramName": "bin",
"paramLongName": "betaInputPath",
"paramDescription": "the beta graph root path",
"paramLongName": "betaInputGraph",
"paramDescription": "the beta graph name (or path)",
"paramRequired": true
},
{
"paramName": "pin",
"paramLongName": "prodInputPath",
"paramDescription": "the production graph root path",
"paramLongName": "prodInputGraph",
"paramDescription": "the production graph name (or path)",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the output merged graph root path",
"paramLongName": "outputGraph",
"paramDescription": "the output merged graph (or path)",
"paramRequired": true
},
{
"paramName": "igf",
"paramLongName": "inputGraphFormat",
"paramDescription": "the format of the input graphs",
"paramRequired": true
},
{
"paramName": "ogf",
"paramLongName": "outputGraphFormat",
"paramDescription": "the format of the output merged graph",
"paramRequired": true
},
{