api

logging.basicConfig(level=logging.INFO)

API Clients

Returns preconfigured clients for various services, cached for performance


source

Clients

 Clients ()

Clients for various services, cached for performance

Exported source
class Clients:
    """
    Lazily built clients for external services.

    Every attribute is a `cached_property`: the client is constructed on first access and
    the same object is returned on later accesses of the same `Clients` instance.
    """

    @cached_property
    def config(self):
        """
        Calls `login()` and returns the cached "config" entry, with `load_config()` as fallback
        """
        login()
        # NOTE(review): the fallback is computed before the cache lookup, so load_config()
        # runs even when "config" is already cached - confirm this is intended.
        fallback = load_config()
        return cache.get("config", fallback)

    @cached_property
    def runzero(self):
        """
        Returns a runzero client (httpx_cache client with a bearer token from the config)
        """
        auth_headers = {"Authorization": f"Bearer {self.config.runzero_apitoken}"}
        return httpx_cache.Client(
            base_url="https://console.rumble.run/api/v1.0",
            headers=auth_headers,
        )

    @cached_property
    def abuseipdb(self):
        """
        Returns an abuseipdb client
        """
        # Imported on first use rather than at module import time
        from abuseipdb_wrapper import AbuseIPDB
        api_key = self.config.abuseipdb_api_key
        return AbuseIPDB(api_key=api_key)

    @cached_property
    def jira(self):
        """
        Returns a jira client
        """
        settings = self.config
        return Jira(
            url=settings.jira_url,
            username=settings.jira_username,
            password=settings.jira_password,
        )

    @cached_property
    def tio(self):
        """
        Returns a TenableIO client
        """
        settings = self.config
        access_key = settings.tenable_access_key
        secret_key = settings.tenable_secret_key
        return TenableIO(access_key, secret_key)


# Module-level instance; each client is created on first attribute access
clients = Clients()
# RunZero: export assets matching the search as CSV and preview the first 10 rows
response = clients.runzero.get("/export/org/assets.csv", params={"search": "has_public:t AND alive:t AND (protocol:rdp OR protocol:vnc OR protocol:teamviewer OR protocol:telnet OR protocol:ftp)"})
pandas.read_csv(io.StringIO(response.text)).head(10)
# Jira: flatten the "issues" returned by the JQL search into a dataframe and preview 10 rows
pandas.json_normalize(clients.jira.jql("updated > -1d")["issues"]).head(10)
# AbuseIPDB: check a single IP address
clients.abuseipdb.check_ip("1.1.1.1")
# TenableIO: list scans as a dataframe and preview 10 rows
pandas.DataFrame(clients.tio.scans.list()).head(10)

List Workspaces

The list_workspaces function retrieves a list of workspaces from blob storage and returns it in various formats


source

list_workspaces

 list_workspaces (fmt:str='df', agency:str='ALL')
Type Default Details
fmt str df df, csv, json, list
agency str ALL Agency alias or ALL
Exported source
@memoize_stampede(cache, expire=60 * 60 * 3) # cache for 3 hours
def list_workspaces_safe(fmt: str = "df", # df, csv, json, list
                    agency: str = "ALL"): # Agency alias or ALL
    """
    Builds the workspace list from the datalake CSVs, writes it to a parquet file in the
    user cache dir and returns the path of that file.

    `fmt` and `agency` are accepted but not used by the body; filtering and formatting
    happen in `list_workspaces`.
    """
    path = datalake_path()
    # Read each CSV inside a `with` block so the file handle is closed once parsed
    with (path / "notebooks/lists/SentinelWorkspaces.csv").open() as workspaces_csv:
        df = pandas.read_csv(workspaces_csv)
    with (path / "notebooks/lists/SecOps Groups.csv").open() as groups_csv:
        secops_groups = pandas.read_csv(groups_csv).set_index("Alias")
    # Attach the SecOps group details; clashing column names from the groups file get a `_secops` suffix
    df = df.join(secops_groups, on="SecOps Group", rsuffix="_secops")
    df = df.rename(columns={"SecOps Group": "alias", "Domains and IPs": "domains"})
    # Only rows with a customerId are kept
    df = df.dropna(subset=["customerId"]).sort_values(by="alias").convert_dtypes().reset_index()
    # The memoised value is the file path; the dataframe itself lives in the parquet file
    persisted = f"{dirs.user_cache_dir}/list_workspaces.parquet"
    df.to_parquet(persisted)
    return persisted

