# lot1-kickoff/airflow/dags/OpenDataPortal_harvest.py
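"""Harvest dataset metadata from the data.europa.eu search API into OpenSearch.

A single TaskFlow task pages through the portal's scroll API and
bulk-indexes each batch of results into the configured index.
"""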


import os
import time
from datetime import timedelta

import pendulum
import requests
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from opensearchpy import OpenSearch, helpers
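
# Runtime settings come from environment variables so the DAG file can be
# deployed unchanged across environments. Note that S3_CONN_ID is read here
# but not referenced below; the "S3_CONN_ID" DAG param carries the literal
# default instead.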
S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}


@dag(
    dag_id="open_data_portal_harvest",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    default_args=default_args,
    params={
        "S3_CONN_ID": "s3_conn",
        "OPENSEARCH_CONN_ID": "opensearch_default",
        "OS_INDEX_NAME": "euodp_raw",
    },
    tags=["aggregation"],
)
def harvest():
    @task
    def everything(**context):
        index_name = context["params"]["OS_INDEX_NAME"]
        conn = BaseHook.get_connection(context["params"]["OPENSEARCH_CONN_ID"])
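        # Build the OpenSearch client from the Airflow connection.
        # verify_certs=False accepts self-signed certificates; presumably
        # acceptable for an internal cluster, but worth revisiting in
        # production.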
        client = OpenSearch(
            hosts=[{"host": conn.host, "port": conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
        )
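        # Create the target index on first run; subsequent runs reuse it.
        # zstd_no_dict trades some CPU for smaller segments, segment
        # replication suits a write-heavy index, and dynamic mapping is
        # disabled so incoming fields are stored but not indexed.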
        if not client.indices.exists(index_name):
            client.indices.create(index_name, {
                "settings": {
                    "index": {
                        "number_of_shards": 3,
                        "number_of_replicas": 0,
                        "codec": "zstd_no_dict",
                        "replication.type": "SEGMENT",
                    },
                },
                "mappings": {
                    "dynamic": False,
                },
            })
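
        # parallel_bulk streams the actions through a pool of worker
        # threads; with raise_on_error/raise_on_exception disabled, a bad
        # document is reported without aborting the rest of the batch.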
        def store_results(hits):
            def _generate_data():
                for r in hits:
                    # Reuse the portal's dataset id as the document id so
                    # re-harvesting overwrites instead of duplicating.
                    r["_index"] = index_name
                    r["_id"] = r["id"]
                    yield r

            succeeded = 0
            failed = 0
            for success, item in helpers.parallel_bulk(
                client,
                actions=_generate_data(),
                raise_on_exception=False,
                raise_on_error=False,
                chunk_size=5000,
                max_chunk_bytes=50 * 1024 * 1024,
                timeout=180,
            ):
                if success:
                    succeeded += 1
                else:
                    print(item["index"]["error"])
                    failed += 1
            print(f"Indexed {succeeded} documents, {failed} failures")
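
        # The portal's search API paginates via a scroll cursor: the first
        # call returns a scrollId, and each follow-up call returns the next
        # page plus a new scrollId.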
        headers = {"Accept": "application/json"}
        r = requests.get(
            "https://data.europa.eu/api/hub/search/search"
            "?filter=dataset&aggregation=false&limit=300&showScore=true&scroll=true",
            headers=headers,
        ).json()
        scroll_id = r["result"]["scrollId"]
        results = r["result"]["results"]
        store_results(results)
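
        # Follow the scroll to the end, allowing up to ten consecutive
        # failed attempts per page before giving up.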
        max_retries = 10
        while scroll_id:
            r = requests.get(
                "https://data.europa.eu/api/hub/search/scroll?scrollId=" + scroll_id,
                headers=headers,
            )
            if r.status_code != 200:
                print(f"Error {r.status_code}: {r.text}")
                time.sleep(0.1)  # brief pause before retrying
                max_retries -= 1
                if max_retries == 0:
                    raise RuntimeError("Cannot fetch data")
                continue
            max_retries = 10  # reset the retry budget after a successful page
            r = r.json()
            # Guard against a missing scrollId on the final page, which
            # ends the loop.
            scroll_id = r["result"].get("scrollId")
            results = r["result"]["results"]
            store_results(results)

    everything()


harvest()
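
# With schedule=None the DAG only runs when triggered manually, e.g. from the
# Airflow UI or (assuming a standard Airflow 2 deployment) via:
#   airflow dags trigger open_data_portal_harvest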