2025-06-09 17:53:19 +08:00

445 lines
20 KiB
Python

# myapp/auth/auth_routes.py
import datetime
import jwt # For encoding JWT tokens
import logging
from flask import request, jsonify, current_app, has_app_context # Flask utilities
from werkzeug.security import generate_password_hash, check_password_hash # For hashing and checking passwords
from bson.objectid import ObjectId, InvalidId # For converting string IDs to MongoDB ObjectId
from functools import wraps # Import wraps for dummy decorator
# --- Local Blueprint Import (Moved to Top) ---
# Import the 'bp' instance defined in the local __init__.py FIRST
# This often helps resolve circular import issues involving blueprints and utilities/models.
from . import bp
# --- Shared Utilities Import ---
# Import the token_required decorator from the utils module
try:
    # Assumes utils.py is in the parent 'myapp' package
    from ..utils import token_required
except ImportError:
    # Report through logging (with the traceback) instead of print so the
    # failure reaches whatever log handlers the application configures.
    logging.getLogger(__name__).warning(
        "token_required decorator not found in auth/auth_routes.py. "
        "Protected routes will fail.",
        exc_info=True,
    )

    def token_required(f):
        """
        Fallback decorator used when the real token_required cannot be imported.

        It only prevents a NameError when the routes below are defined; it does
        NOT authenticate anything. Every wrapped view answers with HTTP 500.
        """
        @wraps(f)
        def wrapper(*args, **kwargs):
            logging.getLogger(__name__).error("token_required decorator is not available!")
            return jsonify({"message": "Server configuration error: Missing authentication utility."}), 500
        return wrapper
# --- Schema Imports ---
try:
    # Import the relevant schemas defined in schemas.py
    from ..schemas import UserRegistrationSchema, UserLoginSchema, UserSchema, UserUpdateSchema
    from marshmallow import ValidationError
except ImportError:
    # Keep the traceback: without it there is no way to tell whether the
    # schemas module or marshmallow itself failed to import.
    logging.getLogger(__name__).warning(
        "Could not import User schemas or ValidationError in auth/auth_routes.py.",
        exc_info=True,
    )
    # None sentinels: the route handlers in this module check these names for
    # truthiness and answer with HTTP 500 when they are unavailable.
    UserRegistrationSchema = None
    UserLoginSchema = None
    UserSchema = None
    UserUpdateSchema = None
    ValidationError = None
# --- Shared Extensions Import ---
# Import mongo for direct use (alternative to current_app.mongo)
try:
    from ..extensions import mongo
except ImportError:
    # Log with the traceback so the underlying import failure is not lost.
    logging.getLogger(__name__).warning(
        "Could not import mongo extension in auth/auth_routes.py.",
        exc_info=True,
    )
    # None sentinel: the route handlers in this module check `mongo` for
    # truthiness and answer with HTTP 500 when it is unavailable.
    mongo = None
# --- Helper to get logger safely ---
def _get_logger():
    """Return the Flask app logger inside an app context, else a module-level logger."""
    return current_app.logger if has_app_context() else logging.getLogger(__name__)
