Files
post_crawler/scripts/manual_step5_crawl.py
T
2026-05-26 21:02:17 +08:00

174 lines
5.4 KiB
Python

"""Manual entry point for testing the formal Step5 crawl-and-save pipeline."""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
# Grandparent directory of this file, i.e. the project root that holds
# `scripts/`. Prepending it to sys.path lets the `src.*` imports resolve when
# the script is executed directly instead of as an installed package.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
_root_entry = str(PROJECT_ROOT)
if _root_entry not in sys.path:
    sys.path.insert(0, _root_entry)
del _root_entry
from src.config import settings # noqa: E402
from src.database import CrawlTask, JobData, SessionLocal, TaskCrawl # noqa: E402
from src.scheduler.jobs import job_step5_crawl # noqa: E402
# Step5 crawl configuration for the Samsung careers site. It is stored as the
# `input_config` of the TaskCrawl row this script creates.
SAMSUNG_CONFIG = dict(
    url="https://dearsamsung.zhiye.com/#/samsung/pc/szzw",
    # CSS selector matching each job entry in the listing.
    job_item_selector=".BHGkB li",
    item_change_type="in_page",
    # Pagination "next" link. The :not([aria-disabled='true']) guard skips the
    # button whenever it is marked aria-disabled.
    next_page_selector="._8x6MD .ant-pagination-next:not([aria-disabled='true']) .ant-pagination-item-link",
    page_change_type="content_change",
    # Container whose contents hold the job detail fields below.
    detail_area_selector=".FLf6j",
    field_selectors=dict(
        job_title=dict(selector=["h2"]),
        description=dict(selector=[".aCl-8 p", ".aCl-8 pre"]),
    ),
)
@contextmanager
def temporarily_pause_other_pending_tasks(db, current_task_id: int):
    """Make job_step5_crawl pick the task created by this script.

    On entry, every TaskCrawl row whose status is ``"pending"`` (except
    ``current_task_id``) is switched to ``"manual_paused"`` and committed.
    On exit, whether normal or via an exception, those rows are switched back
    to ``"pending"`` and committed.

    Args:
        db: Session used for the status updates. The caller owns it and is
            responsible for closing it.
        current_task_id: id of the TaskCrawl row that must stay pending.

    Yields:
        The ids that were candidates for pausing (possibly empty).
    """
    paused_ids = [
        row.id
        for row in db.query(TaskCrawl.id)
        .filter(TaskCrawl.status == "pending", TaskCrawl.id != current_task_id)
        .all()
    ]
    if paused_ids:
        # Re-check the status inside the UPDATE itself. A row that was moved
        # out of "pending" between the SELECT above and this statement (e.g.
        # picked up by another worker) must not be overwritten.
        db.query(TaskCrawl).filter(
            TaskCrawl.id.in_(paused_ids),
            TaskCrawl.status == "pending",
        ).update(
            {TaskCrawl.status: "manual_paused"},
            synchronize_session=False,
        )
        db.commit()
    try:
        yield paused_ids
    finally:
        if paused_ids:
            # Only restore rows that are still parked as "manual_paused", so any
            # status change made by someone else in the meantime is preserved.
            db.query(TaskCrawl).filter(
                TaskCrawl.id.in_(paused_ids),
                TaskCrawl.status == "manual_paused",
            ).update(
                {TaskCrawl.status: "pending"},
                synchronize_session=False,
            )
            db.commit()
