1
0
Fork 0

workflow for the dump as sub workflows

This commit is contained in:
Miriam Baglioni 2021-07-13 18:06:44 +02:00
parent d70f8c96fd
commit c028feef4f
13 changed files with 1719 additions and 0 deletions

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,4 @@
## This is a classpath-based import file (this header is required)
dump_complete classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/complete/oozie_app
dump_funder classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app
dump_community classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/community/oozie_app

View File

@ -0,0 +1,306 @@
<workflow-app name="dump_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>singleDeposition</name>
<description>Indicates if it is a single community deposition</description>
</property>
<property>
<name>communityId</name>
<description>the id of the community to be dumped if a dump for a single community should be done</description>
</property>
<property>
<name>dumpType</name>
<description>the type of the dump one of {complete, community, funder}</description>
</property>
<property>
<name>onlyUpload</name>
<description>true if the dump is already done and should only be upload in zenodo</description>
</property>
<property>
<name>upload</name>
<description>true if the dump should be upload in zenodo</description>
</property>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the isLookup service endpoint</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>resultAggregation</name>
<description>true if all the result type have to be dumped under result. false otherwise</description>
</property>
<property>
<name>accessToken</name>
<description>the access token used for the deposition in Zenodo</description>
</property>
<property>
<name>connectionUrl</name>
<description>the connection url for Zenodo</description>
</property>
<property>
<name>metadata</name>
<description> the metadata associated to the deposition</description>
</property>
<property>
<name>depositionType</name>
<description>the type of deposition we want to perform. "new" for brand new deposition, "version" for a new version of a published deposition (in this case the concept record id must be provided), "upload" to upload content to an open deposition for which we already have the deposition id (in this case the deposition id should be provided)</description>
</property>
<property>
<name>conceptRecordId</name>
<description>for new version, the id of the record for the old deposition</description>
</property>
<property>
<name>depositionId</name>
<description>the depositionId of a deposition open that has to be added content</description>
</property>
<property>
<name>organizationCommunityMap</name>
<description>the organization community map</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="only_upload"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="only_upload">
<switch>
<case to="send_zenodo">${wf:conf('onlyUpload') eq true}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="save_community_map"/>
<error to="Kill"/>
</action>
<action name="save_community_map">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SaveCommunityMap</main-class>
<arg>--outputPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--singleDeposition</arg><arg>${singleDeposition}</arg>
<arg>--communityId</arg><arg>${communityId}</arg>
</java>
<ok to="choose_dump"/>
<error to="Kill"/>
</action>
<decision name="choose_dump">
<switch>
<case to="dump_funder">${wf:conf('dumpType') eq "funder"}</case>
<case to="dump_community">${wf:conf('dumpType') eq "community"}</case>
<default to="dump_complete"/>
</switch>
</decision>
<!-- Sub-workflow which runs the dump for the complete graph -->
<action name="dump_complete">
<sub-workflow>
<app-path>${wf:appPath()}/dump_complete
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}/tar</value>
</property>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>organizationCommunityMap</name>
<value>${organizationCommunityMap}</value>
</property>
<property>
<name>isLookUpUrl</name>
<value>${isLookUpUrl}</value>
</property>
<property>
<name>resultAggregation</name>
<value>${resultAggregation}</value>
</property>
</configuration>
</sub-workflow>
<ok to="make_archive" />
<error to="Kill" />
</action>
<!-- Sub-workflow which runs the dump for the complete graph -->
<action name="dump_community">
<sub-workflow>
<app-path>${wf:appPath()}/dump_community
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}/tar</value>
</property>
</configuration>
</sub-workflow>
<ok to="make_archive" />
<error to="Kill" />
</action>
<action name="dump_funder">
<sub-workflow>
<app-path>${wf:appPath()}/dump_funder
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}/tar</value>
</property>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>dumpType</name>
<value>${dumpType}</value>
</property>
</configuration>
</sub-workflow>
<ok to="make_archive" />
<error to="Kill" />
</action>
<action name="make_archive">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${workingDir}/tar</arg>
</java>
<ok to="should_upload"/>
<error to="Kill"/>
</action>
<decision name="should_upload">
<switch>
<case to="send_zenodo">${wf:conf('upload') eq true}</case>
<default to="End"/>
</switch>
</decision>
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,347 @@
<workflow-app name="sub_dump_community_funder_results" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>communityMapPath</name>
<description>the path to the community map</description>
</property>
<property>
<name>selectedResults</name>
<description>the path the the possible subset ot results to be dumped</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="fork_dump"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="fork_dump">
<path start="dump_publication"/>
<path start="dump_dataset"/>
<path start="dump_orp"/>
<path start="dump_software"/>
</fork>
<action name="dump_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table publication for community/funder related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/publication</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
<arg>--dumpType</arg><arg>${dumpType}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table dataset for community/funder related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/dataset</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table ORP for community related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table software for community related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/software</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<join name="join_dump" to="prepareResultProject"/>
<action name="prepareResultProject">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare association result subset of project info</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="fork_extendWithProject"/>
<error to="Kill"/>
</action>
<fork name="fork_extendWithProject">
<path start="extend_publication"/>
<path start="extend_dataset"/>
<path start="extend_orp"/>
<path start="extend_software"/>
</fork>
<action name="extend_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped publications with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped dataset with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped ORP with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/orp</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped software with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<join name="join_extend" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,2 @@
## This is a classpath-based import file (this header is required)
dump_common classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app

