# Emit INFO-level (and above) log records from the root logger
logging.basicConfig(level=logging.INFO)
core
Caching and retry helpers
The below cache
sets up a persistent, per-user disk cache (to ensure security) that can be used throughout API setup and configuration. retryer
will run a function up to 3 times in total (retrying after failures) with a random exponential backoff to handle upstream API exceptions.
Exported source
# Per-user platform directories for the "nbdev-squ" application
dirs = PlatformDirs("nbdev-squ")
# Persistent disk cache stored under the user's cache directory
cache = Cache(dirs.user_cache_dir)
# Retry helper: random exponential wait, stop after 3 attempts, re-raise the last error
retryer = Retrying(wait=wait_random_exponential(), stop=stop_after_attempt(3), reraise=True)
Login and secrets management
The squ library depends on authentication being configured and ready to go. There are 2 login paths, selected based on the environment variables available. Once logged in, it will attempt to populate cache["config"]
with secrets from a configuration keyvault.
login
login (refresh:bool=False)
| | Type | Default | Details |
---|---|---|---|
refresh | bool | False | Force relogin |
Exported source
def load_config(path = None # Path to read json config into cache from
               ):
    """Build the configuration object from an optional JSON file and the keyvault secret.

    Starts from an empty `benedict`. If `path` is given, its text is parsed as
    JSON first. The keyvault secret `squconfig-<tenant_id>` (vault name and
    tenant id are read from `cache`) is then fetched and, when that succeeds,
    replaces whatever was loaded from `path`. If the CLI call fails with
    `subprocess.CalledProcessError`, the cached `logged_in` flag is cleared
    and the config loaded so far is kept. The result is standardized and returned.
    """
    config = benedict()
    if path:
        config = benedict(path.read_text(), format="json")
    try:
        # Let the CLI install any extension it needs without prompting
        _cli(["config", "set", "extension.use_dynamic_install=yes_without_prompt"])
        config = benedict(_cli(["keyvault", "secret", "show",
                                "--vault-name", cache["vault_name"],
                                "--name", f"squconfig-{cache['tenant_id']}"]).value, format="json")
    except subprocess.CalledProcessError:
        cache.delete("logged_in") # clear the logged in state
    config.standardize()
    return config
def _check_tenant_visible(tenant):
    """Raise `RuntimeError` unless `tenant` appears in the output of `account list`."""
    # Explicit raise instead of `assert`, so the check still runs under `python -O`
    matches = _cli(["account", "list"]).search(tenant)
    if len(matches) == 0:
        raise RuntimeError(f"Tenant {tenant} is not visible to the current login")

def login(refresh: bool=False # Force relogin
         ):
    """Ensure there is a valid login, then reload the keyvault config into the cache.

    Steps:
    1. If the `SQU_CONFIG` environment variable contains a "/", split it into
       `cache["vault_name"]` and `cache["tenant_id"]`.
    2. Validate the current session with `account show` (and, when a tenant is
       cached, check it is visible); on success mark `logged_in` in the cache
       for 3 hours, otherwise clear the flag.
    3. While not logged in: try a managed identity login limited to 5 seconds,
       fall back on a device code login, then re-validate.
    4. If a vault name is cached, store `load_config()` under `cache["config"]`.

    NOTE(review): `refresh` is accepted but never read by this function, so
    passing `refresh=True` currently changes nothing - confirm intended behaviour.
    """
    login_ttl = 60 * 60 * 3 # cache login state for 3 hrs
    if "/" in os.environ.get("SQU_CONFIG", ""):
        cache["vault_name"], cache["tenant_id"] = os.environ["SQU_CONFIG"].split("/")
    tenant = cache.get("tenant_id")
    try:
        _cli(["account", "show"])
        if tenant:
            _check_tenant_visible(tenant)
        cache.set("logged_in", True, login_ttl)
    except Exception: # any failure means "not logged in"; Ctrl-C / SystemExit still propagate
        cache.delete("logged_in")
    while not cache.get("logged_in"):
        logger.info("Cache doesn't look logged in, attempting login")
        try:
            # See if we can login with a managed identity in under 5 secs and see the configured tenant
            subprocess.run(["timeout", "5", sys.executable, "-m", "azure.cli", "login", "--identity", "-o", "none", "--allow-no-subscriptions"], check=True)
            if tenant:
                _check_tenant_visible(tenant)
        except Exception:
            # If managed identity unavailable, fall back on a manual login
            tenant_scope = ["--tenant", tenant] if tenant else []
            _cli(["login", *tenant_scope, "--use-device-code", "--allow-no-subscriptions", "-o", "none"], capture_output=False)
        # Finally, validate the login once more, and set the login state
        try:
            _cli(["account", "show"])
            cache.set("logged_in", True, login_ttl)
        except subprocess.CalledProcessError:
            cache.delete("logged_in")
    logger.info("Cache state is logged in")
    if cache.get("vault_name"): # Always reload config on any login call
        logger.info("Loading config from keyvault")
        cache["config"] = load_config() # Config lasts forever, don't expire
