273 lines
9.2 KiB
Python
273 lines
9.2 KiB
Python
# pip install mysql-connector-python
|
|
import mysql.connector
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
import numpy as np
|
|
import logging
|
|
import random
|
|
|
|
def database():
    """Open a new connection to the local ``Marketplace`` MySQL database.

    The connection targets ``localhost`` on port 3306 as user ``root``;
    no password is passed to the connector.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
        Callers are responsible for closing it.
    """
    connection_settings = {
        "host": "localhost",
        "port": "3306",
        "user": "root",
        "database": "Marketplace",
    }
    return mysql.connector.connect(**connection_settings)
|
|
|
|
def delete_user_recommendations(user_id):
    """Remove every row in ``Recommendation`` that belongs to ``user_id``.

    Args:
        user_id: Value matched against the ``UserID`` column.

    Returns:
        True when the delete was executed and committed, False when an
        exception occurred (the error is logged and the transaction is
        rolled back).
    """
    connection = database()
    cur = connection.cursor()
    deleted = False

    try:
        cur.execute("DELETE FROM Recommendation WHERE UserID = %s", (user_id,))
        connection.commit()

        # The same message goes to stdout and to the log.
        message = f"Deleted existing recommendations for user {user_id}"
        print(message)
        logging.info(message)
        deleted = True
    except Exception as exc:
        logging.error(f"Error deleting recommendations for user {user_id}: {str(exc)}")
        connection.rollback()
    finally:
        # Always release the cursor and the connection, success or not.
        cur.close()
        connection.close()

    return deleted
|
|
|
|
def get_random_products(count=10, exclude_list=None):
    """Get random products from the database, excluding any in the exclude_list.

    Every value is bound through query parameters instead of being
    formatted into the SQL text, so neither ``count`` nor the entries of
    ``exclude_list`` can alter the statement.

    Args:
        count: Maximum number of product IDs to return. It is converted
            with ``int()``; a value that cannot be converted, or that is
            zero or negative, yields an empty list without querying.
        exclude_list: Optional iterable of ``ProductID`` values that must
            not appear in the result. ``None`` or an empty iterable means
            no exclusion.

    Returns:
        A list of ``ProductID`` values selected with ``ORDER BY RAND()``,
        or an empty list if the query fails (the error is logged).
    """
    try:
        limit = int(count)
    except (TypeError, ValueError) as e:
        logging.error(f"Error getting random products: {str(e)}")
        return []

    if limit <= 0:
        # LIMIT 0 selects nothing and a negative LIMIT is rejected by the
        # server, so there is no point in opening a connection.
        return []

    db_con = database()
    cursor = db_con.cursor()

    try:
        excluded = list(exclude_list) if exclude_list is not None else []

        query = "SELECT ProductID FROM Product"
        params = []
        if excluded:
            # One placeholder per excluded ID; the values travel as parameters.
            placeholders = ", ".join(["%s"] * len(excluded))
            query += f" WHERE ProductID NOT IN ({placeholders})"
            params.extend(excluded)
        query += " ORDER BY RAND() LIMIT %s"
        params.append(limit)

        cursor.execute(query, tuple(params))
        return [row[0] for row in cursor.fetchall()]

    except Exception as e:
        logging.error(f"Error getting random products: {str(e)}")
        return []
    finally:
        cursor.close()
        db_con.close()
|
|
|
|
def get_popular_products(count=10):
    """Get popular products based on history table frequency.

    Products are ranked by how many rows they have in ``History``. When
    fewer than ``count`` products come back, the remainder is filled with
    random products that are not already in the list.

    Args:
        count: Number of product IDs wanted.

    Returns:
        A list of ``ProductID`` values. If the ranking query raises, the
        error is logged and the result of ``get_random_products(count)``
        is returned instead.
    """
    ranking_sql = """
        SELECT ProductID, COUNT(*) as count
        FROM History
        GROUP BY ProductID
        ORDER BY count DESC
        LIMIT %s
    """
    connection = database()
    cur = connection.cursor()

    try:
        cur.execute(ranking_sql, (count,))

        ranked_ids = []
        for record in cur.fetchall():
            ranked_ids.append(record[0])

        # Top up with random products when history is too sparse.
        shortfall = count - len(ranked_ids)
        if shortfall > 0:
            ranked_ids += get_random_products(shortfall, ranked_ids)

        return ranked_ids

    except Exception as exc:
        logging.error(f"Error getting popular products: {str(exc)}")
        # Fallback to random products
        return get_random_products(count)
    finally:
        cur.close()
        connection.close()
