Alert Bulk Export - v7 API

Forward Alerts to an S3 Bucket

The Data Forwarder is the recommended export method for reliable and guaranteed delivery of Carbon Black Cloud Alerts. This method works at scale to support any size customer or MSSP by writing jsonl zipped content to an S3 bucket. The Data Forwarder can be configured in the Carbon Black Cloud console under Settings > Data Forwarder or using the Data Forwarder API.

Exporting Alerts Continuously via the Alerts API

If the Data Forwarder doesn’t work for you then the following algorithm will allow you to fetch alerts with no duplicates using the Alerts API. You can run the logic on a cycle to continually fetch alerts or you could do a single time historical export to fetch a time range of alerts. For large historical exports greater than 30 days, repeat the following algorithm for each month by using a start and end for the first and last day of the month.

If you are repeating on a cycle, we recommend keeping a 60 second delay ( end = now - 60s) from current time to allow for Carbon Black Cloud’s asynchronous alerts processing to complete; or if you want to wait for updates to stop for a CB Analytic alerts, keep a delay of 15 minutes.

A cycle should be no shorter than 60s and should use a start and end time range for time_range, which searches on the field backend_timestamp.

Filtering with only time_range will ensure no duplicate alerts. This means that you will miss any updates to CB Analytics alerts, including changes to the primary event properties such as process, reputation, and severity. CB Analytics alerts can be updated for up to 15 minutes following the original backend_timestamp, after which time the alert is considered immutable.

If you wish to receive updates to alerts, then filter with a time_range value of one day and an additional backend_update_timestamp filter scoped for the range you want to query. The time_range filter is always applied and defaults to the last two weeks; additional timestamp fields in the criteria filter within the results that meet the time_range filter.

  1. Fetch Alerts for 5min window and set start to at least 60 seconds earlier than the current time to allow for Carbon Black Cloud backend to make all Alerts available to API

    curl -s -H "X-Auth-Token: $ACCESS_TOKEN" -H "Content-Type: application/json" https://$CBD/api/alerts/v7/orgs/$ORG/alerts/_search \
    -d '{
        "time_range": {
            "start": "2023-08-28T05:00:00.000Z",
            "end": "2023-08-28T05:04:59.999Z"
        },
        "criteria": {},
        "rows": 10000,
        "start": 1,
        "sort": [
            {
                "field": "backend_timestamp",
                "order": "ASC"
            }
        ]
    }'
    
  2. Verify if num_found <= 10000 in API response, if less than or equal to 10k results wait for next 5min window and repeat step 1

  3. If there are more than 10k results then you will need additional API requests to fetch all Alerts for the 5min window.

    a. Get the backend_timestamp timestamp from the last record of the results, you will use this as the start for the time range

    Example Response:

    {
      "num_found": "23568"
      "results": [
        ...,
        {
          ...
          "backend_timestamp": "2023-08-28T05:02:29.200Z"
          ...
        }
      ]
    }
    

    b. Add one millisecond to the timestamp to ensure the Alert is not duplicated.

    Note: This does risk missing an alert if there are multiple alerts with the same 'backend_timestamp'. If you want to avoid missing any alerts then you can use the same 'backend_timestamp' however you will need to add additional processing logic to remove the duplicate alert(s) which will be the first index(es) based on how many alerts were previously fetched with the same 'backend_timestamp'.
    curl -s -H "X-Auth-Token: $ACCESS_TOKEN" -H "Content-Type: application/json" https://$CBD/api/alerts/v7/orgs/$ORG/alerts/_search \
    -d '{
        "time_range": {
            "start": "2023-08-28T05:02:29.201Z",
            "end": "2023-08-28T05:02:34.200Z"
        },
        "criteria": {},
        "rows": 10000,
        "start": 1,
        "sort": [
            {
                "field": "backend_timestamp",
                "order": "ASC"
            }
        ]
    }'
    

    c. Verify if num_found <= 10000 in API response, if less than or equal to 10k results wait for next 5min window and return to step 1 otherwise repeat from step 3a


If you want to use the Carbon Black Cloud Python SDK the following code will fetch the alerts and concatenate them into a single list for you to add custom logic or write the output to a file.

Note: This guide was updated on Oct 25th 2023 to align with the release of the Carbon Black Cloud Python SDK v1.5.0 which uses Alerts v7 API

The python file is available in the Platform Examples folder of the SDK, filename alerts_bulk_export.py.

"""Algorithm to Export Alerts from Carbon Black Cloud"""
import sys
from cbc_sdk import CBCloudAPI
from cbc_sdk.platform import Alert
from datetime import datetime, timedelta, timezone

api = CBCloudAPI(profile="YOUR_PROFILE_HERE")

# Time field and format to use
time_field = "backend_timestamp"
time_format = "%Y-%m-%dT%H:%M:%S.%fZ"

# Time window to fetch.
# Uses current time - 60s to allow for Carbon Black Cloud asynchronous event processing completion
end = datetime.now(timezone.utc) - timedelta(seconds=60)
start = end - timedelta(minutes=5)

# Fetch initial Alert batch
# the time stamp can be set either as a datetime object:
# alerts_time_as_object = list(api.select(Alert)
#                                 .set_time_range(start=start, end=end)
#                                 .sort_by(time_field, "ASC"))
# or as ISO 8601 strings
alerts = list(api.select(Alert)
                 .set_time_range(start=start.isoformat(), end=end.isoformat())
                 .sort_by(time_field, "ASC"))

# Check if 10k limit was hit.
# Iteratively fetch remaining alerts by increasing start time to the last alert fetched
if len(alerts) >= 10000:
    last_alert = alerts[-1]
    while True:
        new_start = datetime.strptime(last_alert.create_time, time_format) + timedelta(milliseconds=1)
        overflow = list(api.select(Alert)
                           .set_time_range(start=new_start, end=end)
                           .sort_by(time_field, "ASC"))

        # Extend alert list with follow up alert batches
        alerts.extend(overflow)
        if len(overflow) >= 10000:
            last_alert = overflow[-1]
        else:
            break

print(f"Fetched {len(alerts)} alert(s) from {start.strftime(time_format)} to {end.strftime(time_format)}")

Give Feedback

New survey coming soon!


Last modified on October 23, 2023