updates on archives

This commit is contained in:
Gaumit Kauts
2026-02-14 21:46:48 -07:00
parent 3fc51ba40c
commit 09b7d59d6c
4 changed files with 464 additions and 669 deletions

View File

@@ -1,299 +1,240 @@
"""
Example Flask routes using db_queries.py
Add these routes to your existing Flask app
Flask API routes aligned with TitanForge/schema.sql.
"""
from flask import Flask, jsonify, request
from flask import Blueprint, jsonify, request
# Import all query functions
from db_queries import (
add_archive_file,
add_audit_log,
add_rag_chunks,
create_audio_post,
create_user,
get_archive_metadata,
get_archive_rights,
get_audio_post_by_id,
get_post_bundle,
get_user_by_id,
get_user_by_username,
get_user_stats,
get_post_by_id,
get_posts_feed,
get_user_posts,
get_post_engagement,
check_user_post_interactions,
get_all_categories,
get_posts_by_category,
get_post_comments,
get_listening_history,
get_search_history,
search_posts,
get_trending_topics,
get_user_bookmarks,
get_pagination_info
list_archive_files,
list_audio_posts,
list_audit_logs,
list_rag_chunks,
update_audio_post,
upsert_archive_metadata,
upsert_archive_rights,
)
# Assuming you have the app instance
# app = Flask(__name__)
api = Blueprint("api", __name__, url_prefix="/api")
# ==================== USER ROUTES ====================
def _error(message: str, status: int = 400):
return jsonify({"error": message}), status
@app.get("/users/<int:user_id>")
def get_user(user_id: int):
"""Get user profile with stats."""
@api.get("/health")
def health():
return jsonify({"status": "ok"})
# ==================== Users ====================
@api.post("/users")
def api_create_user():
payload = request.get_json(force=True, silent=False) or {}
try:
return jsonify(create_user(payload)), 201
except ValueError as e:
return _error(str(e), 400)
except Exception as e:
return _error(str(e), 500)
@api.get("/users/<int:user_id>")
def api_get_user(user_id: int):
user = get_user_by_id(user_id)
if not user:
return jsonify({"error": "User not found"}), 404
stats = get_user_stats(user_id)
return jsonify({
"user": {
"id": user["user_id"],
"username": user["username"],
"email": user["email"],
"display_name": user["display_name"],
"bio": user.get("bio"),
"profile_image_url": user.get("profile_image_url"),
"location": None, # Add to schema if needed
"created_at": user["created_at"],
"stats": stats
}
})
return _error("User not found.", 404)
return jsonify(user)
@app.get("/users/me")
def get_current_user():
"""Get current user profile (requires auth)."""
# In production, get user_id from JWT token
user_id = request.args.get("user_id", type=int)
if not user_id:
return jsonify({"error": "user_id required"}), 400
# ==================== Audio Posts ====================
return get_user(user_id)
@api.post("/posts")
def api_create_post():
payload = request.get_json(force=True, silent=False) or {}
try:
return jsonify(create_audio_post(payload)), 201
except ValueError as e:
return _error(str(e), 400)
except Exception as e:
return _error(str(e), 500)
# ==================== POST/FEED ROUTES ====================
@app.get("/posts")
def get_feed():
"""Get personalized feed."""
user_id = request.args.get("user_id", type=int)
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=20, type=int)
sort = request.args.get("sort", default="recent")
if not user_id:
return jsonify({"error": "user_id required"}), 400
if limit > 50:
limit = 50
posts = get_posts_feed(user_id, page=page, limit=limit, sort=sort)
# For production, you'd get total count from a separate query
total_count = len(posts) * 10 # Placeholder
pagination = get_pagination_info(total_count, page, limit)
return jsonify({
"posts": posts,
"pagination": pagination
})
@app.get("/posts/<int:post_id>")
def get_single_post(post_id: int):
"""Get a single post by ID."""
user_id = request.args.get("user_id", type=int) # For privacy check
post = get_post_by_id(post_id, requesting_user_id=user_id)
if not post:
return jsonify({"error": "Post not found or private"}), 404
# Add user interaction info if user_id provided
if user_id:
interactions = check_user_post_interactions(user_id, post_id)
post.update(interactions)
return jsonify({"post": post})
@app.get("/posts/user/<int:user_id>")
def get_posts_by_user(user_id: int):
"""Get posts by a specific user."""
filter_type = request.args.get("filter", default="all")
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=20, type=int)
if limit > 50:
limit = 50
posts = get_user_posts(user_id, filter_type=filter_type, page=page, limit=limit)
return jsonify({
"posts": posts,
"pagination": get_pagination_info(len(posts) * 5, page, limit) # Placeholder
})
@app.get("/posts/user/me")
def get_my_posts():
"""Get current user's posts."""
user_id = request.args.get("user_id", type=int)
if not user_id:
return jsonify({"error": "user_id required"}), 400
return get_posts_by_user(user_id)
# ==================== CATEGORY ROUTES ====================
@app.get("/categories")
def get_categories():
"""Get all categories."""
categories = get_all_categories()
return jsonify({"categories": categories})
@app.get("/categories/<int:category_id>/posts")
def get_category_posts(category_id: int):
"""Get posts in a category."""
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=20, type=int)
posts = get_posts_by_category(category_id, page=page, limit=limit)
return jsonify({
"posts": posts,
"pagination": get_pagination_info(len(posts) * 5, page, limit)
})
# ==================== COMMENT ROUTES ====================
@app.get("/posts/<int:post_id>/comments")
def get_comments(post_id: int):
"""Get comments for a post."""
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=20, type=int)
comments = get_post_comments(post_id, page=page, limit=limit)
return jsonify({
"comments": comments,
"pagination": get_pagination_info(len(comments) * 3, page, limit)
})
# ==================== HISTORY ROUTES ====================
@app.get("/history/listening")
def get_user_listening_history():
"""Get user's listening history."""
user_id = request.args.get("user_id", type=int)
if not user_id:
return jsonify({"error": "user_id required"}), 400
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=50, type=int)
completed_only = request.args.get("completed", default="false").lower() == "true"
history = get_listening_history(user_id, page=page, limit=limit, completed_only=completed_only)
return jsonify({
"history": history,
"pagination": get_pagination_info(len(history) * 3, page, limit)
})
@app.get("/history/searches")
def get_user_search_history():
"""Get user's search history."""
user_id = request.args.get("user_id", type=int)
if not user_id:
return jsonify({"error": "user_id required"}), 400
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=50, type=int)
searches = get_search_history(user_id, page=page, limit=limit)
return jsonify({"searches": searches})
# ==================== SEARCH ROUTES ====================
@app.get("/search")
def search():
"""Search posts."""
query = request.args.get("q")
if not query:
return jsonify({"error": "Search query 'q' is required"}), 400
category_id = request.args.get("categoryId", type=int)
@api.get("/posts")
def api_list_posts():
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=20, type=int)
visibility = request.args.get("visibility")
user_id = request.args.get("user_id", type=int)
results = search_posts(
query=query,
category_id=category_id,
page=page,
limit=limit,
requesting_user_id=user_id
)
return jsonify({
"results": results,
"pagination": get_pagination_info(len(results) * 5, page, limit)
})
try:
rows = list_audio_posts(page=page, limit=limit, visibility=visibility, user_id=user_id)
return jsonify({"posts": rows, "page": page, "limit": min(max(1, limit), 100)})
except Exception as e:
return _error(str(e), 500)
# ==================== TRENDING ROUTES ====================
@app.get("/trending/topics")
def get_trending():
"""Get trending topics."""
limit = request.args.get("limit", default=5, type=int)
topics = get_trending_topics(limit=limit)
return jsonify({"topics": topics})
@api.get("/posts/<int:post_id>")
def api_get_post(post_id: int):
row = get_audio_post_by_id(post_id)
if not row:
return _error("Post not found.", 404)
return jsonify(row)
# ==================== BOOKMARK ROUTES ====================
@api.patch("/posts/<int:post_id>")
def api_patch_post(post_id: int):
payload = request.get_json(force=True, silent=False) or {}
try:
row = update_audio_post(post_id, payload)
if not row:
return _error("Post not found.", 404)
return jsonify(row)
except Exception as e:
return _error(str(e), 500)
@app.get("/bookmarks")
def get_bookmarks():
"""Get user's bookmarked posts."""
user_id = request.args.get("user_id", type=int)
if not user_id:
return jsonify({"error": "user_id required"}), 400
@api.get("/posts/<int:post_id>/bundle")
def api_post_bundle(post_id: int):
bundle = get_post_bundle(post_id)
if not bundle:
return _error("Post not found.", 404)
return jsonify(bundle)
# ==================== Archive Files ====================
@api.post("/posts/<int:post_id>/files")
def api_add_file(post_id: int):
payload = request.get_json(force=True, silent=False) or {}
try:
return jsonify(add_archive_file(post_id, payload)), 201
except ValueError as e:
return _error(str(e), 400)
except Exception as e:
return _error(str(e), 500)
@api.get("/posts/<int:post_id>/files")
def api_list_files(post_id: int):
try:
return jsonify({"files": list_archive_files(post_id)})
except Exception as e:
return _error(str(e), 500)
# ==================== Metadata ====================
@api.put("/posts/<int:post_id>/metadata")
def api_put_metadata(post_id: int):
payload = request.get_json(force=True, silent=False) or {}
metadata = payload.get("metadata")
if metadata is None:
return _error("'metadata' is required.", 400)
try:
return jsonify(upsert_archive_metadata(post_id, metadata))
except Exception as e:
return _error(str(e), 500)
@api.get("/posts/<int:post_id>/metadata")
def api_get_metadata(post_id: int):
row = get_archive_metadata(post_id)
if not row:
return _error("Metadata not found.", 404)
return jsonify(row)
# ==================== Rights ====================
@api.put("/posts/<int:post_id>/rights")
def api_put_rights(post_id: int):
payload = request.get_json(force=True, silent=False) or {}
try:
return jsonify(upsert_archive_rights(post_id, payload))
except Exception as e:
return _error(str(e), 500)
@api.get("/posts/<int:post_id>/rights")
def api_get_rights(post_id: int):
row = get_archive_rights(post_id)
if not row:
return _error("Rights not found.", 404)
return jsonify(row)
# ==================== RAG Chunks ====================
@api.post("/posts/<int:post_id>/chunks")
def api_add_chunks(post_id: int):
payload = request.get_json(force=True, silent=False) or {}
chunks = payload.get("chunks")
if not isinstance(chunks, list):
return _error("'chunks' must be a list.", 400)
try:
rows = add_rag_chunks(post_id, chunks)
return jsonify({"inserted": len(rows), "chunks": rows}), 201
except Exception as e:
return _error(str(e), 500)
@api.get("/posts/<int:post_id>/chunks")
def api_get_chunks(post_id: int):
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=20, type=int)
limit = request.args.get("limit", default=200, type=int)
bookmarks = get_user_bookmarks(user_id, page=page, limit=limit)
return jsonify({
"bookmarks": bookmarks,
"pagination": get_pagination_info(len(bookmarks) * 3, page, limit)
})
try:
return jsonify({"chunks": list_rag_chunks(post_id, page=page, limit=limit)})
except Exception as e:
return _error(str(e), 500)
# ==================== ENGAGEMENT STATS ROUTES ====================
# ==================== Audit Log ====================
@app.get("/posts/<int:post_id>/engagement")
def get_post_engagement_stats(post_id: int):
"""Get engagement statistics for a post."""
engagement = get_post_engagement(post_id)
return jsonify(engagement)
@api.post("/audit")
def api_create_audit():
payload = request.get_json(force=True, silent=False) or {}
try:
return jsonify(add_audit_log(payload)), 201
except ValueError as e:
return _error(str(e), 400)
except Exception as e:
return _error(str(e), 500)
# Example of how to use in your existing main.py:
"""
# In your main.py, import these routes:
@api.get("/audit")
def api_list_audit():
post_id = request.args.get("post_id", type=int)
user_id = request.args.get("user_id", type=int)
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=100, type=int)
from flask import Flask
# ... your other imports ...
from api_routes import * # Import all routes
try:
return jsonify({"logs": list_audit_logs(post_id=post_id, user_id=user_id, page=page, limit=limit)})
except Exception as e:
return _error(str(e), 500)
# Or import specific routes:
# from api_routes import get_user, get_feed, get_categories, etc.
# Then your existing routes will work alongside these new ones
"""
@api.get("/posts/<int:post_id>/audit")
def api_post_audit(post_id: int):
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=100, type=int)
try:
return jsonify({"logs": list_audit_logs(post_id=post_id, page=page, limit=limit)})
except Exception as e:
return _error(str(e), 500)