Hive to HDFS export job with context cleaning for incremental graph
commit c9ec9ec726
parent 2b666c8aa6
New file: GraphHiveTableExporterJob.java

@@ -0,0 +1,80 @@
package eu.dnetlib.dhp.oa.graph.hive;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;

public class GraphHiveTableExporterJob {

    private static final Logger log = LoggerFactory.getLogger(GraphHiveTableExporterJob.class);

    public static void main(String[] args) throws Exception {

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
                .toString(
                    GraphHiveTableExporterJob.class
                        .getResourceAsStream(
                            "/eu/dnetlib/dhp/oa/graph/hive_db_exporter_parameters.json")));
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        int numPartitions = Optional
            .ofNullable(parser.get("numPartitions"))
            .map(Integer::valueOf)
            .orElse(-1);
        log.info("numPartitions: {}", numPartitions);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        String hiveTableName = parser.get("hiveTableName");
        log.info("hiveTableName: {}", hiveTableName);

        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);

        String mode = parser.get("mode");
        log.info("mode: {}", mode);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", hiveMetastoreUris);

        runWithSparkHiveSession(
            conf, isSparkSessionManaged,
            spark -> saveGraphTable(spark, outputPath, hiveTableName, mode, numPartitions));
    }

    // protected for testing
    private static <T extends Oaf> void saveGraphTable(SparkSession spark, String outputPath, String hiveTableName,
        String mode, int numPartitions) {

        Dataset<Row> dataset = spark.table(hiveTableName);

        if (numPartitions > 0) {
            log.info("repartitioning to {} partitions", numPartitions);
            dataset = dataset.repartition(numPartitions);
        }

        dataset
            .write()
            .mode(mode)
            .option("compression", "gzip")
            .json(outputPath);
    }
}
New file: hive_db_exporter_parameters.json

@@ -0,0 +1,32 @@
[
  {
    "paramName": "issm",
    "paramLongName": "isSparkSessionManaged",
    "paramDescription": "when true will stop SparkSession after job execution",
    "paramRequired": false
  },
  {
    "paramName": "out",
    "paramLongName": "outputPath",
    "paramDescription": "the path to the graph data dump to read",
    "paramRequired": true
  },
  {
    "paramName": "mode",
    "paramLongName": "mode",
    "paramDescription": "mode (append|overwrite)",
    "paramRequired": true
  },
  {
    "paramName": "hmu",
    "paramLongName": "hiveMetastoreUris",
    "paramDescription": "the hive metastore uris",
    "paramRequired": true
  },
  {
    "paramName": "db",
    "paramLongName": "hiveTableName",
    "paramDescription": "the input hive table identifier",
    "paramRequired": true
  }
]
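The parameters above map one-to-one to the arguments read by GraphHiveTableExporterJob. The sketch below shows a hypothetical programmatic invocation; all paths, URIs and table names are placeholders (not taken from this commit), and the long-option argument syntax expected by ArgumentApplicationParser is assumed rather than verified. In practice the job would be submitted through spark-submit or an Oozie spark action rather than called directly.

import eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob

// A minimal sketch, assuming "--paramLongName value" argument syntax and
// placeholder values; not a verified invocation of the real job.
object HiveTableExportSketch {
  def main(args: Array[String]): Unit = {
    GraphHiveTableExporterJob.main(
      Array(
        "--isSparkSessionManaged", "true",                         // per the parameter description, stops the SparkSession after execution
        "--outputPath", "/tmp/graph_export/publication",           // hypothetical HDFS target path
        "--mode", "overwrite",                                     // save mode passed to dataset.write().mode(...)
        "--hiveMetastoreUris", "thrift://example-metastore:9083",  // placeholder metastore URI
        "--hiveTableName", "graph_db.publication"                  // hypothetical <db>.<table> identifier
      ))
  }
}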
Modified file: SparkAppendContextCleanedGraph.scala

@@ -21,19 +21,15 @@ object SparkAppendContextCleanedGraph
     val parser = new ArgumentApplicationParser(
       IOUtils.toString(
         getClass.getResourceAsStream(
-          "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
+          "/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json"
         )
       )
     )
     parser.parseArgument(args)
     conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))

-    val graphBasePath = parser.get("graphBasePath")
-    log.info(s"graphBasePath -> $graphBasePath")
-    val relationPath = parser.get("relationPath")
-    log.info(s"relationPath -> $relationPath")
-    val targetPath = parser.get("targetGraph")
-    log.info(s"targetGraph -> $targetPath")
+    val outputPath = parser.get("outputPath")
+    log.info(s"outputPath -> $outputPath")

     val hiveDbName = parser.get("hiveDbName")
     log.info(s"hiveDbName -> $hiveDbName")

@@ -46,7 +42,7 @@ object SparkAppendContextCleanedGraph
       .appName(getClass.getSimpleName)
       .getOrCreate()

-    for ((entity, clazz) <- ModelSupport.oafTypes.asScala) {
+    for ((entity, clazz) <- ModelSupport.oafTypes.asScala.filter(t => !Seq("datasource", "organization", "person", "project").contains(t._1))) {
       if (classOf[OafEntity].isAssignableFrom(clazz)) {
         val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]]

@@ -63,8 +59,9 @@ object SparkAppendContextCleanedGraph
               c.getDataInfo.asScala
                 .filter(
                   di =>
-                    !di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
-                      && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)
+                    di == null || di.getInferenceprovenance == null ||
+                      (!di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
+                        && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE))
                 )
                 .toList
                 .asJava

@@ -82,14 +79,14 @@ object SparkAppendContextCleanedGraph
           .write
           .option("compression", "gzip")
           .mode(SaveMode.Append)
-          .json(s"$targetPath/${entity}")
+          .json(s"$outputPath/${entity}")
       } else {
         spark
           .table(s"${hiveDbName}.${entity}")
           .write
           .option("compression", "gzip")
           .mode(SaveMode.Append)
-          .json(s"$targetPath/${entity}")
+          .json(s"$outputPath/${entity}")
       }
     }
   }
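The filter change above is the heart of the context cleaning: a dataInfo entry is kept unless its inference provenance marks it as produced by propagation or bulk tagging, and null entries or null provenances are now tolerated instead of raising NullPointerExceptions. Below is a minimal standalone sketch of that predicate; the provenance strings are placeholders, not the actual values of PropagationConstant.PROPAGATION_DATA_INFO_TYPE and TaggingConstants.BULKTAG_DATA_INFO_TYPE.

// A minimal sketch of the provenance-based dataInfo filter; the constant values
// below are placeholders for PROPAGATION_DATA_INFO_TYPE / BULKTAG_DATA_INFO_TYPE.
object ContextCleaningSketch {

  val PropagationProvenance = "propagation" // placeholder value
  val BulkTagProvenance = "bulktagging"     // placeholder value

  // Keep an entry when it has no inference provenance, or when it was not
  // produced by propagation or bulk tagging (mirrors the null-safe check above).
  def keep(inferenceProvenance: String): Boolean =
    inferenceProvenance == null ||
      (inferenceProvenance != PropagationProvenance && inferenceProvenance != BulkTagProvenance)

  def main(args: Array[String]): Unit = {
    val provenances = Seq("propagation", "bulktagging", "user:claim", null)
    println(provenances.filter(keep)) // List(user:claim, null)
  }
}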
New file: append_context_cleaned_graph.json

@@ -0,0 +1,20 @@
[
  {
    "paramName": "out",
    "paramLongName": "outputPath",
    "paramDescription": "the path to the graph data dump to read",
    "paramRequired": true
  },
  {
    "paramName": "hmu",
    "paramLongName": "hiveMetastoreUris",
    "paramDescription": "the hive metastore uris",
    "paramRequired": true
  },
  {
    "paramName": "db",
    "paramLongName": "hiveDbName",
    "paramDescription": "the input hive database identifier",
    "paramRequired": true
  }
]
New file: Oozie workflow default configuration

@@ -0,0 +1,26 @@
<configuration>
    <property>
        <name>jobTracker</name>
        <value>yarnRM</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://nameservice1</value>
    </property>
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
    </property>
    <property>
        <name>hiveMetastoreUris</name>
        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
    </property>
    <property>
        <name>hiveJdbcUrl</name>
        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
    </property>
    <property>
        <name>hiveDbName</name>
        <value>openaire</value>
    </property>
</configuration>
New file: Oozie workflow (import_graph_as_hive_DB)

@@ -0,0 +1,63 @@
<workflow-app name="import_graph_as_hive_DB" xmlns="uri:oozie:workflow:0.5">

    <parameters>
        <property>
            <name>outputPath</name>
            <description>the source path</description>
        </property>
        <property>
            <name>hiveDbName</name>
            <description>the target hive database name</description>
        </property>
        <property>
            <name>hiveMetastoreUris</name>
            <description>hive server metastore URIs</description>
        </property>
        <!-- General oozie workflow properties -->
        <property>
            <name>sparkClusterOpts</name>
            <value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
            <description>spark cluster-wide options</description>
        </property>
        <property>
            <name>sparkResourceOpts</name>
            <value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
            <description>spark resource options</description>
        </property>
        <property>
            <name>sparkApplicationOpts</name>
            <value>--conf spark.sql.shuffle.partitions=1024</value>
            <description>spark application options</description>
        </property>
    </parameters>

    <start to="merge_db_entities"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <action name="merge_db_entities">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Merge Oaf Entities from hive db</name>
            <class>eu.dnetlib.dhp.incremental.SparkAppendContextCleanedGraph</class>
            <jar>dhp-incremental-graph-${projectVersion}.jar</jar>
            <spark-opts>
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                ${sparkClusterOpts}
                ${sparkResourceOpts}
                ${sparkApplicationOpts}
            </spark-opts>
            <arg>--outputPath</arg><arg>${outputPath}</arg>
            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>

    <end name="End"/>

</workflow-app>
Modified file: graph copy workflow (distcp actions)

@@ -98,10 +98,10 @@
             <arg>${nameNode}/${graphBasePath}/otherresearchproduct</arg>
             <arg>${nameNode}/${targetGraph}/otherresearchproduct</arg>
         </distcp>
-        <ok to="copy_person"/>
+        <ok to="copy_project"/>
         <error to="Kill"/>
     </action>
-    <action name="copy_person">
+    <!-- <action name="copy_person">
         <distcp xmlns="uri:oozie:distcp-action:0.2">
             <arg>${nameNode}/${graphBasePath}/person</arg>
             <arg>${nameNode}/${targetGraph}/person</arg>

@@ -109,6 +109,7 @@
         <ok to="copy_project"/>
         <error to="Kill"/>
     </action>
+    -->
     <action name="copy_project">
         <distcp xmlns="uri:oozie:distcp-action:0.2">
             <arg>${nameNode}/${graphBasePath}/project</arg>