Processors

Amides

The Amides processor implements the proof-of-concept Adaptive Misuse Detection System (AMIDES). AMIDES extends conventional rule matching of SIEM systems by applying machine learning components aiming to detect attacks that evade existing SIEM rules as well as otherwise undetected attack variants. It learns from SIEM rules and historical benign events and can thus estimate which SIEM rule was tried to be evaded. An overview of AMIDES is depicted in the figure below.

../_images/amides.svg

Overview of the AMIDES architecture.

The machine learning components of AMIDES are trained using the current SIEM rule set and historical benign events. Incoming events are transformed into feature vectors by the feature extraction component. During operation, features learned during the training phase will be re-used by the feature extraction component. Feature vectors are then passed to the Misuse Classification component which classifies events as malicious or benign. In case of a malicious result, the feature vector is passed to the Rule Attribution component which generates a ranked list of SIEM rules potentially evaded by the event. Finally, results generated by the Rule Attribution component and conventional rule matching results can be correlated for alert generation.

Since there is a plethora of different SIEM event types, the current implementation focuses on events that provide process command lines. Command lines are most commonly targeted by SIEM rules while they are also highly vulnerable to evasions. The rules and models for AMIDES provided in the deployment examples are for Sysmon Process Creation events. In general, the Amides rule format allows to create rules for other event types that provide process command lines, e.g. Process Creation events generated by Windows Security Auditing.

Misuse classification is performed by the MisuseDetector class. Instances of the MisuseDetector contain the model for misuse classification, which includes the trained classifier instance, the corresponding feature extractor, and an additional scaler to transform classifier results into the pre-defined output range between 0 and 1. The processor configuration parameter decision_threshold is used to fine-tune the classification results produced by the misuse detector.

Rule attribution is performed by the RuleAttributor class. The num_rule_attributions configuration parameter determines the number of rule attributions returned by the attributor. Models and vectorizer for rule attribution and feature extraction are held by RuleAttributor instances.

In order to speed up the detection and attribution process, the Amides processor makes use of a LRU cache that keeps track of incoming command line samples. In case of a previously seen command line, classification and attribution results can be retrieved from the cache in a shorter amount of time. The max_cache_entries configuration parameter determines the maximum number of elements of the internal cache.

Models used by the MisuseDetector and RuleAttributor are currently generated by scikit-learn. Each trained model needs to be packed into a dictionary together with its corresponding feature extractor and scaler. Dictionaries are then pickled and compressed (.zip). The URI or path of the compressed models file is given by the models_path configuration parameter. An example of a configuration of the Amides processor is given below:

Processor Configuration

1- amides:
2    type: amides
3    rules:
4        - tests/testdata/rules/rules
5    max_cache_entries: 10000
6    decision_threshold: 0.0
7    num_rule_attributions: 10

To keep track of the components performance, the Amides processor tracks several processor metrics. This includes the mean misuse detection time, the mean rule attribution time, and several cache-related metrics like the number of hits and misses and the current cache load.

class logprep.processor.amides.processor.Amides.Config

Amides processor configuration class.

max_cache_entries: int

Maximum number of cached command lines and their rule attribution results.

decision_threshold: float

Specifies the decision threshold of the misuse detector to adjust it’s overall classification performance.

num_rule_attributions: int

Number of rule attributions returned in case of a positive misuse detection result.

models_path: str

Path or URI of the archive (.zip) containing the models used by the misuse detector and the rule attributor.

Security Best Practice - Processor - Amides Model

Ensure that you only use models from trusted sources, as it can be used to inject python code into the runtime.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

This rule enables to check if incoming documents are of a specific type suitable for classification by the Amides processor. The specified source_field should contain command line strings. In case of an positive detection result, rule attributions are written into the target_field.

The following example shows a complete rule:

Example
1filter: 'some_field: "sample_cmdline"'
2amides:
3    source_fields: ["process.command_line"]
4    target_field: "rule_attributions"
5description: Sample rule for AMIDES processor.
class logprep.processor.amides.rule.AmidesRule.Config

Config of AmidesRule to specify source fields of command lines and target field of rule attribution results.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Calculator

The Calculator can be used to calculate with or without field values.

Processor Configuration

1- calculatorname:
2    type: calculator
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.calculator.processor.Calculator.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

A speaking example:

Given calculator rule
1filter: 'duration'
2calculator:
3  target_field: duration
4  calc: ${duration} * 10e5
5  overwrite_target: True
6description: '...'
Incoming event
1{"duration": "0.01"}
Processed event
1{"duration": 10000.0}
class logprep.processor.calculator.rule.CalculatorRule.Config

Config for Calculator

calc: str

The calculation expression. Fields from the event can be used by surrounding them with ${ and }.

timeout: int

The maximum time in seconds for the calculation. Defaults to 1

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged, and the event is not tagged with the a failure tag. As soon as one field is missing no calculation is performed at all. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Following a list with example calculation expressions, where all factors and the operators can be retrieved from a field with the schema ${your.dotted.field}:

  • 9 => 9

  • -9 => -9

  • --9 => 9

  • -E => -math.e

  • 9 + 3 + 6 => 9 + 3 + 6

  • 9 + 3 / 11 => 9 + 3.0 / 11

  • (9 + 3) => (9 + 3)

  • (9+3) / 11 => (9 + 3.0) / 11

  • 9 - 12 - 6 => 9 - 12 - 6

  • 9 - (12 - 6) => 9 - (12 - 6)

  • 2*3.14159 => 2 * 3.14159

  • 3.1415926535*3.1415926535 / 10 => 3.1415926535 * 3.1415926535 / 10

  • PI * PI / 10 => math.pi * math.pi / 10

  • PI*PI/10 => math.pi * math.pi / 10

  • PI^2 => math.pi ** 2

  • round(PI^2) => round(math.pi ** 2)

  • 6.02E23 * 8.048 => 6.02e23 * 8.048

  • e / 3 => math.e / 3

  • sin(PI/2) => math.sin(math.pi / 2)

  • 10+sin(PI/4)^2 => 10 + math.sin(math.pi / 4) ** 2

  • trunc(E) => int(math.e)

  • trunc(-E) => int(-math.e)

  • round(E) => round(math.e)

  • round(-E) => round(-math.e)

  • E^PI => math.e ** math.pi

  • exp(0) => 1

  • exp(1) => math.e

  • 2^3^2 => 2 ** 3 ** 2

  • (2^3)^2 => (2 ** 3) ** 2

  • 2^3+2 => 2 ** 3 + 2

  • 2^3+5 => 2 ** 3 + 5

  • 2^9 => 2 ** 9

  • sgn(-2) => -1

  • sgn(0) => 0

  • sgn(0.1) => 1

  • round(E, 3) => round(math.e, 3)

  • round(PI^2, 3) => round(math.pi ** 2, 3)

  • sgn(cos(PI/4)) => 1

  • sgn(cos(PI/2)) => 0

  • sgn(cos(PI*3/4)) => -1

  • +(sgn(cos(PI/4))) => 1

  • -(sgn(cos(PI/4))) => -1

  • hypot(3, 4) => 5

  • multiply(3, 7) => 21

  • all(1,1,1) => True

  • all(1,1,1,1,1,0) => False

The calc expression is not whitespace sensitive.

Clusterer

The log clustering is mainly developed for Syslogs, unstructured and semi-structured logs. The clusterer calculates a log signature based on the message field. The log signature is calculated with heuristic and deterministic rules. The idea of a log signature is to extract a subset of the constant parts of a log and to delete the dynamic parts. If the fields syslog.facility and event.severity are in the log, then they are prefixed to the log signature.

Logs are only clustered if at least one of the following criteria is fulfilled:

Criteria 1: { "message": "A sample message", "tags": ["clusterable", ...], ... }
Criteria 2: { "message": "A sample message", "clusterable": true, ... }
Criteria 3: { "message": "A sample message", "syslog": { "facility": <number> }, "event": { "severity": <string> }, ... }

Processor Configuration

1- clusterername:
2    type: clusterer
3    rules:
4        - tests/testdata/rules/rules
5    output_field_name: target_field
class logprep.processor.clusterer.processor.Clusterer.Config

Clusterer Configuration

output_field_name: str

defines in which field results of the clustering should be stored.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

Rules of the clusterer are evaluated in alphanumerical order. Some rules do only make sense if they are performed in a sequence with other rules. The clusterer matches multiple rules at once and applies them all before creating a clustering signature. Therefore, it is recommended to prefix rules with numbers, i.e. 00_01_*. Here the first two digits represent a type of rules that make sense together and the second digits represent the order of rules of the same type.

A subset of terms from this field can be extracted into the clustering-signature field defined in the clusterer configuration.

Since clusterer rules must be used in a sequence, it makes no sense to perform regular auto tests on them. Thus, every rule can have a field tests containing signature calculation tests. It can contain one test or a list of tests. Each tests consists of the fields tests.raw and tests.result. tests.raw is the input and would be usually the message. tests.result is the expected result.

Example - One Test
1filter: ...
2clusterer: ...
3tests:
4  raw:    'Some message'
5  result: 'Some changed message'
Example - Multiple Test
1filter: ...
2clusterer: ...
3tests:
4  - raw:    'Some message'
5    result: 'Some changed message'
6  - raw:    'Another message'
7    result: 'Another changed message'

In the following rule example the word byte is stemmed.

Example - Stemming Rule
1filter: message
2clusterer:
3  target: message
4  pattern: '(bytes|Bytes|Byte)'
5  repl: 'byte'
6description: '...'
7tests:
8  raw:    'Byte is a Bytes is a bytes is a byte'
9  result: 'byte is a byte is a byte is a byte'

In the following rule example the word baz is removed.

Example - Removal Rule
1filter: message
2clusterer:
3  target: message
4  pattern: 'foo (bar) baz'
5  repl: ''
6description: '...'
7tests:
8  raw:    'foo bar baz'
9  result: 'foo  baz'

In the following rule example the word baz is surrounded by extraction tags.

Example - Extraction Rule
1filter: message
2clusterer:
3  target: message
4  pattern: 'foo (bar) baz'
5  repl: '<+></+>'
6description: '...'
7tests:
8  raw:    'foo bar baz'
9  result: 'foo <+>bar</+> baz'
class logprep.processor.clusterer.rule.ClustererRule.Config

RuleConfig for Clusterer

pattern: Pattern

Defines the regex pattern that will be matched on the clusterer.source_fields.

repl: str

Anything within a capture group in clusterer.pattern will be substituted with values defined in clusterer.repl. The clusterer will only extract terms into a signature that are surrounded by the tags <+></+>. One could first use rules to remove common terms, other rules to perform stemming and finally rules to wrap terms in <+></+> to create a signature.

For example: * Setting clusterer.repl: '' would remove anything within a capture group. * Setting clusterer.repl: 'FOO' would replace anything within a capture group with FOO. * Setting clusterer.repl: '<+></+>' would surround anything within a capture group with <+></+>.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to True

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The field from where to get the value which should be clustered.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Concatenator

The concatenator processor allows to concat a list of source fields into one new target field. The concat separator and the target field can be specified. Furthermore, it is possible to directly delete all given source fields, or to overwrite the specified target field.

Processor Configuration

1- Concatenatorname:
2    type: concatenator
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.concatenator.processor.Concatenator.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

A speaking example:

Given concatenator rule
1filter: 'date AND time'
2concatenator:
3  source_fields: ["date", "time"]
4  target_field: timestamp
5  separator: " "
6  overwrite_target: True
7  delete_source_fields: True
8description: '...'
Incoming event
1{
2    "date": "01.01.1007",
3    "time": "13:07"
4}
Processed event
1{
2    "timetsamp": "01.01.1007 13:07"
3}
class logprep.processor.concatenator.rule.ConcatenatorRule.Config

RuleConfig for Concatenator

separator: str

The character(s) that should be used between the combined source field values.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The source fields that should be concatenated, can contain dotted field paths.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

DatetimeExtractor

The datetime_extractor is a processor that can extract timestamps from a field and split it into its parts.

Processor Configuration

1- datetimeextractorname:
2    type: datetime_extractor
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.datetime_extractor.processor.DatetimeExtractor.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The datetime extractor requires the additional field datetime_extractor. The additional fields datetime_extractor.source_fields and datetime_extractor.target_field must be defined. The first one contains the name of the field from which the timestamp should be taken and the last one contains the name of the field under which a split timestamp should be written.

In the following example the timestamp will be extracted from @timestamp and written to split_@timestamp.

Example
1filter: '@timestamp'
2datetime_extractor:
3  source_fields: ['@timestamp']
4  target_field: 'split_@timestamp'
5description: '...'
class logprep.processor.datetime_extractor.rule.DatetimeExtractorRule.Config

Config for DatetimeExtractorRule

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to.

tests: List[Dict[str, str]]

Custom tests for this rule.

Deleter

The deleter is a processor that removes an entire event from further pipeline processing.

Processor Configuration

1- deletename:
2    type: deleter
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.deleter.processor.Deleter.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The example below deletes the log message if the message field equals “foo”.

Example delete rule
1filter: 'message: "foo"'
2deleter:
3    delete: true
4description: '...'
class logprep.processor.deleter.rule.DeleterRule.Config

Config for DeleterRule

delete: bool

Delete or not

target_field
description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

tests: List[Dict[str, str]]

Custom tests for this rule.

Decoder

The decoder processor decodes or parses field values from the configured source_format. Following options for source_format are implemented:

Processor Configuration

1- samplename:
2    type: decoder
3    rules:
4        - tests/testdata/rules/
class logprep.processor.decoder.processor.Decoder.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Decoder

With the decoder processor you are able to parse fields from different formats.

A speaking example:

Given decoder rule
1filter: message
2decoder:
3    source_format: json
4    mapping:
5        message: parsed
6description: 'parse message field to the field called parsed'
Incoming event
1{
2    "message": "{\"timestamp\": \"2019-08-02T09:46:18.625Z\", \"log\": \"user login failed\"}"
3}
Processed event
1{
2    "message": "{\"timestamp\": \"2019-08-02T09:46:18.625Z\", \"log\": \"user login failed\"}",
3    "parsed": {
4        "timestamp": "2019-08-02T09:46:18.625Z",
5        "log": "user login failed"
6    }
7}
class logprep.processor.decoder.rule.DecoderRule.Config

Config for DecoderRule

source_format: str

The source format in the source field. Defaults to json Possible values are json, base64, clf, nginx, syslog_rfc5424, syslog_rfc3164, syslog_rfc3164_local, logfmt, cri, docker, decolorize

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The field to decode as list with maximum one element. For multi field operations use mapping instead.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Examples for decoder:

Dissector

The dissector is a processor that tokenizes incoming strings using defined patterns. The behavior is based of the logstash dissect filter plugin and has the same advantage that for the event processing no regular expressions are used. Additionally, it can be used to convert datatypes of given fields.

Processor Configuration

1- dissectorname:
2    type: dissector
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.dissector.processor.Dissector.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

A speaking example:

Given dissector rule
1filter: message
2dissector:
3    mapping:
4        message: "%{}of %{extracted.message_float} and an int of %{extracted.message_int}"
5    convert_datatype:
6        extracted.message_int: "int"
7        extracted.message_float: "float"
8description: '...'
Incoming event
1{"message": "This message has a float of 1.23 and an int of 1337"}
Processed event
1{
2    "message": "This message has a float of 1.23 and an int of 1337",
3    "extracted": {"message_float": 1.23, "message_int": 1337},
4}
Dissect Pattern Language

The dissect pattern describes the textual format of the source field.

Given a dissect pattern of %{field1} %{field2} the source field value will be dissected into everything before the first whitespace which would be written into the field field1 and everything after the first whitespace which would be written into the field field2.

The string between %{ and } is the desired target field. This can be declared in dotted field notation (e.g. %{target.subfield1.subfield2}). Every subfield between the first and the last subfield will be created if necessary.

