Added Oozie workflow to generate the Bioschema dataset on HDFS

This commit is contained in:
Enrico Ottonello 2022-03-02 16:58:10 +01:00
parent f28d7e3b9d
commit e57216a1fa
4 changed files with 248 additions and 10 deletions


@@ -0,0 +1,39 @@
[
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the source mdstore path",
"paramRequired": false
},
{
"paramName": "t",
"paramLongName": "targetPath",
"paramDescription": "the path where storing final dataset",
"paramRequired": false
},
{
"paramName": "mo",
"paramLongName": "mdstoreOutputVersion",
"paramDescription": "the target mdstore path",
"paramRequired": false
},
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "the master name",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "isLookupUrl",
"paramDescription": "the isLookup URL",
"paramRequired": false
},
{
"paramName": "l",
"paramLongName": "exportLinks",
"paramDescription": "should export also links",
"paramRequired": false
}
]
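These parameter definitions are read by the Spark job at startup and matched against the `--longName value` pairs passed from the workflow's `<arg>` elements. The sketch below is purely illustrative (it is not the project's argument parser): it loads the JSON above from the classpath with Jackson and reports which declared parameters were supplied, failing fast on a missing required one.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import scala.io.Source

// Illustrative sketch only, not the dhp argument parser: pair the declared
// parameters with "--longName value" tokens as passed by the Oozie workflow.
object ParamsSketch {
  def main(cliArgs: Array[String]): Unit = {
    val json = Source
      .fromInputStream(getClass.getResourceAsStream(
        "/eu/dnetlib/dhp/bioschema/generate_dataset_params.json"))
      .mkString
    val defs = new ObjectMapper().readTree(json)

    // e.g. Array("--sourcePath", "/data/...", "--master", "yarn-cluster", ...)
    val supplied = cliArgs.grouped(2).collect {
      case Array(flag, value) => flag.stripPrefix("--") -> value
    }.toMap

    (0 until defs.size()).map(i => defs.get(i)).foreach { d =>
      val name = d.get("paramLongName").asText()
      if (d.get("paramRequired").asBoolean() && !supplied.contains(name))
        sys.error(s"missing required parameter: --$name")
      println(s"$name = ${supplied.getOrElse(name, "<not set>")}")
    }
  }
}
```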


@@ -0,0 +1,69 @@
<configuration>
<!-- OCEAN -->
<!-- <property>-->
<!-- <name>jobTracker</name>-->
<!-- <value>yarnRM</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>nameNode</name>-->
<!-- <value>hdfs://nameservice1</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>oozie.use.system.libpath</name>-->
<!-- <value>true</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>oozie.action.sharelib.for.spark</name>-->
<!-- <value>spark2</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>oozie.launcher.mapreduce.user.classpath.first</name>-->
<!-- <value>true</value>-->
<!-- </property>-->
<!-- GARR -->
<property>
<name>jobTracker</name>
<value>hadoop-rm3.garr-pa1.d4science.org:8032</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>
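The spark2* properties above are not read by Spark directly; they are substituted into the workflow's `${...}` EL expressions and forwarded as `--conf` options in the Spark action's `<spark-opts>`. As a rough Scala equivalent (values copied from this file; the builder call itself is only illustrative), the resulting session configuration amounts to:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the configuration the workflow's spark-opts assemble
// from the properties defined above.
val spark = SparkSession
  .builder()
  .appName("TransformJob")
  .config("spark.extraListeners", "com.cloudera.spark.lineage.NavigatorAppListener")
  .config("spark.sql.queryExecutionListeners", "com.cloudera.spark.lineage.NavigatorQueryListener")
  .config("spark.yarn.historyServer.address", "http://hadoop-rm2.garr-pa1.d4science.org:19888")
  .config("spark.eventLog.dir", "hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/spark/spark2ApplicationHistory")
  .getOrCreate()
```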


@@ -0,0 +1,114 @@
<workflow-app name="generate_bioschema" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mainPath</name>
<value>/data/bioschema/ped</value>
<description>the working path of Bioschema stores</description>
</property>
</parameters>
<start to="TransformJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="StartTransaction">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>NEW_VERSION</arg>
<arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="TransformJob"/>
<error to="EndReadRollBack"/>
</action>
<action name="TransformJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>TransformJob</name>
<class>eu.dnetlib.dhp.bioschema.GenerateBioschemaDatasetSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${mainPath}/json-datacite</arg>
<arg>--targetPath</arg><arg>${mainPath}/dataset</arg>
<arg>--exportLinks</arg><arg>true</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="CommitVersion">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>COMMIT</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="EndReadRollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java>
<ok to="RollBack"/>
<error to="Kill"/>
</action>
<action name="RollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>ROLLBACK</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
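As wired above, `<start to="TransformJob"/>` runs the Spark transformation directly against `${mainPath}/json-datacite` and writes to `${mainPath}/dataset`; the MDStore actions (StartTransaction, CommitVersion, EndReadRollBack, RollBack) are defined but not reachable from the start node. The snippet below is a hypothetical outline of what the TransformJob does, not the actual `generateBioschemaDataset` implementation; `toOaf` stands in for whatever record converter the job really uses.

```scala
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}

// Hypothetical outline of the TransformJob body: read the harvested JSON records,
// convert each one to zero or more OAF entities/relations, write to targetPath.
def generateBioschemaDatasetSketch(sourcePath: String, exportLinks: Boolean,
                                   targetPath: String, spark: SparkSession): Unit = {
  implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]

  // Placeholder converter: the real mapping from a Bioschema/DataCite-style JSON
  // record to OAF objects lives elsewhere in the module.
  def toOaf(record: String, exportLinks: Boolean): List[Oaf] = List.empty

  spark.read
    .textFile(sourcePath)                              // ${mainPath}/json-datacite
    .flatMap(record => toOaf(record, exportLinks))
    .write
    .mode(SaveMode.Overwrite)
    .save(targetPath)                                  // ${mainPath}/dataset
}
```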


@@ -8,7 +8,7 @@ import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.Logger
import org.slf4j.{Logger, LoggerFactory}
class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], log: Logger)
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
@@ -22,18 +22,21 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
log.info(s"SourcePath is '$sourcePath'")
val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
log.info(s"exportLinks is '$exportLinks'")
val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
val mapper = new ObjectMapper()
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
val outputBasePath = cleanedMdStoreVersion.getHdfsPath
log.info(s"outputBasePath is '$outputBasePath'")
val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
log.info(s"targetPath is '$targetPath'")
// val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
// log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
// val mapper = new ObjectMapper()
// val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
// val outputBasePath = cleanedMdStoreVersion.getHdfsPath
// log.info(s"outputBasePath is '$outputBasePath'")
// val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
// log.info(s"targetPath is '$targetPath'")
val targetPath = parser.get("targetPath")
generateBioschemaDataset(sourcePath, exportLinks, targetPath, spark)
reportTotalSize(targetPath, outputBasePath)
// reportTotalSize(targetPath, outputBasePath)
}
/** For working with MDStore we need to store in a file on hdfs the size of
@@ -75,3 +78,16 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
)
}
}
object GenerateBioschemaDatasetSpark {
val log: Logger = LoggerFactory.getLogger(GenerateBioschemaDatasetSpark.getClass)
def main(args: Array[String]): Unit = {
new GenerateBioschemaDatasetSpark(
"/eu/dnetlib/dhp/bioschema/generate_dataset_params.json",
args,
log
).initialize().run()
}
}
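For a quick smoke test outside Oozie the entry point can be driven directly; the paths below are placeholders and `local[*]` replaces the `yarn-cluster` master passed by the workflow:

```scala
// Hypothetical local invocation; input/output paths are placeholders.
GenerateBioschemaDatasetSpark.main(
  Array(
    "--master", "local[*]",
    "--sourcePath", "/tmp/bioschema/json-datacite",
    "--targetPath", "/tmp/bioschema/dataset",
    "--exportLinks", "true"
  )
)
```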