Files
2025-04-20 21:42:34 -06:00

273 lines
9.2 KiB
Python

# pip install mysql-connector-python
import mysql.connector
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import logging
import random
def database():
    """Open and return a new connection to the local ``Marketplace`` database.

    The connection targets the MySQL server at ``localhost:3306`` as user
    ``root``; no password is supplied. Every call creates a fresh connection,
    and the caller is responsible for closing it.
    """
    connection_settings = {
        "host": "localhost",
        "port": "3306",
        "user": "root",
        "database": "Marketplace",
    }
    return mysql.connector.connect(**connection_settings)
def delete_user_recommendations(user_id):
    """Remove every row in ``Recommendation`` that belongs to ``user_id``.

    Returns:
        True when the delete was committed; False when any error occurred,
        in which case the error is logged and the transaction rolled back.
    """
    connection = database()
    cur = connection.cursor()
    try:
        cur.execute("DELETE FROM Recommendation WHERE UserID = %s", (user_id,))
        connection.commit()
        # The same message goes to stdout and to the log.
        message = f"Deleted existing recommendations for user {user_id}"
        print(message)
        logging.info(message)
        return True
    except Exception as err:
        logging.error(f"Error deleting recommendations for user {user_id}: {str(err)}")
        connection.rollback()
        return False
    finally:
        # Runs on both the success and failure paths.
        cur.close()
        connection.close()
def get_random_products(count=10, exclude_list=None):
    """Return up to ``count`` randomly chosen product IDs.

    Args:
        count: Maximum number of product IDs to return.
        exclude_list: Optional collection of product IDs that must not
            appear in the result. ``None`` or an empty collection disables
            the filter.

    Returns:
        A list of ``ProductID`` values in the random order produced by
        ``ORDER BY RAND()``. An empty list is returned (and the error is
        logged) if the query fails.
    """
    db_con = database()
    cursor = db_con.cursor()
    try:
        excluded = list(exclude_list) if exclude_list else []
        limit = int(count)
        if excluded:
            # One bound placeholder per excluded ID: the values are sent as
            # parameters rather than spliced into the SQL text.
            placeholders = ", ".join(["%s"] * len(excluded))
            query = (
                "SELECT ProductID FROM Product "
                f"WHERE ProductID NOT IN ({placeholders}) "
                "ORDER BY RAND() LIMIT %s"
            )
            cursor.execute(query, (*excluded, limit))
        else:
            cursor.execute(
                "SELECT ProductID FROM Product ORDER BY RAND() LIMIT %s",
                (limit,),
            )
        return [row[0] for row in cursor.fetchall()]
    except Exception as e:
        logging.error(f"Error getting random products: {str(e)}")
        return []
    finally:
        cursor.close()
        db_con.close()
def get_popular_products(count=10):
    """Return up to ``count`` product IDs ranked by how often they occur in ``History``.

    When fewer than ``count`` distinct products are found in ``History``, the
    remainder is filled with random products that are not already in the
    list. If the popularity query itself fails, the error is logged and
    ``count`` random products are returned instead.
    """
    connection = database()
    cur = connection.cursor()
    try:
        # Most frequently referenced products first.
        cur.execute("""
SELECT ProductID, COUNT(*) as count
FROM History
GROUP BY ProductID
ORDER BY count DESC
LIMIT %s
""", (count,))
        ranked_ids = [record[0] for record in cur.fetchall()]
        shortfall = count - len(ranked_ids)
        if shortfall > 0:
            # Top up with random products, excluding the ones already chosen.
            ranked_ids += get_random_products(shortfall, ranked_ids)
        return ranked_ids
    except Exception as err:
        logging.error(f"Error getting popular products: {str(err)}")
        # Degrade to a purely random selection.
        return get_random_products(count)
    finally:
        cur.close()
        connection.close()
def has_user_history_or_recommendations(user_id):
    """Report whether ``user_id`` has at least one ``History`` or ``Recommendation`` row.

    Both tables are always queried. Returns False (after logging the error)
    if either query fails.
    """
    connection = database()
    cur = connection.cursor()
    try:
        params = (user_id,)

        cur.execute("SELECT COUNT(*) FROM History WHERE UserID = %s", params)
        in_history = cur.fetchone()[0] > 0

        cur.execute("SELECT COUNT(*) FROM Recommendation WHERE UserID = %s", params)
        in_recommendations = cur.fetchone()[0] > 0

        return in_history or in_recommendations
    except Exception as err:
        logging.error(f"Error checking user history/recommendations: {str(err)}")
        return False
    finally:
        cur.close()
        connection.close()
def get_all_products():
    """Build a 0/1 category-membership vector for every product.

    The query emits one column per row of ``Category`` (in the order returned
    by ``SELECT CategoryID FROM Category``); a column is 1 when the product is
    linked to that category through ``Product_Category`` and 0 otherwise.

    Returns:
        A ``(feature_vectors, product_ids)`` tuple of parallel lists:
        ``feature_vectors[i]`` is the list of category flags for
        ``product_ids[i]``. On any error the failure is logged and
        ``([], [])`` is returned.
    """
    db_con = database()
    cursor = db_con.cursor()
    try:
        cursor.execute("SELECT CategoryID FROM Category")
        category_ids = [row[0] for row in cursor.fetchall()]

        # Category IDs come from the Category table itself, not from the
        # caller, and are used to generate one pivot column each.
        select_clause = "SELECT p.ProductID" + "".join(
            f", MAX(CASE WHEN pc.CategoryID = {category_id} THEN 1 ELSE 0 END) AS `Cat_{category_id}`"
            for category_id in category_ids
        )
        final_query = f"""
{select_clause}
FROM Product p
LEFT JOIN Product_Category pc ON p.ProductID = pc.ProductID
LEFT JOIN Category c ON pc.CategoryID = c.CategoryID
GROUP BY p.ProductID;
"""
        cursor.execute(final_query)

        feature_vectors = []
        product_ids = []
        # First column is the product ID; the rest are the category flags.
        for product_id, *flags in cursor.fetchall():
            product_ids.append(product_id)
            feature_vectors.append(flags)
        return feature_vectors, product_ids
    except Exception as e:
        logging.error(f"Error getting all products: {str(e)}")
        return [], []
    finally:
        # Single cleanup point for both the success and the failure path.
        cursor.close()
        db_con.close()
def get_user_history(user_id):
    """Return the category-membership vectors of the products in a user's history.

    Each distinct product that appears in ``History`` for ``user_id`` (and
    still exists in ``Product``) contributes one vector. The vector has one
    0/1 flag per row of ``Category``, in the order returned by
    ``SELECT CategoryID FROM Category``.

    Args:
        user_id: Identifier matched against ``History.UserID``. It is sent to
            the server as a bound parameter, never spliced into the SQL text.

    Returns:
        A list of flag lists (product IDs are dropped). An empty list is
        returned when the user has no matching products or when the query
        fails; failures are logged.
    """
    db_con = database()
    cursor = db_con.cursor()
    try:
        cursor.execute("SELECT CategoryID FROM Category")
        category_ids = [row[0] for row in cursor.fetchall()]

        # Category IDs come from the Category table itself and are used to
        # generate one pivot column each.
        select_clause = "SELECT p.ProductID" + "".join(
            f", MAX(CASE WHEN pc.CategoryID = {category_id} THEN 1 ELSE 0 END) AS `Cat_{category_id}`"
            for category_id in category_ids
        )
        # The %s below is a driver placeholder for user_id, filled in by
        # cursor.execute(); it is not Python string formatting.
        final_query = f"""
{select_clause}
FROM Product p
LEFT JOIN Product_Category pc ON p.ProductID = pc.ProductID
LEFT JOIN Category c ON pc.CategoryID = c.CategoryID
where p.ProductID in (select ProductID from History where UserID = %s)
GROUP BY p.ProductID;
"""
        cursor.execute(final_query, (user_id,))

        # Drop the leading ProductID column and keep only the category flags.
        return [list(row[1:]) for row in cursor.fetchall()]
    except Exception as e:
        logging.error(f"Error getting user history: {str(e)}")
        return []
    finally:
        # Single cleanup point for both the success and the failure path.
        cursor.close()
        db_con.close()
