Files
VoiceVault/backend/api_routes.py

710 lines
23 KiB
Python
Raw Normal View History

2026-02-14 21:13:12 -07:00
"""
2026-02-14 21:46:48 -07:00
Flask API routes aligned with TitanForge/schema.sql.
2026-02-14 22:04:39 -07:00
Includes auth, upload+transcription, history, and RAG search workflow.
2026-02-14 21:13:12 -07:00
"""
2026-02-14 22:04:39 -07:00
import hashlib
2026-02-15 01:52:26 -07:00
import io
2026-02-14 22:04:39 -07:00
import json
import os
import uuid
2026-02-15 01:21:05 -07:00
import re
2026-02-15 01:52:26 -07:00
import zipfile
2026-02-14 22:04:39 -07:00
from pathlib import Path
2026-02-15 01:21:05 -07:00
from typing import Any, List
2026-02-14 22:04:39 -07:00
from dotenv import load_dotenv
from faster_whisper import WhisperModel
2026-02-15 01:52:26 -07:00
from flask import Blueprint, jsonify, request, send_file
2026-02-14 22:04:39 -07:00
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename
2026-02-14 21:13:12 -07:00
from db_queries import (
2026-02-14 21:46:48 -07:00
add_archive_file,
add_audit_log,
add_rag_chunks,
create_audio_post,
create_user,
2026-02-15 01:52:26 -07:00
download_storage_object_by_stored_path,
get_archive_file_by_role,
2026-02-14 21:46:48 -07:00
get_archive_metadata,
get_original_audio_url,
2026-02-14 21:46:48 -07:00
get_archive_rights,
get_audio_post_by_id,
get_post_bundle,
2026-02-14 22:04:39 -07:00
get_user_by_email,
2026-02-14 21:13:12 -07:00
get_user_by_id,
2026-02-14 21:46:48 -07:00
list_archive_files,
list_audio_posts,
list_audit_logs,
list_rag_chunks,
2026-02-14 22:04:39 -07:00
list_user_history,
search_rag_chunks,
2026-02-15 01:21:05 -07:00
search_rag_chunks_vector,
2026-02-14 21:46:48 -07:00
update_audio_post,
upload_storage_object,
2026-02-14 21:46:48 -07:00
upsert_archive_metadata,
upsert_archive_rights,
2026-02-14 21:13:12 -07:00
)
2026-02-14 22:04:39 -07:00
load_dotenv()
2026-02-14 21:46:48 -07:00
api = Blueprint("api", __name__, url_prefix="/api")
2026-02-14 21:13:12 -07:00
2026-02-14 22:04:39 -07:00
# Local directory that receives uploaded media; created eagerly at import time.
UPLOAD_DIR = Path(os.environ.get("BACKEND_UPLOAD_DIR", "uploads"))
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Lower-case file extensions accepted by the upload endpoint.
ALLOWED_MEDIA_EXTENSIONS = {
    "mp4",
    "mov",
    "mkv",
    "webm",
    "m4a",
    "mp3",
    "wav",
    "ogg",
    "flac",
}

# Whisper settings, each overridable through the environment.
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "base")
WHISPER_DEVICE = os.environ.get("WHISPER_DEVICE", "cpu")
WHISPER_COMPUTE_TYPE = os.environ.get("WHISPER_COMPUTE_TYPE", "int8")

# Bucket name: SUPABASE_BUCKET wins, then SUPABASE_ARCHIVE_BUCKET, then "archives".
ARCHIVE_BUCKET = os.environ.get(
    "SUPABASE_BUCKET",
    os.environ.get("SUPABASE_ARCHIVE_BUCKET", "archives"),
)
2026-02-14 22:04:39 -07:00
# Process-wide Whisper instance; stays None until the first _model() call.
_whisper_model: WhisperModel | None = None