By default, the target field will always be overwritten with the captured value. If you want to append to a preexisting target field value, as string or list, you have to use the + operator. If you want to use a prefix before the appended string use this notation +( ). In this example a whitespace would be added before the extracted string is added. If you want to use the symbols ( or ) as your separator, you have to escape with \` (e.g. :code:`+(\()).

If you want to remove unwanted padding characters around a dissected pattern you have to use the -(<char>) notation, while <char> can be any character similar to the +( ) notation. If for example you have a field like "[2022-11-04 10:00:00 AM     ] - 127.0.0.1" and you want to extract the timestamp and the ip, you can use the dissect-pattern [%{time-( )}] - %{ip} to remove the unwanted spaces after the ‘AM’. This works independent of the number of spaces.

It is also possible to capture the target field name from the source field value with the notation %{?<your name for the reference>} (e.g. %{?key1}). In the same dissection pattern this can be referred to with the notation %{&<the reference>} (e.g. %{&key1}). References can be combined with the append operator. For examples see below.

Additionally an optional convert datatype can be provided after the key using | as separator to convert the value from string to int, float or bool. The conversion to bool is interpreted by meaning. (e.g. yes is translated to True). When removing padding characters at the same time then the conversion has to come after the padding character (e.g. %{field2-(#)|bool}).

If you want to reorder parts of a dissection you can give the order by adding /<position> to the dissect pattern. A valid example would be: %{time/1} %{+time/3} %{+time/2}. When removing padding characters at the same time then the position has to come after the padding character (e.g. %{time-(*)/2}).

class logprep.processor.dissector.rule.DissectorRule.Config

Config for Dissector

convert_datatype: dict

A mapping from source field and desired datatype [optional]. The datatypes could be float, int, bool, string

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A mapping from source fields to a dissect pattern [optional]. Dotted field notation is possible in key and in the dissect pattern.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Examples for dissection and datatype conversion:

writes new fields with same separator:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{field2} %{field3} %{field4}'}}}

  • message: {'message': 'This is a message'}

  • processed: {'message': 'This is a message', 'field1': 'This', 'field2': 'is', 'field3': 'a', 'field4': 'message'}

writes new fields with different separator:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{field2}:%{field3} %{field4}'}}}

  • message: {'message': 'This is:a message'}

  • processed: {'message': 'This is:a message', 'field1': 'This', 'field2': 'is', 'field3': 'a', 'field4': 'message'}

writes new fields with long separator:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} is %{field3} %{field4}'}}}

  • message: {'message': 'This is a message'}

  • processed: {'message': 'This is a message', 'field1': 'This', 'field3': 'a', 'field4': 'message'}

writes new fields and appends to existing list:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} is %{field3} %{+field4}'}}}

  • message: {'message': 'This is a message', 'field4': ['preexisting']}

  • processed: {'message': 'This is a message', 'field1': 'This', 'field3': 'a', 'field4': ['preexisting', 'message']}

writes new fields and appends to existing empty list:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} is %{field3} %{+field4}'}}}

  • message: {'message': 'This is a message', 'field4': []}

  • processed: {'message': 'This is a message', 'field1': 'This', 'field3': 'a', 'field4': ['message']}

writes new fields and appends to existing string:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} is %{field3} %{+( )field4}'}}}

  • message: {'message': 'This is a message', 'field4': 'preexisting'}

  • processed: {'message': 'This is a message', 'field1': 'This', 'field3': 'a', 'field4': 'preexisting message'}

writes new dotted fields:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{my.new.field2} %{field3} %{+field4}'}}}

  • message: {'message': 'This is a message', 'field4': 'preexisting'}

  • processed: {'message': 'This is a message', 'field1': 'This', 'my': {'new': {'field2': 'is'}}, 'field3': 'a', 'field4': 'preexistingmessage'}

overwrites dotted fields:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{my.new.field2} %{field3} %{+( )field4}'}}}

  • message: {'message': 'This is a message', 'field4': 'preexisting', 'my': {'new': {'field2': 'preexisting'}}}

  • processed: {'message': 'This is a message', 'field1': 'This', 'my': {'new': {'field2': 'is'}}, 'field3': 'a', 'field4': 'preexisting message'}

appends to dotted fields preexisting string:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{+my.new.field2} %{field3} %{+( )field4}'}}}

  • message: {'message': 'This is a message', 'field4': 'preexisting', 'my': {'new': {'field2': 'preexisting'}}}

  • processed: {'message': 'This is a message', 'field1': 'This', 'my': {'new': {'field2': 'preexistingis'}}, 'field3': 'a', 'field4': 'preexisting message'}

appends to dotted fields preexisting list:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{+my.new.field2} %{field3} %{+( )field4}'}}}

  • message: {'message': 'This is a message', 'field4': 'preexisting', 'my': {'new': {'field2': ['preexisting']}}}

  • processed: {'message': 'This is a message', 'field1': 'This', 'my': {'new': {'field2': ['preexisting', 'is']}}, 'field3': 'a', 'field4': 'preexisting message'}

processes dotted source field:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message.key1.key2': '%{field1} %{field2} %{field3} %{field4}'}}}

  • message: {'message': {'key1': {'key2': 'This is the message'}}}

  • processed: {'message': {'key1': {'key2': 'This is the message'}}, 'field1': 'This', 'field2': 'is', 'field3': 'the', 'field4': 'message'}

processes multiple mappings to different target fields:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'source1': '%{extracted.source1.key1} %{extracted.source1.key2} %{extracted.source1.key3}', 'source2': '%{extracted.source2.key1} %{extracted.source2.key2} %{extracted.source2.key3}'}}}

  • message: {'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2'}

  • processed: {'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2', 'extracted': {'source1': {'key1': 'This', 'key2': 'is', 'key3': 'source1'}, 'source2': {'key1': 'This', 'key2': 'is', 'key3': 'source2'}}}

processes multiple mappings to same target fields (overwrite):

  • rule: {'filter': 'message', 'dissector': {'mapping': {'source1': '%{extracted.key1} %{extracted.key2} %{extracted.key3}', 'source2': '%{extracted.key1} %{extracted.key2} %{extracted.key3}'}}}

  • message: {'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2'}

  • processed: {'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2', 'extracted': {'key1': 'This', 'key2': 'is', 'key3': 'source2'}}

processes multiple mappings to same target fields (appending):

  • rule: {'filter': 'message', 'dissector': {'mapping': {'source1': '%{+extracted.key1} %{+extracted.key2} %{+extracted.key3}', 'source2': '%{+( )extracted.key1} %{+( )extracted.key2} %{+( )extracted.key3}'}}}

  • message: {'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2'}

  • processed: {'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2', 'extracted': {'key1': 'This This', 'key2': 'is is', 'key3': 'source1 source2'}}

append to new field in different order as string:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{+( )extracted/4} %{+( )extracted/3} %{+( )extracted/2} %{+extracted/1}'}}}

  • message: {'message': 'This is the message'}

  • processed: {'message': 'This is the message', 'extracted': 'message the is This'}

append to existing field in different order as string:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{+( )extracted/4} %{+( )extracted/3} %{+( )extracted/2} %{+( )extracted/1}'}}}

  • message: {'message': 'This is the message', 'extracted': 'preexisting'}

  • processed: {'message': 'This is the message', 'extracted': 'preexisting message the is This'}

append to existing empty list field in different order as list:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{+extracted/4} %{+extracted/3} %{+extracted/2} %{+extracted/1}'}}}

  • message: {'message': 'This is the message', 'extracted': []}

  • processed: {'message': 'This is the message', 'extracted': ['message', 'the', 'is', 'This']}

append to existing prefilled field in different order as list:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{+extracted/4} %{+extracted/3} %{+extracted/2} %{+extracted/1}'}}}

  • message: {'message': 'This is the message', 'extracted': ['preexisting']}

  • processed: {'message': 'This is the message', 'extracted': ['preexisting', 'message', 'the', 'is', 'This']}

append to new field in specified order as string with multiple fields:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{}: %{+( )extracted/2}', 'message2': '%{}: %{+extracted/1}'}}}

  • message: {'message': 'The first message: first', 'message2': 'The second message: second'}

  • processed: {'message': 'The first message: first', 'message2': 'The second message: second', 'extracted': 'second first'}

converts datatype without mapping:

  • rule: {'filter': 'message', 'dissector': {'convert_datatype': {'message': 'int'}}}

  • message: {'message': '42'}

  • processed: {'message': 42}

converts datatype with mapping in dotted field notation:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{}of %{extracted.message_float} and a int of %{extracted.message_int}'}, 'convert_datatype': {'extracted.message_int': 'int', 'extracted.message_float': 'float'}}}

  • message: {'message': 'This message has a float of 1.23 and a int of 1337'}

  • processed: {'message': 'This message has a float of 1.23 and a int of 1337', 'extracted': {'message_float': 1.23, 'message_int': 1337}}

indirect field notation: uses captured field as key:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{?key} %{&key}'}}}

  • message: {'message': 'This is the message'}

  • processed: {'message': 'This is the message', 'This': 'is the message'}

indirect field notation: uses captured field as key and appends to it:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{?key} %{&key} %{} %{+( )&key}'}}}

  • message: {'message': 'This is the message'}

  • processed: {'message': 'This is the message', 'This': 'is message'}

handles special chars as captured content:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{field2} %{field3} %{+field4}'}}}

  • message: {'message': 'This is \a + mess}age'}

  • processed: {'message': 'This is \a + mess}age', 'field1': 'This', 'field2': 'is', 'field3': '\a', 'field4': '+ mess}age'}

handles special chars in captured content and target field names:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{~field1} %{fie ld2} %{+field4}'}}}

  • message: {'message': '&This isx02 mess}age /1'}

  • processed: {'message': '&This isx02 mess}age /1', '~field1': '&This', 'fie ld2': 'isx02', 'field4': 'mess}age /1'}

deletes source fields:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{field2} %{field3} %{field4}', 'message2': '%{field21} %{field22} %{field23} %{field24}'}, 'delete_source_fields': True}}

  • message: {'message': 'This is a message', 'message2': 'This is a message'}

  • processed: {'field1': 'This', 'field2': 'is', 'field3': 'a', 'field4': 'message', 'field21': 'This', 'field22': 'is', 'field23': 'a', 'field24': 'message'}

parses path elements:

  • rule: {'filter': 'path', 'dissector': {'mapping': {'path': '/%{field1}/%{field2}/%{field3}/%{field4}'}}}

  • message: {'path': '/this/is/the/path'}

  • processed: {'path': '/this/is/the/path', 'field1': 'this', 'field2': 'is', 'field3': 'the', 'field4': 'path'}

Appending without separator:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': 'INFO#%{date}#%{+date}#MOREINFO%{}'}}}

  • message: {'message': 'INFO#2022 12 06 15:12:30:534#+0100#MOREINFO'}

  • processed: {'message': 'INFO#2022 12 06 15:12:30:534#+0100#MOREINFO', 'date': '2022 12 06 15:12:30:534+0100'}

Appending with special field separator:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': 'INFO#%{+(\()date}#%{+(\))date}#MOREINFO%{}'}}}

  • message: {'message': 'INFO#2022 12 06 15:12:30:534#+0100#MOREINFO'}

  • processed: {'message': 'INFO#2022 12 06 15:12:30:534#+0100#MOREINFO', 'date': '(2022 12 06 15:12:30:534)+0100'}

Dissection with delimiter ending:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': 'this is %{target}.'}}}

  • message: {'message': 'this is the message.'}

  • processed: {'message': 'this is the message.', 'target': 'the message'}

Convert datatype via dissect pattern:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': 'this is %{field1|int} message and this is %{field2|bool}'}}}

  • message: {'message': 'this is 42 message and this is 0'}

  • processed: {'message': 'this is 42 message and this is 0', 'field1': 42, 'field2': False}

Strip char after dissecting:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '[%{time-( )}] - %{ip}'}}}

  • message: {'message': '[2022-11-04 10:00:00 AM     ] - 127.0.0.1'}

  • processed: {'message': '[2022-11-04 10:00:00 AM     ] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}

Strip special char after dissecting:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '[%{time-(#)}] - %{ip}'}}}

  • message: {'message': '[2022-11-04 10:00:00 AM####] - 127.0.0.1'}

  • processed: {'message': '[2022-11-04 10:00:00 AM####] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}

Strip another special char after dissecting:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '[%{time-(?)}] - %{ip}'}}}

  • message: {'message': '[2022-11-04 10:00:00 AM?????] - 127.0.0.1'}

  • processed: {'message': '[2022-11-04 10:00:00 AM?????] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}

Strip char on both sides:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '[%{time-(*)}] - %{ip}'}}}

  • message: {'message': '[***2022-11-04 10:00:00 AM***] - 127.0.0.1'}

  • processed: {'message': '[***2022-11-04 10:00:00 AM***] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}

Strip char while appending:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '[%{time} %{+( )time} %{+( )time-(*)}] - %{ip}'}}}

  • message: {'message': '[2022-11-04 10:00:00 AM***] - 127.0.0.1'}

  • processed: {'message': '[2022-11-04 10:00:00 AM***] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}

Strip char while changing position:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '[%{time/1} %{+( )time/3} %{+( )time-(*)/2}] - %{ip}'}}}

  • message: {'message': '[2022-11-04 10:00:00 AM***] - 127.0.0.1'}

  • processed: {'message': '[2022-11-04 10:00:00 AM***] - 127.0.0.1', 'time': '2022-11-04 AM 10:00:00', 'ip': '127.0.0.1'}

