diff --git a/airflow/dags/OpenDataPortal_harvest.py b/airflow/dags/OpenDataPortal_harvest.py
new file mode 100644
index 0000000..2aacf6c
--- /dev/null
+++ b/airflow/dags/OpenDataPortal_harvest.py
@@ -0,0 +1,116 @@
+import os
+from datetime import timedelta
+
+import pendulum
+import requests
+from airflow.decorators import dag
+from airflow.decorators import task
+from airflow.hooks.base import BaseHook
+from opensearchpy import OpenSearch, helpers
+
+
+# Connection IDs and task tuning, overridable via environment variables.
+S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
+EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
+
+default_args = {
+    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
+    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
+    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
+}
+
+
+@dag(
+    dag_display_name="Open Data Portal harvest",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=None,
+    catchup=False,
+    default_args=default_args,
+    params={
+        "S3_CONN_ID": "s3_conn",
+        "OPENSEARCH_CONN_ID": "opensearch_default",
+        "OS_INDEX_NAME": "euodp_raw"
+    },
+    tags=["aggregation"],
+)
+def harvest():
+    """Harvest every dataset record from the EU Open Data Portal (data.europa.eu) into OpenSearch."""
+
+    @task
+    def everything(**context):
+        index_name = context["params"]["OS_INDEX_NAME"]
+        conn = BaseHook.get_connection(context["params"]["OPENSEARCH_CONN_ID"])
+        client = OpenSearch(
+            hosts=[{'host': conn.host, 'port': conn.port}],
+            http_auth=(conn.login, conn.password),
+            use_ssl=True,
+            verify_certs=False,
+            ssl_show_warn=False,
+            pool_maxsize=20
+        )
+
+        # Create the target index on first run. Dynamic mapping is disabled so
+        # records are stored as-is without mapping every field.
+        if not client.indices.exists(index=index_name):
+            client.indices.create(index=index_name, body={
+                "settings": {
+                    "index": {
+                        "number_of_shards": 3,
+                        "number_of_replicas": 0,
+                        "codec": "zstd_no_dict",
+                        "replication.type": "SEGMENT"
+                    },
+                },
+                "mappings": {
+                    "dynamic": False
+                }
+            })
+
+        def store_results(hits):
+            # Bulk-index one page of search hits, using the record id as the document _id.
+            def _generate_data():
+                for r in hits:
+                    r['_index'] = index_name
+                    r['_id'] = r['id']
+                    yield r
+
+            succeeded = 0
+            failed = 0
+            for success, item in helpers.parallel_bulk(
+                client,
+                actions=_generate_data(),
+                raise_on_exception=False,
+                raise_on_error=False,
+                chunk_size=5000,
+                max_chunk_bytes=50 * 1024 * 1024,
+                timeout=180,
+            ):
+                if success:
+                    succeeded += 1
+                else:
+                    print(item["index"]["error"])
+                    failed += 1
+            print(f"Indexed {succeeded} documents, {failed} failed")
+
+        headers = {'Accept': 'application/json'}
+
+        # First page: the search endpoint returns up to 500 hits plus a scroll id
+        # used to page through the remaining results.
+        r = requests.get('https://data.europa.eu/api/hub/search/search?filter=dataset&aggregation=false&limit=500&showScore=true&scroll=true', headers=headers).json()
+        scroll_id = r['result']['scrollId']
+        results = r['result']['results']
+        store_results(results)
+
+        # Follow the scroll cursor until the API stops returning one or a page comes back empty.
+        while scroll_id:
+            r = requests.get('https://data.europa.eu/api/hub/search/scroll?scrollId=' + scroll_id, headers=headers).json()
+            scroll_id = r['result'].get('scrollId')
+            results = r['result'].get('results', [])
+            if not results:
+                break
+            store_results(results)
+
+    everything()
+
+
+harvest()