core

logging.basicConfig(level=logging.INFO)

Caching and retry helpers

The cache below sets up a persistent per-user disk cache (per-user to ensure security) that can be used throughout API setup and configuration. `retryer` will re-run a function up to 3 times with random exponential backoff, to handle upstream API exceptions.

Exported source
# Per-user, platform-appropriate directories (cache lives under the user's own profile)
dirs = PlatformDirs("nbdev-squ")
# Persistent disk cache shared across the library (login state, keyvault config, SAS tokens)
cache = Cache(dirs.user_cache_dir)
# Retry helper: up to 3 attempts with random exponential backoff, re-raising the final failure
retryer = Retrying(wait=wait_random_exponential(), stop=stop_after_attempt(3), reraise=True)

Login and secrets management

The squ library depends on authentication being configured and ready to go. There are 2 login paths, chosen based on the environment variables available. Once logged in, it will attempt to populate cache["config"] with secrets from a configuration keyvault.


source

login

 login (refresh:bool=False)
Type Default Details
refresh bool False Force relogin
Exported source
def load_config(path = None # Path to read json config into cache from
               ):
    """Build the squ configuration as a `benedict`.

    Starts empty (or from the JSON file at `path` if given), then attempts to
    override it with the `squconfig-{tenant_id}` secret from the keyvault named
    in `cache["vault_name"]`. If the keyvault fetch fails, the cached login
    state is cleared and whatever config was loaded so far is returned.
    """
    config = benedict()
    if path:
        # assumes `path` is a pathlib-style object — TODO confirm callers pass Path, not str
        config = benedict(path.read_text(), format="json")
    try:
        # Allow az to auto-install extensions (e.g. keyvault) without prompting
        _cli(["config", "set", "extension.use_dynamic_install=yes_without_prompt"])
        # Keyvault config, when reachable, replaces any file-loaded config entirely
        config = benedict(_cli(["keyvault", "secret", "show", 
                                "--vault-name", cache["vault_name"], 
                                "--name", f"squconfig-{cache['tenant_id']}"]).value, format="json")
    except subprocess.CalledProcessError:
        cache.delete("logged_in") # clear the logged in state
    # Normalise keys (benedict standardize) before returning
    config.standardize()
    return config

def _check_tenant_visible(tenant):
    # Raise AssertionError if `tenant` is not visible amongst the current login's accounts
    accounts_found = len(_cli(["account", "list"]).search(tenant))
    assert accounts_found > 0, f"Tenant {tenant} not visible from current login"

def login(refresh: bool=False # Force relogin
         ):
    """Ensure the Azure CLI is logged in, then reload keyvault config into the cache.

    Reads SQU_CONFIG ("vault_name/tenant_id") into the cache if set, validates any
    existing login (checking the configured tenant is visible), and otherwise tries
    a managed-identity login with a 5 second timeout before falling back to an
    interactive device-code login. A successful login is cached for 3 hours, after
    which `cache["config"]` is repopulated from the keyvault.

    NOTE(review): `refresh` is accepted for interface compatibility but is currently
    unused — a call always re-validates and reloads config regardless; confirm intent.
    """
    if "/" in os.environ.get("SQU_CONFIG", ""):
        cache["vault_name"], cache["tenant_id"] = os.environ["SQU_CONFIG"].split("/")
    tenant = cache.get("tenant_id")
    try:
        _cli(["account", "show"])
        if tenant:
            _check_tenant_visible(tenant)
        cache.set("logged_in", True, 60 * 60 * 3) # cache login state for 3 hrs
    except Exception:
        # Existing login missing or wrong tenant; fall through to the login loop.
        # (Deliberately broad, but no longer swallows KeyboardInterrupt/SystemExit.)
        cache.delete("logged_in")
    while not cache.get("logged_in"):
        logger.info("Cache doesn't look logged in, attempting login")
        try:
            # See if we can login with a managed identity in under 5 secs and see the configured tenant
            subprocess.run(["timeout", "5", sys.executable, "-m", "azure.cli", "login", "--identity", "-o", "none", "--allow-no-subscriptions"], check=True)
            if tenant:
                _check_tenant_visible(tenant)
        except Exception:
            # If managed identity unavailable, fall back on a manual login
            if tenant:
                tenant_scope = ["--tenant", tenant]
            else:
                tenant_scope = []
            _cli(["login", *tenant_scope, "--use-device-code", "--allow-no-subscriptions", "-o", "none"], capture_output=False)
        # Finally, validate the login once more, and set the login state
        try:
            _cli(["account", "show"])
            cache.set("logged_in", True, 60 * 60 * 3) # cache login state for 3 hrs
        except subprocess.CalledProcessError:
            cache.delete("logged_in")
    logger.info("Cache state is logged in")
    if cache.get("vault_name"): # Always reload config on any login call
        logger.info("Loading config from keyvault")
        cache["config"] = load_config() # Config lasts forever, don't expire

source

load_config

 load_config (path=None)
Type Default Details
path NoneType None Path to read json config into cache from

How to login

The login function is called automatically if the azcli function defined below is used while the cache has no login timestamp. It can also be called manually to refresh the keyvault config items via load_config (which loads a keyvault secret directly into the cache, based on the SQU_CONFIG environment variable).

login()
cache["config"].keys()

source

azcli

 azcli (basecmd:list[str])
Exported source
def azcli(basecmd: list[str]):
    """Run an Azure CLI command, logging in first if the cached login has lapsed."""
    if cache.get("logged_in"):
        return _cli(basecmd)
    login()
    return _cli(basecmd)

Datalake Path

The datalake_path function below, returns a UPath pathlib style object pointing to a configured datalake location in the cache.

cache['config']['datalake_container']

source

datalake_path

 datalake_path (expiry_days:int=3, permissions:str='racwdlt')
Type Default Details
expiry_days int 3 Number of days until the SAS token expires
permissions str racwdlt Permissions to grant on the SAS token
Exported source
@memoize_stampede(cache, expire=60 * 60 * 24)
def datalake_path_safe(expiry_days, permissions):
    """Return (container, account, sas) for the configured datalake.

    Generates a user-delegation SAS token valid for `expiry_days` days with the
    given `permissions`; results are memoized on disk for 24 hours.
    """
    if not cache.get("logged_in"): # keyvault config is only available after login
        login()
    expiry_date = (pandas.Timestamp("now") + pandas.Timedelta(days=expiry_days)).date()
    # Account name only — strip the FQDN suffix from the configured account
    account_name = cache["config"]["datalake_account"].split(".")[0]
    container_name = cache["config"]["datalake_container"]
    sas_token = azcli([
        "storage", "container", "generate-sas", "--auth-mode", "login", "--as-user",
        "--account-name", account_name, "--name", container_name,
        "--permissions", permissions, "--expiry", str(expiry_date),
    ])
    return (container_name, account_name, sas_token)

def datalake_path(expiry_days: int=3, # Number of days until the SAS token expires
                  permissions: str="racwdlt" # Permissions to grant on the SAS token
                    ):
    """Return a UPath for the configured datalake container, authenticated by SAS token."""
    parts = datalake_path_safe(expiry_days, permissions)
    container, account, sas = parts
    return UPath(f"az://{container}", account_name=account, sas_token=sas)

source

datalake_path_safe

 datalake_path_safe (expiry_days, permissions)
path = datalake_path()
print("\n".join([str(p) for p in path.ls()]))