Compare commits
12 Commits
| Author | SHA1 | Date |
|---|---|---|
| Giambattista Bloisi | f1e7bedfc6 | |
| Giambattista Bloisi | 113a662ba8 | |
| Giambattista Bloisi | 7d353702d1 | |
| sandro.labruzzo | e1ef2438e1 | |
| sandro.labruzzo | 8bbbbf51ae | |
| sandro.labruzzo | 9682e964b3 | |
| sandro.labruzzo | 4372ab9e1c | |
| sandro.labruzzo | 961c24afcc | |
| sandro.labruzzo | 9da33a9f00 | |
| sandro.labruzzo | 6e396f7e34 | |
| sandro.labruzzo | 9ae7152afb | |
| sandro.labruzzo | 30f2b345d6 | |
README.md
@@ -1,30 +1,32 @@
# Code Infrastructure Lab

This module defines configurations for creating a Kubernetes cluster for generating the OpenAIRE Research Graph.
This project allows the user to install a local Kubernetes cluster running the services needed to test and develop the generation of the OpenAIRE graph locally.

## Cluster definition

The Kubernetes cluster will include some essential services for testing OpenAIRE Graph generation:

- Storage: Minio will be used as storage.
- Workflow Orchestrator: Airflow will be used as the workflow orchestrator.
- Processing Framework: Spark-Operator will be used as the processing framework.
- Storage: Minio as an S3-compatible storage.
- Workflow Orchestrator: Airflow to define and run pipelines.
- Processing Framework: Apache Spark as the processing framework.
- Notebook: Jupyter to inspect data and conduct analysis.

### Storage

[Minio](https://min.io/)": is an open-source object storage service that will be used to store the data that is used to generate the intermediate version of the OpenAIRE Research Graph.
[Minio](https://min.io/): an open-source object storage service used to store the data from which the intermediate versions of the OpenAIRE Research Graph are generated.

### Workflow Orchestrator

[Airflow](https://airflow.apache.org/) is an open-source workflow management platform that will be used to orchestrate the generation of the OpenAIRE Research Graph. Airflow is a powerful and flexible workflow orchestration tool that can be used to automate complex workflows.

### Processing Framework

[Spark-Operator](https://github.com/kubeflow/spark-operator) The Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing status of Spark applications. For a complete reference of the custom resource definitions, please refer to the API Definition. For details on its design, please refer to the design doc.
[Spark-Operator](https://github.com/kubeflow/spark-operator): the Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing the status of Spark applications. For a complete reference of the custom resource definitions, please refer to the API Definition.

### Notebook

[Jupyter](https://jupyter.org/) is an interactive development environment for notebooks, code, and data. It allows users to configure and arrange workflows in data science, scientific computing, computational journalism, and machine learning. A modular design invites extensions to expand and enrich functionality.

## How to use locally

### Prerequisites

This section outlines the prerequisites for setting up a developer cluster on your local machine using Docker and Kind. A developer cluster is a small-scale Kubernetes cluster that can be used for developing and testing applications. Docker and Kind are two tools that can be used to create and manage Kubernetes clusters locally.

Before you begin, you will need to install the following software on your local machine:

@@ -33,17 +35,31 @@ Before you begin, you will need to install the following software on your local

2. Kind: Kind is a tool for creating local Kubernetes clusters using Docker container "nodes". You can download Kind from https://kind.sigs.k8s.io/docs/user/quick-start/.

3. Terraform: Terraform lets you define what your infrastructure looks like in a single configuration file. This file describes things like virtual machines, storage, and networking. Terraform then takes that configuration and provisions (creates) all the resources you need in the cloud, following your instructions.
3. OpenTofu: OpenTofu lets you define what your infrastructure looks like in a single configuration file. This file describes things like virtual machines, storage, and networking. OpenTofu then takes that configuration and provisions (creates) all the resources you need, following your instructions. You can install OpenTofu with brew: `brew install opentofu`

4. k9s: K9s is a terminal-based UI to interact with your Kubernetes clusters. A quick version check for these tools is sketched right after this list.
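
Before proceeding, an optional sanity check (assuming the tools were installed as described above) is to verify that each one is available on your PATH:

```console
docker --version
kind version
tofu version
k9s version
```

If any of these commands is missing, the Homebrew command in the next section installs kind, OpenTofu and k9s in one shot.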
### Create Kubernetes cluster
For creating kubernetes cluster run the following command
### Install required tools with Homebrew

```console
brew install kind opentofu k9s
```

### Clone the repository

```console
git clone https://code-repo.d4science.org/D-Net/code-infrastructure-lab.git
cd code-infrastructure-lab
```

### Create Kubernetes cluster with kind
To create the Kubernetes cluster, run the following command after ensuring the Docker daemon is running:

```console
kind create cluster --config clusters/local/kind-cluster-config.yaml
```

this command will generate a cluster named `dnet-data-platform`
This command will generate a kind cluster named `dnet-data-platform`.
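
To confirm the cluster is reachable (kind registers a kubectl context named after the cluster, so `kind-dnet-data-platform` here), you can run:

```console
kubectl cluster-info --context kind-dnet-data-platform
kubectl get nodes --context kind-dnet-data-platform
```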
|
||||
|
||||
Then we create Ingress that is a Kubernetes resource that allows you to manage external access to services running on a cluster (like minio console or sparkUI).
|
||||
|
||||
|
@ -54,28 +70,68 @@ To enable ingress run the command:
|
|||
kubectl apply --context kind-dnet-data-platform -f ./clusters/local/nginx-kind-deploy.yaml
|
||||
```
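
Assuming the manifest above deploys the standard ingress-nginx controller into the `ingress-nginx` namespace (check the YAML if unsure), you can wait for the controller to become ready before continuing:

```console
kubectl wait --namespace ingress-nginx \
  --context kind-dnet-data-platform \
  --for=condition=ready pod \
  --selector=app.kubernetes.io/component=controller \
  --timeout=90s
```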

### Build and load custom images required in the next steps

```console
cd docker-images/spark-operator
docker build -t spark-operator:2.0.2 .
kind load docker-image -n dnet-data-platform spark-operator:2.0.2
cd -
```

```console
cd docker-images/spark-image
docker build -t dnet-spark:1.0.0 .
kind load docker-image -n dnet-data-platform dnet-spark:1.0.0
```
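
To double-check that both images were loaded into the kind node (assuming the default node name `dnet-data-platform-control-plane`), list the images known to containerd inside the node:

```console
docker exec -it dnet-data-platform-control-plane crictl images | grep -E 'spark-operator|dnet-spark'
```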

### Define the cluster

- Generate a Terraform variable file starting from the file `local.tfvars.template` and save it as `local.tfvars`:
```console
cp local.tfvars.template local.tfvars
```
- Initialize terraform:
```console
terraform init
tofu init
```
- Build and add the Docker image of Spark-Operator:

```console
cd docker-images/spark-operator
docker build -t spark-operator:2.0.2 . && kind load docker-image -n dnet-data-platform spark-operator:2.0.2
```

```console
cd docker-images/spark-image
docker build -t dnet-spark:1.0.0 . && kind load docker-image -n dnet-data-platform dnet-spark:1.0.0
```

- Create the cluster:
```console
terraform apply -var-file="local.tfvars"
tofu apply -var-file="local.tfvars"
```
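
Once the apply completes, an optional way to verify that the services described above (Airflow, Minio, the Spark operator, Jupyter) are coming up is to list the pods across all namespaces:

```console
kubectl get pods --all-namespaces --context kind-dnet-data-platform
```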

### Configure local DNS to point to the cluster

Kind is preconfigured to expose services on your local machine's ports 80 and 443 (see the extraPortMappings section). The kind Kubernetes cluster is set up with an nginx proxy serving the following URLs:
- http://airflow.local-dataplatform/home (admin / admin)
- https://console-minio.local-dataplatform/login (minio / minio123)
- https://jupyter.local-dataplatform/ (yuvian / jupyter)

These server names need to resolve to the loopback (127.0.0.1) IP address.

A simple solution to enable DNS resolution is to add them to your /etc/hosts configuration file, but for extensibility reasons it is recommended to register a local domain through dnsmasq.
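
If you opt for the simpler /etc/hosts route, the entry would look like the sketch below (hostnames taken from the list above):

```console
# append to /etc/hosts (requires sudo)
127.0.0.1 airflow.local-dataplatform console-minio.local-dataplatform jupyter.local-dataplatform
```

Otherwise, follow the dnsmasq steps below.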

#### Install dnsmasq
```console
brew install dnsmasq
```
#### Configure dnsmasq for *.local-dataplatform
```console
mkdir -pv $(brew --prefix)/etc/
echo 'address=/.local-dataplatform/127.0.0.1' >> $(brew --prefix)/etc/dnsmasq.conf
```
#### Configure the dnsmasq port
```console
echo 'port=53' >> $(brew --prefix)/etc/dnsmasq.conf
```

#### Start dnsmasq as a service so it automatically starts at login
```console
sudo brew services start dnsmasq
```
#### Create the DNS resolver
```console
sudo mkdir -v /etc/resolver
sudo bash -c 'echo "nameserver 127.0.0.1" > /etc/resolver/local-dataplatform'
```
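
To confirm that resolution works (an optional check, assuming dnsmasq is listening on 127.0.0.1 port 53 as configured above), query one of the hostnames against the local dnsmasq instance:

```console
dig +short airflow.local-dataplatform @127.0.0.1
# expected output: 127.0.0.1
```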

@@ -19,7 +19,11 @@ nodes:
  - containerPort: 443
    hostPort: 443
    protocol: TCP
  extraMounts:
  - hostPath: ./workflow/dnet
    containerPath: /dnet-airflow
    readOnly: true
    propagation: None
containerdConfigPatches:
- |-

@@ -4,6 +4,6 @@ FROM spark:3.5.3-scala2.12-java17-ubuntu
USER root
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o ${SPARK_HOME}/jars/hadoop-aws-3.3.4.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar

RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/3.5.3/spark-hadoop-cloud_2.12-3.5.3.jar -o ${SPARK_HOME}/jars/spark-hadoop-cloud_2.12-3.5.3.jar

user spark

@@ -5,3 +5,4 @@ ENV SPARK_HOME /opt/spark
USER root
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o ${SPARK_HOME}/jars/hadoop-aws-3.3.4.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/3.5.3/spark-hadoop-cloud_2.12-3.5.3.jar -o ${SPARK_HOME}/jars/spark-hadoop-cloud_2.12-3.5.3.jar
@@ -25,15 +25,15 @@ secret:
    secretName: "s3-conn-secrets"
    secretKey: "AIRFLOW_CONN_S3_CONN"

  # - envName: "AIRFLOW_CONN_POSTGRES_CONN"
  #   secretName: "postgres-conn-secrets"
  #   secretKey: "AIRFLOW_CONN_POSTGRES_CONN"
dags:
  persistence:
    enabled: true
    existingClaim: "local-dnet-airflow"
  gitSync:
    enabled: true
    repo: "https://code-repo.d4science.org/D-Net/code-infrastructure-lab.git"
    branch: "master"
    subPath: "workflow/dnet"
    enabled: false

config:
  webserver:
@@ -165,6 +165,53 @@ EOT
  type = "Opaque"
}

resource "kubernetes_persistent_volume" "dags" {
  metadata {
    name = "local-dnet-airflow"
    labels = {
      type = "local"
    }
  }
  spec {
    node_affinity {
      required {
        node_selector_term {
          match_expressions {
            key      = "kubernetes.io/hostname"
            operator = "In"
            values   = ["dnet-data-platform-control-plane"]
          }
        }
      }
    }
    storage_class_name = "standard"
    capacity = {
      storage = "1Gi"
    }
    access_modes = ["ReadWriteOnce"]
    persistent_volume_source {
      local {
        path = "/dnet-airflow"
      }
    }
  }
}

resource "kubernetes_persistent_volume_claim" "dags" {
  metadata {
    name      = "local-dnet-airflow"
    namespace = "${var.namespace_prefix}airflow"
  }
  spec {
    storage_class_name = "standard"
    access_modes       = ["ReadWriteOnce"]
    resources {
      requests = {
        storage = "1Gi"
      }
    }
  }
}

resource "helm_release" "airflow" {
  depends_on = [kubernetes_secret.s3_conn_secrets]
@@ -5,13 +5,18 @@ resource "helm_release" "jupyterhub" {
  create_namespace  = "true"
  namespace         = "${var.namespace_prefix}spark-jobs"
  dependency_update = "true"
  version           = "3.3.8"
  version           = "4.0.0"

  set {
    name  = "ingress.enabled"
    value = "true"
  }

  set {
    name  = "proxy.service.type"
    value = "NodePort"
  }

  set {
    name  = "ingress.ingressClassName"
    value = "nginx"
@@ -62,13 +62,25 @@ def build_new_graph():

        conf={
            "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",

            "ORCID_PATH": "{{ dag_run.conf.get('ORCID_PATH') }}",
            "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
            "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["orcid_enhancement"],
            "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/orcid_enrichment"
        }
    ),
    TriggerDagRunOperator(
        task_id="scholix_graph",
        task_display_name="Create Scholexplorer Graph",
        trigger_dag_id="build_scholexplorer_graph",
        wait_for_completion=True,

        conf={
            "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
            "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
            "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["scholexplorer"]
        }
    )

    )
@@ -0,0 +1,51 @@
import os
from datetime import timedelta

from airflow.decorators import dag
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

from spark_configurator import SparkConfigurator
from dag_utils import SPARK_RESOURCES_PROFILES

EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))


default_args = {
    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
}

@dag(
    dag_id="build_scholexplorer_graph",
    dag_display_name="Create Scholexplorer Graph",
    default_args=default_args,
    params={
        "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
        "INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/06_graph_dedup", type='string', description=""),
        "OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/scholexplorer_graph", type='string',
                             description="")
    },
    tags=["openaire", "scholexplorer"]
)
def build_scholexplorer_dag():
    scholix_graph = SparkKubernetesOperator(
        task_id='BuildScholexplorerGraph',
        task_display_name="Create Scholexplorer Graph",
        namespace='dnet-spark-jobs',
        template_spec=SparkConfigurator(
            name="orcidpropagate-{{ ds }}-{{ task_instance.try_number }}",
            mainClass="eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump",
            profile=SPARK_RESOURCES_PROFILES['medium'],
            jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
            arguments=["--sourcePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
                       "--targetPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
                       ]).get_configuration(),
        kubernetes_conn_id="kubernetes_default"
    )

    scholix_graph


build_scholexplorer_dag()
@@ -1,5 +1,6 @@
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from spark_configurator import SparkResourceProfile

BUILD_PHASES = {
    "raw": "01_graph_raw",

@@ -11,7 +12,36 @@ BUILD_PHASES = {
    "dedup": "06_graph_dedup",
    "consistency": "07_graph_consistent",
    "enrichment": "08_graph_dedup_enriched",  # actionset
    "orcid_enhancement": "09_graph_orcid_enriched"
    "orcid_enhancement": "09_graph_orcid_enriched",
    "scholexplorer": "scholexplorer_graph"
}


SPARK_RESOURCES_PROFILES = {
    "small": SparkResourceProfile(
        driver_cores=1,
        driver_memory="1G",
        executor_cores=2,
        executor_memory="2G",
        executor_memoryOverhead="1G",
        executor_instances=1
    ),
    "medium": SparkResourceProfile(
        driver_cores=1,
        driver_memory="1G",
        executor_cores=8,
        executor_memory="8G",
        executor_memoryOverhead="3G",
        executor_instances=1
    ),
    "large": SparkResourceProfile(
        driver_cores=1,
        driver_memory="1G",
        executor_cores=8,
        executor_memory="16G",
        executor_memoryOverhead="8G",
        executor_instances=1
    )
}

def get_bucket_name(context: dict, hook: S3Hook, param_name: str):
@@ -5,7 +5,7 @@ from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

from dag_utils import SPARK_RESOURCES_PROFILES
from spark_configurator import SparkConfigurator

EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

@@ -160,6 +160,7 @@ def results_deduplication_dag():
            name="copyrelations-{{ ds }}-{{ task_instance.try_number }}",
            mainClass="eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs",
            jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
            profile=SPARK_RESOURCES_PROFILES['medium'],
            arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
                       "--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
                       "--dedupGraphPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}"
@@ -1,87 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example DAG which uses SparkKubernetesOperator and SparkKubernetesSensor.
In this example, we create two tasks which execute sequentially.
The first task is to submit sparkApplication on Kubernetes cluster(the example uses spark-pi application).
and the second task is to check the final state of the sparkApplication that submitted in the first state.

Spark-on-k8s operator is required to be already installed on Kubernetes
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
"""

from os import path
from datetime import timedelta, datetime
from spark_configurator import SparkConfigurator

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago


# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'max_active_runs': 1,
    'retries': 3
}

spec =SparkConfigurator(
    name="spark-scholix-{{ ds }}-{{ task_instance.try_number }}",
    mainClass="eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump",
    jarLocation = 's3a://deps/dhp-shade-package-1.2.5-SNAPSHOT.jar',
    arguments =[ "--sourcePath", "s3a://raw-graph/01", "--targetPath", "s3a://scholix"],\
    executor_cores=4,
    executor_memory="4G",
    executor_instances=1,
    executor_memoryOverhead="3G").get_configuration()

dag = DAG(
    'spark_run_test',
    default_args=default_args,
    schedule_interval=None,
    tags=['example', 'spark']
)

submit = SparkKubernetesOperator(
    task_id='spark-scholix',
    namespace='dnet-spark-jobs',
    template_spec=spec,
    kubernetes_conn_id="kubernetes_default",
    # do_xcom_push=True,
    # delete_on_termination=True,
    # base_container_name="spark-kubernetes-driver",
    dag=dag
)


submit
@@ -1,3 +1,15 @@
from dataclasses import dataclass


@dataclass
class SparkResourceProfile:
    driver_cores: int
    driver_memory: str
    executor_cores: int
    executor_memory: str
    executor_memoryOverhead: str
    executor_instances: str


class SparkConfigurator:
    def __init__(self,

@@ -8,12 +20,13 @@ class SparkConfigurator:
                 apiVersion=None,
                 namespace="dnet-spark-jobs",
                 image="dnet-spark:1.0.0",
                 driver_cores=1,
                 driver_memory='1G',
                 executor_cores=8,
                 executor_memory="16G",
                 executor_memoryOverhead="8G",
                 executor_instances=1
                 profile: SparkResourceProfile = SparkResourceProfile(driver_cores=1,
                                                                      driver_memory="1G",
                                                                      executor_cores=8,
                                                                      executor_memory="16G",
                                                                      executor_memoryOverhead="8G",
                                                                      executor_instances=1)

                 ) -> None:
        if apiVersion:
            self.apiVersion = apiVersion

@@ -43,15 +56,18 @@ class SparkConfigurator:
            "fs.s3a.connection.ssl.strictverify": "false",
            "fs.s3a.connection.ssl.enabled": "false",
            "fs.s3a.ssl.enabled": "false",
            "spark.hadoop.fs.s3a.ssl.enabled": "false"
            "spark.hadoop.fs.s3a.ssl.enabled": "false",
            "fs.s3a.committer.name": "magic",
            "spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
            "spark.sql.parquet.output.committer.class": "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
        }
        self.sparkResoruceConf = {
            'driver_cores': driver_cores,
            'driver_memory': driver_memory,
            'executor_cores': executor_cores,
            'executor_memory': executor_memory,
            'executor_instances': executor_instances,
            'memoryOverhead': executor_memoryOverhead
            'driver_cores': profile.driver_cores,
            'driver_memory': profile.driver_memory,
            'executor_cores': profile.executor_cores,
            'executor_memory': profile.executor_memory,
            'executor_instances': profile.executor_instances,
            'memoryOverhead': profile.executor_memoryOverhead

        }