Fixed: missed data appends while multithreading

Pushed: new parsed zip
This commit is contained in:
ldy 2023-11-02 11:19:59 +08:00
parent 61ef0081d8
commit ad427c24dc
3 changed files with 36 additions and 29 deletions

Binary file not shown.


@@ -15,7 +15,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
 Scrapes the site 'ejde.math.txstate.edu'
 Total number of papers: 2023/08/08 - 4785
-Total Time via VPN w/100ms-delay: 48.04s
+Total Time via VPN w/100ms-delay: 96.30s
 ==========Execution Order==========
 1. ejde_main: fetch the journal links for each year -> scrape each paper's info and author info -> call ejde_save -> buffer to small JSON files
@@ -24,9 +24,19 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
 '''
-def save_data_thread_safe(data, data_lock, data_type):
+def append_data_thread_safe(from_list, to_list, data_lock):
     with data_lock:
-        ejde_save.save_data(data, f"{data_type}", str(uuid.uuid4()) + ".json")
+        to_list.append(from_list)
+
+def save_data_thread_safe(data, data_lock, data_type):
+    global articleNum, authorNum
+    with data_lock:
+        ejde_save.save_data(data, f"{data_type}_TS", str(uuid.uuid4()) + ".json")
+        if data_type == "Article":
+            articleNum += len(data)
+        else:
+            authorNum += len(data)
         data.clear()
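Note: taken together, the two helpers above serialize every mutation of a shared list behind that list's lock, which is the fix the commit title describes. A minimal self-contained sketch of the pattern, with ejde_save.save_data stubbed out since its implementation is not part of this diff, and the counters reduced to the article side for brevity:

import threading
import uuid

articleNum = 0
articleData = []
articleDataLock = threading.Lock()

def save_data_stub(data, subdir, filename):
    # stand-in for ejde_save.save_data; the argument shape is taken from the hunk above
    print(f"flushed {len(data)} records to ./ejde_buffer/{subdir}/{filename}")

def append_data_thread_safe(from_list, to_list, data_lock):
    # despite the name, the call sites pass a single record dict as from_list
    with data_lock:
        to_list.append(from_list)

def save_data_thread_safe(data, data_lock, data_type):
    global articleNum
    with data_lock:
        save_data_stub(data, f"{data_type}_TS", str(uuid.uuid4()) + ".json")
        articleNum += len(data)  # the counter now advances at flush time, under the same lock
        data.clear()             # safe: no other thread can append while the lock is held

for i in range(5):
    append_data_thread_safe({"id": i}, articleData, articleDataLock)
save_data_thread_safe(articleData, articleDataLock, "Article")

This is also why the next hunk can drop the global articleNum, authorNum declaration from process_article: the counters are now updated only inside save_data_thread_safe, at flush time.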
@@ -155,7 +165,6 @@ def process_html_article(baseweb, article):
 @retry(wait_fixed=5000, stop_max_attempt_number=5)
 def process_article(title, article_url):
-    global articleNum, authorNum
     headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                              'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36'}
     article_response = requests.get(article_url, headers=headers)
@@ -302,8 +311,7 @@ def process_article(title, article_url):
                 }
             ]
         }
-        authorData.append(author_data)
-        authorNum += 1
+        append_data_thread_safe(author_data, authorData, authorDataLock)
     # If no author table
     else:
         match_type = 0
@@ -374,8 +382,7 @@ def process_article(title, article_url):
                 }
             ]
         }
-        authorData.append(author_data)
-        authorNum += 1
+        append_data_thread_safe(author_data, authorData, authorDataLock)
     else:
         print("AUTHOR SEARCHING ERROR:", article_url)
         fail = {
@@ -402,15 +409,14 @@ def process_article(title, article_url):
         "issue": issue,
         "page": pp
     }
-    articleData.append(article_data)
-    articleNum += 1
+    append_data_thread_safe(article_data, articleData, articleDataLock)
     # Save the data periodically based on batch size
     if len(articleData) % batch_size == 0:
-        save_data_thread_safe(articleData, articleDataLock, "Article_TS")
+        save_data_thread_safe(articleData, articleDataLock, "Article")
     if len(authorData) % batch_size == 0:
-        save_data_thread_safe(authorData, authorDataLock, "Author_TS")
+        save_data_thread_safe(authorData, authorDataLock, "Author")
 
 start_time = time.time()
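One caveat worth flagging: the % batch_size checks above read len(articleData) outside the lock, so appends from other threads can step the length past an exact multiple and a periodic flush gets skipped; the "Save remaining data" pass below still catches any leftovers. A possible tightening, not part of this commit, is to observe the length atomically with the append:

import threading

batch_size = 100
articleData = []
articleDataLock = threading.Lock()

def append_data_thread_safe(from_list, to_list, data_lock):
    with data_lock:
        to_list.append(from_list)
        return len(to_list)  # length observed atomically with this append

# at the call site, flush based on the length this thread itself produced:
if append_data_thread_safe({"title": "..."}, articleData, articleDataLock) % batch_size == 0:
    print("flush via save_data_thread_safe(articleData, articleDataLock, 'Article')")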
@@ -463,28 +469,29 @@ for future in as_completed(futures):
         print("VOLUME PROCESSING ERROR:", str(vol_err))
 
 # Retry failed processing paper
-print("START RETRYING:", len(failedData))
-while failedData:
-    fail_data = failedData.pop(0)
-    articleTitle = fail_data["title"]
-    articleUrl = fail_data["URL"]
-    try:
-        process_article(articleTitle, articleUrl)
-    except Exception as retry_err:
-        print("ARTICLE RETRYING FAILURE:", str(retry_err))
-        totally_fail = {
-            "title": articleTitle,
-            "URL": articleUrl
-        }
-        totallyFailedData.append(totally_fail)
+if len(failedData):
+    print("START RETRYING:", len(failedData))
+    while failedData:
+        fail_data = failedData.pop(0)
+        articleTitle = fail_data["title"]
+        articleUrl = fail_data["URL"]
+        try:
+            process_article(articleTitle, articleUrl)
+        except Exception as retry_err:
+            print("ARTICLE RETRYING FAILURE:", str(retry_err))
+            totally_fail = {
+                "title": articleTitle,
+                "URL": articleUrl
+            }
+            totallyFailedData.append(totally_fail)
 
 # Save remaining data
 if len(articleData) > 0:
-    ejde_save.save_data(articleData, "Article_TS", str(uuid.uuid4()) + ".json")
+    save_data_thread_safe(articleData, articleDataLock, "Article")
     print("COMPLETE: All EJDE article data has been saved to ./ejde_buffer/Article_TS/")
 if len(authorData) > 0:
-    ejde_save.save_data(authorData, "Author_TS", str(uuid.uuid4()) + ".json")
+    save_data_thread_safe(authorData, authorDataLock, "Author")
     print("COMPLETE: All EJDE author data has been saved to ./ejde_buffer/Author_TS/")
 
 # Save error record
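For context, the code that builds futures is unchanged and therefore not shown in this diff. A sketch of the ThreadPoolExecutor fan-out implied by the as_completed loop above; the process_volume name, the worker count, and the URL list are assumptions:

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_volume(volume_url):
    # hypothetical worker: scrape one volume page, calling process_article per paper
    pass

volume_urls = []  # the per-year journal links gathered by the main routine
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(process_volume, url) for url in volume_urls]
    for future in as_completed(futures):
        try:
            future.result()  # re-raises any exception from the worker thread
        except Exception as vol_err:
            print("VOLUME PROCESSING ERROR:", str(vol_err))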
@@ -507,4 +514,4 @@ print("time elapsed: {:.2f}s".format(time.time() - start_time))
 # Transfer to large file and delete the temporary storage files
 ejde_save.transform_data()
-# ejde_save.delete_data()
+ejde_save.delete_data()
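ejde_save itself is not touched by this commit, so its internals are not shown. A hypothetical sketch of the transform-and-delete pass the comment describes, assuming the buffer layout named in the COMPLETE messages above (one list per small JSON file under ./ejde_buffer/Article_TS/ and ./ejde_buffer/Author_TS/):

import json
import os

def transform_data(buffer_root="./ejde_buffer", subdirs=("Article_TS", "Author_TS")):
    # merge the small per-batch JSON files into one large file per subdirectory
    for subdir in subdirs:
        folder = os.path.join(buffer_root, subdir)
        if not os.path.isdir(folder):
            continue
        merged = []
        for name in sorted(os.listdir(folder)):
            if name.endswith(".json"):
                with open(os.path.join(folder, name), encoding="utf-8") as f:
                    merged.extend(json.load(f))
        with open(os.path.join(buffer_root, f"{subdir}.json"), "w", encoding="utf-8") as f:
            json.dump(merged, f, ensure_ascii=False)

def delete_data(buffer_root="./ejde_buffer", subdirs=("Article_TS", "Author_TS")):
    # remove the temporary per-batch files once the merge has succeeded
    for subdir in subdirs:
        folder = os.path.join(buffer_root, subdir)
        if not os.path.isdir(folder):
            continue
        for name in os.listdir(folder):
            if name.endswith(".json"):
                os.remove(os.path.join(folder, name))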