Components: Parsers
Parsers convert unstructured raw logs into structured ParserSchema objects that downstream detectors consume.
| | Schema | Description |
|---|---|---|
| Input | LogSchema | Unstructured log |
| Output | ParserSchema | Structured log |
This document explains expected APIs, how to implement a parser, testing tips and common pitfalls.
Overview
- Parsers must inherit from CoreParser and provide a parse() implementation.
- CoreParser.run() handles the lifecycle and calls parse() for each input; implement pure parsing logic inside parse() where possible.
- Use a typed Config class (a subclass of CoreParserConfig) to hold runtime parameters.
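The split between run() and parse() can be pictured with a small stand-in. This is only a sketch: the internals of CoreParser.run() are not shown in this document, so the class SketchCoreParser, the plain dicts used in place of the schema objects, and the specific pre/post steps are all assumptions made for illustration.

```python
import time
from typing import Any


class SketchCoreParser:
    """Hypothetical stand-in for CoreParser; not the library's implementation."""

    def run(self, input_: dict[str, Any], output_: dict[str, Any]) -> bool:
        # Pre-processing (assumed): carry over the ID linking raw and parsed records.
        output_["logID"] = input_.get("logID")
        ok = self.parse(input_, output_)
        # Post-processing (assumed): stamp the result only when parse() succeeded.
        if ok:
            output_["parsedTimestamp"] = int(time.time())
        return ok

    def parse(self, input_, output_) -> bool:
        raise NotImplementedError


class UpperParser(SketchCoreParser):
    """Toy subclass: parse() holds only the parsing logic, no lifecycle code."""

    def parse(self, input_, output_):
        text = input_.get("log") or ""
        if not text:
            return False  # nothing parsed, so run() reports False as well
        output_["template"] = text.upper()
        return True
```

Keeping parse() free of lifecycle concerns is what makes it easy to call directly in unit tests.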
CoreParser — minimal API
Recommended signatures and behavior:
    from typing import Iterable

    from detectmatelibrary import schemas


    class CoreParser:
        def run(self, input_: schemas.LogSchema, output_: schemas.ParserSchema) -> bool:
            """Top-level runner. Calls parse() and performs pre/post processing.

            Return True when a parsed output was produced, False otherwise.
            """

        def parse(self, input_: schemas.LogSchema, output_: schemas.ParserSchema) -> bool:
            """Implement parsing here.

            - Fill required output_ fields (see the ParserSchema field list below).
            - Return True if parsing succeeded and output_ contains a result.
            """

        def train(self, input_: Iterable[schemas.LogSchema]) -> None:
            """Optional: train internal models. Can be a no-op for stateless parsers."""
ParserSchema — what to populate
Minimum fields commonly expected by downstream components:
- EventID (int): identifier for the matched template/event
- template (string): event template text
- variables (repeated string): extracted parameters (extend the list)
- parsedLogID / logID: identifiers linking raw and parsed records
- parsedTimestamp / receivedTimestamp: timestamps
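To make the relationship between template and variables concrete, the sketch below fills a plain dict with the fields listed above and then rebuilds the original message from them. The dict stands in for ParserSchema, the helper render() is hypothetical, and all field values are invented for illustration.

```python
def render(template: str, variables: list[str], wildcard: str = "<*>") -> str:
    """Substitute variables into the template's wildcard slots, left to right."""
    parts = template.split(wildcard)
    if len(parts) - 1 != len(variables):
        raise ValueError("number of wildcards and variables differ")
    out = [parts[0]]
    for var, tail in zip(variables, parts[1:]):
        out.append(var)
        out.append(tail)
    return "".join(out)


# Plain dict standing in for a ParserSchema object (illustrative values only).
parsed = {
    "EventID": 3,
    "template": "user <*> logged in from <*>",
    "variables": [],
    "logID": "17",
    "parsedLogID": "17",
}
# variables is a repeated field, so it is extended rather than reassigned.
parsed["variables"].extend(["alice", "10.0.0.5"])

message = render(parsed["template"], parsed["variables"])
```

A round trip like this is a cheap consistency check: if the number of wildcards and variables disagree, the parser has dropped or duplicated a parameter.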
Creating a new parser — step by step
- Create a Config class inheriting CoreParserConfig.
- Create a parser class inheriting CoreParser.
- Implement parse() to populate output_.
- Add unit tests for parse() behavior.
Example:
    # filepath: src/detectmatelibrary/parsers/my_parser.py
    import time
    from typing import Any

    from detectmatelibrary import schemas
    from detectmatelibrary.common.parser import CoreParser, CoreParserConfig


    class MyParserConfig(CoreParserConfig):
        method_type: str = "my_parser"
        # add parser-specific settings here
        pattern: str | None = None


    class MyParser(CoreParser):
        def __init__(
            self,
            name: str = "MyParser",
            config: MyParserConfig | dict[str, Any] = MyParserConfig(),
        ):
            # Accept a plain dict and convert it to the typed config.
            if isinstance(config, dict):
                config = MyParserConfig.from_dict(config, name)
            super().__init__(name=name, config=config)

        def parse(self, input_: schemas.LogSchema, output_: schemas.ParserSchema) -> bool:
            text = input_.log or ""
            # simple example: split on whitespace and treat every token as a variable
            tokens = text.split()
            output_["EventID"] = 1
            output_["template"] = " ".join(["<*>"] * len(tokens))
            output_["variables"].extend(tokens)
            output_["parsedTimestamp"] = int(time.time())
            return True
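The pattern setting in MyParserConfig is declared but not used by the example. One way it could drive parsing is sketched below, including the False return for lines that do not match. The function parse_with_pattern is hypothetical, plain dicts stand in for the schema objects, the fixed EventID of 1 is carried over from the example, and the pattern is assumed to contain only non-nested capture groups.

```python
import re
from typing import Any, Optional


def parse_with_pattern(
    pattern: Optional[str], input_: dict[str, Any], output_: dict[str, Any]
) -> bool:
    """Hypothetical parse() body driven by a regex from the config."""
    text = input_.get("log") or ""
    if not pattern:
        return False
    match = re.fullmatch(pattern, text)
    if match is None:
        # No result was produced; leave output_ untouched and report failure.
        return False
    template = text
    # Replace captured spans from right to left so earlier offsets stay valid
    # (assumes capture groups are not nested).
    for i in range(match.re.groups, 0, -1):
        start, end = match.span(i)
        if start != -1:  # skip groups that did not participate in the match
            template = template[:start] + "<*>" + template[end:]
    output_["EventID"] = 1
    output_["template"] = template
    output_.setdefault("variables", []).extend(
        g for g in match.groups() if g is not None
    )
    return True


out: dict[str, Any] = {}
ok = parse_with_pattern(
    r"user (\w+) logged in from (\S+)",
    {"log": "user alice logged in from 10.0.0.5"},
    out,
)
```

Returning False without touching output_ keeps the contract simple: a caller only needs the return value to decide whether the output is usable.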
Testing parsers
Unit test parse() directly using ParserSchema objects:
    def test_my_parser_parse():
        from detectmatelibrary.parsers.my_parser import MyParser
        from detectmatelibrary import schemas

        parser = MyParser()
        raw = schemas.LogSchema({"log": "a b c", "logID": "1"})
        out = schemas.ParserSchema()

        ok = parser.parse(raw, out)

        assert ok is True
        assert out["EventID"] == 1
        assert out["variables"] == ["a", "b", "c"]
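Edge cases such as empty or whitespace-heavy input are worth covering as well. The table-driven sketch below runs without the library installed: whitespace_parse is a hypothetical stand-in that mirrors the tokenizing logic of the example parse(), and it returns a plain dict instead of filling a ParserSchema.

```python
from typing import Any, Optional


def whitespace_parse(log: Optional[str]) -> dict[str, Any]:
    """Stand-in mirroring the example parse(): every token becomes a variable."""
    tokens = (log or "").split()
    return {
        "EventID": 1,
        "template": " ".join(["<*>"] * len(tokens)),
        "variables": tokens,
    }


# (input log, expected template, expected variables)
CASES = [
    ("a b c", "<*> <*> <*>", ["a", "b", "c"]),
    ("  padded   line ", "<*> <*>", ["padded", "line"]),  # repeated whitespace collapses
    ("", "", []),  # empty log: no tokens, empty template
    (None, "", []),  # missing log field is treated like an empty string
]


def test_whitespace_parse_edge_cases():
    for log, template, variables in CASES:
        out = whitespace_parse(log)
        assert out["template"] == template
        assert out["variables"] == variables
```

The same case table can be reused against the real parser by replacing the stand-in call with parser.parse() on schema objects.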