"""SQLite-backed storage for users, items and the entries that rate them."""

from contextlib import closing
from datetime import date as dt
import os
import sqlite3

from werkzeug.security import generate_password_hash, check_password_hash


class User():
    """An account identified by name, holding a password hash.

    NOTE(review): the is_active / is_authenticated / is_anonymous / get_id
    shape looks like Flask-Login's user interface -- confirm against callers.
    """

    def __init__(self, name, pass_hash=None):
        """Create a user.

        Args:
            name: The user's name.
            pass_hash: An already-hashed password, or None if no password
                has been set yet. It is stored as given, not hashed again.
        """
        self.name = name
        self.id = None
        self.is_active = True
        self.is_authenticated = True
        self.is_anonymous = False
        self.pass_hash = pass_hash

    def set_password(self, password):
        """Hash the plaintext ``password`` and store the result in pass_hash."""
        # Never keep the plaintext: pass_hash is what insert_user persists.
        self.pass_hash = generate_password_hash(password)

    def set_id(self, ident):
        """Set the user's id to ``ident``."""
        self.id = ident

    def check_password(self, password):
        """Return True if the plaintext ``password`` matches the stored hash.

        Returns False when no hash has been set. Stored values that are not
        werkzeug password hashes (for example plaintext written before
        hashing was introduced) do not match and must be reset.
        """
        if self.pass_hash is None:
            return False
        return check_password_hash(self.pass_hash, password)

    def get_id(self):
        """Return the user's id (None until set_id has been called)."""
        return self.id


class Item():
    """A named thing that entries refer to."""

    def __init__(self, name):
        self.name = name
        self.id = None

    def set_id(self, ident):
        """Set the item's id to ``ident``."""
        self.id = ident


class Entry():
    """A dated, rated text about an item, optionally attributed to a user."""

    def __init__(self, item_id, date, text, rating, user_id, reviewed):
        # Initialised like User.id and Item.id so that reading entry.id
        # before set_id() yields None instead of raising AttributeError.
        self.id = None
        self.item_id = item_id
        self.date = date
        self.text = text
        self.rating = rating
        self.user_id = user_id
        self.reviewed = reviewed

    def set_id(self, ident):
        """Set the entry's id to ``ident``."""
        self.id = ident


