2025-06-09 17:53:19 +08:00

788 lines
36 KiB
Python

# myapp/dialog/dialog_routes.py
import datetime
import os
import logging
from flask import request, jsonify, current_app, has_app_context # Flask utilities
from bson.objectid import ObjectId, InvalidId # For MongoDB ObjectIds
from functools import wraps # Import wraps for dummy decorator
# --- Local Blueprint Import ---
from . import bp # Import the 'bp' instance defined in the local __init__.py
# --- Shared Extensions and Utilities Imports ---
try:
    from ..extensions import mongo  # Initialized PyMongo instance
    from ..utils import token_required  # Authentication decorator
except ImportError:
    # Degrade instead of failing at import time: the module still loads, but every
    # route protected by the fallback decorator below answers with an HTTP 500.
    print("Warning: Could not import mongo or token_required in dialog/dialog_routes.py.")
    mongo = None

    def token_required(f):
        """Fallback used when the real ``token_required`` cannot be imported.

        The decorated view is never invoked; each call prints an error and
        returns a JSON 500 response explaining that authentication is unavailable.
        """
        @wraps(f)
        def _reject_request(*args, **kwargs):
            print("ERROR: token_required decorator is not available!")
            payload = {"message": "Server configuration error: Missing authentication utility."}
            return jsonify(payload), 500

        return _reject_request
# --- Schema Imports ---
try:
# Import the relevant schemas defined in schemas.py
from ..schemas import (
DialogCreateSchema, DialogSendMessageSchema,
DialogSchema, DialogSummarySchema
)
from marshmallow import ValidationError
except ImportError:
print("Warning: Could not import Dialog schemas or ValidationError in dialog/dialog_routes.py.")
DialogCreateSchema = None
DialogSendMessageSchema = None
DialogSchema = None
DialogSummarySchema = None
ValidationError = None # Define ValidationError as None if import fails
# --- External API and Langchain Imports ---
# Keep these imports conditional to avoid errors if libraries are not installed
try:
import google.generativeai as genai
from google.api_core import exceptions as google_exceptions
except ImportError:
print("Warning: google.generativeai not installed. Gemini functionality will fail.")
genai = None
google_exceptions = None
try:
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.docstore.document import Document
except ImportError:
print("Warning: Langchain components not installed. Vector store functionality will fail.")
OpenAIEmbeddings = None
FAISS = None
Document = None
# --- Constants ---
# Note: the Gemini model name is not defined here; send_dialog_message reads it from the Flask config key GEMINI_MODEL_NAME.
MAX_HISTORY_MESSAGES = 20  # Default history window for format_messages_for_gemini; that helper is not currently called by send_dialog_message, which sends only the freshly built prompt.
# --- Helper to get logger safely ---
def _get_logger():
    """Return the Flask app's logger inside an app context, otherwise this module's logger."""
    return current_app.logger if has_app_context() else logging.getLogger(__name__)
