527 lines
16 KiB
Python
527 lines
16 KiB
Python
|
|
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app
|
|
from flask_login import login_required, current_user
|
|
from models import db, Product, Category, ProductImage, Review, SizeEnum, User, Order, OrderItem
|
|
import os
|
|
from werkzeug.utils import secure_filename
|
|
import json
|
|
from functools import wraps
|
|
from datetime import datetime
|
|
from sqlalchemy import func
|
|
|
|
# Blueprint that all admin views in this module are registered on; it is
# created with the "/admin" URL prefix.
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
|
|
|
|
|
|
|
|
|
|
def admin_required(func):
    """Decorator that only lets authenticated admin users reach a view.

    Anonymous visitors get a flash message and a redirect to the
    ``auth.login`` endpoint; authenticated users without ``is_admin`` are
    redirected to ``main.home``. Everyone else is passed through to the
    wrapped view with the original arguments.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        if not current_user.is_authenticated:
            message, endpoint = "Please login to access admin area", 'auth.login'
        elif not current_user.is_admin:
            message, endpoint = "Admin access only!", 'main.home'
        else:
            return func(*args, **kwargs)

        flash(message, "danger")
        return redirect(url_for(endpoint))

    return wrapper
|
|
|
|
|
|
|
|
|
|
def allowed_file(filename):
    """Return True when *filename* ends in an accepted image extension.

    The extension is the text after the last dot, compared
    case-insensitively against png, jpg, jpeg, gif and webp. Names without
    a dot are rejected.
    """
    permitted = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[-1].lower()
    return extension in permitted
|
|
|
|
|
|
def get_or_create_default_category():
    """Return the 'Uncategorized' category, creating and committing it if absent."""
    existing = Category.query.filter_by(name='Uncategorized').first()
    if existing:
        return existing

    created = Category(name='Uncategorized')
    db.session.add(created)
    db.session.commit()
    return created
|
|
|
|
|
|
def get_categories():
    """Return all categories, or a one-item list with the default category when none exist."""
    found = Category.query.all()
    if found:
        return found
    # No categories yet: make sure the default one exists and offer only that.
    return [get_or_create_default_category()]
|
|
|
|
|
|
def get_admin_stats():
    """Collect the aggregate figures used by the admin dashboard and stats page.

    Returns:
        dict: counts under ``total_products``, ``total_categories``,
        ``total_reviews``, ``total_orders``, ``total_users``, ``low_stock``
        (stock between 1 and 5) and ``out_of_stock`` (stock equal to 0),
        plus ``total_revenue`` (sum of price_at_purchase * quantity over
        order items whose order status is 'delivered' or 'completed') and
        ``total_value`` (sum of price * stock over all products).
    """
    try:
        revenue = (
            db.session.query(
                func.sum(OrderItem.price_at_purchase * OrderItem.quantity)
            )
            .join(Order)
            .filter(Order.status.in_(['delivered', 'completed']))
            .scalar()
        )
        total_revenue = float(revenue) if revenue else 0.0
    except Exception:
        # Revenue is best-effort: the page should still render without it.
        # Log the failure instead of hiding it, and roll back so the queries
        # below do not run on a session left in a failed state.
        current_app.logger.exception("Could not compute total revenue")
        db.session.rollback()
        total_revenue = 0.0

    stats = {
        'total_products': Product.query.count(),
        'total_categories': Category.query.count(),
        'total_reviews': Review.query.count(),
        'total_orders': Order.query.count() if hasattr(Order, 'query') else 0,
        'total_users': User.query.count() if hasattr(User, 'query') else 1,
        'total_revenue': total_revenue,
        'low_stock': Product.query.filter(Product.stock > 0, Product.stock <= 5).count(),
        'out_of_stock': Product.query.filter(Product.stock == 0).count(),
        'total_value': db.session.query(func.sum(Product.price * Product.stock)).scalar() or 0,
    }

    return stats
|
|
|
|
@admin_bp.route('/')
@login_required
@admin_required
def dashboard():
    """Render the admin dashboard: products paginated newest-id first, plus stats."""
    current_page = request.args.get('page', 1, type=int)
    newest_first = Product.query.order_by(Product.id.desc())
    product_page = newest_first.paginate(page=current_page, per_page=20)

    context = {
        'products': product_page,
        'stats': get_admin_stats(),
        'SizeEnum': SizeEnum,
    }
    return render_template('admin/dashboard.html', **context)
|
|
|
|
@admin_bp.route('/add', methods=['GET', 'POST'])
@login_required
@admin_required
def add_product():
    """Show the add-product form (GET) or create a product from it (POST).

    On POST the product row is flushed before the images are handled so
    that its id can be embedded in the stored image file names. On success
    the user is redirected to the dashboard. On any failure the
    transaction is rolled back, image files written during this request
    are removed again, the error is flashed and the form is re-rendered.
    """
    if request.method == 'POST':
        upload_dir = 'static/uploads/products'
        saved_paths = []  # files written by this request; cleaned up on failure
        try:
            form = request.form
            name = form['name']
            price = float(form['price'])
            stock = int(form.get('stock', 0))

            # An empty or non-numeric weight is stored as "no weight".
            raw_weight = form.get('weight')
            try:
                weight = float(raw_weight) if raw_weight else None
            except ValueError:
                weight = None

            category_id = form.get('category_id')
            if not category_id:
                category_id = get_or_create_default_category().id

            # NOTE(review): a 'product_type' form field used to be read here
            # but was never passed to Product, so the unused read was dropped.
            # Confirm whether the model should store it.
            product = Product(
                name=name,
                description=form.get('description', ''),
                price=price,
                stock=stock,
                sku=form.get('sku', ''),
                color=form.get('color', ''),
                size=form.get('size', ''),
                material=form.get('material', ''),
                company=form.get('company', ''),
                weight=weight,
                dimensions=form.get('dimensions', ''),
                category_id=category_id,
            )

            db.session.add(product)
            db.session.flush()  # assigns product.id without committing

            for file in request.files.getlist('images[]'):
                if not (file and file.filename and allowed_file(file.filename)):
                    continue

                os.makedirs(upload_dir, exist_ok=True)

                # Number images by how many were accepted, not by their
                # position in the upload, so the first accepted image is
                # the primary one even when earlier files were rejected.
                position = len(saved_paths)
                name_part, ext = os.path.splitext(secure_filename(file.filename))
                unique_filename = f"{name_part}_{product.id}_{position}{ext}"
                file_path = os.path.join(upload_dir, unique_filename)

                saved_paths.append(file_path)
                file.save(file_path)

                db.session.add(ProductImage(
                    product_id=product.id,
                    filename=unique_filename,
                    is_primary=(position == 0),
                    display_order=position,
                ))

            db.session.commit()

        except Exception as e:
            db.session.rollback()
            # The rows are gone after the rollback; remove their files too.
            for path in saved_paths:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError:
                    current_app.logger.warning("Could not remove upload %s", path)
            current_app.logger.exception("Error adding product")
            flash(f"Error adding product: {str(e)}", "danger")
        else:
            flash(f"Product '{name}' added successfully!", "success")
            return redirect(url_for('admin.dashboard'))

    categories = get_categories()
    return render_template('admin/add_product.html',
                           categories=categories,
                           sizes=SizeEnum.ALL)
|
|
|
|
@admin_bp.route('/edit/<int:id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_product(id):
    """Show the edit form for a product (GET) or apply the submitted changes (POST).

    A POST can update the scalar fields, upload extra images
    (``images[]``), delete images (``deleted_images``, a JSON list of
    image ids) and choose the primary image (``primary_image``). All
    database changes are committed together. Files of deleted images are
    removed only after the commit succeeds, and files written during a
    failed request are removed again.

    Args:
        id: primary key of the product; an unknown id yields a 404.
    """
    product = Product.query.get_or_404(id)

    if request.method == 'POST':
        upload_dir = 'static/uploads/products'
        saved_paths = []    # new files written by this request
        doomed_paths = []   # files of images deleted by this request
        try:
            form = request.form
            product.name = form['name']
            product.description = form.get('description', '')
            product.price = float(form['price'])
            product.stock = int(form.get('stock', 0))
            product.sku = form.get('sku', '')
            product.color = form.get('color', '')
            product.size = form.get('size', '')
            product.material = form.get('material', '')
            product.company = form.get('company', '')

            # An empty or non-numeric weight is stored as "no weight".
            raw_weight = form.get('weight')
            try:
                product.weight = float(raw_weight) if raw_weight else None
            except ValueError:
                product.weight = None

            product.dimensions = form.get('dimensions', '')

            category_id = form.get('category_id')
            if category_id:
                product.category_id = category_id

            # Continue numbering after the highest existing display_order.
            # Using len(product.images) could reuse an index after earlier
            # deletions and overwrite another image's file.
            next_order = max(
                (img.display_order or 0 for img in product.images),
                default=-1,
            ) + 1

            for file in request.files.getlist('images[]'):
                if not (file and file.filename and allowed_file(file.filename)):
                    continue

                os.makedirs(upload_dir, exist_ok=True)
                name_part, ext = os.path.splitext(secure_filename(file.filename))

                # Skip any index whose file name is already taken on disk.
                while True:
                    unique_filename = f"{name_part}_{product.id}_{next_order}{ext}"
                    file_path = os.path.join(upload_dir, unique_filename)
                    if not os.path.exists(file_path):
                        break
                    next_order += 1

                saved_paths.append(file_path)
                file.save(file_path)

                db.session.add(ProductImage(
                    product_id=product.id,
                    filename=unique_filename,
                    display_order=next_order,
                ))
                next_order += 1

            deleted_image_ids = set()
            deleted_images_str = form.get('deleted_images')
            if deleted_images_str:
                try:
                    deleted_ids = json.loads(deleted_images_str)
                except json.JSONDecodeError:
                    deleted_ids = None
                if not isinstance(deleted_ids, list):
                    # Malformed payload: skip deletions, keep the other edits.
                    current_app.logger.warning(
                        "Ignoring malformed deleted_images for product %s", product.id)
                    deleted_ids = []

                for img_id in deleted_ids:
                    image = ProductImage.query.get(img_id)
                    # Only images that belong to this product may be deleted.
                    if image and image.product_id == product.id:
                        doomed_paths.append(os.path.join(upload_dir, image.filename))
                        deleted_image_ids.add(image.id)
                        db.session.delete(image)

            primary_id = form.get('primary_image')
            if primary_id:
                try:
                    primary_id = int(primary_id)
                except ValueError:
                    primary_id = None

                remaining = [img for img in product.images
                             if img.id not in deleted_image_ids]
                # Only switch when the id names an image this product keeps;
                # otherwise every image would lose its primary flag.
                if any(img.id == primary_id for img in remaining):
                    for img in remaining:
                        img.is_primary = (img.id == primary_id)

            db.session.commit()

        except Exception as e:
            db.session.rollback()
            # Nothing was committed: remove the files written by this request.
            for path in saved_paths:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError:
                    current_app.logger.warning("Could not remove upload %s", path)
            current_app.logger.exception("Error updating product %s", id)
            flash(f"Error updating product: {str(e)}", "danger")
        else:
            # The deletions are committed, so their files can go now.
            for path in doomed_paths:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError:
                    current_app.logger.warning("Could not remove image file %s", path)
            flash("Product updated successfully!", "success")
            return redirect(url_for('admin.dashboard'))

    categories = get_categories()
    return render_template('admin/edit_product.html',
                           product=product,
                           categories=categories,
                           sizes=SizeEnum.ALL)
|
|
|
|
@admin_bp.route('/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_product(id):
    """Delete a product and the image files stored for it, then return to the dashboard.

    The database delete is committed first. The image files are removed
    only after that succeeds, so a failed delete leaves the product with
    its images intact.

    Args:
        id: primary key of the product; an unknown id yields a 404.
    """
    product = Product.query.get_or_404(id)
    image_paths = []
    try:
        image_paths = [
            os.path.join('static/uploads/products', image.filename)
            for image in product.images
        ]
        db.session.delete(product)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        current_app.logger.exception("Error deleting product %s", id)
        flash(f"Error deleting product: {str(e)}", "danger")
    else:
        for path in image_paths:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
            except OSError:
                current_app.logger.warning("Could not remove image file %s", path)
        flash("Product deleted successfully!", "info")

    return redirect(url_for('admin.dashboard'))
|
|
|
|
@admin_bp.route('/image/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_image(id):
    """Delete one product image and answer with JSON.

    Returns:
        ``{'success': True}`` on success; ``{'success': False, 'error': ...}``
        with status 404 when the image does not exist, or status 500 when
        the database operation fails.
    """
    try:
        image = ProductImage.query.get(id)
        if image is None:
            # Report a missing image as 404. A get_or_404 inside this try
            # block would have its 404 caught below and turned into a 500.
            return jsonify({'success': False, 'error': 'Image not found'}), 404

        file_path = os.path.join('static/uploads/products', image.filename)
        db.session.delete(image)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        current_app.logger.exception("Error deleting image %s", id)
        return jsonify({'success': False, 'error': str(e)}), 500

    # Remove the file only after the row is gone, so a failed commit never
    # leaves a row pointing at a missing file.
    try:
        os.remove(file_path)
    except FileNotFoundError:
        pass
    except OSError:
        current_app.logger.warning("Could not remove image file %s", file_path)

    return jsonify({'success': True})
|
|
|
|
@admin_bp.route('/reviews')
@login_required
@admin_required
def manage_reviews():
    """Render the review management page with all reviews, newest first."""
    newest_first = Review.query.order_by(Review.created_at.desc())
    return render_template('admin/reviews.html', reviews=newest_first.all())
|
|
|
|
@admin_bp.route('/review/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_review(id):
    """Delete a review and return to the review list.

    Args:
        id: primary key of the review; an unknown id yields a 404.
    """
    review = Review.query.get_or_404(id)
    try:
        db.session.delete(review)
        db.session.commit()
    except Exception as e:
        # Roll back so the session is usable again, as the add/edit views do.
        db.session.rollback()
        flash(f"Error deleting review: {str(e)}", "danger")
    else:
        flash("Review deleted successfully!", "info")

    return redirect(url_for('admin.manage_reviews'))
|
|
|
|
@admin_bp.route('/categories')
@login_required
@admin_required
def manage_categories():
    """Render the category management page with categories sorted by name."""
    alphabetical = Category.query.order_by(Category.name.asc())
    return render_template('admin/categories.html', categories=alphabetical.all())
|
|
|
|
@admin_bp.route('/category/add', methods=['POST'])
@login_required
@admin_required
def add_category():
    """Create a category from the submitted ``name`` and return to the category list.

    Blank names and names that already exist are rejected with a flash
    message and nothing is written.
    """
    name = request.form.get('name', '').strip()
    if not name:
        flash("Category name cannot be empty", "danger")
        return redirect(url_for('admin.manage_categories'))

    if Category.query.filter_by(name=name).first():
        flash(f"Category '{name}' already exists", "warning")
        return redirect(url_for('admin.manage_categories'))

    try:
        db.session.add(Category(name=name))
        db.session.commit()
    except Exception as e:
        # Roll back so the session is usable again, as the product views do.
        db.session.rollback()
        flash(f"Error adding category: {str(e)}", "danger")
    else:
        flash(f"Category '{name}' added successfully!", "success")

    return redirect(url_for('admin.manage_categories'))
|
|
|
|
@admin_bp.route('/category/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_category(id):
    """Delete a category that has no products and return to the category list.

    A category that still has products is not deleted; a flash message
    reports how many products it holds.

    Args:
        id: primary key of the category; an unknown id yields a 404.
    """
    category = Category.query.get_or_404(id)
    # Read the name up front so the messages never touch a deleted instance.
    category_name = category.name

    product_count = len(category.products)
    if product_count:
        flash(
            f"Cannot delete category '{category_name}' because it has {product_count} product(s)", "danger")
        return redirect(url_for('admin.manage_categories'))

    try:
        db.session.delete(category)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f"Error deleting category: {str(e)}", "danger")
    else:
        flash(f"Category '{category_name}' deleted successfully!", "info")

    return redirect(url_for('admin.manage_categories'))
|
|
|
|
@admin_bp.route('/stats')
@login_required
@admin_required
def stats():
    """Render the statistics page from the figures returned by get_admin_stats()."""
    return render_template('admin/stats.html', stats=get_admin_stats())
|
|
|
|
@admin_bp.route('/users')
@login_required
@admin_required
def manage_users():
    """Render the user management page with users ordered by descending id."""
    highest_id_first = User.query.order_by(User.id.desc())
    return render_template('admin/users.html', users=highest_id_first.all())
|
|
|
|
@admin_bp.route('/user/toggle_admin/<int:id>', methods=['POST'])
@login_required
@admin_required
def toggle_user_admin(id):
    """Flip the ``is_admin`` flag of another user and return to the user list.

    The logged-in admin cannot change their own flag.

    Args:
        id: primary key of the user; an unknown id yields a 404.
    """
    user = User.query.get_or_404(id)

    if user.id == current_user.id:
        flash("You cannot remove admin privileges from yourself", "warning")
        return redirect(url_for('admin.manage_users'))

    # Read what the message needs before the commit changes session state.
    email = user.email
    now_admin = not user.is_admin
    try:
        user.is_admin = now_admin
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f"Error updating user: {str(e)}", "danger")
    else:
        status = "granted" if now_admin else "removed"
        flash(f"Admin privileges {status} for {email}", "success")

    return redirect(url_for('admin.manage_users'))
|
|
|
|
@admin_bp.route('/user/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_user(id):
    """Delete another user's account and return to the user list.

    The logged-in admin cannot delete their own account.

    Args:
        id: primary key of the user; an unknown id yields a 404.
    """
    user = User.query.get_or_404(id)

    if user.id == current_user.id:
        flash("You cannot delete your own account", "warning")
        return redirect(url_for('admin.manage_users'))

    # Read the email up front so the message never touches a deleted instance.
    email = user.email
    try:
        db.session.delete(user)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f"Error deleting user: {str(e)}", "danger")
    else:
        flash(f"User {email} deleted successfully", "info")

    return redirect(url_for('admin.manage_users'))
|
|
|
|
@admin_bp.route('/orders')
@login_required
@admin_required
def manage_orders():
    """Render the order management page with all orders, newest first."""
    newest_first = Order.query.order_by(Order.created_at.desc())
    return render_template('admin/orders.html', orders=newest_first.all())
|
|
|
|
@admin_bp.route('/order/<int:id>')
@login_required
@admin_required
def order_detail(id):
    """Render the detail page for one order; an unknown id yields a 404."""
    return render_template('admin/order_detail.html',
                           order=Order.query.get_or_404(id))
|
|
|
|
@admin_bp.route('/order/update_status/<int:id>', methods=['POST'])
@login_required
@admin_required
def update_order_status(id):
    """Set an order's status from the submitted ``status`` field.

    Only 'pending', 'processing', 'shipped', 'delivered' and 'cancelled'
    are accepted; any other value is rejected with a flash message. The
    user is always redirected back to the order's detail page.

    Args:
        id: primary key of the order; an unknown id yields a 404.
    """
    order = Order.query.get_or_404(id)
    new_status = request.form.get('status')

    # NOTE(review): get_admin_stats() also counts orders with status
    # 'completed', which cannot be set here. Confirm which list is right.
    valid_statuses = ('pending', 'processing', 'shipped', 'delivered', 'cancelled')
    if new_status not in valid_statuses:
        flash("Invalid status", "danger")
        return redirect(url_for('admin.order_detail', id=id))

    try:
        order.status = new_status
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f"Error updating order: {str(e)}", "danger")
    else:
        flash(f"Order #{id} status updated to {new_status}", "success")

    return redirect(url_for('admin.order_detail', id=id))
|
|
|
|
# NOTE(review): this route changes data but is reachable via GET, so a
# crawler, prefetch or cross-site link can trigger it. It is left unchanged
# because templates may link to it; consider moving it to POST.
@admin_bp.route('/quick/restock_low')
@login_required
@admin_required
def quick_restock_low():
    """Quick action to restock low inventory items.

    Adds 10 units to every product whose stock is between 1 and 5
    inclusive, then redirects to the dashboard. Out-of-stock products
    (stock 0) are not touched.
    """
    try:
        low_stock_products = Product.query.filter(
            Product.stock > 0,
            Product.stock <= 5
        ).all()

        for product in low_stock_products:
            product.stock += 10

        count = len(low_stock_products)
        if count > 0:
            db.session.commit()
            flash(f"Restocked {count} low inventory products", "success")
        else:
            flash("No low inventory products found", "info")

    except Exception as e:
        # Discard the partially applied stock changes.
        db.session.rollback()
        flash(f"Error restocking products: {str(e)}", "danger")

    return redirect(url_for('admin.dashboard'))
|
|
|
|
@admin_bp.route('/quick/update_prices', methods=['POST'])
@login_required
@admin_required
def quick_update_prices():
    """Quick action to update prices by percentage.

    Reads ``percentage`` from the form and multiplies every product price
    by ``1 + percentage / 100``, rounded to 2 decimals. A percentage of 0
    changes nothing. Values that are not finite, or that are -100 or
    lower, are rejected because they would give NaN, zero or negative
    prices. Always redirects to the dashboard.
    """
    try:
        percentage = float(request.form.get('percentage', 0))

        if percentage == 0:
            flash("No price change specified", "warning")
        elif not (-100 < percentage < float('inf')):
            # NaN fails both comparisons, so this also rejects NaN and +/-inf.
            flash("Percentage must be a finite number greater than -100", "danger")
        else:
            multiplier = 1 + (percentage / 100)

            for product in Product.query.all():
                product.price = round(product.price * multiplier, 2)

            db.session.commit()

            action = "increased" if percentage > 0 else "decreased"
            flash(
                f"Prices {action} by {abs(percentage)}% for all products", "success")

    except Exception as e:
        # Discard any partially applied price changes.
        db.session.rollback()
        flash(f"Error updating prices: {str(e)}", "danger")

    return redirect(url_for('admin.dashboard'))
|