Files
Fin-Track/Display_API.py

101 lines
3.7 KiB
Python
Raw Normal View History

2025-02-16 09:52:09 -07:00
import serial
import json
import requests
import time
from datetime import datetime, timedelta
class PlaidDataSender:
    """Poll a Plaid-backed HTTP API and stream a JSON summary over a serial port.

    Each cycle fetches accounts and transactions from ``api_base_url``,
    condenses them into a small summary dict, and writes that dict as one
    newline-terminated JSON line to the serial device (an ESP32, per the
    status messages this class prints).
    """

    def __init__(self, access_token, serial_port='/dev/tty.usbserial-0001',
                 baud_rate=115200, api_base_url='http://localhost:3000',
                 request_timeout=10, update_interval=10):
        """Store connection settings; no I/O happens here.

        Args:
            access_token: Value sent in the ``plaid-access-token`` header.
            serial_port: Device path of the serial port to open in ``run``.
            baud_rate: Serial baud rate.
            api_base_url: Base URL of the HTTP API serving ``/api/accounts``
                and ``/api/transactions``.
            request_timeout: Seconds to wait for each HTTP request before
                giving up, so a stalled API cannot hang the send loop.
            update_interval: Seconds to sleep after each successful send.
        """
        self.access_token = access_token
        self.serial_port = serial_port
        self.baud_rate = baud_rate
        self.api_base_url = api_base_url
        self.request_timeout = request_timeout
        self.update_interval = update_interval

    def _get_json(self, path, headers):
        """GET ``api_base_url + path`` and return the decoded JSON body.

        Raises:
            requests.RequestException: On connection failure, timeout, or a
                non-2xx HTTP status.
        """
        response = requests.get(
            f'{self.api_base_url}{path}',
            headers=headers,
            timeout=self.request_timeout,
        )
        # Fail here on HTTP errors instead of later with a confusing KeyError
        # when the error payload lacks the expected keys.
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _summarize(accounts_data, transactions_data):
        """Build the summary dict from already-decoded API payloads.

        Args:
            accounts_data: Mapping with an ``'accounts'`` list; each account
                has ``['balances']['current']`` (a null value counts as 0).
            transactions_data: Mapping with a ``'transactions'`` list; each
                transaction has ``'amount'``, ``'date'``, ``'name'`` and
                optionally ``'merchant_name'``.

        Returns:
            dict with ``bank_balance``, ``total_income``, ``total_expenses``
            (as a positive number), ``net_amount`` and ``recent_transactions``
            (the five entries with the latest ``date``, newest first).
        """
        total_balance = sum(
            # A null current balance would make sum() raise TypeError.
            account['balances'].get('current') or 0
            for account in accounts_data['accounts']
        )

        transactions = transactions_data['transactions']
        # NOTE(review): positive amounts are treated as income here. Plaid's
        # own API reports money leaving the account as positive, so confirm
        # which sign the /api/transactions endpoint actually returns.
        total_income = sum(t['amount'] for t in transactions if t['amount'] > 0)
        total_expenses = sum(t['amount'] for t in transactions if t['amount'] < 0)

        newest_first = sorted(transactions, key=lambda t: t['date'], reverse=True)
        recent_transactions = [
            {
                # merchant_name may be null or missing; fall back to the raw name.
                "name": trans.get('merchant_name') or trans['name'],
                "amount": trans['amount'],
                "type": "income" if trans['amount'] > 0 else "expense",
                "date": trans['date'],
            }
            for trans in newest_first[:5]
        ]

        return {
            "bank_balance": round(total_balance, 2),
            "total_income": round(total_income, 2),
            "total_expenses": round(abs(total_expenses), 2),
            # total_expenses is negative, so adding it yields income minus spend.
            "net_amount": round(total_income + total_expenses, 2),
            "recent_transactions": recent_transactions,
        }

    def fetch_financial_data(self):
        """Fetch accounts and transactions from the API and summarize them.

        Returns:
            The summary dict described in ``_summarize``.

        Raises:
            requests.RequestException: If either HTTP request fails.
            KeyError: If a payload lacks an expected key.
        """
        headers = {'plaid-access-token': self.access_token}
        accounts_data = self._get_json('/api/accounts', headers)
        transactions_data = self._get_json('/api/transactions', headers)
        return self._summarize(accounts_data, transactions_data)

    def run(self):
        """Open the serial port and send a fresh summary in an endless loop.

        API errors and unexpected per-cycle errors are printed and retried.
        Serial errors end the loop with a message, as does Ctrl-C.
        """
        try:
            # Setup serial connection
            with serial.Serial(self.serial_port, self.baud_rate, timeout=1) as ser:
                print(f"Connected to ESP32 on {self.serial_port}")
                time.sleep(2)  # Wait for connection to establish

                while True:
                    try:
                        # Fetch latest financial data
                        financial_data = self.fetch_financial_data()

                        # One JSON document per line so the receiver can
                        # split messages on '\n'.
                        json_data = json.dumps(financial_data)
                        ser.write((json_data + '\n').encode('utf-8'))
                        print(f"Sent financial data: {json_data}")

                        time.sleep(self.update_interval)  # Wait before next update
                    except requests.RequestException as e:
                        print(f"API Error: {e}")
                        time.sleep(5)
                    except serial.SerialException:
                        # Let the outer handler report it; otherwise the
                        # generic handler below would retry a dead port forever.
                        raise
                    except Exception as e:
                        print(f"Error sending data: {e}")
                        time.sleep(1)
        except serial.SerialException as e:
            print(f"Serial connection error: {e}")
        except KeyboardInterrupt:
            print("\nStopped by user")
def main():
    """Build a PlaidDataSender with the default serial settings and run it.

    The access token is read from the ``PLAID_ACCESS_TOKEN`` environment
    variable when set; otherwise the sandbox token below is used.
    """
    import os  # local import: only this entry point needs it

    # NOTE(review): keeping a token in source is only acceptable for a
    # sandbox credential; supply real tokens through the environment.
    access_token = os.environ.get(
        "PLAID_ACCESS_TOKEN",
        "access-sandbox-1331c29e-2e5a-492b-ac03-623c4a7aa146",
    )

    sender = PlaidDataSender(
        access_token=access_token,
        serial_port='/dev/tty.usbserial-0001',
        baud_rate=115200,
    )
    sender.run()
# Run the sender only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()