##################################################
# Helper Functions (kept within this module)
##################################################
# --- Knowledge Base Helpers ---
# Helpers that build, cache and refresh a project's text knowledge base from its stored URL documents
# (generate_knowledge_base_message, update_project_knowledge, process_api_response_and_update_knowledge).
def generate_knowledge_base_message(project_id):
    """
    Build one knowledge-base string from all URL documents of a project.

    Every document in ``mongo.db.urls`` with a matching ``projectId`` yields one
    entry (title, URL, note, keywords with percentages, summary). Entries are
    joined with ``---`` separator lines. The result is cut to the Flask config
    value ``KNOWLEDGE_BASE_MAX_LENGTH`` (default 4000 characters, which is also
    used outside an app context). When cut, it is suffixed with " ... [truncated]".

    Args:
        project_id (ObjectId | str): The project's ObjectId or its string form.

    Returns:
        str: The formatted knowledge base, or one of:
            "No external knowledge available for this project." when the project
            has no URL documents;
            "Error: Invalid project identifier." when ``project_id`` is not a
            valid ObjectId string;
            "Error: Database configuration issue." on an AttributeError (e.g.
            PyMongo not initialized, or a keyword entry that is not a dict);
            "" when Mongo is unavailable or any other error occurs (logged).
    """
    logger = _get_logger()
    if not mongo:
        logger.error("Mongo extension not available in generate_knowledge_base_message.")
        return ""  # Cannot proceed without DB connection
    try:
        # Accept both ObjectId and its string form.
        if not isinstance(project_id, ObjectId):
            project_id = ObjectId(project_id)

        knowledge_entries = []
        for doc in mongo.db.urls.find({"projectId": project_id}):
            # `or []`: a stored `keywords: null` would otherwise raise TypeError and
            # discard the knowledge base for the whole project.
            keywords_list = [
                f"{kw.get('word', '')}({kw.get('percentage', 'N/A')}%)"
                for kw in (doc.get("keywords") or [])
            ]
            knowledge_entries.append(
                f"Title: {doc.get('title', 'N/A')}\n"
                f"URL: {doc.get('url', 'N/A')}\n"
                f"Note: {doc.get('note', 'N/A')}\n"
                f"Keywords: {', '.join(keywords_list) if keywords_list else 'N/A'}\n"
                f"Summary: {doc.get('summary', 'N/A')}"
            )

        if not knowledge_entries:
            return "No external knowledge available for this project."

        combined = "\n\n---\n\n".join(knowledge_entries)
        # Use Flask config for the max length if available, otherwise the default.
        max_length = current_app.config.get('KNOWLEDGE_BASE_MAX_LENGTH', 4000) if has_app_context() else 4000
        if len(combined) > max_length:
            combined = combined[:max_length] + " ... [truncated]"
        return combined
    except InvalidId:
        logger.error(f"Invalid project_id format passed to generate_knowledge_base_message: {project_id}")
        return "Error: Invalid project identifier."
    except AttributeError:
        logger.error("PyMongo extension not initialized or available.")
        return "Error: Database configuration issue."
    except Exception as e:
        # Log the error with the project ID for easier debugging.
        logger.error(f"Error generating knowledge base message for project {project_id}: {e}", exc_info=True)
        return ""  # Return empty string on generic error
def update_project_knowledge(project_id):
    """
    Cache a condensed snapshot of the project's knowledge base in its 'summary' field.

    Writes the first 1000 characters of ``generate_knowledge_base_message(project_id)``
    to ``projects.summary`` and sets ``updatedAt`` to the current UTC time.

    If the knowledge base could not be generated, the existing summary is left
    untouched so that a transient failure does not wipe the cached value. That
    generator signals failure by returning "" or a message starting with "Error:".

    NOTE(review): this overwrites ``projects.summary``. Confirm that this field is
    not also user-authored elsewhere.

    Args:
        project_id (ObjectId | str): The project's ObjectId or its string form.
    """
    logger = _get_logger()
    if not mongo:
        logger.error("Mongo extension not available in update_project_knowledge.")
        return
    try:
        # Accept both ObjectId and its string form.
        if not isinstance(project_id, ObjectId):
            project_id = ObjectId(project_id)
        knowledge_message = generate_knowledge_base_message(project_id)
        # A genuine knowledge base never starts with "Error:" and is never empty,
        # so these values can only come from a failure.
        if not knowledge_message or knowledge_message.startswith("Error:"):
            logger.warning(f"Knowledge base unavailable for project {project_id}; keeping existing summary.")
            return
        condensed = knowledge_message[:1000]  # Slicing already handles shorter strings.
        mongo.db.projects.update_one(
            {"_id": project_id},
            {"$set": {"summary": condensed, "updatedAt": datetime.datetime.now(datetime.timezone.utc)}}
        )
    except InvalidId:
        logger.error(f"Invalid project_id format passed to update_project_knowledge: {project_id}")
    except AttributeError:
        logger.error("PyMongo extension not initialized or available.")
    except Exception as e:
        logger.error(f"Error updating project knowledge cache for {project_id}: {e}", exc_info=True)
def process_api_response_and_update_knowledge(api_response, project_id):
    """
    Post-process an LLM reply for a project (placeholder hook).

    ``api_response`` is accepted for future use but is currently ignored. The
    only effect is refreshing the project's cached knowledge snapshot via
    ``update_project_knowledge(project_id)``.
    """
    # update_project_knowledge returns None on every path, so this returns None as well.
    return update_project_knowledge(project_id)