def _store_with_random_extras(user_id, core_product_ids, extra_count=5):
    """Store ``core_product_ids`` plus ``extra_count`` random extras; return the combined list."""
    history_upload(user_id, core_product_ids)
    # The extras are drawn so that they never duplicate a core recommendation.
    extras = get_random_products(extra_count, core_product_ids)
    history_upload(user_id, extras)
    return core_product_ids + extras


def get_recommendations(user_id, top_n=5):
    """Regenerate, store, and return product recommendations for ``user_id``.

    The user's previous ``Recommendation`` rows are always deleted first.
    The core recommendations are then chosen as follows:

    * user unknown (no history and no previous recommendations):
      ``top_n`` random products;
    * user known but without usable history vectors: the ``top_n`` most
      popular products;
    * otherwise: the ``top_n`` products whose category vector has the highest
      cosine similarity to the mean of the user's history vectors.

    In every case five additional random products (distinct from the core
    ones) are appended, and both groups are written to ``Recommendation``.

    Args:
        user_id: Identifier of the user to recommend for.
        top_n: Number of core recommendations to produce.

    Returns:
        A list of product IDs: the core recommendations followed by the
        random extras. If anything raises, the error is logged and
        ``top_n + 5`` random products are returned without being stored.
    """
    try:
        # This check must run BEFORE the delete below: once the user's
        # recommendations are removed, the Recommendation half of the check
        # could never be true.
        known_user = has_user_history_or_recommendations(user_id)

        # Always delete existing recommendations before writing new ones.
        delete_user_recommendations(user_id)

        if not known_user:
            # Cold start: nothing is known about this user.
            return _store_with_random_extras(user_id, get_random_products(top_n))

        # Category vectors for the whole catalogue and for the user's history.
        all_product_features, all_product_ids = get_all_products()
        user_history = get_user_history(user_id)

        if not user_history:
            # User exists but has no history to build a profile from.
            return _store_with_random_extras(user_id, get_popular_products(top_n))

        # Average the history vectors into a single preference profile.
        user_profile = np.mean(user_history, axis=0)
        similarities = cosine_similarity([user_profile], all_product_features)
        logging.debug("Similarity scores for user %s: %s", user_id, similarities)

        # Indices of the top_n most similar products, best first. Slicing
        # after the reversal also yields an empty result for top_n == 0.
        product_indices = similarities[0].argsort()[::-1][:top_n]
        recommended_product_ids = [all_product_ids[i] for i in product_indices]

        return _store_with_random_extras(user_id, recommended_product_ids)
    except Exception as e:
        logging.error(f"Recommendation error for user {user_id}: {str(e)}")
        # Fallback to random products
        return get_random_products(top_n + 5)
def history_upload(userID, products):
    """Insert one ``Recommendation`` row per entry of ``products`` for ``userID``.

    All inserts are committed together after the loop. If any step raises,
    the error is logged and the transaction is rolled back; nothing is
    returned or re-raised either way.
    """
    connection = database()
    cur = connection.cursor()
    insert_sql = "INSERT INTO Recommendation (UserID, RecommendedProductID) VALUES (%s, %s)"
    try:
        for recommended_id in products:
            # Values are bound as parameters, not formatted into the SQL.
            cur.execute(insert_sql, (userID, recommended_id))
        connection.commit()
    except Exception as err:
        logging.error(f"Error uploading recommendations: {str(err)}")
        connection.rollback()
    finally:
        cur.close()
        connection.close()