simple test DAG
This commit is contained in:
parent
dead48e9b2
commit
0ca0da3cc9
|
@ -8,6 +8,7 @@ import os
|
||||||
import zipfile
|
import zipfile
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
|
import k8s
|
||||||
import pendulum
|
import pendulum
|
||||||
from airflow.decorators import dag
|
from airflow.decorators import dag
|
||||||
from airflow.decorators import task
|
from airflow.decorators import task
|
||||||
|
@ -139,7 +140,23 @@ def skg_if_pipeline():
|
||||||
|
|
||||||
return list(split_list(pieces, len(pieces)//BULK_PARALLELISM))
|
return list(split_list(pieces, len(pieces)//BULK_PARALLELISM))
|
||||||
|
|
||||||
@task(execution_timeout=timedelta(days=3))
|
@task(execution_timeout=timedelta(days=3), executor_config={
|
||||||
|
"pod_override": k8s.V1Pod(
|
||||||
|
spec=k8s.V1PodSpec(
|
||||||
|
containers=[
|
||||||
|
k8s.V1Container(
|
||||||
|
name="base",
|
||||||
|
resources=k8s.V1ResourceRequirements(
|
||||||
|
requests={
|
||||||
|
"cpu": "500m",
|
||||||
|
"memory": "256Mi"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
})
|
||||||
def bulk_load(files: list[(str, str)]):
|
def bulk_load(files: list[(str, str)]):
|
||||||
client = OpenSearch(
|
client = OpenSearch(
|
||||||
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
|
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
|
||||||
|
|
Loading…
Reference in New Issue