# --- Vector Store Helpers ---
def build_vector_knowledge_base(project_id, query, k=3):
    """
    Rank a project's URL documents against ``query`` using an in-memory FAISS index.

    On every call, all URL documents of the project are re-read from MongoDB and
    re-embedded with OpenAIEmbeddings; nothing is cached. The ``k`` most similar
    entries are then returned.

    NOTE(review): the per-call rebuild costs one embedding pass over the whole
    project per message. Consider caching the index per project.

    Args:
        project_id (ObjectId | str): The project's ObjectId or its string form.
        query (str): Text to search for (the user's message).
        k (int): Maximum number of documents to return. Values larger than the
            number of stored documents are clamped; values <= 0 yield [].

    Returns:
        list[Document]: LangChain Documents whose metadata holds "url", "title"
        and "doc_id". Returns [] when dependencies or the OpenAI key are missing,
        when the project has no URL documents, or when an error occurs (logged).
    """
    logger = _get_logger()
    # Check that the necessary components are available.
    if not mongo or not OpenAIEmbeddings or not FAISS or not Document:
        logger.error("Missing dependencies (Mongo, Langchain) for build_vector_knowledge_base.")
        return []
    try:
        # Accept both ObjectId and its string form.
        if not isinstance(project_id, ObjectId):
            project_id = ObjectId(project_id)

        texts = []
        metadatas = []
        for doc in mongo.db.urls.find({"projectId": project_id}):
            # `or []`: a stored `keywords: null` would otherwise raise TypeError and
            # disable vector search for the whole project.
            keywords_list = [
                f"{kw.get('word', '')}({kw.get('percentage', 'N/A')}%)"
                for kw in (doc.get("keywords") or [])
            ]
            texts.append(
                f"Title: {doc.get('title', 'N/A')}\n"
                f"URL: {doc.get('url', 'N/A')}\n"
                f"Note: {doc.get('note', 'N/A')}\n"
                f"Keywords: {', '.join(keywords_list) if keywords_list else 'N/A'}\n"
                f"Summary: {doc.get('summary', 'N/A')}"
            )
            # Keep source identifiers alongside each text so results can be traced back.
            metadatas.append({"url": doc.get("url", ""), "title": doc.get("title", ""), "doc_id": str(doc["_id"])})

        if not texts:
            logger.info(f"No URL text content found for project {project_id} to build vector base.")
            return []

        # Never ask for more neighbours than there are documents. A non-positive k
        # cannot produce results, so skip building the index entirely.
        k = min(k, len(texts))
        if k <= 0:
            return []

        try:
            # The environment variable takes precedence over Flask config.
            openai_api_key = os.environ.get("OPENAI_API_KEY") or (current_app.config.get("OPENAI_API_KEY") if has_app_context() else None)
            if not openai_api_key:
                raise ValueError("OPENAI_API_KEY environment variable or Flask config not set.")
            embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
        except Exception as e:
            logger.error(f"Failed to initialize OpenAIEmbeddings: {e}. Check API key.", exc_info=False)  # Avoid logging key
            return []

        vectorstore = FAISS.from_texts(texts=texts, embedding=embeddings, metadatas=metadatas)
        return vectorstore.similarity_search(query, k=k)
    except InvalidId:
        logger.error(f"Invalid project_id format passed to build_vector_knowledge_base: {project_id}")
        return []
    except AttributeError:
        logger.error("PyMongo or Langchain components not initialized or available.")
        return []
    except Exception as e:
        logger.error(f"Error building vector knowledge base for project {project_id}: {e}", exc_info=True)
        return []  # Return empty list on error
def build_vector_based_prompt_with_knowledge(user_message, project_id):
    """
    Assemble the LLM prompt for ``user_message`` from vector-search context.

    Up to three project documents most similar to the message (found by
    build_vector_knowledge_base) are placed between "Relevant Retrieved
    Knowledge" markers, each prefixed with its source URL. A fixed instruction
    preamble asks the model to answer strictly from that material.

    Args:
        user_message (str): The user's latest message. It is used both as the
            search query and verbatim in the prompt.
        project_id (ObjectId): The project whose URL documents are searched.

    Returns:
        str: The complete prompt text.
    """
    matches = build_vector_knowledge_base(project_id, user_message, k=3)

    if not matches:
        knowledge_text = "No relevant external knowledge found via vector search for this query."
    else:
        sections = []
        for match in matches:
            source_url = match.metadata.get('url', 'N/A')
            sections.append(f"Source URL: {source_url}\n{match.page_content}")
        knowledge_text = "\n\n---\n\n".join(sections)

    # Fixed instructions that restrict the model to the retrieved material.
    instructions = (
        "You are an expert research assistant. Analyze the following retrieved documents, which contain information "
        "(titles, URLs, notes, keywords, summaries) from websites related to the current research project. "
        "Base your response *only* on this provided information and the user's query.\n\n"
        "Common user questions might involve:\n"
        "- Summarizing key topics from the retrieved documents.\n"
        "- Suggesting research directions based *only* on the retrieved documents.\n"
        "- Recommending specific URLs *from the retrieved documents* that are most relevant.\n"
        "- Identifying potentially redundant information *within the retrieved documents*.\n\n"
    )
    knowledge_section = (
        "--- Relevant Retrieved Knowledge ---\n"
        f"{knowledge_text}\n"
        "--- End Retrieved Knowledge ---\n\n"
    )
    query_section = (
        "User Query:\n"
        f"{user_message}\n\n"
        "Based strictly on the retrieved knowledge and the user query, provide your analysis and recommendations:"
    )
    return instructions + knowledge_section + query_section
