Components: Detectors
Detectors process structured logs from Parsers and emit alerts when anomalies are detected.
| | Schema | Description |
|---|---|---|
| Input | ParserSchema | Structured log |
| Output | DetectorSchema | Alert / finding |
This document describes the minimal API, implementation guidance, a short example detector and a unit test pattern.
CoreDetector — minimal API
class CoreDetectorConfig(CoreConfig):
    component_type: str = "detectors"
    method_type: str = "core_detector"
    parser: str = "<PLACEHOLDER>"
    auto_config: bool = False


class CoreDetector(CoreComponent):
    def run(
        self, input_: list[ParserSchema] | ParserSchema, output_: DetectorSchema
    ) -> bool:
        """Defined in the core detector."""

    def detect(
        self,
        input_: list[ParserSchema] | ParserSchema,
        output_: DetectorSchema,
    ) -> bool:
        """Empty; must be defined in the specific detector."""

    def train(
        self, input_: ParserSchema | list[ParserSchema]
    ) -> None:
        """Empty; can be defined in the specific detector to train it."""
Implementing a detector — example
Simple detector that raises an alert when a numeric variable exceeds a threshold.
import time
from typing import Any


class SimpleThresholdConfig(CoreDetectorConfig):
    method_type: str = "simple_threshold"
    threshold: float = 0.0


class SimpleThresholdDetector(CoreDetector):
    def __init__(
        self,
        name: str = "SimpleThreshold",
        config: SimpleThresholdConfig | dict[str, Any] = SimpleThresholdConfig(),
    ):
        if isinstance(config, dict):
            config = SimpleThresholdConfig.from_dict(config, name)
        super().__init__(name=name, buffer_mode=BufferMode.NO_BUF, config=config)

    def detect(
        self,
        input_: schemas.ParserSchema,
        output_: schemas.DetectorSchema,
    ) -> bool:
        # calculate() is a dummy method that extracts a numeric value from the log
        value = calculate(input_)
        if value > self.config.threshold:
            output_["alertID"] = f"{self.name}-{int(time.time())}"
            # attach the ID of the triggering log, assuming it is exposed as logID
            if input_["logID"]:
                output_["logIDs"].append(input_["logID"])
            output_["score"] = float(value)
            output_["description"] = f"Value {value} > threshold {self.config.threshold}"
            return True
        return False
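A unit test for a detector like this mainly needs to check the detect() contract: the return value and the fields written to the output. The following is a minimal sketch of such a test, assuming plain dicts stand in for ParserSchema and DetectorSchema. `ThresholdStandIn`, `make_output` and the `value` field are hypothetical names used only so the example runs without the library; a real test would import the detector and schemas instead.

```python
import time
import unittest


def make_output() -> dict:
    """Hypothetical stand-in for an empty DetectorSchema."""
    return {"alertID": "", "logIDs": [], "score": 0.0, "description": ""}


class ThresholdStandIn:
    """Stand-in that mirrors the detect() contract of the example detector."""

    def __init__(self, name: str = "SimpleThreshold", threshold: float = 0.0):
        self.name = name
        self.threshold = threshold

    def detect(self, input_: dict, output_: dict) -> bool:
        value = float(input_["value"])  # assumed numeric field on the parsed log
        if value > self.threshold:
            output_["alertID"] = f"{self.name}-{int(time.time())}"
            if input_.get("logID"):
                output_["logIDs"].append(input_["logID"])
            output_["score"] = value
            output_["description"] = f"Value {value} > threshold {self.threshold}"
            return True
        return False


class TestThreshold(unittest.TestCase):
    def test_alert_above_threshold(self):
        out = make_output()
        detector = ThresholdStandIn(threshold=10.0)
        self.assertTrue(detector.detect({"logID": "42", "value": 11.5}, out))
        self.assertEqual(out["logIDs"], ["42"])
        self.assertEqual(out["score"], 11.5)

    def test_no_alert_at_threshold(self):
        out = make_output()
        detector = ThresholdStandIn(threshold=10.0)
        self.assertFalse(detector.detect({"logID": "43", "value": 10.0}, out))
        # The output must stay untouched when nothing is detected.
        self.assertEqual(out, make_output())
```

Saved to a file, the tests can be run with `python -m unittest`. Testing the boundary value (equal to the threshold) separately guards against an accidental switch from `>` to `>=`.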
To control how many logs a detector receives as input, configure the buffer when initializing the detector (the buffer_mode argument in the example above).
Detector methods
List of detectors:
- Random detector: Generates random alerts.
- New Value: Detects new values in the variables in the logs.
- Combo Detector: Detects new combinations of variables in the logs.
- New Event: Detects new events in the variables in the logs.
- Value Range: Detects numeric values outside the expected range in variables in the logs.
- Rule Based: Detects anomalies based on a set of rules.
- Bigram Frequency: Detects bigram-frequency-based anomalies in the logs.
- Charset: Detects new characters in the variables in the logs.
Configuration
When auto_config is set to False, the detector expects an explicit events or global block that specifies exactly which variables to monitor. events refers to event-specific variables, while global refers to variables that are not bound to events (header_variables can be event-bound but do not have to be):
detectors:
  NewValueDetector:
    method_type: new_value_detector
    auto_config: False
    params: {}                    # global parameters
    events:                       # event-specific configuration
      1:                          # event_id
        instance1:                # name of instance (arbitrary)
          params: {}              # additional params
          variables:
            - pos: 0              # location of an unnamed variable from the log message
              name: var1          # name of variable (arbitrary)
          header_variables:
            - pos: level          # location of a named variable (defined in log_format of parser)
    global:                       # define global instance for new_value_detector similar to "events"
      global_instance1:           # define instance name
        header_variables:         # same logic as header_variables in "events"
          - pos: Status
Configuration semantics (preliminary)
events key: The integer key is the EventID (or event_id) to monitor (see the Template Matcher docs for how the EventID is assigned).
global key: Works like the events key but refers to variables that are not bound to events, so it can only contain header_variables.
variables[].pos: The 0-indexed position of the <*> wildcard in the matched template, counting from left to right. For example, given:
pid=<*> uid=<*> auid=<*> ses=<*> msg='op=<*> acct=<*> exe=<*> hostname=<*> addr=<*> terminal=<*> res=<*>'
pos: 0 captures the value of pid, pos: 6 captures the value of exe, and so on.
header_variables[].pos: A named field from the log format string (e.g., Type, Time, Content) rather than a wildcard position.
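To make the wildcard indexing concrete, here is a small sketch that extracts the wildcard values from a log line and indexes them by pos. It only illustrates the counting rule; `extract_wildcards` is a hypothetical helper, the sample message is invented, and the actual Template Matcher may work differently.

```python
import re


def extract_wildcards(template: str, message: str) -> list[str]:
    """Return the values matched by each <*> wildcard, ordered left to right."""
    # Escape the literal text and turn every <*> into a non-greedy capture group.
    literal_parts = [re.escape(part) for part in template.split("<*>")]
    pattern = "(.*?)".join(literal_parts)
    match = re.fullmatch(pattern, message)
    return list(match.groups()) if match else []


template = (
    "pid=<*> uid=<*> auid=<*> ses=<*> msg='op=<*> acct=<*> exe=<*> "
    "hostname=<*> addr=<*> terminal=<*> res=<*>'"
)
message = (
    "pid=1234 uid=0 auid=1000 ses=3 msg='op=login acct=root exe=/usr/sbin/sshd "
    "hostname=host1 addr=10.0.0.5 terminal=ssh res=success'"
)

values = extract_wildcards(template, message)
print(values[0])  # value of pid
print(values[6])  # value of exe
```

With this sample message, `values[0]` is `"1234"` and `values[6]` is `"/usr/sbin/sshd"`, matching pos: 0 and pos: 6 in the configuration.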
Auto-configuration (optional)
Detectors can optionally support auto-configuration — a process where the detector automatically discovers which variables are worth monitoring, instead of requiring the user to specify them manually.
Auto-configuration is controlled by the auto_config flag in the pipeline config (e.g. config/pipeline_config_default.yaml):
detectors:
  NewValueDetector:
    method_type: new_value_detector
    auto_config: True   # enable auto-configuration
    params: {}
    # no "events" block needed; it will be generated automatically
How it works
When auto-configuration is enabled, the detector goes through two extra phases before training:
Phase 1 — configure(input_): The detector ingests events into an EventPersistency instance that uses a tracker backend to analyze variable behavior — for example, whether each variable is stable, random, or still has insufficient data. This instance is typically separate from the one used for training, because the configuration phase needs to observe all variables to decide which ones are worth monitoring, while training only tracks the variables that were selected as a result.
Phase 2 — set_configuration(): After enough data has been ingested, the detector queries the tracker to select variables that meet its criteria (e.g. only stable variables). It then generates a full events configuration from those results and updates its own config. At this point auto_config is set to False in the generated config, since the configuration is now explicit.
After these two phases, the detector proceeds with the normal train() and detect() lifecycle using the generated configuration.
Implementation pattern
A detector that supports auto-configuration typically creates a separate EventPersistency instance for this purpose (but doesn't have to):
class MyDetector(CoreDetector):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # main persistency for training / detection
        self.persistency = EventPersistency(
            event_data_class=EventStabilityTracker,
        )

        # separate persistency for auto-configuration
        self.auto_conf_persistency = EventPersistency(
            event_data_class=EventStabilityTracker,
        )
The configure() method ingests all available variables (not just configured ones) so the tracker can assess each one:
def configure(self, input_):
    self.auto_conf_persistency.ingest_event(
        event_id=input_["EventID"],
        event_template=input_["template"],
        variables=input_["variables"],
        named_variables=input_["logFormatVariables"],
    )
The set_configuration() method queries the tracker results and generates the final config:
def set_configuration(self):
    variables = {}
    for event_id, tracker in self.auto_conf_persistency.get_events_data().items():
        stable_vars = tracker.get_features_by_classification("STABLE")
        variables[event_id] = stable_vars

    config_dict = generate_detector_config(
        variable_selection=variables,
        detector_name=self.name,
        method_type=self.config.method_type,
    )
    self.config = MyDetectorConfig.from_dict(config_dict, self.name)
Full lifecycle with auto-configuration
1. configure(input_) # call for each event in the dataset
2. set_configuration() # finalize which variables to monitor
3. train(input_) # call for each event in the dataset
4. detect(input_, output_) # call for each event to detect anomalies
When auto_config is False, steps 1 and 2 are skipped entirely.
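The four steps can be sketched end to end with a toy detector. This is only an illustration of the call order, not the library's implementation: `ToyNewValueDetector`, `run_lifecycle` and the `max_distinct` criterion are hypothetical, events are plain dicts, and "stable" is simplified to "has few distinct values".

```python
from collections import defaultdict


class ToyNewValueDetector:
    """Hypothetical stand-in; not the library's NewValueDetector."""

    def __init__(self, auto_config: bool = True, max_distinct: int = 3):
        self.auto_config = auto_config
        self.max_distinct = max_distinct    # assumed stability criterion
        self._observed = defaultdict(set)   # (event_id, pos) -> values seen in configure()
        self.monitored = set()              # (event_id, pos) pairs selected for monitoring
        self._known = defaultdict(set)      # (event_id, pos) -> values learned in train()

    def configure(self, input_: dict) -> None:
        # Step 1: observe every variable, not only configured ones.
        for pos, value in enumerate(input_["variables"]):
            self._observed[(input_["EventID"], pos)].add(value)

    def set_configuration(self) -> None:
        # Step 2: keep variables with few distinct values; the config is now explicit.
        self.monitored = {
            key for key, seen in self._observed.items()
            if len(seen) <= self.max_distinct
        }
        self.auto_config = False

    def train(self, input_: dict) -> None:
        # Step 3: learn the known values of monitored variables only.
        for pos, value in enumerate(input_["variables"]):
            key = (input_["EventID"], pos)
            if key in self.monitored:
                self._known[key].add(value)

    def detect(self, input_: dict, output_: dict) -> bool:
        # Step 4: flag values of monitored variables that were never seen in training.
        new = [
            (pos, value)
            for pos, value in enumerate(input_["variables"])
            if (input_["EventID"], pos) in self.monitored
            and value not in self._known[(input_["EventID"], pos)]
        ]
        if new:
            output_["description"] = f"New values: {new}"
        return bool(new)


def run_lifecycle(detector, training_events, live_events) -> list[dict]:
    if detector.auto_config:  # steps 1 and 2 are skipped otherwise
        for event in training_events:
            detector.configure(event)
        detector.set_configuration()
    for event in training_events:
        detector.train(event)
    alerts = []
    for event in live_events:
        out = {}
        if detector.detect(event, out):
            alerts.append(out)
    return alerts


# Variable 0 (user) is constant, variable 1 (pid) changes on every event.
training = [{"EventID": 1, "variables": ["root", str(pid)]} for pid in range(100, 110)]
live = [
    {"EventID": 1, "variables": ["root", "999"]},     # new pid, but pid is not monitored
    {"EventID": 1, "variables": ["mallory", "100"]},  # new user value
]
alerts = run_lifecycle(ToyNewValueDetector(), training, live)
```

In this run only the user variable is selected during configuration, so the second live event produces the single alert, while the unseen pid in the first event is ignored. The toy has no way to supply an explicit configuration, so constructing it with `auto_config=False` leaves nothing monitored and yields no alerts.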
Saving state (persist)
Detectors can persist their training state to disk (or cloud storage) so it
can be restored in a later session. Configure this with a top-level persist:
block in the detector config:
detectors:
  NewValueDetector:
    method_type: new_value_detector
    persist:
      path: ./state             # base path; detector name is appended automatically
      interval_seconds: 300     # save every N seconds (default: 300)
      events_until_save: null   # also save after N ingested events (default: disabled)
      auto_load: false          # restore saved state on startup (default: false)
      storage_options: {}       # backend credentials (see below)
    events:
      ...
All fields are optional — persist: {} uses all defaults. Omitting persist: entirely
disables saving (backward compatible).
The detector name is automatically appended to path, so path: ./state for a detector
named NewValueDetector writes to ./state/NewValueDetector/.
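The effect of appending the detector name can be sketched as follows. `resolve_state_path` is a hypothetical helper written for illustration; the library's own path handling may differ in details such as trailing slashes.

```python
def resolve_state_path(base_path: str, detector_name: str) -> str:
    """Append the detector name to a local base path or a cloud URL."""
    # Plain string joining keeps URL schemes such as s3:// intact.
    return base_path.rstrip("/") + "/" + detector_name + "/"


print(resolve_state_path("./state", "NewValueDetector"))
print(resolve_state_path("s3://my-bucket/detector-state/", "ComboDetector"))
```

Because each detector gets its own subdirectory, several detectors can share one base path without overwriting each other's state.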
Fields
| Field | Type | Default | Description |
|---|---|---|---|
| path | str | "./state" | Base directory or cloud URL. Detector name is appended. |
| interval_seconds | int | 300 | Background save interval in seconds. |
| events_until_save | int \| null | null | Save after this many ingested events. null disables event-count triggering. |
| auto_load | bool | false | Load saved state on construction. Raises PersistencyLoadError if no state exists. |
| storage_options | dict | {} | Credentials and options forwarded to fsspec. |
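The interplay of interval_seconds and events_until_save can be sketched as two independent triggers, either of which causes a save. This is a simplified illustration under stated assumptions: `SaveTrigger` is a hypothetical class, and it checks the time only when an event arrives, whereas the table describes the interval save as running in the background.

```python
import time
from typing import Callable, Optional


class SaveTrigger:
    """Hypothetical helper that decides when a state save is due."""

    def __init__(
        self,
        interval_seconds: int = 300,
        events_until_save: Optional[int] = None,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.interval_seconds = interval_seconds
        self.events_until_save = events_until_save  # None disables the count trigger
        self._clock = clock
        self._last_save = clock()
        self._events = 0

    def on_event(self) -> bool:
        """Record one ingested event; return True if a save should happen now."""
        self._events += 1
        due_by_time = self._clock() - self._last_save >= self.interval_seconds
        due_by_count = (
            self.events_until_save is not None
            and self._events >= self.events_until_save
        )
        if due_by_time or due_by_count:
            # Both counters restart after a save, whichever trigger fired.
            self._last_save = self._clock()
            self._events = 0
            return True
        return False


# A controllable clock makes the behaviour deterministic for the example.
now = [0.0]
trigger = SaveTrigger(interval_seconds=300, events_until_save=3, clock=lambda: now[0])
results = [trigger.on_event() for _ in range(3)]  # [False, False, True]
```

Here the third event reaches the event count and triggers a save. Advancing the fake clock past 300 seconds would make the next event trigger a save through the interval instead.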
Storage options examples
Local filesystem — no storage_options needed:
persist:
  path: ./state
S3:
persist:
  path: s3://my-bucket/detector-state
  storage_options:
    key: AKIAIOSFODNN7EXAMPLE
    secret: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
    region_name: eu-west-1
S3-compatible storage (MinIO, etc.):
persist:
  path: s3://my-bucket/detector-state
  storage_options:
    endpoint_url: http://minio:9000
    key: minioadmin
    secret: minioadmin
Azure Blob Storage:
persist:
  path: az://my-container/detector-state
  storage_options:
    account_name: mystorageaccount
    account_key: base64encodedkey==
GCS:
persist:
  path: gs://my-bucket/detector-state
  storage_options:
    project: my-gcp-project
    token: /path/to/service-account.json
In practice, credentials are usually supplied via environment variables
(AWS_ACCESS_KEY_ID, etc.) or instance roles — in which case storage_options
stays empty or is omitted.