Added tracker
This commit is contained in:
48
src/tracker.py
Normal file
48
src/tracker.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import json
|
||||
from pprint import pprint
|
||||
|
||||
from confluent_kafka import Consumer
|
||||
|
||||
# config for a consumer
|
||||
# if it loses track of where it was in Kafka, it will automatically
|
||||
# go to the earliest offset.
|
||||
consumer_config = {
|
||||
'bootstrap.servers': 'localhost:9092',
|
||||
'group.id': 'order-tracker',
|
||||
'auto.offset.reset': 'earliest'
|
||||
}
|
||||
|
||||
# Create a Consumer object
|
||||
consumer = Consumer(consumer_config)
|
||||
|
||||
# Subscribe to a topic (or more than one) to get events
|
||||
consumer.subscribe(["orders"])
|
||||
|
||||
print("🟢 Consumer is running and subscribed to orders topic")
|
||||
|
||||
# Continuously check for new events
|
||||
try:
|
||||
while True:
|
||||
# Check for messages
|
||||
msg = consumer.poll(1.0)
|
||||
|
||||
# Check for empty messages or errors
|
||||
if msg is None:
|
||||
continue
|
||||
if msg.error():
|
||||
print(f"🚨 Error: {msg.error()}")
|
||||
continue
|
||||
|
||||
# Print out valid orders
|
||||
value = msg.value().decode('utf-8')
|
||||
order = json.loads(value)
|
||||
print('📦 Received Order!')
|
||||
pprint(order)
|
||||
|
||||
# Close gracefully
|
||||
except KeyboardInterrupt:
|
||||
print("\n🔴 Stopping consumer")
|
||||
|
||||
# Ensure the consumer is closed
|
||||
finally:
|
||||
consumer.close()
|
||||
Reference in New Issue
Block a user