"""
Supabase data layer aligned with TitanForge/schema.sql.
"""
import os
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from supabase import Client, create_client

# Load SUPABASE_* settings from a local .env file (no-op when the file is absent).
load_dotenv()
# Strip whitespace so a stray trailing newline in .env cannot corrupt the URL/key.
SUPABASE_URL = (os.getenv("SUPABASE_URL") or "").strip()
SUPABASE_SERVICE_ROLE_KEY = (os.getenv("SUPABASE_SERVICE_ROLE_KEY") or "").strip()

# Fail fast at import time: every query helper below needs a configured client.
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    raise RuntimeError("Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY environment variables.")

# Single module-wide client.  NOTE(review): a service-role key normally bypasses
# row-level security, so this layer should only be reachable from trusted
# backend code — confirm against deployment setup.
supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
def _rows(response: Any) -> List[Dict[str, Any]]:
return getattr(response, "data", None) or []
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
def _first(response: Any) -> Optional[Dict[str, Any]]:
    """Return the first row of a Supabase response, or None when it has none."""
    rows = _rows(response)
    if not rows:
        return None
    return rows[0]
def _paginate(page: int, limit: int) -> tuple[int, int]:
page = max(1, page)
limit = min(max(1, limit), 100)
start = (page - 1) * limit
end = start + limit - 1
return start, end
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
# ==================== Users ====================


