Schemas

Schemas are the typed message objects used to transmit data between components (Parser → Detector). Each schema class wraps a Protobuf message and provides a small, convenient Python API for creation, inspection, (de)serialization and validation.

This document summarizes the available schema classes, the BaseSchema API and common usage patterns.

Design goals

  • Strongly-typed contracts between components.
  • Lightweight wrapper over generated Protobuf types.
  • Simple API for tests and runtime wiring.
  • Safe (de)serialization for transport and persistence.

Architecture

Schemas carry data between components. Each schema implements the methods defined in BaseSchema, which acts as a wrapper around the underlying Protobuf classes (op.SchemaT).

UML

All concrete schema classes inherit from BaseSchema. Key utility methods:

class BaseSchema:
    def __contains__(self, idx: str) -> bool:
        """Return whether a variable is in the schema."""

    def as_dict(self) -> dict[str, Any]:
        """Return the schema variables as a dictionary."""

    def get_schema(self) -> op.SchemaT:
        """Retrieve the current schema instance."""

    def set_schema(self, schema: op.SchemaT) -> None:
        """Set the schema instance and update attributes."""

    def init_schema(self, kwargs: dict[str, Any] | None) -> None:
        """Initialize the schema instance and set attributes."""
        # Also populates self.var_names with the set of field names.

    def is_field_list(self, field_name: str) -> bool:
        """Check if a field is a list."""

    def copy(self) -> "BaseSchema":
        """Create a deep copy of the schema instance."""

    def serialize(self) -> bytes:
        """Serialize the schema instance to bytes."""

    def deserialize(self, message: bytes) -> None | op.IncorrectSchema:
        """Deserialize bytes to populate the schema instance."""

    def check_is_same(self, other: Self) -> None | op.IncorrectSchema:
        """Check if another schema instance is of the same schema type."""

    def __eq__(self, other: object) -> bool:
        """Check equality between two schema instances."""
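
To make the wrapper pattern more concrete, here is a minimal, self-contained sketch. It is not the library's implementation: a plain dataclass stands in for the generated Protobuf message and JSON stands in for the Protobuf wire format. The names ToyMessage, ToySchema and IncorrectSchema are hypothetical and exist only for this illustration.

```python
from __future__ import annotations

import copy
import json
from dataclasses import asdict, dataclass, fields
from typing import Any


class IncorrectSchema(Exception):
    """Hypothetical stand-in for op.IncorrectSchema."""


@dataclass
class ToyMessage:
    """Hypothetical stand-in for a generated Protobuf message."""
    logID: str = ""
    log: str = ""


class ToySchema:
    """Sketch of a wrapper exposing a BaseSchema-like API."""

    def __init__(self, kwargs: dict[str, Any] | None = None) -> None:
        self._msg = ToyMessage(**(kwargs or {}))
        self.var_names = {f.name for f in fields(ToyMessage)}

    def __contains__(self, idx: str) -> bool:
        return idx in self.var_names

    def as_dict(self) -> dict[str, Any]:
        return asdict(self._msg)

    def copy(self) -> ToySchema:
        return copy.deepcopy(self)

    def serialize(self) -> bytes:
        # JSON replaces the Protobuf wire format in this sketch.
        return json.dumps(self.as_dict()).encode("utf-8")

    def deserialize(self, message: bytes) -> None | IncorrectSchema:
        # Mirror the documented contract: return an error object, do not raise.
        try:
            self._msg = ToyMessage(**json.loads(message))
        except (TypeError, ValueError) as exc:
            return IncorrectSchema(str(exc))
        return None

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ToySchema) and self.as_dict() == other.as_dict()


original = ToySchema({"log": "Test log"})
restored = ToySchema()
error = restored.deserialize(original.serialize())
print(error is None, restored == original, "log" in restored)
```

Returning the error from deserialize instead of raising it follows the `None | op.IncorrectSchema` signature listed above; callers are expected to check the return value.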

Schema Classes

Below are the primary schema classes and their main fields. All fields are optional at the Protobuf level; components should document which fields they require.
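
Because every field is optional on the wire, a consuming component may want to verify up front that the fields it depends on are actually set. The sketch below shows one possible check against a dictionary such as the one returned by as_dict(). The helper name missing_fields and the rule that empty values count as unset are assumptions made for illustration, not part of the library.

```python
from typing import Any


def missing_fields(values: dict[str, Any], required: set[str]) -> list[str]:
    """Return the required field names that are absent or empty in `values`."""
    # Assumption: None, "", [] and {} all count as "not set".
    return sorted(
        name for name in required
        if values.get(name) in (None, "", [], {})
    )


# In real code `values` would come from something like `schema.as_dict()`.
values = {"logID": "42", "log": "", "hostname": "web-01"}
print(missing_fields(values, {"logID", "log"}))
```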

LogSchema

Represents a raw log message.

Fields:

Field      Type    Notes
logID      string  Unique identifier for the raw log.
log        string  Raw log text.
logSource  string  Source of the log (file, topic, etc.).
hostname   string  Hostname where log originated.

ParserSchema

Output of a Parser. Contains parsed fields and template information.

Fields:

Field               Type             Notes
parserType          string           Parser type.
parserID            string           Parser instance identifier.
EventID             int32            Template/event identifier.
template            string           Event template text.
variables           repeated string  Parameters extracted from the template.
parsedLogID         string           ID assigned after parsing (optional).
logID               string           Original raw log ID (link to LogSchema).
log                 string           Raw log text.
logFormatVariables  map              Key/value pairs from format extraction.
receivedTimestamp   int32            Timestamp when log was received.
parsedTimestamp     int32            Timestamp when parsing completed.

DetectorSchema

Output from Detectors (alerts / findings).

Fields:

Field                Type             Notes
detectorID           string           Detector instance identifier.
detectorType         string           Type/name of detector.
alertID              string           Unique alert identifier.
detectionTimestamp   int32            When the alert was produced.
logIDs               repeated string  IDs of logs related to the alert.
score                float            Confidence/score (if applicable).
extractedTimestamps  repeated int32   Timestamps extracted from logs.
description          string           Human-readable description of the alert.
receivedTimestamp    int32            When inputs were received by detector.
alertsObtain         map              Additional alert metadata.
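
Since logIDs refers back to the logID of each raw log, an alert can be traced to the messages that triggered it. The following sketch illustrates that lookup with plain dictionaries standing in for LogSchema and DetectorSchema instances. The sample values and the handling of unresolved IDs are assumptions for illustration only.

```python
# Plain dictionaries stand in for LogSchema / DetectorSchema instances here.
logs = [
    {"logID": "1", "log": "user alice logged in"},
    {"logID": "2", "log": "user bob failed login"},
    {"logID": "3", "log": "user bob failed login"},
]
alert = {"alertID": "a-1", "logIDs": ["2", "3", "9"], "score": 0.9}

# Index the raw logs once by their unique logID.
logs_by_id = {entry["logID"]: entry for entry in logs}

# Resolve the alert's logIDs; IDs with no matching log are collected separately.
related = [logs_by_id[i]["log"] for i in alert["logIDs"] if i in logs_by_id]
unresolved = [i for i in alert["logIDs"] if i not in logs_by_id]

print(related)
print(unresolved)
```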

Tutorial

Short examples of common schema operations.

Initialize a schema

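from detectmatelibrary import schemas

# load_somewhere() is a placeholder for however you obtain the field dict.
kwargs = load_somewhere()
kwargs["log"] = "Test log"

# Pass the dict of field values to the constructor.
log_schema = schemas.LogSchema(kwargs)
print(log_schema.log == "Test log")  # True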

Assign values

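from detectmatelibrary import schemas

# Fields can be set as attributes and read by attribute or by key.
log_schema = schemas.LogSchema()
log_schema.log = "Test log"
print(log_schema["log"] == log_schema.log)  # True

# A fresh instance differs until the same field values are set.
log_schema2 = schemas.LogSchema()
print(log_schema == log_schema2)  # False

log_schema2.log = "Test log"
print(log_schema == log_schema2)  # True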

Serialization

from detectmatelibrary import schemas

log_schema = schemas.LogSchema()
log_schema.log = "Test log"
serialized = log_schema.serialize()
print(isinstance(serialized, bytes))  # True

# Deserialize into a fresh instance of the same schema type.
new_log_schema = schemas.LogSchema()
new_log_schema.deserialize(serialized)
print(new_log_schema.schema_id == log_schema.schema_id)  # True
