1
0
Fork 0

Changes required to run cleaning workflow with spark 3.4

This commit is contained in:
Giambattista Bloisi 2025-01-10 13:20:05 +01:00
parent 759e060451
commit ffc7488257
3 changed files with 56 additions and 88 deletions

View File

@ -128,6 +128,12 @@
<dependency> <dependency>
<groupId>eu.dnetlib</groupId> <groupId>eu.dnetlib</groupId>
<artifactId>cnr-rmi-api</artifactId> <artifactId>cnr-rmi-api</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -11,6 +11,10 @@
<name>oozie.use.system.libpath</name> <name>oozie.use.system.libpath</name>
<value>true</value> <value>true</value>
</property> </property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property> <property>
<name>oozie.action.sharelib.for.spark</name> <name>oozie.action.sharelib.for.spark</name>
<value>spark2</value> <value>spark2</value>

View File

@ -81,6 +81,22 @@
<name>spark2EventLogDir</name> <name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description> <description>spark 2.* event log dir location</description>
</property> </property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=8G --conf spark.executor.memoryOverhead=6G --executor-cores=6 --driver-memory=9G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1000</value>
<description>spark application options</description>
</property>
</parameters> </parameters>
<start to="prepare_info"/> <start to="prepare_info"/>
@ -102,13 +118,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.GetDatasourceFromCountry</class> <class>eu.dnetlib.dhp.oa.graph.clean.GetDatasourceFromCountry</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=10000 --conf spark.sql.shuffle.partitions=10000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}</arg> <arg>--inputPath</arg><arg>${graphInputPath}</arg>
@ -154,15 +166,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/publication</arg> <arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
@ -190,15 +196,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=8000 --conf spark.sql.shuffle.partitions=8000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg> <arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
@ -226,15 +226,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=5000 --conf spark.sql.shuffle.partitions=5000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg> <arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
@ -262,15 +256,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=2000 --conf spark.sql.shuffle.partitions=2000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/software</arg> <arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
@ -298,15 +286,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=1000 --conf spark.sql.shuffle.partitions=1000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg> <arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
@ -334,15 +316,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=1000 --conf spark.sql.shuffle.partitions=1000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/organization</arg> <arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
@ -370,15 +346,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=2000 --conf spark.sql.shuffle.partitions=2000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/project</arg> <arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
@ -406,15 +376,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=2000 --conf spark.sql.shuffle.partitions=2000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/person</arg> <arg>--inputPath</arg><arg>${graphInputPath}/person</arg>
@ -442,15 +406,9 @@
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} ${sparkClusterOpts}
--executor-memory=${sparkExecutorMemory} ${sparkResourceOpts}
--driver-memory=${sparkDriverMemory} ${sparkApplicationOpts}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=20000 --conf spark.sql.shuffle.partitions=20000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/relation</arg> <arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>