Compare commits


8 Commits

| SHA1 | Message | Date |
| --- | --- | --- |
| bd7c590646 | Experimentation | 2025-11-06 20:37:37 -05:00 |
| 764078562a | Fixed typos | 2025-11-06 18:37:22 -05:00 |
| bf3bd1b147 | Updated to build | 2025-11-06 16:43:02 -05:00 |
| a531df5e9f | Utilizing Docker for portability | 2025-11-06 16:32:24 -05:00 |
| 085cc008b4 | Added addition context | 2025-11-06 16:32:04 -05:00 |
| a141fa76f1 | fixed logging | 2025-11-03 23:16:11 -05:00 |
| 388c39973c | Added missing field | 2025-11-03 23:15:53 -05:00 |
| a60f76c30e | Updated README | 2025-11-03 23:15:45 -05:00 |
9 changed files with 141 additions and 25 deletions

.gitignore

@@ -195,3 +195,4 @@ cython_debug/
.DS_Store
output/*
.vscode/settings.json

Dockerfile (new file)

@@ -0,0 +1,11 @@
FROM python:3.13
WORKDIR /ETL
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
VOLUME ["/etl/logs"]
CMD ["python3", "./src/load.py"]

README.md

@@ -1,5 +1,30 @@
# LMS-DB-ETL
An Extract, Transform, Load app to gather book information from public APIs for a POC LMS project
An Extract, Transform, Load (ETL) app to gather book information from public APIs for
a Proof of Concept Library Management System project.
(Past Git history can be found at: https://github.com/Kalarsoft/LMS-DB-ETL and
https://gitea.com/NickKalar/LMS-DB-ETL)
## Problem
Currently, I am working on building a Library Management System (LMS) to help
develop and showcase my software engineering skills. In order to fully test
and run the LMS, I need to have a database that is populated by a variety of
different media. As I am one person, and have only about 300 books to my name,
this problem needed a better solution than manually adding in those books.
## Solution
This project seeks to seed a database with book details, mostly pulled from
public APIs. The current version uses the Google Books API and Open Library
API. After pulling data from these APIs for several books, the data is merged
and transformed to be loaded into a PostgreSQL database for consumption by the
RESTful APIs associated with the LMS project.
This is a rudimentary ETL pipeline, as it uses no external tools and uses only
2 Python libraries for making the API calls and connecting to the database.
However, it does showcase my understanding of Data Engineering and the ETL
cycle.
## Setup
Environmental Variables:
`GOOGLE_API_KEY` - API Key required for using the Google Books API.
@@ -17,7 +42,7 @@ match the column names of the database schema.
## load.py
Takes the JSON file created by transform.py and loads the data into a PostgreSQL database for
retreival later.
retrieval later.
## orchestrator.py
Handles the orchestration of each program being run one after the other. Ensures each
@@ -30,15 +55,19 @@ needs to be on its own line and any special characters should be escaped.
## How To Use
1) Create a virtual environment (optional, but best practice)
```zsh
python3 -m venv ./.venv
source ./.venv/bin/activate
```
2) Use Pip to install all required packages
```python
pip install -r requirements
```zsh
pip install -r requirements.txt
```
3) Run the Orchestrator:
```python
```zsh
python src/orchestrator.py
```
OR
```python
```zsh
python3 src/orchestrator.py
```
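
The Setup list above is cut off by the hunk boundaries; purely as an illustration, a local `.env` might look like the sketch below. The variable names come from the scripts and compose.yaml in this comparison, and every value is a placeholder.

```zsh
# Hypothetical .env for local runs (names from extract.py, load.py, and compose.yaml)
GOOGLE_API_KEY=your-google-books-api-key
# Where the loggers write
LOG_FILE=output/etl.log
# Database settings; names mirror compose.yaml's healthcheck, values are placeholders
DB_NAME=LMS-Test
DB_USER=etl_agent
DB_PASSWORD=change-me
# Use the compose service name "db" instead of localhost when running through compose.yaml
DB_HOST=localhost
DB_PORT=5432
```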

compose.yaml (new file)

@@ -0,0 +1,46 @@
name: etl-and-db
services:
  db:
    container_name: postgres_db
    image: postgres:18.0-trixie
    networks:
      - backend
    expose:
      - "5432"
    ports:
      - "5432:5432"
    shm_size: 128mb
    environment:
      - POSTGRES_PASSWORD=${DB_PASSWORD}
      - POSTGRES_USER=${DB_USER}
      - POSTGRES_DB=${DB_NAME}
    volumes:
      - postgres_data:/var/lib/postgresql
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "etl_agent", "-d", "LMS-Test"]
      interval: 30s
      retries: 3
      start_period: 30s
      timeout: 10s
    restart: always
  etl:
    container_name: etl_pipeline
    image: etl-pipeline
    build: .
    depends_on:
      - db
    networks:
      - backend
    command: "python3 ./src/orchestrator.py"
    restart: no
    volumes:
      - /tmp/etl/logs/:/var/lib/etl/logs
networks:
  backend:
    name: pg_network
volumes:
  postgres_data:
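
Again outside the diff: a sketch of running this stack, assuming the hypothetical `.env` above sits beside compose.yaml so Compose can substitute ${DB_PASSWORD}, ${DB_USER}, and ${DB_NAME}.

```zsh
# Start Postgres and the ETL container; --build rebuilds the etl-pipeline image first
docker compose up --build

# Follow the pipeline's logs, then tear the stack down (add -v to also drop postgres_data)
docker compose logs -f etl
docker compose down
```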

src/extract.py

@@ -6,15 +6,16 @@ import json
from dotenv import load_dotenv
from datetime import date, datetime
load_dotenv
load_dotenv()
log_file = os.getenv('LOG_FILE')
logging.basicConfig(filename=log_file, level=logging.INFO)
logger = logging.getLogger('extract.py')
google_api_key = os.getenv('GOOGLE_API_KEY')
google_header = {'key': google_api_key}
open_lib_header = {'User-Agent': 'Kalar-LMS nick@kalar.codes'}
logger = logging.getLogger('extract.py')
logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
today = date.today()
def extract_book_json(url, header=[]):
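
This hunk does not show the URLs extract_book_json is called with; purely for illustration, the two public APIs named in the README can be exercised with curl as below. The ISBN and search terms are placeholders; the User-Agent matches open_lib_header above.

```zsh
# Google Books API volume search (requires GOOGLE_API_KEY)
curl "https://www.googleapis.com/books/v1/volumes?q=isbn:9780261103573&key=$GOOGLE_API_KEY"

# Open Library search API
curl -H 'User-Agent: Kalar-LMS nick@kalar.codes' 'https://openlibrary.org/search.json?q=the+hobbit'
```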

src/load.py

@@ -1,4 +1,5 @@
import os
import time
import logging
import json
import psycopg
@@ -9,23 +10,33 @@ import random
load_dotenv()
logger = logging.getLogger('load.py')
logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
log_file = os.getenv('LOG_FILE')
logging.basicConfig(filename=log_file, level=logging.INFO)
logger = logging.getLogger('extract.py')
db_name = os.getenv('DB_NAME')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
today = date.today()
def start():
    with psycopg.connect(f'dbname={db_name} user={db_user} password={db_password}') as conn, \
    logger.info(f"{datetime.now()}: Attempting connection...")
    # Attempting to connect to the Book Database for loading book information
    db_connection_string = f'dbname={db_name} user={db_user} password={db_password} host={db_host} port={db_port}'
    with psycopg.connect(db_connection_string) as conn, \
        open(f'output/transformed_{today}.json', 'r') as transformed_books:
        logger.info(f'{datetime.now()}: Connection established')
        books = json.loads(transformed_books.read())
        with conn.cursor() as cur:
            cur.execute(f'DROP TABLE IF EXISTS Collection_Item') # TODO: REMOVE WHEN TESTING COMPLETED
            cur.execute(f'DROP TABLE IF EXISTS collection_item') # TODO: REMOVE WHEN TESTING COMPLETED
            cur.execute(sql_statements.collection_item_table_creation)
            logger.info(f'{datetime.now()}: Table Created')
            load_transformed_books(cur, books)
            logger.info(f'{datetime.now()}: Books loaded')
def load_transformed_books(cursor, books):
@@ -39,12 +50,14 @@ def load_transformed_books(cursor, books):
    `collection_item` SQL table
    '''
    for book in books['books']:
        # This simulates a library buying multiple copies of a book.
        # This for i in range statement simulates a library buying multiple copies of a book.
        try:
            for i in range(random.randrange(1, 10)):
                cursor.execute(sql_statements.collection_insert_statement(book))
                logger.info(f'{datetime.now()}:Book {book['title']} loaded {i+1} times.')
        except Exception as err:
            # if one, or even a series of books are bad, we still want to load what we can.
            # Log the error and move on.
            logger.error(f'{err} at {book.title}')
if __name__ == '__main__':
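
One hedged way to confirm the load step, assuming the compose stack above is running; the container, user, and database names are taken from compose.yaml's healthcheck.

```zsh
# Count the rows load.py inserted into collection_item
docker exec -it postgres_db psql -U etl_agent -d LMS-Test -c 'SELECT COUNT(*) FROM collection_item;'
```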

src/orchestrator.py

@@ -5,12 +5,20 @@ import transform
import load
import logging
from datetime import date, datetime
from dotenv import load_dotenv
logger = logging.getLogger('orchestrator.py')
logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
load_dotenv()
log_file = os.getenv('LOG_FILE')
logging.basicConfig(filename=log_file, level=logging.INFO)
logger = logging.getLogger('extract.py')
today = date.today()
# The Orchestrator is a simple app designed to facilitate the execution of
# the ETL pipeline. Should any system fail, the Orchestrator will log the
# error and exit.
if __name__ == '__main__':
    try:
        logger.info(f'{datetime.now()}:Starting extract.py')

src/sql_statements.py

@@ -48,7 +48,8 @@ library_table_creation = '''
CREATE TABLE IF NOT EXISTS "library"(
"id" BIGINT NOT NULL,
"name" VARCHAR(255) NOT NULL,
"address" VARCHAR(255) NOT NULL
"address" VARCHAR(255) NOT NULL,
"is_archived" BOOLEAN NOT NULL
);
'''
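
Similarly, if the library table has been created in the database, the new is_archived column from this hunk can be inspected from psql (same assumed stack as above):

```zsh
# Describe the library table; is_archived should show as boolean NOT NULL
docker exec -it postgres_db psql -U etl_agent -d LMS-Test -c '\d library'
```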

src/transform.py

@@ -2,9 +2,13 @@ import os
import json
import logging
from datetime import date, datetime
from dotenv import load_dotenv
logger = logging.getLogger('transform.py')
logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
load_dotenv()
log_file = os.getenv('LOG_FILE')
logging.basicConfig(filename=log_file, level=logging.INFO)
logger = logging.getLogger('extract.py')
today = date.today()
@@ -35,7 +39,7 @@ def format_sort_title(title):
def combine_raw_jsons(google_json, ol_json):
'''
Returns a dictionary consisting of an array of dictionarys.
Returns a dictionary consisting of an array of dictionaries.
Each child dictionary is a transformed book ready to be
inserted into a database.
@@ -53,6 +57,7 @@ def combine_raw_jsons(google_json, ol_json):
isbn = ol_json['book_data'][index]['isbn']
sort_title = format_sort_title(title)
# Ensuring field variables have valid data
if 'categories' in google_json['book_data'][index]['volumeInfo']:
categories = ', '.join(google_json['book_data'][index]['volumeInfo']['categories'])
else:
@@ -65,6 +70,7 @@ def combine_raw_jsons(google_json, ol_json):
if 'publishedDate' in google_json['book_data'][index]['volumeInfo']:
published_date = google_json['book_data'][index]['volumeInfo']['publishedDate']
# Making sure the publishing date has a month and day associated
if len(published_date) == 4:
published_date += '-12-31'
elif len(published_date) < 10:
@@ -72,8 +78,6 @@ def combine_raw_jsons(google_json, ol_json):
else:
published_date = '9999-12-31'
lost_date = '9999-12-31'
if 'printType' in google_json['book_data'][index]['volumeInfo']:
print_type = google_json['book_data'][index]['volumeInfo']['printType']
else:
@@ -94,6 +98,7 @@ def combine_raw_jsons(google_json, ol_json):
description = ol_json['book_data'][index]['description']
price_in_cents = ol_json['book_data'][index]['price_in_cents']
cover_image_uri = ol_json['book_data'][index]['cover_image_uri']
lost_date = '9999-12-31'
transformed_dictionary_entry = {
'title': title,
@@ -116,6 +121,7 @@ def combine_raw_jsons(google_json, ol_json):
'price_in_cents': price_in_cents,
'cover_image_uri': cover_image_uri,
}
transformed_dictionary['books'].append(transformed_dictionary_entry)
return transformed_dictionary
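
Finally, outside the diff: the transformed books land in output/transformed_<date>.json (the path load.py reads above, with the date in Python's default YYYY-MM-DD form), so a quick local sanity check on the transform output might be:

```zsh
# Peek at today's transformed output; date +%F matches Python's str(date.today())
head -c 500 "output/transformed_$(date +%F).json"
```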