generated from kgod/ai-review-template
提交
This commit is contained in:
Binary file not shown.
@@ -0,0 +1,173 @@
|
||||
"""Manual entry point for testing the formal Step5 crawl-and-save pipeline."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(PROJECT_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
from src.config import settings # noqa: E402
|
||||
from src.database import CrawlTask, JobData, SessionLocal, TaskCrawl # noqa: E402
|
||||
from src.scheduler.jobs import job_step5_crawl # noqa: E402
|
||||
|
||||
|
||||
SAMSUNG_CONFIG = {
|
||||
"url": "https://dearsamsung.zhiye.com/#/samsung/pc/szzw",
|
||||
"job_item_selector": ".BHGkB li",
|
||||
"item_change_type": "in_page",
|
||||
"next_page_selector": "._8x6MD .ant-pagination-next:not([aria-disabled='true']) .ant-pagination-item-link",
|
||||
"page_change_type": "content_change",
|
||||
"detail_area_selector": ".FLf6j",
|
||||
"field_selectors": {
|
||||
"job_title": {"selector": ["h2"]},
|
||||
"description": {"selector": [".aCl-8 p", ".aCl-8 pre"]},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@contextmanager
|
||||
def temporarily_pause_other_pending_tasks(db, current_task_id: int):
|
||||
"""Make job_step5_crawl pick the task created by this script."""
|
||||
paused_ids = [
|
||||
row.id
|
||||
for row in db.query(TaskCrawl.id)
|
||||
.filter(TaskCrawl.status == "pending", TaskCrawl.id != current_task_id)
|
||||
.all()
|
||||
]
|
||||
|
||||
if paused_ids:
|
||||
db.query(TaskCrawl).filter(TaskCrawl.id.in_(paused_ids)).update(
|
||||
{TaskCrawl.status: "manual_paused"},
|
||||
synchronize_session=False,
|
||||
)
|
||||
db.commit()
|
||||
|
||||
try:
|
||||
yield paused_ids
|
||||
finally:
|
||||
if paused_ids:
|
||||
db.query(TaskCrawl).filter(TaskCrawl.id.in_(paused_ids)).update(
|
||||
{TaskCrawl.status: "pending"},
|
||||
synchronize_session=False,
|
||||
)
|
||||
db.commit()
|
||||
|
||||
|
||||
def create_manual_task(company_name: str) -> tuple[int, int]:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
crawl_task = db.query(CrawlTask).filter(CrawlTask.company_name == company_name).first()
|
||||
if crawl_task is None:
|
||||
crawl_task = CrawlTask(
|
||||
company_name=company_name,
|
||||
config_step=4,
|
||||
config_status="success",
|
||||
crawl_status="pending",
|
||||
)
|
||||
db.add(crawl_task)
|
||||
db.flush()
|
||||
else:
|
||||
crawl_task.config_step = 4
|
||||
crawl_task.config_status = "success"
|
||||
crawl_task.crawl_status = "pending"
|
||||
|
||||
task_crawl = TaskCrawl(
|
||||
crawl_task_id=crawl_task.id,
|
||||
status="pending",
|
||||
max_retry=1,
|
||||
input_config=SAMSUNG_CONFIG,
|
||||
)
|
||||
db.add(task_crawl)
|
||||
db.commit()
|
||||
return crawl_task.id, task_crawl.id
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def print_result(crawl_task_id: int, task_crawl_id: int) -> None:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
task = db.query(TaskCrawl).filter(TaskCrawl.id == task_crawl_id).first()
|
||||
inserted_count = db.query(JobData).filter(JobData.task_crawl_id == task_crawl_id).count()
|
||||
|
||||
print()
|
||||
print("Manual Step5 crawl finished")
|
||||
print(f"crawl_task_id: {crawl_task_id}")
|
||||
print(f"task_crawl_id: {task_crawl_id}")
|
||||
print(f"status: {task.status if task else 'missing'}")
|
||||
print(f"crawled_count: {task.crawled_count if task else None}")
|
||||
print(f"inserted_count: {inserted_count}")
|
||||
if task and task.error_message:
|
||||
print(f"error_message: {task.error_message}")
|
||||
|
||||
print()
|
||||
print("Check task status:")
|
||||
print(
|
||||
"SELECT id,status,crawled_count,error_message,started_at,finished_at "
|
||||
"FROM table_comple.app_task_crawl "
|
||||
f"WHERE id = {task_crawl_id};"
|
||||
)
|
||||
print()
|
||||
print("Check saved jobs:")
|
||||
print(
|
||||
"SELECT id,job_title,company,recruit_category,created_at "
|
||||
"FROM table_comple.app_job_data "
|
||||
f"WHERE task_crawl_id = {task_crawl_id} "
|
||||
"ORDER BY id LIMIT 50;"
|
||||
)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Create one formal Step5 crawl task and run job_step5_crawl()."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--company",
|
||||
default=f"manual_samsung_test_{datetime.now():%Y%m%d_%H%M%S}",
|
||||
help="Company name saved into app_crawl_task/app_job_data.company.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--headless",
|
||||
action="store_true",
|
||||
help="Run Chromium in headless mode. Default opens a visible browser.",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
args = parse_args()
|
||||
settings.browser_headless = bool(args.headless)
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
|
||||
crawl_task_id, task_crawl_id = create_manual_task(args.company)
|
||||
print(f"Created formal Step5 task: crawl_task_id={crawl_task_id}, task_crawl_id={task_crawl_id}")
|
||||
|
||||
db = SessionLocal()
|
||||
try:
|
||||
with temporarily_pause_other_pending_tasks(db, task_crawl_id) as paused_ids:
|
||||
if paused_ids:
|
||||
print(f"Temporarily paused other pending Step5 tasks: {paused_ids}")
|
||||
await job_step5_crawl()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
print_result(crawl_task_id, task_crawl_id)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user