diff --git a/backend/__pycache__/api_routes.cpython-311.pyc b/backend/__pycache__/api_routes.cpython-311.pyc new file mode 100644 index 0000000..7fd670c Binary files /dev/null and b/backend/__pycache__/api_routes.cpython-311.pyc differ diff --git a/backend/__pycache__/db_queries.cpython-311.pyc b/backend/__pycache__/db_queries.cpython-311.pyc new file mode 100644 index 0000000..12f2702 Binary files /dev/null and b/backend/__pycache__/db_queries.cpython-311.pyc differ diff --git a/backend/api_routes.py b/backend/api_routes.py index 14dfe22..ad8ad67 100644 --- a/backend/api_routes.py +++ b/backend/api_routes.py @@ -1,299 +1,240 @@ """ -Example Flask routes using db_queries.py -Add these routes to your existing Flask app +Flask API routes aligned with TitanForge/schema.sql. """ -from flask import Flask, jsonify, request +from flask import Blueprint, jsonify, request -# Import all query functions from db_queries import ( + add_archive_file, + add_audit_log, + add_rag_chunks, + create_audio_post, + create_user, + get_archive_metadata, + get_archive_rights, + get_audio_post_by_id, + get_post_bundle, get_user_by_id, - get_user_by_username, - get_user_stats, - get_post_by_id, - get_posts_feed, - get_user_posts, - get_post_engagement, - check_user_post_interactions, - get_all_categories, - get_posts_by_category, - get_post_comments, - get_listening_history, - get_search_history, - search_posts, - get_trending_topics, - get_user_bookmarks, - get_pagination_info + list_archive_files, + list_audio_posts, + list_audit_logs, + list_rag_chunks, + update_audio_post, + upsert_archive_metadata, + upsert_archive_rights, ) -# Assuming you have the app instance -# app = Flask(__name__) +api = Blueprint("api", __name__, url_prefix="/api") -# ==================== USER ROUTES ==================== +def _error(message: str, status: int = 400): + return jsonify({"error": message}), status -@app.get("/users/") -def get_user(user_id: int): - """Get user profile with stats.""" + +@api.get("/health") +def health(): + return jsonify({"status": "ok"}) + + +# ==================== Users ==================== + +@api.post("/users") +def api_create_user(): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(create_user(payload)), 201 + except ValueError as e: + return _error(str(e), 400) + except Exception as e: + return _error(str(e), 500) + + +@api.get("/users/") +def api_get_user(user_id: int): user = get_user_by_id(user_id) if not user: - return jsonify({"error": "User not found"}), 404 - - stats = get_user_stats(user_id) - - return jsonify({ - "user": { - "id": user["user_id"], - "username": user["username"], - "email": user["email"], - "display_name": user["display_name"], - "bio": user.get("bio"), - "profile_image_url": user.get("profile_image_url"), - "location": None, # Add to schema if needed - "created_at": user["created_at"], - "stats": stats - } - }) + return _error("User not found.", 404) + return jsonify(user) -@app.get("/users/me") -def get_current_user(): - """Get current user profile (requires auth).""" - # In production, get user_id from JWT token - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 +# ==================== Audio Posts ==================== - return get_user(user_id) +@api.post("/posts") +def api_create_post(): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(create_audio_post(payload)), 201 + except ValueError as e: + return _error(str(e), 400) + except Exception as e: + return _error(str(e), 500) -# ==================== POST/FEED ROUTES ==================== - -@app.get("/posts") -def get_feed(): - """Get personalized feed.""" - user_id = request.args.get("user_id", type=int) - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) - sort = request.args.get("sort", default="recent") - - if not user_id: - return jsonify({"error": "user_id required"}), 400 - - if limit > 50: - limit = 50 - - posts = get_posts_feed(user_id, page=page, limit=limit, sort=sort) - - # For production, you'd get total count from a separate query - total_count = len(posts) * 10 # Placeholder - pagination = get_pagination_info(total_count, page, limit) - - return jsonify({ - "posts": posts, - "pagination": pagination - }) - - -@app.get("/posts/") -def get_single_post(post_id: int): - """Get a single post by ID.""" - user_id = request.args.get("user_id", type=int) # For privacy check - - post = get_post_by_id(post_id, requesting_user_id=user_id) - if not post: - return jsonify({"error": "Post not found or private"}), 404 - - # Add user interaction info if user_id provided - if user_id: - interactions = check_user_post_interactions(user_id, post_id) - post.update(interactions) - - return jsonify({"post": post}) - - -@app.get("/posts/user/") -def get_posts_by_user(user_id: int): - """Get posts by a specific user.""" - filter_type = request.args.get("filter", default="all") - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) - - if limit > 50: - limit = 50 - - posts = get_user_posts(user_id, filter_type=filter_type, page=page, limit=limit) - - return jsonify({ - "posts": posts, - "pagination": get_pagination_info(len(posts) * 5, page, limit) # Placeholder - }) - - -@app.get("/posts/user/me") -def get_my_posts(): - """Get current user's posts.""" - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 - - return get_posts_by_user(user_id) - - -# ==================== CATEGORY ROUTES ==================== - -@app.get("/categories") -def get_categories(): - """Get all categories.""" - categories = get_all_categories() - return jsonify({"categories": categories}) - - -@app.get("/categories//posts") -def get_category_posts(category_id: int): - """Get posts in a category.""" - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) - - posts = get_posts_by_category(category_id, page=page, limit=limit) - - return jsonify({ - "posts": posts, - "pagination": get_pagination_info(len(posts) * 5, page, limit) - }) - - -# ==================== COMMENT ROUTES ==================== - -@app.get("/posts//comments") -def get_comments(post_id: int): - """Get comments for a post.""" - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) - - comments = get_post_comments(post_id, page=page, limit=limit) - - return jsonify({ - "comments": comments, - "pagination": get_pagination_info(len(comments) * 3, page, limit) - }) - - -# ==================== HISTORY ROUTES ==================== - -@app.get("/history/listening") -def get_user_listening_history(): - """Get user's listening history.""" - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 - - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=50, type=int) - completed_only = request.args.get("completed", default="false").lower() == "true" - - history = get_listening_history(user_id, page=page, limit=limit, completed_only=completed_only) - - return jsonify({ - "history": history, - "pagination": get_pagination_info(len(history) * 3, page, limit) - }) - - -@app.get("/history/searches") -def get_user_search_history(): - """Get user's search history.""" - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 - - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=50, type=int) - - searches = get_search_history(user_id, page=page, limit=limit) - - return jsonify({"searches": searches}) - - -# ==================== SEARCH ROUTES ==================== - -@app.get("/search") -def search(): - """Search posts.""" - query = request.args.get("q") - if not query: - return jsonify({"error": "Search query 'q' is required"}), 400 - - category_id = request.args.get("categoryId", type=int) +@api.get("/posts") +def api_list_posts(): page = request.args.get("page", default=1, type=int) limit = request.args.get("limit", default=20, type=int) + visibility = request.args.get("visibility") user_id = request.args.get("user_id", type=int) - results = search_posts( - query=query, - category_id=category_id, - page=page, - limit=limit, - requesting_user_id=user_id - ) - - return jsonify({ - "results": results, - "pagination": get_pagination_info(len(results) * 5, page, limit) - }) + try: + rows = list_audio_posts(page=page, limit=limit, visibility=visibility, user_id=user_id) + return jsonify({"posts": rows, "page": page, "limit": min(max(1, limit), 100)}) + except Exception as e: + return _error(str(e), 500) -# ==================== TRENDING ROUTES ==================== - -@app.get("/trending/topics") -def get_trending(): - """Get trending topics.""" - limit = request.args.get("limit", default=5, type=int) - - topics = get_trending_topics(limit=limit) - - return jsonify({"topics": topics}) +@api.get("/posts/") +def api_get_post(post_id: int): + row = get_audio_post_by_id(post_id) + if not row: + return _error("Post not found.", 404) + return jsonify(row) -# ==================== BOOKMARK ROUTES ==================== +@api.patch("/posts/") +def api_patch_post(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + try: + row = update_audio_post(post_id, payload) + if not row: + return _error("Post not found.", 404) + return jsonify(row) + except Exception as e: + return _error(str(e), 500) -@app.get("/bookmarks") -def get_bookmarks(): - """Get user's bookmarked posts.""" - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 +@api.get("/posts//bundle") +def api_post_bundle(post_id: int): + bundle = get_post_bundle(post_id) + if not bundle: + return _error("Post not found.", 404) + return jsonify(bundle) + + +# ==================== Archive Files ==================== + +@api.post("/posts//files") +def api_add_file(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(add_archive_file(post_id, payload)), 201 + except ValueError as e: + return _error(str(e), 400) + except Exception as e: + return _error(str(e), 500) + + +@api.get("/posts//files") +def api_list_files(post_id: int): + try: + return jsonify({"files": list_archive_files(post_id)}) + except Exception as e: + return _error(str(e), 500) + + +# ==================== Metadata ==================== + +@api.put("/posts//metadata") +def api_put_metadata(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + metadata = payload.get("metadata") + if metadata is None: + return _error("'metadata' is required.", 400) + + try: + return jsonify(upsert_archive_metadata(post_id, metadata)) + except Exception as e: + return _error(str(e), 500) + + +@api.get("/posts//metadata") +def api_get_metadata(post_id: int): + row = get_archive_metadata(post_id) + if not row: + return _error("Metadata not found.", 404) + return jsonify(row) + + +# ==================== Rights ==================== + +@api.put("/posts//rights") +def api_put_rights(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(upsert_archive_rights(post_id, payload)) + except Exception as e: + return _error(str(e), 500) + + +@api.get("/posts//rights") +def api_get_rights(post_id: int): + row = get_archive_rights(post_id) + if not row: + return _error("Rights not found.", 404) + return jsonify(row) + + +# ==================== RAG Chunks ==================== + +@api.post("/posts//chunks") +def api_add_chunks(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + chunks = payload.get("chunks") + + if not isinstance(chunks, list): + return _error("'chunks' must be a list.", 400) + + try: + rows = add_rag_chunks(post_id, chunks) + return jsonify({"inserted": len(rows), "chunks": rows}), 201 + except Exception as e: + return _error(str(e), 500) + + +@api.get("/posts//chunks") +def api_get_chunks(post_id: int): page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) + limit = request.args.get("limit", default=200, type=int) - bookmarks = get_user_bookmarks(user_id, page=page, limit=limit) - - return jsonify({ - "bookmarks": bookmarks, - "pagination": get_pagination_info(len(bookmarks) * 3, page, limit) - }) + try: + return jsonify({"chunks": list_rag_chunks(post_id, page=page, limit=limit)}) + except Exception as e: + return _error(str(e), 500) -# ==================== ENGAGEMENT STATS ROUTES ==================== +# ==================== Audit Log ==================== -@app.get("/posts//engagement") -def get_post_engagement_stats(post_id: int): - """Get engagement statistics for a post.""" - engagement = get_post_engagement(post_id) - return jsonify(engagement) +@api.post("/audit") +def api_create_audit(): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(add_audit_log(payload)), 201 + except ValueError as e: + return _error(str(e), 400) + except Exception as e: + return _error(str(e), 500) -# Example of how to use in your existing main.py: -""" -# In your main.py, import these routes: +@api.get("/audit") +def api_list_audit(): + post_id = request.args.get("post_id", type=int) + user_id = request.args.get("user_id", type=int) + page = request.args.get("page", default=1, type=int) + limit = request.args.get("limit", default=100, type=int) -from flask import Flask -# ... your other imports ... -from api_routes import * # Import all routes + try: + return jsonify({"logs": list_audit_logs(post_id=post_id, user_id=user_id, page=page, limit=limit)}) + except Exception as e: + return _error(str(e), 500) -# Or import specific routes: -# from api_routes import get_user, get_feed, get_categories, etc. -# Then your existing routes will work alongside these new ones -""" +@api.get("/posts//audit") +def api_post_audit(post_id: int): + page = request.args.get("page", default=1, type=int) + limit = request.args.get("limit", default=100, type=int) + + try: + return jsonify({"logs": list_audit_logs(post_id=post_id, page=page, limit=limit)}) + except Exception as e: + return _error(str(e), 500) diff --git a/backend/db_queries.py b/backend/db_queries.py index 444c7be..f1138c5 100644 --- a/backend/db_queries.py +++ b/backend/db_queries.py @@ -1,463 +1,317 @@ """ -Database query functions for VoiceVault backend. -Handles all read operations from Supabase. +Supabase data layer aligned with TitanForge/schema.sql. """ import os from typing import Any, Dict, List, Optional +from dotenv import load_dotenv from supabase import Client, create_client -# Initialize Supabase client -SUPABASE_URL = os.getenv("SUPABASE_URL") -SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") +load_dotenv() + +SUPABASE_URL = (os.getenv("SUPABASE_URL") or "").strip() +SUPABASE_SERVICE_ROLE_KEY = (os.getenv("SUPABASE_SERVICE_ROLE_KEY") or "").strip() if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY: - raise RuntimeError( - "Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY environment variables." - ) + raise RuntimeError("Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY environment variables.") supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) -# ==================== USER QUERIES ==================== +def _rows(response: Any) -> List[Dict[str, Any]]: + return getattr(response, "data", None) or [] + + +def _first(response: Any) -> Optional[Dict[str, Any]]: + data = _rows(response) + return data[0] if data else None + + +def _paginate(page: int, limit: int) -> tuple[int, int]: + page = max(1, page) + limit = min(max(1, limit), 100) + start = (page - 1) * limit + end = start + limit - 1 + return start, end + + +# ==================== Users ==================== + +def create_user(payload: Dict[str, Any]) -> Dict[str, Any]: + required = ["email", "password_hash"] + for field in required: + if not payload.get(field): + raise ValueError(f"'{field}' is required.") + + data = { + "email": payload["email"], + "password_hash": payload["password_hash"], + "display_name": payload.get("display_name"), + "avatar_url": payload.get("avatar_url"), + "bio": payload.get("bio"), + } + + response = supabase.table("users").insert(data).execute() + created = _first(response) + if not created: + raise RuntimeError("Failed to create user.") + return created + def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]: - """Get user information by user ID.""" - response = supabase.table("users").select("*").eq("user_id", user_id).execute() - data = getattr(response, "data", None) or [] - return data[0] if data else None + return _first(supabase.table("users").select("*").eq("user_id", user_id).limit(1).execute()) -def get_user_by_username(username: str) -> Optional[Dict[str, Any]]: - """Get user information by username.""" - response = supabase.table("users").select("*").eq("username", username).execute() - data = getattr(response, "data", None) or [] - return data[0] if data else None +# ==================== Audio Posts ==================== +def create_audio_post(payload: Dict[str, Any]) -> Dict[str, Any]: + required = ["user_id", "title", "storage_prefix"] + for field in required: + if payload.get(field) in (None, ""): + raise ValueError(f"'{field}' is required.") -def get_user_by_email(email: str) -> Optional[Dict[str, Any]]: - """Get user information by email.""" - response = supabase.table("users").select("*").eq("email", email).execute() - data = getattr(response, "data", None) or [] - return data[0] if data else None - - -def get_user_stats(user_id: int) -> Dict[str, int]: - """Get user statistics (posts, followers, following).""" - # Get post count - posts_response = supabase.table("posts").select("post_id", count="exact").eq("user_id", user_id).execute() - post_count = getattr(posts_response, "count", 0) or 0 - - # Get followers count - followers_response = supabase.table("user_follows").select("follower_id", count="exact").eq("following_id", user_id).execute() - followers_count = getattr(followers_response, "count", 0) or 0 - - # Get following count - following_response = supabase.table("user_follows").select("following_id", count="exact").eq("follower_id", user_id).execute() - following_count = getattr(following_response, "count", 0) or 0 - - # Get total listeners (sum of all listens on user's posts) - listens_response = supabase.rpc("get_user_total_listeners", {"p_user_id": user_id}).execute() - total_listeners = getattr(listens_response, "data", 0) or 0 - - return { - "posts": post_count, - "followers": followers_count, - "following": following_count, - "listeners": total_listeners + data = { + "user_id": payload["user_id"], + "title": payload["title"], + "description": payload.get("description"), + "visibility": payload.get("visibility", "private"), + "status": payload.get("status", "uploaded"), + "recorded_date": payload.get("recorded_date"), + "language": payload.get("language", "en"), + "storage_prefix": payload["storage_prefix"], + "manifest_sha256": payload.get("manifest_sha256"), + "bundle_sha256": payload.get("bundle_sha256"), + "published_at": payload.get("published_at"), } + response = supabase.table("audio_posts").insert(data).execute() + created = _first(response) + if not created: + raise RuntimeError("Failed to create audio post.") + return created -# ==================== POST QUERIES ==================== -def get_post_by_id(post_id: int, requesting_user_id: Optional[int] = None) -> Optional[Dict[str, Any]]: - """ - Get a single post by ID with user info and categories. - Returns None if post is private and requesting_user_id doesn't match post owner. - """ +def get_audio_post_by_id(post_id: int) -> Optional[Dict[str, Any]]: + query = ( + supabase.table("audio_posts") + .select("*, users(user_id, email, display_name, avatar_url)") + .eq("post_id", post_id) + .limit(1) + ) + return _first(query.execute()) + + +def list_audio_posts(page: int = 1, limit: int = 20, visibility: Optional[str] = None, user_id: Optional[int] = None) -> List[Dict[str, Any]]: + start, end = _paginate(page, limit) + query = supabase.table("audio_posts").select("*, users(user_id, email, display_name, avatar_url)") + + if visibility: + query = query.eq("visibility", visibility) + if user_id: + query = query.eq("user_id", user_id) + + response = query.order("created_at", desc=True).range(start, end).execute() + return _rows(response) + + +def update_audio_post(post_id: int, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: + if not updates: + return get_audio_post_by_id(post_id) + + allowed = { + "title", + "description", + "visibility", + "status", + "recorded_date", + "language", + "storage_prefix", + "manifest_sha256", + "bundle_sha256", + "published_at", + } + clean = {k: v for k, v in updates.items() if k in allowed} + if not clean: + return get_audio_post_by_id(post_id) + response = ( - supabase.table("posts") - .select(""" - *, - users!inner(user_id, username, display_name, profile_image_url), - post_categories!inner(category_id, categories!inner(name)) - """) + supabase.table("audio_posts") + .update(clean) .eq("post_id", post_id) .execute() ) - - data = getattr(response, "data", None) or [] - if not data: - return None - - post = data[0] - - # Check privacy - if post.get("is_private") and post.get("user_id") != requesting_user_id: - return None - - return _format_post(post) + return _first(response) -def get_posts_feed( - user_id: int, - page: int = 1, - limit: int = 20, - sort: str = "recent" -) -> List[Dict[str, Any]]: - """ - Get personalized feed for a user. - Includes posts from followed users and followed categories. - """ - offset = (page - 1) * limit +# ==================== Archive Files ==================== - # Base query - query = ( - supabase.table("posts") - .select(""" - *, - users!inner(user_id, username, display_name, profile_image_url), - post_categories(category_id, categories(name)) - """) - .eq("is_private", False) - ) +def add_archive_file(post_id: int, payload: Dict[str, Any]) -> Dict[str, Any]: + required = ["role", "path", "sha256"] + for field in required: + if not payload.get(field): + raise ValueError(f"'{field}' is required.") - # Apply sorting - if sort == "recent": - query = query.order("created_at", desc=True) - elif sort == "popular": - # Would need a view or function to sort by engagement - query = query.order("created_at", desc=True) - - response = query.range(offset, offset + limit - 1).execute() - data = getattr(response, "data", None) or [] - - return [_format_post(post) for post in data] - - -def get_user_posts( - user_id: int, - filter_type: str = "all", - page: int = 1, - limit: int = 20 -) -> List[Dict[str, Any]]: - """ - Get posts created by a specific user. - filter_type: 'all', 'public', 'private' - """ - offset = (page - 1) * limit - - query = ( - supabase.table("posts") - .select(""" - *, - post_categories(category_id, categories(name)) - """) - .eq("user_id", user_id) - ) - - # Apply filter - if filter_type == "public": - query = query.eq("is_private", False) - elif filter_type == "private": - query = query.eq("is_private", True) - - response = query.order("created_at", desc=True).range(offset, offset + limit - 1).execute() - data = getattr(response, "data", None) or [] - - return [_format_post(post) for post in data] - - -def get_post_engagement(post_id: int) -> Dict[str, int]: - """Get engagement metrics for a post (likes, comments, listens).""" - # Get likes count - likes_response = supabase.table("post_likes").select("user_id", count="exact").eq("post_id", post_id).execute() - likes_count = getattr(likes_response, "count", 0) or 0 - - # Get comments count - comments_response = supabase.table("comments").select("comment_id", count="exact").eq("post_id", post_id).execute() - comments_count = getattr(comments_response, "count", 0) or 0 - - # Get listens count - listens_response = supabase.table("audio_listening_history").select("history_id", count="exact").eq("post_id", post_id).execute() - listens_count = getattr(listens_response, "count", 0) or 0 - - # Get bookmarks count - bookmarks_response = supabase.table("bookmarks").select("user_id", count="exact").eq("post_id", post_id).execute() - bookmarks_count = getattr(bookmarks_response, "count", 0) or 0 - - return { - "likes": likes_count, - "comments": comments_count, - "listens": listens_count, - "bookmarks": bookmarks_count + data = { + "post_id": post_id, + "role": payload["role"], + "path": payload["path"], + "content_type": payload.get("content_type"), + "size_bytes": payload.get("size_bytes"), + "sha256": payload["sha256"], } - -def check_user_post_interactions(user_id: int, post_id: int) -> Dict[str, bool]: - """Check if user has liked/bookmarked a post.""" - # Check if liked - like_response = supabase.table("post_likes").select("user_id").eq("user_id", user_id).eq("post_id", post_id).execute() - is_liked = len(getattr(like_response, "data", []) or []) > 0 - - # Check if bookmarked - bookmark_response = supabase.table("bookmarks").select("user_id").eq("user_id", user_id).eq("post_id", post_id).execute() - is_bookmarked = len(getattr(bookmark_response, "data", []) or []) > 0 - - return { - "is_liked": is_liked, - "is_bookmarked": is_bookmarked - } + response = supabase.table("archive_files").insert(data).execute() + created = _first(response) + if not created: + raise RuntimeError("Failed to add archive file.") + return created -# ==================== CATEGORY QUERIES ==================== - -def get_all_categories() -> List[Dict[str, Any]]: - """Get all categories.""" - response = supabase.table("categories").select("*").execute() - data = getattr(response, "data", None) or [] - return data - - -def get_category_by_id(category_id: int) -> Optional[Dict[str, Any]]: - """Get category by ID.""" - response = supabase.table("categories").select("*").eq("category_id", category_id).execute() - data = getattr(response, "data", None) or [] - return data[0] if data else None - - -def get_posts_by_category( - category_id: int, - page: int = 1, - limit: int = 20 -) -> List[Dict[str, Any]]: - """Get posts in a specific category.""" - offset = (page - 1) * limit - +def list_archive_files(post_id: int) -> List[Dict[str, Any]]: response = ( - supabase.table("post_categories") - .select(""" - posts!inner(*, - users!inner(user_id, username, display_name, profile_image_url), - post_categories(category_id, categories(name)) - ) - """) - .eq("category_id", category_id) - .eq("posts.is_private", False) - .order("posts.created_at", desc=True) - .range(offset, offset + limit - 1) - .execute() - ) - - data = getattr(response, "data", None) or [] - return [_format_post(item["posts"]) for item in data] - - -# ==================== COMMENT QUERIES ==================== - -def get_post_comments(post_id: int, page: int = 1, limit: int = 20) -> List[Dict[str, Any]]: - """Get comments for a specific post.""" - offset = (page - 1) * limit - - response = ( - supabase.table("comments") - .select(""" - *, - users!inner(user_id, username, display_name, profile_image_url) - """) - .eq("post_id", post_id) - .order("created_at", desc=True) - .range(offset, offset + limit - 1) - .execute() - ) - - data = getattr(response, "data", None) or [] - return data - - -# ==================== HISTORY QUERIES ==================== - -def get_listening_history( - user_id: int, - page: int = 1, - limit: int = 50, - completed_only: bool = False -) -> List[Dict[str, Any]]: - """Get user's listening history.""" - offset = (page - 1) * limit - - query = ( - supabase.table("audio_listening_history") - .select(""" - *, - posts!inner(*, - users!inner(user_id, username, display_name, profile_image_url) - ) - """) - .eq("user_id", user_id) - ) - - if completed_only: - query = query.eq("completed", True) - - response = query.order("listened_at", desc=True).range(offset, offset + limit - 1).execute() - data = getattr(response, "data", None) or [] - - return data - - -def get_search_history(user_id: int, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]: - """Get user's search history.""" - offset = (page - 1) * limit - - response = ( - supabase.table("search_history") + supabase.table("archive_files") .select("*") - .eq("user_id", user_id) - .order("searched_at", desc=True) - .range(offset, offset + limit - 1) + .eq("post_id", post_id) + .order("created_at", desc=False) .execute() ) - - data = getattr(response, "data", None) or [] - return data + return _rows(response) -# ==================== SEARCH QUERIES ==================== +# ==================== Metadata / Rights ==================== -def search_posts( - query: str, - category_id: Optional[int] = None, - page: int = 1, - limit: int = 20, - requesting_user_id: Optional[int] = None -) -> List[Dict[str, Any]]: - """ - Search posts by text query. - Uses full-text search on title and transcribed_text. - """ - offset = (page - 1) * limit +def upsert_archive_metadata(post_id: int, metadata: str) -> Dict[str, Any]: + data = {"post_id": post_id, "metadata": metadata} - # Basic search using ilike (for simple text matching) - # For production, you'd want to use PostgreSQL full-text search - search_query = ( - supabase.table("posts") - .select(""" - *, - users!inner(user_id, username, display_name, profile_image_url), - post_categories(category_id, categories(name)) - """) - .eq("is_private", False) - .or_(f"title.ilike.%{query}%,transcribed_text.ilike.%{query}%") - ) + existing = _first(supabase.table("archive_metadata").select("post_id").eq("post_id", post_id).limit(1).execute()) + if existing: + response = supabase.table("archive_metadata").update({"metadata": metadata}).eq("post_id", post_id).execute() + else: + response = supabase.table("archive_metadata").insert(data).execute() - if category_id: - # This would need a join with post_categories - pass - - response = search_query.order("created_at", desc=True).range(offset, offset + limit - 1).execute() - data = getattr(response, "data", None) or [] - - return [_format_post(post) for post in data] + row = _first(response) + if not row: + raise RuntimeError("Failed to upsert archive metadata.") + return row -# ==================== TRENDING QUERIES ==================== - -def get_trending_topics(limit: int = 5) -> List[Dict[str, Any]]: - """ - Get trending categories based on recent post activity. - This is a simplified version - for production, you'd want a materialized view. - """ - # This would ideally be a database view or function - # For now, we'll get categories with most posts in last 7 days - response = ( - supabase.rpc("get_trending_categories", {"p_limit": limit}) - .execute() - ) - - data = getattr(response, "data", None) or [] - return data +def get_archive_metadata(post_id: int) -> Optional[Dict[str, Any]]: + return _first(supabase.table("archive_metadata").select("*").eq("post_id", post_id).limit(1).execute()) -# ==================== BOOKMARKS QUERIES ==================== - -def get_user_bookmarks(user_id: int, page: int = 1, limit: int = 20) -> List[Dict[str, Any]]: - """Get user's bookmarked posts.""" - offset = (page - 1) * limit - - response = ( - supabase.table("bookmarks") - .select(""" - *, - posts!inner(*, - users!inner(user_id, username, display_name, profile_image_url), - post_categories(category_id, categories(name)) - ) - """) - .eq("user_id", user_id) - .order("bookmarked_at", desc=True) - .range(offset, offset + limit - 1) - .execute() - ) - - data = getattr(response, "data", None) or [] - return [_format_post(item["posts"]) for item in data] - - -# ==================== HELPER FUNCTIONS ==================== - -def _format_post(post: Dict[str, Any]) -> Dict[str, Any]: - """Format post data to include engagement metrics and clean structure.""" - post_id = post.get("post_id") - - # Get engagement metrics - engagement = get_post_engagement(post_id) if post_id else {"likes": 0, "comments": 0, "listens": 0, "bookmarks": 0} - - # Extract categories - categories = [] - if "post_categories" in post and post["post_categories"]: - for pc in post["post_categories"]: - if "categories" in pc and pc["categories"]: - categories.append(pc["categories"]) - - # Clean user data - user_data = post.get("users", {}) - - return { - "id": post.get("post_id"), - "user_id": post.get("user_id"), - "title": post.get("title"), - "audio_url": post.get("audio_url"), - "transcribed_text": post.get("transcribed_text"), - "audio_duration_seconds": post.get("audio_duration_seconds"), - "image_url": post.get("image_url"), - "is_private": post.get("is_private"), - "created_at": post.get("created_at"), - "updated_at": post.get("updated_at"), - "user": { - "id": user_data.get("user_id"), - "username": user_data.get("username"), - "display_name": user_data.get("display_name"), - "profile_image_url": user_data.get("profile_image_url") - }, - "categories": categories, - "likes": engagement["likes"], - "comments": engagement["comments"], - "listens": engagement["listens"], - "bookmarks": engagement["bookmarks"] +def upsert_archive_rights(post_id: int, payload: Dict[str, Any]) -> Dict[str, Any]: + data = { + "post_id": post_id, + "has_speaker_consent": payload.get("has_speaker_consent", False), + "license": payload.get("license"), + "consent_notes": payload.get("consent_notes"), + "allowed_use": payload.get("allowed_use"), + "restrictions": payload.get("restrictions"), } + existing = _first(supabase.table("archive_rights").select("post_id").eq("post_id", post_id).limit(1).execute()) + if existing: + response = ( + supabase.table("archive_rights") + .update({k: v for k, v in data.items() if k != "post_id"}) + .eq("post_id", post_id) + .execute() + ) + else: + response = supabase.table("archive_rights").insert(data).execute() -def get_pagination_info(total_count: int, page: int, limit: int) -> Dict[str, Any]: - """Calculate pagination information.""" - total_pages = (total_count + limit - 1) // limit - has_more = page < total_pages + row = _first(response) + if not row: + raise RuntimeError("Failed to upsert archive rights.") + return row + + +def get_archive_rights(post_id: int) -> Optional[Dict[str, Any]]: + return _first(supabase.table("archive_rights").select("*").eq("post_id", post_id).limit(1).execute()) + + +# ==================== RAG Chunks ==================== + +def add_rag_chunks(post_id: int, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + if not chunks: + return [] + + rows = [] + for c in chunks: + rows.append( + { + "post_id": post_id, + "start_sec": c.get("start_sec"), + "end_sec": c.get("end_sec"), + "text": c.get("text"), + "confidence": c.get("confidence"), + "embedding": c.get("embedding"), + } + ) + + response = supabase.table("rag_chunks").insert(rows).execute() + return _rows(response) + + +def list_rag_chunks(post_id: int, page: int = 1, limit: int = 200) -> List[Dict[str, Any]]: + start, end = _paginate(page, limit) + response = ( + supabase.table("rag_chunks") + .select("*") + .eq("post_id", post_id) + .order("start_sec", desc=False) + .range(start, end) + .execute() + ) + return _rows(response) + + +# ==================== Audit Log ==================== + +def add_audit_log(payload: Dict[str, Any]) -> Dict[str, Any]: + if not payload.get("action"): + raise ValueError("'action' is required.") + + data = { + "post_id": payload.get("post_id"), + "user_id": payload.get("user_id"), + "action": payload["action"], + "details": payload.get("details"), + } + + response = supabase.table("audit_log").insert(data).execute() + row = _first(response) + if not row: + raise RuntimeError("Failed to create audit log.") + return row + + +def list_audit_logs(post_id: Optional[int] = None, user_id: Optional[int] = None, page: int = 1, limit: int = 100) -> List[Dict[str, Any]]: + start, end = _paginate(page, limit) + query = supabase.table("audit_log").select("*") + + if post_id is not None: + query = query.eq("post_id", post_id) + if user_id is not None: + query = query.eq("user_id", user_id) + + response = query.order("created_at", desc=True).range(start, end).execute() + return _rows(response) + + +# ==================== Aggregate View ==================== + +def get_post_bundle(post_id: int) -> Dict[str, Any]: + post = get_audio_post_by_id(post_id) + if not post: + return {} return { - "current_page": page, - "total_pages": total_pages, - "total_items": total_count, - "items_per_page": limit, - "has_more": has_more + "post": post, + "files": list_archive_files(post_id), + "metadata": get_archive_metadata(post_id), + "rights": get_archive_rights(post_id), + "rag_chunks": list_rag_chunks(post_id, page=1, limit=1000), + "audit_log": list_audit_logs(post_id=post_id, page=1, limit=200), }