338 lines
15 KiB
Python
338 lines
15 KiB
Python
# myapp/ai_services/ai_routes.py
|
|
# This file handles API Key management logic.
|
|
|
|
import datetime
|
|
import logging
|
|
from flask import request, jsonify, current_app, has_app_context # Flask utilities
|
|
from bson.objectid import ObjectId, InvalidId # For MongoDB ObjectIds
|
|
from functools import wraps # Import wraps for dummy decorator
|
|
|
|
# --- Local Blueprint Import ---
|
|
from . import bp # Import the 'bp' instance defined in the local __init__.py
|
|
|
|
# --- Shared Extensions and Utilities Imports ---
|
|
try:
|
|
from ..extensions import mongo # Import the initialized PyMongo instance
|
|
from ..utils import token_required # Import the authentication decorator
|
|
except ImportError:
|
|
# Fallback or error handling if imports fail
|
|
print("Warning: Could not import mongo or token_required in ai_services/ai_routes.py.")
|
|
mongo = None
|
|
# Define a dummy decorator if token_required is missing
|
|
def token_required(f):
|
|
@wraps(f)
|
|
def wrapper(*args, **kwargs):
|
|
print("ERROR: token_required decorator is not available!")
|
|
return jsonify({"message": "Server configuration error: Missing authentication utility."}), 500
|
|
return wrapper
|
|
|
|
# --- Schema Imports ---
|
|
try:
|
|
# Import the relevant schemas defined in schemas.py
|
|
from ..schemas import APIKeyCreateSchema, APIKeyUpdateSchema, APIKeySchema
|
|
from marshmallow import ValidationError
|
|
except ImportError:
|
|
print("Warning: Could not import APIKey schemas or ValidationError in ai_services/ai_routes.py.")
|
|
APIKeyCreateSchema = None
|
|
APIKeyUpdateSchema = None
|
|
APIKeySchema = None
|
|
ValidationError = None # Define ValidationError as None if import fails
|
|
|
|
# --- Helper to get logger safely ---
|
|
def _get_logger():
|
|
if has_app_context():
|
|
return current_app.logger
|
|
return logging.getLogger(__name__)
|
|
|
|
# Note: Routes use paths relative to the '/api/ai' prefix.
|
|
# Original '/api_list' becomes '/keys'
|
|
# Original '/api_list/<api_id>' becomes '/keys/<api_id>'
|
|
|
|
@bp.route('/keys', methods=['GET']) # Path relative to blueprint prefix
|
|
@token_required
|
|
def list_api_keys(current_user):
|
|
"""
|
|
List all API keys belonging to the authenticated user.
|
|
Uses APIKeySchema for output serialization.
|
|
Fetches keys from the 'api_list' collection associated with the user's ID.
|
|
Sorts by update time descending.
|
|
"""
|
|
logger = _get_logger()
|
|
# Validate user object from token
|
|
if not current_user or not current_user.get("_id"):
|
|
return jsonify({"message": "Internal authorization error."}), 500
|
|
try:
|
|
user_id = ObjectId(current_user["_id"])
|
|
except (InvalidId, TypeError) as e:
|
|
logger.error(f"User ID conversion error in list_api_keys: {e}")
|
|
return jsonify({"message": "Invalid user ID format in token."}), 400
|
|
|
|
# Check dependencies
|
|
if not mongo: return jsonify({"message": "Database connection not available."}), 500
|
|
if not APIKeySchema: return jsonify({"message": "Server configuration error: Schema unavailable."}), 500
|
|
|
|
try:
|
|
# Find all documents in the 'api_list' collection for this user
|
|
db = mongo.db
|
|
cursor = db.api_list.find({"uid": user_id}).sort("updatedAt", -1)
|
|
api_key_docs = list(cursor) # Convert cursor to list
|
|
|
|
# --- Serialize results using the schema ---
|
|
output_schema = APIKeySchema(many=True)
|
|
# Schema handles ObjectId and datetime conversion, and field selection/exclusion
|
|
# NOTE: APIKeySchema currently dumps the full key. Consider masking in schema if needed.
|
|
serialized_result = output_schema.dump(api_key_docs)
|
|
|
|
return jsonify({"api_keys": serialized_result}), 200
|
|
|
|
except KeyError: # Should be caught by initial user_id check
|
|
logger.error(f"User ID (_id) not found in token payload for list_api_keys.")
|
|
return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
|
|
except AttributeError:
|
|
logger.error("PyMongo extension not initialized or attached correctly.")
|
|
return jsonify({"message": "Database configuration error."}), 500
|
|
except Exception as e:
|
|
logger.error(f"Error listing API keys for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True)
|
|
return jsonify({"message": "An error occurred while listing API keys."}), 500
|
|
|
|
|
|
@bp.route('/keys', methods=['POST']) # Path relative to blueprint prefix
|
|
@token_required
|
|
def create_api_key(current_user):
|
|
"""
|
|
Create a new API key entry for the authenticated user.
|
|
Uses APIKeyCreateSchema for input validation.
|
|
Expects 'name', 'key', and optional 'selected' in JSON payload.
|
|
Prevents duplicate names per user.
|
|
"""
|
|
logger = _get_logger()
|
|
# Validate user object from token
|
|
if not current_user or not current_user.get("_id"):
|
|
return jsonify({"message": "Internal authorization error."}), 500
|
|
try:
|
|
user_id = ObjectId(current_user["_id"])
|
|
except (InvalidId, TypeError) as e:
|
|
logger.error(f"User ID conversion error in create_api_key: {e}")
|
|
return jsonify({"message": "Invalid user ID format in token."}), 400
|
|
|
|
# Check dependencies
|
|
if not mongo: return jsonify({"message": "Database connection not available."}), 500
|
|
if not APIKeyCreateSchema or not ValidationError:
|
|
return jsonify({"message": "Server configuration error: Schema unavailable."}), 500
|
|
|
|
# Get and validate JSON data using the schema
|
|
json_data = request.get_json() or {}
|
|
schema = APIKeyCreateSchema()
|
|
try:
|
|
validated_data = schema.load(json_data)
|
|
except ValidationError as err:
|
|
logger.warning(f"Create API key validation failed: {err.messages}")
|
|
return jsonify(err.messages), 422 # Return validation errors
|
|
|
|
# Extract validated data
|
|
name = validated_data['name']
|
|
api_key = validated_data['key']
|
|
selected = validated_data['selected'] # Schema provides default if missing
|
|
|
|
try:
|
|
# Check if an API key with the same name already exists for this user
|
|
db = mongo.db
|
|
existing = db.api_list.find_one({"uid": user_id, "name": name})
|
|
if existing:
|
|
# Return 409 Conflict status code for duplicates
|
|
return jsonify({"message": f"User already has an API key for {name}."}), 409
|
|
|
|
# --- Prepare and Insert Document ---
|
|
now = datetime.datetime.now(datetime.timezone.utc) # Use timezone-aware UTC time
|
|
doc = {
|
|
"uid": user_id, # Store user's ObjectId
|
|
"name": name,
|
|
"key": api_key, # Store the provided key
|
|
"selected": selected, # Use validated boolean
|
|
"createdAt": now,
|
|
"updatedAt": now
|
|
}
|
|
result = db.api_list.insert_one(doc)
|
|
|
|
# Return success response with the ID of the newly created key
|
|
return jsonify({
|
|
"message": "API key created successfully.",
|
|
"api_id": str(result.inserted_id) # Convert ObjectId to string
|
|
}), 201 # 201 Created status code
|
|
|
|
except KeyError: # Should be caught by initial user_id check
|
|
logger.error(f"User ID (_id) not found in token payload for create_api_key.")
|
|
return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
|
|
except AttributeError:
|
|
logger.error("PyMongo extension not initialized or attached correctly.")
|
|
return jsonify({"message": "Database configuration error."}), 500
|
|
except Exception as e:
|
|
logger.error(f"Error creating API key for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True)
|
|
return jsonify({"message": "An error occurred while creating API key."}), 500
|
|
|
|
|
|
@bp.route('/keys/<string:api_id>', methods=['PUT']) # Path relative to blueprint prefix
|
|
@token_required
|
|
def update_api_key(current_user, api_id):
|
|
"""
|
|
Update an existing API key identified by its ID.
|
|
Uses APIKeyUpdateSchema for input validation.
|
|
Allows updating 'name', 'key', and 'selected' fields.
|
|
Verifies ownership before updating.
|
|
"""
|
|
logger = _get_logger()
|
|
# Validate user object from token
|
|
if not current_user or not current_user.get("_id"):
|
|
return jsonify({"message": "Internal authorization error."}), 500
|
|
try:
|
|
user_id = ObjectId(current_user["_id"])
|
|
except (InvalidId, TypeError) as e:
|
|
logger.error(f"User ID conversion error in update_api_key: {e}")
|
|
return jsonify({"message": "Invalid user ID format in token."}), 400
|
|
|
|
# Check dependencies
|
|
if not mongo: return jsonify({"message": "Database connection not available."}), 500
|
|
if not APIKeyUpdateSchema or not ValidationError:
|
|
return jsonify({"message": "Server configuration error: Schema unavailable."}), 500
|
|
|
|
# Get and validate JSON data using the schema
|
|
json_data = request.get_json() or {}
|
|
# Note: Update schema should not have required fields, so load won't fail if empty,
|
|
# but we check if validated_data is empty later.
|
|
schema = APIKeyUpdateSchema()
|
|
try:
|
|
validated_data = schema.load(json_data)
|
|
except ValidationError as err:
|
|
logger.warning(f"Update API key validation failed: {err.messages}")
|
|
return jsonify(err.messages), 422
|
|
|
|
# If validation passed but no valid fields were provided
|
|
if not validated_data:
|
|
return jsonify({"message": "No valid fields provided for update."}), 400
|
|
|
|
try:
|
|
# Validate api_id format from URL path
|
|
try:
|
|
object_id = ObjectId(api_id)
|
|
except InvalidId:
|
|
return jsonify({"message": "Invalid API key ID format."}), 400
|
|
|
|
# Find the document to update
|
|
db = mongo.db
|
|
doc = db.api_list.find_one({"_id": object_id})
|
|
if not doc:
|
|
return jsonify({"message": "API key not found."}), 404 # 404 Not Found
|
|
|
|
# Verify that the authenticated user owns this API key
|
|
doc_uid = doc.get("uid")
|
|
if not doc_uid or doc_uid != user_id:
|
|
# 403 Forbidden - user is authenticated but not authorized for this resource
|
|
return jsonify({"message": "You do not have permission to update this API key."}), 403
|
|
|
|
# --- Prepare Update Fields based on validated data ---
|
|
update_fields = {}
|
|
if "name" in validated_data:
|
|
new_name = validated_data["name"]
|
|
# Check for name conflict only if name is actually changing
|
|
if new_name != doc.get("name") and db.api_list.find_one({"uid": user_id, "name": new_name, "_id": {"$ne": object_id}}):
|
|
return jsonify({"message": f"User already has another API key named {new_name}."}), 409 # Conflict
|
|
update_fields["name"] = new_name
|
|
|
|
if "key" in validated_data:
|
|
update_fields["key"] = validated_data["key"]
|
|
|
|
if "selected" in validated_data:
|
|
update_fields["selected"] = validated_data["selected"] # Already boolean from schema
|
|
|
|
# If, after validation and processing, there's nothing to update (e.g., only invalid fields were sent)
|
|
if not update_fields:
|
|
return jsonify({"message": "No valid changes detected in the provided data."}), 400
|
|
|
|
# Always update the 'updatedAt' timestamp
|
|
update_fields["updatedAt"] = datetime.datetime.now(datetime.timezone.utc)
|
|
|
|
# Perform the update operation in the database
|
|
result = db.api_list.update_one(
|
|
{"_id": object_id}, # Filter by ID
|
|
{"$set": update_fields} # Set the new values
|
|
)
|
|
|
|
# Check if the document was found and potentially modified
|
|
if result.matched_count == 1:
|
|
return jsonify({"message": "API key updated successfully."}), 200
|
|
else:
|
|
# This case should ideally not happen if find_one succeeded, but included for safety
|
|
logger.warning(f"Update matched count was {result.matched_count} for api_id {api_id}")
|
|
return jsonify({"message": "API key update failed (key not found after initial check)."}), 404
|
|
|
|
except KeyError: # Should be caught by initial user_id check
|
|
logger.error(f"User ID (_id) not found in token payload for update_api_key.")
|
|
return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
|
|
except AttributeError:
|
|
logger.error("PyMongo extension not initialized or attached correctly.")
|
|
return jsonify({"message": "Database configuration error."}), 500
|
|
except Exception as e:
|
|
logger.error(f"Error updating API key {api_id} for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True)
|
|
return jsonify({"message": "An error occurred while updating the API key."}), 500
|
|
|
|
|
|
@bp.route('/keys/<string:api_id>', methods=['DELETE']) # Path relative to blueprint prefix
|
|
@token_required
|
|
def delete_api_key(current_user, api_id):
|
|
"""
|
|
Delete an API key identified by its ID.
|
|
Verifies ownership before deleting.
|
|
(No schema needed for input/output here)
|
|
"""
|
|
logger = _get_logger()
|
|
# Validate user object from token
|
|
if not current_user or not current_user.get("_id"):
|
|
return jsonify({"message": "Internal authorization error."}), 500
|
|
try:
|
|
user_id = ObjectId(current_user["_id"])
|
|
except (InvalidId, TypeError) as e:
|
|
logger.error(f"User ID conversion error in delete_api_key: {e}")
|
|
return jsonify({"message": "Invalid user ID format in token."}), 400
|
|
|
|
if not mongo: return jsonify({"message": "Database connection not available."}), 500
|
|
|
|
try:
|
|
# Validate api_id format from URL path
|
|
try:
|
|
object_id = ObjectId(api_id)
|
|
except InvalidId:
|
|
return jsonify({"message": "Invalid API key ID format."}), 400
|
|
|
|
# Find the document to delete
|
|
db = mongo.db
|
|
doc = db.api_list.find_one({"_id": object_id}, {"uid": 1}) # Fetch only uid for check
|
|
if not doc:
|
|
return jsonify({"message": "API key not found."}), 404 # 404 Not Found
|
|
|
|
# Verify that the authenticated user owns this API key
|
|
doc_uid = doc.get("uid")
|
|
if not doc_uid or doc_uid != user_id:
|
|
# 403 Forbidden
|
|
return jsonify({"message": "You do not have permission to delete this API key."}), 403
|
|
|
|
# Perform the delete operation
|
|
result = db.api_list.delete_one({"_id": object_id})
|
|
|
|
# Check if the deletion was successful
|
|
if result.deleted_count == 1:
|
|
return jsonify({"message": "API key deleted successfully."}), 200 # 200 OK or 204 No Content are suitable
|
|
else:
|
|
# This case means the document existed initially but couldn't be deleted
|
|
logger.error(f"Failed to delete API key {api_id} despite finding it initially.")
|
|
return jsonify({"message": "Failed to delete API key (already deleted?)."}), 404 # Or 500
|
|
|
|
except KeyError: # Should be caught by initial user_id check
|
|
logger.error(f"User ID (_id) not found in token payload for delete_api_key.")
|
|
return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
|
|
except AttributeError:
|
|
logger.error("PyMongo extension not initialized or attached correctly.")
|
|
return jsonify({"message": "Database configuration error."}), 500
|
|
except Exception as e:
|
|
logger.error(f"Error deleting API key {api_id} for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True)
|
|
return jsonify({"message": "An error occurred while deleting the API key."}), 500
|