class Database:
    """Access layer for the SQLite file ``data.db`` inside ``DB_DIR``.

    Every method opens its own connection and closes it before returning.
    Writing methods run inside a transaction that is committed on success
    and rolled back if an exception is raised.
    """

    def __init__(self):
        """Set the table names and data directory, then create the tables."""
        self.USER_TABLE_FILE = 'USERS'
        self.ENTRY_TABLE_FILE = 'ENTRIES'
        self.ITEM_TABLE_FILE = 'ITEMS'
        self.DB_DIR = os.path.dirname("./data/")
        self.setup_db()

    def connect(self):
        """
        Connect to the database file based on the object attributes.

        Returns:
            A new sqlite3 connection; the caller is responsible for
            closing it.
        """
        path = os.path.join(self.DB_DIR, "data.db")
        return sqlite3.connect(path)

    def setup_db(self):
        """Creates a database with tables.

        The data directory is created first if it does not exist, and each
        table is only created when it is missing.
        """
        # sqlite3.connect() cannot create the file inside a missing directory.
        if self.DB_DIR:
            os.makedirs(self.DB_DIR, exist_ok=True)

        users = ("CREATE TABLE IF NOT EXISTS " + self.USER_TABLE_FILE +
                 "(id INTEGER PRIMARY KEY AUTOINCREMENT," +
                 "name CHAR(32) NOT NULL UNIQUE," +
                 "password CHAR(32) NOT NULL)")
        items = ("CREATE TABLE IF NOT EXISTS " + self.ITEM_TABLE_FILE +
                 "(id INTEGER PRIMARY KEY AUTOINCREMENT," +
                 "name CHAR(32) NOT NULL UNIQUE)")
        entries = ("CREATE TABLE IF NOT EXISTS " + self.ENTRY_TABLE_FILE +
                   "(id INTEGER PRIMARY KEY AUTOINCREMENT," +
                   "item_id INTEGER NOT NULL REFERENCES " +
                   self.ITEM_TABLE_FILE + "(id)," +
                   "date CHAR(4) NOT NULL," +
                   "text TEXT NOT NULL," +
                   "rating INTEGER NOT NULL," +
                   "user_id INTEGER REFERENCES " +
                   self.USER_TABLE_FILE + "(id),"
                   "reviewed CHAR(10) NOT NULL)")

        # closing() releases the connection; the inner ``db`` context
        # commits on success and rolls back on error.
        with closing(self.connect()) as db, db:
            for query in (users, items, entries):
                db.execute(query)

    def insert_user(self, user):
        """Store ``user`` unless the name is taken or no password hash is set.

        Args:
            user: A User whose ``name`` and ``pass_hash`` are written.

        Returns:
            The cursor's lastrowid after the insert, or None if the user
            was rejected without touching the table.
        """
        if user.pass_hash is None:
            return None
        if self.get_user_by_name(user.name) is not None:
            return None

        query = ("INSERT INTO " + self.USER_TABLE_FILE +
                 "(`name`,`password`)" +
                 "VALUES (?, ?) ON CONFLICT DO NOTHING")
        with closing(self.connect()) as db, db:
            crs = db.execute(query, (user.name, user.pass_hash))
            return crs.lastrowid

    def insert_entry(self, name, date, text, rating, user_id=None):
        """Store an entry for the item called ``name``.

        The item row is created if it does not exist yet. The entry's
        ``reviewed`` column is set to today's date as YYYY-MM-DD.

        Args:
            name: Name of the item the entry belongs to.
            date: Value for the entry's ``date`` column.
            text: Value for the entry's ``text`` column.
            rating: Value for the entry's ``rating`` column.
            user_id: Id of the authoring user, or None.

        Returns:
            The cursor's lastrowid after inserting the entry.
        """
        add_item = ("INSERT OR IGNORE INTO " + self.ITEM_TABLE_FILE +
                    "(`name`)" + "VALUES (?)")
        find_item = ("SELECT id FROM " + self.ITEM_TABLE_FILE +
                     " WHERE name = ?")
        add_entry = ("INSERT INTO " + self.ENTRY_TABLE_FILE +
                     "(`item_id`,`date`, `text`, `rating`, `user_id`, `reviewed`)" +
                     "VALUES (?, ?, ?, ?, ?, ?)")

        with closing(self.connect()) as db, db:
            db.execute(add_item, (name, ))
            item_id = db.execute(find_item, (name, )).fetchone()[0]
            reviewed = dt.today().strftime('%Y-%m-%d')
            crs = db.execute(
                add_entry, (item_id, date, text, rating, user_id, reviewed))
            return crs.lastrowid

    def delete_entry(self, ident):
        """Delete the entry whose id is ``ident``.

        Returns:
            The cursor's lastrowid. sqlite3 only updates that attribute for
            INSERT and REPLACE statements, so it is not a count of deleted
            rows; it is returned only to keep the existing interface.
        """
        query = "DELETE FROM " + self.ENTRY_TABLE_FILE + " WHERE id = ?"
        with closing(self.connect()) as db, db:
            crs = db.execute(query, (ident, ))
            return crs.lastrowid

    def get_entries(self):
        """Return every row of the entry table as a list of tuples."""
        query = "SELECT * FROM " + self.ENTRY_TABLE_FILE
        with closing(self.connect()) as db:
            return db.execute(query).fetchall()

    def get_entry_by_id(self, ident):
        """Return the Entry with id ``ident``, or None if there is none."""
        query = "SELECT * FROM " + self.ENTRY_TABLE_FILE + " WHERE id = ?"
        with closing(self.connect()) as db:
            fetched = db.execute(query, (ident, )).fetchone()
        if fetched is None:
            return None
        return self.db_to_entry(*fetched)

    def get_entries_by_user(self, name):
        """Return the entry rows (tuples) written by the user named ``name``."""
        query = ("SELECT * FROM " + self.ENTRY_TABLE_FILE +
                 " WHERE user_id = (SELECT id FROM " + self.USER_TABLE_FILE +
                 " WHERE name = ?)")
        with closing(self.connect()) as db:
            return db.execute(query, (name, )).fetchall()

    def get_item_by_id(self, ident):
        """Return the Item with id ``ident``, or None if there is none."""
        query = "SELECT * FROM " + self.ITEM_TABLE_FILE + " WHERE id = ?"
        with closing(self.connect()) as db:
            fetched = db.execute(query, (ident, )).fetchone()
        if fetched is None:
            return None
        return self.db_to_item(*fetched)

    def get_user_by_id(self, ident):
        """Return the User with id ``ident``, or None if there is none."""
        query = "SELECT * FROM " + self.USER_TABLE_FILE + " WHERE id = ?"
        with closing(self.connect()) as db:
            fetched = db.execute(query, (ident, )).fetchone()
        if fetched is None:
            return None
        return self.db_to_user(*fetched)

    def get_user_by_name(self, name):
        """Return the User called ``name``, or None if there is none."""
        query = "SELECT * FROM " + self.USER_TABLE_FILE + " WHERE name = ?"
        with closing(self.connect()) as db:
            fetched = db.execute(query, (name, )).fetchone()
        if fetched is None:
            return None
        return self.db_to_user(*fetched)

    def db_to_user(self, ident, name, pass_hash):
        """Build a User from the columns of a user-table row."""
        user = User(name, pass_hash)
        user.set_id(ident)
        return user

    def db_to_item(self, ident, name):
        """Build an Item from the columns of an item-table row."""
        item = Item(name)
        item.set_id(ident)
        return item

    def db_to_entry(self, ident, item_id, date, text, rating, user_id, reviewed):
        """Build an Entry from the columns of an entry-table row."""
        entry = Entry(item_id, date, text, rating, user_id, reviewed)
        entry.set_id(ident)
        return entry