home and selligns page polished
This commit is contained in:
Binary file not shown.
@@ -3,7 +3,7 @@ import mysql.connector
|
||||
from sklearn.metrics.pairwise import cosine_similarity
|
||||
import numpy as np
|
||||
import logging
|
||||
from unittest import result
|
||||
import random
|
||||
|
||||
def database():
|
||||
db_connection = mysql.connector.connect(
|
||||
@@ -14,145 +14,257 @@ def database():
|
||||
)
|
||||
return db_connection
|
||||
|
||||
def get_popular_products():
|
||||
pass
|
||||
|
||||
|
||||
def delete_user_recommendation(userID, Array):
|
||||
def delete_user_recommendations(user_id):
|
||||
db_con = database()
|
||||
cursor = db_con.cursor()
|
||||
|
||||
try:
|
||||
for item in Array:
|
||||
#Product ID starts form index 1
|
||||
item_value = item + 1
|
||||
print(item_value)
|
||||
# Use parameterized queries to prevent SQL injection
|
||||
cursor.execute(f"INTO Recommendation (UserID, RecommendedProductID) VALUES ({userID}, {item_value});")
|
||||
|
||||
cursor.execute("DELETE FROM Recommendation WHERE UserID = %s", (user_id,))
|
||||
db_con.commit()
|
||||
print(f"Deleted existing recommendations for user {user_id}")
|
||||
logging.info(f"Deleted existing recommendations for user {user_id}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error(f"Error deleting recommendations for user {user_id}: {str(e)}")
|
||||
db_con.rollback()
|
||||
return False
|
||||
finally:
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
|
||||
#results = cursor.fetchall()
|
||||
#print(results)
|
||||
except:
|
||||
pass
|
||||
|
||||
def get_all_products():
|
||||
|
||||
def get_random_products(count=10, exclude_list=None):
|
||||
"""Get random products from the database, excluding any in the exclude_list"""
|
||||
db_con = database()
|
||||
cursor = db_con.cursor()
|
||||
|
||||
cursor.execute("SELECT CategoryID FROM Category")
|
||||
categories = cursor.fetchall()
|
||||
try:
|
||||
if exclude_list and len(exclude_list) > 0:
|
||||
# Convert exclude_list to string for SQL IN clause
|
||||
exclude_str = ', '.join(map(str, exclude_list))
|
||||
cursor.execute(f"SELECT ProductID FROM Product WHERE ProductID NOT IN ({exclude_str}) ORDER BY RAND() LIMIT {count}")
|
||||
else:
|
||||
cursor.execute(f"SELECT ProductID FROM Product ORDER BY RAND() LIMIT {count}")
|
||||
|
||||
select_clause = "SELECT p.ProductID"
|
||||
for category in categories:
|
||||
category_id = category[0]
|
||||
select_clause += f", MAX(CASE WHEN pc.CategoryID = {category_id} THEN 1 ELSE 0 END) AS `Cat_{category_id}`"
|
||||
random_products = [row[0] for row in cursor.fetchall()]
|
||||
return random_products
|
||||
|
||||
final_query = f"""
|
||||
{select_clause}
|
||||
FROM Product p
|
||||
LEFT JOIN Product_Category pc ON p.ProductID = pc.ProductID
|
||||
LEFT JOIN Category c ON pc.CategoryID = c.CategoryID
|
||||
GROUP BY p.ProductID;
|
||||
"""
|
||||
except Exception as e:
|
||||
logging.error(f"Error getting random products: {str(e)}")
|
||||
return []
|
||||
finally:
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
|
||||
cursor.execute(final_query)
|
||||
results = cursor.fetchall()
|
||||
def get_popular_products(count=10):
|
||||
"""Get popular products based on history table frequency"""
|
||||
db_con = database()
|
||||
cursor = db_con.cursor()
|
||||
|
||||
final = []
|
||||
for row in results:
|
||||
text_list = list(row)
|
||||
text_list.pop(0)
|
||||
final.append(text_list)
|
||||
try:
|
||||
# Get products that appear most frequently in history
|
||||
cursor.execute("""
|
||||
SELECT ProductID, COUNT(*) as count
|
||||
FROM History
|
||||
GROUP BY ProductID
|
||||
ORDER BY count DESC
|
||||
LIMIT %s
|
||||
""", (count,))
|
||||
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
return final
|
||||
popular_products = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
# If not enough popular products, supplement with random ones
|
||||
if len(popular_products) < count:
|
||||
random_products = get_random_products(count - len(popular_products), popular_products)
|
||||
popular_products.extend(random_products)
|
||||
|
||||
return popular_products
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error getting popular products: {str(e)}")
|
||||
return get_random_products(count) # Fallback to random products
|
||||
finally:
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
|
||||
def has_user_history_or_recommendations(user_id):
|
||||
"""Check if user exists in History or Recommendation table"""
|
||||
db_con = database()
|
||||
cursor = db_con.cursor()
|
||||
|
||||
try:
|
||||
# Check if user has history
|
||||
cursor.execute("SELECT COUNT(*) FROM History WHERE UserID = %s", (user_id,))
|
||||
history_count = cursor.fetchone()[0]
|
||||
|
||||
# Check if user has recommendations
|
||||
cursor.execute("SELECT COUNT(*) FROM Recommendation WHERE UserID = %s", (user_id,))
|
||||
recommendation_count = cursor.fetchone()[0]
|
||||
|
||||
return history_count > 0 or recommendation_count > 0
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error checking user history/recommendations: {str(e)}")
|
||||
return False
|
||||
finally:
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
|
||||
def get_all_products():
|
||||
db_con = database()
|
||||
cursor = db_con.cursor()
|
||||
|
||||
try:
|
||||
cursor.execute("SELECT CategoryID FROM Category")
|
||||
categories = cursor.fetchall()
|
||||
|
||||
select_clause = "SELECT p.ProductID"
|
||||
for category in categories:
|
||||
category_id = category[0]
|
||||
select_clause += f", MAX(CASE WHEN pc.CategoryID = {category_id} THEN 1 ELSE 0 END) AS `Cat_{category_id}`"
|
||||
|
||||
final_query = f"""
|
||||
{select_clause}
|
||||
FROM Product p
|
||||
LEFT JOIN Product_Category pc ON p.ProductID = pc.ProductID
|
||||
LEFT JOIN Category c ON pc.CategoryID = c.CategoryID
|
||||
GROUP BY p.ProductID;
|
||||
"""
|
||||
|
||||
cursor.execute(final_query)
|
||||
results = cursor.fetchall()
|
||||
|
||||
final = []
|
||||
product_ids = []
|
||||
for row in results:
|
||||
text_list = list(row)
|
||||
product_id = text_list.pop(0) # Save the product ID before removing it
|
||||
final.append(text_list)
|
||||
product_ids.append(product_id)
|
||||
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
return final, product_ids # Return both feature vectors and product IDs
|
||||
except Exception as e:
|
||||
logging.error(f"Error getting all products: {str(e)}")
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
return [], []
|
||||
|
||||
def get_user_history(user_id):
|
||||
db_con = database()
|
||||
cursor = db_con.cursor()
|
||||
|
||||
cursor.execute("SELECT CategoryID FROM Category")
|
||||
categories = cursor.fetchall()
|
||||
|
||||
select_clause = "SELECT p.ProductID"
|
||||
for category in categories:
|
||||
category_id = category[0] # get the uid of the catefory and then append that to the new column
|
||||
select_clause += f", MAX(CASE WHEN pc.CategoryID = {category_id} THEN 1 ELSE 0 END) AS `Cat_{category_id}`"
|
||||
|
||||
final_query = f"""
|
||||
{select_clause}
|
||||
FROM Product p
|
||||
LEFT JOIN Product_Category pc ON p.ProductID = pc.ProductID
|
||||
LEFT JOIN Category c ON pc.CategoryID = c.CategoryID
|
||||
where p.ProductID in (select ProductID from History where UserID = {user_id})
|
||||
GROUP BY p.ProductID;
|
||||
"""
|
||||
|
||||
cursor.execute(final_query)
|
||||
results = cursor.fetchall()
|
||||
final = []
|
||||
for row in results:
|
||||
text_list = list(row)
|
||||
text_list.pop(0)
|
||||
final.append(text_list)
|
||||
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
return final
|
||||
|
||||
def get_recommendations(user_id, top_n=10):
|
||||
try:
|
||||
cursor.execute("SELECT CategoryID FROM Category")
|
||||
categories = cursor.fetchall()
|
||||
|
||||
select_clause = "SELECT p.ProductID"
|
||||
for category in categories:
|
||||
category_id = category[0] # get the uid of the category and then append that to the new column
|
||||
select_clause += f", MAX(CASE WHEN pc.CategoryID = {category_id} THEN 1 ELSE 0 END) AS `Cat_{category_id}`"
|
||||
|
||||
final_query = f"""
|
||||
{select_clause}
|
||||
FROM Product p
|
||||
LEFT JOIN Product_Category pc ON p.ProductID = pc.ProductID
|
||||
LEFT JOIN Category c ON pc.CategoryID = c.CategoryID
|
||||
where p.ProductID in (select ProductID from History where UserID = {user_id})
|
||||
GROUP BY p.ProductID;
|
||||
"""
|
||||
|
||||
cursor.execute(final_query)
|
||||
results = cursor.fetchall()
|
||||
final = []
|
||||
for row in results:
|
||||
text_list = list(row)
|
||||
text_list.pop(0)
|
||||
final.append(text_list)
|
||||
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
return final
|
||||
except Exception as e:
|
||||
logging.error(f"Error getting user history: {str(e)}")
|
||||
cursor.close()
|
||||
db_con.close()
|
||||
return []
|
||||
|
||||
def get_recommendations(user_id, top_n=5):
|
||||
try:
|
||||
# Always delete existing recommendations first
|
||||
delete_user_recommendations(user_id)
|
||||
|
||||
# Check if user has history or recommendations
|
||||
if not has_user_history_or_recommendations(user_id):
|
||||
# Cold start: return random products
|
||||
random_recs = get_random_products(top_n)
|
||||
# Store these random recommendations
|
||||
history_upload(user_id, random_recs)
|
||||
|
||||
# Add 5 more unique random products
|
||||
additional_random = get_random_products(5, random_recs)
|
||||
history_upload(user_id, additional_random)
|
||||
|
||||
return random_recs + additional_random
|
||||
|
||||
# Get all products and user history with their category vectors
|
||||
all_products = get_all_products()
|
||||
all_product_features, all_product_ids = get_all_products()
|
||||
user_history = get_user_history(user_id)
|
||||
# if not user_history:
|
||||
# #Cold start: return popular products
|
||||
# return get_popular_products(top_n)
|
||||
|
||||
if not user_history:
|
||||
# User exists but has no history yet
|
||||
popular_recs = get_popular_products(top_n)
|
||||
history_upload(user_id, popular_recs)
|
||||
|
||||
# Add 5 more unique random products
|
||||
additional_random = get_random_products(5, popular_recs)
|
||||
history_upload(user_id, additional_random)
|
||||
|
||||
return popular_recs + additional_random
|
||||
|
||||
# Calculate similarity between all products and user history
|
||||
user_profile = np.mean(user_history, axis=0) # Average user preferences
|
||||
similarities = cosine_similarity([user_profile], all_products)
|
||||
# finds the indices of the top N products that have the highest
|
||||
# cosine similarity with the user's profile and sorted from most similar to least similar.
|
||||
similarities = cosine_similarity([user_profile], all_product_features)
|
||||
print(similarities)
|
||||
|
||||
# Get indices of the top N products sorted by similarity
|
||||
product_indices = similarities[0].argsort()[-top_n:][::-1]
|
||||
print("product", product_indices)
|
||||
|
||||
# Get the recommended product IDs
|
||||
recommended_products = [all_products[i][0] for i in product_indices] # Product IDs
|
||||
# Get the actual product IDs using the indices
|
||||
recommended_product_ids = [all_product_ids[i] for i in product_indices]
|
||||
|
||||
# Upload the recommendations to the database
|
||||
history_upload(user_id, product_indices) # Pass the indices directly to history_upload
|
||||
# Upload the core recommendations to the database
|
||||
history_upload(user_id, recommended_product_ids)
|
||||
|
||||
# Add 5 more unique random products that aren't in the recommendations
|
||||
additional_random = get_random_products(5, recommended_product_ids)
|
||||
history_upload(user_id, additional_random)
|
||||
|
||||
# Return both the similarity-based recommendations and the random ones
|
||||
return recommended_product_ids + additional_random
|
||||
|
||||
# Return recommended product IDs
|
||||
return recommended_products
|
||||
except Exception as e:
|
||||
logging.error(f"Recommendation error for user {user_id}: {str(e)}")
|
||||
# return get_popular_products(top_n) # Fallback to popular products
|
||||
# Fallback to random products
|
||||
random_products = get_random_products(top_n + 5)
|
||||
return random_products
|
||||
|
||||
def history_upload(userID, anrr):
|
||||
def history_upload(userID, products):
|
||||
"""Upload product recommendations to the database"""
|
||||
db_con = database()
|
||||
cursor = db_con.cursor()
|
||||
|
||||
try:
|
||||
for item in anrr:
|
||||
#Product ID starts form index 1
|
||||
item_value = item + 1
|
||||
print(item_value)
|
||||
for product_id in products:
|
||||
# Use parameterized queries to prevent SQL injection
|
||||
cursor.execute(f"INSERT INTO Recommendation (UserID, RecommendedProductID) VALUES ({userID}, {item_value});")
|
||||
cursor.execute("INSERT INTO Recommendation (UserID, RecommendedProductID) VALUES (%s, %s)",
|
||||
(userID, product_id))
|
||||
|
||||
# Commit the changes
|
||||
db_con.commit()
|
||||
|
||||
# If you need results, you'd typically fetch them after a SELECT query
|
||||
#results = cursor.fetchall()
|
||||
#print(results)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
logging.error(f"Error uploading recommendations: {str(e)}")
|
||||
db_con.rollback()
|
||||
finally:
|
||||
# Close the cursor and connection
|
||||
|
||||
Reference in New Issue
Block a user