def create_user(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a row into ``users`` and return the created record.

    ``email`` and ``password_hash`` are mandatory; the optional profile
    fields are forwarded as-is.  Raises ValueError on a missing required
    field and RuntimeError when the insert returns no row.
    """
    for key in ("email", "password_hash"):
        if not payload.get(key):
            raise ValueError(f"'{key}' is required.")

    row = {
        "email": payload["email"],
        "password_hash": payload["password_hash"],
        "display_name": payload.get("display_name"),
        "avatar_url": payload.get("avatar_url"),
        "bio": payload.get("bio"),
    }
    created = _first(supabase.table("users").insert(row).execute())
    if not created:
        raise RuntimeError("Failed to create user.")
    return created
def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]:
    """Fetch a single user row by primary key, or None when absent."""
    query = supabase.table("users").select("*").eq("user_id", user_id).limit(1)
    return _first(query.execute())
def get_user_by_email(email: str) -> Optional[Dict[str, Any]]:
    """Fetch a single user row by email address, or None when absent."""
    query = supabase.table("users").select("*").eq("email", email).limit(1)
    return _first(query.execute())
# ==================== Audio Posts ====================


def create_audio_post(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a row into ``audio_posts`` and return the created record.

    ``user_id``, ``title`` and ``storage_prefix`` are mandatory (None or ""
    rejected); other columns fall back to their documented defaults.
    Raises ValueError on a missing required field and RuntimeError when the
    insert returns no row.
    """
    for key in ("user_id", "title", "storage_prefix"):
        if payload.get(key) in (None, ""):
            raise ValueError(f"'{key}' is required.")

    row = {
        "user_id": payload["user_id"],
        "title": payload["title"],
        "description": payload.get("description"),
        "visibility": payload.get("visibility", "private"),
        "status": payload.get("status", "uploaded"),
        "recorded_date": payload.get("recorded_date"),
        "language": payload.get("language", "en"),
        "storage_prefix": payload["storage_prefix"],
        "manifest_sha256": payload.get("manifest_sha256"),
        "bundle_sha256": payload.get("bundle_sha256"),
        "published_at": payload.get("published_at"),
    }
    created = _first(supabase.table("audio_posts").insert(row).execute())
    if not created:
        raise RuntimeError("Failed to create audio post.")
    return created
def get_audio_post_by_id(post_id: int) -> Optional[Dict[str, Any]]:
    """Fetch one audio post by primary key, with its author row embedded."""
    response = (
        supabase.table("audio_posts")
        .select("*, users(user_id, email, display_name, avatar_url)")
        .eq("post_id", post_id)
        .limit(1)
        .execute()
    )
    return _first(response)
def list_user_history(user_id: int, page: int = 1, limit: int = 20) -> List[Dict[str, Any]]:
    """A user's own audio posts, newest first (thin wrapper over list_audio_posts)."""
    return list_audio_posts(page=page, limit=limit, visibility=None, user_id=user_id)
def list_audio_posts(page: int = 1, limit: int = 20, visibility: Optional[str] = None, user_id: Optional[int] = None) -> List[Dict[str, Any]]:
    """List audio posts (author embedded), newest first, paginated.

    Args:
        page: 1-based page number.
        limit: rows per page (clamped to 1..100 by ``_paginate``).
        visibility: optional exact-match filter on the visibility column.
        user_id: optional filter on the owning user.

    Returns the matching rows, possibly empty.
    """
    start, end = _paginate(page, limit)
    query = supabase.table("audio_posts").select("*, users(user_id, email, display_name, avatar_url)")

    if visibility:
        query = query.eq("visibility", visibility)
    # `is not None` (not truthiness) so a falsy id such as 0 still filters;
    # this also matches the convention used by list_audit_logs.
    if user_id is not None:
        query = query.eq("user_id", user_id)

    response = query.order("created_at", desc=True).range(start, end).execute()
    return _rows(response)
def update_audio_post(post_id: int, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Apply a whitelisted partial update to an audio post and return the row.

    Keys outside the allowed column set are silently dropped; when nothing
    remains to change, the current row is returned unchanged (None if the
    post does not exist).
    """
    allowed = frozenset((
        "title",
        "description",
        "visibility",
        "status",
        "recorded_date",
        "language",
        "storage_prefix",
        "manifest_sha256",
        "bundle_sha256",
        "published_at",
    ))
    clean = {key: value for key, value in updates.items() if key in allowed} if updates else {}
    if not clean:
        return get_audio_post_by_id(post_id)

    response = supabase.table("audio_posts").update(clean).eq("post_id", post_id).execute()
    return _first(response)
# ==================== Archive Files ====================


def add_archive_file(post_id: int, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Attach a stored file record to a post and return the created row.

    ``role``, ``path`` and ``sha256`` are mandatory; ``content_type`` and
    ``size_bytes`` are optional.  Raises ValueError on a missing required
    field and RuntimeError when the insert returns no row.
    """
    for key in ("role", "path", "sha256"):
        if not payload.get(key):
            raise ValueError(f"'{key}' is required.")

    row = {
        "post_id": post_id,
        "role": payload["role"],
        "path": payload["path"],
        "content_type": payload.get("content_type"),
        "size_bytes": payload.get("size_bytes"),
        "sha256": payload["sha256"],
    }
    created = _first(supabase.table("archive_files").insert(row).execute())
    if not created:
        raise RuntimeError("Failed to add archive file.")
    return created
def list_archive_files(post_id: int) -> List[Dict[str, Any]]:
    """All archive_files rows belonging to a post, oldest first."""
    query = (
        supabase.table("archive_files")
        .select("*")
        .eq("post_id", post_id)
        .order("created_at", desc=False)
    )
    return _rows(query.execute())
# ==================== Metadata / Rights ====================


def upsert_archive_metadata(post_id: int, metadata: str) -> Dict[str, Any]:
    """Insert or replace the metadata row for a post and return it.

    Uses a single PostgREST upsert keyed on ``post_id`` instead of the
    previous select-then-insert/update pair, which cost two round-trips and
    was racy under concurrent writers (both could miss the row and insert).
    Requires a unique/primary-key constraint on archive_metadata.post_id
    (the table is 1:1 with audio_posts per schema.sql).

    Raises RuntimeError when the upsert returns no row.
    """
    response = (
        supabase.table("archive_metadata")
        .upsert({"post_id": post_id, "metadata": metadata}, on_conflict="post_id")
        .execute()
    )
    row = _first(response)
    if not row:
        raise RuntimeError("Failed to upsert archive metadata.")
    return row
def get_archive_metadata(post_id: int) -> Optional[Dict[str, Any]]:
    """The archive_metadata row for a post, or None when missing."""
    query = supabase.table("archive_metadata").select("*").eq("post_id", post_id).limit(1)
    return _first(query.execute())
def upsert_archive_rights(post_id: int, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Insert or replace the rights/consent row for a post and return it.

    Uses a single PostgREST upsert keyed on ``post_id`` instead of the
    previous select-then-insert/update pair, which cost two round-trips and
    was racy under concurrent writers.  Requires a unique/primary-key
    constraint on archive_rights.post_id (1:1 with audio_posts).

    ``has_speaker_consent`` defaults to False; the remaining columns default
    to None.  Raises RuntimeError when the upsert returns no row.
    """
    data = {
        "post_id": post_id,
        "has_speaker_consent": payload.get("has_speaker_consent", False),
        "license": payload.get("license"),
        "consent_notes": payload.get("consent_notes"),
        "allowed_use": payload.get("allowed_use"),
        "restrictions": payload.get("restrictions"),
    }
    response = supabase.table("archive_rights").upsert(data, on_conflict="post_id").execute()
    row = _first(response)
    if not row:
        raise RuntimeError("Failed to upsert archive rights.")
    return row
def get_archive_rights(post_id: int) -> Optional[Dict[str, Any]]:
    """The archive_rights row for a post, or None when missing."""
    query = supabase.table("archive_rights").select("*").eq("post_id", post_id).limit(1)
    return _first(query.execute())
# ==================== RAG Chunks ====================


def add_rag_chunks(post_id: int, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Bulk-insert transcript chunks for a post and return the inserted rows.

    An empty chunk list is a no-op and returns [] without touching the
    database.  Missing chunk keys are inserted as None.
    """
    if not chunks:
        return []

    rows = [
        {
            "post_id": post_id,
            "start_sec": chunk.get("start_sec"),
            "end_sec": chunk.get("end_sec"),
            "text": chunk.get("text"),
            "confidence": chunk.get("confidence"),
            "embedding": chunk.get("embedding"),
        }
        for chunk in chunks
    ]
    return _rows(supabase.table("rag_chunks").insert(rows).execute())
def list_rag_chunks(post_id: int, page: int = 1, limit: int = 200) -> List[Dict[str, Any]]:
    """Transcript chunks for a post, ordered by start time, paginated."""
    start, end = _paginate(page, limit)
    query = (
        supabase.table("rag_chunks")
        .select("*")
        .eq("post_id", post_id)
        .order("start_sec", desc=False)
    )
    return _rows(query.range(start, end).execute())
def search_rag_chunks(user_id: int, query_text: str, page: int = 1, limit: int = 30) -> List[Dict[str, Any]]:
    """Case-insensitive substring search over one user's transcript chunks.

    Joins through audio_posts (inner) so only chunks belonging to posts
    owned by ``user_id`` match; results are newest first and paginated.

    The user-supplied ``query_text`` is escaped so LIKE metacharacters
    (backslash, ``%``, ``_``) match literally instead of acting as
    wildcards — previously a query like "100%" matched everything
    starting with "100".
    """
    start, end = _paginate(page, limit)
    # Escape backslash first, then the LIKE wildcards.
    escaped = query_text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    response = (
        supabase.table("rag_chunks")
        .select(
            "chunk_id, post_id, start_sec, end_sec, text, confidence, created_at, "
            "audio_posts!inner(post_id, user_id, title, visibility, created_at)"
        )
        .eq("audio_posts.user_id", user_id)
        .ilike("text", f"%{escaped}%")
        .order("created_at", desc=True)
        .range(start, end)
        .execute()
    )
    return _rows(response)
# ==================== Audit Log ====================


def add_audit_log(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Record an audit-log entry and return the created row.

    ``action`` is mandatory; ``post_id``, ``user_id`` and ``details`` are
    optional.  Raises ValueError when action is missing/empty and
    RuntimeError when the insert returns no row.
    """
    if not payload.get("action"):
        raise ValueError("'action' is required.")

    entry = {
        "post_id": payload.get("post_id"),
        "user_id": payload.get("user_id"),
        "action": payload["action"],
        "details": payload.get("details"),
    }
    row = _first(supabase.table("audit_log").insert(entry).execute())
    if not row:
        raise RuntimeError("Failed to create audit log.")
    return row
def list_audit_logs(post_id: Optional[int] = None, user_id: Optional[int] = None, page: int = 1, limit: int = 100) -> List[Dict[str, Any]]:
    """Audit-log entries, newest first, optionally filtered by post and/or user.

    Filters are applied with ``is not None`` so an id of 0 still filters.
    """
    start, end = _paginate(page, limit)
    query = supabase.table("audit_log").select("*")

    for column, value in (("post_id", post_id), ("user_id", user_id)):
        if value is not None:
            query = query.eq(column, value)
    return _rows(query.order("created_at", desc=True).range(start, end).execute())
# ==================== Aggregate View ====================


def get_post_bundle(post_id: int) -> Dict[str, Any]:
    """Assemble a post together with all of its related records.

    Returns {} when the post does not exist; otherwise a dict with the post,
    its files, metadata, rights, transcript chunks (up to 1000) and audit
    trail (up to 200 entries).
    """
    post = get_audio_post_by_id(post_id)
    if not post:
        return {}

    bundle: Dict[str, Any] = {"post": post}
    bundle["files"] = list_archive_files(post_id)
    bundle["metadata"] = get_archive_metadata(post_id)
    bundle["rights"] = get_archive_rights(post_id)
    bundle["rag_chunks"] = list_rag_chunks(post_id, page=1, limit=1000)
    bundle["audit_log"] = list_audit_logs(post_id=post_id, page=1, limit=200)
    return bundle