Strip char in indirect field notation:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{?key} %{&key-(#)} %{} %{+( )&key-(#)}'}}}

  • message: {'message': 'This is## the message####'}

  • processed: {'message': 'This is## the message####', 'This': 'is message'}

Strip char while inferring datatype:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': 'this is %{field1-(#)|int} message and this is %{field2-(#)|bool}'}}}

  • message: {'message': 'this is 42#### message and this is 0##'}

  • processed: {'message': 'this is 42#### message and this is 0##', 'field1': 42, 'field2': False}

extract end of string:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': 'system_%{type}'}}}

  • message: {'message': 'system_monitor'}

  • processed: {'message': 'system_monitor', 'type': 'monitor'}

copy field - dissect without separator:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{sys_type}'}}}

  • message: {'message': 'system_monitor'}

  • processed: {'message': 'system_monitor', 'sys_type': 'system_monitor'}

ignore missing fields:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': '%{sys_type}', 'does_not_exist': '%{sys_type}'}, 'ignore_missing_fields': True}}

  • message: {'message': 'system_monitor'}

  • processed: {'message': 'system_monitor', 'sys_type': 'system_monitor'}

handle curly braces in message simple case:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': 'proxy{addr=%{destination.address}}'}}}

  • message: {'message': 'proxy{addr=10.99.172.10:4191}'}

  • processed: {'destination': {'address': '10.99.172.10:4191'}, 'message': 'proxy{addr=10.99.172.10:4191}'}

handle curly braces in message full case:

  • rule: {'filter': 'message', 'dissector': {'mapping': {'message': 'proxy{addr=%{destination.address}}:service{ns=linkerd-multicluster name=%{destination.domain} port=4191}:endpoint{addr=%{source.address}}: %{log.logger}: %{message}'}}}

  • message: {'message': 'proxy{addr=10.99.172.10:4191}:service{ns=linkerd-multicluster name=probe-gateway-bbb port=4191}:endpoint{addr=192.8.177.98:4191}: linkerd_reconnect: Failed to connect error=connect timed out after 1s'}

  • processed: {'destination': {'address': '10.99.172.10:4191', 'domain': 'probe-gateway-bbb'}, 'log': {'logger': 'linkerd_reconnect'}, 'message': 'Failed to connect error=connect timed out after 1s', 'source': {'address': '192.8.177.98:4191'}}

DomainLabelExtractor

The domain_label_extractor is a processor that splits a domain into it’s corresponding labels like registered_domain, top_level_domain and subdomain. If instead an IP is given in the target field an informational tag is added to the configured tags field. If neither a domain nor an ip address can be recognized an invalid error tag will be added to the tag field in the event. The added tags contain each the target field name that was checked by the configured rule, such that it is possible to distinguish between different domain fields in one event. For example for the target field url.domain following tags could be added: invalid_domain_in_url_domain and ip_in_url_domain

Processor Configuration

1- domainlabelextractorname:
2    type: domain_label_extractor
3    rules:
4        - tests/testdata/rules/rules
5    tagging_field_name: resolved
class logprep.processor.domain_label_extractor.processor.DomainLabelExtractor.Config

DomainLabelExtractor config

tagging_field_name: str

Optional configuration field that defines into which field in the event the informational tags should be written to. If this field is not present it defaults to tags.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The domain label extractor requires the additional field domain_label_extractor. The mandatory keys under domain_label_extractor are source_fields and target_field. Former is used to identify the field (declared as list with one element) which contains the domain. And the latter is used to define the parent field where theresults should be written to. Both fields can be dotted subfields. The sub fields of the parent output field of the result are: registered_domain, top_level_domain and subdomain.

In the following example the domain www.sub.domain.de will be split into it’s subdomain www.sub, it’s registered domain domain and lastly it’s TLD de:

Example Rule to extract the labels / parts of a domain.
1filter: 'url'
2domain_label_extractor:
3  source_fields: ['url.domain']
4  target_field: 'url'
5description: '...'

The example rule applied to the input event

Input Event
{
    "url": {
        "domain": "www.sub.domain.de"
    }
}

will result in the following output

Output Event
{
    "url": {
        "domain": "www.sub.domain.de",
        "registered_domain": "domain.de",
        "top_level_domain": "de",
        "subdomain": "www.sub"
    }
}
class logprep.processor.domain_label_extractor.rule.DomainLabelExtractorRule.Config

Config for DomainLabelExtractorRule

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

DomainResolver

The domain_resolver is a processor that can resolve domains inside a defined field.

Processor Configuration

 1- domainresolvername:
 2    type: domain_resolver
 3    rules:
 4        - tests/testdata/rules/rules
 5    timeout: 0.5
 6    max_cached_domains: 20000
 7    max_caching_days: 1
 8    hash_salt: secure_salt
 9    cache_enabled: true
10    debug_cache: false
class logprep.processor.domain_resolver.processor.DomainResolver.Config

DomainResolver config

timeout: float

Timeout for resolving of domains.

Security Best Practice - Processor - Domain Resolver Timeout

Ensure to set this to a reasonable value to avoid DOS attacks by malicious domains in your logs. The default is set to 0.5 seconds.

max_cached_domains: int

The maximum number of cached domains. One cache entry requires ~250 Byte, thus 10 million elements would require about 2.3 GB RAM. The cache is not persisted. Restarting Logprep does therefore clear the cache.

Security Best Practice - Processor - Domain Resolver Max Cached Domains

Ensure to set this to a reasonable value to avoid excessive memory usage and OOM situations by the domain resolver cache.

max_caching_days: int

Number of days a domains is cached after the last time it appeared. This caching reduces the CPU load of Logprep (no demanding encryption must be performed repeatedly) and the load on subsequent components (i.e. Logstash or Opensearch). Setting the caching days to Null deactivates the caching. In case the cache size has been exceeded (see domain_resolver.max_cached_domains),the oldest cached resolved domains will be discarded first.Thus, it is possible that a domain is re-added to the cache before max_caching_days has elapsed if it was discarded due to the size limit.

hash_salt: str

A salt that is used for hashing.

cache_enabled: bool

If enabled activates a cache such that already seen domains do not need to be resolved again.

debug_cache: bool

If enabled adds debug information to the current event, for example if the event was retrieved from the cache or newly resolved, as well as the cache size.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The domain resolver requires the additional field domain_resolver. The additional field domain_resolver.source_fields must be defined as list with one element. It contains the field from which an URL should be parsed and then written to resolved_ip. The URL can be located in continuous text insofar the URL is valid.

Optionally, the output field can be configured (overriding the default resolved_ip) using the parameter target_field. This can be a dotted subfield.

In the following example the URL from the field url will be extracted and written to resolved_ip.

Example
1  filter: url
2  domain_resolver:
3    source_fields: [url]
4  description: '...'
class logprep.processor.domain_resolver.rule.DomainResolverRule.Config

RuleConfig for DomainResolver

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processor output to. Defaults to resovled_ip

tests: List[Dict[str, str]]

Custom tests for this rule.

Dropper

The dropper is a processor that removes fields from log messages. Which fields are deleted is determined within each rule.

Processor Configuration

1- droppername:
2    type: dropper
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.dropper.processor.Dropper.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

Which fields are removed is defined in the additional field drop. It contains a list of fields in dot notation. For nested fields all subfields are also removed if they are empty. If only the specified subfield should be removed, then this can be achieved by setting the option drop_full: false.

In the following example the field keep_me.drop_me is deleted while the fields keep_me and keep_me.keep_me_too are kept.

Example - Rule
1filter: keep_me.drop_me
2dropper:
3    drop:
4    - keep_me.drop_me
Example - Input document
1[{
2    "keep_me": {
3        "drop_me": "something",
4        "keep_me_too": "something"
5    }
6}]
Example - Expected output after application of the rule
1[{
2    "keep_me": {
3        "keep_me_too": "something"
4    }
5}]
class logprep.processor.dropper.rule.DropperRule.Config

RuleConfig for DropperRule

drop: list

List of fields to drop

drop_full: bool

Drop recursive? defaults to [True]

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

tests: List[Dict[str, str]]

Custom tests for this rule.

FieldManager

The field_manager processor copies or moves values from multiple source fields to one target field. Additionally, it can be used to merge multiple source field values into one target field value. In this process, source field lists will be merged.

Processor Configuration

1- fieldmanagername:
2    type: field_manager
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.field_manager.processor.FieldManager.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

A speaking example:

Given field_manager rule
 1filter: client.ip
 2field_manager:
 3    source_fields:
 4        - client.ip
 5        - destination.ip
 6        - host.ip
 7        - observer.ip
 8        - server.ip
 9        - source.ip
10        - server.nat.ip
11        - client.nat.ip
12    target_field: related.ip
13    merge_with_target: True
14description: '...'
Incoming event
1{
2    "client": {"ip": ["127.0.0.1", "fe89::", "192.168.5.1"], "nat": {"ip": "223.2.3.2"}},
3    "destination": {"ip": "8.8.8.8"},
4    "host": {"ip": ["192.168.5.1", "180.22.66.3"]},
5    "observer": {"ip": "10.10.2.33"},
6    "server": {"ip": "10.10.2.33", "nat": {"ip": "180.22.66.1"}},
7    "source": {"ip": "10.10.2.33"}
8}
Processed event
 1{
 2    "client": {"ip": ["127.0.0.1", "fe89::", "192.168.5.1"], "nat": {"ip": "223.2.3.2"}},
 3    "destination": {"ip": "8.8.8.8"},
 4    "host": {"ip": ["192.168.5.1", "180.22.66.3"]},
 5    "observer": {"ip": "10.10.2.33"},
 6    "server": {"ip": "10.10.2.33", "nat": {"ip": "180.22.66.1"}},
 7    "source": {"ip": "10.10.2.33"},
 8    "related": {
 9        "ip": [
10            "10.10.2.33",
11            "127.0.0.1",
12            "180.22.66.1",
13            "180.22.66.3",
14            "192.168.5.1",
15            "223.2.3.2",
16            "8.8.8.8",
17            "fe89::"
18        ]
19    }
20}
class logprep.processor.field_manager.rule.FieldManagerRule.Config

Config for FieldManagerRule

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

tests: List[Dict[str, str]]

Custom tests for this rule.

Examples for field_manager:

copies single field to non existing target field:

  • rule: {'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field'}}

  • message: {'message': 'This is a message'}

  • processed: {'message': 'This is a message', 'new_field': 'This is a message'}

copies single field to existing target field:

  • rule: {'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'overwrite_target': True}}

  • message: {'message': 'This is a message', 'new_field': 'existing value'}

  • processed: {'message': 'This is a message', 'new_field': 'This is a message'}

moves single field to non existing target field:

  • rule: {'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'delete_source_fields': True}}

  • message: {'message': 'This is a message'}

  • processed: {'new_field': 'This is a message'}

moves single field to existing target field:

  • rule: {'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'existing', 'delete_source_fields': True, 'overwrite_target': True}}

  • message: {'message': 'This is a message', 'existing': 'existing'}

  • processed: {'existing': 'This is a message'}

moves single field to existing target field:

  • rule: {'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'delete_source_fields': True, 'overwrite_target': True}}

  • message: {'message': 'This is a message', 'new_field': 'existing content'}

  • processed: {'new_field': 'This is a message'}

moves field and writes as list to target field:

  • rule: {'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}

  • message: {'message': 'This is a message'}

  • processed: {'new_field': ['This is a message']}

moves multiple fields and writes them as list to non existing target field:

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}

  • message: {'field1': 'value1', 'field2': 'value2', 'field3': 'value3'}

  • processed: {'new_field': ['value1', 'value2', 'value3']}

moves multiple fields and writes them as list to existing target field:

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True, 'overwrite_target': True}}

  • message: {'field1': 'value1', 'field2': 'value2', 'field3': 'value3', 'new_field': 'i exist'}

  • processed: {'new_field': ['value1', 'value2', 'value3']}

moves multiple fields and replaces existing target field with list including the existing value:

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}

  • message: {'field1': 'value1', 'field2': 'value2', 'field3': 'value3', 'new_field': 'i exist'}

  • processed: {'new_field': ['i exist', 'value1', 'value2', 'value3']}

moves multiple fields and writes them to a existing list:

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}

  • message: {'field1': 'value1', 'field2': 'value2', 'field3': 'value3', 'new_field': ['i exist']}

  • processed: {'new_field': ['i exist', 'value1', 'value2', 'value3']}

moves multiple fields and writes them to a existing target field as list:

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}

  • message: {'field1': 'value1', 'field2': 'value2', 'field3': 'value3', 'new_field': 'i exist'}

  • processed: {'new_field': ['i exist', 'value1', 'value2', 'value3']}

moves multiple fields and merges to target list:

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}

  • message: {'field1': ['value1', 'value2', 'value3'], 'field2': ['value4'], 'field3': ['value5', 'value6'], 'new_field': ['i exist']}

  • processed: {'new_field': ['i exist', 'value1', 'value2', 'value3', 'value4', 'value5', 'value6']}

moves multiple fields and merges to target list with different source types:

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}

  • message: {'field1': ['value1', 'value2', 'value3'], 'field2': 'value4', 'field3': ['value5', 'value6'], 'new_field': ['i exist']}

  • processed: {'new_field': ['i exist', 'value1', 'value2', 'value3', 'value4', 'value5', 'value6']}

(‘moves multiple fields and merges to target list ‘, ‘with different source types and filters duplicates’):

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}

  • message: {'field1': ['value1', 'value2', 'value3', 'value5'], 'field2': 'value4', 'field3': ['value5', 'value6', 'value4'], 'new_field': ['i exist']}

  • processed: {'new_field': ['i exist', 'value1', 'value2', 'value3', 'value5', 'value4', 'value6']}

(‘moves multiple fields and merges to target list ‘, ‘with different source types and filters duplicates and overwrites target’):

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True, 'overwrite_target': True}}

  • message: {'field1': ['value1', 'value2', 'value3', 'value5'], 'field2': 'value4', 'field3': ['value5', 'value6', 'value4'], 'new_field': ['i exist']}

  • processed: {'new_field': ['value1', 'value2', 'value3', 'value5', 'value4', 'value6']}

real world example from documentation:

  • rule: {'filter': 'client.ip', 'field_manager': {'source_fields': ['client.ip', 'destination.ip', 'host.ip', 'observer.ip', 'server.ip', 'source.ip', 'server.nat.ip', 'client.nat.ip'], 'target_field': 'related.ip', 'merge_with_target': True}}

  • message: {'client': {'ip': ['127.0.0.1', 'fe89::', '192.168.5.1'], 'nat': {'ip': '223.2.3.2'}}, 'destination': {'ip': '8.8.8.8'}, 'host': {'ip': ['192.168.5.1', '180.22.66.3']}, 'observer': {'ip': '10.10.2.33'}, 'server': {'ip': '10.10.2.33', 'nat': {'ip': '180.22.66.1'}}, 'source': {'ip': '10.10.2.33'}}

  • processed: {'client': {'ip': ['127.0.0.1', 'fe89::', '192.168.5.1'], 'nat': {'ip': '223.2.3.2'}}, 'destination': {'ip': '8.8.8.8'}, 'host': {'ip': ['192.168.5.1', '180.22.66.3']}, 'observer': {'ip': '10.10.2.33'}, 'server': {'ip': '10.10.2.33', 'nat': {'ip': '180.22.66.1'}}, 'source': {'ip': '10.10.2.33'}, 'related': {'ip': ['127.0.0.1', 'fe89::', '192.168.5.1', '8.8.8.8', '180.22.66.3', '10.10.2.33', '180.22.66.1', '223.2.3.2']}}