# --- Gemini Message Formatting Helper (Not currently used by send_dialog_message) ---
def format_messages_for_gemini(db_messages, max_history=MAX_HISTORY_MESSAGES):
    """
    Convert stored dialog messages into the ``contents`` list expected by Gemini.

    Only the last ``max_history`` messages are considered. ``None`` means no
    limit, and a value <= 0 means no messages. Roles "system", "assistant" and
    "model" map to Gemini's "model" role; every other role maps to "user".

    Consecutive messages that map to the same role are merged into one turn,
    with their texts becoming separate ``parts``. The output therefore strictly
    alternates roles without discarding any message. The previous behaviour of
    skipping the later message could drop the user's most recent query.

    Args:
        db_messages (Sequence[dict]): Messages with "role" and "content" keys,
            assumed to be ordered oldest first.
        max_history (int | None): Size of the history window.

    Returns:
        list[dict]: ``[{"role": "user" | "model", "parts": [{"text": str}, ...]}, ...]``.
    """
    logger = _get_logger()
    model_roles = ("system", "assistant", "model")

    if max_history is None:
        recent_messages = list(db_messages)
    elif max_history <= 0:
        # Guard needed: db_messages[-0:] is the *whole* list, not an empty one.
        recent_messages = []
    else:
        recent_messages = db_messages[-max_history:]

    contents = []
    for msg in recent_messages:
        gemini_role = "model" if msg.get("role") in model_roles else "user"
        part = {"text": msg.get("content") or ""}
        if contents and contents[-1]["role"] == gemini_role:
            # Same role as the previous turn: merge so that roles keep alternating.
            contents[-1]["parts"].append(part)
        else:
            contents.append({"role": gemini_role, "parts": [part]})

    if contents and contents[-1]["role"] != "user":
        logger.warning("Formatted history for Gemini does not end with a 'user' message.")
    return contents
