logging.basicConfig(level=logging.INFO)
api
API Clients
Returns preconfigured clients for various services, cached for performance
Clients
Clients ()
Clients for various services, cached for performance
Exported source
class Clients:
    """
    Clients for various services, cached for performance
    """

    @cached_property
    def config(self):
        login()
        return cache.get("config", load_config())

    @cached_property
    def runzero(self):
        """
        Returns a runzero client
        """
        return httpx_cache.Client(base_url="https://console.rumble.run/api/v1.0", headers={"Authorization": f"Bearer {self.config.runzero_apitoken}"})

    @cached_property
    def abuseipdb(self):
        """
        Returns an abuseipdb client
        """
        from abuseipdb_wrapper import AbuseIPDB
        return AbuseIPDB(api_key=self.config.abuseipdb_api_key)

    @cached_property
    def jira(self):
        """
        Returns a jira client
        """
        return Jira(url=self.config.jira_url, username=self.config.jira_username, password=self.config.jira_password)

    @cached_property
    def tio(self):
        """
        Returns a TenableIO client
        """
        return TenableIO(self.config.tenable_access_key, self.config.tenable_secret_key)

clients = Clients()
# RunZero
response = clients.runzero.get("/export/org/assets.csv", params={"search": "has_public:t AND alive:t AND (protocol:rdp OR protocol:vnc OR protocol:teamviewer OR protocol:telnet OR protocol:ftp)"})
pandas.read_csv(io.StringIO(response.text)).head(10)
# Jira
"updated > -1d")["issues"]).head(10) pandas.json_normalize(clients.jira.jql(
# AbuseIPDB
"1.1.1.1") clients.abuseipdb.check_ip(
# TenableIO
pandas.DataFrame(clients.tio.scans.list()).head(10)
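All of the clients above read their secrets from the shared clients.config object, which is loaded once (after login()) and cached. A minimal sketch of inspecting it, assuming attribute access works as used in the Clients class above:
# Inspect a couple of the secrets the cached clients are built from (attribute names as referenced in the Clients class)
clients.config.jira_url
clients.config.tenable_access_key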
List Workspaces
The list_workspaces function retrieves the list of workspaces from blob storage and returns it in various formats.
list_workspaces
list_workspaces (fmt:str='df', agency:str='ALL')
| | Type | Default | Details |
|---|---|---|---|
| fmt | str | df | df, csv, json, list |
| agency | str | ALL | Agency alias or ALL |
Exported source
@memoize_stampede(cache, expire=60 * 60 * 3)  # cache for 3 hours
def list_workspaces_safe(fmt: str = "df",  # df, csv, json, list
                         agency: str = "ALL"):  # Agency alias or ALL
    path = datalake_path()
    df = pandas.read_csv((path / "notebooks/lists/SentinelWorkspaces.csv").open())
    df = df.join(pandas.read_csv((path / "notebooks/lists/SecOps Groups.csv").open()).set_index("Alias"), on="SecOps Group", rsuffix="_secops")
    df = df.rename(columns={"SecOps Group": "alias", "Domains and IPs": "domains"})
    df = df.dropna(subset=["customerId"]).sort_values(by="alias").convert_dtypes().reset_index()
    persisted = f"{dirs.user_cache_dir}/list_workspaces.parquet"
    df.to_parquet(persisted)
    return persisted

def list_workspaces(fmt: str = "df",  # df, csv, json, list
                    agency: str = "ALL"):  # Agency alias or ALL
    df = pandas.read_parquet(list_workspaces_safe(fmt, agency))
    if agency != "ALL":
        df = df[df["alias"] == agency]
    if fmt == "df":
        return df
    elif fmt == "csv":
        return df.to_csv()
    elif fmt == "json":
        return df.fillna("").to_dict("records")
    elif fmt == "list":
        return list(df["customerId"].unique())
    else:
        raise ValueError("Invalid format")
list_workspaces_safe
list_workspaces_safe (fmt:str='df', agency:str='ALL')
| | Type | Default | Details |
|---|---|---|---|
| fmt | str | df | df, csv, json, list |
| agency | str | ALL | Agency alias or ALL |
You can use it to look up an agency's alias based on its customerId (also known as TenantId or Log Analytics WorkspaceId).
workspaces = list_workspaces()
workspaces.query(f'customerId == "{workspaces["customerId"][0]}"')["alias"].str.cat()
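The other output formats are handy for feeding different tools; a minimal sketch (the agency alias here is hypothetical):
# DataFrame filtered to a single agency (hypothetical alias)
list_workspaces(fmt="df", agency="EXAMPLE_AGENCY")
# Plain list of workspace customerIds, convenient for batch queries
workspace_ids = list_workspaces(fmt="list")
# Records suitable for JSON serialisation
records = list_workspaces(fmt="json")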
Log Analytics Query
The function below makes it easy to query all workspaces with Sentinel installed using Log Analytics.
query_all
query_all (query, fmt='df', timespan=Timedelta('14 days 00:00:00'))
Exported source
def list_subscriptions():
    return pandas.DataFrame(azcli(["account", "list"]))["id"].unique()

@memoize_stampede(cache, expire=60 * 60 * 3)  # cache for 3 hours
def list_securityinsights_safe():
    return azcli([
        "graph", "query", "--first", "1000", "-q",
        """
        resources
        | where type =~ 'microsoft.operationsmanagement/solutions'
        | where name startswith 'SecurityInsights'
        | project wlid = tolower(tostring(properties.workspaceResourceId))
        | join kind=leftouter (
            resources | where type =~ 'microsoft.operationalinsights/workspaces' | extend wlid = tolower(id))
            on wlid
        | extend customerId = properties.customerId
        """
    ])["data"]

def list_securityinsights():
    return pandas.DataFrame(list_securityinsights_safe())

def chunks(items, size):
    # Yield successive `size` chunks from `items`
    for i in range(0, len(items), size):
        yield items[i:i + size]

def loganalytics_query(queries: list[str], timespan=pandas.Timedelta("14d"), batch_size=190, batch_delay=32, sentinel_workspaces=None):
    """
    Run queries across all workspaces, in batches of `batch_size` with a minimum delay of `batch_delay` between batches.
    Returns a dictionary of queries and results
    """
    client = LogsQueryClient(AzureCliCredential())
    workspaces = list_workspaces(fmt="df")
    if sentinel_workspaces is None:
        sentinel_workspaces = list_securityinsights()
    requests, results = [], []
    for query in queries:
        for workspace_id in sentinel_workspaces["customerId"]:
            requests.append(LogsBatchQuery(workspace_id=workspace_id, query=query, timespan=timespan))
    querytime = pandas.Timestamp("now")
    logger.info(f"Executing {len(requests)} queries at {querytime}")
    for request_batch in chunks(requests, batch_size):
        while cache.get("loganalytics_query_running"):
            time.sleep(1)
        batch_start = pandas.Timestamp("now")
        cache.set("loganalytics_query_running", True, batch_delay)
        results += client.query_batch(request_batch)
        duration = pandas.Timestamp("now") - batch_start
        logger.info(f"Completed {len(results)} (+{len(request_batch)}) in {duration}")
    dfs = {}
    for request, result in zip(requests, results):
        if result.status == LogsQueryStatus.PARTIAL:
            tables = result.partial_data
            tables = [pandas.DataFrame(table.rows, columns=table.columns) for table in tables]
        elif result.status == LogsQueryStatus.SUCCESS:
            tables = result.tables
            tables = [pandas.DataFrame(table.rows, columns=table.columns) for table in tables]
        else:
            tables = [pandas.DataFrame([result.__dict__])]
        df = pandas.concat(tables).dropna(axis=1, how='all')  # prune empty columns
        df["TenantId"] = request.workspace
        alias = workspaces.query(f'customerId == "{request.workspace}"')["alias"].str.cat()
        if alias == '':
            alias = sentinel_workspaces.query(f'customerId == "{request.workspace}"')["name"].str.cat()
        df["_alias"] = alias
        query = request.body["query"]
        if query in dfs:
            dfs[query].append(df)
        else:
            dfs[query] = [df]
    return {query: pandas.concat(results, ignore_index=True).convert_dtypes() for query, results in dfs.items()}

def query_all(query, fmt="df", timespan=pandas.Timedelta("14d")):
    try:
        # Check query is not a plain string and is iterable
        assert not isinstance(query, str)
        iter(query)
    except (AssertionError, TypeError):
        # if it is a plain string or it's not iterable, convert into a list of queries
        query = [query]
    df = pandas.concat(loganalytics_query(query, timespan).values())
    if fmt == "df":
        return df
    elif fmt == "csv":
        return df.to_csv()
    elif fmt == "json":
        return df.fillna("").to_dict("records")
    else:
        raise ValueError("Invalid format")
loganalytics_query
loganalytics_query (queries:list[str], timespan=Timedelta('14 days 00:00:00'), batch_size=190, batch_delay=32, sentinel_workspaces=None)
Run queries across all workspaces, in batches of batch_size with a minimum delay of batch_delay between batches. Returns a dictionary of queries and results.
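If you want the results keyed by query rather than a single concatenated frame, loganalytics_query can be called directly; a minimal sketch (the KQL is illustrative):
# Returns a dict of {query: DataFrame} across all Sentinel workspaces
results = loganalytics_query([
    "SecurityIncident | take 5",
    "SecurityAlert | take 5",
], timespan=pandas.Timedelta("7d"))
results["SecurityAlert | take 5"].head()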
chunks
chunks (items, size)
list_securityinsights
list_securityinsights ()
list_securityinsights_safe
list_securityinsights_safe ()
list_subscriptions
list_subscriptions ()
"SecurityAlert"]).groupby("_alias")[["Tactics", "TenantId"]].nunique().sort_values(by="Tactics", ascending=False).head() query_all([
Threat hunting helper
Scan across all databases and tables for columns with a given predicate
hunt
hunt (indicators, expression='has', columns=['AADEmail', 'AccountName', 'AccountUPN', 'AccountUpn', 'Account', 'CompromisedEntity', 'DestinationUserName', 'Computer', 'DisplayName', 'EmailSenderAddress', 'FileName', 'FilePath', 'FolderPath', 'FullyQualifiedSubjectUserName', 'InitiatingProcessAccountUpn', 'MailboxOwnerUPN', 'Name', 'NewProcessName', 'Owner', 'ParentProcessName', 'Process', 'CommandLine', 'ProcessCommandLine', 'RecipientEmailAddress', 'RequesterUpn', 'SenderMailFromAddress', 'SourceIdentity', 'SourceUserName', 'SubjectUserName', 'TargetUserName', 'TargetUser', 'Upn', 'UserName', 'userName', 'UserPrincipalName', 'Caller', 'DestinationUserID', 'SourceUserID', 'UserId', 'CallerIpAddress', 'DestinationIP', 'DstIpAddr', 'EmailSourceIpAddress', 'IPAddresses', 'IPAddress', 'IpAddress', 'NetworkDestinationIP', 'NetworkIP', 'NetworkSourceIP', 'RemoteIP', 'SourceIP', 'SrcIpAddr', 'DomainName', 'FileOriginUrl', 'FQDN', 'RemoteUrl', 'Url', 'MD5', 'SHA1', 'SHA256', 'FileHashValue', 'FileHash', 'InitiatingProcessSHA256'], workspaces=None, timespans=['1d', '14d', '90d', '700d'], take=5000)
Exported source
columns_of_interest = benedict({
    "name": ['AADEmail', 'AccountName', 'AccountUPN', 'AccountUpn', 'Account', 'CompromisedEntity', 'DestinationUserName',
             "Computer", "DisplayName", "EmailSenderAddress", "FileName", 'FilePath', "FolderPath", 'FullyQualifiedSubjectUserName', 'InitiatingProcessAccountUpn',
             'MailboxOwnerUPN', 'Name', 'NewProcessName', 'Owner', 'ParentProcessName', 'Process', 'CommandLine', 'ProcessCommandLine', 'RecipientEmailAddress',
             'RequesterUpn', 'SenderMailFromAddress', 'SourceIdentity', 'SourceUserName', 'SubjectUserName', 'TargetUserName', 'TargetUser', 'Upn',
             'UserName', 'userName', 'UserPrincipalName'],
    "guid": ['Caller', "DestinationUserID", 'SourceUserID', 'UserId'],
    "ip": ['CallerIpAddress', 'DestinationIP', 'DstIpAddr', 'EmailSourceIpAddress', 'IPAddresses', 'IPAddress', 'IpAddress',
           'NetworkDestinationIP', 'NetworkIP', 'NetworkSourceIP', 'RemoteIP', 'SourceIP', 'SrcIpAddr'],
    "url": ["DomainName", 'FileOriginUrl', 'FQDN', 'RemoteUrl', 'Url'],
    "hash": ["MD5", "SHA1", "SHA256", 'FileHashValue', 'FileHash', 'InitiatingProcessSHA256']
})

columns = [column for area in columns_of_interest.values() for column in area]

def finalise_query(query, take):
    return f"{query} | take {take} | extend placeholder_=dynamic({{'':null}}) | evaluate bag_unpack(column_ifexists('pack_', placeholder_))"

def hunt(indicators, expression="has", columns=columns, workspaces=None, timespans=["1d", "14d", "90d", "700d"], take=5000):
    queries = []
    if workspaces is None:
        workspaces = list_securityinsights()
    else:
        df = list_securityinsights()
        workspaces = df[df["customerId"].isin(workspaces)]
    querylogged = False
    if expression in ['has_any']:
        query = f"let indicators = dynamic({indicators}); "
        for count, column in enumerate(columns):
            if count == 0:
                query += f"find where {column} has_any (indicators)"
            else:
                query += f" or {column} has_any (indicators)"
        final_query = finalise_query(query, take)
        queries.append(final_query)
    else:
        for indicator in indicators:
            if expression not in ['has_all']:
                indicator = f"'{indicator}'"  # wrap indicator in quotes unless expecting dynamic
            if not querylogged:
                logger.info(f"Test Query: find where {columns[0]} {expression} {indicator} | take {take}")
                querylogged = True
            for chunk in chunks([f"{column} {expression} {indicator}" for column in columns], 20):
                query = " or ".join(chunk)
                final_query = finalise_query(f"find where {query}", take)
                queries.append(final_query)
    for timespan in timespans:
        results = pandas.concat(loganalytics_query(queries, pandas.Timedelta(timespan), sentinel_workspaces=workspaces).values())
        if 'placeholder_' in results.columns:
            results = results.drop('placeholder_', axis=1)
        if results.empty:
            logger.info(f"No results in {timespan}, extending hunt")
            continue
        logger.info(f"Found {indicators} in {timespan}, returning")
        return results
    else:
        raise Exception("No results found!")
finalise_query
finalise_query (query, take)
'("ntdsutil", "ifm")'], expression="has_all", columns=["InitiatingProcessCommandLine", "ProcessCommandLine", "CommandLine"], timespans=["90d"]) hunt([
atlaskit_transformer
atlaskit_transformer (inputtext, inputfmt='md', outputfmt='wiki', runtime='node')
Exported source
def atlaskit_transformer(inputtext, inputfmt="md", outputfmt="wiki", runtime="node"):
    transformer = dirs.user_cache_path / f"atlaskit-transformer.bundle_v{version('nbdev_squ')}.js"
    if not transformer.exists():
        transformer.write_bytes(pkgutil.get_data("nbdev_squ", "atlaskit-transformer.bundle.js"))
    cmd = [runtime, str(transformer), inputfmt, outputfmt]
    logger.debug(" ".join(cmd))
    try:
        return run(cmd, input=inputtext, text=True, capture_output=True, check=True).stdout
    except CalledProcessError:
        run(cmd, input=inputtext, text=True, check=True)
print(atlaskit_transformer("""# Heading 1
- a bullet
[a link](https://github.com)
"""))
Sentinel Incident sync helpers
The overall process below is to grab all the incidents from Sentinel and save them into individual files per day based on their last updated time. A subsequent process can then load those files (e.g. ingest into ADX) and/or synchronise the updates into Jira.
security_alerts
security_alerts (start=Timestamp('2024-07-22 02:47:42.503198+0000', tz='UTC'), timedelta=Timedelta('1 days 00:00:00'))
Exported source
def security_incidents(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("1d"), timedelta=pandas.Timedelta("1d")):
    # Queries for security incidents from `start` time for `timedelta` and returns a dataframe
    # Sorts by TimeGenerated (TODO)
    query = "SecurityIncident | summarize arg_max(TimeGenerated, *) by IncidentNumber"
    return query_all(query, timespan=(start.to_pydatetime(), timedelta))

def security_alerts(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("1d"), timedelta=pandas.Timedelta("1d")):
    # Queries for security alerts from `start` time for `timedelta` and returns a dataframe
    # Sorts by TimeGenerated (TODO)
    query = "SecurityAlert | summarize arg_max(TimeGenerated, *) by SystemAlertId"
    return query_all(query, timespan=(start.to_pydatetime(), timedelta))
security_incidents
security_incidents (start=Timestamp('2024-07-22 02:47:42.503030+0000', tz='UTC'), timedelta=Timedelta('1 days 00:00:00'))
df = security_incidents(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("14d"), timedelta=pandas.Timedelta("14d"))
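A minimal sketch of the per-day split described above, assuming the returned frame has a LastModifiedTime column and that JSON lines files in the working directory are an acceptable target:
# Write one JSON lines file per day, keyed on each incident's last modified date
# (column name and output location are assumptions for illustration)
df["LastModifiedTime"] = pandas.to_datetime(df["LastModifiedTime"])
for day, day_df in df.groupby(df["LastModifiedTime"].dt.date):
    day_df.to_json(f"incidents_{day}.json", orient="records", lines=True, date_format="iso")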
dbt-duckdb plugin
The squ plugin below makes it much easier to query KQL in DuckDB projects via the DuckDB user-defined function (UDF) interface. It could be extended to other clients fairly easily; you just have to make sure data is returned as a dataframe. To use it, there are a few dbt project files that need to be configured:
DBT ./profiles.yml
default:
  outputs:
    dev:
      type: duckdb
      path: target/dev.duckdb
      plugins:
        - module: nbdev_squ.api
          alias: squ
  target: dev
DBT ./models/squ/schema.yml
See DBT Add sources to your DAG for how to add 'externally defined' sources; this uses the code below, based on the dbt-duckdb plugin architecture.
version: 2

sources:
  - name: kql_source
    config:
      plugin: squ
    meta:
      kql_path: "models/squ/{name}.kql"
    tables:
      - name: T1547_001

models:
  - name: hunt
    config:
      materialized: table
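The kql_path template above means each table name maps to a .kql file containing a plain KQL query. A minimal sketch of creating a hypothetical models/squ/T1547_001.kql (the query itself is illustrative and not part of the package):
# Create a hypothetical hunting query for the T1547_001 source above (illustrative KQL)
from pathlib import Path
Path("models/squ/T1547_001.kql").write_text("SecurityEvent | where EventID == 4657 | take 100")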
Once the source is defined, dbt cli tools and other SQL models can refer to it; the dbt-duckdb framework makes it available as a referenceable view usable throughout the project:
DBT ./models/squ/hunt.sql
See DBT SQL models for how to write the select statement templates that DBT organises into the DAG.
select * from {{source('kql_source', 'T1547_001')}}
DBT cli usage
See About the dbt run command (can use --empty to validate before a full run).
cd dbt_example_project
dbt run # will build the whole dag including any materialisations like the hunt table above
dbt show --inline "select * from {{ source('kql_source', 'T1547_001') }}" # will use live source
dbt show --inline "select * from {{ ref('hunt') }}" # will use materialized table in db built by `dbt run`
dbt docs generate # will build documentation for the project
Plugin
Plugin (name:str, plugin_config:Dict[str,Any])
BasePlugin is the base class for creating plugins. A plugin can be created from a module name, an optional configuration, and an alias. Each plugin contains a name and its configuration.
Exported source
class Plugin(BasePlugin):
    def initialize(self, config):
        login()

    def configure_cursor(self, cursor):
        pass

    def load(self, source_config: SourceConfig):
        if "kql_path" in source_config:
            kql_path = source_config["kql_path"]
            kql_path = kql_path.format(**source_config.as_dict())
            query = Path(kql_path).read_text()
            return query_all(query, timespan=pandas.Timedelta(source_config.get("timespan", "14d")))
        elif "list_workspaces" in source_config:  # untested
            return list_workspaces()
        elif "client_api" in source_config:  # untested
            api_result = getattr(clients, source_config["client_api"])(**json.loads(source_config.get("kwargs", "{}")))
            if isinstance(api_result, pandas.DataFrame):
                return api_result
            else:
                return pandas.DataFrame(api_result)
        else:
            raise Exception("No valid config found for squ plugin (kql_path or api required)")

    def default_materialization(self):
        return "view"