copies multiple fields to multiple target fields:

  • rule: {'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}}}

  • message: {'field': {'one': 1, 'two': 2, 'three': 3}}

  • processed: {'field': {'one': 1, 'two': 2, 'three': 3}, 'one': 1, 'two': 2, 'three': 3}

copies multiple fields to multiple target fields, while overwriting existing fields:

  • rule: {'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'overwrite_target': True}}

  • message: {'field': {'one': 1, 'two': 2, 'three': 3}, 'three': 'exists already'}

  • processed: {'field': {'one': 1, 'two': 2, 'three': 3}, 'one': 1, 'two': 2, 'three': 3}

copies multiple fields to multiple target fields, while one list will be extended:

  • rule: {'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'merge_with_target': True}}

  • message: {'field': {'one': 1, 'two': 2, 'three': 3}, 'three': ['exists already']}

  • processed: {'field': {'one': 1, 'two': 2, 'three': 3}, 'one': 1, 'two': 2, 'three': ['exists already', 3]}

copies multiple fields to multiple target fields, while one list will be extended with existing list:

  • rule: {'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'merge_with_target': True}}

  • message: {'field': {'one': 1, 'two': 2, 'three': [3, 3]}, 'three': ['exists already']}

  • processed: {'field': {'one': 1, 'two': 2, 'three': [3, 3]}, 'one': 1, 'two': 2, 'three': ['exists already', 3, 3]}

copies multiple fields to multiple target fields, while one target list will be overwritten with existing list:

  • rule: {'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'overwrite_target': True}}

  • message: {'field': {'one': 1, 'two': 2, 'three': [3, 3]}, 'three': ['exists already']}

  • processed: {'field': {'one': 1, 'two': 2, 'three': [3, 3]}, 'one': 1, 'two': 2, 'three': [3, 3]}

copies multiple fields to multiple target fields, while one source field is missing:

  • rule: {'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}}}

  • message: {'field': {'one': 1, 'three': 3}}

  • processed: {'field': {'one': 1, 'three': 3}, 'one': 1, 'three': 3, 'tags': ['_field_manager_missing_field_warning']}

moves multiple fields to multiple target fields:

  • rule: {'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'delete_source_fields': True}}

  • message: {'field': {'one': 1, 'two': 2, 'three': 3}}

  • processed: {'one': 1, 'two': 2, 'three': 3}

Combine fields to list and copy fields at the same time:

  • rule: {'filter': 'field', 'field_manager': {'source_fields': ['source.one', 'source.two'], 'target_field': 'merged', 'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'merge_with_target': True}}

  • message: {'field': {'one': 1, 'two': 2, 'three': 3}, 'source': {'one': ['a'], 'two': ['b']}}

  • processed: {'field': {'one': 1, 'two': 2, 'three': 3}, 'source': {'one': ['a'], 'two': ['b']}, 'one': 1, 'two': 2, 'three': 3, 'merged': ['a', 'b']}

Ignore missing fields: No warning and no failure tag if source field is missing:

  • rule: {'filter': 'field.a', 'field_manager': {'mapping': {'field.a': 'target_field', 'does.not.exists': 'target_field'}, 'ignore_missing_fields': True}}

  • message: {'field': {'a': 'first', 'b': 'second'}}

  • processed: {'field': {'a': 'first', 'b': 'second'}, 'target_field': 'first'}

merge_with_target preserves list ordering:

  • rule: {'filter': '(foo) OR (test)', 'field_manager': {'id': '5cfa7a26-94af-49de-bc82-460c42e9dc56', 'source_fields': ['foo', 'test'], 'target_field': 'existing_list', 'delete_source_fields': False, 'overwrite_target': False, 'merge_with_target': True}}

  • message: {'existing_list': ['hello', 'world'], 'foo': 'bar', 'test': 'value'}

  • processed: {'existing_list': ['hello', 'world', 'bar', 'value'], 'foo': 'bar', 'test': 'value'}

Convert existing target to list:

  • rule: {'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'merge_with_target': True}}

  • message: {'message': 'Value B', 'new_field': 'Value A'}

  • processed: {'message': 'Value B', 'new_field': ['Value A', 'Value B']}

Convert existing target to list with multiple source fields:

  • rule: {'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True}}

  • message: {'field1': 'Value B', 'field2': 'Value C', 'field3': 'Value D', 'new_field': 'Value A'}

  • processed: {'field1': 'Value B', 'field2': 'Value C', 'field3': 'Value D', 'new_field': ['Value A', 'Value B', 'Value C', 'Value D']}

Merge source dict into existing target dict:

  • rule: {'filter': 'source', 'field_manager': {'source_fields': ['source'], 'target_field': 'target', 'merge_with_target': True}}

  • message: {'source': {'source1': 'value'}, 'target': {'target1': 'value'}}

  • processed: {'source': {'source1': 'value'}, 'target': {'source1': 'value', 'target1': 'value'}}

Merge multiple source dicts into existing target dict:

  • rule: {'filter': 'source1', 'field_manager': {'source_fields': ['source1', 'source2', 'source3'], 'target_field': 'target', 'delete_source_fields': True, 'merge_with_target': True}}

  • message: {'source1': {'source1': 'value'}, 'source2': {'source2': 'value'}, 'source3': {'source-nested': {'foo': 'bar'}}, 'target': {'target1': 'value'}}

  • processed: {'target': {'source1': 'value', 'source2': 'value', 'source-nested': {'foo': 'bar'}, 'target1': 'value'}}

overlapping source with target single processing:

  • rule: {'filter': 'host', 'field_manager': {'source_fields': ['host'], 'target_field': 'host.name', 'overwrite_target': True}}

  • message: {'host': 'example.com'}

  • processed: {'host': {'name': 'example.com'}}

overlapping source with target mapping processing:

  • rule: {'filter': 'host', 'field_manager': {'mapping': {'host': 'host.name'}, 'overwrite_target': True}}

  • message: {'host': 'example.com'}

  • processed: {'host': {'name': 'example.com'}}

GenericAdder

The generic_adder is a processor that adds new fields and values to documents based on a list. The list resides inside a rule and/or inside a file.

Processor Configuration

1- genericaddername:
2    type: generic_adder
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.generic_adder.processor.GenericAdder.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The generic adder requires the additional field generic_adder. The field generic_adder.add can be defined. It contains a dictionary of field names and values that should be added. If dot notation is being used, then all fields on the path are being automatically created.

In the following example, the field some.added.field with the value some added value is being added.

Example with add
1filter: add_generic_test
2generic_adder:
3  add:
4    some.added.field: some added value
5description: '...'

Alternatively, the additional field generic_adder.add_from_file can be added. It contains the path or url to a file with a YML file that contains a dictionary of field names and values that should be added to the document. Instead of a path, a list of paths can be used to add multiple files. All of those files must exist. If a list is used, it is possible to tell the generic adder to only use the first existing file by setting generic_adder.only_first_existing_file: true. In that case, only one file must exist. Additions from generic_adder.add and generic_adder.add_from_file are combined.

In the following example a dictionary with field names and values is loaded from the file at PATH_TO_FILE_WITH_LIST. This dictionary is used like the one that can be defined via generic_adder.add.

Example with add_from_file
1filter: 'add_generic_test'
2generic_adder:
3  add_from_file: PATH_TO_FILE_WITH_LIST
4description: '...'

In the following example two files are being used.

Example with multiple files
1filter: 'add_generic_test'
2generic_adder:
3  add_from_file:
4    - PATH_TO_FILE_WITH_LIST
5    - ANOTHER_PATH_TO_FILE_WITH_LIST
6description: '...'

In the following example two files are being used, but only the first existing file is being loaded.

Example with multiple files and one loaded file
1filter: 'add_generic_test'
2generic_adder:
3  only_first_existing_file: true
4  add_from_file:
5    - PATH_TO_FILE_THAT_DOES_NOT_EXIST
6    - PATH_TO_FILE_WITH_LIST
7description: '...'
class logprep.processor.generic_adder.rule.GenericAdderRule.Config

Config for GenericAdderRule

add: dict

Contains a dictionary of field names and values that should be added. If dot notation is being used, then all fields on the path are being automatically created.

add_from_file: list

Contains the path or url to YML file that contains a dictionary of field names and values that should be added to the document. Instead of a path, a list of paths can be used to add multiple files. All of those files must exist. For string format see Getters

Security Best Practice - Processor - Generic Adder Add From File Memory Consumption

Be aware that all values of the remote file were loaded into memory. Consider to avoid dynamic increasing lists without setting limits for Memory consumption. Additionally avoid loading large files all at once to avoid exceeding http body limits.

Security Best Practice - Processor - Generic Adder Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

only_first_existing_file: bool

If a list is used, it is possible to tell the generic adder to only use the first existing file by setting generic_adder.only_first_existing_file: true. In that case, only one file must exist.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

GenericResolver

The generic_resolver resolves log event values using regex lists.

Processor Configuration

1- genericresolvername:
2    type: generic_resolver
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.generic_resolver.processor.GenericResolver.Config

GenericResolver config

max_cache_entries: int

(Optional) Size of cache for results when resolving from a list. The cache can be disabled by setting this option to 0.

Security Best Practice - Processor - Generic Resolver Max Cached Entries

Ensure to set this to a reasonable value to avoid excessive memory usage and OOM situations caused by the generic resolver cache.

cache_metrics_interval: int

(Optional) Cache metrics won’t be updated immediately. Instead updating is skipped for a number of events before it’s next update. cache_metrics_interval sets the number of events between updates (default: 1).

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The generic resolver requires the additional field generic_resolver. Configurable fields are being checked by regex patterns and a configurable value will be added if a pattern matches. The parameters within generic_resolver must be of the form field_mapping: {SOURCE_FIELD: DESTINATION_FIELD}, resolve_list: {REGEX_PATTERN_0: ADDED_VALUE_0, ..., REGEX_PATTERN_N: ADDED_VALUE_N}. SOURCE_FIELD will be checked by the regex patterns REGEX_PATTERN_[0-N] and a new field DESTINATION_FIELD with the value ADDED_VALUE_[0-N] will be added if there is a match. Adding the option "merge_with_target": True makes the generic resolver write resolved values into a list so that multiple different values can be written into the same field.

In the following example to_resolve will be checked by the regex pattern .*Hello.*. "resolved": "Greeting" will be added to the event if the pattern matches the value in to_resolve.

Example
1filter: to_resolve
2generic_resolver:
3  field_mapping:
4    to_resolve: resolved
5  resolve_list:
6    .*Hello.*: Greeting

For YAML compliance, it is possible to declare the resolve list as follows to maintain ordering when using the configuration file with different programs. Both styles will be supported in future; however, this one is recommended for clarity and YAML compliance.

Example
1filter: to_resolve
2generic_resolver:
3  field_mapping:
4    to_resolve: resolved
5  resolve_list:
6    - .*Hello.*: Greeting
7    - .*error.*: Error
8    - never_match: Panic

Alternatively, a YML file with a resolve list and a regex pattern can be used to resolve values. For this, a field resolve_from_file with the subfields path and pattern must be added. The resolve list in the file at path is then used in conjunction with the regex pattern in pattern. pattern must be a regex pattern with a capture group that is named mapping. The resolver will check for the pattern and get value captured by the mapping group. This captured value is then used in the list from the file.

ignore_case can be set to ignore the case when matching values that will be resolved. It is disabled by default. In the following example to_resolve: heLLo would be resolved, since ignore_case is set to true.

Example
1filter: to_resolve
2generic_resolver:
3  field_mapping:
4    to_resolve: resolved
5  resolve_list:
6    .*Hello.*: Greeting
7  ignore_case: true

It is furthermore possible to resolve into dictionaries. In the following example {"to_resolve": "Hello!"} would be resolved to {"resolved": {"Greeting": "Hello"}}.

Example
1filter: to_resolve
2generic_resolver:
3  field_mapping:
4    to_resolve: resolved
5  resolve_list:
6    .*Hello.*: {"Greeting": "Hello"}

Resolved dictionaries can be merged into existing dictionaries. In the following example {"to": {"resolve": "Hello!"}} would be resolved to {"to": {"Greeting": "Hello", "resolve": "Hello!"}}.

Example
1filter: to_resolve
2generic_resolver:
3  field_mapping:
4    to.resolve: to
5  resolve_list:
6    .*Hello.*: {"Greeting": "Hello"}

In the following example to_resolve will be checked by the regex pattern \d*(?P<mapping>[a-z]+)\d* and the list in path/to/resolve_mapping.yml will be used to add new fields. "resolved": "resolved foo" will be added to the event if the value in to_resolve begins with number, ends with numbers and contains foo. Furthermore, "resolved": "resolved bar" will be added to the event if the value in to_resolve begins with number, ends with numbers and contains bar.

Example resolving with list from file
1filter: to_resolve
2generic_resolver:
3  field_mapping:
4    to_resolve: resolved
5  resolve_from_file:
6    path: path/to/resolve_mapping.yml
7    pattern: \d*(?P<mapping>[a-z]+)\d*
Example file with resolve list
1foo: resolved foo
2bar: resolved bar
class logprep.processor.generic_resolver.rule.GenericResolverRule.Config

RuleConfig for GenericResolver

field_mapping: dict[str, str]

Mapping in form of {SOURCE_FIELD: DESTINATION_FIELD}

resolve_list: dict[str, dict[str, FieldValue] | list[FieldValue] | str | int | float | bool | None]

lookup mapping in form of {REGEX_PATTERN_0: ADDED_VALUE_0, ..., REGEX_PATTERN_N: ADDED_VALUE_N}

resolve_from_file: dict[Literal['path', 'pattern'], str]

Mapping with a path key to a YML file (for string format see Getters) with a resolve list and a pattern key with a regex pattern which can be used to resolve values. The resolve list in the file at path is then used in conjunction with the regex pattern in pattern.

Security Best Practice - Processor - Generic Resolver Resolve From File Memory Consumption

Be aware that all values of the remote file were loaded into memory. Consider to avoid dynamic increasing lists without setting limits for Memory consumption. Additionally avoid loading large files all at once to avoid exceeding http body limits.

Security Best Practice - Processor - Generic Resolver Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

ignore_case: bool

(Optional) Ignore case when matching resolve values. Defaults to False.

additions: dict[str, dict[str, FieldValue] | list[FieldValue] | str | int | float | bool | None]

Contains a dictionary of field names and values that should be added.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

GeoipEnricher

Processor to enrich log messages with geolocalization information

Processor Configuration

1- geoipenrichername:
2    type: geoip_enricher
3    rules:
4        - tests/testdata/geoip_enricher/rules
5    db_path: /path/to/GeoLite2-City.mmdb
class logprep.processor.geoip_enricher.processor.GeoipEnricher.Config

geoip_enricher config

db_path: str
Path to a Geo2Lite city database by Maxmind in binary format.

This must be provided separately. The file will be downloaded or copied and cached. For valid URI formats see Getters This product includes GeoLite2 data created by MaxMind, available from https://www.maxmind.com.

Security Best Practice - Processor - GeoIP Enricher Database Memory Consumption

Be aware that all values of the remote file were loaded into memory. Avoid loading a large database via http to avoid exceeding http body limits.

Security Best Practice - Processor - GeoIP Enricher Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded database.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The geoip enricher requires the additional field geoip. The default output_field can be overridden using the optional parameter target_field. This can be a dotted subfield. The additional field geoip.source_fields must be given as list with one element. It contains the IP for which the geoip data should be added.

In the following example the IP in client.ip will be enriched with geoip data.

Example
1filter: client.ip
2geoip:
3  source_fields: [client.ip]
4description: '...'
class logprep.processor.geoip_enricher.rule.GeoipEnricherRule.Config

RuleConfig for GeoipEnricher

customize_target_subfields: dict

(Optional) Rewrites the default output subfield locations to custom output subfield locations. Must be in the form of key value mapping pairs (e.g. default_output: custom_output). Following default outputs can be customized:

  • type

  • geometry.type

  • geometry.coordinates

  • properties.accuracy_radius

  • properties.continent

  • properties.continent_code

  • properties.country

  • properties.country_iso_code

  • properties.time_zone

  • properties.city

  • properties.postal_code

  • properties.subdivision

A concrete example would look like this:

1filter: client.ip
2geoip:
3  source_fields: [client.ip]
4  customize_target_subfields:
5    geometry.type: client.geo.type
6    geometry.coordinates: client.geo.coordinates
7description: '...'
delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

Field to get geoip information for.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

Field for the output information. Defaults to geoip.

tests: List[Dict[str, str]]

Custom tests for this rule.

Grokker

The grokker processor dissects a message on a basis of grok patterns. This processor is based of the ideas of the logstash grok filter plugin. (see: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html)

The default builtin grok patterns shipped with logprep are the same than in logstash.

Processor Configuration

1- my_grokker:
2    type: grokker
3    rules:
4        - tests/testdata/rules/rules
5    custom_patterns_dir: "http://the.patterns.us/patterns.zip"
class logprep.processor.grokker.processor.Grokker.Config

Config of Grokker

custom_patterns_dir: str

(Optional) A directory or URI to load patterns from. All files in all subdirectories will be loaded recursively. If an uri is given, the target file has to be a zip file with a directory structure in it.

Security Best Practice - Processor - Grokker Custom Patterns Directory Memory Consumption

Be aware that all values of the remote zip were loaded into memory. Reserve memory for this and avoid loading large files all at once to avoid exceeding http body limits.

Security Best Practice - Processor - Grokker Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

A speaking example:

Given grokker rule
1filter: message
2grokker:
3    mapping:
4        message: "%{TIMESTAMP_ISO8601:@timestamp} %{LOGLEVEL:logLevel} %{GREEDYDATA:logMessage}"
5description: 'an example log message'
Incoming event
1{"message": "2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log"}
Processed event
1{
2    "message": "2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log",
3    "@timestamp": "2020-07-16T19:20:30.45+01:00",
4    "logLevel": "DEBUG",
5    "logMessage": "This is a sample log"
6}
class logprep.processor.grokker.rule.GrokkerRule.Config

Config for GrokkerRule

patterns: dict

(Optional) additional grok patterns as mapping. E.g. CUSTOM_PATTERN: [^s]* if you want to use special target fields, you are able to use them an usual in the mapping sections. Here you only have to declare the matching regex without named groups.

convert_datatype: dict

A mapping from source field and desired datatype [optional]. The datatypes could be float, int, bool, string

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A mapping from source fields to a grok pattern. Dotted field notation is possible in key and in the grok pattern. Additionally logstash field notation is possible in grok pattern. The value can be a list of search patterns or a single search pattern. Lists of search pattern will be checked in the order of the list until the first matching pattern. It is possible to use oniguruma regex pattern with or without grok patterns in the patterns part. When defining an oniguruma there is a limitation of three nested parentheses inside the pattern. Applying more nested parentheses is not possible. Logstashs ecs conform grok patterns are used to resolve the here used grok patterns. When writing patterns it is advised to be careful as the underlying regex can become complex fast. If the execution and the resolving of the pattern takes more than one second a matching timeout will be raised.

Security Best Practice - Processor - Grokker DOS (Denial of Service) via Backreferences

Avoid using backreferences in grok patterns, as they can lead to excessive memory consumption and potential denial of service attacks.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Examples for grokker:

matches simple grok pattern:

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': 'this is the %{USER:userfield}'}}}

  • message: {'message': 'this is the MyUser586'}

  • processed: {'message': 'this is the MyUser586', 'userfield': 'MyUser586'}

matches simple grok pattern with dotted field target:

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': 'this is the %{USER:user.subfield}'}}}

  • message: {'message': 'this is the MyUser586'}

  • processed: {'message': 'this is the MyUser586', 'user': {'subfield': 'MyUser586'}}

matches simple grok pattern with logstash field target:

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': 'this is the %{USER:[user][subfield]}'}}}

  • message: {'message': 'this is the MyUser586'}

  • processed: {'message': 'this is the MyUser586', 'user': {'subfield': 'MyUser586'}}

matches custom patterns:

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': 'this is the %{CUSTOM_PATTERN:user.subfield}'}, 'patterns': {'CUSTOM_PATTERN': '[^\s]*'}}}

  • message: {'message': 'this is the MyUser586'}

  • processed: {'message': 'this is the MyUser586', 'user': {'subfield': 'MyUser586'}}

normalize from grok:

  • rule: {'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': '%{IP:some_ip} %{NUMBER:port:int}'}}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234'}}}

  • processed: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234'}}, 'some_ip': '123.123.123.123', 'port': 1234}

grok list match first matching after skipping non matching:

  • rule: {'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': ['%{IP:some_ip_1} %{NUMBER:port_1:int} foo', '%{IP:some_ip_2} %{NUMBER:port_2:int} bar']}}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}}

  • processed: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}, 'some_ip_2': '123.123.123.123', 'port_2': 1234}

grok list match first matching after skipping non matching and does not match twice:

  • rule: {'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': ['%{IP:some_ip_1} %{NUMBER:port_1:int} foo', '%{IP:some_ip_2} %{NUMBER:port_2:int} bar', '%{IP:some_ip_3} %{NUMBER:port_3:int} bar']}}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}}

  • processed: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}, 'some_ip_2': '123.123.123.123', 'port_2': 1234}

