added workflow and last step of collection and save

This commit is contained in:
Miriam Baglioni 2020-12-01 14:30:56 +01:00
parent 45d06c45c7
commit 62ff4999e3
2 changed files with 94 additions and 98 deletions

View File

@ -16,17 +16,5 @@
"paramLongName": "outputPath", "paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet", "paramDescription": "the path of the new ActionSet",
"paramRequired": true "paramRequired": true
}, }
{
"paramName": "rtn",
"paramLongName": "resultTableName",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "bsp",
"paramLongName": "bipScorePath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
] ]

View File

@ -1,22 +1,17 @@
<workflow-app name="H2020Programme" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="BipFinderScore" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>projectFileURL</name> <name>inputPath</name>
<description>the url where to get the projects file</description> <description>the input path of the resources to be extended</description>
</property> </property>
<property> <property>
<name>programmeFileURL</name> <name>bipScorePath</name>
<description>the url where to get the programme file</description> <description>the path where to find the bipFinder scores</description>
</property>
<property>
<name>topicFileURL</name>
<description>the url where to get the topic file</description>
</property> </property>
<property> <property>
<name>outputPath</name> <name>outputPath</name>
<description>path where to store the action set</description> <description>the path where to store the actionset</description>
</property> </property>
</parameters> </parameters>
@ -31,128 +26,141 @@
<delete path='${workingDir}'/> <delete path='${workingDir}'/>
<mkdir path='${workingDir}'/> <mkdir path='${workingDir}'/>
</fs> </fs>
<ok to="get_project_file"/> <ok to="atomicactions"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="get_project_file"> <fork name="atomicactions">
<java> <path start="atomicactions_publication"/>
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class> <path start="atomicactions_dataset"/>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <path start="atomicactions_orp"/>
<arg>--fileURL</arg><arg>${projectFileURL}</arg> <path start="atomicactions_software"/>
<arg>--hdfsPath</arg><arg>${workingDir}/projects</arg> </fork>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProject</arg>
</java>
<ok to="get_programme_file"/>
<error to="Kill"/>
</action>
<action name="get_programme_file"> <action name="atomicactions_publication">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${programmeFileURL}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/programme</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme</arg>
</java>
<ok to="get_topic_file"/>
<error to="Kill"/>
</action>
<action name="get_topic_file">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadExcel</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${topicFileURL}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/topic</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic</arg>
</java>
<ok to="read_projects"/>
<error to="Kill"/>
</action>
<action name="read_projects">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.project.ReadProjectsFromDB</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/dbProjects</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
</java>
<ok to="prepare_programme"/>
<error to="Kill"/>
</action>
<action name="prepare_programme">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>PrepareProgramme</name> <name>Produces the atomic action with the bip finder scores for publications</name>
<class>eu.dnetlib.dhp.actionmanager.project.PrepareProgramme</class> <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts> </spark-opts>
<arg>--programmePath</arg><arg>${workingDir}/programme</arg> <arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedProgramme</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
</spark> </spark>
<ok to="prepare_project"/> <ok to="join_aa"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="prepare_project"> <action name="atomicactions_dataset">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>PrepareProjects</name> <name>Produces the atomic action with the bip finder scores for datasets</name>
<class>eu.dnetlib.dhp.actionmanager.project.PrepareProjects</class> <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts> </spark-opts>
<arg>--projectPath</arg><arg>${workingDir}/projects</arg> <arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedProjects</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--dbProjectPath</arg><arg>${workingDir}/dbProjects</arg> <arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
</spark> </spark>
<ok to="create_updates"/> <ok to="join_aa"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="create_updates"> <action name="atomicactions_orp">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>ProjectProgrammeAS</name> <name>Produces the atomic action with the bip finder scores for orp</name>
<class>eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob</class> <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts> </spark-opts>
<arg>--projectPath</arg><arg>${workingDir}/preparedProjects</arg> <arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--programmePath</arg><arg>${workingDir}/preparedProgramme</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--topicPath</arg><arg>${workingDir}/topic</arg> <arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
</spark>
<ok to="join_aa"/>
<error to="Kill"/>
</action>
<action name="atomicactions_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the bip finder scores for software</name>
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
</spark>
<ok to="join_aa"/>
<error to="Kill"/>
</action>
<join name="join_aa" to="collectandsave"/>
<action name="collectandsave">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>saves all the aa produced for the several types of results in the as output path</name>
<class>eu.dnetlib.dhp.actionmanager.bipfinder.CollectAndSave</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>--outputPath</arg><arg>${outputPath}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>