def list_workspaces(fmt: str = "df", # df, csv, json, list
                    agency: str = "ALL"): # Agency alias or ALL
    """
    Returns the workspace list, optionally restricted to one agency alias, in the requested format.

    Raises `ValueError` when `fmt` is not one of df, csv, json, list.
    """
    # The loader ignores its arguments, so call it with defaults: forwarding fmt/agency
    # would only make each distinct pair a separate memoised call and repeat the datalake read.
    df = pandas.read_parquet(list_workspaces_safe())
    if agency != "ALL":
        df = df[df["alias"] == agency]
    if fmt == "df":
        return df
    elif fmt == "csv":
        return df.to_csv()
    elif fmt == "json":
        # Missing values become empty strings in the records
        return df.fillna("").to_dict("records")
    elif fmt == "list":
        return list(df["customerId"].unique())
    else:
        raise ValueError("Invalid format")

source

list_workspaces_safe

 list_workspaces_safe (fmt:str='df', agency:str='ALL')
Type Default Details
fmt str df df, csv, json, list
agency str ALL Agency alias or ALL

You can use it to look up an agency's alias based on its customerId (also known as TenantId or Log Analytics WorkspaceId)

# Load the workspace list as a dataframe (default fmt)
workspaces = list_workspaces()
# Select the rows matching the customerId at index label 0 and concatenate their alias values into one string
workspaces.query(f'customerId == "{workspaces["customerId"][0]}"')["alias"].str.cat()

Log Analytics Query

The below function makes it easy to query all workspaces with sentinel installed using log analytics.


source

query_all

 query_all (query, fmt='df', timespan=Timedelta('14 days 00:00:00'))
Exported source
def list_subscriptions():
    """Returns the unique `id` values from the result of `azcli(["account", "list"])`"""
    accounts = pandas.DataFrame(azcli(["account", "list"]))
    subscription_ids = accounts["id"]
    return subscription_ids.unique()

@memoize_stampede(cache, expire=60 * 60 * 3) # cache for 3 hours
def list_securityinsights_safe():
    """
    Runs a resource graph query through `azcli` (first 1000 rows) and returns the "data" part of the response.

    The query selects solutions whose name starts with 'SecurityInsights', left-joins them to
    their workspace resource and adds a `customerId` column from the workspace properties.
    """
    resource_graph_query = """
        resources
        | where type =~ 'microsoft.operationsmanagement/solutions'
        | where name startswith 'SecurityInsights'
        | project wlid = tolower(tostring(properties.workspaceResourceId))
        | join kind=leftouter (
            resources | where type =~ 'microsoft.operationalinsights/workspaces' | extend wlid = tolower(id))
            on wlid
        | extend customerId = properties.customerId
        """
    cli_args = ["graph", "query", "--first", "1000", "-q", resource_graph_query]
    response = azcli(cli_args)
    return response["data"]

def list_securityinsights():
    """Returns the result of `list_securityinsights_safe()` wrapped in a dataframe"""
    rows = list_securityinsights_safe()
    return pandas.DataFrame(rows)

def chunks(items, size):
    """Generator over consecutive slices of `items`, each holding at most `size` elements"""
    offsets = range(0, len(items), size)
    yield from (items[offset:offset + size] for offset in offsets)

