initial stage

This commit is contained in:
Giambattista Bloisi 2024-03-25 15:45:43 +01:00
parent 072fb76a26
commit 349db6f602
4 changed files with 22 additions and 30 deletions

View File

@ -1,22 +0,0 @@
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'catchup': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('hello_world', default_args=default_args, schedule_interval=timedelta(days=1), catchup=False)
t1 = BashOperator(
task_id='say_hello',
bash_command='echo "Hello World from Airflow!"',
dag=dag,
)

View File

@ -49,6 +49,24 @@ def strip_prefix(s, p):
return s
def map_access_right(ar: str) -> str:
match ar:
case 'open':
return 'Open Access'
case 'closed':
return 'Closed'
case 'embargo':
return 'Embargo'
case 'restricted':
return 'Restricted'
case _:
return ''
def map_product(p: dict) -> dict:
p['accessRight'] = list(filter(lambda ar: ar != '', map(lambda m: map_access_right(m.get('access_rights')), p.get('manifestations'))))
return p
@dag(
schedule=None,
dagrun_timeout=None,
@ -116,14 +134,6 @@ def import_EOSC_graph():
},
"mappings": mappings[entity]
# "mappings":{
# "dynamic": False,
# "properties": {
# "local_identifier": {
# "type": "keyword"
# }
# }
# }
})
def compute_batches(ds=None, **kwargs):

View File

@ -554,6 +554,10 @@ mappings['products'] = {
}
}
},
"type": {
"type": "alias",
"path": "product_type"
},
"topics": {
"type": "object",
"properties": {