lot1-kickoff/airflow/dags/OpenDataPortal_harvest.py

import os
from datetime import timedelta

import pendulum
import requests
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from opensearchpy import OpenSearch, helpers

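# Connection id, task timeout and retry behaviour are configurable via environment variables.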
S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}
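

# Manually triggered harvest DAG: pages through the data.europa.eu search API and
# bulk-indexes the raw dataset records into an OpenSearch index.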
@dag(
    dag_id="open_data_portal_harvest",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    default_args=default_args,
    params={
        "S3_CONN_ID": "s3_conn",
        "OPENSEARCH_CONN_ID": "opensearch_default",
        "OS_INDEX_NAME": "euodp_raw",
    },
    tags=["aggregation"],
)
def harvest():
    @task
    def everything(**context):
        index_name = context["params"]["OS_INDEX_NAME"]
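
        # Build the OpenSearch client from the Airflow connection named in params.
        # Certificate verification is disabled, so self-signed cluster certificates are accepted.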
        conn = BaseHook.get_connection(context["params"]["OPENSEARCH_CONN_ID"])
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
        )
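
        # Create the target index on first run. Dynamic mapping is disabled, so the raw
        # documents are stored without indexing every incoming field.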
        if not client.indices.exists(index_name):
            client.indices.create(index_name, {
"settings": {
"index": {
"number_of_shards": 3,
"number_of_replicas": 0,
"codec": "zstd_no_dict",
"replication.type": "SEGMENT"
},
},
"mappings": {
"dynamic": False
}
})
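
        # Bulk-index one page of portal results, reusing each dataset's id as the document
        # _id so repeated harvests overwrite existing documents instead of duplicating them.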
        def store_results(hits):
            def _generate_data():
                for r in hits:
                    r['_index'] = index_name
                    r['_id'] = r['id']
                    yield r

            succeeded = 0
            failed = 0
            for success, item in helpers.parallel_bulk(
                client,
                actions=_generate_data(),
                raise_on_exception=False,
                raise_on_error=False,
                chunk_size=5000,
                max_chunk_bytes=50 * 1024 * 1024,
                timeout=180,
            ):
                if success:
                    succeeded += 1
                else:
                    print(item["index"]["error"])
                    failed += 1
            print(f"Indexed {succeeded} documents, {failed} failures")
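
        # The initial search request opens a server-side scroll; the loop below keeps
        # fetching pages from the scroll endpoint and indexes each batch as it arrives.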
        headers = {'Accept': 'application/json'}
        r = requests.get('https://data.europa.eu/api/hub/search/search?filter=dataset&aggregation=false&limit=500&showScore=true&scroll=true', headers=headers).json()
        scroll_id = r['result'].get('scrollId')
        results = r['result']['results']
        store_results(results)
        while scroll_id:
            r = requests.get('https://data.europa.eu/api/hub/search/scroll?scrollId=' + scroll_id, headers=headers).json()
            scroll_id = r['result'].get('scrollId')
            results = r['result']['results']
            if not results:
                # An empty page means the scroll is exhausted; stop instead of looping forever.
                break
            store_results(results)

    everything()


harvest()