import os
import time
from datetime import timedelta

import pendulum
import requests
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from opensearchpy import OpenSearch, helpers

S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}


@dag(
    dag_id="open_data_portal_harvest",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    dagrun_timeout=None,
    catchup=False,
    default_args=default_args,
    params={
        "S3_CONN_ID": "s3_conn",
        "OPENSEARCH_CONN_ID": "opensearch_default",
        "OS_INDEX_NAME": "euodp_raw",
    },
    tags=["aggregation"],
)
def harvest():
    @task
    def everything(**context):
        index_name = context["params"]["OS_INDEX_NAME"]
        conn = BaseHook.get_connection(context["params"]["OPENSEARCH_CONN_ID"])
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
        )

        # Create the target index on first run. Dynamic mapping is disabled so the
        # raw documents are stored in _source without indexing arbitrary fields.
        if not client.indices.exists(index_name):
            client.indices.create(index_name, {
                "settings": {
                    "index": {
                        "number_of_shards": 3,
                        "number_of_replicas": 0,
                        "codec": "zstd_no_dict",
                        "replication.type": "SEGMENT",
                    },
                },
                "mappings": {
                    "dynamic": False,
                },
            })

        def store_results(hits):
            # Stream one page of search results into OpenSearch via parallel_bulk,
            # using the dataset id as _id so repeated runs overwrite instead of duplicating.
            def _generate_data():
                for r in hits:
                    r['_index'] = index_name
                    r['_id'] = r['id']
                    yield r

            succeeded = 0
            failed = 0
            for success, item in helpers.parallel_bulk(
                client,
                actions=_generate_data(),
                raise_on_exception=False,
                raise_on_error=False,
                chunk_size=5000,
                max_chunk_bytes=50 * 1024 * 1024,
                timeout=180,
            ):
                if success:
                    succeeded += 1
                else:
                    print(item["index"]["error"])
                    failed += 1

        headers = {'Accept': 'application/json'}

        # Open a scroll context on the data.europa.eu search API and index the first page.
        r = requests.get(
            'https://data.europa.eu/api/hub/search/search?filter=dataset&aggregation=false&limit=300&showScore=true&scroll=true',
            headers=headers,
        ).json()
        scroll_id = r['result']['scrollId']
        results = r['result']['results']
        store_results(results)

        # Follow the scroll cursor until an empty page is returned, retrying each
        # page up to max_retries times on transient HTTP errors.
        max_retries = 10
        while scroll_id:
            try:
                r = requests.get(
                    'https://data.europa.eu/api/hub/search/scroll?scrollId=' + scroll_id,
                    headers=headers,
                )
                r.raise_for_status()
            except Exception as e:
                print(f"Error: {e}")
                time.sleep(0.1)
                max_retries -= 1
                if max_retries == 0:
                    raise Exception("Cannot fetch data")
                continue
            max_retries = 10
            r = r.json()
            scroll_id = r['result']['scrollId']
            results = r['result']['results']
            if len(results) <= 0:
                return
            store_results(results)

    everything()


harvest()
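
# Optional local debug entry point -- a sketch, not part of the original DAG.
# It assumes Airflow 2.5+, where DAG.test() runs every task in-process without a
# scheduler; assigning the DAG object (the hypothetical name `dag_object` is ours)
# lets the file double as a standalone script for quick verification:
#
# dag_object = harvest()
#
# if __name__ == "__main__":
#     dag_object.test()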