From 09b7d59d6caadf4dd56abd15f150a2477c96cf5d Mon Sep 17 00:00:00 2001 From: Gaumit Kauts <123269559+Gaumit-Kauts@users.noreply.github.com> Date: Sat, 14 Feb 2026 21:46:48 -0700 Subject: [PATCH] updates on archives --- .../__pycache__/api_routes.cpython-311.pyc | Bin 0 -> 14572 bytes .../__pycache__/db_queries.cpython-311.pyc | Bin 0 -> 17979 bytes backend/api_routes.py | 463 ++++++------ backend/db_queries.py | 670 +++++++----------- 4 files changed, 464 insertions(+), 669 deletions(-) create mode 100644 backend/__pycache__/api_routes.cpython-311.pyc create mode 100644 backend/__pycache__/db_queries.cpython-311.pyc diff --git a/backend/__pycache__/api_routes.cpython-311.pyc b/backend/__pycache__/api_routes.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..7fd670cbff86a594e60c323b45d912811cb7a768 GIT binary patch literal 14572 zcmeG?TWlLwcJtyxq{PwFlx$k2-jZ!emX+9!qsW!yN0RN@sqI(^<1RsSD2b6MDR)M( ztw@Ft1g#SoSTB%8)L;u}fyOIuQotWw)J1m-OWp$6znNfM8&l}3U7-GB;r&?Tqv$zz zI3#CCDppphQ6LxOnLGD!?z#7#*FATBQBmPzAhnvmzMe0&JB0E@yV-lPsB!ZIb3->LaVLpeynIiR4 zx8zMUNM(tgV}j)Sz!lk*s9y|l7^m<`T0c zIjtC;x8<4X56UlcI(Xr%I!lbKF(qbH=a-V?_=F;7 zT4vEiFI|l$B}2rPh&Fm@Zdw9*0$6~kGYzVzl2DmXvTRhEWm#OZA!bYHG^~RBiOkIc zX|{<3=VoD?9P%vrb&iV64EWe=a$e{K*u@&YIWiTUC5f@*mB%;1ycnKTLY8CW3BxTb zF=bAUb9t&jfIQl-C!GL3VU(;rV!&oDFiF&(cAJ{D?J_8+)-W;$`4!0ui=o{%S%nc0g!yc#`zX5`$(k=WFD zj9iVr9E(rN7jtZM@e&jyrlpH8uc*m!J+oI0ca)Ay!Y~X&=HSKX?=M-_UWKrJ)x>77W#rj&^|zF8?d_f{xz&Ryc1bBB5Jr|A?8Zf26;*tZ$d z1P`(YfZ;ZWAZ_r{41c)}z!LL&AnfI3Nv&zo{H?mbRTWxkr+IfTa+7Q$$qilTb{v9@ zu#@z5c5BeV`GUU%>->3kD`(jxv&fV&iscXiBEnFidR+1S;onV4j-i441&EFdDQ=4m1F93Vs!EdMhA;Dtb)9%x)S zf2U0i9MJ+t^uQ4{FnD)(=~S9?xmq^F9Y1|<<**jqrw8|G;(lG+zZzG?KGoaz#|?j7 znqgfnIj|wt{WQMZ^pmlV$Cl%o*sP1qs@M7ZBcu&~ZLSy1J$2VK4?_ZqC*45*r@N0Z zcYVY4{2CV;YUS4&SyojnPJUyd?$7Vpk_dNa>xq>Gm zV5Pfm>9s>pUjzSG?wTu8ozG1mT^Ed1M=B4A0{35P-53ISm zqhqy36T5Y>TlIFM#(M6B_A<=% zAP6JqMQ{K?AApi#jXOjJpv54z`Za*35$gxR&efn6?9+pNns`tb58fG3#lx!i@H3Wc z@;pqyW`R%zQVX;L*d1gMUagd~Kr30hW7XjkB~d{OmPufv08{RGytp-Ng5=s18@nai z+;)q%$Ze^&$3}_Qu5T83u)D}jnY6$xunXJ*|3%p?&}^ThcB5%g$hW2o!h(&%6;wT$ zBypg%V2yM3UGh2J{XgUFGotpp(!!9j-!7#lSE8Upl1U3LdvB@Sjz43+UDsUHe#7%J zYgyQ?iqnQL3m)ZTRN`n3rNmflZb~r(aw@4@C8$cl zVH}1>mq1W{$OwWH2(a?81OUzoE~*CeTI|PiziG0C7#Wuh9(1q4gJ~gE*c;c_;8V$I z0I@5k7>BU}rMkR+7K24B|)Usr<{;HQbB zx;UzON4LZN%jn^**A1~FLFRtYcihY0Zwejf`TH-ikpC_30tgnLu9j)>F$?D?u=ttW zv$XiP`O{yEuK`7Q7M~n}DS^dDr#5HtZ*$}|C@^?r4%Bi51|O$GEs17iVIO><&#+fot#a*CVNDgBC>jrcW*J11Y^Jf4FdC zLG|w17BOrF>OO56@-x2<09f<;h6ebxy3o*J@W5HjA9ewp-&IT!d(A+Gjp=YvsI|WY z89UQ`7oImc3&ViS;XyFBbr3zEGUql4w22)(QBe(IR+6`%>m>Xw$*TZ0t5vk=6>Uo= zANi|FOWvUmkKH(?3Oh|#duP!V!oZVkTcE5|9syftU!bi^WxlKwN~m4hN*>9%d?&&t zaSnNPN_WPjbJyUg4@Yx&7CDkXJ@(@AY{PCj3?&?JT-#y}_-#Hvi)KZa9=$Lc zh`U981XXj$5B~ySbHO!F*bjmotDZaHjK81jLYevJhjjSwt9yImeIi)oX=uHD^@Fh(g)Wt#7JGd>18H14T?+!C7zM(4q zm!Y9@e$CB7cCFk6aDI1Dbz{YKi*IA=x6(KZY>C5}VAn6`4=maJ0hiN8pF<|f^9b@a z59lw)Hrw%2Z=oDkC6kJH3wZ~g1*f;*v3n*N7Ao#7cr9$RnzrAPpGx-hqJ8^Sb!F>V4kg^T6iM@9VjgLEIzs}uWl8jHoioz0Ig;cuC@ zGqwfGx)c9K2hP{f->o}QG`j!v#$G5Yjzx`qm&+2zLYE!+h3r6Z06`c5y6ogH0mwEh zPqjMAlevmy0$ThWTU`UNnKY$2Q{O)b?q40fGp+>(_28f;9@8xoUyDBCw~3=$!EhO` zxjqdYX8zGP)XAf`wD4=YS$ma+46pU6`Z5=jmftNf#bNcY!%*9-_)hl!EwuKUHfjx(vm&)uj z%2-n2nm|th@XR}R`z+Bl+~)4g*LRqij&N!Y!Y7v!;aC_0W2PbP;tX$c8t9&6M;!9c?YWh0o(l!z-GGQQAO>K-}>mS<$A56S+8hbI{ABl-OaJTl{9~g?x!l^ zFlFxpcUm;@kS-oly@xVUx!ud-EAMJzk1qDmXxwKbtHRG;`MX!{)~dll_?a5#UX3aq zSG~u#&p6i}7Q_n=VERqdXgiezp1-fcG?UlAA_OD zpCK3l@c2v2A)i7xW6&x!MWM`$T44m4P*3v*n4`w926J{}csmon$|0dVkIfw{kD5Be z@^}Qk4}bYH0KoG8>ZgoQ$A81q2=#zoKCpDcaxs^^pY`79xF>vhLTx^)HJ{a+&#FQ2 zfX?gUdDVM6xW}okR9JKZ}oCi^4e%~ zVsl)NN@a!W9WG0K_RG?zQt(1a-^%F)`M2N&m7g{}kCP}5crRH@-;+~N0e;NPxD1z> zui2ZTL7=Mt_9m(yT{?&A@5=Z*xj`wWl0HxF<3h3k*!~^<@&*9v^OU$%EPdYyj{Bcx zG~AzwcRgPQ)zP=q_=FlxYT=|FPO8DTHE~iGCsps{b{GyB4aa(@X?T!XAM}m%^7p-= zk-hx=Ru=PnT>$5chxbiml2w_2=6fIMsgEby;3H9)A@i{Aa0f(1CgP0^{iM5Ne!}T?rZ_`lTYF z@h)_F%}Z>0tA-vgsYF0OjG#(}I+?XtMgyA|fHAz$=yWWVh(<{eD`*skx}{-!rCw?q zwiD=mH-^XPS8nvvCvpZ~FJQHc-l5X#Q+hQ=?_)>|t9Xds&>$!o!{!qRFrYw>X9OKq z>NQZ!MR%06Vp$Uc{M_Jc??7rsnwv@tlD~pi6kGWr@CAfnST@aZtdM39cp0`wWx7lK z(n2-MrJ4V)pp)@bE+uceZeCeEsk-|$cfaoLU*gklmhFW=7TbGsgg#bk=)(!T4Cngm z;QPVr4H{dcvo&dsW!tb(+kf*6SDP&<$9DDa*I)0n+Id(w00YRi!;;zzKDBI*#)fq^ zoaO|!VHujjBL}cklU19p-Xikr^%`5Pv(;(NheQBCd_aDtWM?rV>P;f*kq8sy6H%WI zV5f%n8&rRj#y0D0bDHzAEz7Wa@W=t|)MC}9cVtIu*Vqo7?ZD~oT0Vz^?8*V`)M?db zr>pw1onYyZJZnv}3u|_=FKnB}w(D#=jzr0W2aaR{90}+!Yt4~hchxf3wl1CRN^@n< zDV}EFkptLiuT`6_&(0~^304d{Sqn#VxlUM2w~&sMos-KX4<2d80=b32lFIBz*-kjp zNRg4O)ys?ooT&ObG!}|G)12QN2_9+20yq-P*sM*53P-{vhmkf}a=H*(;Q1U|TC0mh Xm~4q9xrNPcVJRZaEws@_72p2?qv3z4 literal 0 HcmV?d00001 diff --git a/backend/__pycache__/db_queries.cpython-311.pyc b/backend/__pycache__/db_queries.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..12f27028675d602ca5b65098828c4560ed3b764d GIT binary patch literal 17979 zcmeHuTWlNIm0%TL;=81xMC)zZ@ z=&wEJ7Adldlw0YU4f3&9;;Fi~PTl9Z=XvYzJsu|oY1`Z1kH%Ul>Yp)_Pu|?km)ooq z^@O@eanx0cqd8NIzDmQ@95aoYubM|KS1qH~tJYE5RU3`ZEwPGG`&B!+x5gZ!&a2K* z*HsrynJC`=Q>gT3`4%WrM&EMh3pm?PX?PAlEsfJ$Maq-U6<_uBT=hcT#r4>~VLc4* z6-+!Mx}d&F&Y`d6do+}~crVZV)C9eRpO&sVcqd=QIgdh?bMa=*&0Bd3&+_J*rn)>5 zRk*up;XEH%uKIGgwy(kU0&X?O@HJc|{M@{suih=SEJ-1<9b=LxJ1J}Mfx(43HvxS+09@_YtqWss*tdnc}$a=Mr zwBA%Ya+rsCGhxEl%XCY~+3IyP=I-~@8@y}rK| zt_5&6{gM6M0`F{IXZ>3Nw}s$tdp%XdSYAJN8@C5j^Agt)tKZ5>t zKr7q0J?7Um&X(2wGHgPyC*0>5E1Z zs^x4{OsMv^#}m=9csLfMRaa~*%!RnI1RuX0G^w`USd@<^R8K_U!wEhV(X#Hri|_QD z>=`^2x_JJqN~v|L`N31?FP-i^6*~X+*;ApnPQ5$Xe=aJD(fCkc6$M@ij0uIj@s$8H z9TmpnqtI~Rc36mpZ^U@9UA2q3k*jur7stoqBCpy6J~1K0BdaVO#t7nsGQ;mnkT@V% z5=GCZP=HWBg{MD543fs_Aqu9}G@x2=Mqv&rhWJD{kr0BkfI>>d3IbnU>OFkzqR0#4 zwKqrhcJ8_McAW1MqPO{LeY`l57#qKK?p~dI9APj2Qb7G?9S|Rd6q2c_1M8 z;G)_>H=}}>5H>)j9v=n(U}r1mY9AbbbbO|7*|UmMuaJAgPw| z@DQ(BW6{xQBC;ym;E4@>WB`8&H~Il^^c1C+h0|1Gb>S%8(gH56$K_1=JPaR}e-bp3 zDS8r!WZE=kf;(tcE1fn&9%L#^hk4)&T=XwE0I&jh)I8TSH}QSX zC!VAzyIT}@%gm`|Pt{z{gDc4z+1(5|Co{M8!LH;c+1W7Dy9_hkGJ9CE)M&F>F&@5$ zJ61;$70?u!(M1@<6C`=lkO>#Rk9uzhjG9UTLTQ>Z>B&X(70)#Xv-3lWv;4@c$NC=q zXeZ8VfutZ;q?fGK8fVqlvnHNFXIm4mLSG+ee`cLFCvahMDOE%{`7&60gBF%5dZU0f zWf*k|nM_I2YmL*EM3HVeWzqL<+B#(wnFLd)gmdaKzDG}4ffii@suh@cG^TpS!=iX+ zOyEMpVR85?diz)2?SZHm5cm%!q5{vgt8OkTj)O=D#lxe#>I~loh9-n2gqUi+5giL! zg)PvWuoVF^M0~gfT(D zv8mPsGH2Bm1!fF;p@P55M<#&mkwK{L^AqtzbX3Ft%qDEcLg3w6Z&mBfC?DeplOgOz zt8=+HvqD+bA{dPt^d>{$=)n!XA*^2Ug7Rc*Rb!b;%7%)u!dTKx(5> z*Rfo?@ww}%D>We3b}F@<%l@V(BacT?ZL)v2;@@p-!ew*YRww{7eSfH=Dr-Of@ZpEa zA(`2xFx#@st_-tlo|BnQh3U*PM>5P2nK`B~$7cFpdaLKI$lhkf+dT8;vWtbjTDx9) z>!pTH*}G5i?vw2Mpm=5=x9M>nLQ(yuUuC1eg=x{V(L@ouO%>e(VlHvCp7hq(Dh38o zeyaeZFfi@%n}suMh!l>XB?`7EcWHzL* zA=!0Zab1@z*U5tFm^W@K!n|>%o=`);T7OLEn0MRynKx&;O$qB|;JbijTXhWFR-8*g zmr4_7V7O~CW;?K07)!yRT@%yExz>s4)>FcoHF`L&u||$r$Ji>@S%1|!@mM_`=X>T_ zmvE}qzgK?8{YivVjlL&2;p86(n#nc-Nf1qdB=CxfF#$Ba(73?gjNS#cT5tjF2-^^B zM?m)J3Aj^T94|(MC^0@%=k2H%y%CK?6Zh1LaZs#5dxDJ%iui=6+9$?Q%i%fIBk&QB zfIQeJh+vO}<3kguS(^B`(1W!yqv3e;CNCyJ;&6Cx*M8M=V}-Rnhg3c;HQ`VnBg!Wjf_AwX45b%iIm=vZicOiYNAjU{4}JPqK}cA*Vl z>I6_C*s5}Ztx%UR0C;_fI{?ZXh&QH7BLAiG&BcCvT;@oinMs?|i@hUO>tAKy`{e2lrMhFeev`C$uTtN++_3rik*7zb z?FZ$CLrTLT_ig=xz&of)Px-6u1L6y{Kt>B%rXGSjOt zz2&9n5h6Vg$=<_?_poF?Tr53tQH;{l0y9py?(g6Rq$j|9O2ImmrV@!+UalG?W_cB~ zRAOR(N|I4xPSXk8Te&nve^Uv14r=&n?~XR2>xr#7?V>F}w9d9bu};|*DCm?y;ca-4 zh}>XBLD~xFrxcDMKt`*{Vj^%&?#PafieuyF4)4R7*$*DnKB`@D zP*&$3+?1Q0+xleJ<6Yn1{mE|GwOMg(mMoiz{3S#90u5iX`y<7Zze*UBKn8lXY6|Q~ zy>Ph(L@;fcw*J3TxTbAWCc!^tD=!AnLeuvL@XfEmw}9Z#31+~zt)Z-0Iopd0J$89= zo3cz<4R~memL$DmH(ilHXG1Pc8B_~U^y~uylBw`51cXPU98sM)lL1)yh0kd4;DAK= z%t5T~aulgrQFaKbRRFQU6E#qCc-RYeeW7*%4R;}c;5-8246?%=F-=yjlS+^##ndYb z*8nF9KXD!a=#5qN$$hCBxoW#owf(ziW=_p^zI0cAJn(QJc}#W(6?ZV}-ji|fkve$U zeN%DYoH_NXq6&?G&gIIQC-%qoWMgVluH36s?ghK1!i$e~no<3 zm^?@4x!L=9L*r0}Ikd2K(I+#f6y{WxIhSG1$;{gd^ER-~yyHf))aPxBaTKhg&PB7j z2`<1U_$$j3Abc=gi})gF{E#aIRtBkI&|CkGerTDs+ywW-N8oiT&qwuS&`0U>=%V?$ zmeyh78;$qk4Gt@$C8PUejg4R_v2JYn8j7!mahQt@eWSic?3iwhlnEyg5bc8~9)!?| zT7o)``lbOS(UCz52YK^jkDjHqw$(<)EF_@;wV*u zb^*D+YKek?d_N78e5=N*>I-xl&7cj;{_aIrMz=>F5u}o{d3~ z?Cw_F-Qd1+Rg&GRujvmq%28D!%;u~QnjSTkNV3X@8y?g>szW*E(d1aG>}pV44U(ln zvmc9f4`jAPlbMGdxoSVcJ|fOPGwee^C{0G1G&ujjF$&HZr1UnN#1&DjfWagJv;aI;wo<5|OVVQ{UL7obKLU3m^**x6Q{lh5GPM1g8>^11Y! z66`1&?0+}dQL0540}Bcq<%waie2DQ>CNu2{)1GCzGE7$*oOVYP=17+5%`m;1 z94&8l942PRLD_pq@g9=whYDs#37;l=bOk2j&)3@EtS@W^RSsEcNiysRTDKz%o<*Sb ziUDC9>JSbiAe4pr{Tsq%fNK~MR}qzP`&@X6yhijfL~zn7joMp8pJf;uQb| zBjP)FAu?rcomvUEWL9bQa$vz5k zLyRHnlqVasOBS1hu7c_W$`eOFGfkU{6{vMs4C^!{_3WiQqx;jjXd$L%7A}2*(Ur9s zIO`N0H4xOgM~LfCk_@D4i68$4eMOU?B$cR>>uSH4CXC`J7coKwFPDr$H0PA77=QpS z*&*o1P%HCHI8UD1Gjx>;h%mXiAOR-EF^;n;1oCgR2b~6=y3xufn{x=O+=E~*fV?2W zHi)!P?fhLZmEbUd7Vj}*;2Wm}g+_i4Z_v81Dmn695bgou0{p}ZP;5YYY-pbEmt0-& zKhyUcrsmV(C-KK|Y3l*me^Bus%=(XK{Kpr;hjm8rpUL{qW&G!sqSCw9Wq(-lhqL~n zjDJWPj>`U9ivL#DFJ%0J>`y5Egv?AR%mldGDwq|@Y&}Q6^ff%`e%vi>-Y5II6kk`? zcQoTWx^Qc$RXR2(`z|QH3t8XgjPJ5^B`ig5$-WWAH5_~pD!tHB;=Q>V=CQ`mi3_DF_3vd}BDClvNX zmhH{3y^E)p?#S$Ag}tnWcck}2GJ9QNuV>le3_C2dw-ol4>>5#ABa&sLR9i>ZL6++} zKutKLpr|xq5MM8g*VLE|*6x~khWOi>cm|zzO*})Oo~r=e+*EvoU>$n8;b7$&q-6}* zmnSU8`Yd&0*gaKN^t_G$iI5xr2BRx0I5^iw)RZaZD5U|);2kk~OZ3~Ky!FDB&dNH* zJB@#Z!~>Y+=cB$Wupb+uniM{5X1JadpsoFP7#6O5h#)dxo>LgF|d9)S=#*ijFC#;|a0C;PvPig-@;F94~RXV(I=)cF)@wsm$=`>u^ zal*(Z18eFHxmJR1VHZ@mZq?x?BA4h7o3XSC!5svs^$T|al&B6hWydP~1>pV<#2*H+ zHAB7f)h}<{_Wb_S`_j%6i(BQ5{mRDvWncZ1Bae?H@5#QP;tLuhRfZ6m-mo-Ukp8?X z(k1)%EB^gi|FMk!*aD~;{ffUo>whcbe{1QqboqVRA5#3Gte?;LdFkem>>pPA!&(1$ z#y>9m1qBomMpPIvr;6CEZ_&Te+{RMFPTAe2xZ6JW_#R%;mMX>l{jDG0`Y*Ac#2_zM zpHcvN-cUSmeC}Z$o}Y6h56GSt#nbY+r~2XL2k$<57yL<%s#!Z2GS!WmezRBhbt=Bj ztnYBfcX$D`ofC@hMAp}z@%78TGm7s_dDY}N(M*oX-fqR)E!n&Ozh(l&Mv|tP{0F!% zrfnTXs(jl{Z6?SiyJ(OV&EwspNx59R3*`5sBvlv{u zoIiB(?F{>E=^C6nd0%1Q&$2f&>`j>+RzNp#MHN?6vP8*dR&9~tiTH@#@9%FV2x0OM<@( zXZS$G-HdX4Ji@CEe)I;!=Lm z5(JxJ#tO!$=3o36o))oOBx=pSD7*`|d0MYUtU3gRKritrVsAl>f~2{!8RAzgqR+np z1RB~E@$`7v0B`=V!jx>^qS&{5ouBc3ss{ax+Z6}6B+>CWF&B{?^@^ju;CA$&+p)65 z?WofyvFC)2l7x=dJQrs$^dUx3M2$uRAGBvG1r?Vj&d7V$!ZY$-!-4TsY1L3-JR%O9 z*aBx>aIfT2s;JlbGI(!&oYyc1yHJJU+XW8^+vF=1eh3(qNa_CoHzY7h4xQbVT>-@vz|iL2m)=^bZjbEk zP`n+Iy<^1&C(Lxpv_ZGd1zrp)_lm=caTov)hwxj(kP?#NArX$02q_@b-0fjk` zWsYZ<<1*8uFg?Is%XrI5<1l$=na~L1SoaAZ2z%-o)Suvl5B1&Go$xUnt%d+2{Th6W z@r2T!_X(e;ur@_(n~*xjhAxoUkjA_1Mcbej@eK_8btiQ+KEAqRMBztJ!DgiHzk{2y zd^~?p=ZNgyuDG{n-R&87`#gNB)2X;S@tlrVs;Dn>6bDY}lz;GVKck~#_~>FG3?Eer zy}#FZC4?j@Kba#yEoz`id4{&XGPAMl4nIOEBqpeGrchDKef{xS40fU3#%Rj zm2FLgqXRekTNoO|G79Vb36z4{gK)yt(x453=5VMJzbJu-HVJCLmw}q!g{T5ZSRnNW z;X(p}0y=vIw5+n^M*GvbuGCgLX*vCA%dBHrg(Q-SyJHH8qs(S-2qi zPs;w2ivQ%xnl1B}7djSurJ8=ZreCS)FZdWhCAHd!$p#VuCrP*fU;sltBu8e*GBBX! zklO`=YDEY>>5POzs%cDAZE$P=V=zcI0t!>;vR#h%z#D?*Ae%RYL3EZq}Bu+`PUrBJ-W8;h!`sjeAI$5!g-i z*118PsjbN?DQ>ko$8;zkZwtf>DCwTD_f3Y@56K% z=&prk5~<#KU#kDvhbt6h(_7NwLT~!+FWO1g2=-UdEvXjRaCFOjYkJ>;d0}wD{)=v9 z$4OFV1l3Nu0qjw_A=ybT^V(&xcB$o8O{7G+J}gCJ%Joq#cW-)yqP|1-n&8rBdJ`|F zO=q>-S<~;Cf7z|wItLM3N=YlzX@I_n+ z9wTLoTNV>bz0$WXEcO5DL*?WpQfLG_@=Kb8$$|TM=mT69w96v5v`>0xP`Y?Udgsco zyOlSt7Oi}N8k!lJfz!q2)L_c~tb2umY@s_OYXnc>bmiOH zb$CR(-ky~1e+Ou@$JB?k`G)B") -def get_user(user_id: int): - """Get user profile with stats.""" + +@api.get("/health") +def health(): + return jsonify({"status": "ok"}) + + +# ==================== Users ==================== + +@api.post("/users") +def api_create_user(): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(create_user(payload)), 201 + except ValueError as e: + return _error(str(e), 400) + except Exception as e: + return _error(str(e), 500) + + +@api.get("/users/") +def api_get_user(user_id: int): user = get_user_by_id(user_id) if not user: - return jsonify({"error": "User not found"}), 404 - - stats = get_user_stats(user_id) - - return jsonify({ - "user": { - "id": user["user_id"], - "username": user["username"], - "email": user["email"], - "display_name": user["display_name"], - "bio": user.get("bio"), - "profile_image_url": user.get("profile_image_url"), - "location": None, # Add to schema if needed - "created_at": user["created_at"], - "stats": stats - } - }) + return _error("User not found.", 404) + return jsonify(user) -@app.get("/users/me") -def get_current_user(): - """Get current user profile (requires auth).""" - # In production, get user_id from JWT token - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 +# ==================== Audio Posts ==================== - return get_user(user_id) +@api.post("/posts") +def api_create_post(): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(create_audio_post(payload)), 201 + except ValueError as e: + return _error(str(e), 400) + except Exception as e: + return _error(str(e), 500) -# ==================== POST/FEED ROUTES ==================== - -@app.get("/posts") -def get_feed(): - """Get personalized feed.""" - user_id = request.args.get("user_id", type=int) - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) - sort = request.args.get("sort", default="recent") - - if not user_id: - return jsonify({"error": "user_id required"}), 400 - - if limit > 50: - limit = 50 - - posts = get_posts_feed(user_id, page=page, limit=limit, sort=sort) - - # For production, you'd get total count from a separate query - total_count = len(posts) * 10 # Placeholder - pagination = get_pagination_info(total_count, page, limit) - - return jsonify({ - "posts": posts, - "pagination": pagination - }) - - -@app.get("/posts/") -def get_single_post(post_id: int): - """Get a single post by ID.""" - user_id = request.args.get("user_id", type=int) # For privacy check - - post = get_post_by_id(post_id, requesting_user_id=user_id) - if not post: - return jsonify({"error": "Post not found or private"}), 404 - - # Add user interaction info if user_id provided - if user_id: - interactions = check_user_post_interactions(user_id, post_id) - post.update(interactions) - - return jsonify({"post": post}) - - -@app.get("/posts/user/") -def get_posts_by_user(user_id: int): - """Get posts by a specific user.""" - filter_type = request.args.get("filter", default="all") - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) - - if limit > 50: - limit = 50 - - posts = get_user_posts(user_id, filter_type=filter_type, page=page, limit=limit) - - return jsonify({ - "posts": posts, - "pagination": get_pagination_info(len(posts) * 5, page, limit) # Placeholder - }) - - -@app.get("/posts/user/me") -def get_my_posts(): - """Get current user's posts.""" - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 - - return get_posts_by_user(user_id) - - -# ==================== CATEGORY ROUTES ==================== - -@app.get("/categories") -def get_categories(): - """Get all categories.""" - categories = get_all_categories() - return jsonify({"categories": categories}) - - -@app.get("/categories//posts") -def get_category_posts(category_id: int): - """Get posts in a category.""" - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) - - posts = get_posts_by_category(category_id, page=page, limit=limit) - - return jsonify({ - "posts": posts, - "pagination": get_pagination_info(len(posts) * 5, page, limit) - }) - - -# ==================== COMMENT ROUTES ==================== - -@app.get("/posts//comments") -def get_comments(post_id: int): - """Get comments for a post.""" - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) - - comments = get_post_comments(post_id, page=page, limit=limit) - - return jsonify({ - "comments": comments, - "pagination": get_pagination_info(len(comments) * 3, page, limit) - }) - - -# ==================== HISTORY ROUTES ==================== - -@app.get("/history/listening") -def get_user_listening_history(): - """Get user's listening history.""" - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 - - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=50, type=int) - completed_only = request.args.get("completed", default="false").lower() == "true" - - history = get_listening_history(user_id, page=page, limit=limit, completed_only=completed_only) - - return jsonify({ - "history": history, - "pagination": get_pagination_info(len(history) * 3, page, limit) - }) - - -@app.get("/history/searches") -def get_user_search_history(): - """Get user's search history.""" - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 - - page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=50, type=int) - - searches = get_search_history(user_id, page=page, limit=limit) - - return jsonify({"searches": searches}) - - -# ==================== SEARCH ROUTES ==================== - -@app.get("/search") -def search(): - """Search posts.""" - query = request.args.get("q") - if not query: - return jsonify({"error": "Search query 'q' is required"}), 400 - - category_id = request.args.get("categoryId", type=int) +@api.get("/posts") +def api_list_posts(): page = request.args.get("page", default=1, type=int) limit = request.args.get("limit", default=20, type=int) + visibility = request.args.get("visibility") user_id = request.args.get("user_id", type=int) - results = search_posts( - query=query, - category_id=category_id, - page=page, - limit=limit, - requesting_user_id=user_id - ) - - return jsonify({ - "results": results, - "pagination": get_pagination_info(len(results) * 5, page, limit) - }) + try: + rows = list_audio_posts(page=page, limit=limit, visibility=visibility, user_id=user_id) + return jsonify({"posts": rows, "page": page, "limit": min(max(1, limit), 100)}) + except Exception as e: + return _error(str(e), 500) -# ==================== TRENDING ROUTES ==================== - -@app.get("/trending/topics") -def get_trending(): - """Get trending topics.""" - limit = request.args.get("limit", default=5, type=int) - - topics = get_trending_topics(limit=limit) - - return jsonify({"topics": topics}) +@api.get("/posts/") +def api_get_post(post_id: int): + row = get_audio_post_by_id(post_id) + if not row: + return _error("Post not found.", 404) + return jsonify(row) -# ==================== BOOKMARK ROUTES ==================== +@api.patch("/posts/") +def api_patch_post(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + try: + row = update_audio_post(post_id, payload) + if not row: + return _error("Post not found.", 404) + return jsonify(row) + except Exception as e: + return _error(str(e), 500) -@app.get("/bookmarks") -def get_bookmarks(): - """Get user's bookmarked posts.""" - user_id = request.args.get("user_id", type=int) - if not user_id: - return jsonify({"error": "user_id required"}), 400 +@api.get("/posts//bundle") +def api_post_bundle(post_id: int): + bundle = get_post_bundle(post_id) + if not bundle: + return _error("Post not found.", 404) + return jsonify(bundle) + + +# ==================== Archive Files ==================== + +@api.post("/posts//files") +def api_add_file(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(add_archive_file(post_id, payload)), 201 + except ValueError as e: + return _error(str(e), 400) + except Exception as e: + return _error(str(e), 500) + + +@api.get("/posts//files") +def api_list_files(post_id: int): + try: + return jsonify({"files": list_archive_files(post_id)}) + except Exception as e: + return _error(str(e), 500) + + +# ==================== Metadata ==================== + +@api.put("/posts//metadata") +def api_put_metadata(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + metadata = payload.get("metadata") + if metadata is None: + return _error("'metadata' is required.", 400) + + try: + return jsonify(upsert_archive_metadata(post_id, metadata)) + except Exception as e: + return _error(str(e), 500) + + +@api.get("/posts//metadata") +def api_get_metadata(post_id: int): + row = get_archive_metadata(post_id) + if not row: + return _error("Metadata not found.", 404) + return jsonify(row) + + +# ==================== Rights ==================== + +@api.put("/posts//rights") +def api_put_rights(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(upsert_archive_rights(post_id, payload)) + except Exception as e: + return _error(str(e), 500) + + +@api.get("/posts//rights") +def api_get_rights(post_id: int): + row = get_archive_rights(post_id) + if not row: + return _error("Rights not found.", 404) + return jsonify(row) + + +# ==================== RAG Chunks ==================== + +@api.post("/posts//chunks") +def api_add_chunks(post_id: int): + payload = request.get_json(force=True, silent=False) or {} + chunks = payload.get("chunks") + + if not isinstance(chunks, list): + return _error("'chunks' must be a list.", 400) + + try: + rows = add_rag_chunks(post_id, chunks) + return jsonify({"inserted": len(rows), "chunks": rows}), 201 + except Exception as e: + return _error(str(e), 500) + + +@api.get("/posts//chunks") +def api_get_chunks(post_id: int): page = request.args.get("page", default=1, type=int) - limit = request.args.get("limit", default=20, type=int) + limit = request.args.get("limit", default=200, type=int) - bookmarks = get_user_bookmarks(user_id, page=page, limit=limit) - - return jsonify({ - "bookmarks": bookmarks, - "pagination": get_pagination_info(len(bookmarks) * 3, page, limit) - }) + try: + return jsonify({"chunks": list_rag_chunks(post_id, page=page, limit=limit)}) + except Exception as e: + return _error(str(e), 500) -# ==================== ENGAGEMENT STATS ROUTES ==================== +# ==================== Audit Log ==================== -@app.get("/posts//engagement") -def get_post_engagement_stats(post_id: int): - """Get engagement statistics for a post.""" - engagement = get_post_engagement(post_id) - return jsonify(engagement) +@api.post("/audit") +def api_create_audit(): + payload = request.get_json(force=True, silent=False) or {} + try: + return jsonify(add_audit_log(payload)), 201 + except ValueError as e: + return _error(str(e), 400) + except Exception as e: + return _error(str(e), 500) -# Example of how to use in your existing main.py: -""" -# In your main.py, import these routes: +@api.get("/audit") +def api_list_audit(): + post_id = request.args.get("post_id", type=int) + user_id = request.args.get("user_id", type=int) + page = request.args.get("page", default=1, type=int) + limit = request.args.get("limit", default=100, type=int) -from flask import Flask -# ... your other imports ... -from api_routes import * # Import all routes + try: + return jsonify({"logs": list_audit_logs(post_id=post_id, user_id=user_id, page=page, limit=limit)}) + except Exception as e: + return _error(str(e), 500) -# Or import specific routes: -# from api_routes import get_user, get_feed, get_categories, etc. -# Then your existing routes will work alongside these new ones -""" +@api.get("/posts//audit") +def api_post_audit(post_id: int): + page = request.args.get("page", default=1, type=int) + limit = request.args.get("limit", default=100, type=int) + + try: + return jsonify({"logs": list_audit_logs(post_id=post_id, page=page, limit=limit)}) + except Exception as e: + return _error(str(e), 500) diff --git a/backend/db_queries.py b/backend/db_queries.py index 444c7be..f1138c5 100644 --- a/backend/db_queries.py +++ b/backend/db_queries.py @@ -1,463 +1,317 @@ """ -Database query functions for VoiceVault backend. -Handles all read operations from Supabase. +Supabase data layer aligned with TitanForge/schema.sql. """ import os from typing import Any, Dict, List, Optional +from dotenv import load_dotenv from supabase import Client, create_client -# Initialize Supabase client -SUPABASE_URL = os.getenv("SUPABASE_URL") -SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") +load_dotenv() + +SUPABASE_URL = (os.getenv("SUPABASE_URL") or "").strip() +SUPABASE_SERVICE_ROLE_KEY = (os.getenv("SUPABASE_SERVICE_ROLE_KEY") or "").strip() if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY: - raise RuntimeError( - "Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY environment variables." - ) + raise RuntimeError("Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY environment variables.") supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) -# ==================== USER QUERIES ==================== +def _rows(response: Any) -> List[Dict[str, Any]]: + return getattr(response, "data", None) or [] + + +def _first(response: Any) -> Optional[Dict[str, Any]]: + data = _rows(response) + return data[0] if data else None + + +def _paginate(page: int, limit: int) -> tuple[int, int]: + page = max(1, page) + limit = min(max(1, limit), 100) + start = (page - 1) * limit + end = start + limit - 1 + return start, end + + +# ==================== Users ==================== + +def create_user(payload: Dict[str, Any]) -> Dict[str, Any]: + required = ["email", "password_hash"] + for field in required: + if not payload.get(field): + raise ValueError(f"'{field}' is required.") + + data = { + "email": payload["email"], + "password_hash": payload["password_hash"], + "display_name": payload.get("display_name"), + "avatar_url": payload.get("avatar_url"), + "bio": payload.get("bio"), + } + + response = supabase.table("users").insert(data).execute() + created = _first(response) + if not created: + raise RuntimeError("Failed to create user.") + return created + def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]: - """Get user information by user ID.""" - response = supabase.table("users").select("*").eq("user_id", user_id).execute() - data = getattr(response, "data", None) or [] - return data[0] if data else None + return _first(supabase.table("users").select("*").eq("user_id", user_id).limit(1).execute()) -def get_user_by_username(username: str) -> Optional[Dict[str, Any]]: - """Get user information by username.""" - response = supabase.table("users").select("*").eq("username", username).execute() - data = getattr(response, "data", None) or [] - return data[0] if data else None +# ==================== Audio Posts ==================== +def create_audio_post(payload: Dict[str, Any]) -> Dict[str, Any]: + required = ["user_id", "title", "storage_prefix"] + for field in required: + if payload.get(field) in (None, ""): + raise ValueError(f"'{field}' is required.") -def get_user_by_email(email: str) -> Optional[Dict[str, Any]]: - """Get user information by email.""" - response = supabase.table("users").select("*").eq("email", email).execute() - data = getattr(response, "data", None) or [] - return data[0] if data else None - - -def get_user_stats(user_id: int) -> Dict[str, int]: - """Get user statistics (posts, followers, following).""" - # Get post count - posts_response = supabase.table("posts").select("post_id", count="exact").eq("user_id", user_id).execute() - post_count = getattr(posts_response, "count", 0) or 0 - - # Get followers count - followers_response = supabase.table("user_follows").select("follower_id", count="exact").eq("following_id", user_id).execute() - followers_count = getattr(followers_response, "count", 0) or 0 - - # Get following count - following_response = supabase.table("user_follows").select("following_id", count="exact").eq("follower_id", user_id).execute() - following_count = getattr(following_response, "count", 0) or 0 - - # Get total listeners (sum of all listens on user's posts) - listens_response = supabase.rpc("get_user_total_listeners", {"p_user_id": user_id}).execute() - total_listeners = getattr(listens_response, "data", 0) or 0 - - return { - "posts": post_count, - "followers": followers_count, - "following": following_count, - "listeners": total_listeners + data = { + "user_id": payload["user_id"], + "title": payload["title"], + "description": payload.get("description"), + "visibility": payload.get("visibility", "private"), + "status": payload.get("status", "uploaded"), + "recorded_date": payload.get("recorded_date"), + "language": payload.get("language", "en"), + "storage_prefix": payload["storage_prefix"], + "manifest_sha256": payload.get("manifest_sha256"), + "bundle_sha256": payload.get("bundle_sha256"), + "published_at": payload.get("published_at"), } + response = supabase.table("audio_posts").insert(data).execute() + created = _first(response) + if not created: + raise RuntimeError("Failed to create audio post.") + return created -# ==================== POST QUERIES ==================== -def get_post_by_id(post_id: int, requesting_user_id: Optional[int] = None) -> Optional[Dict[str, Any]]: - """ - Get a single post by ID with user info and categories. - Returns None if post is private and requesting_user_id doesn't match post owner. - """ +def get_audio_post_by_id(post_id: int) -> Optional[Dict[str, Any]]: + query = ( + supabase.table("audio_posts") + .select("*, users(user_id, email, display_name, avatar_url)") + .eq("post_id", post_id) + .limit(1) + ) + return _first(query.execute()) + + +def list_audio_posts(page: int = 1, limit: int = 20, visibility: Optional[str] = None, user_id: Optional[int] = None) -> List[Dict[str, Any]]: + start, end = _paginate(page, limit) + query = supabase.table("audio_posts").select("*, users(user_id, email, display_name, avatar_url)") + + if visibility: + query = query.eq("visibility", visibility) + if user_id: + query = query.eq("user_id", user_id) + + response = query.order("created_at", desc=True).range(start, end).execute() + return _rows(response) + + +def update_audio_post(post_id: int, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: + if not updates: + return get_audio_post_by_id(post_id) + + allowed = { + "title", + "description", + "visibility", + "status", + "recorded_date", + "language", + "storage_prefix", + "manifest_sha256", + "bundle_sha256", + "published_at", + } + clean = {k: v for k, v in updates.items() if k in allowed} + if not clean: + return get_audio_post_by_id(post_id) + response = ( - supabase.table("posts") - .select(""" - *, - users!inner(user_id, username, display_name, profile_image_url), - post_categories!inner(category_id, categories!inner(name)) - """) + supabase.table("audio_posts") + .update(clean) .eq("post_id", post_id) .execute() ) - - data = getattr(response, "data", None) or [] - if not data: - return None - - post = data[0] - - # Check privacy - if post.get("is_private") and post.get("user_id") != requesting_user_id: - return None - - return _format_post(post) + return _first(response) -def get_posts_feed( - user_id: int, - page: int = 1, - limit: int = 20, - sort: str = "recent" -) -> List[Dict[str, Any]]: - """ - Get personalized feed for a user. - Includes posts from followed users and followed categories. - """ - offset = (page - 1) * limit +# ==================== Archive Files ==================== - # Base query - query = ( - supabase.table("posts") - .select(""" - *, - users!inner(user_id, username, display_name, profile_image_url), - post_categories(category_id, categories(name)) - """) - .eq("is_private", False) - ) +def add_archive_file(post_id: int, payload: Dict[str, Any]) -> Dict[str, Any]: + required = ["role", "path", "sha256"] + for field in required: + if not payload.get(field): + raise ValueError(f"'{field}' is required.") - # Apply sorting - if sort == "recent": - query = query.order("created_at", desc=True) - elif sort == "popular": - # Would need a view or function to sort by engagement - query = query.order("created_at", desc=True) - - response = query.range(offset, offset + limit - 1).execute() - data = getattr(response, "data", None) or [] - - return [_format_post(post) for post in data] - - -def get_user_posts( - user_id: int, - filter_type: str = "all", - page: int = 1, - limit: int = 20 -) -> List[Dict[str, Any]]: - """ - Get posts created by a specific user. - filter_type: 'all', 'public', 'private' - """ - offset = (page - 1) * limit - - query = ( - supabase.table("posts") - .select(""" - *, - post_categories(category_id, categories(name)) - """) - .eq("user_id", user_id) - ) - - # Apply filter - if filter_type == "public": - query = query.eq("is_private", False) - elif filter_type == "private": - query = query.eq("is_private", True) - - response = query.order("created_at", desc=True).range(offset, offset + limit - 1).execute() - data = getattr(response, "data", None) or [] - - return [_format_post(post) for post in data] - - -def get_post_engagement(post_id: int) -> Dict[str, int]: - """Get engagement metrics for a post (likes, comments, listens).""" - # Get likes count - likes_response = supabase.table("post_likes").select("user_id", count="exact").eq("post_id", post_id).execute() - likes_count = getattr(likes_response, "count", 0) or 0 - - # Get comments count - comments_response = supabase.table("comments").select("comment_id", count="exact").eq("post_id", post_id).execute() - comments_count = getattr(comments_response, "count", 0) or 0 - - # Get listens count - listens_response = supabase.table("audio_listening_history").select("history_id", count="exact").eq("post_id", post_id).execute() - listens_count = getattr(listens_response, "count", 0) or 0 - - # Get bookmarks count - bookmarks_response = supabase.table("bookmarks").select("user_id", count="exact").eq("post_id", post_id).execute() - bookmarks_count = getattr(bookmarks_response, "count", 0) or 0 - - return { - "likes": likes_count, - "comments": comments_count, - "listens": listens_count, - "bookmarks": bookmarks_count + data = { + "post_id": post_id, + "role": payload["role"], + "path": payload["path"], + "content_type": payload.get("content_type"), + "size_bytes": payload.get("size_bytes"), + "sha256": payload["sha256"], } - -def check_user_post_interactions(user_id: int, post_id: int) -> Dict[str, bool]: - """Check if user has liked/bookmarked a post.""" - # Check if liked - like_response = supabase.table("post_likes").select("user_id").eq("user_id", user_id).eq("post_id", post_id).execute() - is_liked = len(getattr(like_response, "data", []) or []) > 0 - - # Check if bookmarked - bookmark_response = supabase.table("bookmarks").select("user_id").eq("user_id", user_id).eq("post_id", post_id).execute() - is_bookmarked = len(getattr(bookmark_response, "data", []) or []) > 0 - - return { - "is_liked": is_liked, - "is_bookmarked": is_bookmarked - } + response = supabase.table("archive_files").insert(data).execute() + created = _first(response) + if not created: + raise RuntimeError("Failed to add archive file.") + return created -# ==================== CATEGORY QUERIES ==================== - -def get_all_categories() -> List[Dict[str, Any]]: - """Get all categories.""" - response = supabase.table("categories").select("*").execute() - data = getattr(response, "data", None) or [] - return data - - -def get_category_by_id(category_id: int) -> Optional[Dict[str, Any]]: - """Get category by ID.""" - response = supabase.table("categories").select("*").eq("category_id", category_id).execute() - data = getattr(response, "data", None) or [] - return data[0] if data else None - - -def get_posts_by_category( - category_id: int, - page: int = 1, - limit: int = 20 -) -> List[Dict[str, Any]]: - """Get posts in a specific category.""" - offset = (page - 1) * limit - +def list_archive_files(post_id: int) -> List[Dict[str, Any]]: response = ( - supabase.table("post_categories") - .select(""" - posts!inner(*, - users!inner(user_id, username, display_name, profile_image_url), - post_categories(category_id, categories(name)) - ) - """) - .eq("category_id", category_id) - .eq("posts.is_private", False) - .order("posts.created_at", desc=True) - .range(offset, offset + limit - 1) - .execute() - ) - - data = getattr(response, "data", None) or [] - return [_format_post(item["posts"]) for item in data] - - -# ==================== COMMENT QUERIES ==================== - -def get_post_comments(post_id: int, page: int = 1, limit: int = 20) -> List[Dict[str, Any]]: - """Get comments for a specific post.""" - offset = (page - 1) * limit - - response = ( - supabase.table("comments") - .select(""" - *, - users!inner(user_id, username, display_name, profile_image_url) - """) - .eq("post_id", post_id) - .order("created_at", desc=True) - .range(offset, offset + limit - 1) - .execute() - ) - - data = getattr(response, "data", None) or [] - return data - - -# ==================== HISTORY QUERIES ==================== - -def get_listening_history( - user_id: int, - page: int = 1, - limit: int = 50, - completed_only: bool = False -) -> List[Dict[str, Any]]: - """Get user's listening history.""" - offset = (page - 1) * limit - - query = ( - supabase.table("audio_listening_history") - .select(""" - *, - posts!inner(*, - users!inner(user_id, username, display_name, profile_image_url) - ) - """) - .eq("user_id", user_id) - ) - - if completed_only: - query = query.eq("completed", True) - - response = query.order("listened_at", desc=True).range(offset, offset + limit - 1).execute() - data = getattr(response, "data", None) or [] - - return data - - -def get_search_history(user_id: int, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]: - """Get user's search history.""" - offset = (page - 1) * limit - - response = ( - supabase.table("search_history") + supabase.table("archive_files") .select("*") - .eq("user_id", user_id) - .order("searched_at", desc=True) - .range(offset, offset + limit - 1) + .eq("post_id", post_id) + .order("created_at", desc=False) .execute() ) - - data = getattr(response, "data", None) or [] - return data + return _rows(response) -# ==================== SEARCH QUERIES ==================== +# ==================== Metadata / Rights ==================== -def search_posts( - query: str, - category_id: Optional[int] = None, - page: int = 1, - limit: int = 20, - requesting_user_id: Optional[int] = None -) -> List[Dict[str, Any]]: - """ - Search posts by text query. - Uses full-text search on title and transcribed_text. - """ - offset = (page - 1) * limit +def upsert_archive_metadata(post_id: int, metadata: str) -> Dict[str, Any]: + data = {"post_id": post_id, "metadata": metadata} - # Basic search using ilike (for simple text matching) - # For production, you'd want to use PostgreSQL full-text search - search_query = ( - supabase.table("posts") - .select(""" - *, - users!inner(user_id, username, display_name, profile_image_url), - post_categories(category_id, categories(name)) - """) - .eq("is_private", False) - .or_(f"title.ilike.%{query}%,transcribed_text.ilike.%{query}%") - ) + existing = _first(supabase.table("archive_metadata").select("post_id").eq("post_id", post_id).limit(1).execute()) + if existing: + response = supabase.table("archive_metadata").update({"metadata": metadata}).eq("post_id", post_id).execute() + else: + response = supabase.table("archive_metadata").insert(data).execute() - if category_id: - # This would need a join with post_categories - pass - - response = search_query.order("created_at", desc=True).range(offset, offset + limit - 1).execute() - data = getattr(response, "data", None) or [] - - return [_format_post(post) for post in data] + row = _first(response) + if not row: + raise RuntimeError("Failed to upsert archive metadata.") + return row -# ==================== TRENDING QUERIES ==================== - -def get_trending_topics(limit: int = 5) -> List[Dict[str, Any]]: - """ - Get trending categories based on recent post activity. - This is a simplified version - for production, you'd want a materialized view. - """ - # This would ideally be a database view or function - # For now, we'll get categories with most posts in last 7 days - response = ( - supabase.rpc("get_trending_categories", {"p_limit": limit}) - .execute() - ) - - data = getattr(response, "data", None) or [] - return data +def get_archive_metadata(post_id: int) -> Optional[Dict[str, Any]]: + return _first(supabase.table("archive_metadata").select("*").eq("post_id", post_id).limit(1).execute()) -# ==================== BOOKMARKS QUERIES ==================== - -def get_user_bookmarks(user_id: int, page: int = 1, limit: int = 20) -> List[Dict[str, Any]]: - """Get user's bookmarked posts.""" - offset = (page - 1) * limit - - response = ( - supabase.table("bookmarks") - .select(""" - *, - posts!inner(*, - users!inner(user_id, username, display_name, profile_image_url), - post_categories(category_id, categories(name)) - ) - """) - .eq("user_id", user_id) - .order("bookmarked_at", desc=True) - .range(offset, offset + limit - 1) - .execute() - ) - - data = getattr(response, "data", None) or [] - return [_format_post(item["posts"]) for item in data] - - -# ==================== HELPER FUNCTIONS ==================== - -def _format_post(post: Dict[str, Any]) -> Dict[str, Any]: - """Format post data to include engagement metrics and clean structure.""" - post_id = post.get("post_id") - - # Get engagement metrics - engagement = get_post_engagement(post_id) if post_id else {"likes": 0, "comments": 0, "listens": 0, "bookmarks": 0} - - # Extract categories - categories = [] - if "post_categories" in post and post["post_categories"]: - for pc in post["post_categories"]: - if "categories" in pc and pc["categories"]: - categories.append(pc["categories"]) - - # Clean user data - user_data = post.get("users", {}) - - return { - "id": post.get("post_id"), - "user_id": post.get("user_id"), - "title": post.get("title"), - "audio_url": post.get("audio_url"), - "transcribed_text": post.get("transcribed_text"), - "audio_duration_seconds": post.get("audio_duration_seconds"), - "image_url": post.get("image_url"), - "is_private": post.get("is_private"), - "created_at": post.get("created_at"), - "updated_at": post.get("updated_at"), - "user": { - "id": user_data.get("user_id"), - "username": user_data.get("username"), - "display_name": user_data.get("display_name"), - "profile_image_url": user_data.get("profile_image_url") - }, - "categories": categories, - "likes": engagement["likes"], - "comments": engagement["comments"], - "listens": engagement["listens"], - "bookmarks": engagement["bookmarks"] +def upsert_archive_rights(post_id: int, payload: Dict[str, Any]) -> Dict[str, Any]: + data = { + "post_id": post_id, + "has_speaker_consent": payload.get("has_speaker_consent", False), + "license": payload.get("license"), + "consent_notes": payload.get("consent_notes"), + "allowed_use": payload.get("allowed_use"), + "restrictions": payload.get("restrictions"), } + existing = _first(supabase.table("archive_rights").select("post_id").eq("post_id", post_id).limit(1).execute()) + if existing: + response = ( + supabase.table("archive_rights") + .update({k: v for k, v in data.items() if k != "post_id"}) + .eq("post_id", post_id) + .execute() + ) + else: + response = supabase.table("archive_rights").insert(data).execute() -def get_pagination_info(total_count: int, page: int, limit: int) -> Dict[str, Any]: - """Calculate pagination information.""" - total_pages = (total_count + limit - 1) // limit - has_more = page < total_pages + row = _first(response) + if not row: + raise RuntimeError("Failed to upsert archive rights.") + return row + + +def get_archive_rights(post_id: int) -> Optional[Dict[str, Any]]: + return _first(supabase.table("archive_rights").select("*").eq("post_id", post_id).limit(1).execute()) + + +# ==================== RAG Chunks ==================== + +def add_rag_chunks(post_id: int, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + if not chunks: + return [] + + rows = [] + for c in chunks: + rows.append( + { + "post_id": post_id, + "start_sec": c.get("start_sec"), + "end_sec": c.get("end_sec"), + "text": c.get("text"), + "confidence": c.get("confidence"), + "embedding": c.get("embedding"), + } + ) + + response = supabase.table("rag_chunks").insert(rows).execute() + return _rows(response) + + +def list_rag_chunks(post_id: int, page: int = 1, limit: int = 200) -> List[Dict[str, Any]]: + start, end = _paginate(page, limit) + response = ( + supabase.table("rag_chunks") + .select("*") + .eq("post_id", post_id) + .order("start_sec", desc=False) + .range(start, end) + .execute() + ) + return _rows(response) + + +# ==================== Audit Log ==================== + +def add_audit_log(payload: Dict[str, Any]) -> Dict[str, Any]: + if not payload.get("action"): + raise ValueError("'action' is required.") + + data = { + "post_id": payload.get("post_id"), + "user_id": payload.get("user_id"), + "action": payload["action"], + "details": payload.get("details"), + } + + response = supabase.table("audit_log").insert(data).execute() + row = _first(response) + if not row: + raise RuntimeError("Failed to create audit log.") + return row + + +def list_audit_logs(post_id: Optional[int] = None, user_id: Optional[int] = None, page: int = 1, limit: int = 100) -> List[Dict[str, Any]]: + start, end = _paginate(page, limit) + query = supabase.table("audit_log").select("*") + + if post_id is not None: + query = query.eq("post_id", post_id) + if user_id is not None: + query = query.eq("user_id", user_id) + + response = query.order("created_at", desc=True).range(start, end).execute() + return _rows(response) + + +# ==================== Aggregate View ==================== + +def get_post_bundle(post_id: int) -> Dict[str, Any]: + post = get_audio_post_by_id(post_id) + if not post: + return {} return { - "current_page": page, - "total_pages": total_pages, - "total_items": total_count, - "items_per_page": limit, - "has_more": has_more + "post": post, + "files": list_archive_files(post_id), + "metadata": get_archive_metadata(post_id), + "rights": get_archive_rights(post_id), + "rag_chunks": list_rag_chunks(post_id, page=1, limit=1000), + "audit_log": list_audit_logs(post_id=post_id, page=1, limit=200), }