2025-06-09 17:53:19 +08:00

338 lines
15 KiB
Python

# myapp/ai_services/ai_routes.py
# This file handles API Key management logic.
import datetime
import logging
from flask import request, jsonify, current_app, has_app_context # Flask utilities
from bson.objectid import ObjectId, InvalidId # For MongoDB ObjectIds
from functools import wraps # Import wraps for dummy decorator
# --- Local Blueprint Import ---
from . import bp # Import the 'bp' instance defined in the local __init__.py
# --- Shared Extensions and Utilities Imports ---
try:
from ..extensions import mongo # Import the initialized PyMongo instance
from ..utils import token_required # Import the authentication decorator
except ImportError:
# Fallback or error handling if imports fail
print("Warning: Could not import mongo or token_required in ai_services/ai_routes.py.")
mongo = None
# Define a dummy decorator if token_required is missing
def token_required(f):
@wraps(f)
def wrapper(*args, **kwargs):
print("ERROR: token_required decorator is not available!")
return jsonify({"message": "Server configuration error: Missing authentication utility."}), 500
return wrapper
# --- Schema Imports ---
try:
# Import the relevant schemas defined in schemas.py
from ..schemas import APIKeyCreateSchema, APIKeyUpdateSchema, APIKeySchema
from marshmallow import ValidationError
except ImportError:
print("Warning: Could not import APIKey schemas or ValidationError in ai_services/ai_routes.py.")
APIKeyCreateSchema = None
APIKeyUpdateSchema = None
APIKeySchema = None
ValidationError = None # Define ValidationError as None if import fails
# --- Helper to get logger safely ---
def _get_logger():
if has_app_context():
return current_app.logger
return logging.getLogger(__name__)
# Note: Routes use paths relative to the '/api/ai' prefix.
# Original '/api_list' becomes '/keys'
# Original '/api_list/<api_id>' becomes '/keys/<api_id>'
@bp.route('/keys', methods=['GET']) # Path relative to blueprint prefix
@token_required
def list_api_keys(current_user):
"""
List all API keys belonging to the authenticated user.
Uses APIKeySchema for output serialization.
Fetches keys from the 'api_list' collection associated with the user's ID.
Sorts by update time descending.
"""
logger = _get_logger()
# Validate user object from token
if not current_user or not current_user.get("_id"):
return jsonify({"message": "Internal authorization error."}), 500
try:
user_id = ObjectId(current_user["_id"])
except (InvalidId, TypeError) as e:
logger.error(f"User ID conversion error in list_api_keys: {e}")
return jsonify({"message": "Invalid user ID format in token."}), 400
# Check dependencies
if not mongo: return jsonify({"message": "Database connection not available."}), 500
if not APIKeySchema: return jsonify({"message": "Server configuration error: Schema unavailable."}), 500
try:
# Find all documents in the 'api_list' collection for this user
db = mongo.db
cursor = db.api_list.find({"uid": user_id}).sort("updatedAt", -1)
api_key_docs = list(cursor) # Convert cursor to list
# --- Serialize results using the schema ---
output_schema = APIKeySchema(many=True)
# Schema handles ObjectId and datetime conversion, and field selection/exclusion
# NOTE: APIKeySchema currently dumps the full key. Consider masking in schema if needed.
serialized_result = output_schema.dump(api_key_docs)
return jsonify({"api_keys": serialized_result}), 200
except KeyError: # Should be caught by initial user_id check
logger.error(f"User ID (_id) not found in token payload for list_api_keys.")
return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
except AttributeError:
logger.error("PyMongo extension not initialized or attached correctly.")
return jsonify({"message": "Database configuration error."}), 500
except Exception as e:
logger.error(f"Error listing API keys for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True)
return jsonify({"message": "An error occurred while listing API keys."}), 500
@bp.route('/keys', methods=['POST']) # Path relative to blueprint prefix
@token_required
def create_api_key(current_user):
"""
Create a new API key entry for the authenticated user.
Uses APIKeyCreateSchema for input validation.
Expects 'name', 'key', and optional 'selected' in JSON payload.
Prevents duplicate names per user.
"""
logger = _get_logger()
# Validate user object from token
if not current_user or not current_user.get("_id"):
return jsonify({"message": "Internal authorization error."}), 500
try:
user_id = ObjectId(current_user["_id"])
except (InvalidId, TypeError) as e:
logger.error(f"User ID conversion error in create_api_key: {e}")
return jsonify({"message": "Invalid user ID format in token."}), 400
# Check dependencies
if not mongo: return jsonify({"message": "Database connection not available."}), 500
if not APIKeyCreateSchema or not ValidationError:
return jsonify({"message": "Server configuration error: Schema unavailable."}), 500
# Get and validate JSON data using the schema
json_data = request.get_json() or {}
schema = APIKeyCreateSchema()
try:
validated_data = schema.load(json_data)
except ValidationError as err:
logger.warning(f"Create API key validation failed: {err.messages}")
return jsonify(err.messages), 422 # Return validation errors
# Extract validated data
name = validated_data['name']
api_key = validated_data['key']
selected = validated_data['selected'] # Schema provides default if missing
try:
# Check if an API key with the same name already exists for this user
db = mongo.db
existing = db.api_list.find_one({"uid": user_id, "name": name})
if existing:
# Return 409 Conflict status code for duplicates
return jsonify({"message": f"User already has an API key for {name}."}), 409
# --- Prepare and Insert Document ---
now = datetime.datetime.now(datetime.timezone.utc) # Use timezone-aware UTC time
doc = {
"uid": user_id, # Store user's ObjectId
"name": name,
"key": api_key, # Store the provided key
"selected": selected, # Use validated boolean
"createdAt": now,
"updatedAt": now
}
result = db.api_list.insert_one(doc)
# Return success response with the ID of the newly created key
return jsonify({
"message": "API key created successfully.",
"api_id": str(result.inserted_id) # Convert ObjectId to string
}), 201 # 201 Created status code
except KeyError: # Should be caught by initial user_id check
logger.error(f"User ID (_id) not found in token payload for create_api_key.")
return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
except AttributeError:
logger.error("PyMongo extension not initialized or attached correctly.")
return jsonify({"message": "Database configuration error."}), 500
except Exception as e:
logger.error(f"Error creating API key for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True)
return jsonify({"message": "An error occurred while creating API key."}), 500
@bp.route('/keys/<string:api_id>', methods=['PUT']) # Path relative to blueprint prefix
@token_required
def update_api_key(current_user, api_id):
"""
Update an existing API key identified by its ID.
Uses APIKeyUpdateSchema for input validation.
Allows updating 'name', 'key', and 'selected' fields.
Verifies ownership before updating.
"""
logger = _get_logger()
# Validate user object from token
if not current_user or not current_user.get("_id"):
return jsonify({"message": "Internal authorization error."}), 500
try:
user_id = ObjectId(current_user["_id"])
except (InvalidId, TypeError) as e:
logger.error(f"User ID conversion error in update_api_key: {e}")
return jsonify({"message": "Invalid user ID format in token."}), 400
# Check dependencies
if not mongo: return jsonify({"message": "Database connection not available."}), 500
if not APIKeyUpdateSchema or not ValidationError:
return jsonify({"message": "Server configuration error: Schema unavailable."}), 500
# Get and validate JSON data using the schema
json_data = request.get_json() or {}
# Note: Update schema should not have required fields, so load won't fail if empty,
# but we check if validated_data is empty later.
schema = APIKeyUpdateSchema()
try:
validated_data = schema.load(json_data)
except ValidationError as err:
logger.warning(f"Update API key validation failed: {err.messages}")
return jsonify(err.messages), 422
# If validation passed but no valid fields were provided
if not validated_data:
return jsonify({"message": "No valid fields provided for update."}), 400
try:
# Validate api_id format from URL path
try:
object_id = ObjectId(api_id)
except InvalidId:
return jsonify({"message": "Invalid API key ID format."}), 400
# Find the document to update
db = mongo.db
doc = db.api_list.find_one({"_id": object_id})
if not doc:
return jsonify({"message": "API key not found."}), 404 # 404 Not Found
# Verify that the authenticated user owns this API key
doc_uid = doc.get("uid")
if not doc_uid or doc_uid != user_id:
# 403 Forbidden - user is authenticated but not authorized for this resource
return jsonify({"message": "You do not have permission to update this API key."}), 403
# --- Prepare Update Fields based on validated data ---
update_fields = {}
if "name" in validated_data:
new_name = validated_data["name"]
# Check for name conflict only if name is actually changing
if new_name != doc.get("name") and db.api_list.find_one({"uid": user_id, "name": new_name, "_id": {"$ne": object_id}}):
return jsonify({"message": f"User already has another API key named {new_name}."}), 409 # Conflict
update_fields["name"] = new_name
if "key" in validated_data:
update_fields["key"] = validated_data["key"]
if "selected" in validated_data:
update_fields["selected"] = validated_data["selected"] # Already boolean from schema
# If, after validation and processing, there's nothing to update (e.g., only invalid fields were sent)
if not update_fields:
return jsonify({"message": "No valid changes detected in the provided data."}), 400
# Always update the 'updatedAt' timestamp
update_fields["updatedAt"] = datetime.datetime.now(datetime.timezone.utc)
# Perform the update operation in the database
result = db.api_list.update_one(
{"_id": object_id}, # Filter by ID
{"$set": update_fields} # Set the new values
)
# Check if the document was found and potentially modified
if result.matched_count == 1:
return jsonify({"message": "API key updated successfully."}), 200
else:
# This case should ideally not happen if find_one succeeded, but included for safety
logger.warning(f"Update matched count was {result.matched_count} for api_id {api_id}")
return jsonify({"message": "API key update failed (key not found after initial check)."}), 404
except KeyError: # Should be caught by initial user_id check
logger.error(f"User ID (_id) not found in token payload for update_api_key.")
return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
except AttributeError:
logger.error("PyMongo extension not initialized or attached correctly.")
return jsonify({"message": "Database configuration error."}), 500
except Exception as e:
logger.error(f"Error updating API key {api_id} for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True)
return jsonify({"message": "An error occurred while updating the API key."}), 500
@bp.route('/keys/<string:api_id>', methods=['DELETE']) # Path relative to blueprint prefix
@token_required
def delete_api_key(current_user, api_id):
"""
Delete an API key identified by its ID.
Verifies ownership before deleting.
(No schema needed for input/output here)
"""
logger = _get_logger()
# Validate user object from token
if not current_user or not current_user.get("_id"):
return jsonify({"message": "Internal authorization error."}), 500
try:
user_id = ObjectId(current_user["_id"])
except (InvalidId, TypeError) as e:
logger.error(f"User ID conversion error in delete_api_key: {e}")
return jsonify({"message": "Invalid user ID format in token."}), 400
if not mongo: return jsonify({"message": "Database connection not available."}), 500
try:
# Validate api_id format from URL path
try:
object_id = ObjectId(api_id)
except InvalidId:
return jsonify({"message": "Invalid API key ID format."}), 400
# Find the document to delete
db = mongo.db
doc = db.api_list.find_one({"_id": object_id}, {"uid": 1}) # Fetch only uid for check
if not doc:
return jsonify({"message": "API key not found."}), 404 # 404 Not Found
# Verify that the authenticated user owns this API key
doc_uid = doc.get("uid")
if not doc_uid or doc_uid != user_id:
# 403 Forbidden
return jsonify({"message": "You do not have permission to delete this API key."}), 403
# Perform the delete operation
result = db.api_list.delete_one({"_id": object_id})
# Check if the deletion was successful
if result.deleted_count == 1:
return jsonify({"message": "API key deleted successfully."}), 200 # 200 OK or 204 No Content are suitable
else:
# This case means the document existed initially but couldn't be deleted
logger.error(f"Failed to delete API key {api_id} despite finding it initially.")
return jsonify({"message": "Failed to delete API key (already deleted?)."}), 404 # Or 500
except KeyError: # Should be caught by initial user_id check
logger.error(f"User ID (_id) not found in token payload for delete_api_key.")
return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
except AttributeError:
logger.error("PyMongo extension not initialized or attached correctly.")
return jsonify({"message": "Database configuration error."}), 500
except Exception as e:
logger.error(f"Error deleting API key {api_id} for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True)
return jsonify({"message": "An error occurred while deleting the API key."}), 500