added resource profiles

sandro.labruzzo 2024-11-18 16:19:55 +01:00
parent 9ae7152afb
commit 6e396f7e34
3 changed files with 56 additions and 13 deletions

dag_utils.py

@@ -1,5 +1,6 @@
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from spark_configurator import SparkResourceProfile
BUILD_PHASES = {
    "raw": "01_graph_raw",
@@ -15,6 +16,34 @@ BUILD_PHASES = {
    "scholexplorer": "scholexplorer_graph"
}
SPARK_RESOURCES_PROFILES = {
    "small": SparkResourceProfile(
        driver_cores=1,
        driver_memory="1G",
        executor_cores=2,
        executor_memory="2G",
        executor_memoryOverhead="1G",
        executor_instances=1
    ),
    "medium": SparkResourceProfile(
        driver_cores=1,
        driver_memory="1G",
        executor_cores=8,
        executor_memory="8G",
        executor_memoryOverhead="3G",
        executor_instances=1
    ),
    "large": SparkResourceProfile(
        driver_cores=1,
        driver_memory="1G",
        executor_cores=8,
        executor_memory="16G",
        executor_memoryOverhead="8G",
        executor_instances=1
    )
}

def get_bucket_name(context: dict, hook: S3Hook, param_name: str):
    bucket_name = context["params"][param_name]
    if not bucket_name:
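For orientation, a minimal sketch of how these named tiers are meant to be consumed, assuming dag_utils is importable from the DAGs folder; the values mirror the "small" profile above:

# Sketch only: look up a tier by name and read its dataclass fields.
from dag_utils import SPARK_RESOURCES_PROFILES

profile = SPARK_RESOURCES_PROFILES["small"]
print(profile.executor_cores, profile.executor_memory)  # 2 2G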

results_deduplication_dag (DAG definition file)

@@ -5,7 +5,7 @@ from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from dag_utils import SPARK_RESOURCES_PROFILES
from spark_configurator import SparkConfigurator
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
@@ -160,6 +160,7 @@ def results_deduplication_dag():
name="copyrelations-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
profile=SPARK_RESOURCES_PROFILES['medium'],
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--dedupGraphPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}"

spark_configurator.py

@@ -1,3 +1,15 @@
from dataclasses import dataclass

@dataclass
class SparkResourceProfile:
    """Named bundle of Spark driver/executor resource settings."""
    driver_cores: int
    driver_memory: str
    executor_cores: int
    executor_memory: str
    executor_memoryOverhead: str
    executor_instances: int  # executor count
class SparkConfigurator:
    def __init__(self,
@@ -8,12 +20,13 @@ class SparkConfigurator:
                 apiVersion=None,
                 namespace="dnet-spark-jobs",
                 image="dnet-spark:1.0.0",
-                 driver_cores=1,
-                 driver_memory='1G',
-                 executor_cores=8,
-                 executor_memory="16G",
-                 executor_memoryOverhead="8G",
-                 executor_instances=1
+                 profile: SparkResourceProfile = SparkResourceProfile(driver_cores=1,
+                                                                      driver_memory="1G",
+                                                                      executor_cores=8,
+                                                                      executor_memory="16G",
+                                                                      executor_memoryOverhead="8G",
+                                                                      executor_instances=1)
                 ) -> None:
        if apiVersion:
            self.apiVersion = apiVersion
@@ -46,12 +59,12 @@ class SparkConfigurator:
            "spark.hadoop.fs.s3a.ssl.enabled": "false"
        }
        self.sparkResoruceConf = {
-            'driver_cores': driver_cores,
-            'driver_memory': driver_memory,
-            'executor_cores': executor_cores,
-            'executor_memory': executor_memory,
-            'executor_instances': executor_instances,
-            'memoryOverhead': executor_memoryOverhead
+            'driver_cores': profile.driver_cores,
+            'driver_memory': profile.driver_memory,
+            'executor_cores': profile.executor_cores,
+            'executor_memory': profile.executor_memory,
+            'executor_instances': profile.executor_instances,
+            'memoryOverhead': profile.executor_memoryOverhead
        }
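Taken together, resource settings now travel as a single object: callers that previously passed six keyword arguments pass one SparkResourceProfile, and the default profile in the constructor preserves the old 8-core/16G behaviour. A minimal sketch of the new construction path, assuming the constructor arguments elided from this hunk have defaults:

# Sketch only: build a custom profile and hand it to the configurator.
custom = SparkResourceProfile(
    driver_cores=1,
    driver_memory="2G",
    executor_cores=4,
    executor_memory="4G",
    executor_memoryOverhead="2G",
    executor_instances=2
)
conf = SparkConfigurator(profile=custom)
# conf.sparkResoruceConf["executor_memory"] == "4G"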