grok list match first matching after skipping non matching with same target fields:

  • rule: {'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': ['%{IP:some_ip} %{NUMBER:port:int} foo', '%{IP:some_ip} %{NUMBER:port:int} bar']}}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}}

  • processed: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}, 'some_ip': '123.123.123.123', 'port': 1234}

normalization from nested grok:

  • rule: {'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': '%{IP:[parent][some_ip]} \w+ %{NUMBER:[parent][port]:int} %[ts]+ %{NUMBER:test:int}'}}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 555 1234 %ttss 11'}}}

  • processed: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 555 1234 %ttss 11'}}, 'test': 11, 'parent': {'some_ip': '123.123.123.123', 'port': 1234}}

example log message:

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': '%{TIMESTAMP_ISO8601:@timestamp} %{LOGLEVEL:logLevel} %{GREEDYDATA:logMessage}'}}}

  • message: {'message': '2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log'}

  • processed: {'message': '2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log', '@timestamp': '2020-07-16T19:20:30.45+01:00', 'logLevel': 'DEBUG', 'logMessage': 'This is a sample log'}

example for ecs conform output:

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': '%{COMBINEDAPACHELOG}'}}}

  • message: {'message': '127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"'}

  • processed: {'message': '127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"', 'source': {'address': '127.0.0.1'}, 'timestamp': '11/Dec/2013:00:01:45 -0800', 'http': {'request': {'method': 'GET', 'referrer': 'http://cadenza/xampp/navi.php'}, 'version': '1.1', 'response': {'status_code': 200, 'body': {'bytes': 3891}}}, 'url': {'original': '/xampp/status.php'}, 'user_agent': {'original': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0'}}

matches simple oniguruma pattern:

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': 'this is the (?<userfield>[A-Za-z0-9]+)'}}}

  • message: {'message': 'this is the MyUser586'}

  • processed: {'message': 'this is the MyUser586', 'userfield': 'MyUser586'}

oniguruma with nested parentheses (3 levels supported):

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': '^(?<timestamp>%{DAY}%{SPACE}%{MONTH}%{SPACE}%{MONTHDAY}%{SPACE}%{TIME}%{SPACE}%{YEAR})%{SPACE}%{GREEDYDATA:[remains]}$', 'remains': '(?<action>(SEND%{SPACE}INFO)%{SPACE}(?<info>BAL)%{GREEDYDATA:rest}'}}}

  • message: {'message': 'Wed Dec 7 13:14:13 2005 SEND INFO BAL/4'}

  • processed: {'message': 'Wed Dec 7 13:14:13 2005 SEND INFO BAL/4', 'timestamp': 'Wed Dec 7 13:14:13 2005', 'action': 'SEND INFO', 'info': 'BAL', 'rest': '/4', 'remains': 'SEND INFO BAL/4'}

two oniguruma with same target names, applies only the last target:

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': '^(?<action>%{NUMBER})%{SPACE}(?<action>%{NUMBER})%{SPACE}(?<action>%{NUMBER})%{SPACE}(?<action>%{NUMBER})$'}}}

  • message: {'message': '13 37 21 42'}

  • processed: {'message': '13 37 21 42', 'action': '42'}

ignore_missing_fields:

  • rule: {'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': '%{IP:some_ip} %{NUMBER:port:int}', 'this_field_does_not_exist': '%{IP:some_ip} %{NUMBER:port:int}'}, 'ignore_missing_fields': True}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234'}}}

  • processed: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234'}}, 'some_ip': '123.123.123.123', 'port': 1234}

Subfield with common prefix:

  • rule: {'filter': 'message', 'grokker': {'mapping': {'message': 'Facility %{USER:facility.location} %{USER:facility.location_level}'}}}

  • message: {'message': 'Facility spain primary'}

  • processed: {'message': 'Facility spain primary', 'facility': {'location': 'spain', 'location_level': 'primary'}}

IpInformer

The ip_informer processor enriches an event with ip information.

Processor Configuration

1- myipinformer:
2    type: ip_informer
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.ip_informer.processor.IpInformer.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

A speaking example:

Given ip_informer rule
1filter: message
2ip_informer:
3    source_fields: ["ip"]
4    target_field: result
5description: '...'
Incoming event
1{"ip": "192.168.5.1"}
Processed event
 1{
 2    "ip": "192.168.5.1",
 3    "result": {
 4        "192.168.5.1": {
 5            "compressed": "192.168.5.1",
 6            "exploded": "192.168.5.1",
 7            "is_global": false,
 8            "is_link_local": false,
 9            "is_loopback": false,
10            "is_multicast": false,
11            "is_private": true,
12            "is_reserved": false,
13            "is_unspecified": false,
14            "max_prefixlen": 32,
15            "reverse_pointer": "1.5.168.192.in-addr.arpa",
16            "version": 4
17        }
18    }
19}
class logprep.processor.ip_informer.rule.IpInformerRule.Config

Config for IPInformer

properties: list

(Optional) configures the properties to extract. Default is to extract all properties. Possible properties are: ['compressed', 'exploded', 'is_global', 'is_link_local', 'is_loopback', 'is_multicast', 'is_private', 'is_reserved', 'is_unspecified', 'max_prefixlen', 'reverse_pointer', 'version', 'compressed', 'exploded', 'ipv4_mapped', 'is_global', 'is_link_local', 'is_loopback', 'is_multicast', 'is_private', 'is_reserved', 'is_site_local', 'is_unspecified', 'max_prefixlen', 'reverse_pointer', 'scope_id', 'sixtofour', 'teredo', 'version'].

Default is to extract all available properties. If you explicitly want to extract a property, which does not exist for an IPAddress (e.g. toredo which is only given for IPv4Addresses), the property will be extracted with the value False.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Examples for ip_informer:

single field with ipv4 address:

  • rule: {'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result'}}

  • message: {'ip': '192.168.5.1'}

  • processed: {'ip': '192.168.5.1', 'result': {'192.168.5.1': {'compressed': '192.168.5.1', 'exploded': '192.168.5.1', 'is_global': False, 'is_link_local': False, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_unspecified': False, 'max_prefixlen': 32, 'reverse_pointer': '1.5.168.192.in-addr.arpa', 'version': 4}}}

single field with ipv6 address:

  • rule: {'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result'}}

  • message: {'ip': 'fe80::2c71:58ff:fe6a:5a08'}

  • processed: {'ip': 'fe80::2c71:58ff:fe6a:5a08', 'result': {'fe80::2c71:58ff:fe6a:5a08': {'compressed': 'fe80::2c71:58ff:fe6a:5a08', 'exploded': 'fe80:0000:0000:0000:2c71:58ff:fe6a:5a08', 'ipv4_mapped': None, 'is_global': False, 'is_link_local': True, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_site_local': False, 'is_unspecified': False, 'max_prefixlen': 128, 'reverse_pointer': '8.0.a.5.a.6.e.f.f.f.8.5.1.7.c.2.0.0.0.0.0.0.0.0.0.0.0.0.0.8.e.f.ip6.arpa', 'scope_id': None, 'sixtofour': None, 'teredo': None, 'version': 6}}}

list field with ipv4 and ipv6 addresses:

  • rule: {'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result'}}

  • message: {'ip': ['192.168.5.1', 'fe80::2c71:58ff:fe6a:5a08']}

  • processed: {'ip': ['192.168.5.1', 'fe80::2c71:58ff:fe6a:5a08'], 'result': {'192.168.5.1': {'compressed': '192.168.5.1', 'exploded': '192.168.5.1', 'is_global': False, 'is_link_local': False, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_unspecified': False, 'max_prefixlen': 32, 'reverse_pointer': '1.5.168.192.in-addr.arpa', 'version': 4}, 'fe80::2c71:58ff:fe6a:5a08': {'compressed': 'fe80::2c71:58ff:fe6a:5a08', 'exploded': 'fe80:0000:0000:0000:2c71:58ff:fe6a:5a08', 'ipv4_mapped': None, 'is_global': False, 'is_link_local': True, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_site_local': False, 'is_unspecified': False, 'max_prefixlen': 128, 'reverse_pointer': '8.0.a.5.a.6.e.f.f.f.8.5.1.7.c.2.0.0.0.0.0.0.0.0.0.0.0.0.0.8.e.f.ip6.arpa', 'scope_id': None, 'sixtofour': None, 'teredo': None, 'version': 6}}}

list and single field with ipv4 and ipv6 addresses:

  • rule: {'filter': 'ip', 'ip_informer': {'source_fields': ['ip', 'single'], 'target_field': 'result'}}

  • message: {'ip': ['192.168.5.1', 'fe80::2c71:58ff:fe6a:5a08'], 'single': '127.0.0.1'}

  • processed: {'ip': ['192.168.5.1', 'fe80::2c71:58ff:fe6a:5a08'], 'single': '127.0.0.1', 'result': {'192.168.5.1': {'compressed': '192.168.5.1', 'exploded': '192.168.5.1', 'is_global': False, 'is_link_local': False, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_unspecified': False, 'max_prefixlen': 32, 'reverse_pointer': '1.5.168.192.in-addr.arpa', 'version': 4}, 'fe80::2c71:58ff:fe6a:5a08': {'compressed': 'fe80::2c71:58ff:fe6a:5a08', 'exploded': 'fe80:0000:0000:0000:2c71:58ff:fe6a:5a08', 'ipv4_mapped': None, 'is_global': False, 'is_link_local': True, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_site_local': False, 'is_unspecified': False, 'max_prefixlen': 128, 'reverse_pointer': '8.0.a.5.a.6.e.f.f.f.8.5.1.7.c.2.0.0.0.0.0.0.0.0.0.0.0.0.0.8.e.f.ip6.arpa', 'scope_id': None, 'sixtofour': None, 'teredo': None, 'version': 6}, '127.0.0.1': {'compressed': '127.0.0.1', 'exploded': '127.0.0.1', 'is_global': False, 'is_link_local': False, 'is_loopback': True, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_unspecified': False, 'max_prefixlen': 32, 'reverse_pointer': '1.0.0.127.in-addr.arpa', 'version': 4}}}

single field with ipv4 address and filtered properties:

  • rule: {'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result', 'properties': ['is_loopback']}}

  • message: {'ip': '192.168.5.1'}

  • processed: {'ip': '192.168.5.1', 'result': {'192.168.5.1': {'is_loopback': False}}}

get field value for non existent property:

  • rule: {'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result', 'properties': ['teredo']}}

  • message: {'ip': '192.168.5.1'}

  • processed: {'ip': '192.168.5.1', 'result': {'192.168.5.1': {'teredo': False}}}

ignore missing fields:

  • rule: {'filter': 'ip', 'ip_informer': {'source_fields': ['ip', 'does_not_exist'], 'target_field': 'result', 'properties': ['teredo'], 'ignore_missing_fields': True}}

  • message: {'ip': '192.168.5.1'}

  • processed: {'ip': '192.168.5.1', 'result': {'192.168.5.1': {'teredo': False}}}

KeyChecker

The key_checker processor checks if all field names in a provided list are given in the processed event.

Processor Configuration

1- keycheckername:
2    type: key_checker
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.key_checker.processor.KeyChecker.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The key_checker processor needs a list with at least one element in it. The Rule contains this list and it also contains a custom field where the processor can store all missing keys.

Given key_checker rule
1filter: testkey
2key_checker:
3    source_fields:
4        - key1
5        - key2
6    target_field: "missing_fields"
7description: '...'
Incoming event
1{
2    "testkey": "key1_value",
3    "_index": "value"
4}
Processed event
1{
2    "testkey": "key1_value",
3    "_index": "value",
4    "missing_fields": "key1","key2"
5}
class logprep.processor.key_checker.rule.KeyCheckerRule.Config

key_checker rule config

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: set

List of fields to check for.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to.

tests: List[Dict[str, str]]

Custom tests for this rule.

Labeler

Processor Configuration

1- labelername:
2    type: labeler
3    schema: tests/testdata/labeler_rules/labeling/schema.json
4    include_parent_labels: true
5    rules:
6        - tests/testdata/labeler_rules/rules
class logprep.processor.labeler.processor.Labeler.Config

Labeler Configurations

schema: str

Path to a labeling schema file. For string format see Getters.

Security Best Practice - Processor - Labeler Schema File Memory Consumption

Be aware that all values of the remote file were loaded into memory. Consider to avoid dynamic increasing lists without setting limits for Memory consumption. Additionally avoid loading large files all at once to avoid exceeding http body limits.

Security Best Practice - Processor - Labeler Schema File Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

include_parent_labels: bool | None

If the option is deactivated only labels defined in a rule will be activated. Otherwise, also allowed labels in the path to the root of the corresponding category of a label will be added. This allows to search for higher level labels if this option was activated in the rule.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The labeler requires the additional field label. The keys under label define the categories under which a label should be added. The values are a list of labels that should be added under a category.

In the following example, the label execute will be added to the labels of the category action:

Example
1filter: 'command: "executing something"'
2labeler:
3    label:
4        action:
5        - execute
6description: '...'
class logprep.processor.labeler.rule.LabelerRule.Config

RuleConfig for Labeler

label: dict

Mapping of a category and a list of labels to add

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

ListComparison

The list_comparison processor allows to compare values of source fields against lists provided as files.

Processor Configuration

1- listcomparisonname:
2    type: list_comparison
3    rules:
4        - tests/testdata/rules/rules
5    list_search_base_path: /path/to/list/dir
class logprep.processor.list_comparison.processor.ListComparison.Config

ListComparison config

list_search_base_path: str

Relative list paths in rules will be relative to this path if this is set. This parameter is optional. For string format see Getters. You can also pass a template with keys from environment, e.g., ${<your environment variable>}. The special key ${LOGPREP_LIST} will be filled by this processor.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The list comparison enricher requires the additional field list_comparison. The mandatory keys under list_comparison are source_fields (as list with one element) and target_field. Former is used to identify the field which is to be checked against the provided lists. And the latter is used to define the parent field where the results should be written to. Both fields can be dotted subfields.

Additionally, a list or array of lists can be provided underneath the required field list_file_paths.

In the following example, the field user_agent will be checked against the provided list (priviliged_users.txt). Assuming that the value non_privileged_user will match the provided list, the result of the list comparison (in_list) will be added to the target field List_comparison.example.

Example Rule to compare a single field against a provided list.
1filter: 'user_agent'
2list_comparison:
3    source_fields: ['user_agent']
4    target_field: 'List_comparison.example'
5    list_file_paths:
6        - lists/privileged_users.txt
7description: '...'

Note

Currently, it is not possible to check in more than one source_field per rule.

class logprep.processor.list_comparison.rule.ListComparisonRule.Config

RuleConfig for ListComparisonRule

list_file_paths: List[str]

List of files. For string format see Getters.

Security Best Practice - Processor - List Comparison list file paths Memory Consumption

Be aware that all values of the remote files were loaded into memory. Consider to avoid dynamic increasing lists without setting limits for Memory consumption. Additionally avoid loading large files all at once to avoid exceeding http body limits.

Security Best Practice - Processor - List Comparison list file paths Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

list_search_base_path: str

Base Path from where to find relative files from list_file_paths. You can also pass a template with keys from environment, e.g., ${<your environment variable>}. The special key ${LOGPREP_LIST} will be filled by this processor.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

PreDetector

The pre_detector is a processor that creates alerts for matching events. It adds MITRE ATT&CK data to the alerts.

Processor Configuration

1- predetectorname:
2    type: pre_detector
3    rules:
4        - tests/testdata/rules/rules
5    outputs:
6        - kafka: sre_topic
7    alert_ip_list_path: /tmp/ip_list.yml
class logprep.processor.pre_detector.processor.PreDetector.Config

PreDetector config

outputs: tuple[dict[str, str]]

list of output mappings in form of output_name:topic. Only one mapping is allowed per list element

alert_ip_list_path: str | None

Path to a YML file or a list of paths to YML files with dictionaries of IPs. For string format see Getters. It is used by the PreDetector to throw alerts if one of the IPs is found in fields that were defined in a rule.

It uses IPs or networks in the CIDR format as keys and can contain expiration dates in the ISO format as values. If a value is empty, then there is no expiration date for the IP check. If a checked IP is covered by an IP and a network in the dictionary (i.e. IP 127.0.0.1 and network 127.0.0.0/24 when checking 127.0.0.1), then the expiration date of the IP is being used.

Security Best Practice - Processor - PreDetector alert_ip_list_path Memory Consumption

Be aware that all values of the remote file were loaded into memory. Consider to avoid dynamic increasing lists without setting limits for Memory consumption. Additionally avoid loading large files all at once to avoid exceeding http body limits.

Security Best Practice - Processor - PreDetector alert_ip_list_path Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The predetector requires the additional field pre_detector.

The rule fields and a pre_detector_id are written into a custom output of the current output connector. The pre_detector_id will be furthermore added to the triggering event so that an event can be linked with its detection.

The following example shows a complete rule:

Example
 1filter: 'some_field: "very malicious!"'
 2pre_detector:
 3  case_condition: directly
 4  id: RULE_ONE_ID
 5  mitre:
 6  - attack.something1
 7  - attack.something2
 8  severity: critical
 9  title: Rule one
10description: Some malicious event.

Applying this rule to the event

Example Input Event
1{
2  "some_field": "very malicious!",
3}

would result in the following output and event enrichment

Enriched event
1{
2  "some_field": "very malicious!",
3  "pre_detection_id": "80bfea3f-c24e-41d0-b82d-b2f02fc03ba9"
4}
Generated extra output
 1{
 2  "@timestamp": "2023-06-16T08:23:41.000Z",
 3  "id": "RULE_ONE_ID",
 4  "title": "Rule one",
 5  "mitre": ["attack.something1", "attack.something2"],
 6  "case_condition": "directly",
 7  "rule_filter": "(some_field: 'very malicious!')",
 8  "severity": "critical",
 9  "pre_detection_id": "80bfea3f-c24e-41d0-b82d-b2f02fc03ba9",
10  "description": "Some malicious event."
11}

This generated extra output contains a corresponding rule_filter in lucene notation, which can be used to further investigate this rule in an existing OpenSearch.

Additionally, the optional field ip_fields can be specified. It allows to specify a list of fields that can be compared to a list of IPs, which can be configured in the pipeline for the predetector. If this field was specified, then the rule will only trigger in case one of the IPs from the list is also available in the specified fields.

Example
 1filter: 'some_field: something AND some_ip_field'
 2pre_detector:
 3  id: RULE_ONE_ID
 4  title: Rule one
 5  severity: critical
 6  mitre:
 7  - some_tag
 8  case_condition: directly
 9description: Some malicous event.
10ip_fields:
11- some_ip_field

The pre_detector also has the option to normalize the timestamp. To configure this the following parameters can be set in the rule configuration.

Example
 1filter: 'some_field: "very malicious!"'
 2pre_detector:
 3  case_condition: directly
 4  id: RULE_ONE_ID
 5  mitre:
 6  - attack.something1
 7  - attack.something2
 8  severity: critical
 9  title: Rule one
10  timestamp_field: <field which includes the timestamp to be normalized>
11  source_format: <the format of the timestamp in strftime format or ISO8601 or UNIX>
12  sorce_timezone: <the timezone of the timestamp>
13  target_timezone: <the timezone after normalization>
14description: Some malicious event.

All of these new parameters are configurable and default to standard values if not explicitly set.

class logprep.processor.pre_detector.rule.PreDetectorRule.Config

RuleConfig for Predetector

title: str

A description for the triggered rule.

severity: str

Rating how dangerous an Event is, i.e. critical.

mitre: list

A list of MITRE ATT&CK tags.

case_condition: str

The type of the triggered rule, mostly directly.

ip_fields: list

Specify a list of fields that can be compared to a list of IPs, which can be configured in the pipeline for the pre_detector. If this field was specified, then the rule will only trigger in case one of the IPs from the list is also available in the specified fields.

sigma_fields: list | bool

tbd

link: str | None

A link to the rule if applicable.

source_format: str

the source format that can be given for normalizing the timestamp defaults to ISO8601

timestamp_field: str

the field which has the given timestamp to be normalized defaults to @timestamp

source_timezone: ZoneInfo

timezone of source_fields defaults to UTC

target_timezone: ZoneInfo

timezone for target_field defaults to UTC

failure_tags: list

tags to be added if processing of the rule fails

copy_fields_to_detection_event: set[str]

Field (names) from the triggering event to be added to the detection events. Defaults to [“host.name”] for downwards compatibility reasons. Collissions with field names which are already written by this processor are not allowed and will be rejected with a configuration validation error.

description: str

A description for the Rule. This has only documentation character.

id: str | int

An ID for the triggered rule.

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

tests: List[Dict[str, str]]

Custom tests for this rule.

Pseudonymizer

The pseudonymizer is a processor that pseudonymizes certain fields of log messages to ensure privacy regulations can be adhered to.

Processor Configuration

 1- pseudonymizername:
 2    type: pseudonymizer
 3    rules:
 4        - tests/testdata/rules/rules
 5    outputs:
 6        - kafka: pseudonyms_topic
 7    pubkey_analyst: /path/to/analyst_pubkey.pem
 8    pubkey_depseudo: /path/to/depseudo_pubkey.pem
 9    hash_salt: secret_salt
10    regex_mapping: /path/to/regex_mapping.json
11    max_cached_pseudonyms: 1000000
12    mode: GCM
class logprep.processor.pseudonymizer.processor.Pseudonymizer.Config

Pseudonymizer config

outputs: tuple[dict[str, str]]

list of output mappings in form of output_name:topic. Only one mapping is allowed per list element

pubkey_analyst: str

Path to the public key of an analyst. For string format see Getters.

Security Best Practice - Processor - Pseudonymizer pubkey analyst Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

pubkey_depseudo: str

Path to the public key for depseudonymization. For string format see Getters.

Security Best Practice - Processor - Pseudonymizer pubkey depseudo Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

hash_salt: str

A salt that is used for hashing.

regex_mapping: str

Path to a file (for string format see Getters) with a regex mapping for pseudonymization, i.e.:

Security Best Practice - Processor - Pseudonymizer regex mapping Memory Consumption

Be aware that all values of the remote file were loaded into memory. Consider to avoid dynamic increasing lists without setting limits for Memory consumption. Additionally avoid loading large files all at once to avoid exceeding http body limits.

Security Best Practice - Processor - Pseudonymizer regex mapping Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

max_cached_pseudonyms: int

The maximum number of cached pseudonyms. One cache entry requires ~250 Byte, thus 10 million elements would require about 2.3 GB RAM. The cache is not persisted. Restarting Logprep does therefore clear the cache. This caching reduces the CPU load of Logprep (no demanding encryption must be performed repeatedly) and the load on subsequent components (i.e. Logstash or Opensearch). In case the cache size has been exceeded, the least recently used entry is deleted. Has to be greater than 0.

Security Best Practice - Processor - Pseudonymizer max_cached_pseudonyms

Ensure to set this to a reasonable value to avoid excessive memory usage and OOM situations by the domain resolver cache.

max_cached_pseudonymized_urls: int

The maximum number of cached pseudonymized urls. Default is 10000. Behaves similarly to the max_cached_pseudonyms. Has to be greater than 0.

Security Best Practice - Processor - Pseudonymizer max_cached_pseudonymized_urls

Ensure to set this to a reasonable value to avoid excessive memory usage and OOM situations by the domain resolver cache.

mode: str

Optional mode of operation for the encryption. Can be either ‘GCM’ or ‘CTR’. Default is ‘GCM’.

Security Best Practice - Processor - Pseudonymizer

The pseudonymizer works with two public keys for different roles. It is suggested to ensure that two different keys are being used such that the separation of the roles can be maintained. It is suggested to use the GCM mode for encryption as it decouples the key length of the depseudo and analyst keys. This leads to additional 152 bytes of overhead for the encryption compared to the CTR mode encrypter.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The pseudonymizer requires the additional field pseudonymizer.mapping. It contains key value pairs that define what will be pseudonymized.

They key represents the field that will be pseudonymized and the value contains a regex keyword. The regex keyword defines which parts of the value are being pseudonymized. Only the regex matches are being pseudonymized that are also in a capture group. An arbitrary amount of capture groups can be used. The definitions of regex keywords are located in a separate file.

In the following the field event_data.param1 is being completely pseudonymized. This is achieved by using the predefined keyword RE_WHOLE_FIELD, which will be resolved to a regex expression. RE_WHOLE_FIELD resolves to (.*) which puts the whole match in a capture group and therefore pseudonymizes it completely.

Example - Rule
1filter: 'event_id: 1 AND source_name: "Test"'
2pseudonymizer:
3    mapping:
4        event_data.param1: RE_WHOLE_FIELD
5description: '...'
Example - Regex mapping file
1{
2  "RE_WHOLE_FIELD": "(.*)",
3  "RE_DOMAIN_BACKSLASH_USERNAME": "\w+\\(.*)",
4  "RE_IP4_COLON_PORT": "([\d.]+):\d+"
5}
class logprep.processor.pseudonymizer.rule.PseudonymizerRule.Config

RuleConfig for Pseudonymizer

url_fields: list

url fields to pseudonymize

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

mapping of field to regex string

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Replacer

The replacer is a processor that replaces parts of a string with strings defined in rules.

Processor Configuration

1- samplename:
2    type: replacer
3    rules:
4        - tests/testdata/rules/
class logprep.processor.replacer.processor.Replacer.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

A speaking example:

Given replacer rule
1filter: message
2replacer:
3    mapping:
4        message: "Message %{*} was created by user %{USER_ID}."
5description: '...'
Incoming event
1{"message": "Message 123 was created by user 456."}
Processed event
1{ "message": "Message 123 was created by user USER_ID." }
class logprep.processor.replacer.rule.ReplacerRule.Config

Config for ReplacerRule

extend_target_list: bool
delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A mapping of fieldnames to patterns to replace

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to True

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Defaults to the source fields in mapping.

tests: List[Dict[str, str]]

Custom tests for this rule.

Replacement Pattern Language

The replacement pattern describes the textual format of the source field.

Given a replacement pattern of %{replaced_1}and%{replaced_2} the source field value will be replaced with a value where everything before and will be replaced with replaced_1 and everything after and will be replaced with replaced_2.

The string between %{ and } is the desired replacement value.

Additionally, there exists a colon notation MATCH_VALUE:REPLACE_VALUE to achieve more specific results. MATCH_VALUE is a specific value that will be replaced and REPLACE_VALUE is the value it will be replaced with. The value : needs to be escaped with :code:`` if it is to be used as a character. The escaping works like sigma wildcard escaping.

foo {VAL1:VAL2} bar results in foo VAL1:VAL2 bar for input foo SOME bar. foo {VAL1\:VAL2} bar results in foo VAL2 bar only for input foo VAL1bar.

{REPLACE_VALUE} is a shorthand for {*:REPLACE_VALUE}. {*} is a shorthand for {*:*}.

The special replacement value %{*} acts as a wildcard that allows to match a string without replacing it. If you want to use the symbol * as a replacement value, you have to escape it with \ (e.g. %{\*}). * does not have to be escaped if it occurs in combination with other values (e.g. %{**} or %{foo*}). The exception is if a single wildcard gets escaped multiple times (e.g. %{\\*} becomes \* and %{\\\*} becomes \\*).

The character %{|g} at the end of a replacement means that values will be matched greedily. Given an input 1.2.3.4. and a pattern %{IP}. it would replace to IP.2.3.4.. Given an input 1.2.3.4. and a pattern %{IP|g}. it would replace to IP..

An empty replacement value %{} will remove the matching parts from the new value.

class logprep.processor.dissector.rule.DissectorRule.Config

Config for Dissector

convert_datatype: dict

A mapping from source field and desired datatype [optional]. The datatypes could be float, int, bool, string

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A mapping from source fields to a dissect pattern [optional]. Dotted field notation is possible in key and in the dissect pattern.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Examples for replacer:

replace the beginning:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{X} login attempts.'}}}

  • message: {'field': '123 login attempts.'}

  • processed: {'field': 'X login attempts.'}

replace with a different target field:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{X} login attempts.'}, 'target_field': 'new_target'}}

  • message: {'field': '123 login attempts.'}

  • processed: {'field': '123 login attempts.', 'new_target': 'X login attempts.'}

replace with dotted field:

  • rule: {'filter': 'some.field', 'replacer': {'mapping': {'some.field': '%{X} login attempts.'}}}

  • message: {'some': {'field': '123 login attempts.'}}

  • processed: {'some': {'field': 'X login attempts.'}}

replace with colon notation:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*:X} login attempts.'}}}

  • message: {'field': '123 login attempts.'}

  • processed: {'field': 'X login attempts.'}

replace wildcard with colon notation:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*:*} login attempts.'}}}

  • message: {'field': '123 login attempts.'}

  • processed: {'field': '123 login attempts.'}

replace specific with colon notation matches:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{123:X} login attempts.'}}}

  • message: {'field': '123 login attempts.'}

  • processed: {'field': 'X login attempts.'}

replace specific with colon notation at beginning does not match:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{123:X} login attempts by %{USER_ID}.'}}}

  • message: {'field': '456 login attempts by 789.'}

  • processed: {'field': '456 login attempts by 789.'}

replace specific with colon notation at beginning matches:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{123:X} login attempts by %{USER_ID}.'}}}

  • message: {'field': '123 login attempts by 789.'}

  • processed: {'field': 'X login attempts by USER_ID.'}

replace specific with colon notation at middle does not match:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} performed %{789:X} login attempts.'}}}

  • message: {'field': 'User 123 performed 456 login attempts.'}

  • processed: {'field': 'User 123 performed 456 login attempts.'}

replace specific with colon notation at middle matches:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} performed %{456:X} login attempts.'}}}

  • message: {'field': 'User 123 performed 456 login attempts.'}

  • processed: {'field': 'User USER_ID performed X login attempts.'}

replace specific with colon notation at end does not match:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} login count: %{789:X}'}}}

  • message: {'field': 'User 123 login count: 456'}

  • processed: {'field': 'User 123 login count: 456'}

replace specific with colon notation at end matches:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} login count: %{456:X}'}}}

  • message: {'field': 'User 123 login count: 456'}

  • processed: {'field': 'User USER_ID login count: X'}

