Added Repository

This commit is contained in:
2026-01-30 14:02:35 +03:30
parent 8917e625a5
commit dbc8f70b4a
53 changed files with 7758 additions and 2 deletions

526
routes/admin.py Normal file
View File

@@ -0,0 +1,526 @@
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app
from flask_login import login_required, current_user
from models import db, Product, Category, ProductImage, Review, SizeEnum, User, Order, OrderItem
import os
from werkzeug.utils import secure_filename
import json
from functools import wraps
from datetime import datetime
from sqlalchemy import func
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
def admin_required(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not current_user.is_authenticated:
flash("Please login to access admin area", "danger")
return redirect(url_for('auth.login'))
if not current_user.is_admin:
flash("Admin access only!", "danger")
return redirect(url_for('main.home'))
return func(*args, **kwargs)
return wrapper
def allowed_file(filename):
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_or_create_default_category():
default = Category.query.filter_by(name='Uncategorized').first()
if not default:
default = Category(name='Uncategorized')
db.session.add(default)
db.session.commit()
return default
def get_categories():
categories = Category.query.all()
if not categories:
default_cat = get_or_create_default_category()
categories = [default_cat]
return categories
def get_admin_stats():
try:
total_revenue_result = db.session.query(
func.sum(OrderItem.price_at_purchase * OrderItem.quantity)
).join(Order).filter(Order.status.in_(['delivered', 'completed'])).scalar()
total_revenue = float(
total_revenue_result) if total_revenue_result else 0.0
except:
total_revenue = 0.0
stats = {
'total_products': Product.query.count(),
'total_categories': Category.query.count(),
'total_reviews': Review.query.count(),
'total_orders': Order.query.count() if hasattr(Order, 'query') else 0,
'total_users': User.query.count() if hasattr(User, 'query') else 1,
'total_revenue': total_revenue,
'low_stock': Product.query.filter(Product.stock > 0, Product.stock <= 5).count(),
'out_of_stock': Product.query.filter(Product.stock == 0).count(),
'total_value': db.session.query(func.sum(Product.price * Product.stock)).scalar() or 0,
}
return stats
@admin_bp.route('/')
@login_required
@admin_required
def dashboard():
page = request.args.get('page', 1, type=int)
per_page = 20
products = Product.query.order_by(
Product.id.desc()).paginate(page=page, per_page=per_page)
stats = get_admin_stats()
return render_template('admin/dashboard.html',
products=products,
stats=stats,
SizeEnum=SizeEnum)
@admin_bp.route('/add', methods=['GET', 'POST'])
@login_required
@admin_required
def add_product():
if request.method == 'POST':
try:
name = request.form['name']
description = request.form.get('description', '')
price = float(request.form['price'])
stock = int(request.form.get('stock', 0))
sku = request.form.get('sku', '')
color = request.form.get('color', '')
size = request.form.get('size', '')
product_type = request.form.get('product_type', '')
material = request.form.get('material', '')
company = request.form.get('company', '')
weight = request.form.get('weight')
dimensions = request.form.get('dimensions', '')
if weight:
try:
weight = float(weight)
except ValueError:
weight = None
else:
weight = None
category_id = request.form.get('category_id')
if not category_id:
category = get_or_create_default_category()
category_id = category.id
product = Product(
name=name,
description=description,
price=price,
stock=stock,
sku=sku,
color=color,
size=size,
material=material,
company=company,
weight=weight,
dimensions=dimensions,
category_id=category_id
)
db.session.add(product)
db.session.flush()
if 'images[]' in request.files:
files = request.files.getlist('images[]')
for i, file in enumerate(files):
if file and file.filename and allowed_file(file.filename):
upload_dir = 'static/uploads/products'
os.makedirs(upload_dir, exist_ok=True)
filename = secure_filename(file.filename)
name_part, ext = os.path.splitext(filename)
unique_filename = f"{name_part}_{product.id}_{i}{ext}"
file_path = os.path.join(upload_dir, unique_filename)
file.save(file_path)
product_image = ProductImage(
product_id=product.id,
filename=unique_filename,
is_primary=(i == 0),
display_order=i
)
db.session.add(product_image)
db.session.commit()
flash(f"Product '{name}' added successfully!", "success")
return redirect(url_for('admin.dashboard'))
except Exception as e:
db.session.rollback()
flash(f"Error adding product: {str(e)}", "danger")
import traceback
traceback.print_exc()
categories = get_categories()
return render_template('admin/add_product.html',
categories=categories,
sizes=SizeEnum.ALL)
@admin_bp.route('/edit/<int:id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_product(id):
product = Product.query.get_or_404(id)
if request.method == 'POST':
try:
product.name = request.form['name']
product.description = request.form.get('description', '')
product.price = float(request.form['price'])
product.stock = int(request.form.get('stock', 0))
product.sku = request.form.get('sku', '')
product.color = request.form.get('color', '')
product.size = request.form.get('size', '')
product.material = request.form.get('material', '')
product.company = request.form.get('company', '')
weight = request.form.get('weight')
if weight:
try:
product.weight = float(weight)
except ValueError:
product.weight = None
else:
product.weight = None
product.dimensions = request.form.get('dimensions', '')
category_id = request.form.get('category_id')
if category_id:
product.category_id = category_id
if 'images[]' in request.files:
files = request.files.getlist('images[]')
for i, file in enumerate(files):
if file and file.filename and allowed_file(file.filename):
upload_dir = 'static/uploads/products'
os.makedirs(upload_dir, exist_ok=True)
filename = secure_filename(file.filename)
name_part, ext = os.path.splitext(filename)
unique_filename = f"{name_part}_{product.id}_{len(product.images) + i}{ext}"
file_path = os.path.join(upload_dir, unique_filename)
file.save(file_path)
product_image = ProductImage(
product_id=product.id,
filename=unique_filename,
display_order=len(product.images) + i
)
db.session.add(product_image)
if 'deleted_images' in request.form:
deleted_images_str = request.form['deleted_images']
if deleted_images_str:
try:
deleted_ids = json.loads(deleted_images_str)
for img_id in deleted_ids:
image = ProductImage.query.get(img_id)
if image and image.product_id == product.id:
file_path = os.path.join(
'static/uploads/products', image.filename)
if os.path.exists(file_path):
os.remove(file_path)
db.session.delete(image)
except json.JSONDecodeError:
pass
if 'primary_image' in request.form:
primary_id = request.form['primary_image']
if primary_id:
try:
primary_id = int(primary_id)
for img in product.images:
img.is_primary = (img.id == primary_id)
except ValueError:
pass
db.session.commit()
flash("Product updated successfully!", "success")
return redirect(url_for('admin.dashboard'))
except Exception as e:
db.session.rollback()
flash(f"Error updating product: {str(e)}", "danger")
import traceback
traceback.print_exc()
categories = get_categories()
return render_template('admin/edit_product.html',
product=product,
categories=categories,
sizes=SizeEnum.ALL)
@admin_bp.route('/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_product(id):
product = Product.query.get_or_404(id)
try:
for image in product.images:
file_path = os.path.join('static/uploads/products', image.filename)
if os.path.exists(file_path):
os.remove(file_path)
db.session.delete(product)
db.session.commit()
flash("Product deleted successfully!", "info")
except Exception as e:
flash(f"Error deleting product: {str(e)}", "danger")
return redirect(url_for('admin.dashboard'))
@admin_bp.route('/image/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_image(id):
try:
image = ProductImage.query.get_or_404(id)
file_path = os.path.join('static/uploads/products', image.filename)
if os.path.exists(file_path):
os.remove(file_path)
db.session.delete(image)
db.session.commit()
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@admin_bp.route('/reviews')
@login_required
@admin_required
def manage_reviews():
reviews = Review.query.order_by(Review.created_at.desc()).all()
return render_template('admin/reviews.html', reviews=reviews)
@admin_bp.route('/review/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_review(id):
review = Review.query.get_or_404(id)
try:
db.session.delete(review)
db.session.commit()
flash("Review deleted successfully!", "info")
except Exception as e:
flash(f"Error deleting review: {str(e)}", "danger")
return redirect(url_for('admin.manage_reviews'))
@admin_bp.route('/categories')
@login_required
@admin_required
def manage_categories():
categories = Category.query.order_by(Category.name.asc()).all()
return render_template('admin/categories.html', categories=categories)
@admin_bp.route('/category/add', methods=['POST'])
@login_required
@admin_required
def add_category():
name = request.form.get('name', '').strip()
if not name:
flash("Category name cannot be empty", "danger")
return redirect(url_for('admin.manage_categories'))
existing = Category.query.filter_by(name=name).first()
if existing:
flash(f"Category '{name}' already exists", "warning")
return redirect(url_for('admin.manage_categories'))
try:
category = Category(name=name)
db.session.add(category)
db.session.commit()
flash(f"Category '{name}' added successfully!", "success")
except Exception as e:
flash(f"Error adding category: {str(e)}", "danger")
return redirect(url_for('admin.manage_categories'))
@admin_bp.route('/category/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_category(id):
category = Category.query.get_or_404(id)
if category.products:
flash(
f"Cannot delete category '{category.name}' because it has {len(category.products)} product(s)", "danger")
return redirect(url_for('admin.manage_categories'))
try:
db.session.delete(category)
db.session.commit()
flash(f"Category '{category.name}' deleted successfully!", "info")
except Exception as e:
flash(f"Error deleting category: {str(e)}", "danger")
return redirect(url_for('admin.manage_categories'))
@admin_bp.route('/stats')
@login_required
@admin_required
def stats():
stats_data = get_admin_stats()
return render_template('admin/stats.html', stats=stats_data)
@admin_bp.route('/users')
@login_required
@admin_required
def manage_users():
users = User.query.order_by(User.id.desc()).all()
return render_template('admin/users.html', users=users)
@admin_bp.route('/user/toggle_admin/<int:id>', methods=['POST'])
@login_required
@admin_required
def toggle_user_admin(id):
user = User.query.get_or_404(id)
if user.id == current_user.id:
flash("You cannot remove admin privileges from yourself", "warning")
return redirect(url_for('admin.manage_users'))
try:
user.is_admin = not user.is_admin
db.session.commit()
status = "granted" if user.is_admin else "removed"
flash(f"Admin privileges {status} for {user.email}", "success")
except Exception as e:
flash(f"Error updating user: {str(e)}", "danger")
return redirect(url_for('admin.manage_users'))
@admin_bp.route('/user/delete/<int:id>', methods=['POST'])
@login_required
@admin_required
def delete_user(id):
user = User.query.get_or_404(id)
if user.id == current_user.id:
flash("You cannot delete your own account", "warning")
return redirect(url_for('admin.manage_users'))
try:
db.session.delete(user)
db.session.commit()
flash(f"User {user.email} deleted successfully", "info")
except Exception as e:
flash(f"Error deleting user: {str(e)}", "danger")
return redirect(url_for('admin.manage_users'))
@admin_bp.route('/orders')
@login_required
@admin_required
def manage_orders():
orders = Order.query.order_by(Order.created_at.desc()).all()
return render_template('admin/orders.html', orders=orders)
@admin_bp.route('/order/<int:id>')
@login_required
@admin_required
def order_detail(id):
order = Order.query.get_or_404(id)
return render_template('admin/order_detail.html', order=order)
@admin_bp.route('/order/update_status/<int:id>', methods=['POST'])
@login_required
@admin_required
def update_order_status(id):
order = Order.query.get_or_404(id)
new_status = request.form.get('status')
if new_status in ['pending', 'processing', 'shipped', 'delivered', 'cancelled']:
try:
order.status = new_status
db.session.commit()
flash(f"Order #{order.id} status updated to {new_status}", "success")
except Exception as e:
flash(f"Error updating order: {str(e)}", "danger")
else:
flash("Invalid status", "danger")
return redirect(url_for('admin.order_detail', id=id))
@admin_bp.route('/quick/restock_low')
@login_required
@admin_required
def quick_restock_low():
"""Quick action to restock low inventory items"""
try:
low_stock_products = Product.query.filter(
Product.stock > 0,
Product.stock <= 5
).all()
count = 0
for product in low_stock_products:
product.stock += 10
count += 1
if count > 0:
db.session.commit()
flash(f"Restocked {count} low inventory products", "success")
else:
flash("No low inventory products found", "info")
except Exception as e:
flash(f"Error restocking products: {str(e)}", "danger")
return redirect(url_for('admin.dashboard'))
@admin_bp.route('/quick/update_prices', methods=['POST'])
@login_required
@admin_required
def quick_update_prices():
"""Quick action to update prices by percentage"""
try:
percentage = float(request.form.get('percentage', 0))
if percentage != 0:
multiplier = 1 + (percentage / 100)
products = Product.query.all()
for product in products:
product.price = round(product.price * multiplier, 2)
db.session.commit()
action = "increased" if percentage > 0 else "decreased"
flash(
f"Prices {action} by {abs(percentage)}% for all products", "success")
else:
flash("No price change specified", "warning")
except Exception as e:
flash(f"Error updating prices: {str(e)}", "danger")
return redirect(url_for('admin.dashboard'))