Compare commits: ffbe9274fe...main (8 commits)
| Author | SHA1 | Date |
|---|---|---|
| | bd7c590646 | |
| | 764078562a | |
| | bf3bd1b147 | |
| | a531df5e9f | |
| | 085cc008b4 | |
| | a141fa76f1 | |
| | 388c39973c | |
| | a60f76c30e | |
.gitignore (vendored) · 1 line changed

```diff
@@ -195,3 +195,4 @@ cython_debug/
 .DS_Store
 output/*
+.vscode/settings.json
 
```
Dockerfile (new file) · 11 lines

```dockerfile
FROM python:3.13
WORKDIR /ETL

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

VOLUME ["/etl/logs"]

CMD ["python3", "./src/load.py"]
```
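One thing to keep straight when running this image: container paths are case-sensitive, so `WORKDIR /ETL` and `VOLUME ["/etl/logs"]` name unrelated directories, and the scripts write wherever `LOG_FILE` points. Logs only persist through the declared volume if `LOG_FILE` targets a path inside it. A minimal sketch of that wiring; the path and fallback value are my assumptions, not repo config:

```python
import logging
import os

# Assumption: LOG_FILE is set in the container environment to a path inside
# the declared volume, e.g. LOG_FILE=/etl/logs/etl.log.
log_file = os.getenv('LOG_FILE', '/etl/logs/etl.log')  # fallback is illustrative

# Create the directory if the volume is mounted empty, then log into it.
os.makedirs(os.path.dirname(log_file), exist_ok=True)
logging.basicConfig(filename=log_file, level=logging.INFO)
logging.getLogger('volume-logging-example').info('log line persisted via the volume')
```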
README.md · 41 lines changed

```diff
@@ -1,5 +1,30 @@
 # LMS-DB-ETL
-An Extract, Transform, Load app to gather book information from public APIs for a POC LMS project
+An Extract, Transform, Load (ETL) app to gather book information from public APIs for
+a Proof of Concept Library Management System project.
+
+(Past Git history can be found at: https://github.com/Kalarsoft/LMS-DB-ETL and
+https://gitea.com/NickKalar/LMS-DB-ETL)
+
+## Problem
+Currently, I am working on building a Library Management System (LMS) to help
+develop and showcase my software engineering skills. In order to fully test
+and run the LMS, I need to have a database that is populated by a variety of
+different media. As I am one person, and have only about 300 books to my name,
+this problem needed a better solution than manually adding in those books.
+
+## Solution
+This project seeks to seed a database with book details, mostly pulled from
+public APIs. The current version uses the Google Books API and Open Library
+API. After pulling data from these APIs for several books, the data is merged
+and transformed to be loaded into a PostgreSQL database for consumption by the
+RESTful APIs associated with the LMS project.
+
+This is a rudimentary ETL pipeline, as it uses no external tools and only
+2 Python libraries for making the API calls and connecting to the database.
+However, it does showcase my understanding of Data Engineering and the ETL
+cycle.
+
 ## Setup
+
 Environmental Variables:
 `GOOGLE_API_KEY` - API Key required for using the Google Books API.
```
```diff
@@ -17,7 +42,7 @@ match the column names of the database schema.

 ## load.py
 Takes the JSON file created by transform.py and loads the data into a PostgreSQL database for
-retreival later.
+retrieval later.

 ## orchestrator.py
 Handles the orchestration of each program being run one after the other. Ensures each
```

````diff
@@ -30,15 +55,19 @@ needs to be on its own line and any special characters should be escaped.

 ## How To Use
 1) Create a virtual environment (optional, but best practice)
 ```zsh
 python3 -m venv ./.venv
 source ./.venv/bin/activate
 ```
 2) Use Pip to install all required packages
-```python
-pip install -r requirements
+```zsh
+pip install -r requirements.txt
 ```
 3) Run the Orchestrator:
-```python
+```zsh
 python src/orchestrator.py
 ```
 OR
-```python
+```zsh
 python3 src/orchestrator.py
 ```
````
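As a rough illustration of the extract step the Solution section describes (pulling one book from each public API before merging), here is a sketch using `requests`; the exact endpoints and query parameters are my assumptions from the public API docs, not code lifted from `src/extract.py`:

```python
import requests

isbn = '9780140328721'  # illustrative ISBN

# Google Books: search volumes by ISBN; the API key rides along as a parameter.
google_resp = requests.get(
    'https://www.googleapis.com/books/v1/volumes',
    params={'q': f'isbn:{isbn}', 'key': 'YOUR_GOOGLE_API_KEY'},
    timeout=10,
)

# Open Library: fetch the record for the same ISBN; a descriptive User-Agent
# matches the header extract.py already sends.
open_lib_resp = requests.get(
    f'https://openlibrary.org/isbn/{isbn}.json',
    headers={'User-Agent': 'Kalar-LMS nick@kalar.codes'},
    timeout=10,
)

raw_data = {'google': google_resp.json(), 'open_library': open_lib_resp.json()}
```

The two responses would then feed `combine_raw_jsons` in transform.py, which reconciles the differing field layouts into one record per book.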
compose.yaml (new file) · 46 lines

```yaml
name: etl-and-db

services:
  db:
    container_name: postgres_db
    image: postgres:18.0-trixie
    networks:
      - backend
    expose:
      - "5432"
    ports:
      - "5432:5432"
    shm_size: 128mb
    environment:
      - POSTGRES_PASSWORD=${DB_PASSWORD}
      - POSTGRES_USER=${DB_USER}
      - POSTGRES_DB=${DB_NAME}
    volumes:
      - postgres_data:/var/lib/postgresql
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "etl_agent", "-d", "LMS-Test"]
      interval: 30s
      retries: 3
      start_period: 30s
      timeout: 10s
    restart: always

  etl:
    container_name: etl_pipeline
    image: etl-pipeline
    build: .
    depends_on:
      - db
    networks:
      - backend
    command: "python3 ./src/orchestrator.py"
    restart: "no"
    volumes:
      - /tmp/etl/logs/:/var/lib/etl/logs

networks:
  backend:
    name: pg_network

volumes:
  postgres_data:
```
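Note that the `etl` service lists `db` under plain `depends_on`, which only waits for the Postgres *container* to start, not for the server to accept connections; the healthcheck defined on `db` is not tied in via `condition: service_healthy`. The `import time` added to load.py hints at a retry loop; a sketch of that pattern with psycopg, reusing the same `DB_*` variables the compose file injects (function name and retry counts are mine):

```python
import os
import time

import psycopg

# Same DB_* variables compose passes to both containers.
conninfo = (
    f"dbname={os.getenv('DB_NAME')} user={os.getenv('DB_USER')} "
    f"password={os.getenv('DB_PASSWORD')} host={os.getenv('DB_HOST')} "
    f"port={os.getenv('DB_PORT')}"
)

def wait_for_postgres(retries=10, delay=3):
    """Block until Postgres accepts a connection, or raise after `retries` tries."""
    for _ in range(retries):
        try:
            with psycopg.connect(conninfo):
                return  # connected and closed cleanly: the server is ready
        except psycopg.OperationalError:
            time.sleep(delay)  # server not up yet; back off and retry
    raise RuntimeError(f'Postgres not reachable after {retries} attempts')
```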
src/extract.py

```diff
@@ -6,15 +6,16 @@ import json
 from dotenv import load_dotenv
 from datetime import date, datetime

-load_dotenv
+load_dotenv()

+log_file = os.getenv('LOG_FILE')
+logging.basicConfig(filename=log_file, level=logging.INFO)
+logger = logging.getLogger('extract.py')
+
 google_api_key = os.getenv('GOOGLE_API_KEY')
 google_header = {'key': google_api_key}
 open_lib_header = {'User-Agent': 'Kalar-LMS nick@kalar.codes'}

-logger = logging.getLogger('extract.py')
-logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
-
 today = date.today()

 def extract_book_json(url, header=[]):
```
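The first change here is easy to skim past: bare `load_dotenv` is just an expression that evaluates to the function object and discards it, so the `.env` file is never read and every later `os.getenv` call sees only the inherited environment. The parentheses are the whole fix:

```python
from dotenv import load_dotenv

load_dotenv    # bug: references the function, does nothing
load_dotenv()  # fix: actually reads .env into os.environ
```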
src/load.py · 23 lines changed

```diff
@@ -1,4 +1,5 @@
 import os
+import time
 import logging
 import json
 import psycopg
```

```diff
@@ -9,23 +10,33 @@ import random

 load_dotenv()

-logger = logging.getLogger('load.py')
-logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
+log_file = os.getenv('LOG_FILE')
+logging.basicConfig(filename=log_file, level=logging.INFO)
+logger = logging.getLogger('extract.py')

 db_name = os.getenv('DB_NAME')
 db_user = os.getenv('DB_USER')
 db_password = os.getenv('DB_PASSWORD')
+db_host = os.getenv('DB_HOST')
+db_port = os.getenv('DB_PORT')

 today = date.today()

 def start():
-    with psycopg.connect(f'dbname={db_name} user={db_user} password={db_password}') as conn, \
+    logger.info(f"{datetime.now()}: Attempting connection...")
+
+    # Attempting to connect to the Book Database for loading book information
+    db_connection_string = f'dbname={db_name} user={db_user} password={db_password} host={db_host} port={db_port}'
+    with psycopg.connect(db_connection_string) as conn, \
         open(f'output/transformed_{today}.json', 'r') as transformed_books:
+        logger.info(f'{datetime.now()}: Connection established')
         books = json.loads(transformed_books.read())
         with conn.cursor() as cur:
-            cur.execute(f'DROP TABLE IF EXISTS Collection_Item') # TODO: REMOVE WHEN TESTING COMPLETED
+            cur.execute(f'DROP TABLE IF EXISTS collection_item') # TODO: REMOVE WHEN TESTING COMPLETED
             cur.execute(sql_statements.collection_item_table_creation)
+            logger.info(f'{datetime.now()}: Table Created')
             load_transformed_books(cur, books)
+            logger.info(f'{datetime.now()}: Books loaded')
```

```diff
@@ -39,12 +50,14 @@ def load_transformed_books(cursor, books):
     `collection_item` SQL table
     '''
     for book in books['books']:
-        # This simulates a library buying multiple copies of a book.
+        # This for i in range statement simulates a library buying multiple copies of a book.
+        try:
             for i in range(random.randrange(1, 10)):
                 cursor.execute(sql_statements.collection_insert_statement(book))
                 logger.info(f'{datetime.now()}:Book {book['title']} loaded {i+1} times.')
+        except Exception as err:
+            # if one, or even a series of books are bad, we still want to load what we can.
+            # Log the error and move on.
+            logger.error(f'{err} at {book.title}')

 if __name__ == '__main__':
```
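The new `try`/`except` keeps one bad record from aborting the whole batch, which is the right instinct for seed data. One detail worth flagging: `books` comes from `json.loads`, so each `book` is a dict, and the handler's `book.title` would itself raise `AttributeError` while reporting the original error. A sketch of the same guard with dict-safe access (the single-column INSERT is a placeholder, not the repo's statement):

```python
import logging
from datetime import datetime

logger = logging.getLogger('load-example')

def load_books(cursor, books, insert_sql):
    """Insert each book; log failures and keep going."""
    for book in books['books']:
        try:
            cursor.execute(insert_sql, (book['title'],))
        except Exception as err:
            # book is a dict, so subscript it; .get() survives a missing key too.
            logger.error(f"{datetime.now()}: {err} at {book.get('title', '<unknown>')}")
```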
src/orchestrator.py

```diff
@@ -5,12 +5,20 @@ import transform
 import load
 import logging
 from datetime import date, datetime
 from dotenv import load_dotenv

-logger = logging.getLogger('orchestrator.py')
-logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
+load_dotenv()
+
+log_file = os.getenv('LOG_FILE')
+logging.basicConfig(filename=log_file, level=logging.INFO)
+logger = logging.getLogger('extract.py')

 today = date.today()

+# The Orchestrator is a simple app designed to facilitate the execution of
+# the ETL pipeline. Should any system fail, the Orchestrator will log the
+# error and exit.
+
 if __name__ == '__main__':
     try:
         logger.info(f'{datetime.now()}:Starting extract.py')
```
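The added comment block pins down the control flow: run each stage in order, log progress, and stop on the first failure. A compact sketch of that contract (the stage `start()` entry points are assumptions; only load.py is shown to define `start()` in this diff):

```python
import logging
import sys
from datetime import datetime

logger = logging.getLogger('orchestrator-example')

def run_stage(name, stage):
    """Run one ETL stage; on failure, log and stop the pipeline."""
    try:
        logger.info(f'{datetime.now()}: Starting {name}')
        stage()
        logger.info(f'{datetime.now()}: Finished {name}')
    except Exception as err:
        logger.error(f'{datetime.now()}: {name} failed: {err}')
        sys.exit(1)  # later stages depend on this one, so bail out

# Hypothetical wiring, mirroring the import order at the top of the file:
# run_stage('extract.py', extract.start)
# run_stage('transform.py', transform.start)
# run_stage('load.py', load.start)
```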
src/sql_statements.py

```diff
@@ -48,7 +48,8 @@ library_table_creation = '''
 CREATE TABLE IF NOT EXISTS "library"(
     "id" BIGINT NOT NULL,
     "name" VARCHAR(255) NOT NULL,
-    "address" VARCHAR(255) NOT NULL
+    "address" VARCHAR(255) NOT NULL,
+    "is_archived" BOOLEAN NOT NULL
 );
 '''
```
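Elsewhere in the diff, load.py builds its INSERT text through `sql_statements.collection_insert_statement(book)`, i.e. by string construction. That is workable for trusted seed data, but psycopg's placeholder parameters push quoting and escaping into the driver, which is the safer default. A sketch assuming a few of the `collection_item` fields that transform.py produces (the column list is my guess, not the repo's schema):

```python
# Placeholder-style INSERT: psycopg substitutes the tuple values server-side.
INSERT_SQL = """
    INSERT INTO collection_item (title, sort_title, isbn, published_date)
    VALUES (%s, %s, %s, %s)
"""

def insert_book(cursor, book):
    cursor.execute(
        INSERT_SQL,
        (book['title'], book['sort_title'], book['isbn'], book['published_date']),
    )
```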
src/transform.py

```diff
@@ -2,9 +2,13 @@ import os
 import json
 import logging
 from datetime import date, datetime
 from dotenv import load_dotenv

-logger = logging.getLogger('transform.py')
-logging.basicConfig(filename=os.getenv('LOG_FILE'), level=logging.INFO)
+load_dotenv()
+
+log_file = os.getenv('LOG_FILE')
+logging.basicConfig(filename=log_file, level=logging.INFO)
+logger = logging.getLogger('extract.py')

 today = date.today()
```

```diff
@@ -35,7 +39,7 @@ def format_sort_title(title):

 def combine_raw_jsons(google_json, ol_json):
     '''
-    Returns a dictionary consisting of an array of dictionarys.
+    Returns a dictionary consisting of an array of dictionaries.
     Each child dictionary is a transformed book ready to be
     inserted into a database.
```

```diff
@@ -53,6 +57,7 @@ def combine_raw_jsons(google_json, ol_json):
         isbn = ol_json['book_data'][index]['isbn']
         sort_title = format_sort_title(title)

+        # Ensuring field variables have valid data
         if 'categories' in google_json['book_data'][index]['volumeInfo']:
             categories = ', '.join(google_json['book_data'][index]['volumeInfo']['categories'])
         else:
```

```diff
@@ -65,6 +70,7 @@ def combine_raw_jsons(google_json, ol_json):
         if 'publishedDate' in google_json['book_data'][index]['volumeInfo']:
             published_date = google_json['book_data'][index]['volumeInfo']['publishedDate']
+            # Making sure the publishing date has a month and day associated
             if len(published_date) == 4:
                 published_date += '-12-31'
             elif len(published_date) < 10:
```

```diff
@@ -72,8 +78,6 @@ def combine_raw_jsons(google_json, ol_json):
         else:
             published_date = '9999-12-31'

-        lost_date = '9999-12-31'
-
         if 'printType' in google_json['book_data'][index]['volumeInfo']:
             print_type = google_json['book_data'][index]['volumeInfo']['printType']
         else:
```

```diff
@@ -94,6 +98,7 @@ def combine_raw_jsons(google_json, ol_json):
         description = ol_json['book_data'][index]['description']
         price_in_cents = ol_json['book_data'][index]['price_in_cents']
         cover_image_uri = ol_json['book_data'][index]['cover_image_uri']
+        lost_date = '9999-12-31'

         transformed_dictionary_entry = {
             'title': title,
```

```diff
@@ -116,6 +121,7 @@ def combine_raw_jsons(google_json, ol_json):
             'price_in_cents': price_in_cents,
             'cover_image_uri': cover_image_uri,
         }
+
        transformed_dictionary['books'].append(transformed_dictionary_entry)

    return transformed_dictionary
```
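The `publishedDate` handling normalizes partial dates so they always fit a `DATE` column: a bare year is padded to its last day, and `'9999-12-31'` serves as the sentinel for unknown dates (both for `published_date` and for the relocated `lost_date`). A standalone restatement of that logic; the `elif` branch's padding is hidden by the hunk boundary, so the month-only case below is my assumption:

```python
def normalize_published_date(raw):
    """Pad a partial publication date to a full YYYY-MM-DD string."""
    if not raw:
        return '9999-12-31'      # sentinel for "unknown", as in the diff
    if len(raw) == 4:            # year only, e.g. '1954'
        return raw + '-12-31'    # matches the diff's padding
    if len(raw) < 10:            # year and month, e.g. '1954-07'
        return raw + '-31'       # assumption: the diff does not show this branch
    return raw                   # already a full date

assert normalize_published_date('1954') == '1954-12-31'
assert normalize_published_date('') == '9999-12-31'
```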