Compare commits
5 Commits
ffbe9274fe ... a531df5e9f

| Author | SHA1 | Date |
|---|---|---|
|  | a531df5e9f |  |
|  | 085cc008b4 |  |
|  | a141fa76f1 |  |
|  | 388c39973c |  |
|  | a60f76c30e |  |
Dockerfile (new file)
@@ -0,0 +1,9 @@
+FROM python:3.13
+WORKDIR /ETL
+
+COPY requirements.txt ./
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY . .
+
+CMD ["python3", "./src/load.py"]
README.md
@@ -1,5 +1,30 @@
 # LMS-DB-ETL
-An Extract, Transform, Load app to gather book information from public APIs for a POC LMS project
+An Extract, Transform, Load (ETL) app to gather book information from public APIs for
+a Proof of Concept Library Management System project.
+
+(Past Git history can be found at: https://github.com/Kalarsoft/LMS-DB-ETL and
+https://gitea.com/NickKalar/LMS-DB-ETL)
+
+## Problem
+Currently, I am working on building a Library Management System (LMS) to help
+develop and showcase my software engineering skills. In order to fully test
+and run the LMS, I need to have a database that is populated by a variety of
+different media. As I am one person, and have only about 300 books to my name,
+this problem needed a better solution than manually adding in those books.
+
+## Solution
+This project seeks to seed a database with book details, mostly pulled from
+public APIs. The current version uses the Google Books API and Open Library
+API. After pulling data from these APIs for several books, the data is merged
+and transformed to be loaded into a PostgreSQL database for consumption by the
+RESTful APIs associated with the LMS project.
+
+This is a rudimentary ETL pipeline, as it uses no external tools and only
+two Python libraries for making the API calls and connecting to the database.
+However, it does showcase my understanding of Data Engineering and the ETL
+cycle.
+
+## Setup
 
 Environmental Variables:
 `GOOGLE_API_KEY` - API Key required for using the Google Books API.
@@ -30,15 +55,19 @@ needs to be on its own line and any special characters should be escaped.
 
 ## How To Use
 1) Create a virtual environment (optional, but best practice)
+```zsh
+python3 -m venv ./.venv
+source ./.venv/bin/activate
+```
 2) Use Pip to install all required packages
-```python
-pip install -r requirements
+```zsh
+pip install -r requirements.txt
 ```
 3) Run the Orchestrator:
-```python
+```zsh
 python src/orchestrator.py
 ```
 OR
-```python
+```zsh
 python3 src/orchestrator.py
 ```
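The Setup section above lists the environment variables the pipeline reads. As a minimal sketch of how they are consumed (assuming python-dotenv and a `.env` file in the working directory, which matches the `load_dotenv()` calls in the diffs below; only `GOOGLE_API_KEY` is taken from the README):

```python
# Minimal sketch, assuming a .env file in the working directory.
import os
from dotenv import load_dotenv

load_dotenv()  # populate os.environ from .env
google_api_key = os.getenv('GOOGLE_API_KEY')  # returns None if the variable is unset
```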
compose.yaml (new file)
@@ -0,0 +1,46 @@
+name: etl-and-db
+
+services:
+  db:
+    container_name: postgres_db
+    image: postgres:18.0-trixie
+    networks:
+      - backend
+    expose:
+      - "5432"
+    ports:
+      - "8081:5432"
+    shm_size: 128mb
+    environment:
+      - POSTGRES_PASSWORD=${DB_PASSWORD}
+      - POSTGRES_USER=${DB_USER}
+      - POSTGRES_DB=${DB_NAME}
+    volumes:
+      - postgres_data:/var/lib/postgresql
+    healthcheck:
+      test: ["CMD", "pg_isready", "-U", "etl_agent", "-d", "LMS-Test"]
+      interval: 30s
+      retries: 3
+      start_period: 30s
+      timeout: 10s
+    restart: always
+
+  etl:
+    image: etl-pipeline:0.3
+    depends_on:
+      - db
+    networks:
+      - backend
+    command: "python3 ./src/orchestrator.py"
+    restart: "no"
+    volumes:
+      - postgres_data:/var/lib/etl
+    pre_stop:
+      - command: docker cp CONTAINER:/var/lib/etl/lms-etl.log /tmp/app_logs
+
+networks:
+  backend:
+    name: pg_network
+
+volumes:
+  postgres_data:
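One caveat with this compose file: the short-form `depends_on: - db` only waits for the db container to start, not for its healthcheck to pass, so the ETL container can race a Postgres that is still initializing. A minimal retry sketch on the Python side (function name, attempt count, and delay are illustrative, not the repo's code):

```python
# Connection-retry sketch, assuming psycopg 3.
import time
import psycopg

def connect_with_retry(conninfo: str, attempts: int = 5, delay: float = 3.0):
    for attempt in range(1, attempts + 1):
        try:
            return psycopg.connect(conninfo)
        except psycopg.OperationalError:
            if attempt == attempts:
                raise  # out of attempts; surface the real error
            time.sleep(delay)  # give Postgres time to finish starting up
```

Alternatively, compose's long-form `depends_on` with `condition: service_healthy` would make use of the healthcheck already defined on the db service.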
src/extract.py
@@ -6,15 +6,16 @@ import json
 from dotenv import load_dotenv
 from datetime import date, datetime
 
-load_dotenv
+load_dotenv()
+
+log_file = os.getenv('LOG_FILE')
+logging.basicConfig(filename=log_file, level=logging.INFO)
+logger = logging.getLogger('extract.py')
 
 google_api_key = os.getenv('GOOGLE_API_KEY')
 google_header = {'key': google_api_key}
 open_lib_header = {'User-Agent': 'Kalar-LMS nick@kalar.codes'}
 
-logger = logging.getLogger('extract.py')
-logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
-
 today = date.today()
 
 def extract_book_json(url, header=[]):
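The `header=[]` default in `extract_book_json` is a mutable default argument, and HTTP headers are dicts in any case (as the `google_header` and `open_lib_header` literals above show). A safer signature, sketched here under the assumption that extraction uses the `requests` library (the function body is not visible in this diff):

```python
import requests

def extract_book_json(url, header=None):
    # None as the default avoids Python's shared-mutable-default pitfall.
    response = requests.get(url, headers=header or {}, timeout=10)
    response.raise_for_status()  # surface HTTP errors instead of parsing an error page
    return response.json()
```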
src/load.py
@@ -1,4 +1,5 @@
 import os
+import time
 import logging
 import json
 import psycopg
@@ -9,23 +10,33 @@ import random
 
 load_dotenv()
 
-logger = logging.getLogger('load.py')
-logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
+log_file = os.getenv('LOG_FILE')
+logging.basicConfig(filename=log_file, level=logging.INFO)
+logger = logging.getLogger('load.py')
 
 db_name = os.getenv('DB_NAME')
 db_user = os.getenv('DB_USER')
 db_password = os.getenv('DB_PASSWORD')
+db_host = os.getenv('DB_HOST')
+db_port = os.getenv('DB_PORT')
 
 today = date.today()
 
 def start():
-    with psycopg.connect(f'dbname={db_name} user={db_user} password={db_password}') as conn, \
+    logger.info(f"{datetime.now()}: Attempting connection...")
+
+    # Attempting to connect to the Book Database for loading book information
+    db_connection_string = f'dbname={db_name} user={db_user} password={db_password} host={db_host} port={db_port}'
+    with psycopg.connect(db_connection_string) as conn, \
         open(f'output/transformed_{today}.json', 'r') as transformed_books:
+        logger.info(f'{datetime.now()}: Connection established')
         books = json.loads(transformed_books.read())
         with conn.cursor() as cur:
-            cur.execute(f'DROP TABLE IF EXISTS Collection_Item') # TODO: REMOVE WHEN TESTING COMPLETED
+            cur.execute(f'DROP TABLE IF EXISTS collection_item') # TODO: REMOVE WHEN TESTING COMPLETED
             cur.execute(sql_statements.collection_item_table_creation)
+            logger.info(f'{datetime.now()}: Table Created')
             load_transformed_books(cur, books)
+            logger.info(f'{datetime.now()}: Books loaded')
 
 
 def load_transformed_books(cursor, books):
||||||
@@ -39,12 +50,14 @@ def load_transformed_books(cursor, books):
|
|||||||
`collection_item` SQL table
|
`collection_item` SQL table
|
||||||
'''
|
'''
|
||||||
for book in books['books']:
|
for book in books['books']:
|
||||||
# This simulates a library buying multiple copies of a book.
|
# This for i in range statment simulates a library buying multiple copies of a book.
|
||||||
try:
|
try:
|
||||||
for i in range(random.randrange(1, 10)):
|
for i in range(random.randrange(1, 10)):
|
||||||
cursor.execute(sql_statements.collection_insert_statement(book))
|
cursor.execute(sql_statements.collection_insert_statement(book))
|
||||||
logger.info(f'{datetime.now()}:Book {book['title']} loaded {i+1} times.')
|
logger.info(f'{datetime.now()}:Book {book['title']} loaded {i+1} times.')
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
# if one, or even a series of books are bad, we still want to load what we can.
|
||||||
|
# Log the error and move on.
|
||||||
logger.error(f'{err} at {book.title}')
|
logger.error(f'{err} at {book.title}')
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|||||||
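`load_transformed_books` builds each INSERT as a string via `sql_statements.collection_insert_statement(book)`. Since the values come from external APIs, psycopg's parameter binding is the safer pattern; a hedged sketch (the column names are guesses based on fields visible in transform.py, not the actual schema):

```python
# Parameterized-insert sketch, assuming psycopg 3; columns are illustrative.
INSERT_SQL = 'INSERT INTO collection_item (title, sort_title, isbn) VALUES (%s, %s, %s)'

def insert_book(cursor, book: dict) -> None:
    # psycopg escapes the bound values, so odd characters in titles cannot break the SQL.
    cursor.execute(INSERT_SQL, (book['title'], book['sort_title'], book['isbn']))
```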
src/orchestrator.py
@@ -5,12 +5,20 @@ import transform
 import load
 import logging
 from datetime import date, datetime
+from dotenv import load_dotenv
 
-logger = logging.getLogger('orchestrator.py')
-logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
+load_dotenv()
+
+log_file = os.getenv('LOG_FILE')
+logging.basicConfig(filename=log_file, level=logging.INFO)
+logger = logging.getLogger('orchestrator.py')
 
 today = date.today()
 
+# The Orchestrator is a simple app designed to facilitate the execution of
+# the ETL pipeline. Should any system fail, the Orchestrator will log the
+# error and exit.
+
 if __name__ == '__main__':
     try:
         logger.info(f'{datetime.now()}:Starting extract.py')
@@ -39,4 +47,4 @@ if __name__ == '__main__':
         os.remove(f'output/raw_google_books_{today}.json')
         os.remove(f'output/raw_open_lib_books_{today}.json')
         os.remove(f'output/transformed_{today}.json')
         logger.info(f'{datetime.now()}:Orchestration complete. ETL Pipeline executed without errors.')
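The comment block added above states the orchestrator's contract: run each stage, log any failure, and stop. A minimal sketch of that fail-fast pattern (the stage wiring and exit code are assumptions; the repo's actual try/except is only partially visible in this diff):

```python
import sys
import logging

logger = logging.getLogger('orchestrator.py')

def run_stage(name: str, fn) -> None:
    # Log the failure and exit non-zero so a scheduler can see the stage failed.
    try:
        logger.info(f'Starting {name}')
        fn()
    except Exception:
        logger.exception(f'{name} failed; aborting pipeline')
        sys.exit(1)
```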
src/sql_statements.py
@@ -48,7 +48,8 @@ library_table_creation = '''
 CREATE TABLE IF NOT EXISTS "library"(
     "id" BIGINT NOT NULL,
     "name" VARCHAR(255) NOT NULL,
-    "address" VARCHAR(255) NOT NULL
+    "address" VARCHAR(255) NOT NULL,
+    "is_archived" BOOLEAN NOT NULL
 );
 '''
 
src/transform.py
@@ -2,9 +2,13 @@ import os
 import json
 import logging
 from datetime import date, datetime
+from dotenv import load_dotenv
 
-logger = logging.getLogger('transform.py')
-logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
+load_dotenv()
+
+log_file = os.getenv('LOG_FILE')
+logging.basicConfig(filename=log_file, level=logging.INFO)
+logger = logging.getLogger('transform.py')
 
 today = date.today()
 
@@ -35,7 +39,7 @@ def format_sort_title(title):
 
 def combine_raw_jsons(google_json, ol_json):
     '''
-    Returns a dictionary consisting of an array of dictionarys.
+    Returns a dictionary consisting of an array of dictionaries.
     Each child dictionary is a transformed book ready to be
     inserted into a database.
 
@@ -53,6 +57,7 @@ def combine_raw_jsons(google_json, ol_json):
         isbn = ol_json['book_data'][index]['isbn']
         sort_title = format_sort_title(title)
 
+        # Ensuring field variables have valid data
         if 'categories' in google_json['book_data'][index]['volumeInfo']:
             categories = ', '.join(google_json['book_data'][index]['volumeInfo']['categories'])
         else:
@@ -65,6 +70,7 @@ def combine_raw_jsons(google_json, ol_json):
 
         if 'publishedDate' in google_json['book_data'][index]['volumeInfo']:
             published_date = google_json['book_data'][index]['volumeInfo']['publishedDate']
+            # Making sure the publishing date has a month and day associated
             if len(published_date) == 4:
                 published_date += '-12-31'
             elif len(published_date) < 10:
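The new comment above documents why the padding exists: Google Books sometimes returns only `YYYY` (or `YYYY-MM`) for `publishedDate`, while the target column needs a full date. A quick check of the bare-year case shown in this hunk (the `< 10` branch's suffix is not visible in the diff, so it is not exercised here):

```python
from datetime import datetime

published_date = '1994'         # bare year, as Google Books sometimes returns
if len(published_date) == 4:
    published_date += '-12-31'  # pin to the end of the year, as in the hunk above

# the padded value now parses as a real calendar date
assert datetime.strptime(published_date, '%Y-%m-%d').date().year == 1994
```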
@@ -72,8 +78,6 @@ def combine_raw_jsons(google_json, ol_json):
         else:
             published_date = '9999-12-31'
 
-        lost_date = '9999-12-31'
-
         if 'printType' in google_json['book_data'][index]['volumeInfo']:
             print_type = google_json['book_data'][index]['volumeInfo']['printType']
         else:
@@ -94,6 +98,7 @@ def combine_raw_jsons(google_json, ol_json):
         description = ol_json['book_data'][index]['description']
         price_in_cents = ol_json['book_data'][index]['price_in_cents']
         cover_image_uri = ol_json['book_data'][index]['cover_image_uri']
+        lost_date = '9999-12-31'
 
         transformed_dictionary_entry = {
             'title': title,
@@ -116,6 +121,7 @@ def combine_raw_jsons(google_json, ol_json):
             'price_in_cents': price_in_cents,
             'cover_image_uri': cover_image_uri,
         }
 
         transformed_dictionary['books'].append(transformed_dictionary_entry)
 
     return transformed_dictionary
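The membership checks this file comments (`if 'categories' in ...`) can also be written with `dict.get()` defaults, which collapses each if/else pair into one line. A sketch (the fallback values are assumptions, not the repo's defaults):

```python
def pick_fields(volume_info: dict) -> tuple[str, str]:
    # dict.get() with a default replaces each "'key' in d" if/else pair.
    categories = ', '.join(volume_info.get('categories', []))
    print_type = volume_info.get('printType', 'UNKNOWN')  # fallback is an assumption
    return categories, print_type

# usage against the structure seen in the diff:
# categories, print_type = pick_fields(google_json['book_data'][index]['volumeInfo'])
```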