Compare commits

...

12 Commits

Author SHA1 Message Date
Giambattista Bloisi f1e7bedfc6 Update zeppelin and fix deployment error 2024-12-12 14:46:33 +01:00
Giambattista Bloisi 113a662ba8 Load Airflow dags from local folder 2024-11-23 21:03:27 +01:00
Giambattista Bloisi 7d353702d1 Revised documentation 2024-11-23 21:02:53 +01:00
sandro.labruzzo e1ef2438e1 added spark-cloud dependency for the committer 2024-11-19 11:52:44 +01:00
sandro.labruzzo 8bbbbf51ae added committer to configuration 2024-11-19 09:59:20 +01:00
sandro.labruzzo 9682e964b3 fixed workflow parameters 2024-11-18 16:42:16 +01:00
sandro.labruzzo 4372ab9e1c fixed import 2024-11-18 16:34:07 +01:00
sandro.labruzzo 961c24afcc fixed import 2024-11-18 16:30:16 +01:00
sandro.labruzzo 9da33a9f00 removed test workflow 2024-11-18 16:29:09 +01:00
sandro.labruzzo 6e396f7e34 added resource profiles 2024-11-18 16:19:55 +01:00
sandro.labruzzo 9ae7152afb added scholexplorer workflow 2024-11-18 15:23:34 +01:00
sandro.labruzzo 30f2b345d6 added scholexplorer workflow 2024-11-18 15:23:17 +01:00
13 changed files with 274 additions and 138 deletions

README.md
View File

@ -1,30 +1,32 @@
# Code Infrastructure Lab
This module defines configurations for creating a Kubernetes cluster for generating the OpenAIRE Research Graph.
This project allows the user to install a local Kubernetes cluster running the services needed to test and develop the generation of the OpenAIRE graph locally.
## Cluster definition
The Kubernetes cluster will include some essential services for testing OpenAIRE Graph generation:
- Storage: Minio will be used as storage.
- Workflow Orchestrator: Airflow will be used as the workflow orchestrator.
- Processing Framework: Spark-Operator will be used as the processing framework.
- Storage: Minio as an S3-compatible storage.
- Workflow Orchestrator: Airflow to define and run pipelines.
- Processing Framework: Apache Spark as the processing framework.
- Notebook: Jupyter to inspect data and conduct analysis.
### Storage
[Minio](https://min.io/)": is an open-source object storage service that will be used to store the data that is used to generate the intermediate version of the OpenAIRE Research Graph.
[Minio](https://min.io/): an open-source object storage service used to hold the data from which the intermediate versions of the OpenAIRE Research Graph are generated.
### Workflow Orchestrator
[Airflow](https://airflow.apache.org/) is an open-source workflow management platform that will be used to orchestrate the generation of the OpenAIRE Research Graph. It provides a powerful and flexible way to define, schedule, and monitor complex pipelines.
### Processing Framework
[Spark-Operator](https://github.com/kubeflow/spark-operator) The Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing status of Spark applications. For a complete reference of the custom resource definitions, please refer to the API Definition. For details on its design, please refer to the design doc.
[Spark-Operator](https://github.com/kubeflow/spark-operator): the Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing the status of Spark applications. For a complete reference of the custom resource definitions, please refer to the API Definition.
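Once the operator is installed (later in this guide), submitted Spark jobs surface as SparkApplication custom resources; as a rough sketch (the `dnet-spark-jobs` namespace is taken from the DAG examples further down), they can be inspected with kubectl:
```console
kubectl get sparkapplications -n dnet-spark-jobs --context kind-dnet-data-platform
kubectl describe sparkapplication <app-name> -n dnet-spark-jobs --context kind-dnet-data-platform
```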
### Notebook
[Jupyter](https://jupyter.org/) is an interactive development environment for notebooks, code, and data. It allows users to configure and arrange workflows in data science, scientific computing, computational journalism, and machine learning. A modular design invites extensions to expand and enrich functionality.
## How to use locally
### Prerequisites
This section outlines the prerequisites for setting up a developer cluster on your local machine using Docker and Kind. A developer cluster is a small-scale Kubernetes cluster that can be used for developing and testing applications. Docker and Kind are two tools that can be used to create and manage Kubernetes clusters locally.
Before you begin, you will need to install the following software on your local machine:
@ -33,17 +35,31 @@ Before you begin, you will need to install the following software on your local
2. Kind: Kind is a tool for creating local Kubernetes clusters using Docker container "nodes". You can download Kind from https://kind.sigs.k8s.io/docs/user/quick-start/.
3. Terraform: Terraform lets you define what your infrastructure looks like in a single configuration file. This file describes things like virtual machines, storage, and networking. Terraform then takes that configuration and provisions (creates) all the resources you need in the cloud, following your instructions.
3. OpenTofu: OpenTofu is an open-source alternative to Terraform that lets you define what your infrastructure looks like in a single configuration file. This file describes things like virtual machines, storage, and networking. OpenTofu then takes that configuration and provisions (creates) all the resources you need, following your instructions. You can install OpenTofu with brew: brew install opentofu
4. k9s: K9s is a terminal-based UI to interact with your Kubernetes clusters.
### Create Kubernetes cluster
For creating kubernetes cluster run the following command
### Install required tools with homebrew
```console
brew install kind opentofu k9s
```
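To double-check that the tools are available on your PATH (a quick sanity check, not strictly required):
```console
kind version
tofu version
k9s version
```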
### Clone the repository
```console
git clone https://code-repo.d4science.org/D-Net/code-infrastructure-lab.git
cd code-infrastructure-lab
```
### Create Kubernetes cluster with kind
To create the Kubernetes cluster, run the following command after making sure the Docker daemon is running:
```console
kind create cluster --config clusters/local/kind-cluster-config.yaml
```
this command will generate a cluster named `dnet-data-platform`
This command will generate a kind cluster named `dnet-data-platform`.
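You can verify that the cluster is up and reachable (the `kind-dnet-data-platform` context name follows kind's usual naming convention):
```console
kind get clusters
kubectl get nodes --context kind-dnet-data-platform
```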
Then we create an Ingress, a Kubernetes resource that manages external access to services running in the cluster (such as the Minio console or the Spark UI).
@ -54,28 +70,68 @@ To enable ingress run the command:
```console
kubectl apply --context kind-dnet-data-platform -f ./clusters/local/nginx-kind-deploy.yaml
```
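To check that the ingress controller comes up (assuming the manifest deploys it in the usual `ingress-nginx` namespace; adjust if yours differs):
```console
kubectl get pods -n ingress-nginx --context kind-dnet-data-platform
kubectl get ingress -A --context kind-dnet-data-platform
```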
### Build and load custom images required in the next steps
```console
cd docker-images/spark-operator
docker build -t spark-operator:2.0.2 .
kind load docker-image -n dnet-data-platform spark-operator:2.0.2
cd -
```
```console
cd docker-images/spark-image
docker build -t dnet-spark:1.0.0 .
kind load docker-image -n dnet-data-platform dnet-spark:1.0.0
cd -
```
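To confirm the images were actually loaded into the kind node (the node container is named `dnet-data-platform-control-plane`), you can list them with crictl inside the node:
```console
docker exec dnet-data-platform-control-plane crictl images | grep -E 'spark-operator|dnet-spark'
```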
### Define the cluster
- Generate a terraform variable file starting from the file ```local.tfvars.template``` and save it as ```local.tfvars```
```console
cp local.tfvars.template local.tfvars
```
- Initialize terraform:
```console
terraform init
tofu init
```
- Build and add the Docker image of Spark-Operator:
```console
cd docker-images/spark-operator
docker build -t spark-operator:2.0.2 . && kind load docker-image -n dnet-data-platform spark-operator:2.0.2
```
```console
cd docker-images/spark-image
docker build -t dnet-spark:1.0.0 . && kind load docker-image -n dnet-data-platform dnet-spark:1.0.0
```
- Create the cluster:
```console
terraform apply -var-file="local.tfvars"
tofu apply -var-file="local.tfvars"
```
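Once the apply completes, a quick way to see what was provisioned is to list pods across all namespaces; if you also have the helm CLI installed, you can list the releases that OpenTofu deployed:
```console
kubectl get pods -A --context kind-dnet-data-platform
helm list -A --kube-context kind-dnet-data-platform
```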
### Configure local DNS to point to 127.0.0.1
Kind is preconfigured to expose services on your local machine's ports 80 and 443 (see the extraPortMappings section). The kind Kubernetes cluster is set up with an nginx proxy that serves the following URLs:
- http://airflow.local-dataplatform/home (admin / admin)
- https://console-minio.local-dataplatform/login (minio / minio123)
- https://jupyter.local-dataplatform/ (yuvian / jupyter)
These server names need to resolve to the loopback (127.0.0.1) IP address.
A simple solution to enable DNS resolution is to add them to your /etc/hosts configuration file (see the sketch below), but for extensibility reasons, it is recommended to register a local domain through dnsmasq:
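For reference, the /etc/hosts alternative is just the three hostnames above mapped to 127.0.0.1 (a minimal sketch; the dnsmasq route below avoids editing the file for every new service):
```console
echo "127.0.0.1 airflow.local-dataplatform console-minio.local-dataplatform jupyter.local-dataplatform" | sudo tee -a /etc/hosts
```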
#### Install dnsmasq
```console
brew install dnsmasq
```
#### Configure dnsmasq for *.localhost
```console
mkdir -pv $(brew --prefix)/etc/
echo address=/.local-dataplatform/127.0.0.1 >> $(brew --prefix)/etc/dnsmasq.conf
```
#### Configure the dnsmasq port
```console
echo port=53 >> $(brew --prefix)/etc/dnsmasq.conf
```
#### Start dnsmasq as a service so it automatically starts at login
```console
sudo brew services start dnsmasq
```
#### Create the DNS resolver
```console
sudo mkdir -v /etc/resolver
sudo bash -c 'echo "nameserver 127.0.0.1" > /etc/resolver/local-dataplatform'
```
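To verify that resolution works end to end (dig queries dnsmasq directly, while scutil shows whether macOS picked up the resolver file):
```console
dig @127.0.0.1 airflow.local-dataplatform +short
scutil --dns | grep -A 2 local-dataplatform
```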
```

View File

@ -19,7 +19,11 @@ nodes:
- containerPort: 443
hostPort: 443
protocol: TCP
extraMounts:
- hostPath: ./workflow/dnet
containerPath: /dnet-airflow
readOnly: true
propagation: None
containerdConfigPatches:
- |-

View File

@ -4,6 +4,6 @@ FROM spark:3.5.3-scala2.12-java17-ubuntu
USER root
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o ${SPARK_HOME}/jars/hadoop-aws-3.3.4.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/3.5.3/spark-hadoop-cloud_2.12-3.5.3.jar -o ${SPARK_HOME}/jars/spark-hadoop-cloud_2.12-3.5.3.jar
user spark

View File

@ -5,3 +5,4 @@ ENV SPARK_HOME /opt/spark
USER root
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o ${SPARK_HOME}/jars/hadoop-aws-3.3.4.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/3.5.3/spark-hadoop-cloud_2.12-3.5.3.jar -o ${SPARK_HOME}/jars/spark-hadoop-cloud_2.12-3.5.3.jar

View File

@ -25,15 +25,15 @@ secret:
secretName: "s3-conn-secrets"
secretKey: "AIRFLOW_CONN_S3_CONN"
# - envName: "AIRFLOW_CONN_POSTGRES_CONN"
# secretName: "postgres-conn-secrets"
# secretKey: "AIRFLOW_CONN_POSTGRES_CONN"
dags:
persistence:
enabled: true
existingClaim: "local-dnet-airflow"
gitSync:
enabled: true
repo: "https://code-repo.d4science.org/D-Net/code-infrastructure-lab.git"
branch: "master"
subPath: "workflow/dnet"
enabled: false
config:
webserver:

View File

@ -165,6 +165,53 @@ EOT
type = "Opaque"
}
resource "kubernetes_persistent_volume" "dags" {
metadata {
name = "local-dnet-airflow"
labels = {
type = "local"
}
}
spec {
node_affinity {
required {
node_selector_term {
match_expressions {
key = "kubernetes.io/hostname"
operator = "In"
values = ["dnet-data-platform-control-plane"]
}
}
}
}
storage_class_name = "standard"
capacity = {
storage = "1Gi"
}
access_modes = ["ReadWriteOnce"]
persistent_volume_source {
local {
path = "/dnet-airflow"
}
}
}
}
resource "kubernetes_persistent_volume_claim" "dags" {
metadata {
name = "local-dnet-airflow"
namespace = "${var.namespace_prefix}airflow"
}
spec {
storage_class_name = "standard"
access_modes = ["ReadWriteOnce"]
resources {
requests = {
storage = "1Gi"
}
}
}
}
resource "helm_release" "airflow" {
depends_on = [kubernetes_secret.s3_conn_secrets]

View File

@ -5,13 +5,18 @@ resource "helm_release" "jupyterhub" {
create_namespace = "true"
namespace = "${var.namespace_prefix}spark-jobs"
dependency_update = "true"
version = "3.3.8"
version = "4.0.0"
set {
name = "ingress.enabled"
value = "true"
}
set {
name = "proxy.service.type"
value = "NodePort"
}
set {
name = "ingress.ingressClassName"
value = "nginx"

View File

@ -62,13 +62,25 @@ def build_new_graph():
conf={
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
"ORCID_PATH": "{{ dag_run.conf.get('ORCID_PATH') }}",
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["orcid_enhancement"],
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/orcid_enrichment"
}
),
TriggerDagRunOperator(
task_id="scholix_graph",
task_display_name="Create Scholexplorer Graph",
trigger_dag_id="build_scholexplorer_graph",
wait_for_completion=True,
conf={
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["scholexplorer"]
}
)
)

View File

@ -0,0 +1,51 @@
import os
from datetime import timedelta
from airflow.decorators import dag
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from spark_configurator import SparkConfigurator
from dag_utils import SPARK_RESOURCES_PROFILES
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
default_args = {
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
}
@dag(
dag_id="build_scholexplorer_graph",
dag_display_name="Create Scholexplorer Graph",
default_args=default_args,
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/06_graph_dedup", type='string', description=""),
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/scholexplorer_graph", type='string',
description="")
},
tags=["openaire", "scholexplorer"]
)
def build_scholexplorer_dag():
scholix_graph = SparkKubernetesOperator(
task_id='BuildScholexplorerGraph',
task_display_name="Create Scholexplorer Graph",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="orcidpropagate-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump",
profile=SPARK_RESOURCES_PROFILES['medium'],
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--sourcePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--targetPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
scholix_graph
build_scholexplorer_dag()

View File

@ -1,5 +1,6 @@
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from spark_configurator import SparkResourceProfile
BUILD_PHASES = {
"raw": "01_graph_raw",
@ -11,7 +12,36 @@ BUILD_PHASES = {
"dedup": "06_graph_dedup",
"consistency": "07_graph_consistent",
"enrichment": "08_graph_dedup_enriched", # actionset
"orcid_enhancement": "09_graph_orcid_enriched"
"orcid_enhancement": "09_graph_orcid_enriched",
"scholexplorer":"scholexplorer_graph"
}
SPARK_RESOURCES_PROFILES = {
"small": SparkResourceProfile(
driver_cores=1,
driver_memory="1G",
executor_cores=2,
executor_memory="2G",
executor_memoryOverhead="1G",
executor_instances=1
),
"medium": SparkResourceProfile(
driver_cores=1,
driver_memory="1G",
executor_cores=8,
executor_memory="8G",
executor_memoryOverhead="3G",
executor_instances=1
),
"large": SparkResourceProfile(
driver_cores=1,
driver_memory="1G",
executor_cores=8,
executor_memory="16G",
executor_memoryOverhead="8G",
executor_instances=1
)
}
def get_bucket_name(context: dict, hook: S3Hook, param_name: str):

View File

@ -5,7 +5,7 @@ from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from dag_utils import SPARK_RESOURCES_PROFILES
from spark_configurator import SparkConfigurator
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
@ -160,6 +160,7 @@ def results_deduplication_dag():
name="copyrelations-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
profile=SPARK_RESOURCES_PROFILES['medium'],
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--dedupGraphPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}"

View File

@ -1,87 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example DAG which uses SparkKubernetesOperator and SparkKubernetesSensor.
In this example, we create two tasks which execute sequentially.
The first task is to submit sparkApplication on Kubernetes cluster(the example uses spark-pi application).
and the second task is to check the final state of the sparkApplication that submitted in the first state.
Spark-on-k8s operator is required to be already installed on Kubernetes
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
"""
from os import path
from datetime import timedelta, datetime
from spark_configurator import SparkConfigurator
# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago
# [END import_module]
# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'max_active_runs': 1,
'retries': 3
}
spec =SparkConfigurator(
name="spark-scholix-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump",
jarLocation = 's3a://deps/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments =[ "--sourcePath", "s3a://raw-graph/01", "--targetPath", "s3a://scholix"],\
executor_cores=4,
executor_memory="4G",
executor_instances=1,
executor_memoryOverhead="3G").get_configuration()
dag = DAG(
'spark_run_test',
default_args=default_args,
schedule_interval=None,
tags=['example', 'spark']
)
submit = SparkKubernetesOperator(
task_id='spark-scholix',
namespace='dnet-spark-jobs',
template_spec=spec,
kubernetes_conn_id="kubernetes_default",
# do_xcom_push=True,
# delete_on_termination=True,
# base_container_name="spark-kubernetes-driver",
dag=dag
)
submit

View File

@ -1,3 +1,15 @@
from dataclasses import dataclass
@dataclass
class SparkResourceProfile:
driver_cores: int
driver_memory: str
executor_cores:int
executor_memory:str
executor_memoryOverhead:str
executor_instances:str
class SparkConfigurator:
def __init__(self,
@ -8,12 +20,13 @@ class SparkConfigurator:
apiVersion=None,
namespace="dnet-spark-jobs",
image= "dnet-spark:1.0.0",
driver_cores=1,
driver_memory='1G',
executor_cores=8,
executor_memory="16G",
executor_memoryOverhead="8G",
executor_instances=1
profile: SparkResourceProfile = SparkResourceProfile(driver_cores=1,
driver_memory="1G",
executor_cores=8,
executor_memory="16G",
executor_memoryOverhead="8G",
executor_instances=1)
) -> None:
if apiVersion:
self.apiVersion = apiVersion
@ -43,15 +56,18 @@ class SparkConfigurator:
"fs.s3a.connection.ssl.strictverify": "false",
"fs.s3a.connection.ssl.enabled": "false",
"fs.s3a.ssl.enabled": "false",
"spark.hadoop.fs.s3a.ssl.enabled": "false"
"spark.hadoop.fs.s3a.ssl.enabled": "false",
"fs.s3a.committer.name": "magic",
"spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
"spark.sql.parquet.output.committer.class": "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
}
self.sparkResoruceConf= {
'driver_cores':driver_cores,
'driver_memory':driver_memory,
'executor_cores':executor_cores,
'executor_memory':executor_memory,
'executor_instances':executor_instances,
'memoryOverhead':executor_memoryOverhead
'driver_cores':profile.driver_cores,
'driver_memory':profile.driver_memory,
'executor_cores':profile.executor_cores,
'executor_memory':profile.executor_memory,
'executor_instances':profile.executor_instances,
'memoryOverhead':profile.executor_memoryOverhead
}