##################################################
# Dialog API Endpoints
##################################################
# Note: Routes use paths relative to the '/api/dialog' prefix.
@bp.route('/', methods=['POST'])  # Path relative to prefix
@token_required
def create_dialog_session(current_user):
    """
    Create a new dialog session attached to a project.

    Request JSON (validated by DialogCreateSchema):
        {"projectId": "<ObjectId string>", "sessionId": "<optional>", "startMessage": "<optional>"}

    The session's provider comes from the user's currently selected ``api_list``
    entry. That entry must be one of Gemini/Deepseek/Chatgpt and must have a key.
    The caller must own the project or be listed in its ``collaborators``. A
    non-empty startMessage is stored as the first "user" message.

    Returns:
        201 {"message", "dialog_id"} on success; 400/403/404/422/500 JSON errors otherwise.
    """
    logger = _get_logger()
    # Validate user
    if not current_user or not current_user.get("_id"):
        return jsonify({"message": "Internal authorization error."}), 500
    try:
        user_id = ObjectId(current_user["_id"])
    except (InvalidId, TypeError) as e:
        logger.error(f"User ID conversion error in create_dialog_session: {e}")
        return jsonify({"message": "Invalid user ID format in token."}), 400

    # Check dependencies
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not DialogCreateSchema or not ValidationError:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    # Get and validate JSON data using the schema.
    json_data = request.get_json() or {}
    schema = DialogCreateSchema()
    try:
        validated_data = schema.load(json_data)
    except ValidationError as err:
        logger.warning(f"Create dialog session validation failed: {err.messages}")
        return jsonify(err.messages), 422  # Return validation errors

    project_id_str = validated_data['projectId']
    # `or ""` guards against a schema that loads an explicit null as None;
    # calling .strip() on None would otherwise crash outside any handler.
    session_id = validated_data.get("sessionId") or ""
    start_message = (validated_data.get("startMessage") or "").strip()

    # The schema is expected to validate projectId; report a bad value as a
    # client error rather than a generic 500 if it did not.
    try:
        project_obj_id = ObjectId(project_id_str)
    except (InvalidId, TypeError):
        return jsonify({"message": "Invalid projectId format."}), 400

    try:
        db = mongo.db
        # Find the user's selected API key; it decides the session's provider.
        selected_api = db.api_list.find_one({"uid": user_id, "selected": True})
        if not selected_api:
            return jsonify({"message": "User has no selected API provider. Please select one in API Keys."}), 400
        provider = selected_api.get("name")
        api_key_exists = bool(selected_api.get("key"))  # Check that a key value exists

        allowed_providers = ("Gemini", "Deepseek", "Chatgpt")  # Consider moving to config
        if provider not in allowed_providers:
            return jsonify({"message": f"Selected provider '{provider}' is not supported."}), 400
        if not api_key_exists:
            return jsonify({"message": f"API key value missing for selected provider '{provider}'."}), 400

        # Verify that the project exists and the user has access.
        # NOTE(review): assumes ownerId/collaborators are stored as ObjectIds;
        # string-stored ids would never match user_id.
        project = db.projects.find_one({"_id": project_obj_id}, {"ownerId": 1, "collaborators": 1})
        if not project:
            return jsonify({"message": "Project not found."}), 404
        owner_id = project.get("ownerId")
        collaborators = project.get("collaborators") or []  # tolerate a stored null
        if owner_id != user_id and user_id not in collaborators:
            return jsonify({"message": "Access denied to the specified project."}), 403

        now = datetime.datetime.now(datetime.timezone.utc)
        messages_array = []
        if start_message:
            messages_array.append({
                "role": "user",
                "content": start_message,
                "timestamp": now,
            })

        dialog_doc = {
            "uid": user_id,
            "projectId": project_obj_id,
            "provider": provider,  # Provider used for this session
            "sessionStartedAt": now,
            "sessionEndedAt": None,  # null while the session is open
            "messages": messages_array,
        }
        if session_id:
            dialog_doc["sessionId"] = session_id

        result = db.dialog_activity.insert_one(dialog_doc)
        return jsonify({
            "message": "Dialog session created successfully.",
            "dialog_id": str(result.inserted_id)
        }), 201
    except KeyError:
        logger.error("User ID (_id) not found in token payload for create_dialog_session.")
        return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
    except AttributeError:
        logger.error("PyMongo extension not initialized or attached correctly.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Error creating dialog session for user {current_user.get('_id', 'UNKNOWN')}: {e}", exc_info=True)
        return jsonify({"message": "Internal server error creating dialog session."}), 500