View File

@ -0,0 +1,145 @@
<workflow-app name="sub_dump_community_products" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="common_action_community_funder"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="common_action_community_funder">
<sub-workflow>
<app-path>${wf:appPath()}/dump_common
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>selectedResults</name>
<value>${sourcePath}</value>
</property>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}</value>
</property>
</configuration>
</sub-workflow>
<ok to="splitForCommunities" />
<error to="Kill" />
</action>
<action name="splitForCommunities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Split dumped result for community</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkSplitForCommunity</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,537 @@
<workflow-app name="sub-dump_complete" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>resultAggregation</name>
<description>true if all the result type have to be dumped under result. false otherwise</description>
</property>
<property>
<name>organizationCommunityMap</name>
<description>the organization community map</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="fork_dump" />
<fork name="fork_dump">
<path start="dump_publication"/>
<path start="dump_dataset"/>
<path start="dump_orp"/>
<path start="dump_software"/>
<path start="dump_organization"/>
<path start="dump_project"/>
<path start="dump_datasource"/>
<path start="select_relation"/>
</fork>
<action name="dump_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table publication </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/publication</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table dataset </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/dataset</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table ORP </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/otherresearchproduct</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table software </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/software</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table organization </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/organization</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--outputPath</arg><arg>${outputPath}/organization</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table project </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/project</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputPath</arg><arg>${outputPath}/project</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table datasource </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/datasource</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--outputPath</arg><arg>${outputPath}/datasource</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="select_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Select valid table relation </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkSelectValidRelationsJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/validrelation</arg>
</spark>
<ok to="dump_relation"/>
<error to="Kill"/>
</action>
<action name="dump_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table relation </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpRelationJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/validrelation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/relation</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<join name="join_dump" to="fork_context"/>
<fork name="fork_context">
<path start="create_entities_fromcontext"/>
<path start="create_relation_fromcontext"/>
<path start="create_relation_fromorgs"/>
</fork>
<action name="create_entities_fromcontext">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.complete.CreateContextEntities</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}/communities_infrastructures/communities_infrastructure.json.gz</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</java>
<ok to="join_context"/>
<error to="Kill"/>
</action>
<action name="create_relation_fromcontext">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.complete.CreateContextRelation</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/relation/context</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</java>
<ok to="join_context"/>
<error to="Kill"/>
</action>
<action name="create_relation_fromorgs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table relation </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkOrganizationRelation</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/contextOrg</arg>
<arg>--organizationCommunityMap</arg><arg>${organizationCommunityMap}</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_context"/>
<error to="Kill"/>
</action>
<join name="join_context" to="fork_extract_relations"/>
<fork name="fork_extract_relations">
<path start="rels_from_pubs"/>
<path start="rels_from_dats"/>
<path start="rels_from_orp"/>
<path start="rels_from_sw"/>
</fork>
<action name="rels_from_pubs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extract Relations from publication </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkExtractRelationFromEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/publication</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_extract_relations"/>
<error to="Kill"/>
</action>
<action name="rels_from_dats">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table dataset </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkExtractRelationFromEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/dataset</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_extract_relations"/>
<error to="Kill"/>
</action>
<action name="rels_from_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table ORP </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkExtractRelationFromEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/orp</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_extract_relations"/>
<error to="Kill"/>
</action>
<action name="rels_from_sw">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table software </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkExtractRelationFromEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/software</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_extract_relations"/>
<error to="Kill"/>
</action>
<join name="join_extract_relations" to="collect_and_save"/>
<action name="collect_and_save">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Collect Results and Relations and put them in the right path </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkCollectAndSave</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--resultAggregation</arg><arg>${resultAggregation}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<kill name="Kill">
<message>Sub-workflow dump complete failed with error message ${wf:errorMessage()}
</message>
</kill>
<end name="End" />
</workflow-app>

View File

@ -0,0 +1,2 @@
## This is a classpath-based import file (this header is required)
dump_common classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app

View File

@ -0,0 +1,256 @@
<workflow-app name="sub_dump_funder_results" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="fork_result_linked_to_projects"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="fork_result_linked_to_projects">
<path start="select_publication_linked_to_projects"/>
<path start="select_dataset_linked_to_projects"/>
<path start="select_orp_linked_to_project"/>
<path start="select_software_linked_to_projects"/>
</fork>
<action name="select_publication_linked_to_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/publication</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
<action name="select_dataset_linked_to_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/dataset</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
<action name="select_orp_linked_to_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/otherresearchproduct</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
<action name="select_software_linked_to_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/software</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
<join name="join_link" to="common_action_community_funder"/>
<action name="common_action_community_funder">
<sub-workflow>
<app-path>${wf:appPath()}/dump_common
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>selectedResults</name>
<value>${workingDir}/result</value>
</property>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}</value>
</property>
</configuration>
</sub-workflow>
<ok to="dump_funder_results" />
<error to="Kill" />
</action>
<action name="dump_funder_results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkDumpFunderResults</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>