Timestamper
The goal of this presentation is to introduce the features of the timestamper and show how to configure it.
The challenge
I want to normalize different time formats to a single output format and timezone.
from this:
[ ]:
document = {
    "winlog": {
        "api": "wineventlog",
        "event_id": 123456789,
        "event_data": {"some_timestamp_utc": "1642160449"},
    }
}
to this:
[ ]:
expected = {
    "@timestamp": "2022-01-14T12:40:49+01:00",
    "winlog": {
        "api": "wineventlog",
        "event_id": 123456789,
        "event_data": {"some_timestamp_utc": "1642160449"},
    },
}
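To make the target value concrete, the conversion can be reproduced with the Python standard library alone. This is only a sketch of the arithmetic, not how the timestamper is implemented; the helper name `unix_to_iso` is hypothetical, and `zoneinfo` needs Python 3.9+ plus time zone data on the system.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def unix_to_iso(raw: str, target_tz: str = "Europe/Berlin") -> str:
    """Hypothetical helper: read a UNIX epoch string as UTC and render it in target_tz."""
    parsed = datetime.fromtimestamp(int(raw), tz=timezone.utc)
    return parsed.astimezone(ZoneInfo(target_tz)).isoformat()


converted = unix_to_iso("1642160449")
print(converted)  # 2022-01-14T12:40:49+01:00
```

The epoch value is 11:40:49 UTC on 2022-01-14, which is 12:40:49 in Berlin during winter time (UTC+01:00), matching the `@timestamp` in `expected`.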
Create rule and processor
create the rule:
[ ]:
import sys
sys.path.append("../../../../../")
import tempfile
from pathlib import Path
rule_yaml = """---
filter: "winlog.event_id: 123456789"
timestamper:
    source_fields: ["winlog.event_data.some_timestamp_utc"]
    target_field: "@timestamp"
    source_format: UNIX
    source_timezone: UTC
    target_timezone: Europe/Berlin
description: example timestamper rule
"""
rule_path = Path(tempfile.gettempdir()) / "timestamper"
rule_path.mkdir(exist_ok=True)
rule_file = rule_path / "timestamper.yml"
rule_file.write_text(rule_yaml)
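The `source_fields` entry uses a dotted path to address a nested key in the event. The sketch below shows one way such a path can be resolved against the example document; `get_dotted_field` is a hypothetical helper written for illustration and is not claimed to be the function logprep uses internally.

```python
from typing import Any, Optional


def get_dotted_field(event: dict, dotted_path: str) -> Optional[Any]:
    """Hypothetical helper: walk a nested dict along a dot-separated path.

    Returns None as soon as a key along the path is missing.
    """
    current: Any = event
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


sample = {
    "winlog": {
        "api": "wineventlog",
        "event_id": 123456789,
        "event_data": {"some_timestamp_utc": "1642160449"},
    }
}
value = get_dotted_field(sample, "winlog.event_data.some_timestamp_utc")
print(value)  # 1642160449
```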
create the processor config:
[ ]:
processor_config = {
    "my_timestamper": {
        "type": "timestamper",
        "rules": [str(rule_path), "/dev"],
    }
}
create the processor with the factory:
[ ]:
from logprep.factory import Factory

# Build the processor from the config dict. The bare name on the last line
# makes the notebook display the processor's repr.
processor = Factory.create(processor_config)
processor
Process event
[ ]:
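from copy import deepcopy

# Work on a copy so the original document stays available for re-runs.
mydocument = deepcopy(document)
print(f"before: {mydocument}")
# The processor modifies the event in place.
processor.process(mydocument)
print(f"after: {mydocument}")
# True if the processed event matches the expected result.
print(mydocument == expected)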
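Besides the equality check, it can be useful to confirm that the normalized timestamp still denotes the same instant as the source value. The following is a minimal, standalone sketch using only `datetime`; it works on the literal values from this example rather than on the processor output, and the variable names are illustrative.

```python
from datetime import datetime, timezone

source_value = "1642160449"  # UNIX epoch from the input event
target_value = "2022-01-14T12:40:49+01:00"  # expected @timestamp

# Parse the ISO 8601 string, including its UTC offset.
parsed = datetime.fromisoformat(target_value)

# Both representations should point at the same instant.
same_instant = int(parsed.timestamp()) == int(source_value)
utc_view = parsed.astimezone(timezone.utc).isoformat()
print(same_instant, utc_view)  # True 2022-01-14T11:40:49+00:00
```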