@bp.route('/<string:dialog_id>/send', methods=['POST'])  # Path relative to prefix
@token_required
def send_dialog_message(current_user, dialog_id):
    """
    Send a user message in a dialog session and store the LLM's reply.

    Request JSON (validated by DialogSendMessageSchema): {"content": "User's message text"}

    Flow:
    1. Verify that the caller owns the open session and that its provider is "Gemini".
    2. Load the user's selected Gemini key.
    3. Build a prompt from vector-search results over the project's URL documents.
    4. Call Gemini once with that prompt. Earlier dialog turns are NOT sent.
    5. Append the user message and the reply (role "system") to the dialog's
       ``messages``, then refresh the project's cached knowledge snapshot.

    Returns:
        200 {"message", "llmResponse"} on success; 400/403/404/409/422/429/500/503
        JSON errors otherwise.
    """
    logger = _get_logger()
    # Check dependencies
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not genai or not google_exceptions:
        return jsonify({"message": "Gemini API library not available."}), 500
    if not DialogSendMessageSchema or not ValidationError:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500

    # Parse the body outside the broad try below. Malformed JSON then surfaces as
    # Flask's own 4xx (as in create_dialog_session) instead of a generic 500.
    json_data = request.get_json() or {}

    try:
        # Check the raw id: str(None) == "None" is truthy, so testing the
        # stringified value could never detect a missing id.
        raw_user_id = current_user.get("_id") if current_user else None
        if not raw_user_id:
            return jsonify({"message": "Missing user ID in token."}), 400
        try:
            user_id = ObjectId(str(raw_user_id))
            dialog_obj_id = ObjectId(dialog_id)
        except (InvalidId, TypeError):
            return jsonify({"message": "Invalid user or dialog ID format."}), 400

        schema = DialogSendMessageSchema()
        try:
            validated_data = schema.load(json_data)
        except ValidationError as err:
            logger.warning(f"Send dialog message validation failed: {err.messages}")
            return jsonify(err.messages), 422
        content = validated_data['content']

        # --- Retrieve Dialog and API Key ---
        db = mongo.db
        dialog_doc = db.dialog_activity.find_one({"_id": dialog_obj_id, "uid": user_id})
        if not dialog_doc:
            return jsonify({"message": "Dialog session not found or access denied."}), 404
        if dialog_doc.get("sessionEndedAt"):
            return jsonify({"message": "This dialog session has ended."}), 409  # 409 Conflict
        provider = dialog_doc.get("provider")
        if provider != "Gemini":  # This endpoint currently only supports Gemini
            return jsonify({"message": f"This endpoint only supports 'Gemini', but session provider is '{provider}'."}), 400

        api_doc = db.api_list.find_one({"uid": user_id, "name": "Gemini", "selected": True})
        if not (api_doc and api_doc.get("key")):
            logger.error(f"No valid Gemini key found or selected for user {user_id} during send message.")
            return jsonify({"message": "Gemini API key not configured or selected."}), 400
        gemini_key = api_doc["key"]

        # --- Build Prompt with Vector Knowledge ---
        project_id = dialog_doc.get("projectId")
        if not isinstance(project_id, ObjectId):  # also rejects a missing (None) projectId
            logger.error(f"Dialog {dialog_id} is missing valid projectId.")
            return jsonify({"message": "Internal error: Project reference missing."}), 500
        detailed_prompt = build_vector_based_prompt_with_knowledge(content, project_id)
        # Only the freshly built prompt is sent, as a single user turn.
        gemini_history = [{"role": "user", "parts": [{"text": detailed_prompt}]}]

        # --- Call Gemini API ---
        llm_response_text = "[LLM Call Skipped/Failed]"  # Default response text
        try:
            # NOTE(review): genai.configure() is module-level SDK configuration, not
            # per-model. Under a threaded server, two concurrent requests can
            # interleave configure() and generate_content(), so one user's call may
            # run with another user's key. Consider a per-request client, or
            # serializing these calls.
            genai.configure(api_key=gemini_key)
            model = genai.GenerativeModel(current_app.config["GEMINI_MODEL_NAME"])
            # Consider adding generation_config and safety_settings from Flask config.
            llm_response = model.generate_content(gemini_history)
            try:
                # .text raises ValueError when the reply was blocked or has no text part.
                llm_response_text = llm_response.text
            except ValueError:
                logger.warning(f"Gemini response for dialog {dialog_id} may have been blocked or empty. Feedback: {llm_response.prompt_feedback}")
                llm_response_text = "[Response blocked by safety filters or returned no text content]"
        except google_exceptions.PermissionDenied as ex:
            logger.warning(f"Gemini Permission Denied for user {user_id}: {ex}")
            return jsonify({"message": "Gemini API Error: Invalid API key or insufficient permissions."}), 403
        except google_exceptions.ResourceExhausted as ex:
            logger.warning(f"Gemini Resource Exhausted for user {user_id}: {ex}")
            return jsonify({"message": "Gemini API Error: Rate limit or quota exceeded."}), 429
        except google_exceptions.GoogleAPIError as ex:  # Other Google API errors
            logger.error(f"Gemini API communication error for user {user_id}: {ex}", exc_info=True)
            return jsonify({"message": "An error occurred while communicating with the Gemini API."}), 503
        except Exception as e:  # e.g. missing GEMINI_MODEL_NAME or SDK configuration errors
            logger.error(f"Unexpected error during Gemini call setup or execution for user {user_id}: {e}", exc_info=True)
            return jsonify({"message": "Internal server error during LLM communication."}), 500

        # --- Process Response and Update DB ---
        now = datetime.datetime.now(datetime.timezone.utc)
        user_msg_entry = {"role": "user", "content": content, "timestamp": now}
        system_msg_entry = {"role": "system", "content": llm_response_text, "timestamp": now}  # Same timestamp for the pair
        update_res = db.dialog_activity.update_one(
            # Re-check ownership and open state: the session may have been ended
            # while the (slow) LLM call was in flight.
            {"_id": dialog_obj_id, "uid": user_id, "sessionEndedAt": None},
            {"$push": {"messages": {"$each": [user_msg_entry, system_msg_entry]}}},
        )
        if update_res.modified_count != 1:
            logger.warning(f"Dialog {dialog_id} DB update failed after LLM call (modified_count={update_res.modified_count}).")

        # Refresh the project's cached knowledge snapshot.
        process_api_response_and_update_knowledge(llm_response_text, project_id)
        return jsonify({"message": "LLM response received.", "llmResponse": llm_response_text}), 200
    except KeyError:
        logger.error("User ID (_id) not found in token payload for send_dialog_message.")
        return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
    except AttributeError:
        logger.error("PyMongo or other extension not initialized correctly.")
        return jsonify({"message": "Server configuration error."}), 500
    except Exception as e:
        logger.error(f"Unexpected error in send_dialog_message for dialog {dialog_id}: {e}", exc_info=True)
        return jsonify({"message": "Internal server error processing message."}), 500
