logging.basicConfig(level=logging.INFO)
api
API Clients
Returns preconfigured clients for various services, cached for performance
Clients
Clients ()
Clients for various services, cached for performance
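As a quick illustration of the caching behaviour (a minimal, self-contained sketch using functools.cached_property rather than the real service clients):
from functools import cached_property

class Example:
    @cached_property
    def client(self):
        print("constructing client...")  # only runs on first access
        return object()

e = Example()
first = e.client   # prints "constructing client..."
second = e.client  # returns the cached instance, no construction
assert first is second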
Exported source
class Clients:
    """
    Clients for various services, cached for performance
    """
    @cached_property
    def config(self):
        login()
        return cache.get("config", load_config())

    @cached_property
    def runzero(self):
        """
        Returns a runzero client
        """
        return httpx.Client(base_url="https://console.rumble.run/api/v1.0", headers={"Authorization": f"Bearer {self.config.runzero_apitoken}"})

    @cached_property
    def abuseipdb(self):
        """
        Returns an abuseipdb client
        """
        from abuseipdb_wrapper import AbuseIPDB
        return AbuseIPDB(api_key=self.config.abuseipdb_api_key)

    @cached_property
    def jira(self):
        """
        Returns a jira client
        """
        return Jira(url=self.config.jira_url, username=self.config.jira_username, password=self.config.jira_password)

    @cached_property
    def tio(self):
        """
        Returns a TenableIO client
        """
        return TenableIO(self.config.tenable_access_key, self.config.tenable_secret_key)
clients = Clients()
RunZero
response = clients.runzero.get("/export/org/assets.csv", params={"search": "has_public:t AND alive:t AND (protocol:rdp OR protocol:vnc OR protocol:teamviewer OR protocol:telnet OR protocol:ftp)"})
pandas.read_csv(io.StringIO(response.text)).head(10)
Jira
pandas.json_normalize(clients.jira.jql("updated > -1d")["issues"]).head(10)
AbuseIPDB
clients.abuseipdb.check_ip("1.1.1.1")
TenableIO
pandas.DataFrame(clients.tio.scans.list()).head(10)
List Workspaces
The list_workspaces function retrieves a list of workspaces from blob storage and returns it in various formats.
list_workspaces
list_workspaces (fmt:str='df', agency:str='ALL')
| | Type | Default | Details |
|---|---|---|---|
| fmt | str | df | df, csv, json, list |
| agency | str | ALL | Agency alias or ALL |
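A quick sketch of the supported return formats (the agency alias below is hypothetical):
df = list_workspaces()                      # pandas DataFrame of all workspaces
ids = list_workspaces(fmt="list")           # unique customerId values
records = list_workspaces(fmt="json")       # list of dicts with NaNs blanked
csv_text = list_workspaces(fmt="csv")       # CSV string
# filter to a single agency by its (hypothetical) alias
one = list_workspaces(fmt="df", agency="EXAMPLE_ALIAS")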
Exported source
@memoize_stampede(cache, expire=60 * 60 * 3) # cache for 3 hours
def list_workspaces_safe(fmt: str = "df", # df, csv, json, list
                         agency: str = "ALL"): # Agency alias or ALL
    path = datalake_path()
    df = pandas.read_csv((path / "notebooks/lists/SentinelWorkspaces.csv").open())
    df = df.join(pandas.read_csv((path / "notebooks/lists/SecOps Groups.csv").open()).set_index("Alias"), on="SecOps Group", rsuffix="_secops")
    df = df.rename(columns={"SecOps Group": "alias", "Domains and IPs": "domains"})
    df = df.dropna(subset=["customerId"]).sort_values(by="alias").convert_dtypes().reset_index()
    persisted = f"{dirs.user_cache_dir}/list_workspaces.parquet"
    df.to_parquet(persisted)
    return persisted

def list_workspaces(fmt: str = "df", # df, csv, json, list
                    agency: str = "ALL"): # Agency alias or ALL
    df = pandas.read_parquet(list_workspaces_safe(fmt, agency))
    if agency != "ALL":
        df = df[df["alias"] == agency]
    if fmt == "df":
        return df
    elif fmt == "csv":
        return df.to_csv()
    elif fmt == "json":
        return df.fillna("").to_dict("records")
    elif fmt == "list":
        return list(df["customerId"].unique())
    else:
        raise ValueError("Invalid format")
list_workspaces_safe
list_workspaces_safe (fmt:str='df', agency:str='ALL')
| | Type | Default | Details |
|---|---|---|---|
| fmt | str | df | df, csv, json, list |
| agency | str | ALL | Agency alias or ALL |
You can use it to look up an agency's alias based on its customerId (also known as TenantId or Log Analytics WorkspaceId):
workspaces = list_workspaces()
workspaces.query(f'customerId == "{workspaces["customerId"][0]}"')["alias"].str.cat()
Log Analytics Query
The function below makes it easy to query all workspaces with Sentinel installed using Log Analytics.
query_all
query_all (query, fmt='df', timespan=Timedelta('14 days 00:00:00'))
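For example, a minimal usage sketch (the KQL is illustrative):
# a single query string is wrapped into a list internally
recent_alerts = query_all("SecurityAlert | take 10", timespan=pandas.Timedelta("1d"))
# multiple queries are run together and concatenated into one dataframe
combined = query_all(["SecurityIncident | take 5", "SecurityAlert | take 5"])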
Exported source
def list_subscriptions():
    return pandas.DataFrame(azcli(["account", "list"]))["id"].unique()

@memoize_stampede(cache, expire=60 * 60 * 3) # cache for 3 hours
def list_securityinsights_safe():
    return azcli([
        "graph", "query", "--first", "1000", "-q",
        """
        resources
        | where type =~ 'microsoft.operationsmanagement/solutions'
        | where name startswith 'SecurityInsights'
        | project wlid = tolower(tostring(properties.workspaceResourceId))
        | join kind=leftouter (
            resources | where type =~ 'microsoft.operationalinsights/workspaces' | extend wlid = tolower(id))
            on wlid
        | extend customerId = properties.customerId
        """
    ])["data"]

def list_securityinsights():
    return pandas.DataFrame(list_securityinsights_safe())

def chunks(items, size):
    # Yield successive `size` chunks from `items`
    for i in range(0, len(items), size):
        yield items[i:i + size]

def loganalytics_query(queries: list[str], timespan=pandas.Timedelta("14d"), batch_size=190, batch_delay=32, sentinel_workspaces=None):
    """
    Run queries across all workspaces, in batches of `batch_size` with a minimum delay of `batch_delay` between batches.
    Returns a dictionary of queries and results
    """
    client = LogsQueryClient(AzureCliCredential())
    workspaces = list_workspaces(fmt="df")
    if sentinel_workspaces is None:
        sentinel_workspaces = list_securityinsights()
    requests, results = [], []
    for query in queries:
        for workspace_id in sentinel_workspaces["customerId"]:
            requests.append(LogsBatchQuery(workspace_id=workspace_id, query=query, timespan=timespan))
    querytime = pandas.Timestamp("now")
    logger.info(f"Executing {len(requests)} queries at {querytime}")
    for request_batch in chunks(requests, batch_size):
        while cache.get("loganalytics_query_running"):
            time.sleep(1)
        batch_start = pandas.Timestamp("now")
        cache.set("loganalytics_query_running", True, batch_delay)
        results += client.query_batch(request_batch)
        duration = pandas.Timestamp("now") - batch_start
        logger.info(f"Completed {len(results)} (+{len(request_batch)}) in {duration}")
    dfs = {}
    for request, result in zip(requests, results):
        if result.status == LogsQueryStatus.PARTIAL:
            tables = result.partial_data
            tables = [pandas.DataFrame(table.rows, columns=table.columns) for table in tables]
        elif result.status == LogsQueryStatus.SUCCESS:
            tables = result.tables
            tables = [pandas.DataFrame(table.rows, columns=table.columns) for table in tables]
        else:
            tables = [pandas.DataFrame([result.__dict__])]
        df = pandas.concat(tables).dropna(axis=1, how='all') # prune empty columns
        df["TenantId"] = request.workspace
        alias = workspaces.query(f'customerId == "{request.workspace}"')["alias"].str.cat()
        if alias == '':
            alias = sentinel_workspaces.query(f'customerId == "{request.workspace}"')["name"].str.cat()
        df["_alias"] = alias
        query = request.body["query"]
        if query in dfs:
            dfs[query].append(df)
        else:
            dfs[query] = [df]
    return {query: pandas.concat(results, ignore_index=True).convert_dtypes() for query, results in dfs.items()}

def query_all(query, fmt="df", timespan=pandas.Timedelta("14d")):
    try:
        # Check query is not a plain string and is iterable
        assert not isinstance(query, str)
        iter(query)
    except (AssertionError, TypeError):
        # if it is a plain string or it's not iterable, convert into a list of queries
        query = [query]
    df = pandas.concat(loganalytics_query(query, timespan).values())
    if fmt == "df":
        return df
    elif fmt == "csv":
        return df.to_csv()
    elif fmt == "json":
        return df.fillna("").to_dict("records")
    else:
        raise ValueError("Invalid format")
loganalytics_query
loganalytics_query (queries:list[str], timespan=Timedelta('14 days 00:00:00'), batch_size=190, batch_delay=32, sentinel_workspaces=None)
Run queries across all workspaces, in batches of batch_size with a minimum delay of batch_delay between batches. Returns a dictionary of queries and results
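A sketch of working with the returned dictionary, which is keyed by query text with one concatenated dataframe per query (the KQL is illustrative):
queries = ["SecurityIncident | take 5", "SecurityAlert | take 5"]
results = loganalytics_query(queries, timespan=pandas.Timedelta("7d"))
for query, df in results.items():
    # each dataframe has TenantId and _alias columns appended per workspace
    print(query, df.shape, df["_alias"].nunique())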
chunks
chunks (items, size)
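For example:
list(chunks([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4], [5]]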
list_securityinsights
list_securityinsights ()
list_securityinsights_safe
list_securityinsights_safe ()
list_subscriptions
list_subscriptions ()
query_all(["SecurityAlert"]).groupby("_alias")[["Tactics", "TenantId"]].nunique().sort_values(by="Tactics", ascending=False).head()Threat hunting helper
Scan across all databases and tables for columns matching a given predicate.
hunt
hunt (indicators, expression='has', columns=['AADEmail', 'AccountName', 'AccountUPN', 'AccountUpn', 'Account', 'CompromisedEntity', 'DestinationUserName', 'Computer', 'DisplayName', 'EmailSenderAddress', 'FileName', 'FilePath', 'FolderPath', 'FullyQualifiedSubjectUserName', 'InitiatingProcessAccountUpn', 'MailboxOwnerUPN', 'Name', 'NewProcessName', 'Owner', 'ParentProcessName', 'Process', 'CommandLine', 'ProcessCommandLine', 'RecipientEmailAddress', 'RequesterUpn', 'SenderMailFromAddress', 'SourceIdentity', 'SourceUserName', 'SubjectUserName', 'TargetUserName', 'TargetUser', 'Upn', 'UserName', 'userName', 'UserPrincipalName', 'Caller', 'DestinationUserID', 'SourceUserID', 'UserId', 'CallerIpAddress', 'DestinationIP', 'DstIpAddr', 'EmailSourceIpAddress', 'IPAddresses', 'IPAddress', 'IpAddress', 'NetworkDestinationIP', 'NetworkIP', 'NetworkSourceIP', 'RemoteIP', 'SourceIP', 'SrcIpAddr', 'DomainName', 'FileOriginUrl', 'FQDN', 'RemoteUrl', 'Url', 'MD5', 'SHA1', 'SHA256', 'FileHashValue', 'FileHash', 'InitiatingProcessSHA256'], workspaces=None, timespans=['1d', '14d', '90d', '700d'], take=5000)
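For example, a minimal sketch hunting a couple of placeholder IP indicators over the default columns (the addresses are documentation-range examples, not real IOCs):
# single indicator, substring-style 'has' match, shorter timespan
hunt(["203.0.113.7"], expression="has", timespans=["14d"])
# 'has_any' folds all indicators into a single query
hunt(["203.0.113.7", "198.51.100.9"], expression="has_any", timespans=["14d"])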
Exported source
columns_of_interest = benedict({
    "name": ['AADEmail', 'AccountName', 'AccountUPN', 'AccountUpn', 'Account', 'CompromisedEntity', 'DestinationUserName',
             "Computer", "DisplayName", "EmailSenderAddress", "FileName", 'FilePath', "FolderPath", 'FullyQualifiedSubjectUserName', 'InitiatingProcessAccountUpn',
             'MailboxOwnerUPN', 'Name', 'NewProcessName', 'Owner', 'ParentProcessName', 'Process', 'CommandLine', 'ProcessCommandLine', 'RecipientEmailAddress',
             'RequesterUpn', 'SenderMailFromAddress', 'SourceIdentity', 'SourceUserName', 'SubjectUserName', 'TargetUserName', 'TargetUser', 'Upn',
             'UserName', 'userName', 'UserPrincipalName'],
    "guid": ['Caller', "DestinationUserID", 'SourceUserID', 'UserId'],
    "ip": ['CallerIpAddress', 'DestinationIP', 'DstIpAddr', 'EmailSourceIpAddress', 'IPAddresses', 'IPAddress', 'IpAddress',
           'NetworkDestinationIP', 'NetworkIP', 'NetworkSourceIP', 'RemoteIP', 'SourceIP', 'SrcIpAddr'],
    "url": ["DomainName", 'FileOriginUrl', 'FQDN', 'RemoteUrl', 'Url'],
    "hash": ["MD5", "SHA1", "SHA256", 'FileHashValue', 'FileHash', 'InitiatingProcessSHA256']
})

columns = [column for area in columns_of_interest.values() for column in area]

def finalise_query(query, take):
    return f"{query} | take {take} | extend placeholder_=dynamic({{'':null}}) | evaluate bag_unpack(column_ifexists('pack_', placeholder_))"

def hunt(indicators, expression="has", columns=columns, workspaces=None, timespans=["1d", "14d", "90d", "700d"], take=5000):
    queries = []
    if workspaces is None:
        workspaces = list_securityinsights()
    else:
        df = list_securityinsights()
        workspaces = df[df["customerId"].isin(workspaces)]
    querylogged = False
    if expression in ['has_any']:
        query = f"let indicators = dynamic({indicators}); "
        for count, column in enumerate(columns):
            if count == 0:
                query += f"find where {column} has_any (indicators)"
            else:
                query += f" or {column} has_any (indicators)"
        final_query = finalise_query(query, take)
        queries.append(final_query)
    else:
        for indicator in indicators:
            if expression not in ['has_all']:
                indicator = f"'{indicator}'" # wrap indicator in quotes unless expecting dynamic
            if not querylogged:
                logger.info(f"Test Query: find where {columns[0]} {expression} {indicator} | take {take}")
                querylogged = True
            for chunk in chunks([f"{column} {expression} {indicator}" for column in columns], 20):
                query = " or ".join(chunk)
                final_query = finalise_query(f"find where {query}", take)
                queries.append(final_query)
    for timespan in timespans:
        results = pandas.concat(loganalytics_query(queries, pandas.Timedelta(timespan), sentinel_workspaces=workspaces).values())
        if 'placeholder_' in results.columns:
            results = results.drop('placeholder_', axis=1)
        if results.empty:
            logger.info(f"No results in {timespan}, extending hunt")
            continue
        logger.info(f"Found {indicators} in {timespan}, returning")
        return results
    else:
        raise Exception("No results found!")
finalise_query
finalise_query (query, take)
hunt(['("ntdsutil", "ifm")'], expression="has_all", columns=["InitiatingProcessCommandLine", "ProcessCommandLine", "CommandLine"], timespans=["90d"])atlaskit_transformer
atlaskit_transformer (inputtext, inputfmt='md', outputfmt='wiki', runtime='node')
Exported source
def atlaskit_transformer(inputtext, inputfmt="md", outputfmt="wiki", runtime="node"):
    transformer = dirs.user_cache_path / f"atlaskit-transformer.bundle_v{version('nbdev_squ')}.js"
    if not transformer.exists():
        transformer.write_bytes(pkgutil.get_data("nbdev_squ", "atlaskit-transformer.bundle.js"))
    cmd = [runtime, str(transformer), inputfmt, outputfmt]
    logger.debug(" ".join(cmd))
    try:
        return run(cmd, input=inputtext, text=True, capture_output=True, check=True).stdout
    except CalledProcessError:
        run(cmd, input=inputtext, text=True, check=True)
print(atlaskit_transformer("""# Heading 1
- a bullet
[a link](https://github.com]
"""))
Sentinel Incident sync helpers
The overall process below is to grab all the incidents from Sentinel and save them into individual files per day based on their last updated time. A subsequent process can then load those files (e.g. ingest into ADX) and/or synchronise the updates into Jira.
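A minimal sketch of the 'individual files per day' step, assuming the incident dataframe includes a LastModifiedTime column and writing local parquet files (the paths and destination are illustrative; the real process may persist to blob storage instead):
incidents = security_incidents(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("14d"),
                               timedelta=pandas.Timedelta("14d"))
# partition by last-updated day and write one parquet file per day
incidents["_day"] = pandas.to_datetime(incidents["LastModifiedTime"]).dt.date
for day, chunk in incidents.groupby("_day"):
    chunk.drop(columns="_day").to_parquet(f"incidents_{day}.parquet")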
security_alerts
security_alerts (start=Timestamp('2025-03-10 02:21:24.943800+0000', tz='UTC'), timedelta=Timedelta('1 days 00:00:00'))
Exported source
def security_incidents(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("1d"), timedelta=pandas.Timedelta("1d")):
    # Queries for security incidents from `start` time for `timedelta` and returns a dataframe
    # Sorts by TimeGenerated (TODO)
    query = "SecurityIncident | summarize arg_max(TimeGenerated, *) by IncidentNumber"
    return query_all(query, timespan=(start.to_pydatetime(), timedelta))

def security_alerts(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("1d"), timedelta=pandas.Timedelta("1d")):
    # Queries for security alerts from `start` time for `timedelta` and returns a dataframe
    # Sorts by TimeGenerated (TODO)
    query = "SecurityAlert | summarize arg_max(TimeGenerated, *) by SystemAlertId"
    return query_all(query, timespan=(start.to_pydatetime(), timedelta))
security_incidents
security_incidents (start=Timestamp('2025-03-10 02:21:24.943611+0000', tz='UTC'), timedelta=Timedelta('1 days 00:00:00'))
df = security_incidents(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("14d"), timedelta=pandas.Timedelta("14d"))
dbt-duckdb plugin
The squ plugin below makes it much easier to query KQL in duckdb dbt projects via the DuckDB user-defined function (UDF) interface. It could be extended to other clients fairly easily; the only requirement is that the data is returned as a dataframe. To use it, a few dbt project files need to be configured:
DBT ./profiles.yml
default:
  outputs:
    dev:
      type: duckdb
      path: target/dev.duckdb
      plugins:
        - module: nbdev_squ.api
          alias: squ
  target: dev
DBT ./models/squ/schema.yml
See DBT Add sources to your DAG for how to add 'externally defined' sources; this uses the code below, based on the dbt-duckdb plugin architecture.
version: 2
sources:
  - name: kql_source
    config:
      plugin: squ
    meta:
      kql_path: "models/squ/{name}.kql"
    tables:
      - name: T1547_001
models:
  - name: hunt
    config:
      materialized: table
Once the source is defined, dbt cli tools and other SQL models can refer to it; the dbt-duckdb framework makes it available as a referenceable view usable throughout the project:
DBT ./models/squ/hunt.sql
See DBT SQL models for how to write the select statement templates that DBT organises into the DAG.
select * from {{source('kql_source', 'T1547_001')}}
DBT cli usage
See About the dbt run command (can use --empty to validate before a full run)
cd dbt_example_project
dbt run # will build the whole dag including any materialisations like the hunt table above
dbt show --inline "select * from {{ source('kql_source', 'T1547_001') }}" # will use live source
dbt show --inline "select * from {{ ref('hunt') }}" # will use materialized table in db built by `dbt run`
dbt docs generate # will build documentation for the project
Plugin
Plugin (name:str, plugin_config:Dict[str,Any], credentials:Optional[dbt.adapters.duckdb.credentials.DuckDBCredentials]=None)
BasePlugin is the base class for creating plugins. A plugin can be created from a module name, an optional configuration, and an alias. Each plugin contains a name and its configuration.
Exported source
class Plugin(BasePlugin):
    def initialize(self, config):
        login()

    def configure_cursor(self, cursor):
        pass

    def load(self, source_config: SourceConfig):
        if "kql_path" in source_config:
            kql_path = source_config["kql_path"]
            kql_path = kql_path.format(**source_config.as_dict())
            query = Path(kql_path).read_text()
            return query_all(query, timespan=pandas.Timedelta(source_config.get("timespan", "14d")))
        elif "list_workspaces" in source_config: # untested
            return list_workspaces()
        elif "client_api" in source_config: # untested
            api_result = getattr(clients, source_config["client_api"])(**json.loads(source_config.get("kwargs", "{}")))
            if isinstance(api_result, pandas.DataFrame):
                return api_result
            else:
                return pandas.DataFrame(api_result)
        else:
            raise Exception("No valid config found for squ plugin (kql_path or api required)")

    def default_materialization(self):
        return "view"