Merge pull request 'oct-update' (#1) from oct-update into master
Reviewed-on: #1
This commit is contained in:
commit
3e5781e52e
|
@ -0,0 +1,7 @@
|
|||
# docker build -t spark-operator:2.0.2 . && kind load docker-image -n dnet-data-platform spark-operator:2.0.2
|
||||
FROM kubeflow/spark-operator:2.0.2
|
||||
|
||||
ENV SPARK_HOME /opt/spark
|
||||
USER root
|
||||
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o ${SPARK_HOME}/jars/hadoop-aws-3.3.4.jar
|
||||
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar
|
|
@ -1,13 +0,0 @@
|
|||
|
||||
webapp:
|
||||
ingress:
|
||||
enabled: true
|
||||
className: "nginx"
|
||||
annotations:
|
||||
kubernetes.io/ingress.class: nginx
|
||||
hosts:
|
||||
- host: localhost
|
||||
paths:
|
||||
- path: /
|
||||
pathType: ImplementationSpecific
|
||||
tls: []
|
|
@ -31,9 +31,9 @@ dags:
|
|||
enabled: true
|
||||
gitSync:
|
||||
enabled: true
|
||||
repo: "https://code-repo.d4science.org/D-Net/code-infrasturcutre-lab.git"
|
||||
branch: "airflow"
|
||||
subPath: "airflow/dags"
|
||||
repo: "https://code-repo.d4science.org/D-Net/code-infrastructure-lab.git"
|
||||
branch: "master"
|
||||
subPath: "workflow/dnet"
|
||||
|
||||
config:
|
||||
webserver:
|
||||
|
@ -42,7 +42,7 @@ config:
|
|||
logging:
|
||||
remote_logging: "True"
|
||||
logging_level: "INFO"
|
||||
remote_base_log_folder: "s3://dnet-airflow/logs"
|
||||
remote_base_log_folder: "s3://workflow-logs/logs"
|
||||
remote_log_conn_id: "s3_conn"
|
||||
encrypt_s3_logs: "False"
|
||||
|
||||
|
|
|
@ -1,42 +1,3 @@
|
|||
|
||||
|
||||
###
|
||||
# Root key for dynamically creating a secret for use with configuring root MinIO User
|
||||
# Specify the ``name`` and then a list of environment variables.
|
||||
#
|
||||
# .. important::
|
||||
#
|
||||
# Do not use this in production environments.
|
||||
# This field is intended for use with rapid development or testing only.
|
||||
#
|
||||
# For example:
|
||||
#
|
||||
# .. code-block:: yaml
|
||||
#
|
||||
# name: myminio-env-configuration
|
||||
# accessKey: minio
|
||||
# secretKey: minio123
|
||||
#
|
||||
secrets:
|
||||
name: myminio-env-configuration
|
||||
accessKey: minio
|
||||
secretKey: minio123
|
||||
###
|
||||
# The name of an existing Kubernetes secret to import to the MinIO Tenant
|
||||
# The secret must contain a key ``config.env``.
|
||||
# The values should be a series of export statements to set environment variables for the Tenant.
|
||||
# For example:
|
||||
#
|
||||
# .. code-block:: shell
|
||||
#
|
||||
# stringData:
|
||||
# config.env: | -
|
||||
# export MINIO_ROOT_USER=ROOTUSERNAME
|
||||
# export MINIO_ROOT_PASSWORD=ROOTUSERPASSWORD
|
||||
#
|
||||
#existingSecret:
|
||||
# name: myminio-env-configuration
|
||||
###
|
||||
# Root key for MinIO Tenant Chart
|
||||
tenant:
|
||||
###
|
||||
|
@ -47,14 +8,14 @@ tenant:
|
|||
###
|
||||
# Specify the Operator container image to use for the deployment.
|
||||
# ``image.tag``
|
||||
# For example, the following sets the image to the ``quay.io/minio/operator`` repo and the v5.0.12 tag.
|
||||
# For example, the following sets the image to the ``quay.io/minio/operator`` repo and the v6.0.4 tag.
|
||||
# The container pulls the image if not already present:
|
||||
#
|
||||
# .. code-block:: yaml
|
||||
#
|
||||
# image:
|
||||
# repository: quay.io/minio/minio
|
||||
# tag: RELEASE.2024-02-09T21-25-16Z
|
||||
# tag: RELEASE.2024-10-02T17-50-41Z
|
||||
# pullPolicy: IfNotPresent
|
||||
#
|
||||
# The chart also supports specifying an image based on digest value:
|
||||
|
@ -69,7 +30,7 @@ tenant:
|
|||
#
|
||||
image:
|
||||
repository: quay.io/minio/minio
|
||||
tag: RELEASE.2024-02-09T21-25-16Z
|
||||
tag: RELEASE.2024-10-02T17-50-41Z
|
||||
pullPolicy: IfNotPresent
|
||||
###
|
||||
#
|
||||
|
@ -87,6 +48,44 @@ tenant:
|
|||
configuration:
|
||||
name: myminio-env-configuration
|
||||
###
|
||||
# Root key for dynamically creating a secret for use with configuring root MinIO User
|
||||
# Specify the ``name`` and then a list of environment variables.
|
||||
#
|
||||
# .. important::
|
||||
#
|
||||
# Do not use this in production environments.
|
||||
# This field is intended for use with rapid development or testing only.
|
||||
#
|
||||
# For example:
|
||||
#
|
||||
# .. code-block:: yaml
|
||||
#
|
||||
# name: myminio-env-configuration
|
||||
# accessKey: minio
|
||||
# secretKey: minio123
|
||||
#
|
||||
configSecret:
|
||||
name: myminio-env-configuration
|
||||
accessKey: minio
|
||||
secretKey: minio123
|
||||
#existingSecret: true
|
||||
|
||||
###
|
||||
# If this variable is set to true, then enable the usage of an existing Kubernetes secret to set environment variables for the Tenant.
|
||||
# The existing Kubernetes secret name must be placed under .tenant.configuration.name e.g. existing-minio-env-configuration
|
||||
# The secret must contain a key ``config.env``.
|
||||
# The values should be a series of export statements to set environment variables for the Tenant.
|
||||
# For example:
|
||||
#
|
||||
# .. code-block:: shell
|
||||
#
|
||||
# stringData:
|
||||
# config.env: |-
|
||||
# export MINIO_ROOT_USER=ROOTUSERNAME
|
||||
# export MINIO_ROOT_PASSWORD=ROOTUSERPASSWORD
|
||||
#
|
||||
# existingSecret: false
|
||||
###
|
||||
# Top level key for configuring MinIO Pool(s) in this Tenant.
|
||||
#
|
||||
# See `Operator CRD: Pools <https://min.io/docs/minio/kubernetes/upstream/reference/operator-crd.html#pool>`__ for more information on all subfields.
|
||||
|
@ -104,7 +103,7 @@ tenant:
|
|||
volumesPerServer: 4
|
||||
###
|
||||
# The capacity per volume requested per MinIO Tenant Pod.
|
||||
size: 1Gi
|
||||
size: 50Gi
|
||||
###
|
||||
# The `storageClass <https://kubernetes.io/docs/concepts/storage/storage-classes/>`__ to associate with volumes generated for this pool.
|
||||
#
|
||||
|
@ -166,6 +165,12 @@ tenant:
|
|||
runAsUser: 1000
|
||||
runAsGroup: 1000
|
||||
runAsNonRoot: true
|
||||
allowPrivilegeEscalation: false
|
||||
capabilities:
|
||||
drop:
|
||||
- ALL
|
||||
seccompProfile:
|
||||
type: RuntimeDefault
|
||||
###
|
||||
#
|
||||
# An array of `Topology Spread Constraints <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>`__ to associate to Operator Console pods.
|
||||
|
@ -225,6 +230,10 @@ tenant:
|
|||
# Enable automatic Kubernetes based `certificate generation and signing <https://kubernetes.io/docs/tasks/tls/managing-tls-in-a-cluster>`__
|
||||
requestAutoCert: true
|
||||
###
|
||||
# The minimum number of days to expiry before an alert for an expiring certificate is fired.
|
||||
# In the below example, if a given certificate will expire in 7 days then expiration events will only be triggered 1 day before expiry
|
||||
# certExpiryAlertThreshold: 1
|
||||
###
|
||||
# This field is used only when ``requestAutoCert: true``.
|
||||
# Use this field to set CommonName for the auto-generated certificate.
|
||||
# MinIO defaults to using the internal Kubernetes DNS name for the pod
|
||||
|
@ -249,6 +258,8 @@ tenant:
|
|||
# objectLock: false # optional
|
||||
# region: us-east-1 # optional
|
||||
buckets: [ ]
|
||||
|
||||
|
||||
###
|
||||
# Array of Kubernetes secrets from which the Operator generates MinIO users during tenant provisioning.
|
||||
#
|
||||
|
@ -271,6 +282,9 @@ tenant:
|
|||
# Refer
|
||||
startup: { }
|
||||
###
|
||||
# The `Lifecycle hooks <https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/>`__ for container.
|
||||
lifecycle: { }
|
||||
###
|
||||
# Directs the Operator to deploy the MinIO S3 API and Console services as LoadBalancer objects.
|
||||
#
|
||||
# If the Kubernetes cluster has a configured LoadBalancer, it can attempt to route traffic to those services automatically.
|
||||
|
@ -337,14 +351,14 @@ tenant:
|
|||
# # Image from tag (original behavior), for example:
|
||||
# # image:
|
||||
# # repository: quay.io/minio/kes
|
||||
# # tag: 2024-01-11T13-09-29Z
|
||||
# # tag: 2024-09-11T07-22-50Z
|
||||
# # Image from digest (added after original behavior), for example:
|
||||
# # image:
|
||||
# # repository: quay.io/minio/kes@sha256
|
||||
# # digest: fb15af611149892f357a8a99d1bcd8bf5dae713bd64c15e6eb27fbdb88fc208b
|
||||
# image:
|
||||
# repository: quay.io/minio/kes
|
||||
# tag: 2024-01-11T13-09-29Z
|
||||
# tag: 2024-09-11T07-22-50Z
|
||||
# pullPolicy: IfNotPresent
|
||||
# env: [ ]
|
||||
# replicas: 2
|
||||
|
@ -417,6 +431,17 @@ tenant:
|
|||
# runAsGroup: 1000
|
||||
# runAsNonRoot: true
|
||||
# fsGroup: 1000
|
||||
# containerSecurityContext:
|
||||
# runAsUser: 1000
|
||||
# runAsGroup: 1000
|
||||
# runAsNonRoot: true
|
||||
# allowPrivilegeEscalation: false
|
||||
# capabilities:
|
||||
# drop:
|
||||
# - ALL
|
||||
# seccompProfile:
|
||||
# type: RuntimeDefault
|
||||
|
||||
###
|
||||
# Configures `Ingress <https://kubernetes.io/docs/concepts/services-networking/ingress/>`__ for the Tenant S3 API and Console.
|
||||
#
|
||||
|
@ -428,7 +453,7 @@ ingress:
|
|||
labels: { }
|
||||
annotations:
|
||||
nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: 100m
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: 10000m
|
||||
tls: [ ]
|
||||
host: minio.local
|
||||
path: /
|
||||
|
@ -439,6 +464,7 @@ ingress:
|
|||
labels: { }
|
||||
annotations:
|
||||
nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: 10000m
|
||||
tls: [ ]
|
||||
host: minio-console.local
|
||||
path: /
|
||||
|
@ -451,7 +477,7 @@ ingress:
|
|||
# kind: Secret
|
||||
# type: Opaque
|
||||
# metadata:
|
||||
# name: {{ dig "secrets" "existingSecret" "" (.Values | merge (dict)) }}
|
||||
# name: {{ dig "tenant" "configSecret" "name" "" (.Values | merge (dict)) }}
|
||||
# stringData:
|
||||
# config.env: |-
|
||||
# export MINIO_ROOT_USER='minio'
|
||||
|
|
14
main.tf
14
main.tf
|
@ -1,7 +1,8 @@
|
|||
module "minio" {
|
||||
source = "./modules/minio"
|
||||
kube_context = var.kube_context
|
||||
namespace_prefix=var.namespace_prefix
|
||||
namespace_prefix = var.namespace_prefix
|
||||
buckets = var.minio_buckets
|
||||
}
|
||||
|
||||
|
||||
|
@ -10,12 +11,21 @@ module "airflow" {
|
|||
kube_context = var.kube_context
|
||||
admin_user = var.admin_user
|
||||
admin_password = var.admin_password
|
||||
namespace_prefix= var.namespace_prefix
|
||||
namespace_prefix = var.namespace_prefix
|
||||
admin_hash = var.admin_hash
|
||||
env = var.env
|
||||
domain = var.domain
|
||||
s3_endpoint = var.s3_endpoint
|
||||
s3_key = var.s3_key
|
||||
s3_secret = var.s3_secret
|
||||
branch_name = var.dag_branch_name
|
||||
dag_path = var.dag_path_name
|
||||
|
||||
}
|
||||
|
||||
module "jupyterhub" {
|
||||
source = "./modules/jupyterhub"
|
||||
kube_context = var.kube_context
|
||||
namespace_prefix = var.namespace_prefix
|
||||
domain = var.domain
|
||||
}
|
||||
|
|
|
@ -22,14 +22,16 @@ resource "kubernetes_role" "airflow_spark_role" {
|
|||
|
||||
rule {
|
||||
api_groups = ["sparkoperator.k8s.io"]
|
||||
resources = ["sparkapplications", "sparkapplications/status",
|
||||
"scheduledsparkapplications", "scheduledsparkapplications/status"]
|
||||
resources = [
|
||||
"sparkapplications", "sparkapplications/status",
|
||||
"scheduledsparkapplications", "scheduledsparkapplications/status"
|
||||
]
|
||||
verbs = ["*"]
|
||||
}
|
||||
|
||||
rule {
|
||||
api_groups = [""]
|
||||
resources = ["pods/log"]
|
||||
resources = ["pods", "pods/log"]
|
||||
verbs = ["*"]
|
||||
}
|
||||
}
|
||||
|
@ -73,28 +75,6 @@ resource "kubernetes_role_binding_v1" "airflow_spark_role_binding2" {
|
|||
name = "spark-role"
|
||||
}
|
||||
}
|
||||
#
|
||||
#
|
||||
# resource "kubernetes_role_binding_v1" "spark_role_binding" {
|
||||
# depends_on = [kubernetes_namespace.spark_jobs_namespace]
|
||||
# metadata {
|
||||
# name = "spark-role-binding"
|
||||
# namespace = "${var.namespace_prefix}spark-jobs"
|
||||
# }
|
||||
#
|
||||
# subject {
|
||||
# kind = "ServiceAccount"
|
||||
# name = "spark"
|
||||
# namespace = "${var.namespace_prefix}spark-jobs"
|
||||
# }
|
||||
#
|
||||
# role_ref {
|
||||
# api_group = "rbac.authorization.k8s.io"
|
||||
# kind = "Role"
|
||||
# name = "spark-role"
|
||||
# }
|
||||
# }
|
||||
#
|
||||
|
||||
resource "helm_release" "gcp_spark_operator" {
|
||||
depends_on = [kubernetes_namespace.spark_jobs_namespace]
|
||||
|
@ -104,25 +84,40 @@ resource "helm_release" "gcp_spark_operator" {
|
|||
create_namespace = "true"
|
||||
namespace = "${var.namespace_prefix}gcp-spark-operator"
|
||||
dependency_update = "true"
|
||||
version = "1.2.7"
|
||||
version = "2.0.2"
|
||||
|
||||
set {
|
||||
name = "image.repository"
|
||||
value = "kubeflow/spark-operator"
|
||||
value = "spark-operator"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "image.tag"
|
||||
value = "v1beta2-1.4.5-3.5.0"
|
||||
value = "2.0.2"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "sparkJobNamespaces"
|
||||
name = "spark.jobNamespaces"
|
||||
value = "{${var.namespace_prefix}spark-jobs}"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "serviceAccounts.spark.name"
|
||||
name = "spark.serviceAccount.create"
|
||||
value = "true"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "spark.serviceAccount.name"
|
||||
value = "spark"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "controller.serviceAccount.create"
|
||||
value = "true"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "controller.serviceAccount.name"
|
||||
value = "spark"
|
||||
}
|
||||
|
||||
|
@ -132,7 +127,7 @@ resource "helm_release" "gcp_spark_operator" {
|
|||
}
|
||||
|
||||
set {
|
||||
name = "ingressUrlFormat"
|
||||
name = "driver.ingressUrlFormat"
|
||||
value = "\\{\\{$appName\\}\\}.\\{\\{$appNamespace\\}\\}.${var.domain}"
|
||||
type = "string"
|
||||
}
|
||||
|
@ -171,7 +166,6 @@ EOT
|
|||
}
|
||||
|
||||
|
||||
|
||||
resource "helm_release" "airflow" {
|
||||
depends_on = [kubernetes_secret.s3_conn_secrets]
|
||||
|
||||
|
@ -180,7 +174,7 @@ resource "helm_release" "airflow" {
|
|||
repository = "https://airflow.apache.org"
|
||||
namespace = "${var.namespace_prefix}airflow"
|
||||
dependency_update = "true"
|
||||
version = "1.13.0"
|
||||
version = "1.15.0"
|
||||
|
||||
values = [
|
||||
file("./envs/${var.env}/airflow.yaml")
|
||||
|
@ -211,13 +205,28 @@ resource "helm_release" "airflow" {
|
|||
}
|
||||
|
||||
set {
|
||||
name = "images.airflow.repository"
|
||||
value = "gbloisi/airflow"
|
||||
name = "dags.gitSync.repo"
|
||||
value = var.repo_url
|
||||
}
|
||||
|
||||
set {
|
||||
name = "dags.gitSync.branch"
|
||||
value = var.branch_name
|
||||
}
|
||||
|
||||
set {
|
||||
name = "dags.gitSync.subPath"
|
||||
value = var.dag_path
|
||||
}
|
||||
|
||||
# set {
|
||||
# name = "images.airflow.repository"
|
||||
# value = "gbloisi/airflow"
|
||||
# }
|
||||
|
||||
set {
|
||||
name = "images.airflow.tag"
|
||||
value = "2.8.3rc1-python3.11"
|
||||
value = "2.9.3-python3.11"
|
||||
}
|
||||
|
||||
set {
|
||||
|
|
|
@ -1,12 +1,9 @@
|
|||
provider "helm" {
|
||||
# Several Kubernetes authentication methods are possible: https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs#authentication
|
||||
kubernetes {
|
||||
config_path = pathexpand(var.kube_config)
|
||||
config_context = var.kube_context
|
||||
terraform {
|
||||
required_providers {
|
||||
helm = {
|
||||
}
|
||||
|
||||
kubernetes = {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
provider "kubernetes" {
|
||||
config_path = pathexpand(var.kube_config)
|
||||
config_context = var.kube_context
|
||||
}
|
|
@ -49,3 +49,18 @@ variable "admin_password" {
|
|||
variable "admin_hash" {
|
||||
type = string
|
||||
}
|
||||
|
||||
variable "repo_url" {
|
||||
type = string
|
||||
default = "https://code-repo.d4science.org/D-Net/code-infrastructure-lab.git"
|
||||
}
|
||||
|
||||
variable "branch_name" {
|
||||
type = string
|
||||
default = "master"
|
||||
}
|
||||
|
||||
variable "dag_path" {
|
||||
type = string
|
||||
default = "workflow/dnet"
|
||||
}
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
resource "helm_release" "jupyterhub" {
|
||||
name = "jupyterhub"
|
||||
chart = "jupyterhub"
|
||||
repository = "https://hub.jupyter.org/helm-chart/"
|
||||
create_namespace = "true"
|
||||
namespace = "${var.namespace_prefix}spark-jobs"
|
||||
dependency_update = "true"
|
||||
version = "3.3.8"
|
||||
|
||||
set {
|
||||
name = "ingress.enabled"
|
||||
value = "true"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "ingress.ingressClassName"
|
||||
value = "nginx"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "ingress.hosts[0]"
|
||||
value = "jupyter.${var.domain}"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "singleuser.image.name"
|
||||
value = "jupyter/all-spark-notebook"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "singleuser.image.tag"
|
||||
value = "spark-3.5.0"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "singleuser.cmd"
|
||||
value = "start-notebook.py"
|
||||
}
|
||||
|
||||
set {
|
||||
name = "singleuser.serviceAccountName"
|
||||
value = "spark"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
terraform {
|
||||
required_providers {
|
||||
helm = {
|
||||
}
|
||||
|
||||
kubernetes = {
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
variable "env" {
|
||||
type = string
|
||||
default = "local"
|
||||
}
|
||||
|
||||
variable "kube_config" {
|
||||
type = string
|
||||
default = "~/.kube/config"
|
||||
}
|
||||
|
||||
variable "kube_context" {
|
||||
type = string
|
||||
default = "default"
|
||||
}
|
||||
|
||||
variable "namespace_prefix" {
|
||||
type = string
|
||||
default = "lot1-"
|
||||
}
|
||||
|
||||
variable "domain" {
|
||||
type = string
|
||||
default = "local-dataplatform"
|
||||
}
|
||||
|
|
@ -1,34 +0,0 @@
|
|||
apiVersion: batch/v1
|
||||
kind: Job
|
||||
metadata:
|
||||
name: create-bucket
|
||||
namespace: block-storage
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: createbucket
|
||||
image: amazon/aws-cli
|
||||
command: ["aws"]
|
||||
args:
|
||||
- s3api
|
||||
- create-bucket
|
||||
- --bucket
|
||||
- postgres
|
||||
- --endpoint-url
|
||||
- http://minio:80
|
||||
env:
|
||||
- name: AWS_ACCESS_KEY_ID
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: minio-secret
|
||||
key: accesskey
|
||||
|
||||
- name: AWS_SECRET_ACCESS_KEY
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: minio-secret
|
||||
key: secretkey
|
||||
|
||||
restartPolicy: Never
|
||||
backoffLimit: 1
|
|
@ -5,5 +5,5 @@ resource "helm_release" "minio_operator" {
|
|||
create_namespace = "true"
|
||||
namespace = "minio-operator"
|
||||
dependency_update = "true"
|
||||
version = "5.0.12"
|
||||
version = "6.0.4"
|
||||
}
|
|
@ -6,7 +6,7 @@ resource "helm_release" "minio_tenant" {
|
|||
create_namespace = "true"
|
||||
namespace = "${var.namespace_prefix}minio-tenant"
|
||||
dependency_update = "true"
|
||||
version = "5.0.12"
|
||||
version = "6.0.4"
|
||||
|
||||
values = [
|
||||
file("./envs/${var.env}/minio-tenant.yaml")
|
||||
|
@ -21,40 +21,26 @@ resource "helm_release" "minio_tenant" {
|
|||
name = "ingress.console.host"
|
||||
value = "console-minio.${var.domain}"
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
resource "kubernetes_manifest" "minio_ingress" {
|
||||
manifest = yamldecode(<<YAML
|
||||
apiVersion: networking.k8s.io/v1
|
||||
kind: Ingress
|
||||
metadata:
|
||||
name: ingress-minio
|
||||
namespace: block-storage
|
||||
annotations:
|
||||
kubernetes.io/ingress.class: "nginx"
|
||||
## Remove if using CA signed certificate
|
||||
nginx.ingress.kubernetes.io/proxy-ssl-verify: "off"
|
||||
nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
|
||||
nginx.ingress.kubernetes.io/rewrite-target: /
|
||||
nginx.ingress.kubernetes.io/proxy-body-size: "0"
|
||||
spec:
|
||||
ingressClassName: nginx
|
||||
tls:
|
||||
- hosts:
|
||||
- minio.${var.domain}
|
||||
secretName: nginx-tls
|
||||
rules:
|
||||
- host: minio.${var.domain}
|
||||
http:
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
backend:
|
||||
service:
|
||||
name: minio
|
||||
port:
|
||||
number: 443
|
||||
YAML
|
||||
)
|
||||
}*/
|
||||
dynamic "set" {
|
||||
for_each = var.buckets
|
||||
content {
|
||||
name = "tenant.buckets[${set.key}].name"
|
||||
value = set.value.name
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# set {
|
||||
# name = "tenant.buckets[0].name"
|
||||
# value = "workflow-logs"
|
||||
# }
|
||||
|
||||
# set {
|
||||
# name = "tenant.buckets[1].name"
|
||||
# value = "binaries"
|
||||
# }
|
||||
|
||||
|
||||
# ,"binaries","graph","pippo"]
|
||||
}
|
|
@ -1,12 +1,9 @@
|
|||
provider "helm" {
|
||||
# Several Kubernetes authentication methods are possible: https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs#authentication
|
||||
kubernetes {
|
||||
config_path = pathexpand(var.kube_config)
|
||||
config_context = var.kube_context
|
||||
terraform {
|
||||
required_providers {
|
||||
helm = {
|
||||
}
|
||||
|
||||
kubernetes = {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
provider "kubernetes" {
|
||||
config_path = pathexpand(var.kube_config)
|
||||
config_context = var.kube_context
|
||||
}
|
|
@ -22,3 +22,9 @@ variable "domain" {
|
|||
type = string
|
||||
default = "local-dataplatform"
|
||||
}
|
||||
|
||||
variable "buckets" {
|
||||
type = list(map(string))
|
||||
default = [ ]
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
provider "helm" {
|
||||
# Several Kubernetes authentication methods are possible: https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs#authentication
|
||||
kubernetes {
|
||||
config_path = pathexpand(var.kube_config)
|
||||
config_context = var.kube_context
|
||||
}
|
||||
}
|
||||
|
||||
provider "kubernetes" {
|
||||
config_path = pathexpand(var.kube_config)
|
||||
config_context = var.kube_context
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
FROM spark:3.5.1-scala2.12-java17-ubuntu
|
||||
# docker build -t dnet-spark:1.0.0 . && kind load docker-image -n dnet-data-platform dnet-spark:1.0.0
|
||||
FROM spark:3.5.3-scala2.12-java17-ubuntu
|
||||
|
||||
user root
|
||||
USER root
|
||||
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o ${SPARK_HOME}/jars/hadoop-aws-3.3.4.jar
|
||||
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o ${SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar
|
||||
|
||||
|
|
18
variables.tf
18
variables.tf
|
@ -44,3 +44,21 @@ variable "s3_key" {
|
|||
variable "s3_secret" {
|
||||
default = "minio123"
|
||||
}
|
||||
|
||||
variable "minio_buckets" {
|
||||
type = list(map(string))
|
||||
default = [
|
||||
{ name = "workflow-logs" },
|
||||
{ name = "binaries" },
|
||||
{ name = "graph" },
|
||||
]
|
||||
}
|
||||
|
||||
variable "dag_branch_name" {
|
||||
default = "master"
|
||||
}
|
||||
|
||||
variable "dag_path_name" {
|
||||
default = "workflow/dnet"
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
from airflow.hooks.base import BaseHook
|
||||
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
||||
|
||||
def get_bucket_name(context: dict, hook: S3Hook, param_name: str):
|
||||
bucket_name = context["params"][param_name]
|
||||
if not bucket_name:
|
||||
bucket_name = hook.extra_args['bucket_name']
|
||||
return bucket_name
|
||||
|
||||
|
||||
def get_default_bucket():
|
||||
hook = S3Hook("s3_conn", transfer_config_args={'use_threads': False})
|
||||
try:
|
||||
return hook.service_config['bucket_name']
|
||||
except KeyError:
|
||||
return ''
|
|
@ -0,0 +1,100 @@
|
|||
import os
|
||||
import tarfile
|
||||
import time
|
||||
from datetime import timedelta
|
||||
|
||||
import pendulum
|
||||
from airflow.decorators import dag
|
||||
from airflow.decorators import task
|
||||
from airflow.models.param import Param
|
||||
from airflow.operators.python import get_current_context
|
||||
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
import dag_utils
|
||||
|
||||
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
|
||||
|
||||
default_args = {
|
||||
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
|
||||
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
|
||||
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
|
||||
}
|
||||
|
||||
|
||||
def load_file_obj_with_backoff(hook: S3Hook, fileobj, key: str, bucket: str, replace: bool) -> bool:
|
||||
delay = 10 # initial delay
|
||||
delay_incr = 10 # additional delay in each loop
|
||||
max_delay = 60 # max delay of one loop. Total delay is (max_delay**2)/2
|
||||
|
||||
while delay < max_delay:
|
||||
try:
|
||||
return hook.load_file_obj(fileobj,
|
||||
key,
|
||||
bucket,
|
||||
replace=replace)
|
||||
except ClientError as err:
|
||||
code = err.response.get('Error', {}).get('Code', '')
|
||||
if code in ['NoSuchBucket']:
|
||||
print(f"Error: {code}. Check s3path: s3://{bucket}/{key}")
|
||||
raise err
|
||||
time.sleep(delay)
|
||||
delay += delay_incr
|
||||
|
||||
|
||||
@dag(
|
||||
dag_id="s3_untar",
|
||||
dag_display_name="S3 streaming untar",
|
||||
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
|
||||
schedule=None,
|
||||
catchup=False,
|
||||
default_args=default_args,
|
||||
params={
|
||||
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
|
||||
"src_bucket": Param(dag_utils.get_default_bucket(), type='string',
|
||||
description="Override S3 default bucket for source"),
|
||||
"src_key": Param("", type='string', description="File to untar"),
|
||||
"dst_bucket": Param(dag_utils.get_default_bucket(), type='string',
|
||||
description="Override S3 default bucket for destination"),
|
||||
"dst_key_prefix": Param("", type='string', description="Key prefix for unarchived files"),
|
||||
},
|
||||
tags=["s3", "tools"],
|
||||
)
|
||||
def s3_untar():
|
||||
@task
|
||||
def untar():
|
||||
context = get_current_context()
|
||||
hook = S3Hook(context["params"]["S3_CONN_ID"], transfer_config_args={'use_threads': False})
|
||||
src_bucket = context['params']['src_bucket']
|
||||
dst_bucket = context['params']['dst_bucket']
|
||||
dst_key_prefix = os.path.normpath(context["params"]["dst_key_prefix"])
|
||||
|
||||
print(f"Existing keys with prefix: {dst_key_prefix}/")
|
||||
existing_keys = dict.fromkeys(hook.list_keys(bucket_name=dst_bucket,
|
||||
prefix=dst_key_prefix + "/"), 0)
|
||||
for k in existing_keys.keys():
|
||||
print(f"{k}")
|
||||
s3_obj = hook.get_key(context["params"]["src_key"], bucket_name=src_bucket)
|
||||
|
||||
with tarfile.open(fileobj=s3_obj.get()["Body"], mode='r|*') as tar:
|
||||
for member in tar:
|
||||
dst_key = os.path.normpath(dst_key_prefix + "/" + member.name)
|
||||
# Ignore directories, links, devices, fifos, etc.
|
||||
if (not member.isfile()) or member.name.endswith('/'):
|
||||
print(f"Skipping {member.name}: is not a file")
|
||||
continue
|
||||
if dst_key in existing_keys:
|
||||
print(f"Skipping {member.name}: already exists as {dst_key}")
|
||||
continue
|
||||
print(f"Extracting {member.name} to {dst_key}")
|
||||
fileobj = tar.extractfile(member)
|
||||
fileobj.seekable = lambda: False
|
||||
load_file_obj_with_backoff(hook, fileobj,
|
||||
dst_key,
|
||||
dst_bucket,
|
||||
replace=True)
|
||||
|
||||
untar()
|
||||
|
||||
|
||||
s3_untar()
|
|
@ -0,0 +1,75 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from airflow.decorators import dag
|
||||
from airflow.models.baseoperator import chain
|
||||
from airflow.models.param import Param
|
||||
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
||||
|
||||
import dag_utils
|
||||
|
||||
|
||||
@dag(
|
||||
dag_id="build_openaire_graph",
|
||||
dag_display_name="Build the OpenAIRE graph",
|
||||
params={
|
||||
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection for S3 endpoint"),
|
||||
"GRAPH_PATH": Param("s3a://graph/tmp/prod_provision/graph", type='string', description=""),
|
||||
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir", type='string', description=""),
|
||||
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
|
||||
description=""),
|
||||
"DEDUP_CONFIG_ID": Param("dedup-result-decisiontree-v4", type='string', description=""),
|
||||
"ORCID_PATH": Param("s3a://graph/data/orcid_2023/tables", type='string', description="")
|
||||
},
|
||||
tags=["openaire"]
|
||||
)
|
||||
def build_new_graph():
|
||||
chain(
|
||||
TriggerDagRunOperator(
|
||||
task_id="dedup",
|
||||
task_display_name="Deduplicate Research Results",
|
||||
trigger_dag_id="results_deduplication",
|
||||
wait_for_completion=True,
|
||||
conf={
|
||||
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
|
||||
|
||||
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["inference"],
|
||||
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
|
||||
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
|
||||
"IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"DEDUP_CONFIG_ID": "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}"
|
||||
}
|
||||
),
|
||||
TriggerDagRunOperator(
|
||||
task_id="consistency",
|
||||
task_display_name="Enforce Consistency of Graph",
|
||||
trigger_dag_id="consistency_graph",
|
||||
wait_for_completion=True,
|
||||
|
||||
conf={
|
||||
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
|
||||
|
||||
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
|
||||
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
|
||||
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
|
||||
"IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}"
|
||||
}
|
||||
),
|
||||
TriggerDagRunOperator(
|
||||
task_id="orcid_enrichment",
|
||||
task_display_name="Enrich Graph with ORCID data",
|
||||
trigger_dag_id="orcid_enrichment_graph",
|
||||
wait_for_completion=True,
|
||||
|
||||
conf={
|
||||
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
|
||||
|
||||
"ORCID_PATH": "{{ dag_run.conf.get('ORCID_PATH') }}",
|
||||
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
|
||||
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["orcid_enhancement"],
|
||||
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/orcid_enrichment"
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
build_new_graph()
|
|
@ -0,0 +1,173 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from airflow.decorators import dag
|
||||
from airflow.models.baseoperator import chain
|
||||
from airflow.models.param import Param
|
||||
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||
|
||||
import dag_utils
|
||||
from spark_configurator import SparkConfigurator
|
||||
|
||||
|
||||
@dag(
|
||||
dag_id="build_openaire_graph_incremental",
|
||||
dag_display_name="Build the OpenAIRE graph incrementally",
|
||||
params={
|
||||
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection for S3 endpoint"),
|
||||
"GRAPH_PATH": Param("s3a://graph/tmp/prod_provision/graph", type='string', description=""),
|
||||
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir", type='string', description=""),
|
||||
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
|
||||
description=""),
|
||||
"DEDUP_CONFIG_ID": Param("dedup-result-decisiontree-v4", type='string', description=""),
|
||||
"ORCID_PATH": Param("s3a://graph/data/orcid_2023/tables", type='string', description=""),
|
||||
"DELTA_PATH": Param("s3a://graph/data/delta", type='string', description=""),
|
||||
},
|
||||
tags=["openaire"]
|
||||
)
|
||||
def build_new_graph():
|
||||
chain(
|
||||
SparkKubernetesOperator(
|
||||
task_id='raw_graph',
|
||||
task_display_name="Generate Raw Graph",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="rawgraph-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.graph.raw.CopyIncrementalOafSparkApplication",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--inputPath", "{{ dag_run.conf.get('DELTA_PATH') }}",
|
||||
"--graphOutputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["raw"]
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
),
|
||||
|
||||
SparkKubernetesOperator(
|
||||
task_id='grouped_graph',
|
||||
task_display_name="Generate Grouped-by-id Graph",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="groupedgraph-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphInputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["raw"],
|
||||
"--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["grouped"],
|
||||
"--checkpointPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/grouped_entities",
|
||||
"--isLookupUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"--filterInvisible", "false"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
),
|
||||
|
||||
SparkKubernetesOperator(
|
||||
task_id='copygroupedrels',
|
||||
task_display_name="Copy relations to Grouped-by-id Graph",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="copygroupedrels-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphInputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["raw"],
|
||||
"--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["grouped"],
|
||||
"--entities", "relation",
|
||||
"--format", "text"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
),
|
||||
|
||||
TriggerDagRunOperator(
|
||||
task_id="clean_graph",
|
||||
task_display_name="Clean Results",
|
||||
trigger_dag_id="clean_graph",
|
||||
wait_for_completion=True,
|
||||
conf={
|
||||
"S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
|
||||
|
||||
"INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["grouped"],
|
||||
"OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
|
||||
"WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
|
||||
"IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}"
|
||||
}
|
||||
),
|
||||
|
||||
SparkKubernetesOperator(
|
||||
task_id='resolverels',
|
||||
task_display_name="Resolve Relations",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="resolverels-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelationById",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
|
||||
"--targetPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
|
||||
"--relationPath",
|
||||
"{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"] + "/relation"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
),
|
||||
|
||||
SparkKubernetesOperator(
|
||||
task_id='copyresolveents',
|
||||
task_display_name="Copy entities to Resolved Graph",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="copyresolveents-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphInputPath",
|
||||
"{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["clean"],
|
||||
"--outputPath", "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["resolve"],
|
||||
"--entities", ",".join([item for item in dag_utils.GRAPH_ENTITIES if item != "relation"]),
|
||||
"--format", "text"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
# , TriggerDagRunOperator(
|
||||
# task_id="dedup",
|
||||
# task_display_name="Deduplicate Research Results",
|
||||
# trigger_dag_id="results_deduplication",
|
||||
# wait_for_completion=True,
|
||||
# conf={
|
||||
# "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
|
||||
#
|
||||
# "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["inference"],
|
||||
# "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
|
||||
# "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
|
||||
# "IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
# "DEDUP_CONFIG_ID": "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}"
|
||||
# }
|
||||
# ),
|
||||
# TriggerDagRunOperator(
|
||||
# task_id="consistency",
|
||||
# task_display_name="Enforce Consistency of Graph",
|
||||
# trigger_dag_id="consistency_graph",
|
||||
# wait_for_completion=True,
|
||||
#
|
||||
# conf={
|
||||
# "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
|
||||
#
|
||||
# "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["dedup"],
|
||||
# "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
|
||||
# "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/dedup",
|
||||
# "IS_LOOKUP_URL": "{{ dag_run.conf.get('IS_LOOKUP_URL') }}"
|
||||
# }
|
||||
# ),
|
||||
# TriggerDagRunOperator(
|
||||
# task_id="orcid_enrichment",
|
||||
# task_display_name="Enrich Graph with ORCID data",
|
||||
# trigger_dag_id="orcid_enrichment_graph",
|
||||
# wait_for_completion=True,
|
||||
#
|
||||
# conf={
|
||||
# "S3_CONN_ID": "{{ dag_run.conf.get('S3_CONN_ID') }}",
|
||||
#
|
||||
# "ORCID_PATH": "{{ dag_run.conf.get('ORCID_PATH') }}",
|
||||
# "INPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["consistency"],
|
||||
# "OUTPUT_PATH": "{{ dag_run.conf.get('GRAPH_PATH') }}/" + dag_utils.BUILD_PHASES["orcid_enhancement"],
|
||||
# "WRKDIR_PATH": "{{ dag_run.conf.get('WRKDIR_PATH') }}/orcid_enrichment"
|
||||
# }
|
||||
# )
|
||||
)
|
||||
|
||||
|
||||
build_new_graph()
|
|
@ -0,0 +1,109 @@
|
|||
import os
|
||||
from datetime import timedelta
|
||||
|
||||
from airflow.decorators import dag
|
||||
from airflow.models.baseoperator import chain
|
||||
from airflow.models.param import Param
|
||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||
|
||||
import dag_utils
|
||||
from spark_configurator import SparkConfigurator
|
||||
|
||||
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
|
||||
|
||||
default_args = {
|
||||
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
|
||||
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
|
||||
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
|
||||
}
|
||||
|
||||
|
||||
@dag(
|
||||
dag_id="clean_graph",
|
||||
dag_display_name="Cleaning of Graph",
|
||||
default_args=default_args,
|
||||
params={
|
||||
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
|
||||
"POSTGRES_CONN_ID": Param("postgres_conn", type='string', description=""),
|
||||
|
||||
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/02_graph_grouped", type='string', description=""),
|
||||
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/03_graph_cleaned", type='string', description=""),
|
||||
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/clean", type='string', description=""),
|
||||
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
|
||||
description=""),
|
||||
"COUNTRY": Param("NL", type='string', description=""),
|
||||
"SHOULD_CLEAN": Param("false", type='string', description=""),
|
||||
"CONTEXT_ID": Param("sobigdata", type='string', description=""),
|
||||
"VERIFY_PARAM": Param("gcube", type='string', description=""),
|
||||
"VERIFY_COUNTRY_PARAM": Param("10.17632;10.5061", type='string', description=""),
|
||||
"COLLECTED_FROM": Param("NARCIS", type='string', description="")
|
||||
},
|
||||
tags=["openaire"]
|
||||
)
|
||||
def clean_graph_dag():
|
||||
getdatasourcefromcountry = SparkKubernetesOperator(
|
||||
task_id='getdatasourcefromcountry',
|
||||
task_display_name="Get datasource from Country",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="getdatasourcefromcountry-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.graph.clean.GetDatasourceFromCountry",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--inputPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--country", "{{ dag_run.conf.get('COUNTRY') }}",
|
||||
"--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH') }}/working/hostedby"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
masterduplicateaction = SparkKubernetesOperator(
|
||||
task_id='masterduplicateaction',
|
||||
task_display_name="MasterDuplicateAction",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="masterduplicateaction-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--hdfsNameNode", "s3a://graph/",
|
||||
"--hdfsPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/masterduplicate",
|
||||
"--postgresUrl", "jdbc:postgresql://{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).host }}:{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).port }}/dnet_openaireplus",
|
||||
"--postgresUser", "{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).login }}",
|
||||
"--postgresPassword", "{{ conn.get(dag_run.conf.get('POSTGRES_CONN_ID')).password }}"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
clean_tasks = []
|
||||
for entity in dag_utils.GRAPH_ENTITIES:
|
||||
clean_tasks.append(SparkKubernetesOperator(
|
||||
task_id='cleansparkjob_' + entity,
|
||||
task_display_name="Clean " + entity,
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="cleansparkjob-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=[
|
||||
"--inputPath", "{{ dag_run.conf.get('INPUT_PATH') }}/" + entity,
|
||||
"--outputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}/" + entity,
|
||||
"--graphTableClassName", dag_utils.GRAPH_ENTITIES_CLASS_NAMES[entity],
|
||||
"--isLookupUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"--contextId", "{{ dag_run.conf.get('CONTEXT_ID') }}",
|
||||
"--verifyParam", "{{ dag_run.conf.get('VERIFY_PARAM') }}",
|
||||
"--country", "{{ dag_run.conf.get('COUNTRY') }}",
|
||||
"--verifyCountryParam", "{{ dag_run.conf.get('VERIFY_COUNTRY_PARAM') }}",
|
||||
"--hostedBy", "{{ dag_run.conf.get('WRKDIR_PATH') }}/working/hostedby",
|
||||
"--collectedfrom", "{{ dag_run.conf.get('COLLECTED_FROM') }}",
|
||||
"--masterDuplicatePath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/masterduplicate",
|
||||
"--deepClean", "{{ dag_run.conf.get('SHOULD_CLEAN') }}"
|
||||
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
))
|
||||
|
||||
chain(getdatasourcefromcountry,
|
||||
#masterduplicateaction,
|
||||
clean_tasks)
|
||||
|
||||
|
||||
clean_graph_dag()
|
|
@ -0,0 +1,71 @@
|
|||
import os
|
||||
from datetime import timedelta
|
||||
|
||||
from airflow.decorators import dag
|
||||
from airflow.models.baseoperator import chain
|
||||
from airflow.models.param import Param
|
||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||
|
||||
from spark_configurator import SparkConfigurator
|
||||
|
||||
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
|
||||
|
||||
default_args = {
|
||||
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
|
||||
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
|
||||
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
|
||||
}
|
||||
|
||||
|
||||
@dag(
|
||||
dag_id="consistency_graph",
|
||||
dag_display_name="Enforce Consistency of Graph",
|
||||
default_args=default_args,
|
||||
params={
|
||||
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
|
||||
|
||||
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/06_graph_dedup", type='string', description=""),
|
||||
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/07_graph_consistent", type='string', description=""),
|
||||
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/dedup", type='string', description=""),
|
||||
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
|
||||
description="")
|
||||
},
|
||||
tags=["openaire"]
|
||||
)
|
||||
def consistency_graph_dag():
|
||||
propagate_rel = SparkKubernetesOperator(
|
||||
task_id='PropagateRelation',
|
||||
task_display_name="Propagate Relations",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="propagaterels-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.dedup.SparkPropagateRelation",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--graphOutputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
|
||||
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
group_entities = SparkKubernetesOperator(
|
||||
task_id='GroupEntities',
|
||||
task_display_name="Group results by id",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="groupentities-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphInputPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--checkpointPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}/grouped_entities",
|
||||
"--outputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
|
||||
"--isLookupUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"--filterInvisible", "true"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
chain(propagate_rel, group_entities)
|
||||
|
||||
|
||||
consistency_graph_dag()
|
|
@ -0,0 +1,44 @@
|
|||
from airflow.hooks.base import BaseHook
|
||||
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
||||
|
||||
BUILD_PHASES = {
|
||||
"raw": "01_graph_raw",
|
||||
"grouped": "02_graph_grouped",
|
||||
"clean": "03_graph_cleaned",
|
||||
"resolve": "04_graph_resolved",
|
||||
|
||||
"inference": "05_graph_inferred",
|
||||
"dedup": "06_graph_dedup",
|
||||
"consistency": "07_graph_consistent",
|
||||
"enrichment": "08_graph_dedup_enriched", # actionset
|
||||
"orcid_enhancement": "09_graph_orcid_enriched"
|
||||
}
|
||||
|
||||
def get_bucket_name(context: dict, hook: S3Hook, param_name: str):
|
||||
bucket_name = context["params"][param_name]
|
||||
if not bucket_name:
|
||||
bucket_name = hook.extra_args['bucket_name']
|
||||
return bucket_name
|
||||
|
||||
|
||||
def get_default_bucket():
|
||||
hook = S3Hook("s3_conn", transfer_config_args={'use_threads': False})
|
||||
try:
|
||||
return hook.service_config['bucket_name']
|
||||
except KeyError:
|
||||
return ''
|
||||
|
||||
|
||||
GRAPH_ENTITIES = ["publication", "dataset", "otherresearchproduct", "software", "datasource", "organization", "project", "relation"]
|
||||
|
||||
|
||||
GRAPH_ENTITIES_CLASS_NAMES = {
|
||||
"publication": "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"dataset": "eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||
"otherresearchproduct": "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
|
||||
"software": "eu.dnetlib.dhp.schema.oaf.Software",
|
||||
"datasource": "eu.dnetlib.dhp.schema.oaf.Datasource",
|
||||
"organization": "eu.dnetlib.dhp.schema.oaf.Organization",
|
||||
"project": "eu.dnetlib.dhp.schema.oaf.Project",
|
||||
"relation": "eu.dnetlib.dhp.schema.oaf.Relation"
|
||||
}
|
|
@ -0,0 +1,173 @@
|
|||
import os
|
||||
from datetime import timedelta
|
||||
|
||||
from airflow.decorators import dag
|
||||
from airflow.models.baseoperator import chain
|
||||
from airflow.models.param import Param
|
||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||
|
||||
from spark_configurator import SparkConfigurator
|
||||
|
||||
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
|
||||
|
||||
default_args = {
|
||||
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
|
||||
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
|
||||
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
|
||||
}
|
||||
|
||||
|
||||
@dag(
|
||||
dag_id="results_deduplication",
|
||||
dag_display_name="Deduplicate Research Results",
|
||||
default_args=default_args,
|
||||
params={
|
||||
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
|
||||
|
||||
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/05_graph_inferred", type='string', description=""),
|
||||
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/06_graph_dedup", type='string', description=""),
|
||||
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/dedup", type='string', description=""),
|
||||
"IS_LOOKUP_URL": Param("http://services.openaire.eu:8280/is/services/isLookUp?wsdl", type='string',
|
||||
description=""),
|
||||
"DEDUP_CONFIG_ID": Param("dedup-result-decisiontree-v4", type='string', description="")
|
||||
},
|
||||
tags=["openaire"]
|
||||
)
|
||||
def results_deduplication_dag():
|
||||
simrel = SparkKubernetesOperator(
|
||||
task_id='CreateSimRel',
|
||||
task_display_name="Create Similarity Relations",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="createsimrels-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
|
||||
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
|
||||
"--numPartitions", "64"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
whitelist = SparkKubernetesOperator(
|
||||
task_id='WhitelistSimRels',
|
||||
task_display_name="Add Whitelist Similarity Relations",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="whitelistsimrels-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.dedup.SparkWhitelistSimRels",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
|
||||
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
|
||||
"--whiteListPath", "s3a://graph/data/dedup/whitelist_prod", # TODO: copy!
|
||||
"--numPartitions", "64"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
createmergerel = SparkKubernetesOperator(
|
||||
task_id='CreateMergeRels',
|
||||
task_display_name="Create Merge Relations",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="createmergerels-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
|
||||
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
|
||||
"--cutConnectedComponent", "200",
|
||||
"--hiveMetastoreUris", "",
|
||||
"--pivotHistoryDatabase", ""
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
creatededuprecord = SparkKubernetesOperator(
|
||||
task_id='CreateDedupRecord',
|
||||
task_display_name="Create Dedup Record",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="creatededuprecord-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
|
||||
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
copyopenorgsmergerel = SparkKubernetesOperator(
|
||||
task_id='CopyOpenorgsMergeRels',
|
||||
task_display_name="Copy Openorgs Merge Relations",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="copyopenorgsmergerels-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
|
||||
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
|
||||
"--numPartitions", "64"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
createorgsdeduprecord = SparkKubernetesOperator(
|
||||
task_id='CreateOrgsDedupRecord',
|
||||
task_display_name="Create Organizations Dedup Records",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="createorgsdeduprecord-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCreateOrgsDedupRecord",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--isLookUpUrl", "{{ dag_run.conf.get('IS_LOOKUP_URL') }}",
|
||||
"--actionSetId", "{{ dag_run.conf.get('DEDUP_CONFIG_ID') }}",
|
||||
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
updateentity = SparkKubernetesOperator(
|
||||
task_id='UpdateEntity',
|
||||
task_display_name="Update Entity",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="updateentity-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
|
||||
"--dedupGraphPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
copyrelations = SparkKubernetesOperator(
|
||||
task_id='copyRelations',
|
||||
task_display_name="Copy Non-Openorgs Relations",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="copyrelations-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphBasePath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--workingPath", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
|
||||
"--dedupGraphPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
chain(simrel, whitelist, createmergerel, creatededuprecord, copyopenorgsmergerel, createorgsdeduprecord, updateentity, copyrelations)
|
||||
|
||||
|
||||
results_deduplication_dag()
|
|
@ -0,0 +1,70 @@
|
|||
import os
|
||||
from datetime import timedelta
|
||||
|
||||
from airflow.decorators import dag
|
||||
from airflow.models.baseoperator import chain
|
||||
from airflow.models.param import Param
|
||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||
|
||||
from spark_configurator import SparkConfigurator
|
||||
|
||||
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
|
||||
|
||||
default_args = {
|
||||
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
|
||||
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
|
||||
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
|
||||
}
|
||||
|
||||
|
||||
@dag(
|
||||
dag_id="orcid_enrichment_graph",
|
||||
dag_display_name="Enrich Graph with ORCID data",
|
||||
default_args=default_args,
|
||||
params={
|
||||
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
|
||||
"ORCID_PATH": Param("s3a://graph/data/orcid_2023/tables", type='string', description=""),
|
||||
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/07_graph_consistent", type='string', description=""),
|
||||
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string',
|
||||
description=""),
|
||||
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/orcid_enrichment", type='string',
|
||||
description="")
|
||||
},
|
||||
tags=["openaire"]
|
||||
)
|
||||
def orcid_enrichment_dag():
|
||||
chain(SparkKubernetesOperator(
|
||||
task_id='EnrichGraphWithOrcidAuthors',
|
||||
task_display_name='Enrich Authors with ORCID',
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="orcidenrich-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.enrich.orcid.SparkEnrichGraphWithOrcidAuthors",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--orcidPath", "{{ dag_run.conf.get('ORCID_PATH') }}",
|
||||
"--graphPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--targetPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
|
||||
"--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
|
||||
"--master", ""
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
),
|
||||
SparkKubernetesOperator(
|
||||
task_id='copyorcidenrichrels',
|
||||
task_display_name="Copy relations to ORCID Enriched graph",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="copygroupedrels-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.oa.merge.CopyEntitiesSparkJob",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--graphInputPath", "{{ dag_run.conf.get('INPUT_PATH') }}/",
|
||||
"--outputPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}/",
|
||||
"--entities", "relation",
|
||||
"--format", "text"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
orcid_enrichment_dag()
|
|
@ -0,0 +1,55 @@
|
|||
import os
|
||||
from datetime import timedelta
|
||||
|
||||
from airflow.decorators import dag
|
||||
from airflow.models.param import Param
|
||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||
|
||||
from spark_configurator import SparkConfigurator
|
||||
|
||||
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
|
||||
|
||||
default_args = {
|
||||
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
|
||||
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
|
||||
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60)))
|
||||
}
|
||||
|
||||
|
||||
@dag(
|
||||
dag_id="orcid_propagation_graph",
|
||||
dag_display_name="Propagate ORCID data in graph",
|
||||
default_args=default_args,
|
||||
params={
|
||||
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
|
||||
"ORCID_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""),
|
||||
"INPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/09_graph_orcid_enriched", type='string', description=""),
|
||||
"OUTPUT_PATH": Param("s3a://graph/tmp/prod_provision/graph/10_graph_propagated", type='string',
|
||||
description=""),
|
||||
"WRKDIR_PATH": Param("s3a://graph/tmp/prod_provision/working_dir/orcid_propagation", type='string',
|
||||
description="")
|
||||
},
|
||||
tags=["openaire"]
|
||||
)
|
||||
def orcid_propagation_dag():
|
||||
orcid_propagate = SparkKubernetesOperator(
|
||||
task_id='PropagateGraphWithOrcid',
|
||||
task_display_name="Propagate ORCID data",
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=SparkConfigurator(
|
||||
name="orcidpropagate-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkPropagateOrcidAuthor",
|
||||
jarLocation='s3a://binaries/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments=["--orcidPath", "{{ dag_run.conf.get('ORCID_PATH') }}",
|
||||
"--graphPath", "{{ dag_run.conf.get('INPUT_PATH') }}",
|
||||
"--targetPath", "{{ dag_run.conf.get('OUTPUT_PATH') }}",
|
||||
"--workingDir", "{{ dag_run.conf.get('WRKDIR_PATH') }}",
|
||||
"--matchingSource", "graph"
|
||||
]).get_configuration(),
|
||||
kubernetes_conn_id="kubernetes_default"
|
||||
)
|
||||
|
||||
orcid_propagate
|
||||
|
||||
|
||||
orcid_propagation_dag()
|
|
@ -0,0 +1,87 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""
|
||||
This is an example DAG which uses SparkKubernetesOperator and SparkKubernetesSensor.
|
||||
In this example, we create two tasks which execute sequentially.
|
||||
The first task is to submit sparkApplication on Kubernetes cluster(the example uses spark-pi application).
|
||||
and the second task is to check the final state of the sparkApplication that submitted in the first state.
|
||||
|
||||
Spark-on-k8s operator is required to be already installed on Kubernetes
|
||||
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
|
||||
"""
|
||||
|
||||
from os import path
|
||||
from datetime import timedelta, datetime
|
||||
from spark_configurator import SparkConfigurator
|
||||
|
||||
# [START import_module]
|
||||
# The DAG object; we'll need this to instantiate a DAG
|
||||
from airflow import DAG
|
||||
# Operators; we need this to operate!
|
||||
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
|
||||
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
|
||||
from airflow.utils.dates import days_ago
|
||||
|
||||
|
||||
# [END import_module]
|
||||
|
||||
# [START default_args]
|
||||
# These args will get passed on to each operator
|
||||
# You can override them on a per-task basis during operator initialization
|
||||
default_args = {
|
||||
'owner': 'airflow',
|
||||
'depends_on_past': False,
|
||||
'start_date': days_ago(1),
|
||||
'email': ['airflow@example.com'],
|
||||
'email_on_failure': False,
|
||||
'email_on_retry': False,
|
||||
'max_active_runs': 1,
|
||||
'retries': 3
|
||||
}
|
||||
|
||||
spec =SparkConfigurator(
|
||||
name="spark-scholix-{{ ds }}-{{ task_instance.try_number }}",
|
||||
mainClass="eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump",
|
||||
jarLocation = 's3a://deps/dhp-shade-package-1.2.5-SNAPSHOT.jar',
|
||||
arguments =[ "--sourcePath", "s3a://raw-graph/01", "--targetPath", "s3a://scholix"],\
|
||||
executor_cores=4,
|
||||
executor_memory="4G",
|
||||
executor_instances=1,
|
||||
executor_memoryOverhead="3G").get_configuration()
|
||||
|
||||
dag = DAG(
|
||||
'spark_run_test',
|
||||
default_args=default_args,
|
||||
schedule_interval=None,
|
||||
tags=['example', 'spark']
|
||||
)
|
||||
|
||||
submit = SparkKubernetesOperator(
|
||||
task_id='spark-scholix',
|
||||
namespace='dnet-spark-jobs',
|
||||
template_spec=spec,
|
||||
kubernetes_conn_id="kubernetes_default",
|
||||
# do_xcom_push=True,
|
||||
# delete_on_termination=True,
|
||||
# base_container_name="spark-kubernetes-driver",
|
||||
dag=dag
|
||||
)
|
||||
|
||||
|
||||
|
||||
submit
|
|
@ -0,0 +1,106 @@
|
|||
|
||||
class SparkConfigurator:
|
||||
def __init__(self,
|
||||
name,
|
||||
mainClass,
|
||||
jarLocation:str,
|
||||
arguments,
|
||||
apiVersion=None,
|
||||
namespace="dnet-spark-jobs",
|
||||
image= "dnet-spark:1.0.0",
|
||||
driver_cores=1,
|
||||
driver_memory='1G',
|
||||
executor_cores=8,
|
||||
executor_memory="16G",
|
||||
executor_memoryOverhead="8G",
|
||||
executor_instances=1
|
||||
) -> None:
|
||||
if apiVersion:
|
||||
self.apiVersion = apiVersion
|
||||
else:
|
||||
self.apiVersion = "sparkoperator.k8s.io/v1beta2"
|
||||
self.namespace= namespace
|
||||
self.name = name
|
||||
self.image= image
|
||||
self.mainClass = mainClass
|
||||
self.jarLocation = jarLocation
|
||||
self.arguments= arguments
|
||||
self.s3Configuration = {
|
||||
"spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
|
||||
"spark.executor.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
|
||||
"spark.hadoop.fs.defaultFS": "s3a://graph",
|
||||
"spark.hadoop.fs.s3a.access.key": "minio",
|
||||
"spark.hadoop.fs.s3a.secret.key": "minio123",
|
||||
"spark.hadoop.fs.s3a.endpoint": "https://minio.dnet-minio-tenant.svc.cluster.local",
|
||||
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
|
||||
"spark.hadoop.fs.s3a.path.style.access": "true",
|
||||
"spark.hadoop.fs.s3a.attempts.maximum": "1",
|
||||
"spark.hadoop.fs.s3a.connection.establish.timeout": "5000",
|
||||
"spark.hadoop.fs.s3a.connection.timeout": "10001",
|
||||
"spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
|
||||
"com.amazonaws.sdk.disableCertChecking": "true",
|
||||
"com.cloudera.com.amazonaws.sdk.disableCertChecking": "true",
|
||||
"fs.s3a.connection.ssl.strictverify": "false",
|
||||
"fs.s3a.connection.ssl.enabled": "false",
|
||||
"fs.s3a.ssl.enabled": "false",
|
||||
"spark.hadoop.fs.s3a.ssl.enabled": "false"
|
||||
}
|
||||
self.sparkResoruceConf= {
|
||||
'driver_cores':driver_cores,
|
||||
'driver_memory':driver_memory,
|
||||
'executor_cores':executor_cores,
|
||||
'executor_memory':executor_memory,
|
||||
'executor_instances':executor_instances,
|
||||
'memoryOverhead':executor_memoryOverhead
|
||||
|
||||
}
|
||||
|
||||
def get_configuration(self) -> dict:
|
||||
return {
|
||||
"apiVersion": self.apiVersion,
|
||||
"kind": "SparkApplication",
|
||||
"metadata": {
|
||||
"name": self.name,
|
||||
"namespace": self.namespace
|
||||
},
|
||||
"spec": {
|
||||
"type": "Scala",
|
||||
"mode": "cluster",
|
||||
"image":self.image,
|
||||
"imagePullPolicy": "IfNotPresent",
|
||||
"mainClass": self.mainClass,
|
||||
"mainApplicationFile": self.jarLocation,
|
||||
"deps": {
|
||||
"jar": [self.jarLocation]
|
||||
},
|
||||
"arguments": self.arguments,
|
||||
"sparkVersion": "3.5.3",
|
||||
"sparkConf": self.s3Configuration,
|
||||
"restartPolicy": {
|
||||
"type": "Never"
|
||||
},
|
||||
"dynamicAllocation": {
|
||||
"enables": True
|
||||
},
|
||||
"driver": {
|
||||
"javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
|
||||
"cores": self.sparkResoruceConf['driver_cores'],
|
||||
"coreLimit": "1200m",
|
||||
"memory": self.sparkResoruceConf['driver_memory'],
|
||||
"labels": {
|
||||
"version": "3.5.3"
|
||||
},
|
||||
"serviceAccount": "spark"
|
||||
},
|
||||
"executor": {
|
||||
"javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
|
||||
"cores": self.sparkResoruceConf['executor_cores'],
|
||||
"memoryOverhead": self.sparkResoruceConf['memoryOverhead'],
|
||||
"memory": self.sparkResoruceConf['executor_memory'],
|
||||
"instances": self.sparkResoruceConf['executor_instances'],
|
||||
"labels": {
|
||||
"version": "3.5.3"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue