Added producer
This commit is contained in:
7
.idea/dictionaries/project.xml
generated
Normal file
7
.idea/dictionaries/project.xml
generated
Normal file
@@ -0,0 +1,7 @@
|
||||
<component name="ProjectDictionaryState">
|
||||
<dictionary name="project">
|
||||
<words>
|
||||
<w>vieja</w>
|
||||
</words>
|
||||
</dictionary>
|
||||
</component>
|
||||
41
src/producer.py
Normal file
41
src/producer.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from confluent_kafka import Producer
|
||||
|
||||
# Configuration to set up a Kafka Producer
|
||||
PRODUCER_CONFIG = {
|
||||
'bootstrap.servers': 'localhost:9092',
|
||||
}
|
||||
|
||||
# A Producer object created using the config
|
||||
producer = Producer(PRODUCER_CONFIG)
|
||||
|
||||
# Function to "log" if an order was successfully delivered to the Kafka Topic or not
|
||||
def delivery_report(err, mesg):
|
||||
if err:
|
||||
print(f'🚨 Delivery failed: {err}')
|
||||
else:
|
||||
print(f'✅ Delivery successful: {mesg.value().decode('utf-8')}')
|
||||
print(f"✅ Delivered to {mesg.topic()}: partition {mesg.partition()}: at offset {mesg.offset()}")
|
||||
|
||||
# Sample Order
|
||||
order = {
|
||||
'order_id': str(uuid.uuid4()),
|
||||
'user': 'Michelle',
|
||||
'item': 'Lechon Asado', # yum 😋
|
||||
'quantity': 1
|
||||
}
|
||||
|
||||
# Convert order to a Kafka readable value
|
||||
kafka_value = json.dumps(order).encode('utf-8')
|
||||
|
||||
# Send order to topic (and create topic if it doesn't exist)
|
||||
producer.produce(
|
||||
topic='orders',
|
||||
value=kafka_value,
|
||||
callback=delivery_report
|
||||
)
|
||||
|
||||
# Ensures events are sent before closing
|
||||
producer.flush()
|
||||
Reference in New Issue
Block a user