Output

It is possible to define multiple outputs as a dictionary of <output name>: <output config>. If several outputs are configured with default: true, be aware that logprep only guarantees, via the batch_finished_callback, that one of these outputs has received the data.

Security Best Practice - Output Connectors

As with the input connectors, some of the available output connectors are meant only for debugging, namely ConsoleOutput and JsonlOutput. It is advised not to use these in production environments.

When configuring multiple outputs, it is also recommended to use only one default output and to define the other outputs only for storing custom extra data. Otherwise, it cannot be guaranteed that all events are stored safely.
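Here is a minimal sketch of that recommendation. It is an illustration, not a prescribed setup. A Kafka output is the only default output, and a second Opensearch output sets default: false so that it only receives extra data. The output names, topic, host and index values are hypothetical.

```yaml
output:
  # the single default output: all processed events are delivered here
  kafka_output:
    type: confluentkafka_output
    default: true
    topic: processed_events                # hypothetical topic
    kafka_config:
      bootstrap.servers: "127.0.0.1:9092"  # placeholder broker
  # not a default output: regular events are not delivered here,
  # it is only used when extra_data is explicitly sent to it
  opensearch_extra_data:
    type: opensearch_output
    default: false
    hosts:
      - 127.0.0.1:9200                     # placeholder host
    default_index: extra_data              # hypothetical index
    message_backlog_size: 2500
```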

ConfluentKafkaOutput

This section contains the connection settings for ConfluentKafka and the topic to which the processed events are written.

Example

output:
  my_confluent_kafka_output:
    type: confluentkafka_output
    topic: my_default_topic
    flush_timeout: 0.2
    send_timeout: 0
    kafka_config:
      bootstrap.servers: "127.0.0.1:9200,127.0.0.1:9200"
      compression.type: gzip
      request.required.acks: -1
      queue.buffering.max.ms: 0.5

class logprep.connector.confluent_kafka.output.ConfluentKafkaOutput.Config

Confluent Kafka Output Config

default: bool

(Optional) If false, events are not delivered to this output, but the output can still be used as the target for extra_data.

topic: str

The topic to which the processed events are written.

flush_timeout: float

The maximum time in seconds to wait for the producer to flush the messages to the Kafka broker. If the buffer is full, the producer blocks until the buffer is empty or the timeout is reached. Consequently, if the timeout is reached before the buffer is empty, the producer does not wait for all messages to be sent to the broker. Default is 0.

send_timeout: float

The maximum time in seconds to wait for an answer from the broker on polling. Default is 0.

kafka_config: MappingProxyType

Configuration for the Kafka client. At minimum, the following key must be set:

  • bootstrap.servers (STRING): a comma-separated list of Kafka brokers

For additional configuration options and their description see: <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>

DEFAULTS:

  • request.required.acks: -1

  • linger.ms: 0.5

  • compression.codec: none

  • client.id: <<hostname>>

  • queue.buffering.max.messages: 100000

  • statistics.interval.ms: 1000

Security Best Practice - Kafka Output Producer Authentication and Encryption

Kafka authentication is a critical aspect of securing your data pipeline. Ensure that you have the following configurations in place:

  • Use SSL/mTLS encryption for data in transit.

  • Configure SASL or mTLS authentication for your Kafka clients.

  • Regularly rotate your Kafka credentials and secrets.
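As an illustration, the recommendations above could translate into a kafka_config like the following sketch. The keys security.protocol, sasl.mechanism, sasl.username, sasl.password and ssl.ca.location are standard librdkafka properties (see the configuration reference linked above). The broker address, credentials and certificate path are placeholders, and the choice of SCRAM-SHA-512 is an assumption that has to match how your brokers are set up.

```yaml
output:
  my_confluent_kafka_output:
    type: confluentkafka_output
    topic: my_default_topic
    kafka_config:
      bootstrap.servers: "kafka1.example.com:9093"  # placeholder broker address
      security.protocol: SASL_SSL                   # TLS encryption plus SASL authentication
      sasl.mechanism: SCRAM-SHA-512                 # assumption: must match the broker setup
      sasl.username: logprep                        # placeholder credentials
      sasl.password: change-me
      ssl.ca.location: /path/to/ca.crt              # CA used to verify the broker certificates
```

For mTLS instead of SASL, librdkafka expects security.protocol: SSL together with ssl.certificate.location and ssl.key.location pointing to the client certificate and key.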

type: str

Type of the component

health_timeout: float

Timeout in seconds for the health check. Default is 1 second.

ConfluentKafkaGeneratorOutput

The logprep ConfluentKafka generator output inherits from the ConfluentKafka connector output and sends the documents written by the generator to a Kafka topic.

class logprep.generator.confluent_kafka.output.ConfluentKafkaGeneratorOutput.Config

Confluent Kafka Output Config

default: bool

(Optional) If false, events are not delivered to this output, but the output can still be used as the target for extra_data.

topic: str

The topic to which the processed events are written.

flush_timeout: float

The maximum time in seconds to wait for the producer to flush the messages to the Kafka broker. If the buffer is full, the producer blocks until the buffer is empty or the timeout is reached. Consequently, if the timeout is reached before the buffer is empty, the producer does not wait for all messages to be sent to the broker. Default is 0.

send_timeout: float

The maximum time in seconds to wait for an answer from the broker on polling. Default is 0.

kafka_config: MappingProxyType

Configuration for the Kafka client. At minimum, the following key must be set:

  • bootstrap.servers (STRING): a comma-separated list of Kafka brokers

For additional configuration options and their description see: <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>

DEFAULTS:

  • request.required.acks: -1

  • linger.ms: 0.5

  • compression.codec: none

  • client.id: <<hostname>>

  • queue.buffering.max.messages: 100000

  • statistics.interval.ms: 1000

Security Best Practice - Kafka Output Producer Authentication and Encryption

Kafka authentication is a critical aspect of securing your data pipeline. Ensure that you have the following configurations in place:

  • Use SSL/mTLS encryption for data in transit.

  • Configure SASL or mTLS authentication for your Kafka clients.

  • Regularly rotate your Kafka credentials and secrets.

type: str

Type of the component

health_timeout: float

Timeout in seconds for the health check. Default is 1 second.

ConsoleOutput

This section describes the ConsoleOutput, which pretty-prints documents to the console and can be used for testing.

Example

output:
  my_console_output:
    type: console_output

class logprep.connector.console.output.ConsoleOutput.Config

output config parameters

default: bool

(Optional) If false, events are not delivered to this output, but the output can still be used as the target for extra_data.

type: str

Type of the component

health_timeout: float

Timeout in seconds for the health check. Default is 1 second.

JsonlOutput

The JsonlOutput Connector can be used to write processed documents to .jsonl files.

Example

output:
  my_jsonl_output:
    type: jsonl_output
    output_file: path/to/output.file
    output_file_custom: ""
    output_file_error: ""

class logprep.connector.jsonl.output.JsonlOutput.Config

Common Configurations

output_file

output_file_custom

default: bool

(Optional) If false, events are not delivered to this output, but the output can still be used as the target for extra_data.

type: str

Type of the component

health_timeout: float

Timeout in seconds for the health check. Default is 1 second.

OpensearchOutput

This section contains the connection settings for Opensearch, the default index, the error index and a buffer size. Documents are sent to Opensearch in batches to reduce the number of connections that have to be created.

The desired index of a document is taken from its _index field, which is deleted afterwards. To send documents to data streams, set the field _op_type: create in the document.

Example

output:
  myopensearch_output:
    type: opensearch_output
    hosts:
      - 127.0.0.1:9200
    default_index: default_index
    error_index: error_index
    message_backlog_size: 10000
    timeout: 10000
    max_retries:
    user:
    secret:
    ca_cert: /path/to/cert.crt

class logprep.connector.opensearch.output.OpensearchOutput.Config

Opensearch Output Config

Security Best Practice - Output Connectors - OpensearchOutput

It is suggested to enable secure message transfer by setting user, secret and a valid ca_cert.
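A sketch of such a secured configuration follows. The host, user name, secret and certificate path are placeholders. Going by the hosts description below, the host is given without a scheme because https is selected automatically when a certificate is configured.

```yaml
output:
  my_opensearch_output:
    type: opensearch_output
    hosts:
      - opensearch.example.com:9200  # no scheme: https is used because ca_cert is set
    default_index: default_index
    message_backlog_size: 2500
    user: logprep                    # placeholder user
    secret: change-me                # placeholder secret
    ca_cert: /path/to/ca.crt         # CA certificate used to verify the server
```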

default: bool

(Optional) If false, events are not delivered to this output, but the output can still be used as the target for extra_data.

hosts: list[str]

Addresses of Opensearch servers. Can be a list of hosts or a single host in the format HOST:PORT without specifying a scheme. The scheme is set to https automatically if a certificate is used.

default_index: str

Default index to write to if no index was set in the document or the document could not be indexed. The document will be transformed into a string to prevent rejections by the default index.

message_backlog_size: int

Number of documents to store before sending them.

timeout: int

(Optional) Timeout for the connection (default is 500ms).

user: str | None

(Optional) User used for authentication.

secret: str | None

(Optional) Secret used for authentication.

ca_cert: str | None

(Optional) Path to an SSL CA certificate to verify the SSL context.

flush_timeout: int | None

(Optional) Timeout after which the message backlog is flushed even if message_backlog_size has not been reached.

thread_count: int

Number of threads to use for bulk requests.

queue_size: int

Queue size to use for bulk requests.

chunk_size: int

Chunk size to use for bulk requests.

max_chunk_bytes: int

Max chunk size to use for bulk requests. The default is 100MB.

max_retries: int

Max retries for all requests. Default is 3.

desired_cluster_status: list

Desired cluster status for the health check as a list of strings. Default is ["green"].

default_op_type: str

Default op_type for indexing documents. Default is 'index'. Consider using 'create' for data streams or to prevent overwriting existing documents.
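For example, a setup that writes to data streams might switch default_op_type to create and also accept a yellow cluster in the health check. This is a hedged sketch: the index name, backlog size and accepted cluster states are hypothetical values, not recommended defaults.

```yaml
output:
  my_opensearch_output:
    type: opensearch_output
    hosts:
      - 127.0.0.1:9200
    default_index: default_index
    message_backlog_size: 2500
    # documents without their own _op_type field are sent with op_type "create"
    default_op_type: create
    # the health check also passes while the cluster is yellow
    desired_cluster_status: ["green", "yellow"]
```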

type: str

Type of the component

health_timeout: float

Timeout in seconds for the health check. Default is 1 second.

S3Output

This section contains the connection settings for the AWS S3 output connector.

The target bucket is defined by the bucket configuration parameter. The prefix is taken from the value of the document field named by prefix_field.

Except for the base prefix, all prefixes can contain an arrow date pattern that is replaced with the current date. The pattern must be wrapped in %{...}. For example, prefix-%{YY:MM:DD} would be replaced with prefix-23:12:06 if the date was 2023-12-06.
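The following sketch applies the date pattern to the default and error prefixes. The endpoint, bucket and prefix names are placeholders. The resolved prefixes in the comments assume the replacement works as described above, with YYYY, MM and DD being arrow's four-digit year, month and day tokens.

```yaml
output:
  my_s3_output:
    type: s3_output
    endpoint_url: http://127.0.0.1:9000   # placeholder endpoint
    bucket: s3_bucket_name
    prefix_field: dotted.field
    # on 2023-12-06 this resolves to "default-2023-12-06"
    default_prefix: default-%{YYYY-MM-DD}
    # on 2023-12-06 this resolves to "error-2023-12-06"
    error_prefix: error-%{YYYY-MM-DD}
    # the base prefix is used as-is; date patterns are not replaced here
    base_prefix: logprep
```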

Example

output:
  my_s3_output:
    type: s3_output
    endpoint_url: http://127.0.0.1:9200
    bucket: s3_bucket_name
    error_prefix: some_prefix
    prefix_field: dotted.field
    default_prefix: some_prefix
    base_prefix:
    message_backlog_size: 100000
    connect_timeout:
    max_retries:
    aws_access_key_id:
    aws_secret_access_key:
    ca_cert: /path/to/cert.crt
    use_ssl:
    call_input_callback:
    region_name:

class logprep.connector.s3.output.S3Output.Config

S3 Output Config

Security Best Practice - Output Connectors - S3Output

It is suggested to enable SSL for a secure connection. To do so, set use_ssl and the corresponding ca_cert.
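A minimal sketch of the SSL-related keys is shown below. The endpoint and certificate path are placeholders. The remaining settings (bucket, prefixes, credentials) would be filled in as in the example above.

```yaml
output:
  my_s3_output:
    type: s3_output
    endpoint_url: https://s3.example.com:443  # placeholder endpoint with https scheme
    use_ssl: true                             # enable SSL for the connection
    ca_cert: /path/to/ca.crt                  # CA certificate used to verify the endpoint
```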

default: bool

(Optional) If false, events are not delivered to this output, but the output can still be used as the target for extra_data.

endpoint_url: str

Address of the S3 endpoint in the format SCHEMA://HOST:PORT (for example http://127.0.0.1:9200).

bucket: str

Bucket to write to.

error_prefix: str

Prefix for documents that could not be processed.

prefix_field: str

Field with value to use as prefix for the document.

default_prefix: str

Default prefix if no prefix found in the document.

base_prefix: str | None

Base prefix (optional). Unlike the other prefixes, it cannot contain a date pattern.

message_backlog_size: int

Backlog size to collect messages before sending a batch (default is 500)

connect_timeout: float

Timeout for the AWS s3 connection (default is 500ms)

max_retries: int

Maximum retry attempts to connect to AWS s3 (default is 0)

aws_access_key_id: str | None

The access key ID for authentication (optional).

aws_secret_access_key: str | None

The secret used for authentication (optional).

region_name: str | None

Region name for s3 (optional).

ca_cert: str | None

The path to an SSL CA certificate to verify the SSL context (optional).

use_ssl: bool | None

Whether to use SSL. Set to true by default (optional).

call_input_callback: bool | None

If set to True, the input callback is called after the maximum backlog size has been reached (optional).

flush_timeout: int | None

(Optional) Timeout after which the message backlog is flushed even if message_backlog_size has not been reached.

type: str

Type of the component

health_timeout: float

Timeout in seconds for the health check. Default is 1 second.

HTTPOutput

An HTTP output connector that sends HTTP POST requests to paths under a given endpoint.

HTTP Output Connector Config Example

An example config file would look like:

output:
  myhttpoutput:
    type: http_output
    target_url: http://the.target.url:8080
    user: user
    password: password

The store method of this connector accepts either a dictionary or a tuple. If a tuple is passed, the first element is the target path and the second element is an event or a list of events. If a dictionary is passed, the event is sent to the configured root of the target_url.

Security Best Practice - Http Output Connector - Usage

This Connector is currently only used in the log generator and does not have a stable interface. Do not use this in production.

Security Best Practice - Http Output Connector - SSL

This connector does not verify the SSL Context, which could lead to exposing sensitive data.

class logprep.connector.http.output.HttpOutput.Config

Configuration for the HttpOutput.

user: str

User that is used for the basic auth http request

password: str

Password that is used for the basic auth http request

target_url: str

URL of the endpoint that receives the events

timeout: int

Timeout in seconds for the http request

verify: bool | str

Switch to disable ssl verification or path to certificate
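Based on the verify option, a configuration that checks the server certificate against a specific CA might look like the following sketch. The URL, credentials and certificate path are placeholders, and the timeout value is hypothetical.

```yaml
output:
  my_http_output:
    type: http_output
    target_url: https://the.target.url:8443  # placeholder HTTPS endpoint
    user: user
    password: password
    timeout: 2                               # request timeout in seconds
    verify: /path/to/ca.crt                  # certificate path instead of true/false
```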

default: bool

(Optional) If false, events are not delivered to this output, but the output can still be used as the target for extra_data.

type: str

Type of the component

health_timeout: float

Timeout in seconds for the health check. Default is 1 second.

HTTPGeneratorOutput

The logprep HTTP generator output inherits from the HTTP connector output and sends the documents written by the generator to an HTTP endpoint.

class logprep.generator.http.output.HttpGeneratorOutput.Config

Configuration for the HttpOutput.

user: str

User that is used for the basic auth http request

password: str

Password that is used for the basic auth http request

target_url: str

URL of the endpoint that receives the events

timeout: int

Timeout in seconds for the http request

verify: bool | str

Switch to disable ssl verification or path to certificate

default: bool

(Optional) If false, events are not delivered to this output, but the output can still be used as the target for extra_data.

type: str

Type of the component

health_timeout: float

Timeout in seconds for the health check. Default is 1 second.