{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "3ea6bd8d",
   "metadata": {},
   "source": [
    "## Input Connectors -> ConfluentKafka Input\n",
    "\n",
    "1. prepare the kafka cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c7a86700",
   "metadata": {
    "vscode": {
     "languageId": "shellscript"
    }
   },
   "outputs": [],
   "source": [
    "%%bash\n",
    "# abort the cell on the first failing command instead of silently continuing\n",
    "set -euo pipefail\n",
    "COMPOSE_FILE=../../../../../examples/compose/docker-compose.yml\n",
    "KAFKA_TOPICS=/opt/bitnami/kafka/bin/kafka-topics.sh\n",
    "# starting kafka container from a clean state (removes old containers and volumes)\n",
    "docker compose -f \"$COMPOSE_FILE\" down -v\n",
    "docker compose -f \"$COMPOSE_FILE\" up -d kafka\n",
    "# waiting for kafka to be ready: poll the broker (at most 30 attempts) BEFORE creating the topic\n",
    "for attempt in $(seq 1 30); do\n",
    "  if docker exec -i kafka /bin/bash -c \"$KAFKA_TOPICS --list --bootstrap-server localhost:9092\" > /dev/null 2>&1; then\n",
    "    break\n",
    "  fi\n",
    "  sleep 2\n",
    "done\n",
    "# creating the topic; if the broker never became ready this command fails and so does the cell\n",
    "docker exec -i kafka /bin/bash -c \"$KAFKA_TOPICS --create --if-not-exists --topic consumer --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1\"\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3c7a70d0",
   "metadata": {},
   "source": [
    "2. produce messages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "458b38c1",
   "metadata": {
    "vscode": {
     "languageId": "shellscript"
    }
   },
   "outputs": [],
   "source": [
    "%%bash\n",
    "# producing 3 events to kafka topic consumer (one JSON document per line)\n",
    "docker exec -i kafka /bin/bash -c \"for i in 1 2 3; do echo '{\\\"message\\\": \\\"the message\\\"}'; done | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic consumer\"\n",
    "# optional: uncomment to show the events in kafka\n",
    "# docker exec -i kafka /bin/bash -c \"/opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer --from-beginning --max-messages 10\"\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3f58d924",
   "metadata": {},
   "source": [
    "3. 
initializing the confluentkafka_input connector"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e3cd8f43",
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "import logging\n",
    "from logprep.factory import Factory\n",
    "from logprep.ng.connector.confluent_kafka.input import ConfluentKafkaInput\n",
    "\n",
    "# Configure logging\n",
    "logging.basicConfig(\n",
    "    level=logging.DEBUG,\n",
    "    stream=sys.stdout,\n",
    ")\n",
    "\n",
    "# number of events to consume from the topic\n",
    "EVENT_COUNT = 3\n",
    "\n",
    "# Create a Kafka input connector configuration\n",
    "kafka_config = {\n",
    "    \"kafka\": {\n",
    "        \"type\": \"ng_confluentkafka_input\",\n",
    "        \"topic\": \"consumer\",\n",
    "        \"kafka_config\": {\n",
    "            \"bootstrap.servers\": \"127.0.0.1:9092\",\n",
    "            \"group.id\": \"cgroup3\",\n",
    "            \"enable.auto.commit\": \"true\",\n",
    "            \"auto.commit.interval.ms\": \"10000\",\n",
    "            \"enable.auto.offset.store\": \"false\",\n",
    "            \"queued.min.messages\": \"100000\",\n",
    "            \"queued.max.messages.kbytes\": \"65536\",\n",
    "            \"statistics.interval.ms\": \"60000\",\n",
    "        },\n",
    "    }\n",
    "}\n",
    "\n",
    "kafka_connector: ConfluentKafkaInput = Factory.create(kafka_config)\n",
    "\n",
    "# Start the connector\n",
    "kafka_connector.setup()\n",
    "\n",
    "try:\n",
    "    # show the current backlog\n",
    "    print(f\"{kafka_connector.event_backlog.backlog=}\")\n",
    "\n",
    "    # Consume EVENT_COUNT messages from the Kafka topic, waiting up to 10s for each\n",
    "    for _ in range(EVENT_COUNT):\n",
    "        event = next(kafka_connector(timeout=10))\n",
    "        print(f\"{event=}\")\n",
    "\n",
    "    # show the backlog after consuming the events\n",
    "    print(f\"{kafka_connector.event_backlog.backlog=}\")\n",
    "finally:\n",
    "    # Always shut down the input connector, even if consuming an event failed,\n",
    "    # so a re-run of this cell does not leave an open connector behind in the kernel\n",
    "    kafka_connector.shut_down()\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.3"
  }
 },
"nbformat": 4, "nbformat_minor": 5 }