oct-update #1

Merged
sandro.labruzzo merged 42 commits from oct-update into master 2024-11-18 10:43:07 +01:00
33 changed files with 1425 additions and 231 deletions

View File

@ -0,0 +1,7 @@
# docker build -t spark-operator:2.0.2 . && kind load docker-image -n dnet-data-platform spark-operator:2.0.2
FROM kubeflow/spark-operator:2.0.2
ENV SPARK_HOME /opt/spark
USER root
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o ${SPARK_HOME}/jars/hadoop-aws-3.3.4.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar

View File

@ -1,13 +0,0 @@
webapp:
ingress:
enabled: true
className: "nginx"
annotations:
kubernetes.io/ingress.class: nginx
hosts:
- host: localhost
paths:
- path: /
pathType: ImplementationSpecific
tls: []

View File

@ -31,9 +31,9 @@ dags:
enabled: true
gitSync:
enabled: true
repo: "https://code-repo.d4science.org/D-Net/code-infrasturcutre-lab.git"
branch: "airflow"
subPath: "airflow/dags"
repo: "https://code-repo.d4science.org/D-Net/code-infrastructure-lab.git"
branch: "master"
subPath: "workflow/dnet"
config:
webserver:
@ -42,7 +42,7 @@ config:
logging:
remote_logging: "True"
logging_level: "INFO"
remote_base_log_folder: "s3://dnet-airflow/logs"
remote_base_log_folder: "s3://workflow-logs/logs"
remote_log_conn_id: "s3_conn"
encrypt_s3_logs: "False"

View File

@ -1,42 +1,3 @@
###
# Root key for dynamically creating a secret for use with configuring root MinIO User
# Specify the ``name`` and then a list of environment variables.
#
# .. important::
#
# Do not use this in production environments.
# This field is intended for use with rapid development or testing only.
#
# For example:
#
# .. code-block:: yaml
#
# name: myminio-env-configuration
# accessKey: minio
# secretKey: minio123
#
secrets:
name: myminio-env-configuration
accessKey: minio
secretKey: minio123
###
# The name of an existing Kubernetes secret to import to the MinIO Tenant
# The secret must contain a key ``config.env``.
# The values should be a series of export statements to set environment variables for the Tenant.
# For example:
#
# .. code-block:: shell
#
# stringData:
# config.env: | -
# export MINIO_ROOT_USER=ROOTUSERNAME
# export MINIO_ROOT_PASSWORD=ROOTUSERPASSWORD
#
#existingSecret:
# name: myminio-env-configuration
###
# Root key for MinIO Tenant Chart
tenant:
###
@ -47,14 +8,14 @@ tenant:
###
# Specify the Operator container image to use for the deployment.
# ``image.tag``
# For example, the following sets the image to the ``quay.io/minio/operator`` repo and the v5.0.12 tag.
# For example, the following sets the image to the ``quay.io/minio/operator`` repo and the v6.0.4 tag.
# The container pulls the image if not already present:
#
# .. code-block:: yaml
#
# image:
# repository: quay.io/minio/minio
# tag: RELEASE.2024-02-09T21-25-16Z
# tag: RELEASE.2024-10-02T17-50-41Z
# pullPolicy: IfNotPresent
#
# The chart also supports specifying an image based on digest value:
@ -69,7 +30,7 @@ tenant:
#
image:
repository: quay.io/minio/minio
tag: RELEASE.2024-02-09T21-25-16Z
tag: RELEASE.2024-10-02T17-50-41Z
pullPolicy: IfNotPresent
###
#
@ -87,6 +48,44 @@ tenant:
configuration:
name: myminio-env-configuration
###
# Root key for dynamically creating a secret for use with configuring root MinIO User
# Specify the ``name`` and then a list of environment variables.
#
# .. important::
#
# Do not use this in production environments.
# This field is intended for use with rapid development or testing only.
#
# For example:
#
# .. code-block:: yaml
#
# name: myminio-env-configuration
# accessKey: minio
# secretKey: minio123
#
configSecret:
name: myminio-env-configuration
accessKey: minio
secretKey: minio123
#existingSecret: true
###
# If this variable is set to true, then enable the usage of an existing Kubernetes secret to set environment variables for the Tenant.
# The existing Kubernetes secret name must be placed under .tenant.configuration.name e.g. existing-minio-env-configuration
# The secret must contain a key ``config.env``.
# The values should be a series of export statements to set environment variables for the Tenant.
# For example:
#
# .. code-block:: shell
#
# stringData:
# config.env: |-
# export MINIO_ROOT_USER=ROOTUSERNAME
# export MINIO_ROOT_PASSWORD=ROOTUSERPASSWORD
#
# existingSecret: false
###
# Top level key for configuring MinIO Pool(s) in this Tenant.
#
# See `Operator CRD: Pools <https://min.io/docs/minio/kubernetes/upstream/reference/operator-crd.html#pool>`__ for more information on all subfields.
@ -104,7 +103,7 @@ tenant:
volumesPerServer: 4
###
# The capacity per volume requested per MinIO Tenant Pod.
size: 1Gi
size: 50Gi
###
# The `storageClass <https://kubernetes.io/docs/concepts/storage/storage-classes/>`__ to associate with volumes generated for this pool.
#
@ -166,6 +165,12 @@ tenant:
runAsUser: 1000
runAsGroup: 1000
runAsNonRoot: true
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
###
#
# An array of `Topology Spread Constraints <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>`__ to associate to Operator Console pods.
@ -225,6 +230,10 @@ tenant:
# Enable automatic Kubernetes based `certificate generation and signing <https://kubernetes.io/docs/tasks/tls/managing-tls-in-a-cluster>`__
requestAutoCert: true
###
# The minimum number of days to expiry before an alert for an expiring certificate is fired.
# In the below example, if a given certificate will expire in 7 days then expiration events will only be triggered 1 day before expiry
# certExpiryAlertThreshold: 1
###
# This field is used only when ``requestAutoCert: true``.
# Use this field to set CommonName for the auto-generated certificate.
# MinIO defaults to using the internal Kubernetes DNS name for the pod
@ -249,6 +258,8 @@ tenant:
# objectLock: false # optional
# region: us-east-1 # optional
buckets: [ ]
###
# Array of Kubernetes secrets from which the Operator generates MinIO users during tenant provisioning.
#
@ -271,6 +282,9 @@ tenant:
# Refer
startup: { }
###
# The `Lifecycle hooks <https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/>`__ for container.
lifecycle: { }
###
# Directs the Operator to deploy the MinIO S3 API and Console services as LoadBalancer objects.
#
# If the Kubernetes cluster has a configured LoadBalancer, it can attempt to route traffic to those services automatically.
@ -337,14 +351,14 @@ tenant:
# # Image from tag (original behavior), for example:
# # image:
# # repository: quay.io/minio/kes
# # tag: 2024-01-11T13-09-29Z
# # tag: 2024-09-11T07-22-50Z
# # Image from digest (added after original behavior), for example:
# # image:
# # repository: quay.io/minio/kes@sha256
# # digest: fb15af611149892f357a8a99d1bcd8bf5dae713bd64c15e6eb27fbdb88fc208b
# image:
# repository: quay.io/minio/kes
# tag: 2024-01-11T13-09-29Z
# tag: 2024-09-11T07-22-50Z
# pullPolicy: IfNotPresent
# env: [ ]
# replicas: 2
@ -417,6 +431,17 @@ tenant:
# runAsGroup: 1000
# runAsNonRoot: true
# fsGroup: 1000
# containerSecurityContext:
# runAsUser: 1000
# runAsGroup: 1000
# runAsNonRoot: true
# allowPrivilegeEscalation: false
# capabilities:
# drop:
# - ALL
# seccompProfile:
# type: RuntimeDefault
###
# Configures `Ingress <https://kubernetes.io/docs/concepts/services-networking/ingress/>`__ for the Tenant S3 API and Console.
#
@ -428,7 +453,7 @@ ingress:
labels: { }
annotations:
nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
nginx.ingress.kubernetes.io/proxy-body-size: 100m
nginx.ingress.kubernetes.io/proxy-body-size: 10000m
tls: [ ]
host: minio.local
path: /
@ -439,6 +464,7 @@ ingress:
labels: { }
annotations:
nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
nginx.ingress.kubernetes.io/proxy-body-size: 10000m
tls: [ ]
host: minio-console.local
path: /
@ -451,7 +477,7 @@ ingress:
# kind: Secret
# type: Opaque
# metadata:
# name: {{ dig "secrets" "existingSecret" "" (.Values | merge (dict)) }}
# name: {{ dig "tenant" "configSecret" "name" "" (.Values | merge (dict)) }}
# stringData:
# config.env: |-
# export MINIO_ROOT_USER='minio'

