# myapp/ai_services/ai_routes.py # This file handles API Key management logic. import datetime import logging from flask import request, jsonify, current_app, has_app_context # Flask utilities from bson.objectid import ObjectId, InvalidId # For MongoDB ObjectIds from functools import wraps # Import wraps for dummy decorator # --- Local Blueprint Import --- from . import bp # Import the 'bp' instance defined in the local __init__.py # --- Shared Extensions and Utilities Imports --- try: from ..extensions import mongo # Import the initialized PyMongo instance from ..utils import token_required # Import the authentication decorator except ImportError: # Fallback or error handling if imports fail print("Warning: Could not import mongo or token_required in ai_services/ai_routes.py.") mongo = None # Define a dummy decorator if token_required is missing def token_required(f): @wraps(f) def wrapper(*args, **kwargs): print("ERROR: token_required decorator is not available!") return jsonify({"message": "Server configuration error: Missing authentication utility."}), 500 return wrapper # --- Schema Imports --- try: # Import the relevant schemas defined in schemas.py from ..schemas import APIKeyCreateSchema, APIKeyUpdateSchema, APIKeySchema from marshmallow import ValidationError except ImportError: print("Warning: Could not import APIKey schemas or ValidationError in ai_services/ai_routes.py.") APIKeyCreateSchema = None APIKeyUpdateSchema = None APIKeySchema = None ValidationError = None # Define ValidationError as None if import fails # --- Helper to get logger safely --- def _get_logger(): if has_app_context(): return current_app.logger return logging.getLogger(__name__) # Note: Routes use paths relative to the '/api/ai' prefix. # Original '/api_list' becomes '/keys' # Original '/api_list/' becomes '/keys/' @bp.route('/keys', methods=['GET']) # Path relative to blueprint prefix @token_required def list_api_keys(current_user): """ List all API keys belonging to the authenticated user. Uses APIKeySchema for output serialization. Fetches keys from the 'api_list' collection associated with the user's ID. Sorts by update time descending. """ logger = _get_logger() # Validate user object from token if not current_user or not current_user.get("_id"): return jsonify({"message": "Internal authorization error."}), 500 try: user_id = ObjectId(current_user["_id"]) except (InvalidId, TypeError) as e: logger.error(f"User ID conversion error in list_api_keys: {e}") return jsonify({"message": "Invalid user ID format in token."}), 400 # Check dependencies if not mongo: return jsonify({"message": "Database connection not available."}), 500 if not APIKeySchema: return jsonify({"message": "Server configuration error: Schema unavailable."}), 500 try: # Find all documents in the 'api_list' collection for this user db = mongo.db cursor = db.api_list.find({"uid": user_id}).sort("updatedAt", -1) api_key_docs = list(cursor) # Convert cursor to list # --- Serialize results using the schema --- output_schema = APIKeySchema(many=True) # Schema handles ObjectId and datetime conversion, and field selection/exclusion # NOTE: APIKeySchema currently dumps the full key. Consider masking in schema if needed. serialized_result = output_schema.dump(api_key_docs) return jsonify({"api_keys": serialized_result}), 200 except KeyError: # Should be caught by initial user_id check logger.error(f"User ID (_id) not found in token payload for list_api_keys.") return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401 except AttributeError: logger.error("PyMongo extension not initialized or attached correctly.") return jsonify({"message": "Database configuration error."}), 500 except Exception as e: logger.error(f"Error listing API keys for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True) return jsonify({"message": "An error occurred while listing API keys."}), 500 @bp.route('/keys', methods=['POST']) # Path relative to blueprint prefix @token_required def create_api_key(current_user): """ Create a new API key entry for the authenticated user. Uses APIKeyCreateSchema for input validation. Expects 'name', 'key', and optional 'selected' in JSON payload. Prevents duplicate names per user. """ logger = _get_logger() # Validate user object from token if not current_user or not current_user.get("_id"): return jsonify({"message": "Internal authorization error."}), 500 try: user_id = ObjectId(current_user["_id"]) except (InvalidId, TypeError) as e: logger.error(f"User ID conversion error in create_api_key: {e}") return jsonify({"message": "Invalid user ID format in token."}), 400 # Check dependencies if not mongo: return jsonify({"message": "Database connection not available."}), 500 if not APIKeyCreateSchema or not ValidationError: return jsonify({"message": "Server configuration error: Schema unavailable."}), 500 # Get and validate JSON data using the schema json_data = request.get_json() or {} schema = APIKeyCreateSchema() try: validated_data = schema.load(json_data) except ValidationError as err: logger.warning(f"Create API key validation failed: {err.messages}") return jsonify(err.messages), 422 # Return validation errors # Extract validated data name = validated_data['name'] api_key = validated_data['key'] selected = validated_data['selected'] # Schema provides default if missing try: # Check if an API key with the same name already exists for this user db = mongo.db existing = db.api_list.find_one({"uid": user_id, "name": name}) if existing: # Return 409 Conflict status code for duplicates return jsonify({"message": f"User already has an API key for {name}."}), 409 # --- Prepare and Insert Document --- now = datetime.datetime.now(datetime.timezone.utc) # Use timezone-aware UTC time doc = { "uid": user_id, # Store user's ObjectId "name": name, "key": api_key, # Store the provided key "selected": selected, # Use validated boolean "createdAt": now, "updatedAt": now } result = db.api_list.insert_one(doc) # Return success response with the ID of the newly created key return jsonify({ "message": "API key created successfully.", "api_id": str(result.inserted_id) # Convert ObjectId to string }), 201 # 201 Created status code except KeyError: # Should be caught by initial user_id check logger.error(f"User ID (_id) not found in token payload for create_api_key.") return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401 except AttributeError: logger.error("PyMongo extension not initialized or attached correctly.") return jsonify({"message": "Database configuration error."}), 500 except Exception as e: logger.error(f"Error creating API key for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True) return jsonify({"message": "An error occurred while creating API key."}), 500 @bp.route('/keys/', methods=['PUT']) # Path relative to blueprint prefix @token_required def update_api_key(current_user, api_id): """ Update an existing API key identified by its ID. Uses APIKeyUpdateSchema for input validation. Allows updating 'name', 'key', and 'selected' fields. Verifies ownership before updating. """ logger = _get_logger() # Validate user object from token if not current_user or not current_user.get("_id"): return jsonify({"message": "Internal authorization error."}), 500 try: user_id = ObjectId(current_user["_id"]) except (InvalidId, TypeError) as e: logger.error(f"User ID conversion error in update_api_key: {e}") return jsonify({"message": "Invalid user ID format in token."}), 400 # Check dependencies if not mongo: return jsonify({"message": "Database connection not available."}), 500 if not APIKeyUpdateSchema or not ValidationError: return jsonify({"message": "Server configuration error: Schema unavailable."}), 500 # Get and validate JSON data using the schema json_data = request.get_json() or {} # Note: Update schema should not have required fields, so load won't fail if empty, # but we check if validated_data is empty later. schema = APIKeyUpdateSchema() try: validated_data = schema.load(json_data) except ValidationError as err: logger.warning(f"Update API key validation failed: {err.messages}") return jsonify(err.messages), 422 # If validation passed but no valid fields were provided if not validated_data: return jsonify({"message": "No valid fields provided for update."}), 400 try: # Validate api_id format from URL path try: object_id = ObjectId(api_id) except InvalidId: return jsonify({"message": "Invalid API key ID format."}), 400 # Find the document to update db = mongo.db doc = db.api_list.find_one({"_id": object_id}) if not doc: return jsonify({"message": "API key not found."}), 404 # 404 Not Found # Verify that the authenticated user owns this API key doc_uid = doc.get("uid") if not doc_uid or doc_uid != user_id: # 403 Forbidden - user is authenticated but not authorized for this resource return jsonify({"message": "You do not have permission to update this API key."}), 403 # --- Prepare Update Fields based on validated data --- update_fields = {} if "name" in validated_data: new_name = validated_data["name"] # Check for name conflict only if name is actually changing if new_name != doc.get("name") and db.api_list.find_one({"uid": user_id, "name": new_name, "_id": {"$ne": object_id}}): return jsonify({"message": f"User already has another API key named {new_name}."}), 409 # Conflict update_fields["name"] = new_name if "key" in validated_data: update_fields["key"] = validated_data["key"] if "selected" in validated_data: update_fields["selected"] = validated_data["selected"] # Already boolean from schema # If, after validation and processing, there's nothing to update (e.g., only invalid fields were sent) if not update_fields: return jsonify({"message": "No valid changes detected in the provided data."}), 400 # Always update the 'updatedAt' timestamp update_fields["updatedAt"] = datetime.datetime.now(datetime.timezone.utc) # Perform the update operation in the database result = db.api_list.update_one( {"_id": object_id}, # Filter by ID {"$set": update_fields} # Set the new values ) # Check if the document was found and potentially modified if result.matched_count == 1: return jsonify({"message": "API key updated successfully."}), 200 else: # This case should ideally not happen if find_one succeeded, but included for safety logger.warning(f"Update matched count was {result.matched_count} for api_id {api_id}") return jsonify({"message": "API key update failed (key not found after initial check)."}), 404 except KeyError: # Should be caught by initial user_id check logger.error(f"User ID (_id) not found in token payload for update_api_key.") return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401 except AttributeError: logger.error("PyMongo extension not initialized or attached correctly.") return jsonify({"message": "Database configuration error."}), 500 except Exception as e: logger.error(f"Error updating API key {api_id} for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True) return jsonify({"message": "An error occurred while updating the API key."}), 500 @bp.route('/keys/', methods=['DELETE']) # Path relative to blueprint prefix @token_required def delete_api_key(current_user, api_id): """ Delete an API key identified by its ID. Verifies ownership before deleting. (No schema needed for input/output here) """ logger = _get_logger() # Validate user object from token if not current_user or not current_user.get("_id"): return jsonify({"message": "Internal authorization error."}), 500 try: user_id = ObjectId(current_user["_id"]) except (InvalidId, TypeError) as e: logger.error(f"User ID conversion error in delete_api_key: {e}") return jsonify({"message": "Invalid user ID format in token."}), 400 if not mongo: return jsonify({"message": "Database connection not available."}), 500 try: # Validate api_id format from URL path try: object_id = ObjectId(api_id) except InvalidId: return jsonify({"message": "Invalid API key ID format."}), 400 # Find the document to delete db = mongo.db doc = db.api_list.find_one({"_id": object_id}, {"uid": 1}) # Fetch only uid for check if not doc: return jsonify({"message": "API key not found."}), 404 # 404 Not Found # Verify that the authenticated user owns this API key doc_uid = doc.get("uid") if not doc_uid or doc_uid != user_id: # 403 Forbidden return jsonify({"message": "You do not have permission to delete this API key."}), 403 # Perform the delete operation result = db.api_list.delete_one({"_id": object_id}) # Check if the deletion was successful if result.deleted_count == 1: return jsonify({"message": "API key deleted successfully."}), 200 # 200 OK or 204 No Content are suitable else: # This case means the document existed initially but couldn't be deleted logger.error(f"Failed to delete API key {api_id} despite finding it initially.") return jsonify({"message": "Failed to delete API key (already deleted?)."}), 404 # Or 500 except KeyError: # Should be caught by initial user_id check logger.error(f"User ID (_id) not found in token payload for delete_api_key.") return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401 except AttributeError: logger.error("PyMongo extension not initialized or attached correctly.") return jsonify({"message": "Database configuration error."}), 500 except Exception as e: logger.error(f"Error deleting API key {api_id} for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True) return jsonify({"message": "An error occurred while deleting the API key."}), 500