import datetime import re import string from collections import Counter, defaultdict import requests from requests.exceptions import Timeout, RequestException, ConnectionError, HTTPError from bs4 import BeautifulSoup from bson.objectid import ObjectId, InvalidId from celery import Celery import pymongo from ..myapp.config import Config # --- Database Connection Setup for Celery --- mongo_client = None mongo_db = None def get_db(): """ Helper to get a dedicated DB connection for Celery tasks. """ global mongo_client, mongo_db if mongo_db is None: try: print("Task: Initializing new MongoDB client connection...") mongo_client = pymongo.MongoClient(Config.MONGO_URI, serverSelectionTimeoutMS=5000) mongo_client.admin.command('ismaster') mongo_db = mongo_client.get_database() print(f"Task: MongoDB connection successful. Using database: {mongo_db.name}") except Exception as e: print(f"FATAL Task Error: Could not connect to MongoDB: {e}") mongo_db = None raise RuntimeError(f"Database connection failed in Celery task: {e}") return mongo_db # --- Celery configuration --- celery = Celery("my_celery_app") celery.conf.broker_url = Config.CELERY_BROKER_URL celery.conf.result_backend = Config.CELERY_RESULT_BACKEND REQUEST_TIMEOUT = 15 # --- Text Cleaning Setup --- PUNCT_REGEX = re.compile(rf"[{re.escape(string.punctuation)}]+") STOPWORDS = { "the", "is", "a", "an", "of", "to", "and", "for", "on", "in", "that", "it", "with", "as", "at", "be", "this", "are", "was", "were", "will", "would", "or", "so", "if", "then", "from", "not", "by", "we", "you", "i", "he", "she", "they", "have", "has", "had", "but", "too", "some", "any", "my", "your", "their", "me" } def clean_and_tokenize(text): """ Performs minimal cleaning by splitting the text. For English this works reasonably; for Chinese you may use a segmentation library like jieba. """ if not text: return [] return text.split() # -------------------------- # Task: Asynchronous Title and Keyword Extraction # -------------------------- @celery.task(bind=True, max_retries=1, default_retry_delay=10) def async_extract_title_and_keywords(self, url_id_str, user_id_str): """ Fetches the webpage, extracts the title and computes the top 20 keywords from its body text. Updates the URL document with the new title and keywords. """ print(f"Task: Starting title/keyword extraction for URL ID: {url_id_str}") try: db = get_db() url_obj_id = ObjectId(url_id_str) except InvalidId: print(f"Task Error: Invalid URL ID format: {url_id_str}") return "Invalid URL ID format." except Exception as e: print(f"Task Error: Could not initialize DB or ObjectId: {e}") return f"DB/ObjectId Error: {e}" if db is None: print(f"Task Error: DB connection is None for URL ID: {url_id_str}") return "DB connection error." try: url_doc = db.urls.find_one({"_id": url_obj_id}) if not url_doc: print(f"Task Error: URL doc not found for ID: {url_id_str}") return "URL doc not found." page_url = url_doc.get("url", "") if not page_url: print(f"Task Error: No URL found in doc: {url_id_str}") db.urls.update_one({"_id": url_obj_id}, {"$set": {"processingStatus": "failed", "updatedAt": datetime.datetime.utcnow()}}) return "No URL found in doc." page_title = "" keywords_list = [] status_to_set = "failed" try: print(f"Task: Fetching URL: {page_url} with timeout={REQUEST_TIMEOUT}") headers = {'User-Agent': 'Mozilla/5.0 (compatible; SurfSmartBot/1.0; +http://example.com/bot)'} r = requests.get(page_url, timeout=REQUEST_TIMEOUT, headers=headers, allow_redirects=True) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") page_title = soup.title.string.strip() if soup.title else url_doc.get("title", "") body_text = soup.body.get_text(" ", strip=True) if soup.body else "" tokens = clean_and_tokenize(body_text) if tokens: counter = Counter(tokens) top_20 = counter.most_common(20) total_count = sum(count for _, count in top_20) for word, count in top_20: perc = round((count / total_count) * 100, 2) if total_count > 0 else 0 keywords_list.append({"word": word, "percentage": perc}) status_to_set = "completed" print( f"Task: Extraction completed for URL {url_id_str}. Title: '{page_title}', Keywords count: {len(keywords_list)}") except Timeout: print(f"Task Error: Request timed out for URL: {page_url}") except ConnectionError: print(f"Task Error: Connection error for URL: {page_url}") except HTTPError as http_err: print(f"Task Error: HTTP error occurred: {http_err} for URL: {page_url}") except RequestException as req_err: print(f"Task Error: Request exception for URL {page_url}: {req_err}") except Exception as e: print(f"Task Error: Unexpected error processing URL {page_url}: {e}") try: self.retry(exc=e) except Exception as retry_err: print(f"Task Error: Retry failed for URL {url_id_str}: {retry_err}") update_data = { "processingStatus": status_to_set, "updatedAt": datetime.datetime.utcnow() } if status_to_set == "completed": update_data["title"] = page_title update_data["keywords"] = keywords_list db.urls.update_one({"_id": url_obj_id}, {"$set": update_data}) print(f"Task: DB updated for URL {url_id_str} with extraction status '{status_to_set}'") return f"OK: Extraction task completed with status {status_to_set}" except Exception as e: print(f"Task Error: Failed during extraction for URL {url_id_str}: {e}") try: db.urls.update_one({"_id": url_obj_id}, {"$set": {"processingStatus": "failed", "updatedAt": datetime.datetime.utcnow()}}) except Exception: pass return f"Error: Extraction task failed for URL {url_id_str}" # -------------------------- # Task: Asynchronous Summarization # -------------------------- @celery.task(bind=True, max_retries=1, default_retry_delay=10) def async_summarize_url(self, url_id_str, user_id_str, use_gemini): """ Fetches webpage content and extracts up to the first 1000 words. If use_gemini is True and a valid Gemini API key is present, builds an effective prompt and calls Gemini to generate a ~300-word summary (under 350 words). Otherwise, truncates the text to around 300 words. Updates the URL document's 'summary' and 'processingStatus' accordingly. """ print(f"Task: Starting summary generation for URL ID: {url_id_str}") try: db = get_db() url_obj_id = ObjectId(url_id_str) except InvalidId: print(f"Task Error: Invalid URL ID format: {url_id_str}") return "Invalid URL ID format." except Exception as e: print(f"Task Error: Could not initialize DB or ObjectId: {e}") return f"DB/ObjectId Error: {e}" if db is None: print(f"Task Error: DB connection is None for URL ID: {url_id_str}") return "DB connection error." try: url_doc = db.urls.find_one({"_id": url_obj_id}) if not url_doc: print(f"Task Error: URL doc not found for ID: {url_id_str}") return "URL doc not found." page_url = url_doc.get("url", "") if not page_url: print(f"Task Error: No URL found in doc: {url_id_str}") db.urls.update_one({"_id": url_obj_id}, {"$set": {"processingStatus": "failed", "updatedAt": datetime.datetime.utcnow()}}) return "No URL found in doc." headers = {'User-Agent': 'Mozilla/5.0 (compatible; SurfSmartBot/1.0; +http://example.com/bot)'} r = requests.get(page_url, timeout=REQUEST_TIMEOUT, headers=headers, allow_redirects=True) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") body_text = soup.body.get_text(" ", strip=True) if soup.body else "" words_full = body_text.split() # For better language support, integrate a segmentation tool if needed. text_1000 = " ".join(words_full[:1000]) summary_result = "" if use_gemini: api_doc = db.api_list.find_one({"uid": ObjectId(user_id_str), "selected": True, "name": "Gemini"}) if api_doc and api_doc.get("key"): gemini_key = api_doc.get("key") prompt = ( "You are an expert summarizer. Below is text extracted from a webpage. " "Please generate a concise, high-quality summary of approximately 300 words (but under 350 words). " "Ensure the summary is in the same language as the input text.\n\n" + text_1000 ) try: import google.generativeai as genai from google.api_core import exceptions as google_exceptions genai.configure(api_key=gemini_key) GEMINI_MODEL_NAME = 'gemini-1.5-pro-latest' model = genai.GenerativeModel(GEMINI_MODEL_NAME) gemini_input = [{"role": "user", "parts": [{"text": prompt}]}] llm_response = model.generate_content(gemini_input) summary_result = llm_response.text if llm_response.parts else "" except Exception as gem_err: print(f"Task Error: Gemini API error: {gem_err}. Falling back to truncation.") summary_result = " ".join(text_1000.split()[:300]) else: summary_result = " ".join(text_1000.split()[:300]) else: summary_result = " ".join(text_1000.split()[:300]) status_to_set = "completed" if summary_result.strip() else "failed" update_data = { "summary": summary_result, "processingStatus": status_to_set, "updatedAt": datetime.datetime.utcnow() } db.urls.update_one({"_id": url_obj_id}, {"$set": update_data}) print( f"Task: Summary generation for URL {url_id_str} completed with status '{status_to_set}'. Word count: {len(summary_result.split())}") return f"OK: Summary task completed with status {status_to_set}" except Timeout: print(f"Task Error: Request timed out for URL: {page_url}") except ConnectionError: print(f"Task Error: Connection error for URL: {page_url}") except HTTPError as http_err: print(f"Task Error: HTTP error occurred: {http_err} for URL: {page_url}") except RequestException as req_err: print(f"Task Error: Request exception for URL {page_url}: {req_err}") except Exception as e: print(f"Task Error: Unexpected error during summarization for URL {page_url}: {e}") try: self.retry(exc=e) except Exception as retry_err: print(f"Task Error: Retry failed for URL {url_id_str}: {retry_err}") try: db.urls.update_one({"_id": url_obj_id}, {"$set": {"processingStatus": "failed", "updatedAt": datetime.datetime.utcnow()}}) except Exception: pass return f"Error: Summarization task failed for URL {url_id_str}" # -------------------------- # Task: Asynchronous Recalculate Project Keywords # -------------------------- @celery.task(bind=True, max_retries=1, default_retry_delay=10) def async_recalc_project_keywords(self, project_id, user_id_str): """ Recalculates project keywords by summing the percentages from all associated URL documents. Retains the top 20 keywords and updates the project document. """ print(f"Task: Starting keywords recalculation for project {project_id}") try: db = get_db() project_obj_id = ObjectId(project_id) except InvalidId: print(f"Task Error: Invalid project ID format: {project_id}") return "Invalid project ID format." except Exception as e: print(f"Task Error: Unable to initialize DB or convert project ID: {e}") return f"DB/ObjectId Error: {e}" if db is None: print(f"Task Error: DB connection is None for project {project_id}") return "DB connection error." try: cursor = db.urls.find({"projectId": project_obj_id}, {"keywords": 1}) combined = defaultdict(float) for doc in cursor: keywords_list = doc.get("keywords", []) if isinstance(keywords_list, list): for kw in keywords_list: if isinstance(kw, dict): word = kw.get("word", "").strip() try: percentage = float(kw.get("percentage", 0.0)) except (ValueError, TypeError): percentage = 0.0 if word and isinstance(word, str): combined[word] += percentage else: print(f"Task Warning: Non-dict item in keywords for a URL in project {project_id}") else: print(f"Task Warning: Keywords field is not a list for a URL in project {project_id}") sorted_kw = sorted(combined.items(), key=lambda x: x[1], reverse=True)[:20] top_keywords = [{"word": w, "percentage": round(val, 2)} for w, val in sorted_kw] update_data = { "keywords": top_keywords, "updatedAt": datetime.datetime.utcnow() } db.projects.update_one({"_id": project_obj_id}, {"$set": update_data}) print(f"Task: Keywords recalculation for project {project_id} completed. Top keywords: {top_keywords}") return f"OK: Project keywords recalculated successfully." except Exception as e: print(f"Task Error: Failed during keywords recalculation for project {project_id}: {e}") try: db.projects.update_one({"_id": project_obj_id}, {"$set": {"updatedAt": datetime.datetime.utcnow()}}) except Exception: pass return f"Error: Keywords recalculation failed for project {project_id}"