def loganalytics_query(queries: list[str], timespan=pandas.Timedelta("14d"), batch_size=190, batch_delay=32, sentinel_workspaces = None):
    """
    Run every query against every workspace in `sentinel_workspaces` (defaults to
    `list_securityinsights()`), submitting requests in batches of `batch_size`.
    A shared cache flag set for `batch_delay` holds back the next batch while it is present.
    Returns a dictionary mapping each query to one combined dataframe of its results.
    """
    client = LogsQueryClient(AzureCliCredential())
    workspaces = list_workspaces(fmt="df")
    if sentinel_workspaces is None:
        sentinel_workspaces = list_securityinsights()
    # One request per (query, workspace) pair, queries in the outer position
    requests = [
        LogsBatchQuery(workspace_id=workspace_id, query=query, timespan=timespan)
        for query in queries
        for workspace_id in sentinel_workspaces["customerId"]
    ]
    results = []
    querytime = pandas.Timestamp("now")
    logger.info(f"Executing {len(requests)} queries at {querytime}")
    for request_batch in chunks(requests, batch_size):
        # Wait until the "running" flag is no longer present in the cache
        while cache.get("loganalytics_query_running"):
            time.sleep(1)
        batch_start = pandas.Timestamp("now")
        # NOTE(review): third argument is presumably the expiry in seconds - confirm against the cache API
        cache.set("loganalytics_query_running", True, batch_delay)
        results.extend(client.query_batch(request_batch))
        duration = pandas.Timestamp("now") - batch_start
        logger.info(f"Completed {len(results)} (+{len(request_batch)}) in {duration}")
    frames_by_query = {}
    for request, result in zip(requests, results):
        status = result.status
        if status == LogsQueryStatus.PARTIAL or status == LogsQueryStatus.SUCCESS:
            raw_tables = result.partial_data if status == LogsQueryStatus.PARTIAL else result.tables
            frames = [pandas.DataFrame(table.rows, columns=table.columns) for table in raw_tables]
        else:
            # Any other status: keep the result object's attributes as a single row
            frames = [pandas.DataFrame([result.__dict__])]
        df = pandas.concat(frames).dropna(axis=1, how='all') # prune empty columns
        df["TenantId"] = request.workspace
        # Prefer the alias from the workspace list, fall back to the sentinel workspace name
        alias = workspaces.query(f'customerId == "{request.workspace}"')["alias"].str.cat()
        if alias == '':
            alias = sentinel_workspaces.query(f'customerId == "{request.workspace}"')["name"].str.cat()
        df["_alias"] = alias
        frames_by_query.setdefault(request.body["query"], []).append(df)
    combined = {}
    for query_text, parts in frames_by_query.items():
        combined[query_text] = pandas.concat(parts, ignore_index=True).convert_dtypes()
    return combined

def query_all(query, fmt="df", timespan=pandas.Timedelta("14d")):
    """
    Runs one query (or an iterable of queries) through `loganalytics_query` and returns
    all results concatenated, as a dataframe (`df`), CSV text (`csv`) or list of records (`json`).

    Raises `ValueError` when `fmt` is not one of df, csv, json.
    """
    # Reject a bad format before any query is sent
    if fmt not in ("df", "csv", "json"):
        raise ValueError("Invalid format")
    # A plain string or a non-iterable is treated as a single query. This uses an explicit
    # isinstance check rather than `assert`, which is stripped under `python -O`.
    if isinstance(query, str):
        query = [query]
    else:
        try:
            iter(query)
        except TypeError:
            query = [query]
    df = pandas.concat(loganalytics_query(query, timespan).values())
    if fmt == "df":
        return df
    if fmt == "csv":
        return df.to_csv()
    # fmt == "json": missing values become empty strings in the records
    return df.fillna("").to_dict("records")

source

