2025-06-09 17:53:19 +08:00

818 lines
37 KiB
Python

# myapp/urls/urls_routes.py
import datetime
import logging
from flask import request, jsonify, current_app, has_app_context # Flask utilities
from bson.objectid import ObjectId, InvalidId # For MongoDB ObjectIds
from collections import defaultdict # Potentially useful for keyword aggregation etc.
from functools import wraps # For creating dummy decorators
import re # For escaping regex characters in search
# --- Local Blueprint Import ---
from . import bp # Import the 'bp' instance defined in the local __init__.py
# --- Shared Extensions and Utilities Imports ---
try:
    from ..extensions import mongo  # Initialized PyMongo instance
    from ..utils import token_required  # Authentication decorator
except ImportError:
    # Keep the module importable when the shared package pieces are missing:
    # the database handle becomes None and every protected route answers 500.
    print("Warning: Could not import mongo or token_required in urls/urls_routes.py.")
    mongo = None

    def token_required(f):
        """Stand-in for the real auth decorator; the wrapped view is never called."""

        @wraps(f)
        def _reject(*args, **kwargs):
            print("ERROR: token_required decorator is not available!")
            body = {"message": "Server configuration error: Missing authentication utility."}
            return jsonify(body), 500

        return _reject
# --- Schema Imports ---
try:
# Import the relevant schemas defined in schemas.py
from ..schemas import (
URLCreateSchema, URLUpdateSchema, URLSchema,
URLListSchema, URLSearchResultSchema
)
from marshmallow import ValidationError
except ImportError:
print("Warning: Could not import URL schemas or ValidationError in urls/urls_routes.py.")
URLCreateSchema = None
URLUpdateSchema = None
URLSchema = None
URLListSchema = None
URLSearchResultSchema = None
ValidationError = None
# --- Celery Task Imports ---
# IMPORTANT: Assumes the project root directory ('your_fullstack_project/') is in PYTHONPATH
try:
    from backend_flask.celery_worker.celery_app import (
        async_extract_title_and_keywords,
        async_summarize_url,
        async_recalc_project_keywords,
    )
except ImportError:
    # ImportError is caught (ModuleNotFoundError is a subclass of it) so that a
    # module which exists but fails to import, or lacks one of the task names,
    # also falls back to the dummies -- same policy as the other import guards
    # in this file.
    print("Warning: Could not import Celery tasks from 'celery_worker'. Ensure project root is in PYTHONPATH.")

    def _dummy_celery_task(*args, **kwargs):
        """
        Build a placeholder object exposing a no-op ``delay()``.

        The first positional argument is used as the task name (defaults to
        'dummy_task'). An error line is printed when the placeholder is
        created and again each time ``delay()`` is called on it.
        """
        task_name = args[0] if args else 'dummy_task'
        print(f"ERROR: Celery task {task_name} not available!")

        class DummyTask:
            def __init__(self, name):
                self.__name__ = name

            def delay(self, *a, **kw):
                # NOTE(review): this returns None without raising, so callers
                # cannot distinguish "queued" from "Celery unavailable".
                print(f"ERROR: Tried to call delay() on dummy task {self.__name__}")
                return None

        return DummyTask(task_name)

    # Bind placeholders so the route handlers never hit a NameError.
    async_extract_title_and_keywords = _dummy_celery_task('async_extract_title_and_keywords')
    async_summarize_url = _dummy_celery_task('async_summarize_url')
    async_recalc_project_keywords = _dummy_celery_task('async_recalc_project_keywords')
# --- Helper to get logger safely ---
def _get_logger():
    """Return the Flask app logger inside an app context, else a module-level logger."""
    return current_app.logger if has_app_context() else logging.getLogger(__name__)
