From bdfd9d7e05c1834d926633581d96285810cca0be Mon Sep 17 00:00:00 2001 From: Nicholas Kalar Date: Fri, 7 Nov 2025 17:15:33 -0500 Subject: [PATCH] Added tracker --- src/tracker.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 src/tracker.py diff --git a/src/tracker.py b/src/tracker.py new file mode 100644 index 0000000..1a296ab --- /dev/null +++ b/src/tracker.py @@ -0,0 +1,48 @@ +import json +from pprint import pprint + +from confluent_kafka import Consumer + +# config for a consumer +# if it loses track of where it was in Kafka, it will automatically +# go to the earliest offset. +consumer_config = { + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'order-tracker', + 'auto.offset.reset': 'earliest' +} + +# Create a Consumer object +consumer = Consumer(consumer_config) + +# Subscribe to a topic (or more than one) to get events +consumer.subscribe(["orders"]) + +print("🟢 Consumer is running and subscribed to orders topic") + +# Continuously check for new events +try: + while True: + # Check for messages + msg = consumer.poll(1.0) + + # Check for empty messages or errors + if msg is None: + continue + if msg.error(): + print(f"🚨 Error: {msg.error()}") + continue + + # Print out valid orders + value = msg.value().decode('utf-8') + order = json.loads(value) + print('šŸ“¦ Received Order!') + pprint(order) + +# Close gracefully +except KeyboardInterrupt: + print("\nšŸ”“ Stopping consumer") + +# Ensure the consumer is closed +finally: + consumer.close()