loganalytics_query

 loganalytics_query (queries:list[str], timespan=Timedelta('14 days
                     00:00:00'), batch_size=190, batch_delay=32,
                     sentinel_workspaces=None)

Run queries across all workspaces, in batches of batch_size with a minimum delay of batch_delay between batches. Returns a dictionary of queries and results


source

chunks

 chunks (items, size)

source

list_securityinsights

 list_securityinsights ()

source

list_securityinsights_safe

 list_securityinsights_safe ()

source

list_subscriptions

 list_subscriptions ()
# Per alias, count distinct Tactics and TenantId values in SecurityAlert results, sorted by distinct Tactics (descending), first rows only
query_all(["SecurityAlert"]).groupby("_alias")[["Tactics", "TenantId"]].nunique().sort_values(by="Tactics", ascending=False).head()

Threat hunting helper

Scan across all databases and tables for columns with a given predicate


source

hunt

 hunt (indicators, expression='has', columns=['AADEmail', 'AccountName',
       'AccountUPN', 'AccountUpn', 'Account', 'CompromisedEntity',
       'DestinationUserName', 'Computer', 'DisplayName',
       'EmailSenderAddress', 'FileName', 'FilePath', 'FolderPath',
       'FullyQualifiedSubjectUserName', 'InitiatingProcessAccountUpn',
       'MailboxOwnerUPN', 'Name', 'NewProcessName', 'Owner',
       'ParentProcessName', 'Process', 'CommandLine',
       'ProcessCommandLine', 'RecipientEmailAddress', 'RequesterUpn',
       'SenderMailFromAddress', 'SourceIdentity', 'SourceUserName',
       'SubjectUserName', 'TargetUserName', 'TargetUser', 'Upn',
       'UserName', 'userName', 'UserPrincipalName', 'Caller',
       'DestinationUserID', 'SourceUserID', 'UserId', 'CallerIpAddress',
       'DestinationIP', 'DstIpAddr', 'EmailSourceIpAddress',
       'IPAddresses', 'IPAddress', 'IpAddress', 'NetworkDestinationIP',
       'NetworkIP', 'NetworkSourceIP', 'RemoteIP', 'SourceIP',
       'SrcIpAddr', 'DomainName', 'FileOriginUrl', 'FQDN', 'RemoteUrl',
       'Url', 'MD5', 'SHA1', 'SHA256', 'FileHashValue', 'FileHash',
       'InitiatingProcessSHA256'], workspaces=None, timespans=['1d',
       '14d', '90d', '700d'], take=5000)
Exported source
# Column names to search when hunting, grouped by the kind of value they are listed under
columns_of_interest = benedict({
    "name": [
        "AADEmail", "AccountName", "AccountUPN", "AccountUpn", "Account",
        "CompromisedEntity", "DestinationUserName", "Computer", "DisplayName",
        "EmailSenderAddress", "FileName", "FilePath", "FolderPath",
        "FullyQualifiedSubjectUserName", "InitiatingProcessAccountUpn",
        "MailboxOwnerUPN", "Name", "NewProcessName", "Owner",
        "ParentProcessName", "Process", "CommandLine", "ProcessCommandLine",
        "RecipientEmailAddress", "RequesterUpn", "SenderMailFromAddress",
        "SourceIdentity", "SourceUserName", "SubjectUserName", "TargetUserName",
        "TargetUser", "Upn", "UserName", "userName", "UserPrincipalName",
    ],
    "guid": [
        "Caller", "DestinationUserID", "SourceUserID", "UserId",
    ],
    "ip": [
        "CallerIpAddress", "DestinationIP", "DstIpAddr", "EmailSourceIpAddress",
        "IPAddresses", "IPAddress", "IpAddress", "NetworkDestinationIP",
        "NetworkIP", "NetworkSourceIP", "RemoteIP", "SourceIP", "SrcIpAddr",
    ],
    "url": [
        "DomainName", "FileOriginUrl", "FQDN", "RemoteUrl", "Url",
    ],
    "hash": [
        "MD5", "SHA1", "SHA256", "FileHashValue", "FileHash",
        "InitiatingProcessSHA256",
    ],
})

# Flat list of every column above, in group order
columns = [name for group in columns_of_interest.values() for name in group]

def finalise_query(query, take):
    """Appends the `take` limit and the `pack_` bag_unpack stages to `query`"""
    stages = (
        f"{query}",
        f"take {take}",
        "extend placeholder_=dynamic({'':null})",
        "evaluate bag_unpack(column_ifexists('pack_', placeholder_))",
    )
    return " | ".join(stages)

def hunt(indicators, expression="has", columns=columns, workspaces=None, timespans=["1d", "14d", "90d", "700d"], take=5000):
    """
    Builds `find where ...` queries testing `columns` against `indicators` with `expression`,
    then runs them over each entry of `timespans` in order and returns the first non-empty result.

    `workspaces`, when given, restricts the search to sentinel workspaces whose customerId is in it.
    Raises `Exception("No results found!")` when every timespan comes back empty.
    """
    sentinel = list_securityinsights()
    if workspaces is not None:
        sentinel = sentinel[sentinel["customerId"].isin(workspaces)]
    workspaces = sentinel

    queries = []
    if expression == 'has_any':
        # Single query: all indicators go into one dynamic list tested against every column
        query = f"let indicators = dynamic({indicators}); "
        clauses = [f"{column} has_any (indicators)" for column in columns]
        if clauses:
            query += "find where " + " or ".join(clauses)
        queries.append(finalise_query(query, take))
    else:
        for position, indicator in enumerate(indicators):
            if expression != 'has_all':
                indicator = f"'{indicator}'" # wrap indicator in quotes unless expecting dynamic
            if position == 0:
                # Log one sample query, for the first indicator only
                logger.info(f"Test Query: find where {columns[0]} {expression} {indicator} | take {take}")
            predicates = [f"{column} {expression} {indicator}" for column in columns]
            # At most 20 column predicates per query
            for chunk in chunks(predicates, 20):
                query = " or ".join(chunk)
                queries.append(finalise_query(f"find where {query}", take))

    for timespan in timespans:
        per_query = loganalytics_query(queries, pandas.Timedelta(timespan), sentinel_workspaces = workspaces)
        results = pandas.concat(per_query.values())
        if 'placeholder_' in results.columns:
            results = results.drop('placeholder_', axis=1)
        if not results.empty:
            logger.info(f"Found {indicators} in {timespan}, returning")
            return results
        logger.info(f"No results in {timespan}, extending hunt")
    raise Exception("No results found!")

source

finalise_query

 finalise_query (query, take)
# has_all example: the indicator is passed as a ready-made KQL tuple and searched in three command line columns over 90 days
hunt(['("ntdsutil", "ifm")'], expression="has_all", columns=["InitiatingProcessCommandLine", "ProcessCommandLine", "CommandLine"], timespans=["90d"])

source

atlaskit_transformer

 atlaskit_transformer (inputtext, inputfmt='md', outputfmt='wiki',
                       runtime='node')
Exported source
def atlaskit_transformer(inputtext, inputfmt="md", outputfmt="wiki", runtime="node"):
    """
    Converts `inputtext` from `inputfmt` to `outputfmt` by piping it through the bundled
    atlaskit transformer script, executed with `runtime`. Returns the script's stdout.

    Raises `CalledProcessError` when the script exits with a non-zero status.
    """
    # The bundle is copied out of the package once per installed nbdev_squ version
    transformer = dirs.user_cache_path / f"atlaskit-transformer.bundle_v{version('nbdev_squ')}.js"
    if not transformer.exists():
        transformer.write_bytes(pkgutil.get_data("nbdev_squ", "atlaskit-transformer.bundle.js"))
    cmd = [runtime, str(transformer), inputfmt, outputfmt]
    logger.debug(" ".join(cmd))
    try:
        return run(cmd, input=inputtext, text=True, capture_output=True, check=True).stdout
    except CalledProcessError as error:
        # Report the captured stderr and re-raise, rather than running the command a second
        # time (which could silently return None if the retry happened to succeed)
        logger.error("%s exited with status %s: %s", " ".join(cmd), error.returncode, error.stderr)
        raise
# Convert a small markdown sample (heading, bullet, link) with the default md -> wiki settings
print(atlaskit_transformer("""# Heading 1

- a bullet
[a link](https://github.com)
"""))

Sentinel Incident sync helpers

The overall process below is to grab all the incidents from Sentinel and save them into individual files per day, based on their last updated time. A subsequent process can then load those files (e.g. ingest into ADX) and/or synchronise the updates into Jira.


source

security_alerts

 security_alerts (start=Timestamp('2024-07-22 02:47:42.503198+0000',
                  tz='UTC'), timedelta=Timedelta('1 days 00:00:00'))
Exported source
def security_incidents(start=None, timedelta=pandas.Timedelta("1d")):
    """
    Queries for security incidents from `start` time for `timedelta` and returns a dataframe
    holding the latest record (by TimeGenerated) per IncidentNumber.

    `start` defaults to one day before the time of the call.
    """
    # Sorts by TimeGenerated (TODO)
    if start is None:
        # Resolved per call: a Timestamp in the signature would be frozen at import time
        start = pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("1d")
    query = "SecurityIncident | summarize arg_max(TimeGenerated, *) by IncidentNumber"
    return query_all(query, timespan=(start.to_pydatetime(), timedelta))

def security_alerts(start=None, timedelta=pandas.Timedelta("1d")):
    """
    Queries for security alerts from `start` time for `timedelta` and returns a dataframe
    holding the latest record (by TimeGenerated) per SystemAlertId.

    `start` defaults to one day before the time of the call.
    """
    # Sorts by TimeGenerated (TODO)
    if start is None:
        # Resolved per call: a Timestamp in the signature would be frozen at import time
        start = pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("1d")
    query = "SecurityAlert | summarize arg_max(TimeGenerated, *) by SystemAlertId"
    return query_all(query, timespan=(start.to_pydatetime(), timedelta))

source

security_incidents

 security_incidents (start=Timestamp('2024-07-22 02:47:42.503030+0000',
                     tz='UTC'), timedelta=Timedelta('1 days 00:00:00'))
# Fetch incidents for the 14 days leading up to now
df = security_incidents(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("14d"), timedelta=pandas.Timedelta("14d"))

dbt-duckdb plugin

The squ plugin below makes it much easier to query KQL from DuckDB projects via the DuckDB user-defined function (UDF) interface. It could be extended to other clients fairly easily, as long as the data is returned as a dataframe. To use it, a few dbt project files need to be configured:

DBT ./profiles.yml

See DBT Connection Profiles

default:
  outputs:
    dev:
      type: duckdb
      path: target/dev.duckdb
      plugins: 
        - module: nbdev_squ.api
          alias: squ

  target: dev

DBT ./models/squ/schema.yml

See DBT Add sources to your DAG for how to add ‘externally defined’ sources. This example uses the plugin code below, which is based on the dbt-duckdb plugin architecture.

version: 2

sources:
  - name: kql_source
    config:
      plugin: squ
    meta:
      kql_path: "models/squ/{name}.kql"
    tables:
    - name: T1547_001

models:
  - name: hunt
    config:
      materialized: table

Once the source is defined, dbt cli tools and other sql models can refer to it; the dbt-duckdb framework makes it available as a referenceable view usable throughout the project:

DBT ./models/squ/hunt.sql

See DBT SQL models for how to write the select statement templates DBT organises into the DAG

select * from {{source('kql_source', 'T1547_001')}}

DBT cli usage

See About the dbt run command (can use --empty to validate before a full run)

cd dbt_example_project
dbt run # will build the whole dag including any materialisations like the hunt table above
dbt show --inline "select * from {{ source('kql_source', 'T1547_001') }}" # will use live source
dbt show --inline "select * from {{ ref('hunt') }}" # will use materialized table in db built by `dbt run`
dbt docs generate # will build documentation for the project

source

Plugin

 Plugin (name:str, plugin_config:Dict[str,Any])

BasePlugin is the base class for creating plugins. A plugin can be created from a module name, an optional configuration, and an alias. Each plugin contains a name and its configuration.

Exported source
class Plugin(BasePlugin):
    """
    dbt-duckdb plugin that loads a source as a dataframe from a KQL file, the workspace
    list, or a call to an attribute of the module-level `clients` object.
    """

    def initialize(self, config):
        # Log in once when the plugin is set up; `config` is not used
        login()

    def configure_cursor(self, cursor):
        # No per-cursor setup
        pass

    def load(self, source_config: SourceConfig):
        """
        Returns the data for `source_config`, chosen by the first of these keys it contains:

        - `kql_path`: path template formatted with the source config values; the file's text
          is run through `query_all` with the optional `timespan` setting (default "14d")
        - `list_workspaces`: returns `list_workspaces()`
        - `client_api`: calls that attribute of `clients` with keyword arguments parsed from
          the JSON in the optional `kwargs` setting, wrapping the result in a dataframe if needed

        Raises `Exception` when none of the keys is present.
        """
        if "kql_path" in source_config:
            kql_path = source_config["kql_path"]
            kql_path = kql_path.format(**source_config.as_dict())
            query = Path(kql_path).read_text()
            return query_all(query, timespan=pandas.Timedelta(source_config.get("timespan", "14d")))
        elif "list_workspaces" in source_config: # untested
            return list_workspaces()
        elif "client_api" in source_config: # untested
            api_result = getattr(clients, source_config["client_api"])(**json.loads(source_config.get("kwargs", "{}")))
            if isinstance(api_result, pandas.DataFrame):
                return api_result
            else:
                return pandas.DataFrame(api_result)
        else:
            raise Exception("No valid config found for squ plugin (kql_path or api required)")

    def default_materialization(self):
        return "view"