@bp.route('/', methods=['GET'])  # Path relative to prefix
@token_required
def list_dialog_sessions(current_user):
    """
    List the authenticated user's dialog sessions, newest first.

    The optional query parameter ``projectId`` restricts results to one project.
    The ``messages`` array is excluded by the DB projection, and documents are
    serialized with ``DialogSummarySchema(many=True)``.

    Returns:
        200 {"dialogs": [...]} on success; 400/500 JSON errors otherwise.
    """
    logger = _get_logger()
    # Check dependencies
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not DialogSummarySchema:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500
    try:
        # Check the raw id: str(None) == "None" is truthy, so testing the
        # stringified value could never detect a missing id.
        raw_user_id = current_user.get("_id") if current_user else None
        if not raw_user_id:
            return jsonify({"message": "Missing user ID in token."}), 400
        try:
            user_id = ObjectId(str(raw_user_id))
        except (InvalidId, TypeError):
            return jsonify({"message": "Invalid user ID format in token."}), 400

        query = {"uid": user_id}
        project_id_str = request.args.get("projectId")
        if project_id_str:
            try:
                query["projectId"] = ObjectId(project_id_str)
            except InvalidId:
                return jsonify({"message": "Invalid projectId format in query parameter."}), 400

        # Exclude the (potentially large) messages array; newest sessions first.
        cursor = mongo.db.dialog_activity.find(query, {"messages": 0}).sort("sessionStartedAt", -1)
        # The schema handles ObjectId/datetime conversion and field selection.
        serialized_result = DialogSummarySchema(many=True).dump(list(cursor))
        return jsonify({"dialogs": serialized_result}), 200
    except KeyError:
        logger.error("User ID (_id) not found in token payload for list_dialog_sessions.")
        return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
    except AttributeError:
        logger.error("PyMongo extension not initialized or attached correctly.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Error listing dialogs for user {(current_user or {}).get('_id', 'UNKNOWN')}: {e}", exc_info=True)
        return jsonify({"message": "Internal server error listing dialog sessions."}), 500
@bp.route('/<string:dialog_id>', methods=['GET'])  # Path relative to prefix
@token_required
def get_dialog_session(current_user, dialog_id):
    """
    Return the full details of one dialog session, including its messages.

    Only a session whose ``uid`` matches the caller is returned. The result is
    serialized with ``DialogSchema``.

    Returns:
        200 with the serialized dialog; 404 if it is missing or not owned;
        400/500 JSON errors otherwise.
    """
    logger = _get_logger()
    # Check dependencies
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    if not DialogSchema:
        return jsonify({"message": "Server configuration error: Schema unavailable."}), 500
    try:
        # Check the raw id: str(None) == "None" is truthy, so testing the
        # stringified value could never detect a missing id.
        raw_user_id = current_user.get("_id") if current_user else None
        if not raw_user_id:
            return jsonify({"message": "Missing user ID in token."}), 400
        try:
            user_id = ObjectId(str(raw_user_id))
            dial_obj_id = ObjectId(dialog_id)
        except (InvalidId, TypeError):
            return jsonify({"message": "Invalid user or dialog ID format."}), 400

        # The owner filter doubles as the access check.
        doc = mongo.db.dialog_activity.find_one({"_id": dial_obj_id, "uid": user_id})
        if not doc:
            return jsonify({"message": "Dialog session not found or access denied."}), 404

        # The schema handles ObjectId, datetime and nested message formatting.
        return jsonify(DialogSchema().dump(doc)), 200
    except KeyError:
        logger.error("User ID (_id) not found in token payload for get_dialog_session.")
        return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
    except AttributeError:
        logger.error("PyMongo extension not initialized or attached correctly.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Error retrieving dialog {dialog_id} for user {(current_user or {}).get('_id', 'UNKNOWN')}: {e}", exc_info=True)
        return jsonify({"message": "Internal server error retrieving dialog session."}), 500