replace specific with colon notation matches combined without colon notation:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{123:X} login attempts within %{Y} minutes.'}}}

  • message: {'field': '123 login attempts within 456 minutes.'}

  • processed: {'field': 'X login attempts within Y minutes.'}

replace specific with colon notation matches combined without colon notation:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '/%{*}/foo/%{_:}%{ID}/%{*}'}}}

  • message: {'field': '/some/path/foo/_123/bar'}

  • processed: {'field': '/some/path/foo/ID/bar'}

replace specific with colon notation starting with wildcard:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*}/%{_:}%{ID}/%{*}'}}}

  • message: {'field': '/some/path/foo/_123/bar'}

  • processed: {'field': '/some/path/foo/ID/bar'}

replace specific with colon notation without wildcard:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '/some/path/%{_:}%{ID}'}}}

  • message: {'field': '/some/path/_123'}

  • processed: {'field': '/some/path/ID'}

replace the middle:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Attempted to login %{X} times.'}}}

  • message: {'field': 'Attempted to login 123 times.'}

  • processed: {'field': 'Attempted to login X times.'}

replace the end:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Delete user %{USER_ID}'}}}

  • message: {'field': 'Delete user 123'}

  • processed: {'field': 'Delete user USER_ID'}

replace beginning and the middle:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{A user} tried to call /users/%{USER_ID}/delete'}}}

  • message: {'field': 'User 123 tried to call /users/456/delete'}

  • processed: {'field': 'A user tried to call /users/USER_ID/delete'}

