initial stage

This commit is contained in:
Giambattista Bloisi 2024-06-03 22:03:06 +02:00
parent d9e7528927
commit 1bc94cd835
1 changed files with 3 additions and 21 deletions

View File

@ -1,21 +1,8 @@
import gzip
import io
import json
import os import os
import zipfile import zipfile
from datetime import timedelta
from airflow.decorators import dag
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.file import TemporaryDirectory from airflow.utils.file import TemporaryDirectory
from airflow.utils.helpers import chain
from airflow.models import Variable
from opensearchpy import OpenSearch, helpers
from EOSC_indexes import mappings
def strip_prefix(s, p): def strip_prefix(s, p):
@ -34,13 +21,8 @@ def s3_dowload_unzip_upload(s3conn: str, src_key: str, src_bucket: str, dest_buc
hook.download_file(key=src_key, bucket_name=src_bucket, local_path=dwl_dir, preserve_file_name=True, hook.download_file(key=src_key, bucket_name=src_bucket, local_path=dwl_dir, preserve_file_name=True,
use_autogenerated_subdir=False) use_autogenerated_subdir=False)
with zipfile.ZipFile(archive, 'r') as zip_ref: with zipfile.ZipFile(archive, 'r') as zip_ref:
zip_ref.extractall(tmp_dir) for info in zip_ref.infolist():
with zip_ref.open(info.filename) as file:
for root, _, files in os.walk(tmp_dir): hook.load_file_obj(file, info.filename, dest_bucket, replace=True)
for file in files:
local_file_path = os.path.join(root, file)
hook.load_file(local_file_path, strip_prefix(local_file_path, tmp_dir + "/"), dest_bucket,
replace=True)