299 lines
8.7 KiB
Python
299 lines
8.7 KiB
Python
from datetime import datetime
|
|
from enum import Enum
|
|
from flask_openapi3 import Info, Tag
|
|
from flask_openapi3 import OpenAPI
|
|
from jsonargparse import ArgumentParser
|
|
from opensearchpy import OpenSearch, NotFoundError, helpers
|
|
from pydantic import BaseModel, SecretStr
|
|
import logging
|
|
|
|
|
|
|
|
parser = ArgumentParser(env_prefix="CURATION", default_env=True)
|
|
parser.add_argument("--opensearch.host", default='opensearch-cluster.local-dataplatform')
|
|
parser.add_argument("--opensearch.port", default=443, type=int)
|
|
parser.add_argument("--opensearch.user", default="admin", type=SecretStr)
|
|
parser.add_argument("--opensearch.password", default="admin", type=SecretStr)
|
|
parser.add_argument("--debug", default=False, type=bool)
|
|
cfg = parser.parse_args()
|
|
|
|
print(cfg.as_dict())
|
|
|
|
client = OpenSearch(
|
|
hosts=[{'host': cfg.get("opensearch.host"), 'port': cfg.get("opensearch.port")}],
|
|
http_auth=(cfg.get("opensearch.user").get_secret_value(), cfg.get("opensearch.password").get_secret_value()),
|
|
use_ssl=True,
|
|
verify_certs=False,
|
|
ssl_show_warn=False,
|
|
pool_maxsize=20,
|
|
)
|
|
|
|
# if client.indices.exists("curation"):
|
|
# client.indices.delete("curation")
|
|
|
|
if not client.indices.exists("curation"):
|
|
client.indices.create("curation", {
|
|
"settings": {
|
|
"index": {
|
|
"number_of_shards": 10,
|
|
"number_of_replicas": 0,
|
|
"codec": "zstd_no_dict",
|
|
"replication.type": "SEGMENT"
|
|
},
|
|
},
|
|
"mappings": {
|
|
"dynamic": "strict",
|
|
"properties": {
|
|
"local_identifier": {
|
|
"type": "keyword"
|
|
},
|
|
"timestamp": {
|
|
"type": "date",
|
|
"format": "date_hour_minute_second_fraction"
|
|
},
|
|
"creator": {
|
|
"type": "keyword"
|
|
},
|
|
"status": {
|
|
"type": "keyword"
|
|
},
|
|
"note": {
|
|
"index": False,
|
|
"type": "text"
|
|
},
|
|
|
|
"log": {
|
|
"type": "object",
|
|
"properties": {
|
|
"timestamp": {
|
|
"format": "date_hour_minute_second_fraction",
|
|
"type": "date"
|
|
},
|
|
"creator": {
|
|
"type": "keyword"
|
|
},
|
|
"status": {
|
|
"index": False,
|
|
"type": "keyword"
|
|
},
|
|
"note": {
|
|
"index": False,
|
|
"type": "text"
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
|
|
info = Info(title="Curator API", version="1.0.0")
|
|
app = OpenAPI(__name__, info=info)
|
|
curation_tag = Tag(name="curation", description="Curator API")
|
|
|
|
|
|
class CurationStatus(str, Enum):
|
|
valid = "valid"
|
|
withdrawn = "withdrawn"
|
|
alert = "alert"
|
|
restore = "restore"
|
|
reset = "reset"
|
|
|
|
|
|
class CurationRequest(BaseModel):
|
|
local_identifier: str
|
|
creator: str
|
|
status: CurationStatus
|
|
note: str
|
|
|
|
|
|
class LogEntry(BaseModel):
|
|
timestamp: str
|
|
creator: str
|
|
status: CurationStatus
|
|
note: str
|
|
|
|
|
|
class CurationResponse(BaseModel):
|
|
local_identifier: str
|
|
timestamp: str
|
|
creator: str
|
|
status: CurationStatus
|
|
note: str
|
|
log: list[LogEntry]
|
|
|
|
|
|
@app.route('/health')
|
|
def health_check():
|
|
if all_required_services_are_running():
|
|
return 'OK', 200
|
|
else:
|
|
return 'Service Unavailable', 500
|
|
|
|
|
|
def all_required_services_are_running():
|
|
os_health = client.cluster.health()
|
|
return os_health['status'] in ['green', 'yellow'] and os_health['number_of_nodes'] > 0
|
|
|
|
|
|
@app.post("/curation", summary="set curation",
|
|
responses={200: CurationResponse},
|
|
tags=[curation_tag])
|
|
def post_curation(query: CurationRequest):
|
|
"""
|
|
set curation status
|
|
"""
|
|
curation = dict()
|
|
|
|
try:
|
|
hit = client.get(index="curation", id=query.local_identifier)
|
|
curation = hit['_source']
|
|
|
|
if query.status.name == curation['status']:
|
|
return {"msg": "status is not changed"}, 403
|
|
|
|
# move current status in history
|
|
annotations = curation['log'] if 'log' in curation else list()
|
|
if isinstance(annotations, dict):
|
|
annotations = [annotations]
|
|
annotations.insert(0, {
|
|
"timestamp": curation['timestamp'],
|
|
"creator": curation['creator'],
|
|
"status": curation['status'],
|
|
"note": curation['note'],
|
|
})
|
|
annotations = annotations[0:100]
|
|
curation['log'] = annotations
|
|
curation['timestamp'] = datetime.now().isoformat()
|
|
curation['creator'] = query.creator
|
|
curation['note'] = query.note
|
|
|
|
print(curation)
|
|
|
|
# todo check status transition
|
|
match query.status.name:
|
|
case "valid":
|
|
if curation['status'] not in ('restore', 'reset'):
|
|
return {"msg": "status cannot be updated to 'valid'"}, 403
|
|
curation['status'] = query.status.name
|
|
case "withdrawn":
|
|
curation['status'] = query.status.name
|
|
case "alert":
|
|
curation['status'] = query.status.name
|
|
case "restore":
|
|
if curation['status'] != "withdrawn":
|
|
return {"msg": "only withdrawn records can be restored'"}, 403
|
|
curation['status'] = query.status.name
|
|
case "reset":
|
|
curation['status'] = query.status.name
|
|
|
|
#TODO transactionality in case of failure?
|
|
client.index(
|
|
index='curation',
|
|
id=query.local_identifier,
|
|
body=curation,
|
|
refresh=True,
|
|
if_primary_term=hit['_primary_term'],
|
|
if_seq_no=hit['_seq_no']
|
|
)
|
|
metadata_status = curation['status']
|
|
|
|
if metadata_status == 'reset':
|
|
client.update(
|
|
index='products',
|
|
id=query.local_identifier,
|
|
body={
|
|
"script": {"source": "ctx._source.remove(\"status\")"}
|
|
},
|
|
refresh=True
|
|
)
|
|
else:
|
|
if metadata_status == "restore":
|
|
metadata_status = 'valid'
|
|
|
|
client.update(
|
|
index='products',
|
|
id=query.local_identifier,
|
|
body={
|
|
"doc": {"status": metadata_status}
|
|
},
|
|
refresh=True
|
|
)
|
|
except NotFoundError:
|
|
curation['local_identifier'] = query.local_identifier
|
|
curation['timestamp'] = datetime.now().isoformat()
|
|
curation['status'] = query.status.name
|
|
curation['creator'] = query.creator
|
|
curation['note'] = query.note
|
|
|
|
match query.status.name:
|
|
case "restore":
|
|
return {"msg": "cannot restore: status does not exist'"}, 403
|
|
case "reset":
|
|
return {"msg": "cannot reset: status does not exist'"}, 403
|
|
|
|
client.index(
|
|
index='curation',
|
|
id=query.local_identifier,
|
|
body=curation,
|
|
refresh=True,
|
|
op_type='create'
|
|
)
|
|
client.update(
|
|
index='products',
|
|
id=query.local_identifier,
|
|
body={
|
|
"doc": {"status": curation['status']}
|
|
},
|
|
refresh=True
|
|
)
|
|
|
|
return curation
|
|
|
|
|
|
@app.get("/curation", summary="get curation", tags=[curation_tag])
|
|
def get_curation(local_identifier: str):
|
|
"""
|
|
to get a curation record
|
|
"""
|
|
try:
|
|
hit = client.get(index="curation", id=local_identifier)
|
|
|
|
return {
|
|
"code": 0,
|
|
"message": "ok",
|
|
"data": hit['_source']
|
|
}
|
|
except NotFoundError:
|
|
return {"msg": f"Cannot fetch: '{local_identifier}' does not exist'"}, 403
|
|
|
|
|
|
@app.get("/alerts", summary="get curation in alert status", tags=[curation_tag])
|
|
def get_alerts():
|
|
"""
|
|
to get a curation record
|
|
"""
|
|
query = {
|
|
"query": {
|
|
"terms": {
|
|
"status": [CurationStatus.alert]
|
|
}
|
|
}
|
|
}
|
|
return {
|
|
"code": 0,
|
|
"message": "ok",
|
|
"data": list(helpers.scan(client, index="curation", query=query))
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
debug = False
|
|
if debug:
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
|
|
app.run(debug=True)
|
|
else:
|
|
from waitress import serve
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
|
|
serve(app, host="0.0.0.0", port=5000)
|