d4science_copernicus_cds/d4science_auth_cds/AuthCDS.py

169 lines
6.2 KiB
Python
Raw Normal View History

2024-10-31 17:45:55 +01:00
import os
import cdsapi
DEFAULT_URL = 'https://cds-beta.climate.copernicus.eu/api'
ENV_CDSAPI_URL = 'CDSAPI_URL'
ENV_CDSAPI_KEY = 'CDSAPI_KEY'
CONFIG_URL = 'url'
CONFIG_KEY = 'key'
class AuthCDS:
def __init__(self, verbose=False):
"""Initialize AuthCDS with optional verbose logging."""
self.verbose = verbose
self.CONFIG_FILE = ".cdsapirc"
self.CURRENT_CDSAPIRC = "./" + self.CONFIG_FILE
# Expands `~` to the home directory path
self.HOME_CDSAPIRC = os.path.expanduser("~/" + self.CONFIG_FILE)
self.DEFAULT_URL = DEFAULT_URL
# self.ENV_CDSAPI_URL = ENV_CDSAPI_URL
# self.ENV_CDSAPI_KEY = ENV_CDSAPI_KEY
# self.CONFIG_URL = CONFIG_URL
# self.CONFIG_KEY = CONFIG_KEY
def log(self, message, force=False):
"""Log messages if verbose mode is enabled."""
if self.verbose or force:
print(message)
def create_config(self, url, key):
"""Create a configuration dictionary."""
return {self.CONFIG_URL: url, self.CONFIG_KEY: key}
def config_from_env(self):
"""Retrieve configuration from environment variables."""
url = os.environ.get(ENV_CDSAPI_URL)
key = os.environ.get(ENV_CDSAPI_KEY)
self.log("ENV - %s %s" % ( url, key))
if url is None and key is None:
self.log("env is not configured")
return None
config = {}
config[CONFIG_URL] = url
config[CONFIG_KEY] = key
self.log(f"Configuration from environment {config}")
return config
def config_to_env(self, config):
"""Set configuration in environment variables."""
os.environ[ENV_CDSAPI_URL] = config[CONFIG_URL]
os.environ[ENV_CDSAPI_KEY] = config[CONFIG_KEY]
self.log(f"Set environment variables {
ENV_CDSAPI_URL}, {ENV_CDSAPI_KEY}")
def config_to_file(self, config, config_path=None):
"""Save configuration to a file."""
if not config_path:
config_path = self.HOME_CDSAPIRC
else:
# Expands `~` to the full home directory path
config_path = os.path.expanduser(config_path)
URL = config[CONFIG_URL]
KEY = config[CONFIG_KEY]
with open(config_path, 'w') as file:
file.write(f"url: {URL}\nkey: {KEY}\n")
self.log(f"Saved Configuration file {config_path}")
def config_to_home(self, config):
"""Save configuration to user home."""
self.config_to_file(config, self.HOME_CDSAPIRC)
def remove_config_from_home(self, config):
"""remove configuration from user home."""
self.remove_conf_credentials(config, from_home=True)
def config_from_file(self, scan_all=False, custom_path=None):
"""Retrieve configuration from a file."""
config = None
check_paths = [self.CURRENT_CDSAPIRC, self.HOME_CDSAPIRC]
if custom_path:
check_paths = [custom_path]
for path in check_paths:
if os.path.exists(path):
found_config = cdsapi.api.read_config(path)
self.log(f"Configuration from file {path}: {found_config}")
config = config or found_config
if not scan_all:
break
return config
def authenticate(self, set_environment=True):
"""Authenticate and set environment variables."""
config = self.config_from_env()
if not config:
config = self.config_from_file()
if not config:
config = self.query_credentials()
if set_environment and config:
self.log("saving config to env")
self.config_to_env(config)
return config
def query_credentials(self):
"""Prompt user for URL and key if no configuration is found."""
url = input(
f"Insert URL (default: {self.DEFAULT_URL}): ") or self.DEFAULT_URL
key = input("Insert your key: ")
config = self.create_config(url, key)
save_option = input("Save config? (y/n): ").lower() == "y"
if save_option:
self.config_to_file(config)
return config
def get_authenticated_client(self):
"""Return an authenticated CDS client."""
config = self.authenticate()
if not config:
self.log("Cannot obtain authentication")
return None
return cdsapi.Client(url=config[CONFIG_URL], key=config[CONFIG_KEY])
def show_credentials(self):
"""Display current configuration from env and files."""
config_env = self.config_from_env()
config_file = self.config_from_file(scan_all=True)
self.log(f"Configuration from environment: {config_env}")
self.log(f"Configuration from file: {config_file}")
return config_env, config_file
def remove_conf_credentials(self, from_home=True, from_current_path=True, custom_path=None):
"""Remove saved configurations from files."""
paths_removed = []
if from_home and os.path.exists(self.HOME_CDSAPIRC):
os.remove(self.HOME_CDSAPIRC)
paths_removed.append(self.HOME_CDSAPIRC)
self.log(f"Removed configuration {self.HOME_CDSAPIRC}")
if from_current_path and os.path.exists(self.CURRENT_CDSAPIRC):
os.remove(self.CURRENT_CDSAPIRC)
paths_removed.append(self.CURRENT_CDSAPIRC)
self.log(f"Removed configuration {self.CURRENT_CDSAPIRC}")
if custom_path and os.path.exists(custom_path):
os.remove(custom_path)
paths_removed.append(custom_path)
self.log(f"Removed configuration {custom_path}")
return paths_removed
def remove_env_credentials(self):
"""Remove authentication credentials from environment variables."""
removed_envs = []
if self.ENV_CDSAPI_URL in os.environ:
del os.environ[self.ENV_CDSAPI_URL]
self.log(f"Removed environment variable {self.ENV_CDSAPI_URL}")
if self.ENV_CDSAPI_KEY in os.environ:
del os.environ[self.ENV_CDSAPI_KEY]
self.log(f"Removed environment variable {self.ENV_CDSAPI_KEY}")
return removed_envs