Files
gkachele-saas/demo/db.py

105 lines
3.5 KiB
Python

import os
import sqlite3
import json
# DB Settings (Postgres)
DB_HOST = os.environ.get('DB_HOST', None)
DB_PORT = int(os.environ.get('DB_PORT', '5432'))
DB_NAME = os.environ.get('DB_NAME', 'gkachele')
DB_USER = os.environ.get('DB_USER', 'gkachele')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'gkachele_pass')
# SQLite Settings
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SQLITE_PATH = os.path.join(BASE_DIR, 'database', 'main.db')
class CursorWrapper:
def __init__(self, cursor, is_sqlite=False):
self._cursor = cursor
self._is_sqlite = is_sqlite
def execute(self, query, params=None):
try:
if self._is_sqlite:
# Skip postgres-specific maintenance commands
if 'setval' in query or 'pg_get_serial_sequence' in query:
return None
# Adapt psycopg2-style placeholders (%s) to sqlite (?)
if isinstance(query, str) and '%s' in query:
query = query.replace('%s', '?')
# SQLite doesn't support IF NOT EXISTS in ALTER TABLE
if 'ALTER TABLE' in query and 'ADD COLUMN' in query and 'IF NOT EXISTS' in query:
query = query.replace('IF NOT EXISTS', '')
# Handle SERIAL PRIMARY KEY and other PG types
query = query.replace('SERIAL PRIMARY KEY', 'INTEGER PRIMARY KEY AUTOINCREMENT')
if 'IF NOT EXISTS' not in query and 'CREATE TABLE' in query:
query = query.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
else:
# Adapt sqlite-style placeholders (?) to psycopg2 (%s)
if isinstance(query, str) and '?' in query:
query = query.replace('?', '%s')
if params is None:
return self._cursor.execute(query)
return self._cursor.execute(query, params)
except (sqlite3.OperationalError, sqlite3.IntegrityError) as e:
msg = str(e).lower()
if "duplicate column name" in msg or "already exists" in msg:
return None
raise e
def fetchone(self):
return self._cursor.fetchone()
def fetchall(self):
return self._cursor.fetchall()
def __getattr__(self, name):
return getattr(self._cursor, name)
class ConnectionWrapper:
def __init__(self, conn, is_sqlite=False):
self._conn = conn
self._is_sqlite = is_sqlite
def cursor(self):
return CursorWrapper(self._conn.cursor(), is_sqlite=self._is_sqlite)
def commit(self):
return self._conn.commit()
def close(self):
return self._conn.close()
def __getattr__(self, name):
return getattr(self._conn, name)
def get_db():
if DB_HOST:
try:
import psycopg2
conn = psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
dbname=DB_NAME,
user=DB_USER,
password=DB_PASSWORD,
)
return ConnectionWrapper(conn, is_sqlite=False)
except Exception as e:
print(f"Postgres connection failed: {e}. Falling back to SQLite.")
# Fallback to SQLite
os.makedirs(os.path.dirname(SQLITE_PATH), exist_ok=True)
conn = sqlite3.connect(SQLITE_PATH)
return ConnectionWrapper(conn, is_sqlite=True)
try:
import psycopg2
IntegrityError = psycopg2.IntegrityError
except ImportError:
IntegrityError = sqlite3.IntegrityError