refactored functions
This commit is contained in:
27
src/load.py
27
src/load.py
@@ -17,22 +17,29 @@ db_password = os.getenv('DB_PASSWORD')
|
||||
|
||||
today = date.today()
|
||||
|
||||
collections_table_creation = sql_statements.collections_table_creation
|
||||
|
||||
def start():
|
||||
with psycopg.connect(f'dbname={db_name} user={db_user} password={db_password}') as conn, \
|
||||
open(f'output/transformed_{today}.json', 'r') as transformed_books:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(f'DROP TABLE IF EXISTS Collection_Item') # TODO: REMOVE WHEN TESTING COMPLETED
|
||||
cur.execute(collections_table_creation)
|
||||
books = json.loads(transformed_books.read())
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(f'DROP TABLE IF EXISTS Collection_Item') # TODO: REMOVE WHEN TESTING COMPLETED
|
||||
cur.execute(sql_statements.collections_table_creation)
|
||||
load_transformed_books(cur, books)
|
||||
|
||||
|
||||
for book in books['books']:
|
||||
cur.execute(sql_statements.collection_insert_statement(book))
|
||||
logger.info(f'{datetime.now()}:Book {book['title']} loaded.')
|
||||
def load_transformed_books(cursor, books):
|
||||
'''
|
||||
Takes a pyscopg connection cursor and a dictionary of books and inserts
|
||||
the books into a PostgreSQL database.
|
||||
|
||||
def load_transformed_books():
|
||||
pass
|
||||
Keyword arguments:
|
||||
cursor - a psycopg.connect.cursor object
|
||||
books - a dictionary of transformed books following the schema for the
|
||||
`collection_item` SQL table
|
||||
'''
|
||||
for book in books['books']:
|
||||
cursor.execute(sql_statements.collection_insert_statement(book))
|
||||
logger.info(f'{datetime.now()}:Book {book['title']} loaded.')
|
||||
|
||||
if __name__ == '__main__':
|
||||
print('Loading Started')
|
||||
|
||||
Reference in New Issue
Block a user