Output
It is possible to define multiple outputs as a dictionary of <output name>: <output config>.
If you define multiple outputs with the attribute default: true, be aware that
logprep only guarantees that one of them has received the data when the
batch_finished_callback is called.
Security Best Practice - Output Connectors
Similar to the input connectors there is a list of available output connectors of which some
are only meant for debugging, namely: ConsoleOutput and JsonlOutput.
It is advised to not use these in production environments.
When configuring multiple outputs it is also recommended to use only one default output and to define other outputs only for storing custom extra data. Otherwise it cannot be guaranteed that all events are safely stored.
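For instance, one default output combined with a non-default output that only stores custom extra data could be configured like this (a sketch based on the OpensearchOutput connector described below; all names and hosts are illustrative):

```yaml
output:
  my_default_output:
    type: opensearch_output
    hosts:
      - 127.0.0.1:9200
    default_index: default_index
    message_backlog_size: 10000
  my_extra_data_output:
    type: opensearch_output
    default: false              # not a default output; only receives extra_data
    hosts:
      - 127.0.0.1:9200
    default_index: extra_data_index
    message_backlog_size: 10000
```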
ConfluentKafkaOutput
This section contains the connection settings for ConfluentKafka, the default index, the error index and a buffer size.
Example
output:
  my_confluent_kafka_output:
    type: confluentkafka_output
    topic: my_default_topic
    flush_timeout: 0.2
    send_timeout: 0
    kafka_config:
      bootstrap.servers: "127.0.0.1:9200,127.0.0.1:9200"
      compression.type: gzip
      request.required.acks: -1
      queue.buffering.max.ms: 0.5
- class logprep.connector.confluent_kafka.output.ConfluentKafkaOutput.Config
Confluent Kafka Output Config
- default: bool
(Optional) If false, events are not delivered to this output. However, this output can still be called as an output for extra_data.
- topic: str
The topic to which the processed events should be written.
- flush_timeout: float
The maximum time in seconds to wait for the producer to flush the messages to the kafka broker. If the buffer is full, the producer will block until the buffer is empty or the timeout is reached. This implies that the producer does not wait for all messages to be sent to the broker if the timeout is reached before the buffer is empty. Default is 0.
- send_timeout: float
The maximum time in seconds to wait for an answer from the broker on polling. Default is 0.
- kafka_config: MappingProxyType
Kafka configuration for the kafka client. At minimum the following keys must be set:
bootstrap.servers (STRING): a comma separated list of kafka brokers
For additional configuration options and their description see: <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>
DEFAULTS:
request.required.acks: -1
linger.ms: 0.5
compression.codec: none
client.id: <<hostname>>
queue.buffering.max.messages: 100000
statistics.interval.ms: 1000
Security Best Practice - Kafka Output Producer Authentication and Encryption
Kafka authentication is a critical aspect of securing your data pipeline. Ensure that you have the following configurations in place:
Use SSL/mTLS encryption for data in transit.
Configure SASL or mTLS authentication for your Kafka clients.
Regularly rotate your Kafka credentials and secrets.
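Such a setup could be sketched in the kafka_config as follows (the keys are standard librdkafka settings; all brokers, credentials and paths are placeholders):

```yaml
output:
  my_confluent_kafka_output:
    type: confluentkafka_output
    topic: my_default_topic
    kafka_config:
      bootstrap.servers: "broker1:9092,broker2:9092"
      security.protocol: SASL_SSL        # encrypt data in transit
      sasl.mechanisms: SCRAM-SHA-512     # SASL authentication mechanism
      sasl.username: my_user
      sasl.password: my_password
      ssl.ca.location: /path/to/ca.crt   # CA certificate to verify the brokers
```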
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
ConfluentKafkaGeneratorOutput
The logprep ConfluentKafka generator inherits from the ConfluentKafka output connector. It sends the documents written by the generator to a Kafka topic.
- class logprep.generator.confluent_kafka.output.ConfluentKafkaGeneratorOutput.Config
Confluent Kafka Output Config
- default: bool
(Optional) If false, events are not delivered to this output. However, this output can still be called as an output for extra_data.
- topic: str
The topic to which the processed events should be written.
- flush_timeout: float
The maximum time in seconds to wait for the producer to flush the messages to the kafka broker. If the buffer is full, the producer will block until the buffer is empty or the timeout is reached. This implies that the producer does not wait for all messages to be sent to the broker if the timeout is reached before the buffer is empty. Default is 0.
- send_timeout: float
The maximum time in seconds to wait for an answer from the broker on polling. Default is 0.
- kafka_config: MappingProxyType
Kafka configuration for the kafka client. At minimum the following keys must be set:
bootstrap.servers (STRING): a comma separated list of kafka brokers
For additional configuration options and their description see: <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>
DEFAULTS:
request.required.acks: -1
linger.ms: 0.5
compression.codec: none
client.id: <<hostname>>
queue.buffering.max.messages: 100000
statistics.interval.ms: 1000
Security Best Practice - Kafka Output Producer Authentication and Encryption
Kafka authentication is a critical aspect of securing your data pipeline. Ensure that you have the following configurations in place:
Use SSL/mTLS encryption for data in transit.
Configure SASL or mTLS authentication for your Kafka clients.
Regularly rotate your Kafka credentials and secrets.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
ConsoleOutput
This section describes the ConsoleOutput, which pretty prints documents to the console and can be used for testing.
Example
output:
  my_console_output:
    type: console_output
- class logprep.connector.console.output.ConsoleOutput.Config
output config parameters
- default: bool
(Optional) If false, events are not delivered to this output. However, this output can still be called as an output for extra_data.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
JsonlOutput
The JsonlOutput Connector can be used to write processed documents to .jsonl files.
Example
output:
  my_jsonl_output:
    type: jsonl_output
    output_file: path/to/output.file
    output_file_custom: ""
    output_file_error: ""
- class logprep.connector.jsonl.output.JsonlOutput.Config
Common Configurations
- output_file
- output_file_custom
- default: bool
(Optional) If false, events are not delivered to this output. However, this output can still be called as an output for extra_data.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
OpensearchOutput
This section contains the connection settings for Opensearch, the default index, the error index and a buffer size. Documents are sent in batches to Opensearch to reduce the number of times connections are created.
The desired index of a document is taken from its _index field, which is deleted afterwards.
If you want to send documents to data streams, you have to set the field _op_type: create in the document.
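For instance, an event destined for a data stream could carry these fields before it is handed to the output (a sketch; the stream name and field values are illustrative):

```yaml
_index: logs-application-default   # target data stream, removed before sending
_op_type: create                   # required when writing to data streams
message: user logged in
"@timestamp": "2023-12-06T12:00:00Z"
```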
Example
output:
  myopensearch_output:
    type: opensearch_output
    hosts:
      - 127.0.0.1:9200
    default_index: default_index
    error_index: error_index
    message_backlog_size: 10000
    timeout: 10000
    max_retries:
    user:
    secret:
    ca_cert: /path/to/cert.crt
- class logprep.connector.opensearch.output.OpensearchOutput.Config
Opensearch Output Config
Security Best Practice - Output Connectors - OpensearchOutput
It is suggested to enable a secure message transfer by setting user, secret and a valid ca_cert.
- default: bool
(Optional) If false, events are not delivered to this output. However, this output can still be called as an output for extra_data.
- hosts: list[str]
Addresses of the opensearch servers. Can be a list of hosts or a single host in the format HOST:PORT without specifying a schema. The schema is automatically set to https if a certificate is used.
- default_index: str
Default index to write to if no index was set in the document or the document could not be indexed. The document will be transformed into a string to prevent rejections by the default index.
- message_backlog_size: int
Number of documents to store before sending them.
- timeout: int
(Optional) Timeout for the connection (default is 500ms).
- user: str | None
(Optional) User used for authentication.
- secret: str | None
(Optional) Secret used for authentication.
- ca_cert: str | None
(Optional) Path to a SSL ca certificate to verify the ssl context.
- flush_timeout: int | None
(Optional) Timeout after which the message_backlog is flushed if message_backlog_size is not reached.
- thread_count: int
Number of threads to use for bulk requests.
- queue_size: int
Size of the queue to use for bulk requests.
- chunk_size: int
Chunk size to use for bulk requests.
- max_chunk_bytes: int
Max chunk size to use for bulk requests. The default is 100MB.
- max_retries: int
Max retries for all requests. Default is 3.
- desired_cluster_status: list
Desired cluster status for the health check as a list of strings. Default is ["green"].
- default_op_type: str
Default op_type for indexing documents. Default is 'index'. Consider using 'create' for data streams or to prevent overwriting existing documents.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
S3Output
This section contains the connection settings for the AWS s3 output connector.
The target bucket is defined by the bucket configuration parameter.
The prefix is defined by the value in the field prefix_field in the document.
Except for the base prefix, all prefixes can have an arrow date pattern that will be replaced with
the current date. The pattern needs to be wrapped in %{...}.
For example, prefix-%{YY:MM:DD} would be replaced with prefix-23:12:06 if the
date was 2023-12-06.
Example
output:
  my_s3_output:
    type: s3_output
    endpoint_url: http://127.0.0.1:9200
    bucket: s3_bucket_name
    error_prefix: some_prefix
    prefix_field: dotted.field
    default_prefix: some_prefix
    base_prefix:
    message_backlog_size: 100000
    connect_timeout:
    max_retries:
    aws_access_key_id:
    aws_secret_access_key:
    ca_cert: /path/to/cert.crt
    use_ssl:
    call_input_callback:
    region_name:
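The date pattern described above could, for example, be applied to the default prefix like this (a sketch; endpoint, bucket and prefixes are placeholders):

```yaml
output:
  my_s3_output:
    type: s3_output
    endpoint_url: http://127.0.0.1:9200
    bucket: s3_bucket_name
    error_prefix: some_prefix
    prefix_field: dotted.field
    default_prefix: logs-%{YYYY}/%{MM}/%{DD}
    message_backlog_size: 100000
```

On 2023-12-06 this default prefix would resolve to logs-2023/12/06.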
- class logprep.connector.s3.output.S3Output.Config
S3 Output Config
Security Best Practice - Output Connectors - S3Output
It is suggested to activate SSL for a secure connection. In order to do that, set use_ssl and the corresponding ca_cert.
- default: bool
(Optional) If false, events are not delivered to this output. However, this output can still be called as an output for extra_data.
- endpoint_url: str
Address of the s3 endpoint in the format SCHEMA://HOST:PORT.
- bucket: str
Bucket to write to.
- error_prefix: str
Prefix for documents that could not be processed.
- prefix_field: str
Field with value to use as prefix for the document.
- default_prefix: str
Default prefix if no prefix found in the document.
- base_prefix: str | None
Base prefix (optional).
- message_backlog_size: int
Backlog size to collect messages before sending a batch (default is 500)
- connect_timeout: float
Timeout for the AWS s3 connection (default is 500ms)
- max_retries: int
Maximum retry attempts to connect to AWS s3 (default is 0)
- aws_access_key_id: str | None
The access key ID for authentication (optional).
- aws_secret_access_key: str | None
The secret used for authentication (optional).
- region_name: str | None
Region name for s3 (optional).
- ca_cert: str | None
The path to a SSL ca certificate to verify the ssl context (optional)
- use_ssl: bool | None
Whether to use SSL. Set to true by default (optional).
- call_input_callback: bool | None
The input callback is called after the maximum backlog size has been reached if this is set to True (optional)
- flush_timeout: int | None
(Optional) Timeout after which the message_backlog is flushed if message_backlog_size is not reached.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
HTTPOutput
An HTTP output connector that sends HTTP POST requests to paths under a given endpoint.
HTTP Output Connector Config Example
An example config file would look like:
output:
  myhttpoutput:
    type: http_output
    target_url: http://the.target.url:8080
    username: user
    password: password
The store method of this connector can be fed with a dictionary or a tuple.
If a tuple is passed, the first element is the target path and
the second element is the event or a list of events.
If a dictionary is passed, the event will be sent to the configured root
of the target_url.
Security Best Practice - Http Output Connector - Usage
This Connector is currently only used in the log generator and does not have a stable interface. Do not use this in production.
Security Best Practice - Http Output Connector - SSL
This connector does not verify the SSL Context, which could lead to exposing sensitive data.
- class logprep.connector.http.output.HttpOutput.Config
Configuration for the HttpOutput.
- user: str
User that is used for the basic auth http request
- password: str
Password that is used for the basic auth http request
- target_url: str
URL of the endpoint that receives the events
- timeout: int
Timeout in seconds for the http request
- verify: bool | str
Switch to disable ssl verification or path to certificate
- default: bool
(Optional) If false, events are not delivered to this output. However, this output can still be called as an output for extra_data.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
HTTPGeneratorOutput
The logprep HTTP generator inherits from the HTTP output connector. It sends the documents written by the generator to an HTTP endpoint.
- class logprep.generator.http.output.HttpGeneratorOutput.Config
Configuration for the HttpOutput.
- user: str
User that is used for the basic auth http request
- password: str
Password that is used for the basic auth http request
- target_url: str
URL of the endpoint that receives the events
- timeout: int
Timeout in seconds for the http request
- verify: bool | str
Switch to disable ssl verification or path to certificate
- default: bool
(Optional) If false, events are not delivered to this output. However, this output can still be called as an output for extra_data.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.