# Note: Routes use paths relative to the '/api/auth' prefix defined in __init__.py.
@bp.route('/register', methods=['POST'])
def register():
    """
    Register a new user.

    Validates the JSON payload ('username', 'email', 'password') with
    UserRegistrationSchema, rejects usernames/emails that already exist,
    stores the user with a hashed password, and returns a JWT together with
    the user serialized through UserSchema.

    Returns:
        A (response, status) tuple:
        201 on success, 409 when the username or email is taken,
        422 when schema validation fails, 500 on configuration, database
        or token-encoding failures.
    """
    logger = _get_logger()

    # Check dependencies
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not UserRegistrationSchema or not UserSchema or not ValidationError:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    # silent=True makes a missing, malformed or non-JSON body come back as
    # None instead of aborting with Flask's own error response, so such
    # requests fall through to schema validation and get the JSON 422 below.
    json_data = request.get_json(silent=True) or {}
    schema = UserRegistrationSchema()
    try:
        validated_data = schema.load(json_data)
    except ValidationError as err:
        logger.warning(f"Registration validation failed: {err.messages}")
        return jsonify(err.messages), 422  # Return validation errors

    # Extract validated data
    username = validated_data['username']
    email = validated_data['email']
    password = validated_data['password']  # Raw password; hashed before storage

    try:
        db = mongo.db  # Use imported mongo instance's db attribute
        # Check if username or email already exists
        if db.users.find_one({"username": username}):
            return jsonify({"message": "Username already exists."}), 409  # 409 Conflict
        if db.users.find_one({"email": email}):
            return jsonify({"message": "Email already registered."}), 409  # 409 Conflict
    except AttributeError:
        logger.error("PyMongo extension not initialized or db attribute missing.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Database error checking existing user: {e}", exc_info=True)
        return jsonify({"message": "Database error during registration check."}), 500

    # Hash the password before storing
    hashed_pw = generate_password_hash(password)

    # Create the new user document
    now = datetime.datetime.now(datetime.timezone.utc)  # Timezone-aware UTC time
    new_user_doc = {
        "username": username,
        "email": email,
        "password": hashed_pw,  # Store the hashed password
        "createdAt": now,
        "updatedAt": now
    }

    # Insert the new user into the database
    try:
        result = db.users.insert_one(new_user_doc)
        user_id = result.inserted_id  # This is an ObjectId
        # Fetch the created user document to serialize it
        created_user = db.users.find_one({"_id": user_id})
        if not created_user:  # Should not happen, but check
            logger.error(f"Failed to retrieve user immediately after insertion: {user_id}")
            # Don't fail the whole registration: fall back to a minimal
            # document built from the data already at hand.
            created_user = {"_id": user_id, "username": username, "email": email}
    except Exception as e:
        logger.error(f"Error inserting new user: {e}", exc_info=True)
        return jsonify({"message": "An error occurred during registration."}), 500

    # Generate JWT token using settings from app config
    try:
        secret_key = current_app.config['SECRET_KEY']
        algo = current_app.config.get('JWT_ALGORITHM', 'HS256')
        exp_hours = current_app.config.get('JWT_EXP_DELTA_HOURS', 24)
        token_payload = {
            "user_id": str(user_id),  # Convert ObjectId to string for JWT payload
            "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=exp_hours)
        }
        token = jwt.encode(token_payload, secret_key, algorithm=algo)
    except KeyError:
        logger.error("SECRET_KEY not configured in Flask app for JWT.")
        return jsonify({"message": "Server configuration error: JWT secret missing."}), 500
    except Exception as e:
        logger.error(f"Error encoding JWT during registration: {e}", exc_info=True)
        return jsonify({"message": "Could not generate authentication token."}), 500

    # Serialize the created user through UserSchema
    # (NOTE(review): expected to omit the password field - confirm in schemas.py)
    output_schema = UserSchema()
    serialized_user = output_schema.dump(created_user)

    # Return success response with token and serialized user info
    return jsonify({
        "message": "User registered successfully.",
        "token": token,
        "user": serialized_user  # Return user object instead of just id
    }), 201  # 201 Created
