105 lines
3.5 KiB
Python
105 lines
3.5 KiB
Python
import os
|
|
import sqlite3
|
|
import json
|
|
|
|
# DB Settings (Postgres)
# Every value is read from the environment at import time; the second
# argument / `or` operand is the fallback used when the variable is absent.
# DB_HOST deliberately has no fallback: get_db() only attempts a Postgres
# connection when it is set to a non-empty value.
DB_HOST = os.environ.get('DB_HOST', None)
# `or` (instead of a .get default) also covers DB_PORT being present but
# empty, which int('') would otherwise reject with ValueError on import.
DB_PORT = int(os.environ.get('DB_PORT') or '5432')
DB_NAME = os.environ.get('DB_NAME', 'gkachele')
DB_USER = os.environ.get('DB_USER', 'gkachele')
# NOTE(review): a password fallback is baked into the source; confirm it is
# only ever used for local development.
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'gkachele_pass')

# SQLite Settings
# The SQLite file lives in a `database/` directory next to this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SQLITE_PATH = os.path.join(BASE_DIR, 'database', 'main.db')
|
|
|
|
class CursorWrapper:
    """Proxy around a DB-API cursor that adapts SQL between Postgres and SQLite.

    In SQLite mode, Postgres-flavoured statements are rewritten before being
    executed (``%s`` placeholders, ``SERIAL PRIMARY KEY``, ``ADD COLUMN IF NOT
    EXISTS``) and Postgres sequence maintenance calls are skipped. In Postgres
    mode, ``?`` placeholders are rewritten to ``%s``. Any attribute not
    defined here is forwarded to the wrapped cursor.

    NOTE(review): placeholder translation is a plain text replacement, so a
    literal ``?`` or ``%s`` inside a quoted SQL string is rewritten as well.
    """

    def __init__(self, cursor, is_sqlite=False):
        """Wrap *cursor*; *is_sqlite* selects which dialect rewrite is applied."""
        self._cursor = cursor
        self._is_sqlite = is_sqlite

    @staticmethod
    def _adapt_for_sqlite(query):
        """Return *query* with Postgres-specific syntax rewritten for SQLite."""
        # Adapt psycopg2-style placeholders (%s) to sqlite (?)
        if isinstance(query, str) and '%s' in query:
            query = query.replace('%s', '?')

        # SQLite doesn't support IF NOT EXISTS in ALTER TABLE
        if 'ALTER TABLE' in query and 'ADD COLUMN' in query and 'IF NOT EXISTS' in query:
            query = query.replace('IF NOT EXISTS', '')

        # Handle SERIAL PRIMARY KEY and other PG types
        query = query.replace('SERIAL PRIMARY KEY', 'INTEGER PRIMARY KEY AUTOINCREMENT')

        # Make table creation repeatable. The existing-clause check ignores
        # case so "CREATE TABLE if not exists ..." does not get a second,
        # syntactically invalid IF NOT EXISTS injected.
        if 'CREATE TABLE' in query and 'IF NOT EXISTS' not in query.upper():
            query = query.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
        return query

    def execute(self, query, params=None):
        """Execute *query* on the wrapped cursor after dialect adaptation.

        Args:
            query: SQL text, using either ``%s`` or ``?`` placeholders.
            params: Optional parameter sequence/mapping passed to the cursor.

        Returns:
            Whatever the wrapped cursor's ``execute`` returns, or ``None`` when
            the statement was skipped (Postgres ``setval`` /
            ``pg_get_serial_sequence`` in SQLite mode) or failed with a
            "duplicate column name" / "already exists" SQLite error.

        Raises:
            sqlite3.OperationalError, sqlite3.IntegrityError: for any other
                SQLite failure; other driver errors propagate unchanged.
        """
        if self._is_sqlite:
            # Skip postgres-specific maintenance commands
            if 'setval' in query or 'pg_get_serial_sequence' in query:
                return None
            query = self._adapt_for_sqlite(query)
        elif isinstance(query, str) and '?' in query:
            # Adapt sqlite-style placeholders (?) to psycopg2 (%s)
            query = query.replace('?', '%s')

        try:
            if params is None:
                return self._cursor.execute(query)
            return self._cursor.execute(query, params)
        except (sqlite3.OperationalError, sqlite3.IntegrityError) as exc:
            msg = str(exc).lower()
            # Re-adding an existing column/table is treated as success.
            if "duplicate column name" in msg or "already exists" in msg:
                return None
            # Bare raise keeps the original traceback intact.
            raise

    def fetchone(self):
        """Return the next result row from the wrapped cursor."""
        return self._cursor.fetchone()

    def fetchall(self):
        """Return all remaining result rows from the wrapped cursor."""
        return self._cursor.fetchall()

    def __getattr__(self, name):
        """Forward unknown attributes to the wrapped cursor.

        Only invoked when normal lookup fails. The wrapper's own fields are
        excluded so that an instance whose ``__init__`` has not run (e.g.
        during ``copy``/``pickle`` reconstruction) raises ``AttributeError``
        instead of recursing until ``RecursionError``.
        """
        if name in ('_cursor', '_is_sqlite'):
            raise AttributeError(name)
        return getattr(self._cursor, name)