# Note: Routes use paths relative to the '/api' prefix defined in __init__.py.
# --------------------------
# Create URL Endpoint
# Path: POST /api/projects/<project_id>/urls
# --------------------------
@bp.route('/projects/<string:project_id>/urls', methods=['POST'])
@token_required
def create_url(current_user, project_id):
    """
    Create a new URL entry within a specific project.

    The JSON payload is validated with URLCreateSchema, whose output must
    contain 'url'. The optional fields 'title', 'favicon', 'starred', 'note',
    'keywords' and 'summary' are read from the raw payload. The caller must be
    the project's owner or one of its collaborators. The document is inserted
    with processingStatus 'pending', after which the title/keyword extraction
    and summarization Celery tasks are queued.

    Args:
        current_user: User document passed in by the token_required decorator;
            its '_id' must be convertible to an ObjectId.
        project_id: Project identifier taken from the URL path.

    Returns:
        A (response, status) tuple: 201 with the new 'url_id' on success,
        400 for malformed IDs, 403/404 when access or lookup fails, 422 with
        the schema's error messages, 500 for server-side problems.
    """
    logger = _get_logger()

    # Validate user object from token
    if not current_user or not current_user.get("_id"):
        logger.error("Invalid current_user object received in create_url")
        return jsonify({"message": "Internal authorization error."}), 500
    try:
        user_id = ObjectId(current_user["_id"])
    except (InvalidId, TypeError) as e:
        logger.error(f"User ID conversion error in create_url: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400
    user_id_str = str(user_id)  # Celery task arguments are passed as strings

    # Check dependencies
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not URLCreateSchema or not ValidationError:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    # Get and validate JSON data using the schema
    json_data = request.get_json() or {}
    logger.debug(f"create_url called: project_id={project_id}, data={json_data}, user_id={user_id_str}")
    schema = URLCreateSchema()
    try:
        validated_input = schema.load(json_data)
    except ValidationError as err:
        logger.warning(f"Create URL validation failed: {err.messages}")
        return jsonify(err.messages), 422
    user_url = validated_input['url']

    try:
        # Validate project ID format from URL path
        try:
            project_obj_id = ObjectId(project_id)
        except InvalidId:
            return jsonify({"message": "Invalid project ID format in URL path."}), 400

        # --- Verify Project Access ---
        db = mongo.db
        project = db.projects.find_one({"_id": project_obj_id}, {"ownerId": 1, "collaborators": 1})
        if not project:
            return jsonify({"message": "Project not found."}), 404
        owner_id = project.get("ownerId")
        collaborators = project.get("collaborators", [])
        if not owner_id:
            logger.error(f"Project {project_obj_id} is missing ownerId field.")
            return jsonify({"message": "Project data integrity issue."}), 500
        if owner_id != user_id and user_id not in collaborators:
            return jsonify({"message": "You do not have access to add URLs to this project."}), 403

        # --- Prepare URL Document ---
        # Optional fields come from the raw payload (json_data), not from the
        # schema output. The previous code read them from an undefined name,
        # which made every otherwise-valid request fail with a 500.
        def _text(field):
            # Missing, null or non-string values are stored as an empty string
            # rather than blowing up on .strip().
            value = json_data.get(field)
            return value.strip() if isinstance(value, str) else ""

        keywords_converted = []
        keywords_data = json_data.get("keywords", [])
        if isinstance(keywords_data, list):
            for kw in keywords_data:
                if not isinstance(kw, dict):
                    logger.warning("Non-dict item found in keywords during URL creation.")
                    continue
                word = kw.get("word", "")
                word = word.strip() if isinstance(word, str) else ""
                if not word:
                    continue  # Entries without a usable word are dropped silently
                try:
                    percentage = float(kw.get("percentage", 0.0))
                except (ValueError, TypeError):
                    logger.warning(f"Could not convert keyword percentage for word '{word}' during URL creation.")
                    continue
                keywords_converted.append({"word": word, "percentage": percentage})

        now = datetime.datetime.now(datetime.timezone.utc)
        url_doc = {
            "projectId": project_obj_id,
            "url": user_url,  # Use validated URL
            "title": _text("title"),
            "favicon": json_data.get("favicon", ""),
            "starred": bool(json_data.get("starred", False)),
            "note": _text("note"),
            "keywords": keywords_converted,
            "summary": _text("summary"),
            "processingStatus": "pending",
            "createdAt": now,
            "updatedAt": now,
        }

        # Insert the new URL document
        result = db.urls.insert_one(url_doc)
        new_url_id_str = str(result.inserted_id)
        logger.info(f"Successfully inserted URL {new_url_id_str} for project {project_id}")

        # --- Trigger Background Tasks ---
        # A queueing failure does not undo the insert; it only changes the
        # message returned to the client.
        tasks_queued = True
        try:
            async_extract_title_and_keywords.delay(new_url_id_str, user_id_str)
            # Gemini is used only when the user has it selected with a key stored.
            api_doc = db.api_list.find_one({"uid": user_id, "selected": True, "name": "Gemini"})
            use_gemini = bool(api_doc and api_doc.get("key"))
            async_summarize_url.delay(new_url_id_str, user_id_str, use_gemini)
            logger.info(f"Queued Celery tasks for URL {new_url_id_str} (use_gemini={use_gemini})")
        except NameError as ne:
            logger.error(f"Celery tasks not available for URL {new_url_id_str}: {ne}. Processing cannot be initiated.")
            tasks_queued = False
        except Exception as celery_err:
            logger.error(f"Failed to queue Celery tasks for URL {new_url_id_str}: {celery_err}", exc_info=True)
            tasks_queued = False

        response_message = (
            "URL created successfully and processing initiated."
            if tasks_queued
            else "URL created, but failed to initiate background processing."
        )
        return jsonify({"message": response_message, "url_id": new_url_id_str}), 201
    except Exception as e:
        logger.error(f"Error creating URL for project {project_id}: {e}", exc_info=True)
        return jsonify({"message": "An internal error occurred while creating the URL."}), 500
# --------------------------
# List URLs for Project (Simplified)
# Path: GET /api/projects/<project_id>/urls
# --------------------------
@bp.route('/projects/<string:project_id>/urls', methods=['GET'])
@token_required
def list_urls_for_project(current_user, project_id):
    """
    Return the URLs of one project as a lightweight list, most recently updated first.

    Only '_id', 'title' and 'url' are fetched for each entry, and the list is
    serialized with URLListSchema. The caller must be the project's owner or
    appear in its collaborators; otherwise a 403 is returned.
    """
    logger = _get_logger()

    # The auth decorator has to hand over a user document carrying an _id.
    if not (current_user and current_user.get("_id")):
        return jsonify({"message": "Internal authorization error."}), 500
    try:
        user_id = ObjectId(current_user["_id"])
    except (InvalidId, TypeError) as e:
        logger.error(f"User ID conversion error in list_urls_for_project: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    # Module-level dependencies may be None if their import failed.
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not URLListSchema:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    try:
        try:
            project_oid = ObjectId(project_id)
        except InvalidId:
            return jsonify({"message": "Invalid project ID format in URL path."}), 400

        database = mongo.db

        # --- Access check: only the two fields needed for it are loaded ---
        access_fields = {"ownerId": 1, "collaborators": 1}
        project = database.projects.find_one({"_id": project_oid}, access_fields)
        if not project:
            return jsonify({"message": "Project not found."}), 404
        owner_id = project.get("ownerId")
        collaborators = project.get("collaborators", [])
        if not (owner_id == user_id or user_id in collaborators):
            return jsonify({"message": "You do not have access to this project's URLs."}), 403

        # --- Fetch (projection mirrors URLListSchema) and serialize ---
        list_fields = {"_id": 1, "title": 1, "url": 1}
        url_docs = list(
            database.urls.find({"projectId": project_oid}, list_fields).sort("updatedAt", -1)
        )
        payload = URLListSchema(many=True).dump(url_docs)
        return jsonify({"urls": payload}), 200
    except Exception as e:
        logger.error(f"Error listing URLs for project {project_id}: {e}", exc_info=True)
        return jsonify({"message": "An error occurred while listing URLs."}), 500
# --------------------------
# Get URL Detail
# Path: GET /api/urls/<url_id>
# --------------------------
@bp.route('/urls/<string:url_id>', methods=['GET'])
@token_required
def get_url_detail(current_user, url_id):
    """
    Return the complete stored record of a single URL entry.

    The URL document is loaded by its ID, the caller's access is checked
    against the project it links to (owner or collaborator), and the document
    is serialized with URLSchema.
    """
    logger = _get_logger()

    # The auth decorator has to hand over a user document carrying an _id.
    if not (current_user and current_user.get("_id")):
        return jsonify({"message": "Internal authorization error."}), 500
    try:
        user_id = ObjectId(current_user["_id"])
    except (InvalidId, TypeError) as e:
        logger.error(f"User ID conversion error in get_url_detail: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    # Module-level dependencies may be None if their import failed.
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not URLSchema:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    try:
        try:
            url_oid = ObjectId(url_id)
        except InvalidId:
            return jsonify({"message": "Invalid URL ID format."}), 400

        database = mongo.db
        url_doc = database.urls.find_one({"_id": url_oid})
        if not url_doc:
            return jsonify({"message": "URL not found."}), 404

        # --- Resolve the owning project and check access through it ---
        project_obj_id = url_doc.get("projectId")
        if not (project_obj_id and isinstance(project_obj_id, ObjectId)):
            logger.error(f"URL {url_id} has missing or invalid projectId.")
            return jsonify({"message": "URL data integrity issue (missing project link)."}), 500

        access_fields = {"ownerId": 1, "collaborators": 1}
        project = database.projects.find_one({"_id": project_obj_id}, access_fields)
        if not project:
            logger.error(f"Project {project_obj_id} associated with URL {url_id} not found.")
            return jsonify({"message": "Associated project not found; cannot verify access."}), 404

        owner_id = project.get("ownerId")
        collaborators = project.get("collaborators", [])
        if not owner_id:
            logger.error(f"Project {project_obj_id} is missing ownerId in get_url_detail.")
            return jsonify({"message": "Project data integrity issue."}), 500
        if not (owner_id == user_id or user_id in collaborators):
            return jsonify({"message": "You do not have access to the project containing this URL."}), 403

        # --- Serialize the full document ---
        return jsonify(URLSchema().dump(url_doc)), 200
    except Exception as e:
        logger.error(f"Error retrieving URL detail for {url_id}: {e}", exc_info=True)
        return jsonify({"message": "An error occurred while retrieving the URL details."}), 500
# --------------------------
# Update URL
# Path: PUT /api/urls/<url_id>
# --------------------------
@bp.route('/urls/<string:url_id>', methods=['PUT'])
@token_required
def update_url(current_user, url_id):
    """
    Apply a partial update to a URL entry.

    The JSON payload is validated with URLUpdateSchema and written with a
    single $set together with a fresh 'updatedAt'. Access is checked through
    the URL's project (owner or collaborator). When the payload contained
    'keywords', a project keyword recalculation task is queued. The response
    carries the updated entry serialized with URLListSchema.
    """
    logger = _get_logger()

    # The auth decorator has to hand over a user document carrying an _id.
    if not (current_user and current_user.get("_id")):
        return jsonify({"message": "Internal authorization error."}), 500
    try:
        user_id = ObjectId(current_user["_id"])
    except (InvalidId, TypeError) as e:
        logger.error(f"User ID conversion error in update_url: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    # Module-level dependencies may be None if their import failed.
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not (URLUpdateSchema and URLListSchema and ValidationError):
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    # --- Validate the payload before touching the database ---
    json_data = request.get_json() or {}
    try:
        validated_data = URLUpdateSchema().load(json_data)
    except ValidationError as err:
        logger.warning(f"Update URL validation failed: {err.messages}")
        return jsonify(err.messages), 422
    if not validated_data:
        # Validation passed, yet nothing updatable survived it.
        return jsonify({"message": "No valid fields provided for update."}), 400

    try:
        try:
            url_oid = ObjectId(url_id)
        except InvalidId:
            return jsonify({"message": "Invalid URL ID format."}), 400

        database = mongo.db

        # --- Locate the URL, then check access through its project ---
        url_doc = database.urls.find_one({"_id": url_oid}, {"projectId": 1})
        if not url_doc:
            return jsonify({"message": "URL not found."}), 404

        project_obj_id = url_doc.get("projectId")
        if not (project_obj_id and isinstance(project_obj_id, ObjectId)):
            logger.error(f"URL {url_id} has missing or invalid projectId during update.")
            return jsonify({"message": "URL data integrity issue (missing project link)."}), 500

        access_fields = {"ownerId": 1, "collaborators": 1}
        project = database.projects.find_one({"_id": project_obj_id}, access_fields)
        if not project:
            logger.error(f"Project {project_obj_id} associated with URL {url_id} not found during update.")
            return jsonify({"message": "Associated project not found; cannot verify access."}), 404

        owner_id = project.get("ownerId")
        collaborators = project.get("collaborators", [])
        if not owner_id:
            logger.error(f"Project {project_obj_id} is missing ownerId during URL update.")
            return jsonify({"message": "Project data integrity issue."}), 500
        if not (owner_id == user_id or user_id in collaborators):
            return jsonify({"message": "You do not have access to update URLs in this project."}), 403

        # --- Build and apply the $set document ---
        # Record whether keywords were supplied before the timestamp is added.
        keywords_changed = "keywords" in validated_data
        update_fields = validated_data
        update_fields["updatedAt"] = datetime.datetime.now(datetime.timezone.utc)
        result = database.urls.update_one({"_id": url_oid}, {"$set": update_fields})

        if result.matched_count != 1:
            return jsonify({"message": "URL update failed (document not found)."}), 404

        # --- Re-read the entry in list form for the response ---
        updated_url_doc = database.urls.find_one(
            {"_id": url_oid},
            {"_id": 1, "title": 1, "url": 1},
        )
        if not updated_url_doc:
            logger.warning(f"URL {url_id} updated but could not be retrieved.")
            return jsonify({"message": "URL updated successfully, but failed to retrieve updated data."}), 200

        serialized_url = URLListSchema().dump(updated_url_doc)

        # A failure to queue the recalculation is logged but does not fail the request.
        if keywords_changed:
            try:
                async_recalc_project_keywords.delay(str(project_obj_id), str(user_id))
                logger.info(f"Queued keyword recalc task for project {project_obj_id} after URL {url_id} update.")
            except NameError:
                logger.error("Celery task 'async_recalc_project_keywords' not available during URL update.")
            except Exception as celery_err:
                logger.error(f"Failed to queue Celery recalc task for project {project_obj_id} after URL update: {celery_err}", exc_info=True)

        return jsonify({"message": "URL updated successfully.", "url": serialized_url}), 200
    except Exception as e:
        logger.error(f"Error updating URL {url_id}: {e}", exc_info=True)
        return jsonify({"message": "An error occurred while updating the URL."}), 500
# --------------------------
# Delete URL
# Path: DELETE /api/urls/<url_id>
# --------------------------
@bp.route('/urls/<string:url_id>', methods=['DELETE'])
@token_required
def delete_url(current_user, url_id):
    """
    Remove one URL entry.

    Access is checked through the URL's project (owner or collaborator); a
    missing project results in a 403 because access cannot be verified. After
    a successful delete a project keyword recalculation task is queued.
    """
    logger = _get_logger()

    # The auth decorator has to hand over a user document carrying an _id.
    if not (current_user and current_user.get("_id")):
        return jsonify({"message": "Internal authorization error."}), 500
    try:
        user_id = ObjectId(current_user["_id"])
    except (InvalidId, TypeError) as e:
        logger.error(f"User ID conversion error in delete_url: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500

    try:
        try:
            obj_url_id = ObjectId(url_id)
        except InvalidId:
            return jsonify({"message": "Invalid URL ID format."}), 400

        database = mongo.db

        # --- Locate the URL; projectId drives both the access check and the recalc ---
        url_doc = database.urls.find_one({"_id": obj_url_id}, {"projectId": 1})
        if not url_doc:
            return jsonify({"message": "URL not found."}), 404

        project_obj_id = url_doc.get("projectId")
        if not (project_obj_id and isinstance(project_obj_id, ObjectId)):
            logger.error(f"URL {url_id} has missing or invalid projectId during delete.")
            return jsonify({"message": "URL data integrity issue (missing project link)."}), 500

        access_fields = {"ownerId": 1, "collaborators": 1}
        project = database.projects.find_one({"_id": project_obj_id}, access_fields)
        if not project:
            # Without the project there is nothing to check access against: deny.
            logger.error(f"Project {project_obj_id} associated with URL {url_id} not found during delete.")
            return jsonify({"message": "Cannot verify access; associated project missing."}), 403

        owner_id = project.get("ownerId")
        collaborators = project.get("collaborators", [])
        if not owner_id:
            logger.error(f"Project {project_obj_id} is missing ownerId during URL delete.")
            return jsonify({"message": "Project data integrity issue."}), 500
        if not (owner_id == user_id or user_id in collaborators):
            return jsonify({"message": "You do not have permission to delete URLs in this project."}), 403

        # --- Delete ---
        delete_result = database.urls.delete_one({"_id": obj_url_id})
        if delete_result.deleted_count != 1:
            # The lookup above found the document, yet nothing was removed.
            logger.error(f"URL {obj_url_id} found but delete_one failed (deleted_count=0).")
            return jsonify({"message": "Failed to delete URL (already deleted?)."}), 404

        # A failure to queue the recalculation is logged; the deletion still counts as a success.
        try:
            async_recalc_project_keywords.delay(str(project_obj_id), str(user_id))
            logger.info(f"Queued keyword recalc task for project {project_obj_id} after URL {url_id} deletion.")
        except NameError:
            logger.error("Celery task 'async_recalc_project_keywords' not available during URL deletion.")
        except Exception as celery_err:
            logger.error(f"Failed to queue Celery recalc task for project {project_obj_id} after URL deletion: {celery_err}", exc_info=True)

        return jsonify({"message": "URL deleted successfully."}), 200
    except Exception as e:
        logger.error(f"Error deleting URL {url_id}: {e}", exc_info=True)
        return jsonify({"message": "An error occurred while deleting the URL."}), 500
# --------------------------
# Celery Task Trigger Endpoints
# Path: PUT /api/urls/<url_id>/extract_title_and_keywords
# Path: PUT /api/urls/<url_id>/summarize
# --------------------------
@bp.route('/urls/<string:url_id>/extract_title_and_keywords', methods=['PUT'])
@token_required
def trigger_extract_title_and_keywords(current_user, url_id):
    """
    Queue the title/keyword extraction task for one URL on demand.

    Access is checked through the URL's project (owner or collaborator). The
    URL's processingStatus is set to 'pending' before the task is queued and
    is left that way if queueing fails. Answers 202 once the task is queued.
    """
    logger = _get_logger()

    # The auth decorator has to hand over a user document carrying an _id.
    if not (current_user and current_user.get("_id")):
        return jsonify({"message": "Internal authorization error."}), 500
    try:
        user_id = ObjectId(current_user["_id"])
        user_id_str = str(user_id)  # The Celery task receives the ID as a string
    except (InvalidId, TypeError) as e:
        logger.error(f"User ID conversion error in trigger_extract: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500

    try:
        try:
            url_oid = ObjectId(url_id)
        except InvalidId:
            return jsonify({"message": "Invalid URL ID format."}), 400

        database = mongo.db

        # --- Locate the URL, then check access through its project ---
        url_doc = database.urls.find_one({"_id": url_oid}, {"projectId": 1})
        if not url_doc:
            return jsonify({"message": "URL not found."}), 404

        project_obj_id = url_doc.get("projectId")
        if not (project_obj_id and isinstance(project_obj_id, ObjectId)):
            logger.error(f"URL {url_id} has missing or invalid projectId during trigger_extract.")
            return jsonify({"message": "URL data integrity issue (missing project link)."}), 500

        access_fields = {"ownerId": 1, "collaborators": 1}
        project = database.projects.find_one({"_id": project_obj_id}, access_fields)
        if not project:
            logger.error(f"Project {project_obj_id} associated with URL {url_id} not found during trigger_extract.")
            return jsonify({"message": "Associated project not found; cannot verify access."}), 404

        owner_id = project.get("ownerId")
        collaborators = project.get("collaborators", [])
        if not owner_id:
            logger.error(f"Project {project_obj_id} is missing ownerId during trigger_extract.")
            return jsonify({"message": "Project data integrity issue."}), 500
        if not (owner_id == user_id or user_id in collaborators):
            return jsonify({"message": "You do not have access to trigger processing for this URL."}), 403

        # --- Mark as pending first, then queue ---
        pending_update = {
            "processingStatus": "pending",
            "updatedAt": datetime.datetime.now(datetime.timezone.utc),
        }
        database.urls.update_one({"_id": url_oid}, {"$set": pending_update})

        try:
            async_extract_title_and_keywords.delay(url_id, user_id_str)
            logger.info(f"Queued title/keyword extraction task for URL {url_id}")
            return jsonify({"message": "Title and keyword extraction task queued successfully."}), 202
        except NameError:
            # Status is intentionally left as 'pending' on failure.
            logger.error("Celery task 'async_extract_title_and_keywords' is not defined or imported correctly.")
            return jsonify({"message": "Server configuration error: Extraction feature unavailable."}), 500
        except Exception as e:
            # Status is intentionally left as 'pending' on failure.
            logger.error(f"Error queueing extraction task for URL {url_id}: {e}", exc_info=True)
            return jsonify({"message": "An error occurred while queueing the extraction task."}), 500
    except Exception as e:
        logger.error(f"Error in trigger_extract_title_and_keywords endpoint for URL {url_id}: {e}", exc_info=True)
        return jsonify({"message": "An internal error occurred before queueing the task."}), 500
@bp.route('/urls/<string:url_id>/summarize', methods=['PUT'])
@token_required
def trigger_summarize_url(current_user, url_id):
    """
    Queue the summarization task for one URL on demand.

    Access is checked through the URL's project (owner or collaborator). The
    URL's processingStatus is set to 'pending', then the task is queued with a
    flag telling it whether the user has a selected 'Gemini' entry with a key
    in api_list. The status stays 'pending' if queueing fails. Answers 202
    once the task is queued.
    """
    logger = _get_logger()

    # The auth decorator has to hand over a user document carrying an _id.
    if not (current_user and current_user.get("_id")):
        return jsonify({"message": "Internal authorization error."}), 500
    try:
        user_id = ObjectId(current_user["_id"])
        user_id_str = str(user_id)  # The Celery task receives the ID as a string
    except (InvalidId, TypeError) as e:
        logger.error(f"User ID conversion error in trigger_summarize: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500

    try:
        try:
            url_oid = ObjectId(url_id)
        except InvalidId:
            return jsonify({"message": "Invalid URL ID format."}), 400

        database = mongo.db

        # --- Locate the URL, then check access through its project ---
        url_doc = database.urls.find_one({"_id": url_oid}, {"projectId": 1})
        if not url_doc:
            return jsonify({"message": "URL not found."}), 404

        project_obj_id = url_doc.get("projectId")
        if not (project_obj_id and isinstance(project_obj_id, ObjectId)):
            logger.error(f"URL {url_id} has missing or invalid projectId during trigger_summarize.")
            return jsonify({"message": "URL data integrity issue (missing project link)."}), 500

        access_fields = {"ownerId": 1, "collaborators": 1}
        project = database.projects.find_one({"_id": project_obj_id}, access_fields)
        if not project:
            logger.error(f"Project {project_obj_id} associated with URL {url_id} not found during trigger_summarize.")
            return jsonify({"message": "Associated project not found; cannot verify access."}), 404

        owner_id = project.get("ownerId")
        collaborators = project.get("collaborators", [])
        if not owner_id:
            logger.error(f"Project {project_obj_id} is missing ownerId during trigger_summarize.")
            return jsonify({"message": "Project data integrity issue."}), 500
        if not (owner_id == user_id or user_id in collaborators):
            return jsonify({"message": "You do not have access to trigger processing for this URL."}), 403

        # --- Mark as pending first ---
        pending_update = {
            "processingStatus": "pending",
            "updatedAt": datetime.datetime.now(datetime.timezone.utc),
        }
        database.urls.update_one({"_id": url_oid}, {"$set": pending_update})

        # --- Gemini is used only when the user selected it and a key is stored ---
        gemini_filter = {"uid": user_id, "selected": True, "name": "Gemini"}
        api_doc = database.api_list.find_one(gemini_filter)
        use_gemini = bool(api_doc and api_doc.get("key"))

        try:
            async_summarize_url.delay(url_id, user_id_str, use_gemini)
            logger.info(f"Queued summarization task for URL {url_id} (use_gemini={use_gemini})")
            return jsonify({"message": "Summarization task queued successfully."}), 202
        except NameError:
            # Status is intentionally left as 'pending' on failure.
            logger.error("Celery task 'async_summarize_url' is not defined or imported correctly.")
            return jsonify({"message": "Server configuration error: Summarization feature unavailable."}), 500
        except Exception as e:
            # Status is intentionally left as 'pending' on failure.
            logger.error(f"Error queueing summarization task for URL {url_id}: {e}", exc_info=True)
            return jsonify({"message": "An error occurred while queueing the summarization task."}), 500
    except Exception as e:
        logger.error(f"Error in trigger_summarize_url endpoint for URL {url_id}: {e}", exc_info=True)
        return jsonify({"message": "An internal error occurred before queueing the task."}), 500
# --------------------------
# Search URLs within Project
# Path: GET /api/projects/<project_id>/search?q=...
# --------------------------
@bp.route('/projects/<string:project_id>/search', methods=['GET'])
@token_required
def search_urls(current_user, project_id):
    """
    Find URLs in one project whose text fields contain the 'q' query parameter.

    The query is regex-escaped and matched case-insensitively against 'title',
    'note', 'keywords.word' and 'summary'. Matches are sorted by 'updatedAt'
    descending and serialized with URLSearchResultSchema. A blank query yields
    an empty result list straight away. The caller must be the project's owner
    or a collaborator.
    """
    logger = _get_logger()

    # The auth decorator has to hand over a user document carrying an _id.
    if not (current_user and current_user.get("_id")):
        return jsonify({"message": "Internal authorization error."}), 500
    try:
        user_id = ObjectId(current_user["_id"])
    except (InvalidId, TypeError) as e:
        logger.error(f"User ID conversion error in search_urls: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    # Module-level dependencies may be None if their import failed.
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not URLSearchResultSchema:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    try:
        query_str = request.args.get("q", "").strip()
        if not query_str:
            # Nothing to search for; no project lookup is made in this case.
            return jsonify({"results": []}), 200

        try:
            project_oid = ObjectId(project_id)
        except InvalidId:
            return jsonify({"message": "Invalid project ID format in URL path."}), 400

        database = mongo.db

        # --- Access check: only the two fields needed for it are loaded ---
        access_fields = {"ownerId": 1, "collaborators": 1}
        project = database.projects.find_one({"_id": project_oid}, access_fields)
        if not project:
            return jsonify({"message": "Project not found."}), 404
        owner_id = project.get("ownerId")
        collaborators = project.get("collaborators", [])
        if not (owner_id == user_id or user_id in collaborators):
            return jsonify({"message": "Access denied to search URLs in this project."}), 403

        # --- Build the aggregation pipeline ---
        # Escaping makes the user's text match literally instead of as a pattern.
        escaped_query = re.escape(query_str)
        searched_fields = ("title", "note", "keywords.word", "summary")
        any_field_matches = [
            {field: {"$regex": escaped_query, "$options": "i"}}
            for field in searched_fields
        ]
        search_pipeline = [
            {"$match": {"projectId": project_oid}},
            {"$match": {"$or": any_field_matches}},
            # updatedAt is kept in the projection because the sort stage needs it.
            {"$project": {"_id": 1, "title": 1, "url": 1, "updatedAt": 1}},
            {"$sort": {"updatedAt": -1}},
        ]

        # --- Run and serialize ---
        search_result_docs = list(database.urls.aggregate(search_pipeline))
        serialized_result = URLSearchResultSchema(many=True).dump(search_result_docs)
        return jsonify({"results": serialized_result}), 200
    except Exception as e:
        logger.error(f"Error searching URLs in project {project_id} with query '{query_str}': {e}", exc_info=True)
        return jsonify({"message": "An error occurred during URL search."}), 500