@bp.route('/login', methods=['POST'])
def login():
    """
    Log in an existing user.

    Validates the JSON payload ('username', 'password') with UserLoginSchema,
    looks the user up by username, verifies the password against the stored
    hash, and returns a JWT together with the user serialized through
    UserSchema.

    Returns:
        A (response, status) tuple:
        200 on success, 401 on unknown user or wrong password,
        422 when schema validation fails, 500 on configuration, database
        or token-encoding failures.
    """
    logger = _get_logger()

    # Check dependencies
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not UserLoginSchema or not UserSchema or not ValidationError:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    # silent=True makes a missing, malformed or non-JSON body come back as
    # None instead of aborting with Flask's own error response, so such
    # requests fall through to schema validation and get the JSON 422 below.
    json_data = request.get_json(silent=True) or {}
    schema = UserLoginSchema()
    try:
        validated_data = schema.load(json_data)
    except ValidationError as err:
        logger.warning(f"Login validation failed: {err.messages}")
        return jsonify(err.messages), 422

    username = validated_data['username']
    password = validated_data['password']  # Raw password, only compared against the hash

    # Access the database
    try:
        db = mongo.db
        if db is None:
            raise AttributeError("db attribute is None")
        # Find user by username
        user_doc = db.users.find_one({"username": username})
    except AttributeError:
        logger.error("PyMongo extension not initialized or attached correctly during login.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Database error during login for user {username}: {e}", exc_info=True)
        return jsonify({"message": "An error occurred during login."}), 500

    # Same response for "no such user" and "wrong password" so the reply does
    # not reveal which usernames exist.
    if not user_doc or 'password' not in user_doc or not check_password_hash(user_doc["password"], password):
        return jsonify({"message": "Invalid credentials."}), 401  # Authentication failure

    # Generate JWT token using settings from app config
    try:
        user_id = user_doc["_id"]  # Get ObjectId
        secret_key = current_app.config['SECRET_KEY']
        algo = current_app.config.get('JWT_ALGORITHM', 'HS256')
        exp_hours = current_app.config.get('JWT_EXP_DELTA_HOURS', 24)
        token_payload = {
            "user_id": str(user_id),  # Convert ObjectId to string
            "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=exp_hours)
        }
        token = jwt.encode(token_payload, secret_key, algorithm=algo)
    except KeyError:
        logger.error("SECRET_KEY not configured in Flask app for JWT.")
        return jsonify({"message": "Server configuration error: JWT secret missing."}), 500
    except Exception as e:
        logger.error(f"Error encoding JWT for user {username}: {e}", exc_info=True)
        return jsonify({"message": "Could not generate authentication token."}), 500

    # Serialize the user through UserSchema
    # (NOTE(review): expected to omit the password field - confirm in schemas.py)
    output_schema = UserSchema()
    serialized_user = output_schema.dump(user_doc)

    # Return success response with token and serialized user info
    return jsonify({
        "message": "Login successful.",
        "token": token,
        "user": serialized_user  # Return user object instead of just id
    }), 200