@bp.route('/<string:dialog_id>/end', methods=['PUT'])  # Path relative to prefix
@token_required
def end_dialog_session(current_user, dialog_id):
    """
    Mark a dialog session as ended by setting ``sessionEndedAt`` to the current UTC time.

    The update is a single conditional write: it applies only when the owner
    matches and ``sessionEndedAt`` is null or missing, so two concurrent "end"
    requests cannot both succeed. A follow-up lookup is made only when nothing
    was modified, to tell "not found" apart from "already ended".
    (No schema is needed for input/output here.)

    Returns:
        200 when the session is ended now; 404 if it is missing or not owned by
        the caller; 409 if it was already ended; 400/500 JSON errors otherwise.
    """
    logger = _get_logger()
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    try:
        # Check the raw id: str(None) == "None" is truthy, so testing the
        # stringified value could never detect a missing id.
        raw_user_id = current_user.get("_id") if current_user else None
        if not raw_user_id:
            return jsonify({"message": "Missing user ID in token."}), 400
        try:
            user_id = ObjectId(str(raw_user_id))
            dial_obj_id = ObjectId(dialog_id)
        except (InvalidId, TypeError):
            return jsonify({"message": "Invalid user or dialog ID format."}), 400

        db = mongo.db
        now = datetime.datetime.now(datetime.timezone.utc)
        result = db.dialog_activity.update_one(
            {"_id": dial_obj_id, "uid": user_id, "sessionEndedAt": None},  # only open sessions
            {"$set": {"sessionEndedAt": now}},
        )
        if result.modified_count == 1:
            return jsonify({"message": "Dialog session marked as ended."}), 200

        # Nothing changed: the session is missing / not owned, or already ended.
        if db.dialog_activity.find_one({"_id": dial_obj_id, "uid": user_id}, {"_id": 1}) is None:
            return jsonify({"message": "Dialog session not found or access denied."}), 404
        # 409 Conflict: the session is already in the 'ended' state.
        return jsonify({"message": "Dialog session has already been ended."}), 409
    except KeyError:
        logger.error("User ID (_id) not found in token payload for end_dialog_session.")
        return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
    except AttributeError:
        logger.error("PyMongo extension not initialized or attached correctly.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Error ending dialog {dialog_id} for user {(current_user or {}).get('_id', 'UNKNOWN')}: {e}", exc_info=True)
        return jsonify({"message": "Internal server error ending dialog session."}), 500
@bp.route('/<string:dialog_id>', methods=['DELETE'])  # Path relative to prefix
@token_required
def delete_dialog_session(current_user, dialog_id):
    """
    Delete an entire dialog session document owned by the caller.

    The delete filter includes ``uid``, so another user's session is reported as
    not found. (No schema is needed for input/output here.)

    Returns:
        200 when deleted; 404 if it is missing or not owned; 400/500 JSON errors otherwise.
    """
    logger = _get_logger()
    if not mongo:
        return jsonify({"message": "Database connection not available."}), 500
    try:
        # Check the raw id: str(None) == "None" is truthy, so testing the
        # stringified value could never detect a missing id.
        raw_user_id = current_user.get("_id") if current_user else None
        if not raw_user_id:
            return jsonify({"message": "Missing user ID in token."}), 400
        try:
            user_id = ObjectId(str(raw_user_id))
            dial_obj_id = ObjectId(dialog_id)
        except (InvalidId, TypeError):
            return jsonify({"message": "Invalid user or dialog ID format."}), 400

        result = mongo.db.dialog_activity.delete_one({"_id": dial_obj_id, "uid": user_id})
        if result.deleted_count == 1:
            return jsonify({"message": "Dialog session deleted successfully."}), 200
        # deleted_count == 0: the document did not exist or belongs to another user.
        return jsonify({"message": "Dialog session not found or access denied."}), 404
    except KeyError:
        logger.error("User ID (_id) not found in token payload for delete_dialog_session.")
        return jsonify({"message": "Authentication token is invalid or missing user ID."}), 401
    except AttributeError:
        logger.error("PyMongo extension not initialized or attached correctly.")
        return jsonify({"message": "Database configuration error."}), 500
    except Exception as e:
        logger.error(f"Error deleting dialog {dialog_id} for user {(current_user or {}).get('_id', 'UNKNOWN')}: {e}", exc_info=True)
        return jsonify({"message": "Internal server error deleting dialog session."}), 500