38
main.tf
View File

@ -1,21 +1,31 @@
module "minio" {
source = "./modules/minio"
kube_context = var.kube_context
namespace_prefix=var.namespace_prefix
source = "./modules/minio"
kube_context = var.kube_context
namespace_prefix = var.namespace_prefix
buckets = var.minio_buckets
}
module "airflow" {
source = "./modules/airflow"
kube_context = var.kube_context
admin_user = var.admin_user
admin_password = var.admin_password
namespace_prefix= var.namespace_prefix
admin_hash = var.admin_hash
env = var.env
domain = var.domain
s3_endpoint = var.s3_endpoint
s3_key = var.s3_key
s3_secret = var.s3_secret
source = "./modules/airflow"
kube_context = var.kube_context
admin_user = var.admin_user
admin_password = var.admin_password
namespace_prefix = var.namespace_prefix
admin_hash = var.admin_hash
env = var.env
domain = var.domain
s3_endpoint = var.s3_endpoint
s3_key = var.s3_key
s3_secret = var.s3_secret
branch_name = var.dag_branch_name
dag_path = var.dag_path_name
}
module "jupyterhub" {
source = "./modules/jupyterhub"
kube_context = var.kube_context
namespace_prefix = var.namespace_prefix
domain = var.domain
}

View File