@bp.route('/delete_account', methods=['DELETE'])
@token_required  # Apply the decorator to protect the route and inject 'current_user'
def delete_account(current_user):
    """
    Delete the account of the currently authenticated user.

    Besides the user document this also:
      * removes the user from the collaborator list of projects owned by others;
      * hands each owned project to its first remaining collaborator, or
        deletes the project when nobody else is on it;
      * deletes URLs, activity logs and dialog sessions of deleted projects.

    Args:
        current_user: Mapping injected by token_required; must carry "_id"
            (a "user_id" entry, when present, takes precedence).

    Returns:
        A (response, status) tuple:
        200 when the user was deleted, 404 when no user document matched,
        400 when the id cannot be converted to an ObjectId, 500 on
        authorization, configuration or database failures.
    """
    logger = _get_logger()

    # Validate user object from token
    if not current_user or not current_user.get("_id"):
        return jsonify({"message": "Internal authorization error."}), 500

    # The guard above guarantees "_id" is truthy, so this is never None.
    raw_user_id = current_user.get("user_id") or current_user.get("_id")
    try:
        user_id_str = str(raw_user_id)
        user_id = ObjectId(user_id_str)  # Convert string ID back to ObjectId
    except (InvalidId, TypeError) as e:
        # Log only the identifier: the full current_user mapping may carry
        # sensitive fields (e.g. a password hash) that must not reach the logs.
        logger.error(f"User ID conversion error in delete_account for token user id {raw_user_id!r}: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500

    try:
        db = mongo.db

        # 1. Remove user from collaborator lists in projects they didn't own
        db.projects.update_many(
            {"ownerId": {"$ne": user_id}, "collaborators": user_id},
            {"$pull": {"collaborators": user_id}}
        )

        # 2. Handle projects owned by the user
        owned_projects_cursor = db.projects.find({"ownerId": user_id}, {"_id": 1, "collaborators": 1})
        project_ids_to_delete = []
        projects_to_reassign = []
        for project in owned_projects_cursor:
            project_id = project["_id"]
            remaining = [collab_id for collab_id in project.get("collaborators", []) if collab_id != user_id]
            if remaining:
                new_owner = remaining[0]
                projects_to_reassign.append({
                    "filter": {"_id": project_id},
                    "update": {
                        "$set": {"ownerId": new_owner, "lastActivityBy": new_owner},
                        # Pull the new owner AND the departing user: step 1
                        # skips owned projects, so an owner who was also listed
                        # as collaborator would otherwise stay in the list.
                        "$pull": {"collaborators": {"$in": [new_owner, user_id]}}
                    }
                })
            else:
                project_ids_to_delete.append(project_id)

        # Apply the reassignments only after the cursor has been consumed.
        if projects_to_reassign:
            for reassignment in projects_to_reassign:
                db.projects.update_one(reassignment["filter"], reassignment["update"])
            logger.info(f"Reassigned ownership for {len(projects_to_reassign)} projects previously owned by {user_id_str}")

        if project_ids_to_delete:
            delete_owned_projects_result = db.projects.delete_many({"_id": {"$in": project_ids_to_delete}})
            logger.info(f"Deleted {delete_owned_projects_result.deleted_count} projects owned by {user_id_str} with no remaining collaborators.")
            # Cascade deletes for data that belonged to the removed projects
            in_deleted_projects = {"projectId": {"$in": project_ids_to_delete}}
            delete_urls_result = db.urls.delete_many(in_deleted_projects)
            logger.info(f"Deleted {delete_urls_result.deleted_count} URLs for deleted projects of user {user_id_str}")
            delete_activity_result = db.project_activity.delete_many(in_deleted_projects)
            logger.info(f"Deleted {delete_activity_result.deleted_count} activity logs for deleted projects of user {user_id_str}")
            delete_dialog_result = db.dialog_activity.delete_many(in_deleted_projects)
            logger.info(f"Deleted {delete_dialog_result.deleted_count} dialog sessions for deleted projects of user {user_id_str}")

        # 3. Delete the user document LAST: if any step above fails, the
        #    account still exists and the request can simply be retried,
        #    instead of leaving orphaned projects behind a deleted account.
        user_result = db.users.delete_one({"_id": user_id})

        if user_result.deleted_count == 1:
            return jsonify({"message": "Account and associated data handled successfully."}), 200
        if user_result.deleted_count == 0:
            return jsonify({"message": "User not found or already deleted."}), 404
        logger.warning(f"Unexpected deleted_count ({user_result.deleted_count}) for user {user_id}")
        return jsonify({"message": "An issue occurred during account deletion."}), 500
    except AttributeError:
        logger.error("PyMongo extension not initialized or attached correctly.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Error during account deletion for user {user_id_str}: {e}", exc_info=True)
        return jsonify({"message": "An internal error occurred during account deletion."}), 500
@bp.route('/logout', methods=['POST'])
@token_required  # Ensures only logged-in users can call logout (though it's stateless)
def logout(current_user):
    """
    Acknowledge a logout request.

    Nothing is changed on the server: the handler only returns a confirmation
    message, and the client is responsible for discarding its JWT.
    `current_user` is accepted because token_required injects it; it is unused.
    """
    confirmation = {"message": "Logout successful. Please discard your token."}
    return jsonify(confirmation), 200
@bp.route('/account', methods=['PUT'])
@token_required  # Protect the route and get user info from token
def update_account(current_user):
    """
    Update the authenticated user's username, email, and/or password.

    Validates the JSON payload (optional 'username', 'email', 'password')
    with UserUpdateSchema, checks that a new username/email is not used by
    another user, hashes a new password, and stores the changes together with
    a fresh 'updatedAt' timestamp.

    Args:
        current_user: Mapping injected by token_required; must carry "_id".

    Returns:
        A (response, status) tuple:
        200 on success, 400 when nothing updatable was sent or the id cannot
        be converted to an ObjectId, 404 when the user no longer exists,
        409 on username/email conflicts, 422 when schema validation fails,
        500 on authorization, configuration or database failures.
    """
    logger = _get_logger()

    # Validate user object from token
    if not current_user or not current_user.get("_id"):
        return jsonify({"message": "Internal authorization error."}), 500

    # The guard above guarantees "_id" is truthy, so this is never None.
    raw_user_id = current_user.get("_id") or current_user.get("user_id")
    try:
        user_id_str = str(raw_user_id)
        user_id = ObjectId(user_id_str)
    except (InvalidId, TypeError) as e:
        # Log only the identifier: the full current_user mapping may carry
        # sensitive fields (e.g. a password hash) that must not reach the logs.
        logger.error(f"User ID conversion error from token (user id {raw_user_id!r}) in update_account: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    # Check dependencies
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not UserUpdateSchema or not ValidationError:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    # silent=True makes a missing, malformed or non-JSON body come back as
    # None instead of aborting with Flask's own error response, so such
    # requests are handled by the JSON error responses below.
    json_data = request.get_json(silent=True) or {}
    schema = UserUpdateSchema()
    try:
        # Load validates optional fields based on schema rules
        validated_data = schema.load(json_data)
    except ValidationError as err:
        logger.warning(f"Update account validation failed: {err.messages}")
        return jsonify(err.messages), 422

    # If validation passed but no valid fields were provided
    if not validated_data:
        return jsonify({"message": "No valid update fields provided (username, email, or password)."}), 400

    db = mongo.db
    update_fields = {}  # Fields to be written with $set
    db_validation_errors = {}  # Database-level problems such as uniqueness conflicts

    # --- Validate uniqueness and prepare updates based on validated_data ---
    try:
        # Username must not belong to a different user
        if "username" in validated_data:
            new_username = validated_data["username"]
            if db.users.find_one({"username": new_username, "_id": {"$ne": user_id}}):
                db_validation_errors["username"] = "Username is already taken."
            else:
                update_fields["username"] = new_username

        # Email must not belong to a different user
        if "email" in validated_data:
            new_email = validated_data["email"]
            if db.users.find_one({"email": new_email, "_id": {"$ne": user_id}}):
                db_validation_errors["email"] = "Email is already registered by another user."
            else:
                update_fields["email"] = new_email

        # Never store the raw password, only its hash
        if "password" in validated_data:
            update_fields["password"] = generate_password_hash(validated_data["password"])
    except AttributeError:
        logger.error("PyMongo extension not initialized or attached correctly during validation.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Error during database validation for user {user_id}: {e}", exc_info=True)
        return jsonify({"message": "An error occurred during data validation."}), 500

    # If database validation errors occurred (e.g., uniqueness checks)
    if db_validation_errors:
        return jsonify({"message": "Validation errors occurred.", "errors": db_validation_errors}), 409  # 409 Conflict

    if not update_fields:
        # Reached only when validation produced data but none of it maps to an
        # updatable field.
        return jsonify({"message": "No changes were requested or fields were invalid."}), 400

    # Stamp the modification time and perform the update
    update_fields["updatedAt"] = datetime.datetime.now(datetime.timezone.utc)
    try:
        result = db.users.update_one({"_id": user_id}, {"$set": update_fields})
        if result.matched_count == 0:
            # The user_id from the token doesn't exist in the DB anymore
            return jsonify({"message": "User not found."}), 404
        # modified_count may be 0 when the new values equal the stored ones;
        # that still counts as success.
        return jsonify({"message": "Account updated successfully."}), 200
    except AttributeError:
        logger.error("PyMongo extension not initialized or attached correctly during update.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Error updating account for user {user_id}: {e}", exc_info=True)
        return jsonify({"message": "An error occurred while updating the account."}), 500