|
|
|
|
def has_user_history_or_recommendations(user_id):
    """Check if user exists in History or Recommendation table.

    Both tables are always queried; the result is True when either
    ``COUNT(*)`` for ``user_id`` is greater than zero.

    Args:
        user_id: Value matched against the ``UserID`` column of both tables.

    Returns:
        True if at least one matching row exists in either table, False
        otherwise or when a query raises (the error is logged).
    """
    count_queries = (
        "SELECT COUNT(*) FROM History WHERE UserID = %s",
        "SELECT COUNT(*) FROM Recommendation WHERE UserID = %s",
    )
    connection = database()
    cur = connection.cursor()

    try:
        row_counts = []
        for sql in count_queries:
            cur.execute(sql, (user_id,))
            row_counts.append(cur.fetchone()[0])

        return any(total > 0 for total in row_counts)

    except Exception as exc:
        logging.error(f"Error checking user history/recommendations: {str(exc)}")
        return False
    finally:
        cur.close()
        connection.close()
|
|
|
|
def get_all_products():
    """Build a 0/1 category-membership vector for every product.

    One indicator column is generated per row of ``Category``; a product's
    value in that column is 1 when ``Product_Category`` links it to that
    category and 0 otherwise.

    Returns:
        A pair ``(features, product_ids)`` of equal-length lists:
        ``features[i]`` is the list of indicator values for the product
        whose ID is ``product_ids[i]``. On any error the exception is
        logged and ``([], [])`` is returned.
    """
    db_con = database()
    cursor = db_con.cursor()

    try:
        cursor.execute("SELECT CategoryID FROM Category")
        category_ids = [row[0] for row in cursor.fetchall()]

        # Category IDs come from the Category table itself, not from a
        # caller, and are used to name the generated columns.
        indicator_columns = "".join(
            f", MAX(CASE WHEN pc.CategoryID = {category_id} THEN 1 ELSE 0 END) AS `Cat_{category_id}`"
            for category_id in category_ids
        )

        final_query = f"""
            SELECT p.ProductID{indicator_columns}
            FROM Product p
            LEFT JOIN Product_Category pc ON p.ProductID = pc.ProductID
            LEFT JOIN Category c ON pc.CategoryID = c.CategoryID
            GROUP BY p.ProductID;
        """

        cursor.execute(final_query)
        results = cursor.fetchall()

        # First column is the product ID; the rest is the feature vector.
        product_ids = [row[0] for row in results]
        features = [list(row[1:]) for row in results]

        return features, product_ids  # Return both feature vectors and product IDs

    except Exception as e:
        logging.error(f"Error getting all products: {str(e)}")
        return [], []
    finally:
        # Single cleanup point for both the success and the error path.
        cursor.close()
        db_con.close()
|
|
|
|
def get_user_history(user_id):
    """Return category-membership vectors for the products in a user's history.

    The vectors have one 0/1 indicator per row of ``Category``. Only
    products whose ID appears in ``History`` for ``user_id`` are included.

    Args:
        user_id: Value matched against ``History.UserID``. It is passed as
            a bound query parameter rather than formatted into the SQL.

    Returns:
        A list with one indicator list per matching product (product IDs
        are not included). On any error the exception is logged and an
        empty list is returned.
    """
    db_con = database()
    cursor = db_con.cursor()

    try:
        cursor.execute("SELECT CategoryID FROM Category")
        category_ids = [row[0] for row in cursor.fetchall()]

        # One generated column per category, named after the category ID.
        indicator_columns = "".join(
            f", MAX(CASE WHEN pc.CategoryID = {category_id} THEN 1 ELSE 0 END) AS `Cat_{category_id}`"
            for category_id in category_ids
        )

        final_query = f"""
            SELECT p.ProductID{indicator_columns}
            FROM Product p
            LEFT JOIN Product_Category pc ON p.ProductID = pc.ProductID
            LEFT JOIN Category c ON pc.CategoryID = c.CategoryID
            WHERE p.ProductID IN (SELECT ProductID FROM History WHERE UserID = %s)
            GROUP BY p.ProductID;
        """

        cursor.execute(final_query, (user_id,))

        # Drop the leading ProductID column; only the indicators are returned.
        return [list(row[1:]) for row in cursor.fetchall()]

    except Exception as e:
        logging.error(f"Error getting user history: {str(e)}")
        return []
    finally:
        # Single cleanup point for both the success and the error path.
        cursor.close()
        db_con.close()