replace twice in middle:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} tried %{ATTEMPTS} times to log in.'}}}

  • message: {'field': 'User 123 tried 456 times to log in.'}

  • processed: {'field': 'User USER_ID tried ATTEMPTS times to log in.'}

replace the middle and the end:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Attempted to login %{ATTEMPTS} times to %{IP}'}}}

  • message: {'field': 'Attempted to login 123 times to 1.2.3.4'}

  • processed: {'field': 'Attempted to login ATTEMPTS times to IP'}

replace three times:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} tried to login %{ATTEMPTS} to %{IP}'}}}

  • message: {'field': 'User 123 tried to login 456 to 1.2.3.4'}

  • processed: {'field': 'User USER_ID tried to login ATTEMPTS to IP'}

replace with empty string:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{}login attempts%{}.'}}}

  • message: {'field': '123 login attempts by user 456.'}

  • processed: {'field': 'login attempts.'}

don’t replace greedily if part of variable string is contained in unchanging part:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Connected to %{IP|g}.'}}}

  • message: {'field': 'Connected to 1.2.3.4.'}

  • processed: {'field': 'Connected to IP.'}

twice don’t replace greedily if part of variable string is contained in unchanging part:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Disconnected from %{IP|g}. Connected to %{IP|g}.'}}}

  • message: {'field': 'Disconnected from 1.2.3.4. Connected to 1.2.3.4.'}

  • processed: {'field': 'Disconnected from IP. Connected to IP.'}

replace wildcard greedily:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Disconnected from %{IP|g}. Connected to %{*|g}.'}}}

  • message: {'field': 'Disconnected from 1.2.3.4. Connected to 1.2.3.4.'}

  • processed: {'field': 'Disconnected from IP. Connected to 1.2.3.4.'}

replace multiple fields:

  • rule: {'filter': 'field_a AND field_b', 'replacer': {'mapping': {'field_a': 'do %{replace this}!', 'field_b': 'do also %{replace this}!'}}}

  • message: {'field_a': 'do something!', 'field_b': 'do also something!'}

  • processed: {'field_a': 'do replace this!', 'field_b': 'do also replace this!'}

replace by matching with wildcard:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has%{*}.'}}}

  • message: {'field': 'User with ID 123 has logged in.'}

  • processed: {'field': 'User with ID USER_ID has logged in.'}

replace by matching only with wildcard does not change anything:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User has logged %{*}.'}}}

  • message: {'field': 'User has logged in.'}

  • processed: {'field': 'User has logged in.'}

replace by matching with wildcard at the end:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has logged %{*}.'}}}

  • message: {'field': 'User with ID 123 has logged in.'}

  • processed: {'field': 'User with ID USER_ID has logged in.'}

replace by matching with wildcard at the beginning:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*} with ID %{USER_ID} has logged in.'}}}

  • message: {'field': 'User with ID 123 has logged in.'}

  • processed: {'field': 'User with ID USER_ID has logged in.'}

replace by matching with wildcard in the middle before other replacement:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{*} with ID %{USER_ID} has logged in.'}}}

  • message: {'field': 'User with ID 123 has logged in.'}

  • processed: {'field': 'User with ID USER_ID has logged in.'}

replace by matching with multiple wildcards:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*} with ID %{USER_ID} has %{*}in%{*}'}}}

  • message: {'field': 'User with ID 123 has logged in.'}

  • processed: {'field': 'User with ID USER_ID has logged in.'}

replace with star by escaping single wildcard:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has logged %{\*}.'}}}

  • message: {'field': 'User with ID 123 has logged in.'}

  • processed: {'field': 'User with ID USER_ID has logged *.'}

replace with backslash and star by escaping single wildcard:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has%{\\*}'}}}

  • message: {'field': 'User with ID 123 has logged in.'}

  • processed: {'field': 'User with ID USER_ID has\*'}

replace with multiple backslashes and star by escaping single wildcard:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has logged %{\\\*}.'}}}

  • message: {'field': 'User with ID 123 has logged in.'}

  • processed: {'field': 'User with ID USER_ID has logged \\*.'}

replacement of multiple stars does not require escaping wildcard:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has logged %{**}.'}}}

  • message: {'field': 'User with ID 123 has logged in.'}

  • processed: {'field': 'User with ID USER_ID has logged **.'}

replacement without matching end fails:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Call /some_path/user/%{USER_ID}/'}}}

  • message: {'field': 'Call /some_path/user/123/delete'}

  • processed: {'field': 'Call /some_path/user/123/delete'}

replacement without matching beginning fails:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'failed logins: %{COUNT}'}}}

  • message: {'field': 'logins: 123'}

  • processed: {'field': 'logins: 123'}

replacement without matching beginning and end fails:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'failed to login %{COUNT} times during the last hour'}}}

  • message: {'field': 'succeeded to login 123 times during the last minute'}

  • processed: {'field': 'succeeded to login 123 times during the last minute'}

replacement without matching middle fails:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{USER} created by %{USER}'}}}

  • message: {'field': '123 deleted by 456'}

  • processed: {'field': '123 deleted by 456'}

nested replacement ignores second start token and terminates with first end token:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{%{replace this} not}!'}}}

  • message: {'field': 'something not}!'}

  • processed: {'field': '%{replace this not}!'}

nested replacement ignores second start token and terminates with first end token:

  • rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{do %{replace this}}!'}}}

  • message: {'field': 'do %{something not}!'}

  • processed: {'field': 'do %{replace this}!'}

Requester

A processor to invoke http requests. Can be used to enrich events from an external api or to trigger external systems by and with event field values.

Security Best Practice - Processor - Requester

As the requester can execute arbitrary http requests it is advised to execute requests only against known and trusted endpoints and that the communication is protected with a valid SSL-Certificate. Do so by setting a certificate path with the option cert. To ensure that the communication is trusted it is also recommended to set either an Authorization-Header or a corresponding authentication with a username and password, via auth.

Processor Configuration

1- requestername:
2    type: requester
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.requester.processor.Requester.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The Requester is configured by the keyword requester. It can be used to trigger external systems via web request or enrich eventdata by external apis.

A speaking example for event enrichment via external api:

Given requester rule
 1filter: 'domain'
 2requester:
 3  url: https://internal.cmdb.local/api/v1/locations
 4  method: POST
 5  target_field: cmdb.location
 6  headers:
 7    Authorization: Bearer askdfjpiowejf283u9r
 8  json:
 9    hostname: ${message.hostname}
10description: '...'
Incoming event
1{"message": {"hostname": "BB37293hhj"}}
Raw response json data given from the api
1{
2    "city": "Montreal",
3    "Building": "L76",
4    "Floor": 3,
5    "Room": 34
6}
Processed event
 1{"message": {"hostname": "BB37293hhj"},
 2 "cmdb": {
 3     "location": {
 4         "city": "Montreal",
 5         "Building": "L76",
 6         "Floor": 3,
 7         "Room": 34
 8         }
 9    }
10}
class logprep.processor.requester.rule.RequesterRule.Config

Config for RequesterRule

target_field_mapping: dict

(Optional) A mapping from dotted_fields to dotted_fields to extract data from response json to target fields. If target_field is given too, this is made additionally

method: str

The method for the request. must be one of GET, OPTIONS, HEAD, POST, PUT, PATCH, DELETE

url: str

The url for the request. You can use dissect pattern language to add field values

json: dict

(Optional) The json payload. Can be enriched with event data by using the pattern ${the.dotted.field} to retrieve nested field values.

data: str

(Optional) The data payload. Can be enriched with event data by using the pattern ${the.dotted.field} to retrieve nested field values.

params: dict

(Optional) The query parameters as dictionary. Can be enriched with event data by using the pattern ${the.dotted.field} to retrieve nested field values.

headers: dict

(Optional) The http headers as dictionary.

auth: tuple

(Optional) The authentication tuple. Defined as list. Will be converted to tuple

timeout: float

(Optional) The timeout in seconds as float for the request. Defaults to 2 seconds

verify: bool

(Optional) Whether or not verify the ssl context. Defaults to True.

proxies: dict

(Optional) Dictionary mapping protocol or protocol and host to the URL of the proxy (e.g. {"http": "foo.bar:3128", "http://host.name": "foo.bar:4012"}) to be used on the request

cert: str

(Optional) SSL client certificate as path to ssl client cert file (.pem).

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

(Optional) The target field to write the complete response json or body to

tests: List[Dict[str, str]]

Custom tests for this rule.

SelectiveExtractor

The selective_extractor is a processor that allows to write field values of a given log message to a different Kafka topic. The output topic is configured via the pipeline yml, while the fields to be extracted are specified by means of a list which is also specified in the pipeline configuration as a file path. This processor is applied to all messages, because of that it does not need further rules to specify it’s behavior.

Processor Configuration

1- selectiveextractorname:
2    type: selective_extractor
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.selective_extractor.processor.SelectiveExtractor.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The selective extractor requires the additional field selective_extractor. It contains a list of field names that should be extracted (source_fields) and list of output mappings to which they should be send to (outputs). If dotted notation is being used, then all fields on the path are being automatically created.

In the following example, the field field.extract with the value extracted value is being extracted and send to the output named kafka and the topic named topic_to_send_to.

Example rule with extract from field list
1filter: extract_test
2selective_extractor:
3  source_fields: ["field.extract", "field2", "field3"]
4  outputs:
5    - kafka: topic_to_send_to
6description: '...'
Example event
{
  "extract_test": {
    "field": {
      "extract": "extracted value"
    }
  }
}
Extracted event from Example
{
  "extract": "extracted value"
}

Alternatively, the additional field selective_extractor.extract.extract_from_file can be added. It contains the path to a text file with a list of fields per line to be extracted.

Example rule with extract from file
1filter: extract_test
2selective_extractor:
3    extract_from_file: /path/to/file
4    outputs:
5        - opensearch: topic_to_send_to
6description: '...'
Example of file with field list
field1
field2
field3

The file has to exist.

It is possible to mix both extraction sources. They will be merged to one list without duplicates.

Example rule with extract from file
1filter: extract_test
2selective_extractor:
3    extract_from_file: /path/to/file
4    source_fields: ["field1", "field2", "field4"]
5    outputs:
6      - kafka: topic_to_send_to
7description: '...'
Example of file with field list
field1
field2
field3
class logprep.processor.selective_extractor.rule.SelectiveExtractorRule.Config

RuleConfig for SelectiveExtractor

outputs: tuple[dict[str, str]]

list of output mappings in form of output_name:topic. Only one mapping is allowed per list element

extract_from_file: str

The path or url to a file with a flat list of fields to extract. For string format see Getters.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to True

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

List of fields in dotted field notation

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

StringSplitter

The string_splitter processor splits string by whitespace (default) or a given delimiter and writes the resulting list to a target field.

Processor Configuration

1- samplename:
2    type: string_splitter
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.string_splitter.processor.StringSplitter.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

A speaking example:

Given string_splitter rule
1filter: message
2string_splitter:
3    source_fields: ["message"]
4    target_field: result
5description: '...'
Incoming event
1{"message": "this is the message"}
Processed event
1{"message": "this is the message", "result": ["this", "is", "the", "message"]}
class logprep.processor.string_splitter.rule.StringSplitterRule.Config

Config for StringSplitterRule

delimiter: str

The delimiter for splitting. Defaults to whitespace

drop_empty: bool

If empty list values (as a result of the splitting operation) should be dropped or kept. By this definition, the empty string (no characters) and strings containing only whitespace count as ‘empty’. The default setting is to keep empty list values.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Examples for string_splitter:

splits_without_explicit_set_delimiter_on_whitespace:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'drop_empty': True}}

  • message: {'message': 'this is the message'}

  • processed: ['this', 'is', 'the', 'message']

splits_with_delimiter:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ', ', 'drop_empty': True}}

  • message: {'message': 'this, is, the, message'}

  • processed: ['this', 'is', 'the', 'message']

splits_one_item_with_delimiter:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}

  • message: {'message': 'this,'}

  • processed: ['this']

splits_one_item_with_multiple_delimiter_and_drop_empty:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}

  • message: {'message': ',,this,,'}

  • processed: ['this']

splits_one_item_with_multiple_delimiter_and_no_drop_empty:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': False}}

  • message: {'message': ',,this,,'}

  • processed: ['', '', 'this', '', '']

splits_one_item_with_multiple_delimiter_and_empty_fields:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}

  • message: {'message': ' , ,this, ,'}

  • processed: ['this']

splits_one_item_with_multiple_delimiter_and_whitespace:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}

  • message: {'message': ',, this , , '}

  • processed: [' this ']

splits_one_item_with_multiple_delimiter_and_newline:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}

  • message: {'message': 'n,,this,t, '}

  • processed: ['this']

splits_one_item_with_multiple_delimiter_and_whitespace_only_in_front:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}

  • message: {'message': ',, this, , '}

  • processed: [' this']