def create_manual_task(
    company_name: str,
    *,
    input_config: dict | None = None,
) -> tuple[int, int]:
    """Create (or reuse) a CrawlTask for ``company_name`` and queue one TaskCrawl.

    If a CrawlTask with this ``company_name`` already exists it is reused and
    its config/crawl status fields are reset. Otherwise a new one is inserted.
    A new TaskCrawl with status ``"pending"`` and ``max_retry=1`` is always
    added, and everything is committed in one transaction.

    Args:
        company_name: Value matched against / stored in ``CrawlTask.company_name``.
        input_config: Crawl configuration stored on the new TaskCrawl.
            Defaults to ``SAMSUNG_CONFIG`` (the original hard-coded value).

    Returns:
        ``(crawl_task_id, task_crawl_id)`` of the rows used/created.
    """
    config = SAMSUNG_CONFIG if input_config is None else input_config
    db = SessionLocal()
    try:
        crawl_task = (
            db.query(CrawlTask).filter(CrawlTask.company_name == company_name).first()
        )
        if crawl_task is None:
            crawl_task = CrawlTask(
                company_name=company_name,
                config_step=4,
                config_status="success",
                crawl_status="pending",
            )
            db.add(crawl_task)
            # Flush so crawl_task.id is populated before it is copied into
            # TaskCrawl.crawl_task_id below.
            db.flush()
        else:
            # Reuse the existing row, but reset it to the same state a newly
            # inserted row gets in the branch above.
            crawl_task.config_step = 4
            crawl_task.config_status = "success"
            crawl_task.crawl_status = "pending"

        task_crawl = TaskCrawl(
            crawl_task_id=crawl_task.id,
            status="pending",
            max_retry=1,
            input_config=config,
        )
        db.add(task_crawl)
        db.commit()
        return crawl_task.id, task_crawl.id
    finally:
        db.close()
def print_result(crawl_task_id: int, task_crawl_id: int) -> None:
    """Print a summary of one TaskCrawl run plus SQL to inspect it by hand.

    Loads the TaskCrawl row and counts the JobData rows linked to it, then
    prints the status/counts followed by two ready-to-paste SELECT statements.
    """
    session = SessionLocal()
    try:
        task = session.query(TaskCrawl).filter(TaskCrawl.id == task_crawl_id).first()
        saved_jobs = (
            session.query(JobData).filter(JobData.task_crawl_id == task_crawl_id).count()
        )

        if task:
            status, crawled = task.status, task.crawled_count
        else:
            status, crawled = "missing", None

        report = [
            "",
            "Manual Step5 crawl finished",
            f"crawl_task_id: {crawl_task_id}",
            f"task_crawl_id: {task_crawl_id}",
            f"status: {status}",
            f"crawled_count: {crawled}",
            f"inserted_count: {saved_jobs}",
        ]
        if task and task.error_message:
            report.append(f"error_message: {task.error_message}")

        # Ready-to-run queries for checking the results in a SQL client.
        report += [
            "",
            "Check task status:",
            "SELECT id,status,crawled_count,error_message,started_at,finished_at "
            "FROM table_comple.app_task_crawl "
            f"WHERE id = {task_crawl_id};",
            "",
            "Check saved jobs:",
            "SELECT id,job_title,company,recruit_category,created_at "
            "FROM table_comple.app_job_data "
            f"WHERE task_crawl_id = {task_crawl_id} "
            "ORDER BY id LIMIT 50;",
        ]
        for line in report:
            print(line)
    finally:
        session.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Create one formal Step5 crawl task and run job_step5_crawl()."
)
parser.add_argument(
"--company",
default=f"manual_samsung_test_{datetime.now():%Y%m%d_%H%M%S}",
help="Company name saved into app_crawl_task/app_job_data.company.",
)
parser.add_argument(
"--headless",
action="store_true",
help="Run Chromium in headless mode. Default opens a visible browser.",
)
return parser.parse_args()
async def main() -> None:
    """Run one manual Step5 crawl end to end.

    Steps:
    1. Parse CLI options, then apply the headless flag and logging config.
    2. Create a new pending TaskCrawl.
    3. Park every other pending TaskCrawl while ``job_step5_crawl()`` runs;
       they are restored to pending afterwards.
    4. Print a summary of the result.
    """
    options = parse_args()
    # NOTE(review): presumably read by the Step5 crawler when it launches the
    # browser, so it must be set before job_step5_crawl() runs.
    settings.browser_headless = bool(options.headless)
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        level=logging.INFO,
    )

    crawl_task_id, task_crawl_id = create_manual_task(options.company)
    print(f"Created formal Step5 task: crawl_task_id={crawl_task_id}, task_crawl_id={task_crawl_id}")

    pause_session = SessionLocal()
    try:
        with temporarily_pause_other_pending_tasks(pause_session, task_crawl_id) as parked:
            if parked:
                print(f"Temporarily paused other pending Step5 tasks: {parked}")
            await job_step5_crawl()
    finally:
        pause_session.close()

    # Only reached when the crawl job did not raise.
    print_result(crawl_task_id, task_crawl_id)
if __name__ == "__main__":
    # Script entry point: run the async main() to completion via asyncio.run.
    asyncio.run(main())