Components: Detectors

Detectors process structured logs from Parsers and emit alerts when anomalies are detected.

| | Schema | Description |
|---|---|---|
| Input | ParserSchema | Structured log |
| Output | DetectorSchema | Alert / finding |

This document describes the minimal API, implementation guidance, a short example detector and a unit test pattern.

CoreDetector — minimal API

class CoreDetectorConfig(CoreConfig):
    component_type: str = "detectors"
    method_type: str = "core_detector"
    parser: str = "<PLACEHOLDER>"

    auto_config: bool = False


class CoreDetector(CoreComponent):
    def run(
        self, input_: list[ParserSchema] | ParserSchema, output_: DetectorSchema
    ) -> bool:
        """Defined in the core detector."""

    def detect(
        self,
        input_: list[ParserSchema] | ParserSchema,
        output_: DetectorSchema,
    ) -> bool:
        """Empty; must be defined in the specific detector."""

    def train(
        self, input_: ParserSchema | list[ParserSchema]
    ) -> None:
        """Empty; can be defined in the specific detector to train it."""

Implementing a detector — example

Simple detector that raises an alert when a numeric variable exceeds a threshold.

import time
from typing import Any


class SimpleThresholdConfig(CoreDetectorConfig):
    method_type: str = "simple_threshold"
    threshold: float = 0.0

class SimpleThresholdDetector(CoreDetector):
    def __init__(
        self, name: str = "SimpleThreshold",
        config: SimpleThresholdConfig | dict[str, Any] = SimpleThresholdConfig()
    ):

        if isinstance(config, dict):
            config = SimpleThresholdConfig.from_dict(config, name)
        super().__init__(name=name, buffer_mode=BufferMode.NO_BUF, config=config)

    def detect(
        self,
        input_: schemas.ParserSchema,
        output_: schemas.DetectorSchema
    ) -> bool:

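        # calculate() is a placeholder for the detector-specific metric
        value = calculate(input_)
        if value > self.config.threshold:
            output_["alertID"] = f"{self.name}-{int(time.time())}"
            # attach the ID of the triggering log, if it has one
            # (assumes ParserSchema exposes it as "logID")
            if input_["logID"]:
                output_["logIDs"].append(input_["logID"])
            output_["score"] = float(value)
            output_["description"] = f"Value {value} > threshold {self.config.threshold}"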
            return True

        return False

To configure the number of logs received as input, configure the buffer when initializing the detector (the buffer_mode argument passed to super().__init__() in the example above).
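The overview mentions a unit test pattern; a minimal sketch of one follows. It does not import the real library: plain dicts stand in for ParserSchema and DetectorSchema, ThresholdStub mirrors only the detect() logic shown above, and the calculate helper and the "value" field are assumptions made for illustration.

```python
import time
import unittest


def calculate(log: dict) -> float:
    """Hypothetical metric: read a numeric field named "value" from the log."""
    return float(log["value"])


class ThresholdStub:
    """Stand-in that mirrors only the detect() logic of SimpleThresholdDetector."""

    def __init__(self, name: str = "SimpleThreshold", threshold: float = 0.0):
        self.name = name
        self.threshold = threshold

    def detect(self, input_: dict, output_: dict) -> bool:
        value = calculate(input_)
        if value <= self.threshold:
            return False
        output_["alertID"] = f"{self.name}-{int(time.time())}"
        if input_.get("logID"):
            output_["logIDs"].append(input_["logID"])
        output_["score"] = value
        output_["description"] = f"Value {value} > threshold {self.threshold}"
        return True


class ThresholdStubTest(unittest.TestCase):
    def setUp(self):
        # fresh detector and empty output for every test
        self.detector = ThresholdStub(threshold=10.0)
        self.output = {"logIDs": []}

    def test_alert_above_threshold(self):
        fired = self.detector.detect({"logID": "42", "value": 12.5}, self.output)
        self.assertTrue(fired)
        self.assertEqual(self.output["logIDs"], ["42"])
        self.assertEqual(self.output["score"], 12.5)

    def test_no_alert_at_or_below_threshold(self):
        fired = self.detector.detect({"logID": "43", "value": 10.0}, self.output)
        self.assertFalse(fired)
        self.assertEqual(self.output, {"logIDs": []})
```

The pattern is the same for a real detector: build it with a small explicit config, feed it one input that should fire and one that should not, and assert on both the return value and the fields written to the output.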

Detector methods

List of detectors:

  • Random detector: Generates random alerts.
  • New Value: Detects new values in the variables in the logs.
  • Combo Detector: Detects new combinations of variables in the logs.
  • New Event: Detects new events in the variables in the logs.
  • Value Range: Detects numeric value ranges in variables in the logs.
  • Rule Based: Detects anomalies based on a set of rules.
  • Bigram Frequency: Detects bigram-frequency-based anomalies in the logs.
  • Charset: Detects new characters in the variables in the logs.
Configuration

When auto_config is set to False, the detector expects an explicit events or global block that specifies exactly which variables to monitor. events refers to event-specific variables, while global refers to variables that are not bound to a specific event. header_variables can be event-bound but do not have to be:

detectors:
  NewValueDetector:
    method_type: new_value_detector
    auto_config: False
    params: {}  # global parameters
    events:  # event-specific configuration
      1:  # event_id
        instance1:  # name of instance (arbitrary)
          params: {}  # additional params
          variables:
            - pos: 0  # location of an unnamed variable from the log message
              name: var1  # name of variable (arbitrary)
          header_variables:
            - pos: level  # location of a named variable (defined in log_format of parser)
    global:  # define global instance for new_value_detector similar to "events"
      global_instance1:  # define instance name
        header_variables:  # same logic as header_variables in "events"
          - pos: Status
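To make the nesting concrete, the sketch below walks the same structure once it has been loaded into a Python dict (written inline here to avoid depending on a YAML loader) and lists what each instance monitors. The helper name list_monitored is hypothetical and not part of the library.

```python
detector_cfg = {
    "method_type": "new_value_detector",
    "auto_config": False,
    "params": {},
    "events": {
        1: {
            "instance1": {
                "params": {},
                "variables": [{"pos": 0, "name": "var1"}],
                "header_variables": [{"pos": "level"}],
            }
        }
    },
    "global": {"global_instance1": {"header_variables": [{"pos": "Status"}]}},
}


def list_monitored(cfg: dict) -> list:
    """Hypothetical helper: flatten the config into (scope, instance, kind, pos) rows."""
    rows = []
    for event_id, instances in cfg.get("events", {}).items():
        for instance, spec in instances.items():
            for kind in ("variables", "header_variables"):
                for var in spec.get(kind, []):
                    rows.append((f"event {event_id}", instance, kind, var["pos"]))
    for instance, spec in cfg.get("global", {}).items():
        # global instances are not bound to an event, so only header_variables apply
        for var in spec.get("header_variables", []):
            rows.append(("global", instance, "header_variables", var["pos"]))
    return rows


for row in list_monitored(detector_cfg):
    print(row)
```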

Configuration semantics (preliminary)

events key: The integer key is the EventID (or event_id) to monitor. See the Template Matcher docs for how the EventID is assigned.

global key: Works like the events key but refers to variables that are not bound to events, so it can only contain header_variables.

variables[].pos: The 0-indexed position of the <*> wildcard in the matched template, counted from left to right. For example, given:

pid=<*> uid=<*> auid=<*> ses=<*> msg='op=<*> acct=<*> exe=<*> hostname=<*> addr=<*> terminal=<*> res=<*>'

Here pos: 0 captures the value after pid=, pos: 6 the value after exe=, and so on.
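The position counting can be checked with a short sketch that turns the template into a regular expression and reads the wildcard values back by index. This is only an illustration of the numbering; it is not how the library's template matcher extracts variables, and the sample log message and the extract_variables helper are made up.

```python
import re

TEMPLATE = (
    "pid=<*> uid=<*> auid=<*> ses=<*> msg='op=<*> acct=<*> exe=<*> "
    "hostname=<*> addr=<*> terminal=<*> res=<*>'"
)


def extract_variables(template: str, message: str) -> list:
    """Return the wildcard values of message, indexed by their pos in template."""
    # Escape the literal parts and turn every <*> into a non-greedy capture group.
    pattern = "(.*?)".join(re.escape(part) for part in template.split("<*>"))
    match = re.fullmatch(pattern, message)
    if match is None:
        raise ValueError("message does not match template")
    return list(match.groups())


message = (
    "pid=1234 uid=0 auid=1000 ses=3 msg='op=login acct=root exe=/usr/sbin/sshd "
    "hostname=host1 addr=10.0.0.5 terminal=ssh res=success'"
)
values = extract_variables(TEMPLATE, message)
print(values[0])  # 1234
print(values[6])  # /usr/sbin/sshd
```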

header_variables[].pos: A named field from the log format string (e.g., Type, Time, Content) rather than a wildcard position.

Auto-configuration (optional)

Detectors can optionally support auto-configuration — a process where the detector automatically discovers which variables are worth monitoring, instead of requiring the user to specify them manually.

Auto-configuration is controlled by the auto_config flag in the pipeline config (e.g. config/pipeline_config_default.yaml):

detectors:
  NewValueDetector:
    method_type: new_value_detector
    auto_config: True       # enable auto-configuration
    params: {}
    # no "events" block needed — it will be generated automatically

How it works

When auto-configuration is enabled, the detector goes through two extra phases before training:

Phase 1 — configure(input_): The detector ingests events into an EventPersistency instance that uses a tracker backend to analyze variable behavior — for example, whether each variable is stable, random, or still has insufficient data. This instance is typically separate from the one used for training, because the configuration phase needs to observe all variables to decide which ones are worth monitoring, while training only tracks the variables that were selected as a result.

Phase 2 — set_configuration(): After enough data has been ingested, the detector queries the tracker to select variables that meet its criteria (e.g. only stable variables). It then generates a full events configuration from those results and updates its own config. At this point auto_config is set to False in the generated config, since the configuration is now explicit.

After these two phases, the detector proceeds with the normal train() and detect() lifecycle using the generated configuration.
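The variable classification in phase 1 can be pictured with a small sketch. The rule below (ratio of distinct values to observations, with made-up thresholds and a minimum sample count) is an assumption for illustration; the library's tracker may use different criteria. Only the label STABLE appears in this document; the other label names are placeholders.

```python
def classify(values: list, min_samples: int = 20, stable_ratio: float = 0.2) -> str:
    """Hypothetical rule: few distinct values relative to observations => STABLE."""
    if len(values) < min_samples:
        return "INSUFFICIENT_DATA"
    distinct_ratio = len(set(values)) / len(values)
    return "STABLE" if distinct_ratio <= stable_ratio else "RANDOM"


observed = {
    "level": ["INFO", "WARN"] * 50,              # 2 distinct values in 100 observations
    "session_id": [str(i) for i in range(100)],  # every value is different
    "exe": ["/usr/sbin/sshd"] * 5,               # too few observations to judge
}

labels = {name: classify(vals) for name, vals in observed.items()}
selected = [name for name, label in labels.items() if label == "STABLE"]
print(labels)
print(selected)  # ['level']
```

Phase 2 then corresponds to the last step: keeping only the variables whose label meets the detector's criteria.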

Implementation pattern

A detector that supports auto-configuration typically, though not necessarily, creates a separate EventPersistency instance for this purpose:

class MyDetector(CoreDetector):
    def __init__(self, ...):
        super().__init__(...)

        # main persistency for training / detection
        self.persistency = EventPersistency(
            event_data_class=EventStabilityTracker,
        )
        # separate persistency for auto-configuration
        self.auto_conf_persistency = EventPersistency(
            event_data_class=EventStabilityTracker,
        )

The configure() method ingests all available variables (not just configured ones) so the tracker can assess each one:

def configure(self, input_):
    self.auto_conf_persistency.ingest_event(
        event_id=input_["EventID"],
        event_template=input_["template"],
        variables=input_["variables"],
        named_variables=input_["logFormatVariables"],
    )

The set_configuration() method queries the tracker results and generates the final config:

def set_configuration(self):
    variables = {}
    for event_id, tracker in self.auto_conf_persistency.get_events_data().items():
        stable_vars = tracker.get_features_by_classification("STABLE")
        variables[event_id] = stable_vars

    config_dict = generate_detector_config(
        variable_selection=variables,
        detector_name=self.name,
        method_type=self.config.method_type,
    )
    self.config = MyDetectorConfig.from_dict(config_dict, self.name)
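The output of generate_detector_config is not shown in this document. As a rough picture, the sketch below assumes it mirrors the events layout from the Configuration section and builds such a dict from a variable selection. The build_events_config helper, the instance name auto_instance, and the generated variable names are all hypothetical.

```python
def build_events_config(selection: dict, method_type: str) -> dict:
    """Hypothetical helper: turn {event_id: [pos, ...]} into an explicit config dict."""
    events = {}
    for event_id, positions in selection.items():
        # integer positions are template wildcards, string positions are header fields
        variables = [
            {"pos": p, "name": f"var_{p}"} for p in positions if isinstance(p, int)
        ]
        headers = [{"pos": p} for p in positions if isinstance(p, str)]
        instance = {"params": {}}
        if variables:
            instance["variables"] = variables
        if headers:
            instance["header_variables"] = headers
        events[event_id] = {"auto_instance": instance}
    # the generated config is explicit, so auto_config is switched off
    return {
        "method_type": method_type,
        "auto_config": False,
        "params": {},
        "events": events,
    }


config = build_events_config({1: [0, 6, "level"]}, "new_value_detector")
print(config["events"][1]["auto_instance"]["variables"])
print(config["events"][1]["auto_instance"]["header_variables"])
```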

Full lifecycle with auto-configuration

1. configure(input_)         # call for each event in the dataset
2. set_configuration()       # finalize which variables to monitor
3. train(input_)             # call for each event in the dataset
4. detect(input_, output_)   # call for each event to detect anomalies

When auto_config is False, steps 1 and 2 are skipped entirely.
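The call order can be sketched as a small driver. RecordingDetector is a toy class that only records which methods were called, and run_lifecycle is a hypothetical helper; neither is part of the library, and a real pipeline may drive the detector differently.

```python
class RecordingDetector:
    """Toy detector that records which lifecycle methods were called, in order."""

    def __init__(self, auto_config: bool):
        self.auto_config = auto_config
        self.calls = []

    def configure(self, input_):
        self.calls.append("configure")

    def set_configuration(self):
        self.calls.append("set_configuration")
        self.auto_config = False  # the configuration is explicit from here on

    def train(self, input_):
        self.calls.append("train")

    def detect(self, input_, output_) -> bool:
        self.calls.append("detect")
        return False


def run_lifecycle(detector, train_logs, live_logs) -> list:
    """Hypothetical driver: returns the outputs of all logs that raised an alert."""
    if detector.auto_config:
        for log in train_logs:
            detector.configure(log)
        detector.set_configuration()
    for log in train_logs:
        detector.train(log)
    alerts = []
    for log in live_logs:
        output = {"logIDs": []}
        if detector.detect(log, output):
            alerts.append(output)
    return alerts


auto = RecordingDetector(auto_config=True)
run_lifecycle(auto, train_logs=[{}, {}], live_logs=[{}])
print(auto.calls)
# ['configure', 'configure', 'set_configuration', 'train', 'train', 'detect']
```

Note that the training data is traversed twice when auto-configuration is on: once for configure() and once for train().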

Saving state (persist)

Detectors can persist their training state to disk (or cloud storage) so it can be restored in a later session. Configure this with a top-level persist: block in the detector config:

detectors:
  NewValueDetector:
    method_type: new_value_detector
    persist:
      path: ./state               # base path; detector name is appended automatically
      interval_seconds: 300       # save every N seconds (default: 300)
      events_until_save: null     # also save after N ingested events (default: disabled)
      auto_load: false            # restore saved state on startup (default: false)
      storage_options: {}         # backend credentials (see below)
    events:
      ...

All fields are optional — persist: {} uses all defaults. Omitting persist: entirely disables saving (backward compatible).

The detector name is automatically appended to path, so path: ./state for a detector named NewValueDetector writes to ./state/NewValueDetector/.

Fields

| Field | Type | Default | Description |
|---|---|---|---|
| path | str | "./state" | Base directory or cloud URL. Detector name is appended. |
| interval_seconds | int | 300 | Background save interval in seconds. |
| events_until_save | int \| null | null | Save after this many ingested events. null disables event-count triggering. |
| auto_load | bool | false | Load saved state on construction. Raises PersistencyLoadError if no state exists. |
| storage_options | dict | {} | Credentials and options forwarded to fsspec. |
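The two save triggers (a time interval and an optional event count) can be combined as in the sketch below. The SaveTrigger class and its injected clock are illustrative assumptions and say nothing about how the library schedules its background saves.

```python
import time
from typing import Callable, Optional


class SaveTrigger:
    """Hypothetical helper: decides when state should be saved."""

    def __init__(
        self,
        interval_seconds: int = 300,
        events_until_save: Optional[int] = None,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.interval_seconds = interval_seconds
        self.events_until_save = events_until_save
        self.clock = clock
        self.last_save = clock()
        self.events_since_save = 0

    def on_event(self) -> bool:
        """Count one ingested event; return True if a save is due now."""
        self.events_since_save += 1
        by_time = self.clock() - self.last_save >= self.interval_seconds
        by_count = (
            self.events_until_save is not None
            and self.events_since_save >= self.events_until_save
        )
        if by_time or by_count:
            # either trigger resets both the timer and the event counter
            self.last_save = self.clock()
            self.events_since_save = 0
            return True
        return False


now = [0.0]  # fake clock so the example is deterministic
trigger = SaveTrigger(interval_seconds=300, events_until_save=3, clock=lambda: now[0])
print([trigger.on_event() for _ in range(4)])  # [False, False, True, False]
now[0] = 300.0
print(trigger.on_event())  # True
```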

Storage options examples

Local filesystem — no storage_options needed:

persist:
  path: ./state

S3:

persist:
  path: s3://my-bucket/detector-state
  storage_options:
    key: AKIAIOSFODNN7EXAMPLE
    secret: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
    region_name: eu-west-1

S3-compatible storage (MinIO, etc.):

persist:
  path: s3://my-bucket/detector-state
  storage_options:
    endpoint_url: http://minio:9000
    key: minioadmin
    secret: minioadmin

Azure Blob Storage:

persist:
  path: az://my-container/detector-state
  storage_options:
    account_name: mystorageaccount
    account_key: base64encodedkey==

GCS:

persist:
  path: gs://my-bucket/detector-state
  storage_options:
    project: my-gcp-project
    token: /path/to/service-account.json

In practice, credentials are usually supplied via environment variables (AWS_ACCESS_KEY_ID, etc.) or instance roles — in which case storage_options stays empty or is omitted.