def _model() -> WhisperModel:
    """Return the shared WhisperModel, constructing it on first use.

    The model is built from WHISPER_MODEL, WHISPER_DEVICE and
    WHISPER_COMPUTE_TYPE and then cached in the module-level
    ``_whisper_model`` so later calls reuse the same instance.
    """
    global _whisper_model
    if _whisper_model is not None:
        return _whisper_model
    _whisper_model = WhisperModel(
        WHISPER_MODEL,
        device=WHISPER_DEVICE,
        compute_type=WHISPER_COMPUTE_TYPE,
    )
    return _whisper_model
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
def _error(message: str, status: int = 400):
    """Build a JSON error reply.

    Returns a ``(response, status)`` tuple whose body is
    ``{"error": message}``; *status* defaults to 400.
    """
    body = jsonify({"error": message})
    return body, status
2026-02-14 21:13:12 -07:00
2026-02-14 22:04:39 -07:00
def _allowed_file(filename: str) -> bool:
    """Return True when *filename* ends in an accepted media extension.

    The text after the last dot is compared case-insensitively against
    ALLOWED_MEDIA_EXTENSIONS; a name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_MEDIA_EXTENSIONS
def _sha256(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
while True:
chunk = f.read(1024 * 1024)
if not chunk:
break
h.update(chunk)
return h.hexdigest()
def _build_prompt(transcript_text: str, title: str) -> str:
return (
"You are an archive assistant. Use the following transcribed audio as source context. "
f"Post title: {title}.\n\n"
"Transcript:\n"
f"{transcript_text}\n\n"
"Answer user questions grounded in this transcript."
)
2026-02-15 01:21:05 -07:00
def _add_audio_url(post: dict[str, Any]) -> dict[str, Any]:
2026-02-15 00:24:49 -07:00
"""Add signed audio URL to post if ready"""
if post.get("status") == "ready":
try:
audio_data = get_original_audio_url(post["post_id"], expires_in=3600)
post["audio_url"] = audio_data["signed_url"]
except:
pass
return post
2026-02-14 22:04:39 -07:00
2026-02-15 01:21:05 -07:00
def _local_embedding(text: str, dimensions: int = 1536) -> List[float]:
"""
Free deterministic embedding fallback (offline).
Replace with model-based embeddings later if needed.
"""
vector = [0.0] * dimensions
tokens = re.findall(r"[A-Za-z0-9']+", text.lower())
if not tokens:
return vector
for token in tokens:
digest = hashlib.sha256(token.encode("utf-8")).digest()
idx = int.from_bytes(digest[:4], "big") % dimensions
sign = 1.0 if (digest[4] & 1) == 0 else -1.0
weight = 1.0 + (digest[5] / 255.0) * 0.25
vector[idx] += sign * weight
norm = sum(v * v for v in vector) ** 0.5
if norm > 0:
vector = [v / norm for v in vector]
return vector
2026-02-14 22:04:39 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/health")
def health():
    """Report liveness together with the configured Whisper settings."""
    report = {
        "status": "ok",
        "whisper_model": WHISPER_MODEL,
        "whisper_device": WHISPER_DEVICE,
        "whisper_compute_type": WHISPER_COMPUTE_TYPE,
    }
    return jsonify(report)
# ==================== Auth ====================
@api.post("/auth/register")
def api_register():
    """Register a new user from a JSON body.

    Expects ``email`` and ``password``; ``display_name``, ``avatar_url`` and
    ``bio`` are optional. The email is trimmed and lower-cased, and the
    password is stored only as a hash.

    Responses:
        201 with ``{"user": {user_id, email, display_name}}`` on success.
        400 when the body is not a JSON object, or email/password are
            missing or not strings.
        409 when a user already exists for the email.
        500 when user creation or audit logging raises.
    """
    payload = request.get_json(force=True, silent=False) or {}
    if not isinstance(payload, dict):
        # A JSON array/scalar has no .get(); answer 400 instead of crashing.
        return _error("Request body must be a JSON object.", 400)
    email_raw = payload.get("email") or ""
    password = payload.get("password") or ""
    if not isinstance(email_raw, str) or not isinstance(password, str):
        return _error("'email' and 'password' must be strings.", 400)
    email = email_raw.strip().lower()
    if not email or not password:
        return _error("'email' and 'password' are required.", 400)
    existing = get_user_by_email(email)
    if existing:
        return _error("User already exists for this email.", 409)
    try:
        user = create_user(
            {
                "email": email,
                "password_hash": generate_password_hash(password),
                "display_name": payload.get("display_name"),
                "avatar_url": payload.get("avatar_url"),
                "bio": payload.get("bio"),
            }
        )
        add_audit_log({"user_id": user["user_id"], "action": "user.register", "details": json.dumps({"email": email})})
        return jsonify({
            "user": {
                "user_id": user["user_id"],
                "email": user["email"],
                "display_name": user.get("display_name"),
            }
        }), 201
    except Exception as e:
        return _error(str(e), 500)
@api.post("/auth/login")
def api_login():
    """Check an email/password pair supplied as a JSON body.

    The email is trimmed and lower-cased before lookup. A successful login
    writes a ``user.login`` audit entry.

    Responses:
        200 with ``{"user": {user_id, email, display_name, avatar_url, bio}}``.
        400 when the body is not a JSON object, or email/password are
            missing or not strings.
        401 when the user is unknown, has no stored password hash, or the
            password does not match.
    """
    payload = request.get_json(force=True, silent=False) or {}
    if not isinstance(payload, dict):
        # A JSON array/scalar has no .get(); answer 400 instead of crashing.
        return _error("Request body must be a JSON object.", 400)
    email_raw = payload.get("email") or ""
    password = payload.get("password") or ""
    if not isinstance(email_raw, str) or not isinstance(password, str):
        return _error("'email' and 'password' must be strings.", 400)
    email = email_raw.strip().lower()
    if not email or not password:
        return _error("'email' and 'password' are required.", 400)
    user = get_user_by_email(email)
    if not user:
        return _error("Invalid credentials.", 401)
    # A row without a hash (for example one created without a password)
    # cannot be authenticated; treat it like a wrong password rather than
    # letting check_password_hash fail on a missing value.
    stored_hash = user.get("password_hash")
    if not stored_hash or not check_password_hash(stored_hash, password):
        return _error("Invalid credentials.", 401)
    add_audit_log({"user_id": user["user_id"], "action": "user.login", "details": json.dumps({"email": email})})
    return jsonify(
        {
            "user": {
                "user_id": user["user_id"],
                "email": user["email"],
                "display_name": user.get("display_name"),
                "avatar_url": user.get("avatar_url"),
                "bio": user.get("bio"),
            }
        }
    )
# ==================== Upload + Prompt ====================
@api.post("/posts/upload")
def api_upload_post():
    """Accept a media upload, transcribe it, and store the resulting post.

    Form fields: ``file`` (required), ``user_id`` (required integer),
    ``title``, ``description``, ``visibility`` ("private" or "public",
    default "private") and ``language`` (default "en").

    Steps: save the file under UPLOAD_DIR, create the post with status
    "processing", upload the original to ARCHIVE_BUCKET, transcribe it with
    Whisper, store one RAG chunk per non-empty segment, store prompt
    metadata, then mark the post "ready".

    Responses:
        201 with post id, transcript, prompt and chunk count on success.
        400 for a missing/unsupported file or invalid form fields.
        404 when the user does not exist.
        500 when any processing step raises; the post, if already created,
            is marked "failed" on a best-effort basis.
    """
    if "file" not in request.files:
        return _error("Missing 'file' in form-data.", 400)

    media = request.files["file"]
    if not media.filename:
        return _error("Filename is empty.", 400)
    if not _allowed_file(media.filename):
        return _error("Unsupported media extension.", 400)

    user_id_raw = request.form.get("user_id")
    # Apply the default to whitespace-only titles too, not just missing ones.
    title = (request.form.get("title") or "").strip() or "Untitled recording"
    description = request.form.get("description")
    visibility = (request.form.get("visibility") or "private").strip().lower()
    language = (request.form.get("language") or "en").strip().lower()
    if visibility not in {"private", "public"}:
        return _error("'visibility' must be 'private' or 'public'.", 400)
    try:
        user_id = int(user_id_raw)
    except (TypeError, ValueError):
        return _error("'user_id' is required and must be an integer.", 400)
    user = get_user_by_id(user_id)
    if not user:
        return _error("User not found.", 404)

    post_uuid = str(uuid.uuid4())
    safe_name = secure_filename(media.filename)
    if not _allowed_file(safe_name):
        # secure_filename() drops non-ASCII characters, so a name such as
        # "<non-ascii>.mp3" collapses to "mp3" (or to ""). Rebuild a usable
        # name from the extension that was already validated above.
        extension = media.filename.rsplit(".", 1)[1].lower()
        safe_name = f"upload.{extension}"
    storage_prefix = f"archive/{user_id}/{post_uuid}"
    storage_object_path = f"{user_id}/{post_uuid}/original/{safe_name}"

    # NOTE(review): the local copy is never removed after processing, so
    # UPLOAD_DIR grows with every upload -- confirm whether that is intended.
    saved_path = UPLOAD_DIR / f"{post_uuid}_{safe_name}"
    media.save(saved_path)

    created_post = None
    try:
        created_post = create_audio_post(
            {
                "user_id": user_id,
                "title": title,
                "description": description,
                "visibility": visibility,
                "status": "processing",
                "language": language,
                "storage_prefix": storage_prefix,
            }
        )
        post_id = int(created_post["post_id"])
        media_sha = _sha256(saved_path)

        with saved_path.open("rb") as media_file:
            upload_storage_object(
                bucket=ARCHIVE_BUCKET,
                object_path=storage_object_path,
                content=media_file.read(),
                content_type=media.mimetype or "application/octet-stream",
                upsert=False,
            )

        add_archive_file(
            post_id,
            {
                "role": "original_audio",
                "path": f"{ARCHIVE_BUCKET}/{storage_object_path}",
                "content_type": media.mimetype,
                "size_bytes": saved_path.stat().st_size,
                "sha256": media_sha,
            },
        )

        segments, _info = _model().transcribe(str(saved_path))
        rag_rows = []
        transcript_parts = []
        for seg in segments:
            segment_text = seg.text.strip()
            if not segment_text:
                continue
            transcript_parts.append(segment_text)
            rag_rows.append(
                {
                    "start_sec": float(seg.start),
                    "end_sec": float(seg.end),
                    "text": segment_text,
                    # Stored as-is from the segment's avg_logprob.
                    "confidence": float(seg.avg_logprob) if seg.avg_logprob is not None else None,
                    "embedding": _local_embedding(segment_text),
                }
            )

        transcript_text = " ".join(transcript_parts).strip()
        prompt_text = _build_prompt(transcript_text, title)
        if rag_rows:
            add_rag_chunks(post_id, rag_rows)

        upsert_archive_metadata(
            post_id,
            json.dumps(
                {
                    "prompt": prompt_text,
                    "transcript_length_chars": len(transcript_text),
                    "source_file": safe_name,
                    "language": language,
                }
            ),
        )

        # NOTE(review): this records a transcript file entry, but nothing in
        # this handler uploads transcript.txt to storage -- confirm.
        add_archive_file(
            post_id,
            {
                "role": "transcript_txt",
                "path": f"{storage_prefix}/transcript.txt",
                "content_type": "text/plain",
                "size_bytes": len(transcript_text.encode("utf-8")),
                "sha256": hashlib.sha256(transcript_text.encode("utf-8")).hexdigest(),
            },
        )

        update_audio_post(post_id, {"status": "ready"})
        add_audit_log(
            {
                "post_id": post_id,
                "user_id": user_id,
                "action": "post.upload.transcribed",
                "details": json.dumps({"visibility": visibility, "storage_prefix": storage_prefix}),
            }
        )

        return jsonify(
            {
                "post_id": post_id,
                "visibility": visibility,
                "status": "ready",
                "audio_path": f"{ARCHIVE_BUCKET}/{storage_object_path}",
                "transcript_text": transcript_text,
                "prompt": prompt_text,
                "rag_chunk_count": len(rag_rows),
            }
        ), 201
    except Exception as e:
        if created_post and created_post.get("post_id"):
            failed_post_id = int(created_post["post_id"])
            try:
                update_audio_post(failed_post_id, {"status": "failed"})
                add_audit_log(
                    {
                        "post_id": failed_post_id,
                        "user_id": user_id,
                        "action": "post.upload.failed",
                        "details": json.dumps({"error": str(e)}),
                    }
                )
            except Exception:
                # Failure bookkeeping is best-effort: it must not replace
                # the original error with an unhandled one.
                import logging

                logging.getLogger(__name__).exception(
                    "Could not record failure for post %s", failed_post_id
                )
        return _error(f"Upload/transcription failed: {e}", 500)
# ==================== History + RAG Search ====================
@api.get("/users/<int:user_id>/history")
def api_user_history(user_id: int):
    """Return one page of a user's history.

    Query parameters: ``page`` (default 1) and ``limit`` (default 20). The
    ``limit`` echoed in the response is clamped to the range 1-100. Any
    exception becomes a 500 JSON error.
    """
    args = request.args
    page = args.get("page", default=1, type=int)
    limit = args.get("limit", default=20, type=int)
    try:
        history = list_user_history(user_id, page=page, limit=limit)
        reported_limit = max(1, min(limit, 100))
        return jsonify({"history": history, "page": page, "limit": reported_limit})
    except Exception as exc:
        return _error(str(exc), 500)
@api.get("/rag/search")
def api_rag_search():
    """Search a user's RAG chunks by vector or by text.

    Query parameters: ``user_id`` (required), ``q``, ``query_embedding``
    (a JSON array of numbers), ``page`` (default 1) and ``limit``
    (default 30). When ``query_embedding`` is supplied the vector search is
    used; otherwise ``q`` is required and the text search is used. The
    ``limit`` echoed in the response is clamped to the range 1-100.

    Responses: 200 with ``results`` and ``mode``; 400 for missing or invalid
    parameters; 500 when a search raises.
    """
    args = request.args
    query_text = (args.get("q") or "").strip()
    user_id = args.get("user_id", type=int)
    query_embedding_raw = args.get("query_embedding")
    page = args.get("page", default=1, type=int)
    limit = args.get("limit", default=30, type=int)
    if not user_id:
        return _error("'user_id' is required.", 400)
    try:
        if query_embedding_raw:
            try:
                decoded = json.loads(query_embedding_raw)
                if not isinstance(decoded, list):
                    return _error("'query_embedding' must be a JSON array.", 400)
                query_embedding = list(map(float, decoded))
            except Exception:
                return _error("Invalid 'query_embedding'. Example: [0.1,0.2,...]", 400)
            matches = search_rag_chunks_vector(user_id=user_id, query_embedding=query_embedding, limit=limit)
            return jsonify({"results": matches, "mode": "vector", "limit": max(1, min(limit, 100))})
        if not query_text:
            return _error("'q' is required when 'query_embedding' is not provided.", 400)
        matches = search_rag_chunks(user_id=user_id, query_text=query_text, page=page, limit=limit)
        return jsonify({"results": matches, "mode": "text", "page": page, "limit": max(1, min(limit, 100))})
    except Exception as exc:
        return _error(str(exc), 500)
# ==================== Existing CRUD Routes ====================
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.post("/users")
def api_create_user():
    """Create a user from the raw JSON body.

    The body is passed to ``create_user`` unchanged. Responses: 201 with the
    created row, 400 when ``create_user`` raises ValueError, 500 for any
    other exception.
    """
    payload = request.get_json(force=True, silent=False) or {}
    try:
        created = create_user(payload)
        if isinstance(created, dict):
            # Defensive: never echo a credential hash back to the client,
            # should the created row include one.
            created = {k: v for k, v in created.items() if k != "password_hash"}
        return jsonify(created), 201
    except ValueError as e:
        return _error(str(e), 400)
    except Exception as e:
        return _error(str(e), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/users/<int:user_id>")
def api_get_user(user_id: int):
    """Return the user row for *user_id*, or a 404 JSON error if absent.

    A ``password_hash`` field, if the row carries one, is removed before
    the row is serialized.
    """
    user = get_user_by_id(user_id)
    if not user:
        return _error("User not found.", 404)
    if isinstance(user, dict):
        # Defensive: a credential hash must never leave the server.
        user = {k: v for k, v in user.items() if k != "password_hash"}
    return jsonify(user)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.post("/posts")
def api_create_post():
    """Create an audio post from the raw JSON body.

    Responses: 201 with the created row, 400 when ``create_audio_post``
    raises ValueError, 500 for any other exception.
    """
    body = request.get_json(force=True, silent=False) or {}
    try:
        created = create_audio_post(body)
        return jsonify(created), 201
    except ValueError as exc:
        return _error(str(exc), 400)
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-14 21:13:12 -07:00
2026-02-15 00:24:49 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/posts")
def api_list_posts():
    """List audio posts visible to the caller.

    Query parameters: ``page`` (default 1), ``limit`` (default 20),
    ``visibility`` and ``current_user_id``. Public posts are always
    returned; other posts only when their ``user_id`` equals
    ``current_user_id``. Each returned post is passed through
    ``_add_audio_url``. The ``limit`` echoed in the response is clamped to
    the range 1-100.
    """
    args = request.args
    page = args.get("page", default=1, type=int)
    limit = args.get("limit", default=20, type=int)
    visibility = args.get("visibility")
    current_user_id = args.get("current_user_id", type=int)

    def _visible(post) -> bool:
        # Public posts are open to everyone; anything else is owner-only.
        if post.get("visibility") == "public":
            return True
        return bool(current_user_id) and post.get("user_id") == current_user_id

    try:
        rows = list_audio_posts(page=page, limit=limit, visibility=visibility)
        # The filter runs on the page already fetched, so a page can hold
        # fewer than `limit` rows.
        rows = [post for post in rows if _visible(post)]
        rows = [_add_audio_url(post) for post in rows]
        return jsonify({"posts": rows, "page": page, "limit": max(1, min(limit, 100))})
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-14 21:13:12 -07:00
2026-02-15 00:24:49 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/posts/<int:post_id>")
def api_get_post(post_id: int):
    """Return the audio post row for *post_id*, or a 404 JSON error."""
    # NOTE(review): unlike the audio-url and archive.zip routes, this one
    # applies no visibility check to private posts -- confirm intended.
    post = get_audio_post_by_id(post_id)
    if post:
        return jsonify(post)
    return _error("Post not found.", 404)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.patch("/posts/<int:post_id>")
def api_patch_post(post_id: int):
    """Apply the JSON body as an update to post *post_id*.

    Responses: 200 with the updated row, 404 when ``update_audio_post``
    returns nothing, 500 when it raises.
    """
    changes = request.get_json(force=True, silent=False) or {}
    try:
        updated = update_audio_post(post_id, changes)
        if updated:
            return jsonify(updated)
        return _error("Post not found.", 404)
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/posts/<int:post_id>/bundle")
def api_post_bundle(post_id: int):
    """Return the bundle for post *post_id*, or a 404 JSON error."""
    bundle = get_post_bundle(post_id)
    if bundle:
        return jsonify(bundle)
    return _error("Post not found.", 404)
2026-02-14 21:13:12 -07:00
@api.get("/posts/<int:post_id>/audio-url")
def api_post_audio_url(post_id: int):
    """
    Return a signed URL for a post's original audio/video.

    Query parameters: ``user_id`` (must equal the owner's id for a private
    post) and ``expires_in`` in seconds (default 3600, clamped to
    60-86400).

    Responses: 200 with the signed-URL payload, 403 for a private post
    requested by a non-owner, 404 when the post is missing or the lookup
    raises ValueError, 500 for any other exception.
    """
    post = get_audio_post_by_id(post_id)
    if not post:
        return _error("Post not found.", 404)

    is_private = post.get("visibility") == "private"
    owner_id = post.get("user_id")
    requester_id = request.args.get("user_id", type=int)
    requested_ttl = request.args.get("expires_in", default=3600, type=int)
    expires_in = max(60, min(requested_ttl, 86400))

    if is_private and requester_id != owner_id:
        return _error("Not authorized to access this private audio.", 403)

    try:
        signed = get_original_audio_url(post_id=post_id, expires_in=expires_in)
        return jsonify(signed)
    except ValueError as exc:
        return _error(str(exc), 404)
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-15 01:52:26 -07:00
@api.get("/posts/<int:post_id>/archive.zip")
def api_post_archive_zip(post_id: int):
    """
    Stream a zip archive of everything stored for a post.

    The archive holds post.json, metadata.json, rights.json,
    rag_chunks.json and audit_log.json; a transcript.txt built from the
    chunk texts when any exist; and the original media under ``original/``
    when an ``original_audio`` file row has a path.

    A private post requires the owner's ``user_id`` query parameter.
    Responses: 200 with the zip attachment, 403, 404, or 500 when building
    the archive raises.
    """
    post = get_audio_post_by_id(post_id)
    if not post:
        return _error("Post not found.", 404)

    is_private = post.get("visibility") == "private"
    owner_id = post.get("user_id")
    requester_id = request.args.get("user_id", type=int)
    if is_private and requester_id != owner_id:
        return _error("Not authorized to download this private archive.", 403)

    # (archive member name, bundle key, value used when the key is absent)
    json_members = (
        ("post.json", "post", {}),
        ("metadata.json", "metadata", {}),
        ("rights.json", "rights", {}),
        ("rag_chunks.json", "rag_chunks", []),
        ("audit_log.json", "audit_log", []),
    )

    try:
        bundle = get_post_bundle(post_id)
        if not bundle:
            return _error("Post bundle not found.", 404)

        archive_buf = io.BytesIO()
        with zipfile.ZipFile(archive_buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
            # Core JSON artifacts are always written.
            for member_name, key, fallback in json_members:
                zf.writestr(member_name, json.dumps(bundle.get(key, fallback), indent=2, default=str))

            # Transcript is rebuilt from the chunk texts.
            chunk_texts = [
                (chunk.get("text") or "").strip()
                for chunk in (bundle.get("rag_chunks", []) or [])
                if chunk.get("text")
            ]
            transcript_text = " ".join(chunk_texts).strip()
            if transcript_text:
                zf.writestr("transcript.txt", transcript_text)

            # Original media is fetched from storage when a path is recorded.
            original_file = get_archive_file_by_role(post_id, "original_audio")
            if original_file and original_file.get("path"):
                stored_path = original_file["path"]
                original_bytes = download_storage_object_by_stored_path(stored_path)
                source_name = stored_path.rsplit("/", 1)[-1] or f"post_{post_id}_original.bin"
                zf.writestr(f"original/{source_name}", original_bytes)

        archive_buf.seek(0)
        return send_file(
            archive_buf,
            mimetype="application/zip",
            as_attachment=True,
            download_name=f"voicevault_post_{post_id}_archive.zip",
        )
    except Exception as exc:
        return _error(f"Failed to build archive zip: {exc}", 500)
2026-02-14 21:46:48 -07:00
@api.post("/posts/<int:post_id>/files")
def api_add_file(post_id: int):
    """Record an archive file for post *post_id* from the JSON body.

    Responses: 201 with the stored row, 400 when ``add_archive_file``
    raises ValueError, 500 for any other exception.
    """
    body = request.get_json(force=True, silent=False) or {}
    try:
        stored = add_archive_file(post_id, body)
        return jsonify(stored), 201
    except ValueError as exc:
        return _error(str(exc), 400)
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/posts/<int:post_id>/files")
def api_list_files(post_id: int):
    """Return the archive files of post *post_id* under the ``files`` key."""
    try:
        files = list_archive_files(post_id)
        return jsonify({"files": files})
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.put("/posts/<int:post_id>/metadata")
def api_put_metadata(post_id: int):
    """Create or replace the metadata of post *post_id*.

    Expects a JSON object with a ``metadata`` key. Responses: 200 with the
    stored row, 400 when the body is not a JSON object or ``metadata`` is
    missing/null, 500 when the upsert raises.
    """
    payload = request.get_json(force=True, silent=False) or {}
    if not isinstance(payload, dict):
        # A JSON array/scalar has no .get(); answer 400 instead of crashing.
        return _error("Request body must be a JSON object.", 400)
    metadata = payload.get("metadata")
    if metadata is None:
        return _error("'metadata' is required.", 400)

    try:
        return jsonify(upsert_archive_metadata(post_id, metadata))
    except Exception as e:
        return _error(str(e), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/posts/<int:post_id>/metadata")
def api_get_metadata(post_id: int):
    """Return the metadata row of post *post_id*, or a 404 JSON error."""
    metadata = get_archive_metadata(post_id)
    if metadata:
        return jsonify(metadata)
    return _error("Metadata not found.", 404)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.put("/posts/<int:post_id>/rights")
def api_put_rights(post_id: int):
    """Create or replace the rights record of post *post_id* from the JSON body.

    Responses: 200 with the stored row, 500 when the upsert raises.
    """
    body = request.get_json(force=True, silent=False) or {}
    try:
        stored = upsert_archive_rights(post_id, body)
        return jsonify(stored)
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/posts/<int:post_id>/rights")
def api_get_rights(post_id: int):
    """Return the rights row of post *post_id*, or a 404 JSON error."""
    rights = get_archive_rights(post_id)
    if rights:
        return jsonify(rights)
    return _error("Rights not found.", 404)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.post("/posts/<int:post_id>/chunks")
def api_add_chunks(post_id: int):
    """Insert RAG chunks for post *post_id*.

    Expects a JSON object whose ``chunks`` key is a list. Responses: 201
    with the inserted count and rows, 400 when the body is not a JSON
    object or ``chunks`` is not a list, 500 when the insert raises.
    """
    payload = request.get_json(force=True, silent=False) or {}
    if not isinstance(payload, dict):
        # A JSON array/scalar has no .get(); answer 400 instead of crashing.
        return _error("Request body must be a JSON object.", 400)
    chunks = payload.get("chunks")

    if not isinstance(chunks, list):
        return _error("'chunks' must be a list.", 400)

    try:
        rows = add_rag_chunks(post_id, chunks)
        return jsonify({"inserted": len(rows), "chunks": rows}), 201
    except Exception as e:
        return _error(str(e), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/posts/<int:post_id>/chunks")
def api_get_chunks(post_id: int):
    """Return one page of RAG chunks for post *post_id*.

    Query parameters: ``page`` (default 1) and ``limit`` (default 200).
    """
    args = request.args
    page = args.get("page", default=1, type=int)
    limit = args.get("limit", default=200, type=int)

    try:
        chunks = list_rag_chunks(post_id, page=page, limit=limit)
        return jsonify({"chunks": chunks})
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.post("/audit")
def api_create_audit():
    """Append an audit log entry built from the JSON body.

    Responses: 201 with the stored entry, 400 when ``add_audit_log`` raises
    ValueError, 500 for any other exception.
    """
    body = request.get_json(force=True, silent=False) or {}
    try:
        entry = add_audit_log(body)
        return jsonify(entry), 201
    except ValueError as exc:
        return _error(str(exc), 400)
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/audit")
def api_list_audit():
    """Return one page of audit log entries.

    Query parameters: optional ``post_id`` and ``user_id`` filters, ``page``
    (default 1) and ``limit`` (default 100).
    """
    args = request.args
    filters = {
        "post_id": args.get("post_id", type=int),
        "user_id": args.get("user_id", type=int),
        "page": args.get("page", default=1, type=int),
        "limit": args.get("limit", default=100, type=int),
    }

    try:
        logs = list_audit_logs(**filters)
        return jsonify({"logs": logs})
    except Exception as exc:
        return _error(str(exc), 500)
2026-02-14 21:13:12 -07:00
2026-02-14 21:46:48 -07:00
@api.get("/posts/<int:post_id>/audit")
def api_post_audit(post_id: int):
    """Return one page of audit log entries for post *post_id*.

    Query parameters: ``page`` (default 1) and ``limit`` (default 100).
    """
    args = request.args
    page = args.get("page", default=1, type=int)
    limit = args.get("limit", default=100, type=int)

    try:
        logs = list_audit_logs(post_id=post_id, page=page, limit=limit)
        return jsonify({"logs": logs})
    except Exception as exc:
        return _error(str(exc), 500)