wearwell/app.py

553 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# app.py - COMPLETE VERSION
import os
from dotenv import load_dotenv
from flask import Flask, jsonify, request
import pymysql
import pymysql.cursors
from config import config
# Load environment variables at the start
load_dotenv()
app = Flask(__name__)
# Configure the app
env = os.getenv('FLASK_ENV', 'development')
app.config.from_object(config[env])
print(f"🔧 App configured for: {env}")
print(f"📡 Will run on port: {app.config['FLASK_PORT']}")
# Database configuration
def get_db_connection():
db_config = {
'host': app.config['DB_HOST'],
'user': app.config['DB_USER'],
'password': app.config['DB_PASSWORD'],
'database': app.config['DB_NAME'],
'port': app.config['DB_PORT'],
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
try:
return pymysql.connect(**db_config)
except pymysql.Error as e:
print(f"Database connection error: {e}")
return None
def init_users_table():
"""Create users table if it doesn't exist"""
conn = get_db_connection()
if conn:
try:
with conn.cursor() as cursor:
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(80) UNIQUE NOT NULL,
email VARCHAR(120) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
print("✅ Users table is ready")
except Exception as e:
print(f"Error creating users table: {e}")
finally:
conn.close()
# Initialize the users table when app starts
init_users_table()
# Routes
@app.route('/')
def hello():
return jsonify({
"message": "WearWell API is running!",
"port": app.config['FLASK_PORT'],
"endpoints": {
"health_check": "/health",
"all_users": "/users",
"get_user": "/users/<id>",
"add_user": "/users/add (POST)",
"delete_user": "/users/delete/<id> (DELETE)",
"web_interface": "/users/manage"
}
})
@app.route('/health')
def health_check():
conn = get_db_connection()
if conn:
conn.close()
return jsonify({"status": "healthy", "database": "connected"})
else:
return jsonify({"status": "unhealthy", "database": "disconnected"}), 500
# User Management Routes
@app.route('/users')
def get_all_users():
"""Get all users"""
conn = get_db_connection()
if not conn:
return jsonify({"error": "Database connection failed"}), 500
try:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM users ORDER BY created_at DESC")
users = cursor.fetchall()
return jsonify({"users": users, "count": len(users)})
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/users/<int:user_id>')
def get_user(user_id):
"""Get a specific user by ID"""
conn = get_db_connection()
if not conn:
return jsonify({"error": "Database connection failed"}), 500
try:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
if user:
return jsonify({"user": user})
else:
return jsonify({"error": "User not found"}), 404
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/users/add', methods=['POST'])
def add_user():
"""Add a new user"""
conn = get_db_connection()
if not conn:
return jsonify({"error": "Database connection failed"}), 500
try:
# Check if request is JSON
if not request.is_json:
return jsonify({"error": "Content-Type must be application/json"}), 400
data = request.get_json()
if not data or not data.get('username') or not data.get('email'):
return jsonify({"error": "Username and email are required"}), 400
username = data['username'].strip()
email = data['email'].strip()
# Basic validation
if len(username) < 3:
return jsonify({"error": "Username must be at least 3 characters"}), 400
if '@' not in email:
return jsonify({"error": "Invalid email format"}), 400
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s)",
(username, email)
)
conn.commit()
user_id = cursor.lastrowid
return jsonify({
"message": "User added successfully",
"user_id": user_id,
"username": username,
"email": email
}), 201
except pymysql.IntegrityError as e:
if "username" in str(e):
return jsonify({"error": "Username already exists"}), 400
elif "email" in str(e):
return jsonify({"error": "Email already exists"}), 400
else:
return jsonify({"error": "Database integrity error"}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/users/delete/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
"""Delete a user by ID"""
conn = get_db_connection()
if not conn:
return jsonify({"error": "Database connection failed"}), 500
try:
with conn.cursor() as cursor:
# First check if user exists
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
if not user:
return jsonify({"error": "User not found"}), 404
# Delete the user
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
conn.commit()
return jsonify({
"message": "User deleted successfully",
"deleted_user": {
"id": user_id,
"username": user['username'],
"email": user['email']
}
})
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/users/update/<int:user_id>', methods=['PUT'])
def update_user(user_id):
"""Update a user's information"""
conn = get_db_connection()
if not conn:
return jsonify({"error": "Database connection failed"}), 500
try:
if not request.is_json:
return jsonify({"error": "Content-Type must be application/json"}), 400
data = request.get_json()
if not data or (not data.get('username') and not data.get('email')):
return jsonify({"error": "At least username or email is required to update"}), 400
# Build update query dynamically based on provided fields
update_fields = []
values = []
if 'username' in data and data['username']:
update_fields.append("username = %s")
values.append(data['username'].strip())
if 'email' in data and data['email']:
update_fields.append("email = %s")
values.append(data['email'].strip())
values.append(user_id)
with conn.cursor() as cursor:
cursor.execute(
f"UPDATE users SET {', '.join(update_fields)} WHERE id = %s",
values
)
conn.commit()
if cursor.rowcount == 0:
return jsonify({"error": "User not found"}), 404
return jsonify({"message": "User updated successfully"})
except pymysql.IntegrityError as e:
if "username" in str(e):
return jsonify({"error": "Username already exists"}), 400
elif "email" in str(e):
return jsonify({"error": "Email already exists"}), 400
else:
return jsonify({"error": "Database integrity error"}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
# Web Interface
@app.route('/users/manage')
def manage_users():
"""Simple web interface to manage users"""
return '''
<!DOCTYPE html>
<html>
<head>
<title>User Management - WearWell</title>
<style>
body {
font-family: 'Arial', sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
}
.container {
max-width: 1000px;
margin: 0 auto;
background: white;
padding: 30px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h1 {
color: #333;
text-align: center;
margin-bottom: 30px;
}
.section {
margin: 30px 0;
padding: 25px;
border: 1px solid #e0e0e0;
border-radius: 8px;
background: #fafafa;
}
h2 {
color: #555;
border-bottom: 2px solid #007cba;
padding-bottom: 10px;
}
input, button {
margin: 8px;
padding: 12px;
border: 1px solid #ddd;
border-radius: 4px;
font-size: 14px;
}
input {
width: 200px;
}
button {
background: #007cba;
color: white;
border: none;
cursor: pointer;
padding: 12px 20px;
}
button:hover {
background: #005a87;
}
button.delete {
background: #dc3545;
}
button.delete:hover {
background: #c82333;
}
.user {
padding: 15px;
border-bottom: 1px solid #eee;
display: flex;
justify-content: space-between;
align-items: center;
}
.user:hover {
background: #f8f9fa;
}
.user-info {
flex-grow: 1;
}
.user-actions {
display: flex;
gap: 10px;
}
.success { color: #28a745; }
.error { color: #dc3545; }
.loading { color: #6c757d; }
#statusMessage {
padding: 10px;
margin: 10px 0;
border-radius: 4px;
text-align: center;
}
.success-message { background: #d4edda; color: #155724; }
.error-message { background: #f8d7da; color: #721c24; }
</style>
</head>
<body>
<div class="container">
<h1>👥 User Management - WearWell</h1>
<div class="section">
<h2> Add New User</h2>
<input type="text" id="username" placeholder="Username (min 3 chars)" required>
<input type="email" id="email" placeholder="Email address" required>
<button onclick="addUser()">Add User</button>
<div id="addStatus"></div>
</div>
<div class="section">
<h2>📋 Users List</h2>
<button onclick="loadUsers()">🔄 Refresh Users</button>
<div id="usersCount" class="loading">Loading user count...</div>
<div id="usersList"></div>
</div>
<div class="section">
<h2>🔧 API Testing</h2>
<p>Test the API endpoints directly:</p>
<button onclick="testGetUsers()">Test GET /users</button>
<button onclick="testHealth()">Test GET /health</button>
<pre id="apiResponse" style="background: #f8f9fa; padding: 15px; border-radius: 4px; margin-top: 10px; display: none;"></pre>
</div>
</div>
<script>
let currentUsers = [];
async function loadUsers() {
showStatus('Loading users...', 'loading');
try {
const response = await fetch('/users');
const data = await response.json();
if (response.ok) {
currentUsers = data.users;
const usersList = document.getElementById('usersList');
const usersCount = document.getElementById('usersCount');
usersCount.innerHTML = `<span class="success">📊 Total Users: ${data.count}</span>`;
usersList.innerHTML = '';
if (data.users.length === 0) {
usersList.innerHTML = '<div class="user">No users found. Add some users above!</div>';
return;
}
data.users.forEach(user => {
const userDiv = document.createElement('div');
userDiv.className = 'user';
userDiv.innerHTML = `
<div class="user-info">
<strong>ID: ${user.id}</strong> |
👤 ${user.username} |
📧 ${user.email} |
📅 ${new Date(user.created_at).toLocaleString()}
</div>
<div class="user-actions">
<button class="delete" onclick="deleteUser(${user.id})">🗑️ Delete</button>
</div>
`;
usersList.appendChild(userDiv);
});
showStatus('Users loaded successfully!', 'success');
} else {
showStatus('Error loading users: ' + data.error, 'error');
}
} catch (error) {
showStatus('Network error: ' + error.message, 'error');
}
}
async function addUser() {
const username = document.getElementById('username').value;
const email = document.getElementById('email').value;
const addStatus = document.getElementById('addStatus');
if (!username || !email) {
showStatus('Please fill in both username and email', 'error', addStatus);
return;
}
if (username.length < 3) {
showStatus('Username must be at least 3 characters', 'error', addStatus);
return;
}
showStatus('Adding user...', 'loading', addStatus);
try {
const response = await fetch('/users/add', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({username, email})
});
const data = await response.json();
if (response.ok) {
showStatus('✅ User added successfully! ID: ' + data.user_id, 'success', addStatus);
document.getElementById('username').value = '';
document.getElementById('email').value = '';
loadUsers();
} else {
showStatus('❌ Error: ' + data.error, 'error', addStatus);
}
} catch (error) {
showStatus('❌ Network error: ' + error.message, 'error', addStatus);
}
}
async function deleteUser(userId) {
if (!confirm('Are you sure you want to delete this user?')) {
return;
}
try {
const response = await fetch(`/users/delete/${userId}`, {
method: 'DELETE'
});
const data = await response.json();
if (response.ok) {
showStatus('✅ User deleted successfully!', 'success');
loadUsers();
} else {
showStatus('❌ Error: ' + data.error, 'error');
}
} catch (error) {
showStatus('❌ Network error: ' + error.message, 'error');
}
}
async function testGetUsers() {
try {
const response = await fetch('/users');
const data = await response.json();
document.getElementById('apiResponse').style.display = 'block';
document.getElementById('apiResponse').innerHTML = JSON.stringify(data, null, 2);
} catch (error) {
document.getElementById('apiResponse').style.display = 'block';
document.getElementById('apiResponse').innerHTML = 'Error: ' + error.message;
}
}
async function testHealth() {
try {
const response = await fetch('/health');
const data = await response.json();
document.getElementById('apiResponse').style.display = 'block';
document.getElementById('apiResponse').innerHTML = JSON.stringify(data, null, 2);
} catch (error) {
document.getElementById('apiResponse').style.display = 'block';
document.getElementById('apiResponse').innerHTML = 'Error: ' + error.message;
}
}
function showStatus(message, type, element = null) {
const targetElement = element || document.getElementById('statusMessage');
if (!targetElement) return;
targetElement.innerHTML = message;
targetElement.className = type + '-message';
targetElement.style.display = 'block';
if (type !== 'loading') {
setTimeout(() => {
targetElement.style.display = 'none';
}, 5000);
}
}
// Create status message element
const statusElement = document.createElement('div');
statusElement.id = 'statusMessage';
statusElement.style.display = 'none';
document.querySelector('.container').insertBefore(statusElement, document.querySelector('.container').firstChild);
// Load users when page loads
loadUsers();
</script>
</body>
</html>
'''