
incremental/export_hive job has to clean propagation context

Giambattista Bloisi 2025-01-13 12:10:03 +01:00
parent ffc7488257
commit 62bb82a8ef
3 changed files with 19 additions and 120 deletions


@@ -21,19 +21,15 @@ object SparkAppendContextCleanedGraph {
     val parser = new ArgumentApplicationParser(
       IOUtils.toString(
         getClass.getResourceAsStream(
-          "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
+          "/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json"
         )
       )
     )
     parser.parseArgument(args)
     conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))
 
-    val graphBasePath = parser.get("graphBasePath")
-    log.info(s"graphBasePath -> $graphBasePath")
-    val relationPath = parser.get("relationPath")
-    log.info(s"relationPath -> $relationPath")
-    val targetPath = parser.get("targetGraph")
-    log.info(s"targetGraph -> $targetPath")
+    val outputPath = parser.get("outputPath")
+    log.info(s"outputPath -> $outputPath")
 
     val hiveDbName = parser.get("hiveDbName")
     log.info(s"hiveDbName -> $hiveDbName")
@@ -46,7 +42,7 @@ object SparkAppendContextCleanedGraph {
       .appName(getClass.getSimpleName)
       .getOrCreate()
 
-    for ((entity, clazz) <- ModelSupport.oafTypes.asScala) {
+    for ((entity, clazz) <- ModelSupport.oafTypes.asScala.filter(t => !Seq("datasource", "organization", "person", "project").contains(t._1))) {
       if (classOf[OafEntity].isAssignableFrom(clazz)) {
         val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]]
@@ -63,8 +59,9 @@ object SparkAppendContextCleanedGraph {
                 c.getDataInfo.asScala
                   .filter(
                     di =>
-                      !di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
-                        && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)
+                      di == null || di.getInferenceprovenance == null ||
+                        (!di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
+                          && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE))
                   )
                   .toList
                   .asJava
@@ -82,14 +79,14 @@ object SparkAppendContextCleanedGraph {
             .write
             .option("compression", "gzip")
             .mode(SaveMode.Append)
-            .json(s"$targetPath/${entity}")
+            .json(s"$outputPath/${entity}")
         } else {
           spark
             .table(s"${hiveDbName}.${entity}")
             .write
             .option("compression", "gzip")
             .mode(SaveMode.Append)
-            .json(s"$targetPath/${entity}")
+            .json(s"$outputPath/${entity}")
         }
       }
     }
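Note on the filter change above: the predicate on DataInfo.getInferenceprovenance is now null-safe, so context entries with a missing data info or provenance are kept instead of breaking the job, while entries produced by propagation or bulk tagging are still dropped. Below is a minimal, self-contained sketch of that predicate, not the dnet-hadoop code itself: DataInfo here is a hypothetical stand-in for the schema class, and the two constant strings are assumptions standing in for PropagationConstant.PROPAGATION_DATA_INFO_TYPE and TaggingConstants.BULKTAG_DATA_INFO_TYPE.

object CleanContextSketch {
  // Hypothetical stand-in for eu.dnetlib.dhp.schema.oaf.DataInfo
  final case class DataInfo(inferenceprovenance: String)

  // Assumed values for the propagation and bulk-tagging provenance constants
  val PropagationProvenance = "propagation"
  val BulkTagProvenance = "bulktagging"

  // Keep a DataInfo unless it was produced by propagation or bulk tagging;
  // null entries and null provenances are kept, as in the patched job.
  def keep(di: DataInfo): Boolean =
    di == null || di.inferenceprovenance == null ||
      (di.inferenceprovenance != PropagationProvenance &&
        di.inferenceprovenance != BulkTagProvenance)

  def main(args: Array[String]): Unit = {
    val dataInfos = List(DataInfo("propagation"), DataInfo("bulktagging"), DataInfo("harvested"), null)
    println(dataInfos.filter(keep)) // List(DataInfo(harvested), null)
  }
}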


@@ -31,127 +31,32 @@
         </property>
     </parameters>
-    <start to="fork_export"/>
+    <start to="merge_db_entities"/>
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
-    <fork name="fork_export">
-        <path start="export_publication"/>
-        <path start="export_dataset"/>
-        <path start="export_otherresearchproduct"/>
-        <path start="export_software"/>
-        <path start="export_relation"/>
-    </fork>
-    <action name="export_publication">
+    <action name="merge_db_entities">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
             <mode>cluster</mode>
-            <name>Merge table publication</name>
-            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <name>Merge Oaf Entities from hive db</name>
+            <class>eu.dnetlib.dhp.incremental.SparkAppendContextCleanedGraph</class>
+            <jar>dhp-incremental-graph-${projectVersion}.jar</jar>
             <spark-opts>
                 --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                 ${sparkClusterOpts}
                 ${sparkResourceOpts}
                 ${sparkApplicationOpts}
             </spark-opts>
-            <arg>--outputPath</arg><arg>${outputPath}/publication</arg>
-            <arg>--mode</arg><arg>append</arg>
-            <arg>--hiveTableName</arg><arg>${hiveDbName}.publication</arg>
+            <arg>--outputPath</arg><arg>${outputPath}</arg>
+            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
             <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
         </spark>
-        <ok to="join_export"/>
+        <ok to="End"/>
         <error to="Kill"/>
     </action>
-    <action name="export_dataset">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Merge table dataset</name>
-            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-                ${sparkClusterOpts}
-                ${sparkResourceOpts}
-                ${sparkApplicationOpts}
-            </spark-opts>
-            <arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
-            <arg>--mode</arg><arg>append</arg>
-            <arg>--hiveTableName</arg><arg>${hiveDbName}.dataset</arg>
-            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
-        </spark>
-        <ok to="join_export"/>
-        <error to="Kill"/>
-    </action>
-    <action name="export_otherresearchproduct">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Merge table otherresearchproduct</name>
-            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-                ${sparkClusterOpts}
-                ${sparkResourceOpts}
-                ${sparkApplicationOpts}
-            </spark-opts>
-            <arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
-            <arg>--mode</arg><arg>append</arg>
-            <arg>--hiveTableName</arg><arg>${hiveDbName}.otherresearchproduct</arg>
-            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
-        </spark>
-        <ok to="join_export"/>
-        <error to="Kill"/>
-    </action>
-    <action name="export_software">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Merge table software</name>
-            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-                ${sparkClusterOpts}
-                ${sparkResourceOpts}
-                ${sparkApplicationOpts}
-            </spark-opts>
-            <arg>--outputPath</arg><arg>${outputPath}/software</arg>
-            <arg>--mode</arg><arg>append</arg>
-            <arg>--hiveTableName</arg><arg>${hiveDbName}.software</arg>
-            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
-        </spark>
-        <ok to="join_export"/>
-        <error to="Kill"/>
-    </action>
-    <action name="export_relation">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Merge table relation</name>
-            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-                ${sparkClusterOpts}
-                ${sparkResourceOpts}
-                ${sparkApplicationOpts}
-            </spark-opts>
-            <arg>--outputPath</arg><arg>${outputPath}/relation</arg>
-            <arg>--mode</arg><arg>append</arg>
-            <arg>--hiveTableName</arg><arg>${hiveDbName}.relation</arg>
-            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
-        </spark>
-        <ok to="join_export"/>
-        <error to="Kill"/>
-    </action>
-    <join name="join_export" to="End"/>
     <end name="End"/>


@@ -15,13 +15,10 @@
             <name>hiveDbName</name>
             <description>hive database containing last generated graph</description>
         </property>
-        <property>
-            <name>action</name>
-        </property>
         <!-- General oozie workflow properties -->
         <property>
             <name>sparkClusterOpts</name>
-            <value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
+            <value>--conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
             <description>spark cluster-wide options</description>
         </property>
         <property>
@@ -56,7 +53,7 @@
                 ${sparkApplicationOpts}
             </spark-opts>
             <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
-            <arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/${action}.sql</arg>
+            <arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql</arg>
             <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
             <arg>--outputPath</arg><arg>${outputPath}</arg>
         </spark>