load_config
load_config (path=None)
| | Type | Default | Details |
---|---|---|---|
path | NoneType | None | Path to read json config into cache from |
How to login
The login function will be called automatically if the azcli
function defined below is used and the cache has no valid logged-in flag; otherwise, it can be called manually to refresh the keyvault config items with load_config
(this directly loads a keyvault secret into the cache based on the SQU_CONFIG environment variable).
# Log in (if needed), then list the top-level keys of the loaded config
login()
cache["config"].keys()
azcli
azcli (basecmd:list[str])
Exported source
def azcli(basecmd: list[str]):
    """Run `basecmd` through `_cli`, calling `login()` first if the cache has no `logged_in` flag.

    Returns whatever `_cli(basecmd)` returns.
    """
    if not cache.get("logged_in"):
        login()
    return _cli(basecmd)
Datalake Path
The datalake_path
function below returns a UPath
pathlib-style object pointing to a configured datalake location in the cache
.
# Show the datalake container name stored in the cached config
cache['config']['datalake_container']
datalake_path
datalake_path (expiry_days:int=3, permissions:str='racwdlt')
| | Type | Default | Details |
---|---|---|---|
expiry_days | int | 3 | Number of days until the SAS token expires |
permissions | str | racwdlt | Permissions to grant on the SAS token |
Exported source
@memoize_stampede(cache, expire=60 * 60 * 24)
def datalake_path_safe(expiry_days, permissions):
    """Generate a SAS token for the configured datalake container.

    Results are memoized in `cache` with a 24 hour expiry. Logs in first when
    the cache has no `logged_in` flag, since the keyvault config is required.

    Returns a `(container, account, sas)` tuple, where `sas` is whatever
    `azcli` returns for the `storage container generate-sas` command.

    NOTE(review): only the date part of "now + expiry_days" is passed as
    `--expiry`, while the result is memoized for 24 hours - for small
    `expiry_days` a cached token may outlive its own expiry; confirm.
    """
    if not cache.get("logged_in"): # Have to login to grab keyvault config
        login()
    expiry = pandas.Timestamp("now") + pandas.Timedelta(days=expiry_days)
    account = cache["config"]["datalake_account"].split(".")[0] # Grab the account name, not the full FQDN
    container = cache['config']['datalake_container']
    sas = azcli(["storage", "container", "generate-sas", "--auth-mode", "login", "--as-user",
                 "--account-name", account, "--name", container, "--permissions", permissions, "--expiry", str(expiry.date())])
    return (container, account, sas)
def datalake_path(expiry_days: int=3, # Number of days until the SAS token expires
                  permissions: str="racwdlt" # Permissions to grant on the SAS token
                  ):
    """Return a `UPath` for the configured datalake container, authenticated with a SAS token.

    The container name, account name and SAS token come from
    `datalake_path_safe(expiry_days, permissions)`.
    """
    container, account, sas = datalake_path_safe(expiry_days, permissions)
    return UPath(f"az://{container}", account_name=account, sas_token=sas)
datalake_path_safe
datalake_path_safe (expiry_days, permissions)
# List the entries at the root of the datalake container, one per line
path = datalake_path()
print("\n".join(str(p) for p in path.ls()))