initial stage

Giambattista Bloisi 2024-06-06 19:48:13 +02:00
parent 1bc94cd835
commit 94b4add8cd
1 changed file with 101 additions and 0 deletions

@@ -0,0 +1,101 @@
import os
from datetime import timedelta
import pendulum
import requests
from airflow.decorators import dag
from airflow.decorators import task
from airflow.hooks.base import BaseHook
from opensearchpy import OpenSearch, helpers
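
# Airflow DAG that harvests dataset metadata from the EU Open Data Portal
# (data.europa.eu) search API and indexes the raw records into OpenSearch.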
S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
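
# Task-level defaults; timeout and retry behaviour are tunable via environment variables.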
default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}
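

# The DAG is triggered manually (schedule=None); connection ids and the target
# index name can be overridden per run through "params".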
@dag(
    dag_display_name="Open Data Portal harvest",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    default_args=default_args,
    params={
        "S3_CONN_ID": "s3_conn",
        "OPENSEARCH_CONN_ID": "opensearch_default",
        "OS_INDEX_NAME": "euodp_raw"
    },
    tags=["aggregation"],
)
def harvest():
    @task
    def everything(**context):
        indexName = context["params"]["OS_INDEX_NAME"]
        conn = BaseHook.get_connection(context["params"]["OPENSEARCH_CONN_ID"])
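        # OpenSearch client built from the Airflow connection; SSL is enabled but
        # certificate verification is turned off.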
        client = OpenSearch(
            hosts=[{'host': conn.host, 'port': conn.port}],
            http_auth=(conn.login, conn.password),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20
        )
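
        # Create the target index on first run: 3 shards, no replicas, zstd
        # compression, segment replication, and dynamic mapping disabled so new
        # fields are kept in _source but not indexed.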
        if not client.indices.exists(index=indexName):
            client.indices.create(index=indexName, body={
                "settings": {
                    "index": {
                        "number_of_shards": 3,
                        "number_of_replicas": 0,
                        "codec": "zstd_no_dict",
                        "replication.type": "SEGMENT"
                    },
                },
                "mappings": {
                    "dynamic": False
                }
            })
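
        # Bulk-index one page of search results, using each record's own id as
        # the OpenSearch document id.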
        def store_results(hits):
            def _generate_data():
                for r in hits:
                    r['_index'] = indexName
                    r['_id'] = r['id']
                    yield r

            succeeded = 0
            failed = 0
            for success, item in helpers.parallel_bulk(client, actions=_generate_data(),
                                                        raise_on_exception=False,
                                                        raise_on_error=False,
                                                        chunk_size=5000,
                                                        max_chunk_bytes=50 * 1024 * 1024,
                                                        timeout=180):
                if success:
                    succeeded += 1
                else:
                    print(item["index"]["error"])
                    failed += 1
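
        # The first request opens a scroll cursor over all datasets (500 per page);
        # the loop below follows the cursor until it is exhausted.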
        headers = {'Accept': 'application/json'}
        r = requests.get('https://data.europa.eu/api/hub/search/search?filter=dataset&aggregation=false&limit=500&showScore=true&scroll=true', headers=headers).json()
        scroll_id = r['result']['scrollId']
        results = r['result']['results']
        store_results(results)
        while scroll_id:
            r = requests.get('https://data.europa.eu/api/hub/search/scroll?scrollId=' + scroll_id, headers=headers).json()
            # guard against a missing scrollId or an empty page so the loop terminates
            scroll_id = r['result'].get('scrollId')
            results = r['result']['results']
            if not results:
                break
            store_results(results)

    everything()
harvest()