Input Connectors -> ConfluentKafka Input
Prepare the Kafka cluster
[ ]:
%%bash
# Start a fresh single-broker Kafka container and create the "consumer" topic.
set -euo pipefail

compose_file=../../../../../examples/compose/docker-compose.yml
kafka_bin=/opt/bitnami/kafka/bin

# Tear down any previous stack (including its volumes, so no stale topics or
# offsets survive), then start only the kafka service in the background.
docker compose -f "$compose_file" down -v
docker compose -f "$compose_file" up -d kafka

# Wait for the broker to accept requests BEFORE creating the topic. A fixed
# sleep placed after the create call races the broker start-up; polling with an
# upper bound neither races nor waits longer than necessary.
ready=0
for ((attempt = 1; attempt <= 30; attempt++)); do
  if docker exec kafka "$kafka_bin/kafka-topics.sh" \
      --bootstrap-server localhost:9092 --list >/dev/null 2>&1; then
    ready=1
    break
  fi
  sleep 2
done
if ((!ready)); then
  printf '%s\n' "kafka did not become ready in time" >&2
  exit 1
fi

# Create the topic the input connector consumes from.
docker exec kafka "$kafka_bin/kafka-topics.sh" --create --topic consumer \
  --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Produce messages
[ ]:
%%bash
# Produce 3 JSON events to the kafka topic "consumer" (one event per stdin line).
# `docker exec -i` forwards this shell's stdin to the console producer, so the
# JSON needs no nested `bash -c` quoting.
kafka_bin=/opt/bitnami/kafka/bin
event='{"message": "the message"}'
printf '%s\n' "$event" "$event" "$event" \
  | docker exec -i kafka "$kafka_bin/kafka-console-producer.sh" \
    --bootstrap-server localhost:9092 --topic consumer
# showing events in kafka
# docker exec -i kafka /bin/bash -c "/opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer --from-beginning --max-messages 10"
Initialize the ConfluentKafka input connector
[ ]:
import sys
import logging
from logprep.factory import Factory
from logprep.ng.connector.confluent_kafka.input import ConfluentKafkaInput
# Send every log record at DEBUG level or above to stdout so it appears in the cell output.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

# Connector configuration keyed by connector name ("kafka"). The inner
# "kafka_config" mapping uses librdkafka property names; presumably it is handed
# to the underlying consumer as-is (confirm in logprep).
# With auto-commit on but automatic offset storing off, only offsets that the
# connector explicitly stores get committed (see librdkafka configuration docs).
kafka_config = {
    "kafka": dict(
        type="ng_confluentkafka_input",
        topic="consumer",
        kafka_config={
            "bootstrap.servers": "127.0.0.1:9092",
            "group.id": "cgroup3",
            "enable.auto.commit": "true",
            "auto.commit.interval.ms": "10000",
            "enable.auto.offset.store": "false",
            "queued.min.messages": "100000",
            "queued.max.messages.kbytes": "65536",
            "statistics.interval.ms": "60000",
        },
    )
}
# Build the connector instance from the configuration dict above.
kafka_connector: ConfluentKafkaInput = Factory.create(kafka_config)

# How many events to read; should match the number written by the produce cell.
events_to_consume = 3

# Start the connector (presumably connects the consumer — confirm in logprep).
kafka_connector.setup()
try:
    # show the current backlog before consuming anything
    print(f"{kafka_connector.event_backlog.backlog=}")
    for _ in range(events_to_consume):
        # NOTE(review): an iterator is requested per event, as in the original
        # usage; timeout=10 presumably bounds how long each poll waits.
        event = next(kafka_connector(timeout=10))
        print(f"{event=}")
    # show the backlog after consuming the events
    print(f"{kafka_connector.event_backlog.backlog=}")
finally:
    # Always shut the connector down so the consumer is closed even if
    # consuming or printing raises.
    kafka_connector.shut_down()