changed the input params and the workflow definition to tackle the Result as all result product produced

This commit is contained in:
Miriam Baglioni 2020-06-18 11:25:05 +02:00
parent 8211cbb9fe
commit e8b3e972f2
4 changed files with 135 additions and 144 deletions

View File

@ -36,18 +36,6 @@
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName":"dn",
"paramLongName":"dumpTableName",
"paramDescription": "the name of the corresondent dump element ",
"paramRequired": true
},
{
"paramName":"rt",
"paramLongName":"resultType",
"paramDescription": "the name of the corresondent dump element ",
"paramRequired": false
}
]

View File

@ -121,9 +121,7 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultType</arg><arg>publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--dumpTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
@ -149,9 +147,7 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultType</arg><arg>dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--dumpTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
@ -177,9 +173,7 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultType</arg><arg>otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--dumpTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
@ -205,9 +199,7 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultType</arg><arg>software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--dumpTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
@ -267,7 +259,6 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/ext/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
@ -293,7 +284,6 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/ext/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
@ -318,7 +308,6 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/ext/orp</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.OtherResearchProduct</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
@ -343,128 +332,154 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/ext/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<join name="join_extend" to="splitForCommunities"/>
<join name="join_extend" to="fork_splitForCommunities"/>
<action name="splitForCommunities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Split dumped result for community</name>
<class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Publication</arg>-->
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<!-- <join name="join_extend" to="fork_splitForCommunities"/>-->
<fork name="fork_splitForCommunities">
<path start="split_publication"/>
<path start="split_dataset"/>
<path start="split_orp"/>
<path start="split_software"/>
</fork>
<!-- <fork name="fork_splitForCommunities">-->
<!-- <path start="split_publication"/>-->
<!-- <path start="split_dataset"/>-->
<!-- <path start="split_orp"/>-->
<!-- <path start="split_software"/>-->
<!-- </fork>-->
<action name="split_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Split dumped result for community</name>
<class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Publication</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="join_split"/>
<error to="Kill"/>
</action>
<!-- <action name="split_publication">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Split dumped result for community</name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/ext/publication</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${outputPath}</arg>-->
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Publication</arg>-->
<!-- <arg>&#45;&#45;isLookUpUrl</arg><arg>${isLookUpUrl}</arg>-->
<!-- </spark>-->
<!-- <ok to="join_split"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="split_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Split dumped result for community</name>
<class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Dataset</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="join_split"/>
<error to="Kill"/>
</action>
<action name="split_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Split dumped result for community</name>
<class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext/orp</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.OtherResearchProduct</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="join_split"/>
<error to="Kill"/>
</action>
<action name="split_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Split dumped result for community</name>
<class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext/software</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Software</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="join_split"/>
<error to="Kill"/>
</action>
<!-- <action name="split_dataset">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Split dumped result for community</name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/ext/dataset</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${outputPath}</arg>-->
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Dataset</arg>-->
<!-- <arg>&#45;&#45;isLookUpUrl</arg><arg>${isLookUpUrl}</arg>-->
<!-- </spark>-->
<!-- <ok to="join_split"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <action name="split_orp">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Split dumped result for community</name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/ext/orp</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${outputPath}</arg>-->
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.OtherResearchProduct</arg>-->
<!-- <arg>&#45;&#45;isLookUpUrl</arg><arg>${isLookUpUrl}</arg>-->
<!-- </spark>-->
<!-- <ok to="join_split"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <action name="split_software">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Split dumped result for community</name>-->
<!-- <class>eu.dnetlib.dhp.oa.graph.dump.SparkSplitForCommunity</class>-->
<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/ext/software</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${outputPath}</arg>-->
<!-- <arg>&#45;&#45;resultTableName</arg><arg>eu.dnetlib.dhp.schema.dump.oaf.Software</arg>-->
<!-- <arg>&#45;&#45;isLookUpUrl</arg><arg>${isLookUpUrl}</arg>-->
<!-- </spark>-->
<!-- <ok to="join_split"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <join name="join_split" to="loadInZenodo"/>-->
<join name="join_split" to="End"/>
<!-- <join name="join_split" to="End"/>-->
<!-- <action name="loadInZenodo">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->

View File

@ -19,12 +19,6 @@
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName": "pip",
"paramLongName": "preparedInfoPath",

View File

@ -24,12 +24,6 @@
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName":"map",
"paramLongName":"communityMap",