diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json
new file mode 100644
index 000000000..66f080000
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json
@@ -0,0 +1,39 @@
+[
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the source mdstore path",
+    "paramRequired": false
+  },
+  {
+    "paramName": "t",
+    "paramLongName": "targetPath",
+    "paramDescription": "the path where the final dataset is stored",
+    "paramRequired": false
+  },
+  {
+    "paramName": "mo",
+    "paramLongName": "mdstoreOutputVersion",
+    "paramDescription": "the output mdstore version, used to derive the target mdstore path",
+    "paramRequired": false
+  },
+  {
+    "paramName": "m",
+    "paramLongName": "master",
+    "paramDescription": "the master name",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "isLookupUrl",
+    "paramDescription": "the isLookup URL",
+    "paramRequired": false
+  },
+  {
+    "paramName": "l",
+    "paramLongName": "exportLinks",
+    "paramDescription": "whether links should also be exported",
+    "paramRequired": false
+  }
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/config-default.xml
new file mode 100644
index 000000000..c6b281604
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/config-default.xml
@@ -0,0 +1,69 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>hadoop-rm3.garr-pa1.d4science.org:8032</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>
+    </property>
+    <property>
+        <name>hive_metastore_uris</name>
+        <value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>
+    </property>
+    <property>
+        <name>spark2YarnHistoryServerAddress</name>
+        <value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>
+    </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>spark2EventLogDir</name>
+        <value>/user/spark/spark2ApplicationHistory</value>
+    </property>
+    <property>
+        <name>spark2ExtraListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
+    </property>
+    <property>
+        <name>spark2SqlQueryExecutionListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
+    </property>
+</configuration>
\ No newline at end of file
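Note on the parameter descriptors above: each entry maps a short flag to a long option name, and only `master` is required. A minimal sketch of how such a descriptor file can be loaded and inspected, assuming Jackson with the Scala module on the classpath; the `ParamDescriptor` case class and `ParamsSketch` object are hypothetical names for illustration, since the real job delegates this to dnet's argument parser:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical mirror of one entry in generate_dataset_params.json.
case class ParamDescriptor(
  paramName: String,     // short flag, e.g. "s"
  paramLongName: String, // long option, e.g. "sourcePath"
  paramDescription: String,
  paramRequired: Boolean
)

object ParamsSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    // Load the descriptor file from the classpath, as the Spark job does.
    val json = scala.io.Source
      .fromInputStream(
        getClass.getResourceAsStream("/eu/dnetlib/dhp/bioschema/generate_dataset_params.json")
      )
      .mkString
    val params = mapper.readValue(json, classOf[Array[ParamDescriptor]])
    // Only "master" is required; everything else may be omitted on the command line.
    params.filter(_.paramRequired).foreach(p => println(s"required: --${p.paramLongName}"))
  }
}
```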
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml
new file mode 100644
index 000000000..88beed396
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml
@@ -0,0 +1,114 @@
+<workflow-app name="Bioschema_to_Dataset_Workflow" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>mainPath</name>
+            <value>/data/bioschema/ped</value>
+            <description>the working path of Bioschema stores</description>
+        </property>
+        <property>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the mdStore that will contain the data</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the MDStore manager URI</description>
+        </property>
+    </parameters>
+
+    <start to="StartTransaction"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="TransformJob"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="TransformJob">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>TransformJob</name>
+            <class>eu.dnetlib.dhp.bioschema.GenerateBioschemaDatasetSpark</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${mainPath}/json-datacite</arg>
+            <arg>--targetPath</arg><arg>${mainPath}/dataset</arg>
+            <arg>--exportLinks</arg><arg>true</arg>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+        </spark>
+        <ok to="CommitVersion"/>
+        <error to="EndReadRollBack"/>
+    </action>
+
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="EndReadRollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="RollBack"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
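The TransformJob action above drives `GenerateBioschemaDatasetSpark` with `--master yarn-cluster` and paths rooted at `${mainPath}`. In principle the same entry point can be exercised off-cluster with equivalent arguments; a smoke-test sketch, where `local[*]` and the `/tmp` paths are assumptions rather than anything in this PR:

```scala
// Hypothetical local driver: invokes the same main() the Oozie spark action calls.
object LocalRunSketch {
  def main(args: Array[String]): Unit = {
    eu.dnetlib.dhp.bioschema.GenerateBioschemaDatasetSpark.main(
      Array(
        "--master", "local[*]",                         // assumption: local master instead of yarn-cluster
        "--sourcePath", "/tmp/bioschema/json-datacite", // hypothetical local input
        "--targetPath", "/tmp/bioschema/dataset",       // hypothetical local output
        "--exportLinks", "true"
      )
    )
  }
}
```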
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala
index 73ea0e374..857d57b08 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala
@@ -8,7 +8,7 @@ import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
 import eu.dnetlib.dhp.schema.oaf.Oaf
 import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
 import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
-import org.slf4j.Logger
+import org.slf4j.{Logger, LoggerFactory}
 
 class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], log: Logger)
     extends AbstractScalaApplication(propertyPath, args, log: Logger) {
@@ -22,18 +22,21 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
     log.info(s"SourcePath is '$sourcePath'")
     val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
     log.info(s"exportLinks is '$exportLinks'")
-    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
-    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
-    val mapper = new ObjectMapper()
-    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
-    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
-    log.info(s"outputBasePath is '$outputBasePath'")
-    val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
-    log.info(s"targetPath is '$targetPath'")
+
+//    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+//    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+//    val mapper = new ObjectMapper()
+//    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+//    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+//    log.info(s"outputBasePath is '$outputBasePath'")
+//    val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
+//    log.info(s"targetPath is '$targetPath'")
+
+    val targetPath = parser.get("targetPath")
 
     generateBioschemaDataset(sourcePath, exportLinks, targetPath, spark)
 
-    reportTotalSize(targetPath, outputBasePath)
+//    reportTotalSize(targetPath, outputBasePath)
   }
 
   /** For working with MDStore we need to store in a file on hdfs the size of
@@ -75,3 +78,16 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
     )
   }
 }
+
+object GenerateBioschemaDatasetSpark {
+
+  val log: Logger = LoggerFactory.getLogger(GenerateBioschemaDatasetSpark.getClass)
+
+  def main(args: Array[String]): Unit = {
+    new GenerateBioschemaDatasetSpark(
+      "/eu/dnetlib/dhp/bioschema/generate_dataset_params.json",
+      args,
+      log
+    ).initialize().run()
+  }
+}
\ No newline at end of file
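With the MDStore plumbing commented out, `reportTotalSize` is no longer called, so the size file the MDStore manager expects is not written. Conceptually, that step counts the records the job produced and persists the number as a small HDFS file. A minimal sketch of that idea under stated assumptions; the object, the function name, and the size-file path are illustrative, not the repo's API (the real code uses `writeHdfsFile` from `DHPUtils`):

```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object ReportSizeSketch {
  // Counts the records under dataPath and writes the total to sizeFilePath on HDFS.
  def writeRecordCount(spark: SparkSession, dataPath: String, sizeFilePath: String): Unit = {
    val total = spark.read.text(dataPath).count() // assumes one JSON record per line
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val out = fs.create(new Path(sizeFilePath), true) // overwrite if present
    try out.write(total.toString.getBytes(StandardCharsets.UTF_8))
    finally out.close()
  }
}
```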