|
|
|
|
def get_recommendations(user_id, top_n=5):
    """Regenerate, store and return product recommendations for a user.

    Steps, in order:
      1. Delete the user's existing ``Recommendation`` rows.
      2. If the user has no history/recommendation rows, use random
         products as the core list (cold start).
      3. Otherwise, if the user's history yields no feature vectors, use
         popular products as the core list.
      4. Otherwise rank all products by cosine similarity to the mean of
         the user's history vectors and take the top ``top_n``.
    In every case the core list is stored, then 5 random products not in
    the core list are fetched, stored, and appended.

    Args:
        user_id: ID of the user to recommend for.
        top_n: Size of the core recommendation list.

    Returns:
        The core product IDs followed by the extra random ones. If any
        step raises, the error is logged and ``top_n + 5`` random products
        are returned without being stored.
    """

    def store_with_random_extras(core_ids):
        # Persist the core picks, then add 5 random products not among them.
        history_upload(user_id, core_ids)
        extra_ids = get_random_products(5, core_ids)
        history_upload(user_id, extra_ids)
        return core_ids + extra_ids

    try:
        # Always delete existing recommendations first
        delete_user_recommendations(user_id)

        # Cold start: nothing known about this user, fall back to random picks.
        if not has_user_history_or_recommendations(user_id):
            return store_with_random_extras(get_random_products(top_n))

        # Get all products and user history with their category vectors
        catalog_vectors, catalog_ids = get_all_products()
        history_vectors = get_user_history(user_id)

        # User exists but has no usable history yet: use popular products.
        if not history_vectors:
            return store_with_random_extras(get_popular_products(top_n))

        # Average the history vectors into one preference profile and
        # compare it against every product.
        profile = np.mean(history_vectors, axis=0)
        similarities = cosine_similarity([profile], catalog_vectors)
        print(similarities)

        # Indices of the top N products, most similar first.
        best_indices = similarities[0].argsort()[-top_n:][::-1]
        similar_ids = [catalog_ids[index] for index in best_indices]

        return store_with_random_extras(similar_ids)

    except Exception as exc:
        logging.error(f"Recommendation error for user {user_id}: {str(exc)}")
        # Fallback to random products
        return get_random_products(top_n + 5)
|
|
|
|
def history_upload(userID, products):
    """Upload product recommendations to the database.

    Inserts one ``Recommendation`` row per product in a single batched
    statement and commits. If anything fails, the error is logged and the
    transaction is rolled back, so no partial set of rows is committed.

    Args:
        userID: Value stored in the ``UserID`` column of every row.
        products: Iterable of product IDs stored in
            ``RecommendedProductID``.

    Returns:
        None.
    """
    db_con = database()
    cursor = db_con.cursor()

    try:
        rows = [(userID, product_id) for product_id in products]

        if rows:
            # Parameterized queries prevent SQL injection; executemany sends
            # the rows as one batch instead of one statement per product.
            cursor.executemany(
                "INSERT INTO Recommendation (UserID, RecommendedProductID) VALUES (%s, %s)",
                rows,
            )

        # Commit the changes
        db_con.commit()

    except Exception as e:
        logging.error(f"Error uploading recommendations: {str(e)}")
        db_con.rollback()
    finally:
        # Close the cursor and connection
        cursor.close()
        db_con.close()
|