diff --git a/workflow/dnet/dag_utils.py b/workflow/dnet/dag_utils.py
index 6f4dc94..2f3c77c 100644
--- a/workflow/dnet/dag_utils.py
+++ b/workflow/dnet/dag_utils.py
@@ -1,5 +1,6 @@
 from airflow.hooks.base import BaseHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from spark_configurator import SparkResourceProfile
 
 BUILD_PHASES = {
     "raw": "01_graph_raw",
@@ -15,6 +16,34 @@ BUILD_PHASES = {
     "scholexplorer":"scholexplorer_graph"
 }
 
+
+SPARK_RESOURCES_PROFILES = {
+    "small": SparkResourceProfile(
+        driver_cores=1,
+        driver_memory="1G",
+        executor_cores=2,
+        executor_memory="2G",
+        executor_memoryOverhead="1G",
+        executor_instances=1
+    ),
+    "medium": SparkResourceProfile(
+        driver_cores=1,
+        driver_memory="1G",
+        executor_cores=8,
+        executor_memory="8G",
+        executor_memoryOverhead="3G",
+        executor_instances=1
+    ),
+    "large": SparkResourceProfile(
+        driver_cores=1,
+        driver_memory="1G",
+        executor_cores=8,
+        executor_memory="16G",
+        executor_memoryOverhead="8G",
+        executor_instances=1
+    )
+}
+
 def get_bucket_name(context: dict, hook: S3Hook, param_name: str):
     bucket_name = context["params"][param_name]
     if not bucket_name:
diff --git a/workflow/dnet/dedup.py b/workflow/dnet/dedup.py
index e2b8050..06a2125 100644
--- a/workflow/dnet/dedup.py
+++ b/workflow/dnet/dedup.py
@@ -5,7 +5,7 @@ from airflow.decorators import dag
 from airflow.models.baseoperator import chain
 from airflow.models.param import Param
 from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
-
+from dag_utils import SPARK_RESOURCES_PROFILES
 from spark_configurator import SparkConfigurator
 
 EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
@@ -160,6 +160,7 @@ def results_deduplication_dag():
             name="copyrelations-{{ ds }}-{{ task_instance.try_number }}",
             mainClass="eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs",
             jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
+            profile=SPARK_RESOURCES_PROFILES['medium'],
             arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
                        "--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
                        "--dedupGraphPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}"
diff --git a/workflow/dnet/spark_configurator.py b/workflow/dnet/spark_configurator.py
index 5ee534a..c7e9184 100644
--- a/workflow/dnet/spark_configurator.py
+++ b/workflow/dnet/spark_configurator.py
@@ -1,3 +1,15 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class SparkResourceProfile:
+    driver_cores: int
+    driver_memory: str
+    executor_cores: int
+    executor_memory: str
+    executor_memoryOverhead: str
+    executor_instances: int
+
 
 class SparkConfigurator:
     def __init__(self,
@@ -8,12 +20,13 @@
                  apiVersion=None,
                  namespace="dnet-spark-jobs",
                  image= "dnet-spark:1.0.0",
-                 driver_cores=1,
-                 driver_memory='1G',
-                 executor_cores=8,
-                 executor_memory="16G",
-                 executor_memoryOverhead="8G",
-                 executor_instances=1
+                 profile: SparkResourceProfile = SparkResourceProfile(driver_cores=1,
+                                                                      driver_memory="1G",
+                                                                      executor_cores=8,
+                                                                      executor_memory="16G",
+                                                                      executor_memoryOverhead="8G",
+                                                                      executor_instances=1)
+
                  ) -> None:
         if apiVersion:
             self.apiVersion = apiVersion
@@ -46,12 +59,12 @@
             "spark.hadoop.fs.s3a.ssl.enabled": "false"
         }
         self.sparkResoruceConf= {
-            'driver_cores':driver_cores,
-            'driver_memory':driver_memory,
-            'executor_cores':executor_cores,
-            'executor_memory':executor_memory,
-            'executor_instances':executor_instances,
-            'memoryOverhead':executor_memoryOverhead
+            'driver_cores':profile.driver_cores,
+            'driver_memory':profile.driver_memory,
+            'executor_cores':profile.executor_cores,
+            'executor_memory':profile.executor_memory,
+            'executor_instances':profile.executor_instances,
+            'memoryOverhead':profile.executor_memoryOverhead
         }
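
For context, a minimal sketch of how a DAG task would opt into one of the shared profiles, mirroring the copy-relations change in dedup.py above. The task name, main class, and arguments below are placeholders, and SparkConfigurator parameters not touched by this diff (namespace, image, apiVersion) are left at their defaults:

from dag_utils import SPARK_RESOURCES_PROFILES
from spark_configurator import SparkConfigurator, SparkResourceProfile

# Reuse one of the shared presets defined in dag_utils.py.
spec = SparkConfigurator(
    name="mytask-{{ ds }}-{{ task_instance.try_number }}",       # placeholder task name
    mainClass="eu.dnetlib.dhp.example.SparkExampleJob",           # placeholder main class
    jarLocation="s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar",
    profile=SPARK_RESOURCES_PROFILES["large"],
    arguments=["--inputPath", "{{ dag_run.conf.get('INPUT_PATH') }}"],
)

# Or build a one-off profile when none of small/medium/large fits.
custom_profile = SparkResourceProfile(
    driver_cores=1,
    driver_memory="2G",
    executor_cores=4,
    executor_memory="8G",
    executor_memoryOverhead="2G",
    executor_instances=2,
)

Tasks that do not pass a profile keep the previous behaviour, since the default SparkResourceProfile in __init__ carries the same values as the old keyword defaults (1 driver core, 1G driver memory, 8 executor cores, 16G executor memory, 8G overhead, 1 instance). Note that this default instance is created once at import time, so callers should treat it as read-only.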