import datetime
import re
import string
from collections import Counter, defaultdict

import requests
from requests.exceptions import Timeout, RequestException, ConnectionError, HTTPError
from bs4 import BeautifulSoup
from bson.objectid import ObjectId, InvalidId
from celery import Celery

import pymongo

from ..myapp.config import Config

# --- Database Connection Setup for Celery ---
# Each worker process lazily creates its own client: pymongo's MongoClient is not
# fork-safe, so the connection cannot simply be shared with the web process.
mongo_client = None
mongo_db = None


def get_db():
    """
    Helper to get a dedicated DB connection for Celery tasks.
    """
    global mongo_client, mongo_db
    if mongo_db is None:
        try:
            print("Task: Initializing new MongoDB client connection...")
            mongo_client = pymongo.MongoClient(Config.MONGO_URI, serverSelectionTimeoutMS=5000)
            # Cheap round-trip to confirm the server is reachable before caching the handle.
            mongo_client.admin.command('ismaster')
            mongo_db = mongo_client.get_database()
            print(f"Task: MongoDB connection successful. Using database: {mongo_db.name}")
        except Exception as e:
            print(f"FATAL Task Error: Could not connect to MongoDB: {e}")
            mongo_db = None
            raise RuntimeError(f"Database connection failed in Celery task: {e}")
    return mongo_db


# --- Celery configuration ---
celery = Celery("my_celery_app")
celery.conf.broker_url = Config.CELERY_BROKER_URL
celery.conf.result_backend = Config.CELERY_RESULT_BACKEND

# Timeout (in seconds) for every outbound HTTP request made by these tasks.
REQUEST_TIMEOUT = 15

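# Illustrative only: how the web app is expected to enqueue the tasks defined below
# (the real call sites live in the Flask routes, not in this module). `.delay()` is
# the standard Celery shortcut for `apply_async()`:
#
#   async_extract_title_and_keywords.delay(str(url_doc["_id"]), str(current_user_id))
#   async_summarize_url.delay(str(url_doc["_id"]), str(current_user_id), use_gemini=True)
#   async_recalc_project_keywords.delay(str(project_doc["_id"]), str(current_user_id))
#
# `url_doc`, `project_doc`, and `current_user_id` are hypothetical names for whatever
# the caller has in scope.
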
# --- Text Cleaning Setup ---
PUNCT_REGEX = re.compile(rf"[{re.escape(string.punctuation)}]+")
STOPWORDS = {
    "the", "is", "a", "an", "of", "to", "and", "for", "on", "in", "that", "it", "with", "as", "at", "be",
    "this", "are", "was", "were", "will", "would", "or", "so", "if", "then", "from", "not", "by", "we", "you",
    "i", "he", "she", "they", "have", "has", "had", "but", "too", "some", "any", "my", "your", "their", "me"
}


def clean_and_tokenize(text):
    """
    Performs minimal cleaning: lowercases the text, strips punctuation, splits on
    whitespace and drops common English stopwords.
    For English this works reasonably; for Chinese a segmentation library such as
    jieba would be needed.
    """
    if not text:
        return []
    cleaned = PUNCT_REGEX.sub(" ", text.lower())
    return [tok for tok in cleaned.split() if tok not in STOPWORDS]


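# A rough sketch of a Chinese-capable variant, assuming the optional `jieba`
# package were installed (it is not used anywhere in this module by default):
#
#   import jieba
#
#   def clean_and_tokenize_zh(text):
#       # jieba.lcut segments a Chinese string into a list of word tokens.
#       return [tok for tok in jieba.lcut(text) if tok.strip() and tok not in STOPWORDS]
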
# --------------------------
# Task: Asynchronous Title and Keyword Extraction
# --------------------------
@celery.task(bind=True, max_retries=1, default_retry_delay=10)
def async_extract_title_and_keywords(self, url_id_str, user_id_str):
    """
    Fetches the webpage, extracts the title and computes the top 20 keywords from its body text.
    Updates the URL document with the new title and keywords.
    """
    print(f"Task: Starting title/keyword extraction for URL ID: {url_id_str}")
    try:
        db = get_db()
        url_obj_id = ObjectId(url_id_str)
    except InvalidId:
        print(f"Task Error: Invalid URL ID format: {url_id_str}")
        return "Invalid URL ID format."
    except Exception as e:
        print(f"Task Error: Could not initialize DB or ObjectId: {e}")
        return f"DB/ObjectId Error: {e}"

    if db is None:
        print(f"Task Error: DB connection is None for URL ID: {url_id_str}")
        return "DB connection error."

    try:
        url_doc = db.urls.find_one({"_id": url_obj_id})
        if not url_doc:
            print(f"Task Error: URL doc not found for ID: {url_id_str}")
            return "URL doc not found."

        page_url = url_doc.get("url", "")
        if not page_url:
            print(f"Task Error: No URL found in doc: {url_id_str}")
            db.urls.update_one({"_id": url_obj_id},
                               {"$set": {"processingStatus": "failed", "updatedAt": datetime.datetime.utcnow()}})
            return "No URL found in doc."

        page_title = ""
        keywords_list = []
        status_to_set = "failed"

        try:
            print(f"Task: Fetching URL: {page_url} with timeout={REQUEST_TIMEOUT}")
            headers = {'User-Agent': 'Mozilla/5.0 (compatible; SurfSmartBot/1.0; +http://example.com/bot)'}
            r = requests.get(page_url, timeout=REQUEST_TIMEOUT, headers=headers, allow_redirects=True)
            r.raise_for_status()
            soup = BeautifulSoup(r.text, "html.parser")
            # Guard against <title> tags with no string content as well as missing tags.
            page_title = soup.title.string.strip() if soup.title and soup.title.string else url_doc.get("title", "")
            body_text = soup.body.get_text(" ", strip=True) if soup.body else ""
            tokens = clean_and_tokenize(body_text)
            if tokens:
                counter = Counter(tokens)
                top_20 = counter.most_common(20)
                # Percentages are relative to the combined count of the top 20 tokens only.
                total_count = sum(count for _, count in top_20)
                for word, count in top_20:
                    perc = round((count / total_count) * 100, 2) if total_count > 0 else 0
                    keywords_list.append({"word": word, "percentage": perc})
            status_to_set = "completed"
            print(
                f"Task: Extraction completed for URL {url_id_str}. Title: '{page_title}', Keywords count: {len(keywords_list)}")
        except Timeout:
            print(f"Task Error: Request timed out for URL: {page_url}")
        except ConnectionError:
            print(f"Task Error: Connection error for URL: {page_url}")
        except HTTPError as http_err:
            print(f"Task Error: HTTP error occurred: {http_err} for URL: {page_url}")
        except RequestException as req_err:
            print(f"Task Error: Request exception for URL {page_url}: {req_err}")
        except Exception as e:
            print(f"Task Error: Unexpected error processing URL {page_url}: {e}")
            try:
                self.retry(exc=e)
            except Exception as retry_err:
                # Celery's Retry/MaxRetriesExceededError is caught here on purpose so that
                # the status update below still runs in this invocation.
                print(f"Task Error: Retry failed for URL {url_id_str}: {retry_err}")

        update_data = {
            "processingStatus": status_to_set,
            "updatedAt": datetime.datetime.utcnow()
        }
        if status_to_set == "completed":
            update_data["title"] = page_title
            update_data["keywords"] = keywords_list

        db.urls.update_one({"_id": url_obj_id}, {"$set": update_data})
        print(f"Task: DB updated for URL {url_id_str} with extraction status '{status_to_set}'")
        return f"OK: Extraction task completed with status {status_to_set}"

    except Exception as e:
        print(f"Task Error: Failed during extraction for URL {url_id_str}: {e}")
        try:
            db.urls.update_one({"_id": url_obj_id},
                               {"$set": {"processingStatus": "failed", "updatedAt": datetime.datetime.utcnow()}})
        except Exception:
            pass
        return f"Error: Extraction task failed for URL {url_id_str}"

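# For reference, a successfully processed URL document ends up shaped roughly like
# this (illustrative values; only the fields touched above are guaranteed):
#
#   {
#       "_id": ObjectId("..."),
#       "url": "https://example.com/article",
#       "title": "Example Article",
#       "keywords": [{"word": "example", "percentage": 12.5}, ...],
#       "processingStatus": "completed",
#       "updatedAt": <UTC datetime>
#   }
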
# --------------------------
# Task: Asynchronous Summarization
# --------------------------
@celery.task(bind=True, max_retries=1, default_retry_delay=10)
def async_summarize_url(self, url_id_str, user_id_str, use_gemini):
    """
    Fetches the webpage content and keeps up to the first 1000 words.
    If use_gemini is True and a Gemini API key is stored for the user, builds a prompt and
    calls Gemini to generate a summary of roughly 300 words (under 350). Otherwise, the text
    is simply truncated to its first 300 words.
    Updates the URL document's 'summary' and 'processingStatus' accordingly.
    """
    print(f"Task: Starting summary generation for URL ID: {url_id_str}")
    try:
        db = get_db()
        url_obj_id = ObjectId(url_id_str)
    except InvalidId:
        print(f"Task Error: Invalid URL ID format: {url_id_str}")
        return "Invalid URL ID format."
    except Exception as e:
        print(f"Task Error: Could not initialize DB or ObjectId: {e}")
        return f"DB/ObjectId Error: {e}"

    if db is None:
        print(f"Task Error: DB connection is None for URL ID: {url_id_str}")
        return "DB connection error."

    try:
        url_doc = db.urls.find_one({"_id": url_obj_id})
        if not url_doc:
            print(f"Task Error: URL doc not found for ID: {url_id_str}")
            return "URL doc not found."

        page_url = url_doc.get("url", "")
        if not page_url:
            print(f"Task Error: No URL found in doc: {url_id_str}")
            db.urls.update_one({"_id": url_obj_id},
                               {"$set": {"processingStatus": "failed", "updatedAt": datetime.datetime.utcnow()}})
            return "No URL found in doc."

        headers = {'User-Agent': 'Mozilla/5.0 (compatible; SurfSmartBot/1.0; +http://example.com/bot)'}
        r = requests.get(page_url, timeout=REQUEST_TIMEOUT, headers=headers, allow_redirects=True)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        body_text = soup.body.get_text(" ", strip=True) if soup.body else ""
        words_full = body_text.split()  # For better language support, integrate a segmentation tool if needed.
        text_1000 = " ".join(words_full[:1000])

        summary_result = ""
        if use_gemini:
            # Look up the user's stored Gemini key; fall back to plain truncation if it is missing.
            api_doc = db.api_list.find_one({"uid": ObjectId(user_id_str), "selected": True, "name": "Gemini"})
            if api_doc and api_doc.get("key"):
                gemini_key = api_doc.get("key")
                prompt = (
                    "You are an expert summarizer. Below is text extracted from a webpage. "
                    "Please generate a concise, high-quality summary of approximately 300 words (but under 350 words). "
                    "Ensure the summary is in the same language as the input text.\n\n" +
                    text_1000
                )
                try:
                    import google.generativeai as genai
                    genai.configure(api_key=gemini_key)
                    GEMINI_MODEL_NAME = 'gemini-1.5-pro-latest'
                    model = genai.GenerativeModel(GEMINI_MODEL_NAME)
                    gemini_input = [{"role": "user", "parts": [{"text": prompt}]}]
                    llm_response = model.generate_content(gemini_input)
                    summary_result = llm_response.text if llm_response.parts else ""
                except Exception as gem_err:
                    print(f"Task Error: Gemini API error: {gem_err}. Falling back to truncation.")
                    summary_result = " ".join(text_1000.split()[:300])
            else:
                summary_result = " ".join(text_1000.split()[:300])
        else:
            summary_result = " ".join(text_1000.split()[:300])

        status_to_set = "completed" if summary_result.strip() else "failed"
        update_data = {
            "summary": summary_result,
            "processingStatus": status_to_set,
            "updatedAt": datetime.datetime.utcnow()
        }
        db.urls.update_one({"_id": url_obj_id}, {"$set": update_data})
        print(
            f"Task: Summary generation for URL {url_id_str} completed with status '{status_to_set}'. Word count: {len(summary_result.split())}")
        return f"OK: Summary task completed with status {status_to_set}"
    except Timeout:
        print(f"Task Error: Request timed out for URL: {page_url}")
    except ConnectionError:
        print(f"Task Error: Connection error for URL: {page_url}")
    except HTTPError as http_err:
        print(f"Task Error: HTTP error occurred: {http_err} for URL: {page_url}")
    except RequestException as req_err:
        print(f"Task Error: Request exception for URL {page_url}: {req_err}")
    except Exception as e:
        print(f"Task Error: Unexpected error during summarization for URL {page_url}: {e}")
        try:
            self.retry(exc=e)
        except Exception as retry_err:
            print(f"Task Error: Retry failed for URL {url_id_str}: {retry_err}")

    # Every handled failure above falls through to here, so the document is marked
    # as failed instead of being left stuck in a pending state.
    try:
        db.urls.update_one({"_id": url_obj_id},
                           {"$set": {"processingStatus": "failed", "updatedAt": datetime.datetime.utcnow()}})
    except Exception:
        pass
    return f"Error: Summarization task failed for URL {url_id_str}"

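# The Gemini branch above expects the user's key to live in the `api_list` collection
# in a document shaped roughly like this (illustrative, inferred from the query):
#
#   {"uid": ObjectId("<user id>"), "name": "Gemini", "selected": True, "key": "<API key>"}
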
# --------------------------
# Task: Asynchronous Recalculate Project Keywords
# --------------------------
@celery.task(bind=True, max_retries=1, default_retry_delay=10)
def async_recalc_project_keywords(self, project_id, user_id_str):
    """
    Recalculates project keywords by summing the percentages from all associated URL documents.
    Retains the top 20 keywords and updates the project document.
    """
    print(f"Task: Starting keywords recalculation for project {project_id}")
    try:
        db = get_db()
        project_obj_id = ObjectId(project_id)
    except InvalidId:
        print(f"Task Error: Invalid project ID format: {project_id}")
        return "Invalid project ID format."
    except Exception as e:
        print(f"Task Error: Unable to initialize DB or convert project ID: {e}")
        return f"DB/ObjectId Error: {e}"

    if db is None:
        print(f"Task Error: DB connection is None for project {project_id}")
        return "DB connection error."

    try:
        cursor = db.urls.find({"projectId": project_obj_id}, {"keywords": 1})
        combined = defaultdict(float)
        for doc in cursor:
            keywords_list = doc.get("keywords", [])
            if isinstance(keywords_list, list):
                for kw in keywords_list:
                    if isinstance(kw, dict):
                        word = kw.get("word", "")
                        try:
                            percentage = float(kw.get("percentage", 0.0))
                        except (ValueError, TypeError):
                            percentage = 0.0
                        # Only accumulate string keywords; checking the type before calling
                        # strip() avoids crashing on malformed documents.
                        if isinstance(word, str) and word.strip():
                            combined[word.strip()] += percentage
                    else:
                        print(f"Task Warning: Non-dict item in keywords for a URL in project {project_id}")
            else:
                print(f"Task Warning: Keywords field is not a list for a URL in project {project_id}")

        sorted_kw = sorted(combined.items(), key=lambda x: x[1], reverse=True)[:20]
        top_keywords = [{"word": w, "percentage": round(val, 2)} for w, val in sorted_kw]

        update_data = {
            "keywords": top_keywords,
            "updatedAt": datetime.datetime.utcnow()
        }
        db.projects.update_one({"_id": project_obj_id}, {"$set": update_data})
        print(f"Task: Keywords recalculation for project {project_id} completed. Top keywords: {top_keywords}")
        return "OK: Project keywords recalculated successfully."
    except Exception as e:
        print(f"Task Error: Failed during keywords recalculation for project {project_id}: {e}")
        try:
            db.projects.update_one({"_id": project_obj_id}, {"$set": {"updatedAt": datetime.datetime.utcnow()}})
        except Exception:
            pass
        return f"Error: Keywords recalculation failed for project {project_id}"