moved to class, logs, main with args, functions for jupiter notebook

This commit is contained in:
Alfredo Oliviero 2024-10-31 13:55:13 +01:00
parent 4364642bb9
commit cbc734eeb0
1 changed files with 272 additions and 192 deletions

View File

@ -1,214 +1,294 @@
import os
import cdsapi
CONFIG_FILE = ".cdsapirc"
CURRENT_CDSAPIRC = "./" + CONFIG_FILE
HOME_CDSAPIRC = "~/" + CONFIG_FILE
DEFAULT_URL = 'https://cds-beta.climate.copernicus.eu/api'
ENV_CDSAPI_URL = 'CDSAPI_URL'
ENV_CDSAPI_KEY = 'CDSAPI_KEY'
CONFIG_URL = 'url'
CONFIG_KEY = 'key'
import argparse
def create_config(url, key):
return {'url': url, 'key': key}
def cds_help():
"""Provide instructions on how to use methods programmatically from Jupyter."""
help_message = '''
CDS Authentication Helper - Jupyter Usage Guide
Available Functions:
---------------------
- cds_authenticate(verbose=True): Perform authentication and return CDS client.
- cds_remove_conf(verbose=True): Remove saved configurations from default paths.
- cds_save_conf(config=None, verbose=True): Save configuration to the default path.
- cds_remove_env(verbose=True): Remove environment-based credentials for CDS.
- cds_show_conf(verbose=True): Display current configuration from environment and files.
def config_from_env(verbose=False):
config = {}
url = os.environ.get(ENV_CDSAPI_URL)
key = os.environ.get(ENV_CDSAPI_KEY)
Usage Examples:
---------------
1. Authenticate and get CDS client:
client = cds_authenticate()
if url is not None:
config[CONFIG_URL] = url
2. Remove configurations from default paths:
cds_remove_conf()
if key is not None:
config[CONFIG_KEY] = key
3. Save the current or new configuration:
cds_save_conf()
if url is not None and key is not None:
if verbose:
print("config from environment %s - %s" %
(ENV_CDSAPI_URL, ENV_CDSAPI_KEY))
4. Remove environment variables for CDS API:
cds_remove_env()
5. Show current configuration from environment and files:
cds_show_conf()
'''
print(help_message)
class AuthCDS:
def __init__(self, verbose=False):
"""Initialize AuthCDS with optional verbose logging."""
self.verbose = verbose
self.CONFIG_FILE = ".cdsapirc"
self.CURRENT_CDSAPIRC = "./" + self.CONFIG_FILE
# Expands `~` to the home directory path
self.HOME_CDSAPIRC = os.path.expanduser("~/" + self.CONFIG_FILE)
self.DEFAULT_URL = 'https://cds-beta.climate.copernicus.eu/api'
self.ENV_CDSAPI_URL = 'CDSAPI_URL'
self.ENV_CDSAPI_KEY = 'CDSAPI_KEY'
self.CONFIG_URL = 'url'
self.CONFIG_KEY = 'key'
def log(self, message):
"""Log messages if verbose mode is enabled."""
if self.verbose:
print(message)
def create_config(self, url, key):
"""Create a configuration dictionary."""
return {self.CONFIG_URL: url, self.CONFIG_KEY: key}
def config_from_env(self):
"""Retrieve configuration from environment variables."""
config = {}
url = os.environ.get(self.ENV_CDSAPI_URL)
key = os.environ.get(self.ENV_CDSAPI_KEY)
if url is not None:
config[self.CONFIG_URL] = url
if key is not None:
config[self.CONFIG_KEY] = key
if url and key:
self.log(f"Configuration from environment {config}")
return config
return None
def config_to_env(self, config):
"""Set configuration in environment variables."""
os.environ[self.ENV_CDSAPI_URL] = config[self.CONFIG_URL]
os.environ[self.ENV_CDSAPI_KEY] = config[self.CONFIG_KEY]
self.log(f"Set environment variables {
self.ENV_CDSAPI_URL}, {self.ENV_CDSAPI_KEY}")
def config_to_file(self, config, config_path=None):
"""Save configuration to a file."""
if not config_path:
config_path = self.HOME_CDSAPIRC
else:
# Expands `~` to the full home directory path
config_path = os.path.expanduser(config_path)
URL = config[self.CONFIG_URL]
KEY = config[self.CONFIG_KEY]
with open(config_path, 'w') as file:
file.write(f"url: {URL}\nkey: {KEY}\n")
self.log(f"Saved Configuration file {config_path}")
def config_to_home(self, config):
"""Save configuration to user home."""
self.config_to_file(config, self.HOME_CDSAPIRC)
def remove_config_from_home(self, config):
"""remove configuration from user home."""
self.remove_conf_credentials(config, from_home=True)
def config_from_file(self, scan_all=False, custom_path=None):
"""Retrieve configuration from a file."""
config = None
check_paths = [self.CURRENT_CDSAPIRC, self.HOME_CDSAPIRC]
if custom_path:
check_paths = [custom_path]
for path in check_paths:
if os.path.exists(path):
found_config = cdsapi.api.read_config(path)
self.log(f"Configuration from file {path}: {found_config}")
config = config or found_config
if not scan_all:
break
return config
return None
def authenticate(self, set_environment=True):
"""Authenticate and set environment variables."""
config = self.config_from_env()
if not config:
config = self.config_from_file()
if not config:
config = self.query_credentials()
if set_environment and config:
self.config_to_env(config)
return config
def query_credentials(self):
"""Prompt user for URL and key if no configuration is found."""
url = input(
f"Insert URL (default: {self.DEFAULT_URL}): ") or self.DEFAULT_URL
key = input("Insert your key: ")
config = self.create_config(url, key)
save_option = input("Save config? (y/n): ").lower() == "y"
if save_option:
self.config_to_file(config)
return config
def get_authenticated_client(self):
"""Return an authenticated CDS client."""
config = self.authenticate()
if not config:
self.log("Cannot obtain authentication")
return None
return cdsapi.Client(url=config[self.CONFIG_URL], key=config[self.CONFIG_KEY])
def show_credentials(self):
"""Display current configuration from env and files."""
config_env = self.config_from_env()
config_file = self.config_from_file(scan_all=True)
# self.log(f"Configuration from environment: {config_env}")
# self.log(f"Configuration from file: {config_file}")
return config_env, config_file
def remove_conf_credentials(self, from_home=True, from_current_path=True, custom_path=None):
"""Remove saved configurations from files."""
paths_removed = []
if from_home and os.path.exists(self.HOME_CDSAPIRC):
os.remove(self.HOME_CDSAPIRC)
paths_removed.append(self.HOME_CDSAPIRC)
self.log(f"Removed configuration {self.HOME_CDSAPIRC}")
if from_current_path and os.path.exists(self.CURRENT_CDSAPIRC):
os.remove(self.CURRENT_CDSAPIRC)
paths_removed.append(self.CURRENT_CDSAPIRC)
self.log(f"Removed configuration {self.CURRENT_CDSAPIRC}")
if custom_path and os.path.exists(custom_path):
os.remove(custom_path)
paths_removed.append(custom_path)
self.log(f"Removed configuration {custom_path}")
return paths_removed
def config_to_env(config, verbose=False):
os.environ[ENV_CDSAPI_URL] = config[CONFIG_URL]
os.environ[ENV_CDSAPI_KEY] = config[CONFIG_KEY]
if verbose:
print("set envs %s , %s" % (ENV_CDSAPI_URL, ENV_CDSAPI_KEY))
def remove_env_credentials(self):
"""Remove authentication credentials from environment variables."""
removed_envs = []
if self.ENV_CDSAPI_URL in os.environ:
del os.environ[self.ENV_CDSAPI_URL]
self.log(f"Removed environment variable {self.ENV_CDSAPI_URL}")
if self.ENV_CDSAPI_KEY in os.environ:
del os.environ[self.ENV_CDSAPI_KEY]
self.log(f"Removed environment variable {self.ENV_CDSAPI_KEY}")
return removed_envs
# Standalone function for authentication from Jupyter
def config_to_file(config, config_path=HOME_CDSAPIRC, verbose=False):
URL = config[CONFIG_URL]
KEY = config[CONFIG_KEY]
with open(config_path, 'w') as file:
file.write(f"url: {URL}\n")
file.write(f"key: {KEY}\n")
if verbose:
print("saved config file %s" % config_path)
def cds_authenticate(verbose=True):
"""Perform authentication and return the CDS client if successful."""
auth = AuthCDS(verbose=verbose)
client = auth.get_authenticated_client()
return client
def config_from_file(custom_path=None, verbose=False):
config = None
check_paths = [CURRENT_CDSAPIRC, HOME_CDSAPIRC]
# Path of the .cdsapirc file
if custom_path:
check_paths = [custom_path]
for path in check_paths:
# Check if the file exists
if os.path.exists(path):
config = cdsapi.api.read_config(path)
if verbose:
print("config from file %s" % path)
break
if not config:
return None
return config
def cds_remove_conf(verbose=True):
auth = AuthCDS(verbose=verbose)
auth.remove_conf_credentials()
def authenticate(set_environment=True, verbose=False):
# config from OS
config = config_from_env(verbose=verbose)
if not config:
# config from file
config = config_from_file(verbose=verbose)
if not config:
config = query_credentials(verbose=verbose)
if set_environment and config is not None:
config_to_env(config, verbose=verbose)
return config
def input_url():
URL = input(f"Insert URL. Leave empty for default {
DEFAULT_URL}: ") or DEFAULT_URL
return URL
def input_key():
KEY = input("Insert your key: ")
return KEY
def query_url():
URL = input_url()
os.environ[ENV_CDSAPI_URL] = URL
def query_key():
KEY = input_key()
os.environ[ENV_CDSAPI_KEY] = KEY
def query_save(config, verbose=False):
save_to_file = input(
"Do you want to save the URL and KEY to the .cdsapirc file? (h)ome / (c)urrent / path. empty to skip ")
save_to_file = save_to_file.strip()
if save_to_file == "":
if verbose:
print("configuration not saved.")
return None
if save_to_file.lower().strip() == "h":
save_to_file = HOME_CDSAPIRC
elif save_to_file.lower().strip() == "c":
save_to_file = CURRENT_CDSAPIRC
else:
save_to_file + "/" + CONFIG_FILE
config_to_file(config, save_to_file)
if verbose:
print("URL and KEY saved to %s" % save_to_file)
return config
def query_credentials(verbose=False):
URL = input_url()
KEY = input_key()
config = {'url': URL, 'key': KEY}
query_save(config, verbose=verbose)
return config
def show_credentials(verbose=False):
# config from ENV
config_env = None
config_file = None
config_env = config_from_env()
if verbose:
print("config from env: %s" % config_env)
# if not verbose, we just need to find the first one
# if verbose, we need to print all the existing configs
elif config_env is not None:
return config_env
config_file = config_from_file()
if verbose:
print("config from file: %s" % config_file)
elif config_file is not None:
return config_file
config = config_env or config_file
if verbose:
if config is None:
print("cannot obtain configuration")
else:
print("current config: %s" % config)
return config
def removeCredentials(from_home=True, from_current=True, custom_path=None, verbose=True):
if from_home and os.path.exists(HOME_CDSAPIRC):
if verbose:
print("removing configuration %s" % HOME_CDSAPIRC)
os.remove(HOME_CDSAPIRC)
if from_current and os.path.exists(CURRENT_CDSAPIRC):
if verbose:
print("removing configuration %s" % CURRENT_CDSAPIRC)
os.remove(CURRENT_CDSAPIRC)
if custom_path is not None and os.path.exists(custom_path):
if verbose:
print("removing configuration %s" % custom_path)
os.remove(custom_path)
def getAuthenticatedClient(verbose=False):
config = authenticate(verbose=verbose)
def cds_save_conf(config=None, verbose=True):
auth = AuthCDS(verbose=verbose)
if config is None:
if verbose:
print("cannot obtain authentication")
return None
config = auth.authenticate()
if config is not None:
auth.config_to_file(config)
c = cdsapi.Client(url=config.get("url"), key=config.get("key"))
return c
# config from file ./.cdsapirc
# {'url': 'https://cds-beta.climate.copernicus.eu/api', 'key': 'db1f2085-6b8b-42e6-b832-625dfaf831a4'}
def cds_remove_env(verbose=True):
auth = AuthCDS(verbose=verbose)
auth.remove_env_credentials()
def cds_show_conf(verbose=True):
auth = AuthCDS(verbose=verbose)
auth.show_credentials()
def main():
parser = argparse.ArgumentParser(
description="Authenticate with CDS and configure .cdsapirc")
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Enable verbose mode for detailed logging"
)
parser.add_argument(
"-a", "--authenticate",
action="store_true",
help="Run authentication and set environment variables"
)
parser.add_argument(
"-s", "--save-config",
metavar="PATH",
type=str,
help="Save configuration to a specific file path (default: ~/.cdsapirc)"
)
parser.add_argument(
"-r", "--remove-config",
action="store_true",
help="Remove saved configurations from default paths"
)
parser.add_argument(
"-q", "--query-credentials",
action="store_true",
help="Prompt user to re-enter credentials and optionally save them"
)
parser.add_argument(
"-p", "--print-config",
action="store_true",
help="Print current configuration from environment and files"
)
args = parser.parse_args()
auth = AuthCDS(verbose=args.verbose)
if args.authenticate:
client = auth.get_authenticated_client()
if client:
auth.log("Authenticated successfully")
else:
auth.log("Authentication failed")
if args.save_config:
config = auth.config_from_env() or auth.config_from_file()
if config:
auth.config_to_file(config, config_path=args.save_config)
auth.log(f"Configuration saved to {args.save_config}")
else:
auth.log("No configuration found to save")
if args.remove_config:
removed_paths = auth.remove_conf_credentials()
auth.log(f"Removed configurations from paths: {removed_paths}")
if args.query_credentials:
auth.query_credentials()
auth.log("Credentials queried and optionally saved")
if args.print_config:
env_config, file_config = auth.show_credentials()
auth.log(f"Environment Config: {env_config}")
auth.log(f"File Config: {file_config}")