@ -22,15 +22,17 @@ resource "kubernetes_role" "airflow_spark_role" {
rule {
api_groups = ["sparkoperator.k8s.io"]
resources = ["sparkapplications", "sparkapplications/status",
"scheduledsparkapplications", "scheduledsparkapplications/status"]
verbs = ["*"]
resources = [
"sparkapplications", "sparkapplications/status",
"scheduledsparkapplications", "scheduledsparkapplications/status"
]
verbs = ["*"]
}
rule {
api_groups = [""]
resources = ["pods/log"]
verbs = ["*"]
resources = ["pods", "pods/log"]
verbs = ["*"]
}
}
@ -55,74 +57,67 @@ resource "kubernetes_role_binding_v1" "airflow_spark_role_binding" {
}
resource "kubernetes_role_binding_v1" "airflow_spark_role_binding2" {
depends_on = [kubernetes_namespace.spark_jobs_namespace]
metadata {
name = "airflow-spark-role-binding2"
namespace = "${var.namespace_prefix}spark-jobs"
}
depends_on = [kubernetes_namespace.spark_jobs_namespace]
metadata {
name = "airflow-spark-role-binding2"
namespace = "${var.namespace_prefix}spark-jobs"
}
subject {
kind = "ServiceAccount"
name = "airflow-worker"
namespace = "${var.namespace_prefix}airflow"
}
subject {
kind = "ServiceAccount"
name = "airflow-worker"
namespace = "${var.namespace_prefix}airflow"
}
role_ref {
api_group = "rbac.authorization.k8s.io"
kind = "Role"
name = "spark-role"
}
role_ref {
api_group = "rbac.authorization.k8s.io"
kind = "Role"
name = "spark-role"
}
}
#
#
# resource "kubernetes_role_binding_v1" "spark_role_binding" {
# depends_on = [kubernetes_namespace.spark_jobs_namespace]
# metadata {
# name = "spark-role-binding"
# namespace = "${var.namespace_prefix}spark-jobs"
# }
#
# subject {
# kind = "ServiceAccount"
# name = "spark"
# namespace = "${var.namespace_prefix}spark-jobs"
# }
#
# role_ref {
# api_group = "rbac.authorization.k8s.io"
# kind = "Role"
# name = "spark-role"
# }
# }
#
resource "helm_release" "gcp_spark_operator" {
depends_on = [kubernetes_namespace.spark_jobs_namespace]
depends_on = [kubernetes_namespace.spark_jobs_namespace]
name = "gcp-spark-operator"
chart = "spark-operator"
repository = "https://kubeflow.github.io/spark-operator"
create_namespace = "true"
namespace = "${var.namespace_prefix}gcp-spark-operator"
dependency_update = "true"
version = "1.2.7"
version = "2.0.2"
set {
name = "image.repository"
value = "kubeflow/spark-operator"
value = "spark-operator"
}
set {
name = "image.tag"
value = "v1beta2-1.4.5-3.5.0"
value = "2.0.2"
}
set {
name = "sparkJobNamespaces"
name = "spark.jobNamespaces"
value = "{${var.namespace_prefix}spark-jobs}"
}
set {
name = "serviceAccounts.spark.name"
name = "spark.serviceAccount.create"
value = "true"
}
set {
name = "spark.serviceAccount.name"
value = "spark"
}
set {
name = "controller.serviceAccount.create"
value = "true"
}
set {
name = "controller.serviceAccount.name"
value = "spark"
}
@ -132,7 +127,7 @@ resource "helm_release" "gcp_spark_operator" {
}
set {
name = "ingressUrlFormat"
name = "driver.ingressUrlFormat"
value = "\\{\\{$appName\\}\\}.\\{\\{$appNamespace\\}\\}.${var.domain}"
type = "string"
}
@ -147,13 +142,13 @@ resource "kubernetes_namespace" "airflow" {
resource "kubernetes_secret" "s3_conn_secrets" {
depends_on = [kubernetes_namespace.airflow]
metadata {
name = "s3-conn-secrets"
name = "s3-conn-secrets"
namespace = "${var.namespace_prefix}airflow"
}
data = {
username = var.s3_key
password = var.s3_secret
username = var.s3_key
password = var.s3_secret
AIRFLOW_CONN_S3_CONN = <<EOT
{
"conn_type": "aws",
@ -171,7 +166,6 @@ EOT
}
resource "helm_release" "airflow" {
depends_on = [kubernetes_secret.s3_conn_secrets]
@ -180,7 +174,7 @@ resource "helm_release" "airflow" {
repository = "https://airflow.apache.org"
namespace = "${var.namespace_prefix}airflow"
dependency_update = "true"
version = "1.13.0"
version = "1.15.0"
values = [
file("./envs/${var.env}/airflow.yaml")
@ -197,7 +191,7 @@ resource "helm_release" "airflow" {
}
set {
name = "spec.values.env"
name = "spec.values.env"
value = yamlencode([
{
name = "AIRFLOW__WEBSERVER__BASE_URL",
@ -211,13 +205,28 @@ resource "helm_release" "airflow" {
}
set {
name = "images.airflow.repository"
value = "gbloisi/airflow"
name = "dags.gitSync.repo"
value = var.repo_url
}
set {
name = "dags.gitSync.branch"
value = var.branch_name
}
set {
name = "dags.gitSync.subPath"
value = var.dag_path
}
# set {
# name = "images.airflow.repository"
# value = "gbloisi/airflow"
# }
set {
name = "images.airflow.tag"
value = "2.8.3rc1-python3.11"
value = "2.9.3-python3.11"
}
set {

View File

@ -1,12 +1,9 @@
provider "helm" {
# Several Kubernetes authentication methods are possible: https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs#authentication
kubernetes {
config_path = pathexpand(var.kube_config)
config_context = var.kube_context
terraform {
required_providers {
helm = {
}
kubernetes = {
}
}
}
provider "kubernetes" {
config_path = pathexpand(var.kube_config)
config_context = var.kube_context
}

View File

@ -49,3 +49,18 @@ variable "admin_password" {
variable "admin_hash" {
type = string
}
variable "repo_url" {
type = string
default = "https://code-repo.d4science.org/D-Net/code-infrastructure-lab.git"
}
variable "branch_name" {
type = string
default = "master"
}
variable "dag_path" {
type = string
default = "workflow/dnet"
}

View File

@ -0,0 +1,44 @@
resource "helm_release" "jupyterhub" {
name = "jupyterhub"
chart = "jupyterhub"
repository = "https://hub.jupyter.org/helm-chart/"
create_namespace = "true"
namespace = "${var.namespace_prefix}spark-jobs"
dependency_update = "true"
version = "3.3.8"
set {
name = "ingress.enabled"
value = "true"
}
set {
name = "ingress.ingressClassName"
value = "nginx"
}
set {
name = "ingress.hosts[0]"
value = "jupyter.${var.domain}"
}
set {
name = "singleuser.image.name"
value = "jupyter/all-spark-notebook"
}
set {
name = "singleuser.image.tag"
value = "spark-3.5.0"
}
set {
name = "singleuser.cmd"
value = "start-notebook.py"
}
set {
name = "singleuser.serviceAccountName"
value = "spark"
}
}

View File

@ -0,0 +1,9 @@
terraform {
required_providers {
helm = {
}
kubernetes = {
}
}
}

View File

@ -0,0 +1,25 @@
variable "env" {
type = string
default = "local"
}
variable "kube_config" {
type = string
default = "~/.kube/config"
}
variable "kube_context" {
type = string
default = "default"
}
variable "namespace_prefix" {
type = string
default = "lot1-"
}
variable "domain" {
type = string
default = "local-dataplatform"
}

View File

@ -1,34 +0,0 @@
apiVersion: batch/v1
kind: Job
metadata:
name: create-bucket
namespace: block-storage
spec:
template:
spec:
containers:
- name: createbucket
image: amazon/aws-cli
command: ["aws"]
args:
- s3api
- create-bucket
- --bucket
- postgres
- --endpoint-url
- http://minio:80
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: minio-secret
key: accesskey
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: secretkey
restartPolicy: Never
backoffLimit: 1

View File

@ -5,5 +5,5 @@ resource "helm_release" "minio_operator" {
create_namespace = "true"
namespace = "minio-operator"
dependency_update = "true"
version = "5.0.12"
version = "6.0.4"
}

View File

@ -6,7 +6,7 @@ resource "helm_release" "minio_tenant" {
create_namespace = "true"
namespace = "${var.namespace_prefix}minio-tenant"
dependency_update = "true"
version = "5.0.12"
version = "6.0.4"
values = [
file("./envs/${var.env}/minio-tenant.yaml")
@ -21,40 +21,26 @@ resource "helm_release" "minio_tenant" {
name = "ingress.console.host"
value = "console-minio.${var.domain}"
}
}
/*
resource "kubernetes_manifest" "minio_ingress" {
manifest = yamldecode(<<YAML
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ingress-minio
namespace: block-storage
annotations:
kubernetes.io/ingress.class: "nginx"
## Remove if using CA signed certificate
nginx.ingress.kubernetes.io/proxy-ssl-verify: "off"
nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
nginx.ingress.kubernetes.io/rewrite-target: /
nginx.ingress.kubernetes.io/proxy-body-size: "0"
spec:
ingressClassName: nginx
tls:
- hosts:
- minio.${var.domain}
secretName: nginx-tls
rules:
- host: minio.${var.domain}
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: minio
port:
number: 443
YAML
)
}*/
dynamic "set" {
for_each = var.buckets
content {
name = "tenant.buckets[${set.key}].name"
value = set.value.name
}
}
# set {
# name = "tenant.buckets[0].name"
# value = "workflow-logs"
# }
# set {
# name = "tenant.buckets[1].name"
# value = "binaries"
# }
# ,"binaries","graph","pippo"]
}

View File

@ -1,12 +1,9 @@
provider "helm" {
# Several Kubernetes authentication methods are possible: https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs#authentication
kubernetes {
config_path = pathexpand(var.kube_config)
config_context = var.kube_context
terraform {
required_providers {
helm = {
}
kubernetes = {
}
}
}
provider "kubernetes" {
config_path = pathexpand(var.kube_config)
config_context = var.kube_context
}

View File

@ -22,3 +22,9 @@ variable "domain" {
type = string
default = "local-dataplatform"
}
variable "buckets" {
type = list(map(string))
default = [ ]
}

12
providers.tf Normal file
View File

@ -0,0 +1,12 @@
provider "helm" {
# Several Kubernetes authentication methods are possible: https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs#authentication
kubernetes {
config_path = pathexpand(var.kube_config)
config_context = var.kube_context
}
}
provider "kubernetes" {
config_path = pathexpand(var.kube_config)
config_context = var.kube_context
}

View File

@ -1,6 +1,7 @@
FROM spark:3.5.1-scala2.12-java17-ubuntu
# docker build -t dnet-spark:1.0.0 . && kind load docker-image -n dnet-data-platform dnet-spark:1.0.0
FROM spark:3.5.3-scala2.12-java17-ubuntu
user root
USER root
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o ${SPARK_HOME}/jars/hadoop-aws-3.3.4.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar

View File

@ -44,3 +44,21 @@ variable "s3_key" {
variable "s3_secret" {
default = "minio123"
}
variable "minio_buckets" {
type = list(map(string))
default = [
{ name = "workflow-logs" },
{ name = "binaries" },
{ name = "graph" },
]
}
variable "dag_branch_name" {
default = "master"
}
variable "dag_path_name" {
default = "workflow/dnet"
}

0
workflow/__init__.py Normal file
View File

16
workflow/dag_utils.py Normal file
View File

@ -0,0 +1,16 @@
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def get_bucket_name(context: dict, hook: S3Hook, param_name: str):
bucket_name = context["params"][param_name]
if not bucket_name:
bucket_name = hook.extra_args['bucket_name']
return bucket_name
def get_default_bucket():
hook = S3Hook("s3_conn", transfer_config_args={'use_threads': False})
try:
return hook.service_config['bucket_name']
except KeyError:
return ''

100
workflow/dnet/S3_untar.py Normal file
View File

@ -0,0 +1,100 @@
import os
import tarfile
import time
from datetime import timedelta
import pendulum
from airflow.decorators import dag
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.python import get_current_context
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from botocore.exceptions import ClientError
import dag_utils
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
default_args = {
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}
def load_file_obj_with_backoff(hook: S3Hook, fileobj, key: str, bucket: str, replace: bool) -> bool:
delay = 10 # initial delay
delay_incr = 10 # additional delay in each loop
max_delay = 60 # max delay of one loop. Total delay is (max_delay**2)/2
while delay < max_delay:
try:
return hook.load_file_obj(fileobj,
key,
bucket,
replace=replace)
except ClientError as err:
code = err.response.get('Error', {}).get('Code', '')
if code in ['NoSuchBucket']:
print(f"Error: {code}. Check s3path: s3://{bucket}/{key}")
raise err
time.sleep(delay)
delay += delay_incr
@dag(
dag_id="s3_untar",
dag_display_name="S3 streaming untar",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=None,
catchup=False,
default_args=default_args,
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
"src_bucket": Param(dag_utils.get_default_bucket(), type='string',
description="Override S3 default bucket for source"),
"src_key": Param("", type='string', description="File to untar"),
"dst_bucket": Param(dag_utils.get_default_bucket(), type='string',
description="Override S3 default bucket for destination"),
"dst_key_prefix": Param("", type='string', description="Key prefix for unarchived files"),
},
tags=["s3", "tools"],
)
def s3_untar():
@task
def untar():
context = get_current_context()
hook = S3Hook(context["params"]["S3_CONN_ID"], transfer_config_args={'use_threads': False})
src_bucket = context['params']['src_bucket']
dst_bucket = context['params']['dst_bucket']
dst_key_prefix = os.path.normpath(context["params"]["dst_key_prefix"])
print(f"Existing keys with prefix: {dst_key_prefix}/")
existing_keys = dict.fromkeys(hook.list_keys(bucket_name=dst_bucket,
prefix=dst_key_prefix + "/"), 0)
for k in existing_keys.keys():
print(f"{k}")
s3_obj = hook.get_key(context["params"]["src_key"], bucket_name=src_bucket)
with tarfile.open(fileobj=s3_obj.get()["Body"], mode='r|*') as tar:
for member in tar:
dst_key = os.path.normpath(dst_key_prefix + "/" + member.name)
# Ignore directories, links, devices, fifos, etc.
if (not member.isfile()) or member.name.endswith('/'):
print(f"Skipping {member.name}: is not a file")
continue
if dst_key in existing_keys:
print(f"Skipping {member.name}: already exists as {dst_key}")
continue
print(f"Extracting {member.name} to {dst_key}")
fileobj = tar.extractfile(member)
fileobj.seekable = lambda: False
load_file_obj_with_backoff(hook, fileobj,
dst_key,
dst_bucket,
replace=True)
untar()
s3_untar()

View File

View File

@ -0,0 +1,75 @@
from __future__ import annotations
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import dag_utils
@dag(
dag_id="build_openaire_graph",
dag_display_name="Build the OpenAIRE graph",
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection for S3 endpoint"),
"GRAPH_PATH": Param("s3a://graph/tmp/prod_provision/graph", type='string', description=""),
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir", type='string', description=""),
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
description=""),
"DEDUP_CONFIG_ID": Param("dedup-result-decisiontree-v4", type='string', description=""),
"ORCID_PATH": Param("s3a://graph/data/orcid_2023/tables", type='string', description="")
},
tags=["openaire"]
)
def build_new_graph():
chain(
TriggerDagRunOperator(
task_id="dedup",
task_display_name="Deduplicate Research Results",
trigger_dag_id="results_deduplication",
wait_for_completion=True,
conf={
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["inference"],
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
"IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"DEDUP_CONFIG_ID": "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}"
}
),
TriggerDagRunOperator(
task_id="consistency",
task_display_name="Enforce Consistency of Graph",
trigger_dag_id="consistency_graph",
wait_for_completion=True,
conf={
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
"IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}"
}
),
TriggerDagRunOperator(
task_id="orcid_enrichment",
task_display_name="Enrich Graph with ORCID data",
trigger_dag_id="orcid_enrichment_graph",
wait_for_completion=True,
conf={
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
"ORCID_PATH": "{{ dag_run.conf.get('ORCID_PATH') }}",
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["orcid_enhancement"],
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/orcid_enrichment"
}
)
)
build_new_graph()

View File

@ -0,0 +1,173 @@
from __future__ import annotations
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
import dag_utils
from spark_configurator import SparkConfigurator
@dag(
dag_id="build_openaire_graph_incremental",
dag_display_name="Build the OpenAIRE graph incrementally",
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection for S3 endpoint"),
"GRAPH_PATH": Param("s3a://graph/tmp/prod_provision/graph", type='string', description=""),
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir", type='string', description=""),
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
description=""),
"DEDUP_CONFIG_ID": Param("dedup-result-decisiontree-v4", type='string', description=""),
"ORCID_PATH": Param("s3a://graph/data/orcid_2023/tables", type='string', description=""),
"DELTA_PATH": Param("s3a://graph/data/delta", type='string', description=""),
},
tags=["openaire"]
)
def build_new_graph():
chain(
SparkKubernetesOperator(
task_id='raw_graph',
task_display_name="Generate Raw Graph",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="rawgraph-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.graph.raw.CopyIncrementalOafSparkApplication",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--inputPath", "{{ dag_run.conf.get('DELTA_PATH') }}",
"--graphOutputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["raw"]
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
),
SparkKubernetesOperator(
task_id='grouped_graph',
task_display_name="Generate Grouped-by-id Graph",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="groupedgraph-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphInputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["raw"],
"--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["grouped"],
"--checkpointPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/grouped_entities",
"--isLookupUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"--filterInvisible", "false"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
),
SparkKubernetesOperator(
task_id='copygroupedrels',
task_display_name="Copy relations to Grouped-by-id Graph",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="copygroupedrels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphInputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["raw"],
"--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["grouped"],
"--entities", "relation",
"--format", "text"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
),
TriggerDagRunOperator(
task_id="clean_graph",
task_display_name="Clean Results",
trigger_dag_id="clean_graph",
wait_for_completion=True,
conf={
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["grouped"],
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
"IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}"
}
),
SparkKubernetesOperator(
task_id='resolverels',
task_display_name="Resolve Relations",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="resolverels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelationById",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
"--targetPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
"--relationPath",
"{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"] + "/relation"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
),
SparkKubernetesOperator(
task_id='copyresolveents',
task_display_name="Copy entities to Resolved Graph",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="copyresolveents-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphInputPath",
"{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
"--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
"--entities", ",".join([item for item in dag_utils.GRAPH_ENTITIES if item != "relation"]),
"--format", "text"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
# , TriggerDagRunOperator(
# task_id="dedup",
# task_display_name="Deduplicate Research Results",
# trigger_dag_id="results_deduplication",
# wait_for_completion=True,
# conf={
# "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
#
# "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["inference"],
# "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
# "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
# "IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
# "DEDUP_CONFIG_ID": "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}"
# }
# ),
# TriggerDagRunOperator(
# task_id="consistency",
# task_display_name="Enforce Consistency of Graph",
# trigger_dag_id="consistency_graph",
# wait_for_completion=True,
#
# conf={
# "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
#
# "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
# "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
# "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
# "IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}"
# }
# ),
# TriggerDagRunOperator(
# task_id="orcid_enrichment",
# task_display_name="Enrich Graph with ORCID data",
# trigger_dag_id="orcid_enrichment_graph",
# wait_for_completion=True,
#
# conf={
# "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
#
# "ORCID_PATH": "{{ dag_run.conf.get('ORCID_PATH') }}",
# "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
# "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["orcid_enhancement"],
# "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/orcid_enrichment"
# }
# )
)
build_new_graph()

109
workflow/dnet/clean.py Normal file
View File

@ -0,0 +1,109 @@
import os
from datetime import timedelta
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
import dag_utils
from spark_configurator import SparkConfigurator
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
default_args = {
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
}
@dag(
dag_id="clean_graph",
dag_display_name="Cleaning of Graph",
default_args=default_args,
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
"POSTGRES_CONN_ID": Param("postgres_conn", type='string', description=""),
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/02_graph_grouped", type='string', description=""),
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/03_graph_cleaned", type='string', description=""),
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/clean", type='string', description=""),
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
description=""),
"COUNTRY": Param("NL", type='string', description=""),
"SHOULD_CLEAN": Param("false", type='string', description=""),
"CONTEXT_ID": Param("sobigdata", type='string', description=""),
"VERIFY_PARAM": Param("gcube", type='string', description=""),
"VERIFY_COUNTRY_PARAM": Param("10.17632;10.5061", type='string', description=""),
"COLLECTED_FROM": Param("NARCIS", type='string', description="")
},
tags=["openaire"]
)
def clean_graph_dag():
getdatasourcefromcountry = SparkKubernetesOperator(
task_id='getdatasourcefromcountry',
task_display_name="Get datasource from Country",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="getdatasourcefromcountry-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.graph.clean.GetDatasourceFromCountry",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--inputPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--country", "{{ dag_run.conf.get('COUNTRY') }}",
"--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH') }}/working/hostedby"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
masterduplicateaction = SparkKubernetesOperator(
task_id='masterduplicateaction',
task_display_name="MasterDuplicateAction",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="masterduplicateaction-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--hdfsNameNode", "s3a://graph/",
"--hdfsPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/masterduplicate",
"--postgresUrl", "jdbc:postgresql://{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).host }}:{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).port }}/dnet_openaireplus",
"--postgresUser", "{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).login }}",
"--postgresPassword", "{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).password }}"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
clean_tasks = []
for entity in dag_utils.GRAPH_ENTITIES:
clean_tasks.append(SparkKubernetesOperator(
task_id='cleansparkjob_' + entity,
task_display_name="Clean " + entity,
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="cleansparkjob-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=[
"--inputPath", "{{ dag_run.conf.get('INPUT_PATH') }}/" + entity,
"--outputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}/" + entity,
"--graphTableClassName", dag_utils.GRAPH_ENTITIES_CLASS_NAMES[entity],
"--isLookupUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"--contextId", "{{ dag_run.conf.get('CONTEXT_ID') }}",
"--verifyParam", "{{ dag_run.conf.get('VERIFY_PARAM') }}",
"--country", "{{ dag_run.conf.get('COUNTRY') }}",
"--verifyCountryParam", "{{ dag_run.conf.get('VERIFY_COUNTRY_PARAM') }}",
"--hostedBy", "{{ dag_run.conf.get('WRKDIR_PATH') }}/working/hostedby",
"--collectedfrom", "{{ dag_run.conf.get('COLLECTED_FROM') }}",
"--masterDuplicatePath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/masterduplicate",
"--deepClean", "{{ dag_run.conf.get('SHOULD_CLEAN') }}"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
))
chain(getdatasourcefromcountry,
#masterduplicateaction,
clean_tasks)
clean_graph_dag()

View File

@ -0,0 +1,71 @@
import os
from datetime import timedelta
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from spark_configurator import SparkConfigurator
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
default_args = {
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
}
@dag(
dag_id="consistency_graph",
dag_display_name="Enforce Consistency of Graph",
default_args=default_args,
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/06_graph_dedup", type='string', description=""),
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/07_graph_consistent", type='string', description=""),
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/dedup", type='string', description=""),
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
description="")
},
tags=["openaire"]
)
def consistency_graph_dag():
propagate_rel = SparkKubernetesOperator(
task_id='PropagateRelation',
task_display_name="Propagate Relations",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="propagaterels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkPropagateRelation",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--graphOutputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
group_entities = SparkKubernetesOperator(
task_id='GroupEntities',
task_display_name="Group results by id",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="groupentities-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphInputPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--checkpointPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/grouped_entities",
"--outputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
"--isLookupUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"--filterInvisible", "true"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
chain(propagate_rel, group_entities)
consistency_graph_dag()

View File

@ -0,0 +1,44 @@
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
BUILD_PHASES = {
"raw": "01_graph_raw",
"grouped": "02_graph_grouped",
"clean": "03_graph_cleaned",
"resolve": "04_graph_resolved",
"inference": "05_graph_inferred",
"dedup": "06_graph_dedup",
"consistency": "07_graph_consistent",
"enrichment": "08_graph_dedup_enriched", # actionset
"orcid_enhancement": "09_graph_orcid_enriched"
}
def get_bucket_name(context: dict, hook: S3Hook, param_name: str):
bucket_name = context["params"][param_name]
if not bucket_name:
bucket_name = hook.extra_args['bucket_name']
return bucket_name
def get_default_bucket():
hook = S3Hook("s3_conn", transfer_config_args={'use_threads': False})
try:
return hook.service_config['bucket_name']
except KeyError:
return ''
GRAPH_ENTITIES = ["publication", "dataset", "otherresearchproduct", "software", "datasource", "organization", "project", "relation"]
GRAPH_ENTITIES_CLASS_NAMES = {
"publication": "eu.dnetlib.dhp.schema.oaf.Publication",
"dataset": "eu.dnetlib.dhp.schema.oaf.Dataset",
"otherresearchproduct": "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"software": "eu.dnetlib.dhp.schema.oaf.Software",
"datasource": "eu.dnetlib.dhp.schema.oaf.Datasource",
"organization": "eu.dnetlib.dhp.schema.oaf.Organization",
"project": "eu.dnetlib.dhp.schema.oaf.Project",
"relation": "eu.dnetlib.dhp.schema.oaf.Relation"
}

173
workflow/dnet/dedup.py Normal file
View File

@ -0,0 +1,173 @@
import os
from datetime import timedelta
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from spark_configurator import SparkConfigurator
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
default_args = {
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
}
@dag(
dag_id="results_deduplication",
dag_display_name="Deduplicate Research Results",
default_args=default_args,
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/05_graph_inferred", type='string', description=""),
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/06_graph_dedup", type='string', description=""),
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/dedup", type='string', description=""),
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
description=""),
"DEDUP_CONFIG_ID": Param("dedup-result-decisiontree-v4", type='string', description="")
},
tags=["openaire"]
)
def results_deduplication_dag():
simrel = SparkKubernetesOperator(
task_id='CreateSimRel',
task_display_name="Create Similarity Relations",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="createsimrels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--numPartitions", "64"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
whitelist = SparkKubernetesOperator(
task_id='WhitelistSimRels',
task_display_name="Add Whitelist Similarity Relations",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="whitelistsimrels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkWhitelistSimRels",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--whiteListPath", "s3a://graph/data/dedup/whitelist_prod", # TODO: copy!
"--numPartitions", "64"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
createmergerel = SparkKubernetesOperator(
task_id='CreateMergeRels',
task_display_name="Create Merge Relations",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="createmergerels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--cutConnectedComponent", "200",
"--hiveMetastoreUris", "",
"--pivotHistoryDatabase", ""
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
creatededuprecord = SparkKubernetesOperator(
task_id='CreateDedupRecord',
task_display_name="Create Dedup Record",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="creatededuprecord-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
copyopenorgsmergerel = SparkKubernetesOperator(
task_id='CopyOpenorgsMergeRels',
task_display_name="Copy Openorgs Merge Relations",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="copyopenorgsmergerels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--numPartitions", "64"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
createorgsdeduprecord = SparkKubernetesOperator(
task_id='CreateOrgsDedupRecord',
task_display_name="Create Organizations Dedup Records",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="createorgsdeduprecord-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCreateOrgsDedupRecord",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
updateentity = SparkKubernetesOperator(
task_id='UpdateEntity',
task_display_name="Update Entity",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="updateentity-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--dedupGraphPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
copyrelations = SparkKubernetesOperator(
task_id='copyRelations',
task_display_name="Copy Non-Openorgs Relations",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="copyrelations-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--dedupGraphPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
chain(simrel, whitelist, createmergerel, creatededuprecord, copyopenorgsmergerel, createorgsdeduprecord, updateentity, copyrelations)
results_deduplication_dag()

View File

@ -0,0 +1,70 @@
import os
from datetime import timedelta
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from spark_configurator import SparkConfigurator
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
default_args = {
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
}
@dag(
dag_id="orcid_enrichment_graph",
dag_display_name="Enrich Graph with ORCID data",
default_args=default_args,
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
"ORCID_PATH": Param("s3a://graph/data/orcid_2023/tables", type='string', description=""),
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/07_graph_consistent", type='string', description=""),
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string',
description=""),
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/orcid_enrichment", type='string',
description="")
},
tags=["openaire"]
)
def orcid_enrichment_dag():
chain(SparkKubernetesOperator(
task_id='EnrichGraphWithOrcidAuthors',
task_display_name='Enrich Authors with ORCID',
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="orcidenrich-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.enrich.orcid.SparkEnrichGraphWithOrcidAuthors",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--orcidPath", "{{ dag_run.conf.get('ORCID_PATH') }}",
"--graphPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--targetPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
"--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--master", ""
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
),
SparkKubernetesOperator(
task_id='copyorcidenrichrels',
task_display_name="Copy relations to ORCID Enriched graph",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="copygroupedrels-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--graphInputPath", "{{ dag_run.conf.get('INPUT_PATH') }}/",
"--outputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}/",
"--entities", "relation",
"--format", "text"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
)
orcid_enrichment_dag()

View File

@ -0,0 +1,55 @@
import os
from datetime import timedelta
from airflow.decorators import dag
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from spark_configurator import SparkConfigurator
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
default_args = {
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
}
@dag(
dag_id="orcid_propagation_graph",
dag_display_name="Propagate ORCID data in graph",
default_args=default_args,
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
"ORCID_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""),
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""),
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/10_graph_propagated", type='string',
description=""),
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/orcid_propagation", type='string',
description="")
},
tags=["openaire"]
)
def orcid_propagation_dag():
orcid_propagate = SparkKubernetesOperator(
task_id='PropagateGraphWithOrcid',
task_display_name="Propagate ORCID data",
namespace='dnet-spark-jobs',
template_spec=SparkConfigurator(
name="orcidpropagate-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor",
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments=["--orcidPath", "{{ dag_run.conf.get('ORCID_PATH') }}",
"--graphPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
"--targetPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
"--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
"--matchingSource", "graph"
]).get_configuration(),
kubernetes_conn_id="kubernetes_default"
)
orcid_propagate
orcid_propagation_dag()

View File

@ -0,0 +1,87 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example DAG which uses SparkKubernetesOperator and SparkKubernetesSensor.
In this example, we create two tasks which execute sequentially.
The first task is to submit sparkApplication on Kubernetes cluster(the example uses spark-pi application).
and the second task is to check the final state of the sparkApplication that submitted in the first state.
Spark-on-k8s operator is required to be already installed on Kubernetes
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
"""
from os import path
from datetime import timedelta, datetime
from spark_configurator import SparkConfigurator
# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago
# [END import_module]
# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'max_active_runs': 1,
'retries': 3
}
spec =SparkConfigurator(
name="spark-scholix-{{ ds }}-{{ task_instance.try_number }}",
mainClass="eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump",
jarLocation = 's3a://deps/dhp-shade-package-1.2.5-SNAPSHOT.jar',
arguments =[ "--sourcePath", "s3a://raw-graph/01", "--targetPath", "s3a://scholix"],\
executor_cores=4,
executor_memory="4G",
executor_instances=1,
executor_memoryOverhead="3G").get_configuration()
dag = DAG(
'spark_run_test',
default_args=default_args,
schedule_interval=None,
tags=['example', 'spark']
)
submit = SparkKubernetesOperator(
task_id='spark-scholix',
namespace='dnet-spark-jobs',
template_spec=spec,
kubernetes_conn_id="kubernetes_default",
# do_xcom_push=True,
# delete_on_termination=True,
# base_container_name="spark-kubernetes-driver",
dag=dag
)
submit

View File

@ -0,0 +1,106 @@
class SparkConfigurator:
def __init__(self,
name,
mainClass,
jarLocation:str,
arguments,
apiVersion=None,
namespace="dnet-spark-jobs",
image= "dnet-spark:1.0.0",
driver_cores=1,
driver_memory='1G',
executor_cores=8,
executor_memory="16G",
executor_memoryOverhead="8G",
executor_instances=1
) -> None:
if apiVersion:
self.apiVersion = apiVersion
else:
self.apiVersion = "sparkoperator.k8s.io/v1beta2"
self.namespace= namespace
self.name = name
self.image= image
self.mainClass = mainClass
self.jarLocation = jarLocation
self.arguments= arguments
self.s3Configuration = {
"spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
"spark.executor.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
"spark.hadoop.fs.defaultFS": "s3a://graph",
"spark.hadoop.fs.s3a.access.key": "minio",
"spark.hadoop.fs.s3a.secret.key": "minio123",
"spark.hadoop.fs.s3a.endpoint": "https://minio.dnet-minio-tenant.svc.cluster.local",
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3a.path.style.access": "true",
"spark.hadoop.fs.s3a.attempts.maximum": "1",
"spark.hadoop.fs.s3a.connection.establish.timeout": "5000",
"spark.hadoop.fs.s3a.connection.timeout": "10001",
"spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
"com.amazonaws.sdk.disableCertChecking": "true",
"com.cloudera.com.amazonaws.sdk.disableCertChecking": "true",
"fs.s3a.connection.ssl.strictverify": "false",
"fs.s3a.connection.ssl.enabled": "false",
"fs.s3a.ssl.enabled": "false",
"spark.hadoop.fs.s3a.ssl.enabled": "false"
}
self.sparkResoruceConf= {
'driver_cores':driver_cores,
'driver_memory':driver_memory,
'executor_cores':executor_cores,
'executor_memory':executor_memory,
'executor_instances':executor_instances,
'memoryOverhead':executor_memoryOverhead
}
def get_configuration(self) -> dict:
return {
"apiVersion": self.apiVersion,
"kind": "SparkApplication",
"metadata": {
"name": self.name,
"namespace": self.namespace
},
"spec": {
"type": "Scala",
"mode": "cluster",
"image":self.image,
"imagePullPolicy": "IfNotPresent",
"mainClass": self.mainClass,
"mainApplicationFile": self.jarLocation,
"deps": {
"jar": [self.jarLocation]
},
"arguments": self.arguments,
"sparkVersion": "3.5.3",
"sparkConf": self.s3Configuration,
"restartPolicy": {
"type": "Never"
},
"dynamicAllocation": {
"enables": True
},
"driver": {
"javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
"cores": self.sparkResoruceConf['driver_cores'],
"coreLimit": "1200m",
"memory": self.sparkResoruceConf['driver_memory'],
"labels": {
"version": "3.5.3"
},
"serviceAccount": "spark"
},
"executor": {
"javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
"cores": self.sparkResoruceConf['executor_cores'],
"memoryOverhead": self.sparkResoruceConf['memoryOverhead'],
"memory": self.sparkResoruceConf['executor_memory'],
"instances": self.sparkResoruceConf['executor_instances'],
"labels": {
"version": "3.5.3"
}
}
}
}