diff --git a/d4science_copernicus/auth_cds.py b/d4science_copernicus/auth_cds.py index b06c858..ee5e3dc 100644 --- a/d4science_copernicus/auth_cds.py +++ b/d4science_copernicus/auth_cds.py @@ -1,214 +1,294 @@ + import os import cdsapi - -CONFIG_FILE = ".cdsapirc" - -CURRENT_CDSAPIRC = "./" + CONFIG_FILE -HOME_CDSAPIRC = "~/" + CONFIG_FILE - -DEFAULT_URL = 'https://cds-beta.climate.copernicus.eu/api' - -ENV_CDSAPI_URL = 'CDSAPI_URL' -ENV_CDSAPI_KEY = 'CDSAPI_KEY' - -CONFIG_URL = 'url' -CONFIG_KEY = 'key' +import argparse -def create_config(url, key): - return {'url': url, 'key': key} +def cds_help(): + """Provide instructions on how to use methods programmatically from Jupyter.""" + help_message = ''' + CDS Authentication Helper - Jupyter Usage Guide + Available Functions: + --------------------- + - cds_authenticate(verbose=True): Perform authentication and return CDS client. + - cds_remove_conf(verbose=True): Remove saved configurations from default paths. + - cds_save_conf(config=None, verbose=True): Save configuration to the default path. + - cds_remove_env(verbose=True): Remove environment-based credentials for CDS. + - cds_show_conf(verbose=True): Display current configuration from environment and files. -def config_from_env(verbose=False): - config = {} - url = os.environ.get(ENV_CDSAPI_URL) - key = os.environ.get(ENV_CDSAPI_KEY) + Usage Examples: + --------------- + 1. Authenticate and get CDS client: + client = cds_authenticate() - if url is not None: - config[CONFIG_URL] = url + 2. Remove configurations from default paths: + cds_remove_conf() - if key is not None: - config[CONFIG_KEY] = key + 3. Save the current or new configuration: + cds_save_conf() - if url is not None and key is not None: - if verbose: - print("config from environment %s - %s" % - (ENV_CDSAPI_URL, ENV_CDSAPI_KEY)) + 4. Remove environment variables for CDS API: + cds_remove_env() + + 5. Show current configuration from environment and files: + cds_show_conf() + + ''' + print(help_message) + +class AuthCDS: + def __init__(self, verbose=False): + """Initialize AuthCDS with optional verbose logging.""" + self.verbose = verbose + self.CONFIG_FILE = ".cdsapirc" + self.CURRENT_CDSAPIRC = "./" + self.CONFIG_FILE + # Expands `~` to the home directory path + self.HOME_CDSAPIRC = os.path.expanduser("~/" + self.CONFIG_FILE) + self.DEFAULT_URL = 'https://cds-beta.climate.copernicus.eu/api' + self.ENV_CDSAPI_URL = 'CDSAPI_URL' + self.ENV_CDSAPI_KEY = 'CDSAPI_KEY' + self.CONFIG_URL = 'url' + self.CONFIG_KEY = 'key' + + def log(self, message): + """Log messages if verbose mode is enabled.""" + if self.verbose: + print(message) + + def create_config(self, url, key): + """Create a configuration dictionary.""" + return {self.CONFIG_URL: url, self.CONFIG_KEY: key} + + def config_from_env(self): + """Retrieve configuration from environment variables.""" + config = {} + url = os.environ.get(self.ENV_CDSAPI_URL) + key = os.environ.get(self.ENV_CDSAPI_KEY) + + if url is not None: + config[self.CONFIG_URL] = url + if key is not None: + config[self.CONFIG_KEY] = key + + if url and key: + self.log(f"Configuration from environment {config}") + return config + return None + + def config_to_env(self, config): + """Set configuration in environment variables.""" + os.environ[self.ENV_CDSAPI_URL] = config[self.CONFIG_URL] + os.environ[self.ENV_CDSAPI_KEY] = config[self.CONFIG_KEY] + self.log(f"Set environment variables { + self.ENV_CDSAPI_URL}, {self.ENV_CDSAPI_KEY}") + + def config_to_file(self, config, config_path=None): + """Save configuration to a file.""" + if not config_path: + config_path = self.HOME_CDSAPIRC + else: + # Expands `~` to the full home directory path + config_path = os.path.expanduser(config_path) + + URL = config[self.CONFIG_URL] + KEY = config[self.CONFIG_KEY] + with open(config_path, 'w') as file: + file.write(f"url: {URL}\nkey: {KEY}\n") + self.log(f"Saved Configuration file {config_path}") + + def config_to_home(self, config): + """Save configuration to user home.""" + self.config_to_file(config, self.HOME_CDSAPIRC) + + def remove_config_from_home(self, config): + """remove configuration from user home.""" + self.remove_conf_credentials(config, from_home=True) + + def config_from_file(self, scan_all=False, custom_path=None): + """Retrieve configuration from a file.""" + config = None + check_paths = [self.CURRENT_CDSAPIRC, self.HOME_CDSAPIRC] + if custom_path: + check_paths = [custom_path] + + for path in check_paths: + if os.path.exists(path): + found_config = cdsapi.api.read_config(path) + self.log(f"Configuration from file {path}: {found_config}") + config = config or found_config + if not scan_all: + break return config - return None + def authenticate(self, set_environment=True): + """Authenticate and set environment variables.""" + config = self.config_from_env() + if not config: + config = self.config_from_file() + if not config: + config = self.query_credentials() + + if set_environment and config: + self.config_to_env(config) + return config + + def query_credentials(self): + """Prompt user for URL and key if no configuration is found.""" + url = input( + f"Insert URL (default: {self.DEFAULT_URL}): ") or self.DEFAULT_URL + key = input("Insert your key: ") + config = self.create_config(url, key) + save_option = input("Save config? (y/n): ").lower() == "y" + if save_option: + self.config_to_file(config) + return config + + def get_authenticated_client(self): + """Return an authenticated CDS client.""" + config = self.authenticate() + if not config: + self.log("Cannot obtain authentication") + return None + return cdsapi.Client(url=config[self.CONFIG_URL], key=config[self.CONFIG_KEY]) + + def show_credentials(self): + """Display current configuration from env and files.""" + config_env = self.config_from_env() + config_file = self.config_from_file(scan_all=True) + # self.log(f"Configuration from environment: {config_env}") + # self.log(f"Configuration from file: {config_file}") + return config_env, config_file + + def remove_conf_credentials(self, from_home=True, from_current_path=True, custom_path=None): + """Remove saved configurations from files.""" + paths_removed = [] + if from_home and os.path.exists(self.HOME_CDSAPIRC): + os.remove(self.HOME_CDSAPIRC) + paths_removed.append(self.HOME_CDSAPIRC) + self.log(f"Removed configuration {self.HOME_CDSAPIRC}") + + if from_current_path and os.path.exists(self.CURRENT_CDSAPIRC): + os.remove(self.CURRENT_CDSAPIRC) + paths_removed.append(self.CURRENT_CDSAPIRC) + self.log(f"Removed configuration {self.CURRENT_CDSAPIRC}") + + if custom_path and os.path.exists(custom_path): + os.remove(custom_path) + paths_removed.append(custom_path) + self.log(f"Removed configuration {custom_path}") + + return paths_removed -def config_to_env(config, verbose=False): - os.environ[ENV_CDSAPI_URL] = config[CONFIG_URL] - os.environ[ENV_CDSAPI_KEY] = config[CONFIG_KEY] - if verbose: - print("set envs %s , %s" % (ENV_CDSAPI_URL, ENV_CDSAPI_KEY)) + def remove_env_credentials(self): + """Remove authentication credentials from environment variables.""" + removed_envs = [] + if self.ENV_CDSAPI_URL in os.environ: + del os.environ[self.ENV_CDSAPI_URL] + self.log(f"Removed environment variable {self.ENV_CDSAPI_URL}") + + if self.ENV_CDSAPI_KEY in os.environ: + del os.environ[self.ENV_CDSAPI_KEY] + self.log(f"Removed environment variable {self.ENV_CDSAPI_KEY}") + + return removed_envs + +# Standalone function for authentication from Jupyter -def config_to_file(config, config_path=HOME_CDSAPIRC, verbose=False): - URL = config[CONFIG_URL] - KEY = config[CONFIG_KEY] - with open(config_path, 'w') as file: - file.write(f"url: {URL}\n") - file.write(f"key: {KEY}\n") - if verbose: - print("saved config file %s" % config_path) +def cds_authenticate(verbose=True): + """Perform authentication and return the CDS client if successful.""" + auth = AuthCDS(verbose=verbose) + client = auth.get_authenticated_client() + return client -def config_from_file(custom_path=None, verbose=False): - config = None - - check_paths = [CURRENT_CDSAPIRC, HOME_CDSAPIRC] - - # Path of the .cdsapirc file - if custom_path: - check_paths = [custom_path] - - for path in check_paths: - # Check if the file exists - if os.path.exists(path): - config = cdsapi.api.read_config(path) - if verbose: - print("config from file %s" % path) - break - - if not config: - return None - - return config +def cds_remove_conf(verbose=True): + auth = AuthCDS(verbose=verbose) + auth.remove_conf_credentials() -def authenticate(set_environment=True, verbose=False): - # config from OS - config = config_from_env(verbose=verbose) - - if not config: - # config from file - config = config_from_file(verbose=verbose) - - if not config: - config = query_credentials(verbose=verbose) - - if set_environment and config is not None: - config_to_env(config, verbose=verbose) - - return config - - -def input_url(): - URL = input(f"Insert URL. Leave empty for default { - DEFAULT_URL}: ") or DEFAULT_URL - return URL - - -def input_key(): - KEY = input("Insert your key: ") - return KEY - - -def query_url(): - URL = input_url() - os.environ[ENV_CDSAPI_URL] = URL - - -def query_key(): - KEY = input_key() - os.environ[ENV_CDSAPI_KEY] = KEY - - -def query_save(config, verbose=False): - save_to_file = input( - "Do you want to save the URL and KEY to the .cdsapirc file? (h)ome / (c)urrent / path. empty to skip ") - save_to_file = save_to_file.strip() - - if save_to_file == "": - if verbose: - print("configuration not saved.") - return None - - if save_to_file.lower().strip() == "h": - save_to_file = HOME_CDSAPIRC - - elif save_to_file.lower().strip() == "c": - save_to_file = CURRENT_CDSAPIRC - - else: - save_to_file + "/" + CONFIG_FILE - - config_to_file(config, save_to_file) - - if verbose: - print("URL and KEY saved to %s" % save_to_file) - - return config - - -def query_credentials(verbose=False): - URL = input_url() - KEY = input_key() - config = {'url': URL, 'key': KEY} - query_save(config, verbose=verbose) - return config - - -def show_credentials(verbose=False): - # config from ENV - config_env = None - config_file = None - - config_env = config_from_env() - - if verbose: - print("config from env: %s" % config_env) - - # if not verbose, we just need to find the first one - # if verbose, we need to print all the existing configs - elif config_env is not None: - return config_env - - config_file = config_from_file() - if verbose: - print("config from file: %s" % config_file) - elif config_file is not None: - return config_file - - config = config_env or config_file - if verbose: - if config is None: - print("cannot obtain configuration") - else: - print("current config: %s" % config) - - return config - - -def removeCredentials(from_home=True, from_current=True, custom_path=None, verbose=True): - if from_home and os.path.exists(HOME_CDSAPIRC): - if verbose: - print("removing configuration %s" % HOME_CDSAPIRC) - os.remove(HOME_CDSAPIRC) - - if from_current and os.path.exists(CURRENT_CDSAPIRC): - if verbose: - print("removing configuration %s" % CURRENT_CDSAPIRC) - os.remove(CURRENT_CDSAPIRC) - - if custom_path is not None and os.path.exists(custom_path): - if verbose: - print("removing configuration %s" % custom_path) - os.remove(custom_path) - - -def getAuthenticatedClient(verbose=False): - config = authenticate(verbose=verbose) +def cds_save_conf(config=None, verbose=True): + auth = AuthCDS(verbose=verbose) if config is None: - if verbose: - print("cannot obtain authentication") - return None + config = auth.authenticate() + if config is not None: + auth.config_to_file(config) - c = cdsapi.Client(url=config.get("url"), key=config.get("key")) - return c -# config from file ./.cdsapirc -# {'url': 'https://cds-beta.climate.copernicus.eu/api', 'key': 'db1f2085-6b8b-42e6-b832-625dfaf831a4'} +def cds_remove_env(verbose=True): + auth = AuthCDS(verbose=verbose) + auth.remove_env_credentials() + + +def cds_show_conf(verbose=True): + auth = AuthCDS(verbose=verbose) + auth.show_credentials() + + +def main(): + parser = argparse.ArgumentParser( + description="Authenticate with CDS and configure .cdsapirc") + parser.add_argument( + "-v", "--verbose", + action="store_true", + help="Enable verbose mode for detailed logging" + ) + parser.add_argument( + "-a", "--authenticate", + action="store_true", + help="Run authentication and set environment variables" + ) + parser.add_argument( + "-s", "--save-config", + metavar="PATH", + type=str, + help="Save configuration to a specific file path (default: ~/.cdsapirc)" + ) + parser.add_argument( + "-r", "--remove-config", + action="store_true", + help="Remove saved configurations from default paths" + ) + parser.add_argument( + "-q", "--query-credentials", + action="store_true", + help="Prompt user to re-enter credentials and optionally save them" + ) + parser.add_argument( + "-p", "--print-config", + action="store_true", + help="Print current configuration from environment and files" + ) + + args = parser.parse_args() + auth = AuthCDS(verbose=args.verbose) + + if args.authenticate: + client = auth.get_authenticated_client() + if client: + auth.log("Authenticated successfully") + else: + auth.log("Authentication failed") + + if args.save_config: + config = auth.config_from_env() or auth.config_from_file() + if config: + auth.config_to_file(config, config_path=args.save_config) + auth.log(f"Configuration saved to {args.save_config}") + else: + auth.log("No configuration found to save") + + if args.remove_config: + removed_paths = auth.remove_conf_credentials() + auth.log(f"Removed configurations from paths: {removed_paths}") + + if args.query_credentials: + auth.query_credentials() + auth.log("Credentials queried and optionally saved") + + if args.print_config: + env_config, file_config = auth.show_credentials() + auth.log(f"Environment Config: {env_config}") + auth.log(f"File Config: {file_config}")