|
|
|
|
class ConnectionWrapper:
    """Proxy around a DB-API connection whose cursors are dialect-aware.

    ``cursor()`` returns a ``CursorWrapper`` configured with this connection's
    ``is_sqlite`` flag; every attribute not defined here is forwarded to the
    wrapped connection.
    """

    def __init__(self, conn, is_sqlite=False):
        """Wrap *conn*; *is_sqlite* is handed on to every cursor created."""
        self._conn = conn
        self._is_sqlite = is_sqlite

    def cursor(self):
        """Return a new ``CursorWrapper`` over a fresh underlying cursor."""
        return CursorWrapper(self._conn.cursor(), is_sqlite=self._is_sqlite)

    def commit(self):
        """Commit the current transaction on the wrapped connection."""
        return self._conn.commit()

    def close(self):
        """Close the wrapped connection."""
        return self._conn.close()

    def __enter__(self):
        """Enter the wrapped connection's context and return this wrapper.

        ``with`` looks special methods up on the type, bypassing
        ``__getattr__``, so the protocol has to be delegated explicitly.
        Returning the wrapper (not the raw connection) keeps cursors created
        inside the ``with`` body dialect-aware.
        """
        self._conn.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Delegate context exit (commit/rollback policy) to the wrapped connection."""
        return self._conn.__exit__(exc_type, exc_value, traceback)

    def __getattr__(self, name):
        """Forward unknown attributes to the wrapped connection.

        Only invoked when normal lookup fails. The wrapper's own fields are
        excluded so that an instance whose ``__init__`` has not run (e.g.
        during ``copy``/``pickle`` reconstruction) raises ``AttributeError``
        instead of recursing until ``RecursionError``.
        """
        if name in ('_conn', '_is_sqlite'):
            raise AttributeError(name)
        return getattr(self._conn, name)
|
|
|
|
def get_db():
    """Open a database connection, preferring Postgres and falling back to SQLite.

    When ``DB_HOST`` is truthy a psycopg2 connection is attempted with the
    module-level ``DB_*`` settings. If that attempt raises anything (including
    psycopg2 not being importable), the failure is printed and the SQLite file
    at ``SQLITE_PATH`` is opened instead, creating its directory if needed.

    Returns:
        ConnectionWrapper: wrapping the Postgres connection (``is_sqlite=False``)
        or the SQLite connection (``is_sqlite=True``).
    """
    if DB_HOST:
        try:
            import psycopg2

            pg_settings = {
                'host': DB_HOST,
                'port': DB_PORT,
                'dbname': DB_NAME,
                'user': DB_USER,
                'password': DB_PASSWORD,
            }
            pg_conn = psycopg2.connect(**pg_settings)
            return ConnectionWrapper(pg_conn, is_sqlite=False)
        except Exception as e:
            # Best-effort: any Postgres problem degrades to the local SQLite file.
            print(f"Postgres connection failed: {e}. Falling back to SQLite.")

    # Fallback to SQLite
    sqlite_dir = os.path.dirname(SQLITE_PATH)
    os.makedirs(sqlite_dir, exist_ok=True)
    return ConnectionWrapper(sqlite3.connect(SQLITE_PATH), is_sqlite=True)
|
|
|
|
# Exception type(s) for callers to use in `except IntegrityError:` when a
# constraint is violated. get_db() can return a SQLite connection even when
# psycopg2 is importable (DB_HOST unset, or the Postgres connection failed),
# so with psycopg2 present both drivers' errors must match; a tuple is valid
# in `except` clauses and `isinstance` checks.
try:
    import psycopg2
except ImportError:
    IntegrityError = sqlite3.IntegrityError
else:
    IntegrityError = (psycopg2.IntegrityError, sqlite3.IntegrityError)
|