splits_one_item_with_multiple_delimiter_and_whitespace_only_in_front:

  • rule: {'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}

  • message: {'message': 'hello , world,this, is a very complex,n , and even multiline, text,,, '}

  • processed: ['hello ', ' world', 'this', ' is a very complex', ' and even multiline', ' text']

TemplateReplacer

The template_replacer is a processor that can replace parts of a text field to anonymize those parts. The replacement is based on a template file.

Processor Configuration

 1- templatereplacername:
 2    type: template_replacer
 3    rules:
 4        - tests/testdata/rules/rules
 5    template: /tmp/template.yml
 6    pattern:
 7        delimiter: ","
 8        fields:
 9            - field.name.a
10            - field.name.b
11        allowed_delimiter_field: field.name.b
12        target_field: target.field
class logprep.processor.template_replacer.processor.TemplateReplacer.Config

TemplateReplacer config

template: str

Path to a YML file (for path format see Getters) with a list of replacements in the format %{provider_name}-%{event_id}: %{new_message}.

Security Best Practice - Processor - TemplateReplacer template Memory Consumption

Be aware that all values of the remote file were loaded into memory. Consider to avoid dynamic increasing lists without setting limits for Memory consumption. Additionally avoid loading large files all at once to avoid exceeding http body limits.

Security Best Practice - Processor - TemplateReplacer template Authenticity and Integrity

Consider to use TLS protocol with authentication via mTLS or Oauth to ensure authenticity and integrity of the loaded values.

pattern: dict

Configures how to use the template file by specifying the following subfields:

  • delimiter - Delimiter to use to split the template

  • fields - A list of dotted fields that are being checked by the template.

  • allowed_delimiter_field - One of the fields in the fields list can contain the delimiter. This must be specified here.

  • target_field - The field that gets replaced by the template.

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The template replacer requires the additional field template_replacer. No additional configuration parameters are required for the rules. The module is completely configured over the pipeline configuration.

In the following example the target field specified in the processor configuration is replaced for all log messages that have winlog.provider_name and winlog.event_id if it is defined in the template file.

Example
1filter: winlog.provider_name AND winlog.event_id
2template_replacer: {}
3description: ''
class logprep.processor.template_replacer.rule.TemplateReplacerRule.Config

Config for FieldManagerRule

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

tests: List[Dict[str, str]]

Custom tests for this rule.

Timestamper

The timestamper processor normalizes timestamps to iso8601 compliant output format.

Processor Configuration

1- myteimestamper:
2    type: timestamper
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.timestamper.processor.Timestamper.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

A speaking example:

Given timestamper rule
1filter: "winlog.event_id: 123456789"
2timestamper:
3    source_fields: ["winlog.event_data.some_timestamp_utc"]
4    target_field: "@timestamp"
5    source_format: UNIX
6    source_timezone: UTC
7    target_timezone: Europe/Berlin
8description: example timestamper rule
Incoming event
1    {
2        "winlog": {
3            "api": "wineventlog",
4            "event_id": 123456789,
5            "event_data": {"some_timestamp_utc": "1642160449"},
6        }
7    }
Processed event
1    {
2        "@timestamp": "2022-01-14T12:40:49+01:00",
3        "winlog": {
4            "api": "wineventlog",
5            "event_id": 123456789,
6            "event_data": {"some_timestamp_utc": "1642160449"},
7        },
8    }
class logprep.processor.timestamper.rule.TimestamperRule.Config

Config for TimestamperRule

source_format: list

A list of possible source formats if source_fields is not an iso8601 compliant time format string the format must be given in the syntax of the python builtin datetime.strptime (see: https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes). Additionally, the value ISO8601 (default) and UNIX can be used in the list of the source_formats field. The former can be used if the timestamp already exists in the ISO8601 format, such that only a timezone conversion should be applied. And the latter can be used if the timestamp is given in the UNIX Epoch Time. This supports the Unix timestamps in seconds and milliseconds. Be aware that UNIX and ISO8601 formats do not validate the completeness of input string. If you want to ensure, the completeness of the input string, you have to use the datetime.strptime syntax.

For example, the following time formats are valid ISO8601 formats:

  • hh:mm

  • hh:mm:ss

  • hh:mm:ss.sss

  • hhmmss.ssssss

  • hhmm

  • hhmmss

The output string will always be in this format: 2000-12-31T22:59:59Z. As you can see the output string has a time with seconds. If the input string does not have a time or the time does not have seconds, the output string will have seconds or times set to zero.

If you don’t want this behavior, you have to use the datetime.strptime syntax. With this syntax, the timestamper`errors out with a :code:`TimeParserException and a tag _timestamper_failure will be added to the event.

source_timezone: ZoneInfo

timezone of source_fields. defaults to UTC

target_timezone: ZoneInfo

timezone for target_field. defaults to UTC

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The field from where to get the time from as list with one element

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to, defaults to @timestamp

tests: List[Dict[str, str]]

Custom tests for this rule.

Examples for timestamper:

parses iso8601 without pattern:

  • rule: {'filter': 'message', 'timestamper': {'source_fields': ['message'], 'target_field': '@timestamp'}}

  • message: {'message': '2009-06-15 13:45:30Z'}

  • processed: {'message': '2009-06-15 13:45:30Z', '@timestamp': '2009-06-15T13:45:30Z'}

parses iso8601 to default target field:

  • rule: {'filter': 'message', 'timestamper': {'source_fields': ['message']}}

  • message: {'message': '2009-06-15 13:45:30Z'}

  • processed: {'message': '2009-06-15 13:45:30Z', '@timestamp': '2009-06-15T13:45:30Z'}

parses by datetime source format:

  • rule: {'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': '%Y %m %d - %H:%M:%S'}}

  • message: {'message': '2000 12 31 - 22:59:59'}

  • processed: {'message': '2000 12 31 - 22:59:59', '@timestamp': '2000-12-31T22:59:59Z'}

converts timezone information:

  • rule: {'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': '%Y %m %d - %H:%M:%S', 'source_timezone': 'UTC', 'target_timezone': 'Europe/Berlin'}}

  • message: {'message': '2000 12 31 - 22:59:59'}

  • processed: {'message': '2000 12 31 - 22:59:59', '@timestamp': '2000-12-31T23:59:59+01:00'}

parses unix timestamp:

  • rule: {'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': 'UNIX', 'source_timezone': 'UTC', 'target_timezone': 'Europe/Berlin'}}

  • message: {'message': '1642160449843'}

  • processed: {'message': '1642160449843', '@timestamp': '2022-01-14T12:40:49.843000+01:00'}

normalization from timestamp berlin to utc:

  • rule: {'filter': 'winlog.event_id: 123456789', 'timestamper': {'source_fields': ['winlog.event_data.some_timestamp_berlin'], 'target_field': '@timestamp', 'source_format': '%Y %m %d - %H:%M:%S', 'source_timezone': 'Europe/Berlin', 'target_timezone': 'UTC'}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_berlin': '1999 12 12 - 12:12:22'}}}

  • processed: {'@timestamp': '1999-12-12T11:12:22Z', 'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_berlin': '1999 12 12 - 12:12:22'}}}

normalization from timestamp same timezone:

  • rule: {'filter': 'winlog.event_id: 123456789', 'timestamper': {'source_fields': ['winlog.event_data.some_timestamp_utc'], 'target_field': '@timestamp', 'source_format': '%Y %m %d - %H:%M:%S', 'source_timezone': 'UTC', 'target_timezone': 'UTC'}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1999 12 12 - 12:12:22'}}}

  • processed: {'@timestamp': '1999-12-12T12:12:22Z', 'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1999 12 12 - 12:12:22'}}}

normalization from unix with millis timestamp:

  • rule: {'filter': 'winlog.event_id: 123456789', 'timestamper': {'source_fields': ['winlog.event_data.some_timestamp_utc'], 'target_field': '@timestamp', 'source_format': 'UNIX', 'source_timezone': 'UTC', 'target_timezone': 'Europe/Berlin'}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1642160449843'}}}

  • processed: {'@timestamp': '2022-01-14T12:40:49.843000+01:00', 'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1642160449843'}}}

normalization from unix with seconds timestamp:

  • rule: {'filter': 'winlog.event_id: 123456789', 'timestamper': {'source_fields': ['winlog.event_data.some_timestamp_utc'], 'target_field': '@timestamp', 'source_format': 'UNIX', 'source_timezone': 'UTC', 'target_timezone': 'Europe/Berlin'}}

  • message: {'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1642160449'}}}

  • processed: {'@timestamp': '2022-01-14T12:40:49+01:00', 'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1642160449'}}}

attempt parsing with multiple patterns, second one successful:

  • rule: {'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': ['%Y %m %d', '%Y %m %d - %H:%M:%S']}}

  • message: {'message': '2000 12 31 - 22:59:59'}

  • processed: {'message': '2000 12 31 - 22:59:59', '@timestamp': '2000-12-31T22:59:59Z'}

attempt parsing with multiple patterns, both successful but stopping after first:

  • rule: {'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': ['%Y %m %d - %H:%M:%S', '%Y %m %d - %H:%M:%S']}}

  • message: {'message': '2000 12 31 - 22:59:59'}

  • processed: {'message': '2000 12 31 - 22:59:59', '@timestamp': '2000-12-31T22:59:59Z'}

TimestampDiffer

The timestamp_differ can calculate the time difference between two timestamps.

Processor Configuration

1- timestampdiffer_name:
2    type: timestamp_differ
3    rules:
4        - tests/testdata/rules/rules
class logprep.processor.timestamp_differ.processor.TimestampDiffer.Config

Common Configurations

rules: list[str]

List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.

tree_config: str | None

Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.

apply_multiple_times: bool

Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.

type: str

Type of the component

health_timeout: float

Default is 1 seconds

Type:

Timeout in seconds for health check

Rule Configuration

The timestamp format can be specified per timestamp.

A speaking example:

Given timestamp differ rule
1filter: 'ingest AND processed'
2timestamp_differ:
3  diff: ${processed:%Y-%m-%d %H:%M:%S} - ${ingest:%Y-%m-%d %H:%M:%S}
4  target_field: processing_time
5  output_format: seconds
6description: '...'
Incoming event
1{"ingest": "2022-12-06 10:00:00", "processed": "2022-12-06 10:00:05"}
Processed event
1{"ingest": "2022-12-06 10:00:00", "processed": "2022-12-06 10:00:05", "processing_time": "5.0"}
class logprep.processor.timestamp_differ.rule.TimestampDifferRule.Config

Config for TimestampDiffer

diff: str

Specifies the timestamp subtraction and their respective timestamp formats. The fields and the timestamp format can be specified in the form of: ${dotted.field.path:timestamp-format}. If no timestamp format is given, e.g. ${dotted.field.path}, the string will be assumed as an iso8601 compliant string and parsed. For more information on the format syntax see datetime strftime/strptime.

source_field_formats: list
output_format: str

(Optional) Specifies the desired output format of the timestamp difference, allowed values are: seconds, milliseconds, nanoseconds, defaults to: seconds.

show_unit: bool

(Optional) Specifies whether the unit (s, ms, ns) should be part of the output. Defaults to False.

delete_source_fields: bool

Whether to delete all the source fields or not. Defaults to False

description: str

A description for the Rule. This has only documentation character.

id: str | int | None

A uuid for the rule. Is generated by logprep.

ignore_missing_fields: bool

If set to True missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False

mapping: dict

A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set delete_source_fields to true. Works independent of source_fields and target_field.

merge_with_target: bool

If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.

overwrite_target: bool

Overwrite the target field value if exists. Defaults to False

regex_fields: list

It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.

source_fields: list

The fields from where to get the values which should be processed, requires target_field.

tag_on_failure: list

A list of tags which will be appended to the event on non critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.

target_field: str

The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires source_field.

tests: List[Dict[str, str]]

Custom tests for this rule.

Examples for timestamp_differ:

Time difference between two timestamps:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2} - ${field1}', 'target_field': 'time_diff'}}

  • message: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}

  • processed: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278.0'}

Time difference between two timestamps with day change:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2} - ${field1}', 'target_field': 'time_diff'}}

  • message: {'field1': '2022-12-04 12:00:00', 'field2': '2022-12-05 12:00:00'}

  • processed: {'field1': '2022-12-04 12:00:00', 'field2': '2022-12-05 12:00:00', 'time_diff': '86400.0'}

Time difference between two timestamps with timezone information:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S %z} - ${field1:%Y-%m-%d}', 'target_field': 'time_diff'}}

  • message: {'field2': '2022-05-09 03:56:47 -03:00', 'field1': '2022-05-08'}

  • processed: {'field2': '2022-05-09 03:56:47 -03:00', 'field1': '2022-05-08', 'time_diff': '111407.0'}

Time difference between two timestamps with full weekday and month:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%A, %d. %B %Y %I:%M%p} - ${field1:%Y-%m-%d}', 'target_field': 'time_diff'}}

  • message: {'field2': 'Monday, 05. December 2022 11:19AM', 'field1': '2022-12-05'}

  • processed: {'field2': 'Monday, 05. December 2022 11:19AM', 'field1': '2022-12-05', 'time_diff': '40740.0'}

Time difference between two timestamps with AM/PM :

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%a %b %d %I:%M:%S %p %Y} - ${field1:%Y-%m-%d}', 'target_field': 'time_diff'}}

  • message: {'field2': 'Wed Dec 4 1:14:31 PM 2022', 'field1': '2022-12-03'}

  • processed: {'field2': 'Wed Dec 4 1:14:31 PM 2022', 'field1': '2022-12-03', 'time_diff': '134071.0'}

Time difference between two timestamps with milliseconds output:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'milliseconds'}}

  • message: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}

  • processed: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278000.0'}

Time difference between two timestamps with nanoseconds output:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'nanoseconds'}}

  • message: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}

  • processed: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278000000000.0'}

Time difference between two timestamps in subfield:

  • rule: {'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff'}}

  • message: {'field1': '2022-12-05 11:38:42', 'subfield': {'field2': '2022-12-05 12:00:00'}}

  • processed: {'field1': '2022-12-05 11:38:42', 'subfield': {'field2': '2022-12-05 12:00:00'}, 'time_diff': '1278.0'}

Time difference between two timestamps without specific timestamp format:

  • rule: {'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff'}}

  • message: {'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}}

  • processed: {'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': '5922.0'}

Time difference between two timestamps with removal of source fields:

  • rule: {'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'delete_source_fields': True}}

  • message: {'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}}

  • processed: {'time_diff': '5922.0'}

Time difference between two timestamps with overwriting of target:

  • rule: {'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'overwrite_target': True}}

  • message: {'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': 'some content'}

  • processed: {'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': '5922.0'}

Time difference between two timestamps with extension of existing list in target field:

  • rule: {'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'merge_with_target': True}}

  • message: {'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': ['some content']}

  • processed: {'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': ['some content', '5922.0']}

Timestamp diff with integer field (unix epoch):

  • rule: {'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1}', 'target_field': 'time_diff'}}

  • message: {'field1': 1670234400, 'subfield': {'field2': '2022-12-05 12:00:00'}}

  • processed: {'field1': 1670234400, 'subfield': {'field2': '2022-12-05 12:00:00'}, 'time_diff': '7200.0'}

Timestamp diff with difference in milliseconds, output in seconds:

  • rule: {'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1}', 'target_field': 'time_diff'}}

  • message: {'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}}

  • processed: {'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}, 'time_diff': '0.3'}

Timestamp diff with difference in milliseconds, output in milliseconds:

  • rule: {'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1}', 'target_field': 'time_diff', 'output_format': 'milliseconds'}}

  • message: {'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}}

  • processed: {'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}, 'time_diff': '300.0'}

Timestamp diff with difference in milliseconds, output in nanoseconds:

  • rule: {'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1}', 'target_field': 'time_diff', 'output_format': 'nanoseconds'}}

  • message: {'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}}

  • processed: {'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}, 'time_diff': '300000000.0'}

Time difference between two timestamps with negative result:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2} - ${field1}', 'target_field': 'time_diff'}}

  • message: {'field2': '2022-12-09', 'field1': '2022-12-10'}

  • processed: {'field2': '2022-12-09', 'field1': '2022-12-10', 'time_diff': '-86400.0'}

Time difference between two timestamps with visible second unit:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'seconds', 'show_unit': True}}

  • message: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}

  • processed: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278.0 s'}

Time difference between two timestamps with visible millisecond unit:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'milliseconds', 'show_unit': True}}

  • message: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}

  • processed: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278000.0 ms'}

Time difference between two timestamps with visible nanosecond unit:

  • rule: {'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'nanoseconds', 'show_unit': True}}

  • message: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}

  • processed: {'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278000000000.0 ns'}