diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py new file mode 100644 index 00000000..6c9cab97 --- /dev/null +++ b/src/scheduler/scheduler.py @@ -0,0 +1,117 @@ +"""定时任务调度器""" +import logging +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from datetime import datetime + +from src.config import settings +from src.scheduler.jobs import ( + job_step1_search, + job_step2_page, + job_step3_next, + job_step4_detail, + job_step5_crawl, + job_periodic_crawl, + job_check_validity, + job_generate_company, +) + +logger = logging.getLogger(__name__) + +# 创建调度器 +scheduler = AsyncIOScheduler( + job_defaults={ + 'coalesce': True, # 合并错过的任务 + 'max_instances': 3, # 同一任务最多3个实例并发 + } +) + + +def start_scheduler(): + """启动调度器并注册所有任务""" + + # Step 1: 搜索招聘页面 + + scheduler.add_job( + job_step1_search, + 'interval', + seconds=settings.job_step1_search_interval, + id='job_step1_search', + max_instances=2, + name='搜索招聘页面', + next_run_time=datetime.now() # 立即执行第一次 + ) + + # Step 2: 岗位列表分析 + scheduler.add_job( + job_step2_page, + 'interval', + seconds=settings.job_step2_page_interval, + id='job_step2_page', + name='岗位列表分析' + ) + + # Step 3: 分页分析 + scheduler.add_job( + job_step3_next, + 'interval', + seconds=settings.job_step3_next_interval, + id='job_step3_next', + name='分页分析' + ) + + #Step 4: 详情页分析 + scheduler.add_job( + job_step4_detail, + 'interval', + seconds=settings.job_step4_detail_interval, + id='job_step4_detail', + name='详情页分析' + ) + + # Step 5: 数据爬取 + scheduler.add_job( + job_step5_crawl, + 'interval', + seconds=settings.job_step5_crawl_interval, + id='job_step5_crawl', + next_run_time=datetime.now(), + name='数据爬取' + ) + + # # 周期爬取 + # scheduler.add_job( + # job_periodic_crawl, + # 'interval', + # seconds=settings.job_periodic_crawl_interval, + # id='job_periodic_crawl', + # name='周期爬取' + # ) + # + # # 有效性检查 + # scheduler.add_job( + # job_check_validity, + # 'interval', + # seconds=settings.job_check_validity_interval, + # id='job_check_validity', + # name='有效性检查' + # ) + + # AI生成公司名 + scheduler.add_job( + job_generate_company, + 'interval', + seconds=settings.job_generate_company_interval, + id='job_generate_company', + max_instances=1, # 单实例 + next_run_time=datetime.now(), + name='AI生成公司名' + ) + + scheduler.start() + logger.info("定时任务调度器已启动") + + +def shutdown_scheduler(): + """关闭调度器""" + scheduler.shutdown() + logger.info("定时任务调度器已关闭") diff --git a/src/search_company_graph/__init__.py b/src/search_company_graph/__init__.py new file mode 100644 index 00000000..2e39d57d --- /dev/null +++ b/src/search_company_graph/__init__.py @@ -0,0 +1,4 @@ +"""搜索公司招聘页面模块""" +from .main import search_company_recruitment + +__all__ = ["search_company_recruitment"] diff --git a/src/search_company_graph/__pycache__/__init__.cpython-312.pyc b/src/search_company_graph/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 00000000..58232c19 Binary files /dev/null and b/src/search_company_graph/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/search_company_graph/__pycache__/graph.cpython-312.pyc b/src/search_company_graph/__pycache__/graph.cpython-312.pyc new file mode 100644 index 00000000..49a124b7 Binary files /dev/null and b/src/search_company_graph/__pycache__/graph.cpython-312.pyc differ diff --git a/src/search_company_graph/__pycache__/main.cpython-312.pyc b/src/search_company_graph/__pycache__/main.cpython-312.pyc new file mode 100644 index 00000000..7827fccb Binary files /dev/null and b/src/search_company_graph/__pycache__/main.cpython-312.pyc differ diff --git a/src/search_company_graph/__pycache__/nodes.cpython-312.pyc b/src/search_company_graph/__pycache__/nodes.cpython-312.pyc new file mode 100644 index 00000000..5c3a7536 Binary files /dev/null and b/src/search_company_graph/__pycache__/nodes.cpython-312.pyc differ diff --git a/src/search_company_graph/__pycache__/prompts.cpython-312.pyc b/src/search_company_graph/__pycache__/prompts.cpython-312.pyc new file mode 100644 index 00000000..8f415645 Binary files /dev/null and b/src/search_company_graph/__pycache__/prompts.cpython-312.pyc differ diff --git a/src/search_company_graph/__pycache__/state.cpython-312.pyc b/src/search_company_graph/__pycache__/state.cpython-312.pyc new file mode 100644 index 00000000..3026151e Binary files /dev/null and b/src/search_company_graph/__pycache__/state.cpython-312.pyc differ diff --git a/src/search_company_graph/graph.py b/src/search_company_graph/graph.py new file mode 100644 index 00000000..720ced6b --- /dev/null +++ b/src/search_company_graph/graph.py @@ -0,0 +1,105 @@ +"""LangGraph 流程定义""" +from langgraph.graph import StateGraph, END +from .state import SearchState +from .nodes import search_sogou, extract_links, visit_and_verify, navigate_to_recruitment + + +def route_entry(state: SearchState) -> str: + """入口路由:有已知候选链接则跳过搜索""" + if state.get("candidate_urls"): + return "has_url" + return "need_search" + + +def check_links(state: SearchState) -> str: + """检查是否有候选链接""" + urls = state.get("candidate_urls", []) + if not urls: + return "no_links" + return "has_links" + + +def check_verify_result(state: SearchState) -> str: + """检查验证结果""" + # 已找到招聘列表页 + if state.get("result_url"): + return "found" + + # 有错误(候选用完) + if state.get("error"): + return "failed" + + # 未找到,需要跳转 + return "need_navigate" + + +def check_navigate_result(state: SearchState) -> str: + """检查导航后的状态""" + current_url_index = state.get("current_url_index", 0) + candidate_urls = state.get("candidate_urls", []) + + # 候选用完 + if current_url_index >= len(candidate_urls): + return "no_more_candidates" + + # 回 Node 3 验证 + return "verify" + + +def create_graph() -> StateGraph: + """创建流程图""" + + graph = StateGraph(SearchState) + + # 添加节点 + graph.add_node("route_entry", lambda state: state) # 路由节点,不修改状态 + graph.add_node("search_sogou", search_sogou) + graph.add_node("extract_links", extract_links) + graph.add_node("visit_and_verify", visit_and_verify) + graph.add_node("navigate_to_recruitment", navigate_to_recruitment) + + # 入口路由 + graph.set_entry_point("route_entry") + + # 路由:有已知URL直接验证,否则走搜索 + graph.add_conditional_edges( + "route_entry", + route_entry, + { + "has_url": "visit_and_verify", + "need_search": "search_sogou" + } + ) + + # 流程 + graph.add_edge("search_sogou", "extract_links") + + graph.add_conditional_edges( + "extract_links", + check_links, + { + "no_links": END, + "has_links": "visit_and_verify" + } + ) + + graph.add_conditional_edges( + "visit_and_verify", + check_verify_result, + { + "found": END, + "failed": END, + "need_navigate": "navigate_to_recruitment" + } + ) + + graph.add_conditional_edges( + "navigate_to_recruitment", + check_navigate_result, + { + "no_more_candidates": END, + "verify": "visit_and_verify" + } + ) + + return graph.compile() diff --git a/src/search_company_graph/main.py b/src/search_company_graph/main.py new file mode 100644 index 00000000..037498fb --- /dev/null +++ b/src/search_company_graph/main.py @@ -0,0 +1,88 @@ +"""入口""" +import asyncio +import sys +from pathlib import Path + +# 支持直接运行 +if __name__ == "__main__": + sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from playwright.async_api import async_playwright +from playwright_stealth import Stealth +from src.search_company_graph.graph import create_graph + + +async def search_company_recruitment(company_name: str, input_url: str = None, headless: bool = False) -> list[str]: + """ + 搜索公司招聘列表页面 + + Args: + company_name: 公司名称 + input_url: 已知招聘地址(可选),有值时跳过搜索直接验证 + headless: 是否无头模式 + + Returns: + list[str]: 招聘列表页 URL 列表,失败返回空列表 + """ + async with async_playwright() as p: + browser = await p.chromium.launch(headless=headless, channel="chrome") + context = await browser.new_context() + + # 应用 stealth + stealth = Stealth() + await stealth.apply_stealth_async(context) + + page = await context.new_page() + + try: + graph = create_graph() + + initial_state = { + "company_name": company_name, + "page": page + } + + # 已知URL时,直接作为候选链接跳过搜索 + if input_url: + initial_state["candidate_urls"] = [input_url] + initial_state["current_url_index"] = 0 + initial_state["clicked_selectors"] = [] + initial_state["navigate_retry_count"] = 0 + + print(f"\n{'='*50}") + print(f"开始搜索: {company_name}") + print(f"{'='*50}\n") + + final_state = await graph.ainvoke(initial_state) + + print(f"\n{'='*50}") + print("搜索完成") + print(f"{'='*50}\n") + + # 返回结果 + result_url = final_state.get("result_url") + if result_url: + return [result_url] + return [] + + finally: + await browser.close() + + +async def main(): + """测试""" + company_name = "五粮液" + + result = await search_company_recruitment(company_name) + + print("\n最终结果:") + if result: + print(f"✅ 成功找到招聘页面:") + for url in result: + print(f" {url}") + else: + print("❌ 未找到招聘页面") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/search_company_graph/nodes.py b/src/search_company_graph/nodes.py new file mode 100644 index 00000000..7eadf1b3 --- /dev/null +++ b/src/search_company_graph/nodes.py @@ -0,0 +1,457 @@ +"""节点实现""" +import asyncio +from pydantic import BaseModel, Field +from langchain_core.messages import HumanMessage +from src.bash_model import GeneralLlm +from .state import SearchState +from .prompts import EXTRACT_LINKS_PROMPT, VERIFY_RECRUITMENT_LIST_PROMPT, FIND_RECRUITMENT_ENTRY_PROMPT + + +# 结构化输出模型 +class ExtractLinksResult(BaseModel): + """提取链接结果""" + urls: list[str] = Field(description="候选URL列表,最多3个") + reason: str = Field(description="选择理由") + + +class VerifyResult(BaseModel): + """验证结果""" + is_recruitment_list: bool = Field(description="是否为招聘列表页") + reason: str = Field(description="判断依据") + + +class FindEntryResult(BaseModel): + """找入口结果""" + selector: str | None = Field(description="CSS选择器,找不到则为None") + reason: str = Field(description="选择或找不到的原因") + + +async def get_clean_html(page, max_retries: int = 3) -> str: + """获取清理后的页面 HTML(移除 style/script 等),带重试逻辑""" + for attempt in range(max_retries): + try: + # 等待页面加载稳定 + try: + await page.wait_for_load_state("domcontentloaded", timeout=5000) + except Exception: + pass + + # 额外等待一下,确保页面不再跳转 + await asyncio.sleep(1) + + html = await page.evaluate(""" + () => { + const clone = document.body.cloneNode(true); + clone.querySelectorAll('style, script, noscript, svg, link').forEach(el => el.remove()); + return clone.innerHTML; + } + """) + return html + except Exception as e: + if attempt < max_retries - 1: + # 可能是页面正在跳转,等待后重试 + await asyncio.sleep(2) + else: + # 最后一次尝试失败,返回空字符串 + print(f"[get_clean_html] 获取页面内容失败: {e}") + return "" + + +async def get_search_results(page) -> str: + """提取搜狗搜索结果(结构化文本),并解析搜狗跳转链接获取真实 URL""" + # 第一步:提取搜索结果基本信息 + raw_results = await page.evaluate(""" + () => { + // 获取自然搜索结果(排除广告和推荐框) + const results = document.querySelectorAll('.vrwrap:not(.middle-better-hintBox)'); + if (!results.length) return []; + + return Array.from(results).map((el, i) => { + const title = el.querySelector('.vr-title a, h3 a')?.innerText?.trim() || ''; + + // 优先从后续的 .r-sech 元素的 data-url 获取完整 URL + let url = ''; + const nextEl = el.nextElementSibling; + if (nextEl && nextEl.classList.contains('r-sech')) { + url = nextEl.getAttribute('data-url') || ''; + } + + // 备选:从链接 href 获取 + if (!url) { + const linkEl = el.querySelector('.vr-title a, h3 a'); + url = linkEl?.href || ''; + } + + const desc = el.querySelector('.fz-mid, .star-wiki')?.innerText?.trim() || ''; + return { title, url, desc }; + }).filter(item => item.url && (item.url.startsWith('http://') || item.url.startsWith('https://'))); + } + """) + + if not raw_results: + # 回退到纯文本提取 + return await page.evaluate("() => document.body.innerText.slice(0, 30000)") + + # 第二步:解析搜狗跳转链接获取真实 URL + resolved_results = [] + for item in raw_results: + url = item.get('url', '') + + # 如果是搜狗跳转链接,尝试解析真实 URL + if 'sogou.com/link' in url: + try: + real_url = await resolve_sogou_redirect(page, url) + if real_url: + url = real_url + except Exception: + pass # 解析失败则保留原 URL + + resolved_results.append({ + 'title': item.get('title', ''), + 'url': url, + 'desc': item.get('desc', '') + }) + + # 第三步:格式化输出 + output_lines = [] + for i, item in enumerate(resolved_results): + output_lines.append(f"{i+1}. {item['title']}\nURL: {item['url']}\n{item['desc']}") + + return "\n\n".join(output_lines) + + +async def resolve_sogou_redirect(page, sogou_url: str) -> str: + """解析搜狗跳转链接,获取真实目标 URL""" + try: + # 使用 fetch 获取跳转页面内容 + response_text = await page.evaluate(""" + async (url) => { + try { + const resp = await fetch(url, { redirect: 'manual' }); + const text = await resp.text(); + return text; + } catch (e) { + return ''; + } + } + """, sogou_url) + + if not response_text: + return '' + + # 从 meta refresh 标签提取真实 URL + # 匹配模式: + import re + match = re.search(r"URL='([^']+)'", response_text, re.IGNORECASE) + if match: + return match.group(1) + + # 备选匹配模式: url=http://xxx.com + match = re.search(r'url=([^"\s>]+)', response_text, re.IGNORECASE) + if match: + return match.group(1) + + return '' + except Exception: + return '' + + +async def search_sogou(state: SearchState) -> dict: + """Node 1: 搜狗搜索""" + page = state["page"] + company_name = state["company_name"] + + print(f"[Node 1] 搜狗搜索: {company_name} 校园招聘 官网") + + # 访问搜狗 + await page.goto("https://www.sogou.com") + await asyncio.sleep(1) + + # 搜索 + await page.fill('input[name="query"]', f"{company_name} 校园招聘 官网") + await page.press('input[name="query"]', "Enter") + try: + await page.wait_for_load_state("networkidle", timeout=10000) + except Exception: + pass # 超时也继续 + await asyncio.sleep(2) + + # 获取搜索结果(结构化文本) + search_result_text = await get_search_results(page) + + print(f"[Node 1] 获取搜索结果完成,长度: {len(search_result_text)}") + + # 强制删除微公众号信息网页 + search_result_text = search_result_text.replace("mp.weixin.qq.com", "") + + return {"search_result_html": search_result_text} + + +async def extract_links(state: SearchState) -> dict: + """Node 2: 提取候选链接""" + search_text = state["search_result_html"] + company_name = state["company_name"] + + print("[Node 2] 分析搜索结果,提取候选链接...") + + # LLM 分析 + prompt = EXTRACT_LINKS_PROMPT.format(company_name=company_name, html=search_text) + llm = GeneralLlm.with_structured_output(ExtractLinksResult) + + try: + result = await llm.ainvoke([HumanMessage(content=prompt)]) + except Exception as e: + print(f"[Node 2] LLM 调用失败: {e}") + return { + "candidate_urls": [], + "current_url_index": 0, + "clicked_selectors": [], + "navigate_retry_count": 0 + } + + # 规范化 URL(确保有协议前缀) + normalized_urls = [] + for url in result.urls: + if url and not url.startswith(('http://', 'https://')): + url = 'https://' + url + if url: + normalized_urls.append(url) + + print(f"[Node 2] 提取到 {len(normalized_urls)} 个候选链接:") + for i, url in enumerate(normalized_urls): + print(f" {i+1}. {url}") + + return { + "candidate_urls": normalized_urls, + "current_url_index": 0, + "clicked_selectors": [], + "navigate_retry_count": 0 + } + + +async def visit_and_verify(state: SearchState) -> dict: + """Node 3: 访问并验证是否为招聘列表页""" + context = state["page"].context + candidate_urls = state.get("candidate_urls", []) + current_url_index = state.get("current_url_index", 0) + + # 确保有有效页面 + if not context.pages: + page = await context.new_page() + else: + # 只保留最新的页面 + page = context.pages[-1] + for p in context.pages[:-1]: + try: + await p.close() + except Exception: + pass + + # 检查是否还有候选链接 + if current_url_index >= len(candidate_urls): + print("[Node 3] 候选链接已用完") + return {"error": "所有候选链接均未找到招聘列表页"} + + current_url = candidate_urls[current_url_index] + clicked_selectors = state.get("clicked_selectors", []) + + # 判断是否从 Node 4 跳转回来(已经点击过入口) + if clicked_selectors: + # 从 Node 4 返回,直接使用当前页面 + print(f"[Node 3] 验证跳转后的页面: {page.url}") + page_html = await get_clean_html(page) + else: + # 首次访问该候选链接 + print(f"[Node 3] 访问候选链接 {current_url_index + 1}/{len(candidate_urls)}: {current_url}") + + # 重试机制 + max_retries = 3 + for attempt in range(max_retries): + try: + await page.goto(current_url, wait_until="domcontentloaded", timeout=20000) + # 等待页面稳定(处理可能的跳转) + await asyncio.sleep(2) + try: + await page.wait_for_load_state("networkidle", timeout=5000) + except Exception: + pass + await asyncio.sleep(1) + break + except Exception as e: + print(f"[Node 3] 访问失败 ({attempt + 1}/{max_retries}): {e}") + if attempt == max_retries - 1: + return { + "current_url": current_url, + "current_url_index": current_url_index + 1, + "clicked_selectors": [], + "navigate_retry_count": 0 + } + await asyncio.sleep(2) + + # 获取页面内容 + page_html = await get_clean_html(page) + + # 如果获取失败,尝试下一个候选 + if not page_html: + print("[Node 3] 获取页面内容失败,尝试下一个候选") + return { + "current_url": current_url, + "current_url_index": current_url_index + 1, + "clicked_selectors": [], + "navigate_retry_count": 0 + } + + # 截断 + display_html = page_html + # if len(display_html) > 50000: + # display_html = display_html[:50000] + "\n... (已截断)" + + # LLM 判断 + prompt = VERIFY_RECRUITMENT_LIST_PROMPT.format(html=display_html) + llm = GeneralLlm.with_structured_output(VerifyResult) + + try: + result = await llm.ainvoke([HumanMessage(content=prompt)]) + except Exception as e: + print(f"[Node 3] LLM 调用失败: {e}") + # LLM 失败,尝试下一个候选 + return { + "current_url": current_url, + "current_url_index": current_url_index + 1, + "clicked_selectors": [], + "navigate_retry_count": 0 + } + + print(f"[Node 3] 是否为招聘列表页: {result.is_recruitment_list}") + print(f"[Node 3] 原因: {result.reason}") + + if result.is_recruitment_list: + return { + "current_url": current_url, + "page_html": page_html, + "result_url": page.url # 使用实际 URL(可能有跳转) + } + + return { + "current_url": current_url, + "page_html": page_html + } + + +async def navigate_to_recruitment(state: SearchState) -> dict: + """Node 4: 找入口并跳转""" + context = state["page"].context + + # 确保有有效页面 + if not context.pages: + # 没有页面了,换下一个候选 + current_url_index = state.get("current_url_index", 0) + return { + "current_url_index": current_url_index + 1, + "clicked_selectors": [], + "navigate_retry_count": 0 + } + page = context.pages[-1] + + page_html = state.get("page_html", "") + clicked_selectors = state.get("clicked_selectors", []) + navigate_retry_count = state.get("navigate_retry_count", 0) + current_url_index = state.get("current_url_index", 0) + + print(f"[Node 4] 尝试找跳转入口 (第 {navigate_retry_count + 1}/10 次)") + + # 检查重试次数 + if navigate_retry_count >= 5: + print("[Node 4] 已尝试 5 次,换下一个候选链接") + return { + "current_url_index": current_url_index + 1, + "clicked_selectors": [], + "navigate_retry_count": 0 + } + + # 检查 page_html 是否有效 + if not page_html or len(page_html) < 100: + print("[Node 4] page_html 无效,重新获取") + page_html = await get_clean_html(page) + if not page_html or len(page_html) < 100: + print("[Node 4] 页面内容仍无效,换下一个候选") + return { + "current_url_index": current_url_index + 1, + "clicked_selectors": [], + "navigate_retry_count": 0 + } + + # 截断 HTML + display_html = page_html + # if len(display_html) > 50000: + # display_html = display_html[:50000] + "\n... (已截断)" + + # LLM 分析 + clicked_str = "\n".join(clicked_selectors) if clicked_selectors else "无" + prompt = FIND_RECRUITMENT_ENTRY_PROMPT.format( + html=display_html, + clicked_selectors=clicked_str + ) + llm = GeneralLlm.with_structured_output(FindEntryResult) + + try: + result = await llm.ainvoke([HumanMessage(content=prompt)]) + except Exception as e: + print(f"[Node 4] LLM 调用失败: {e}") + return { + "page_html": page_html, + "clicked_selectors": clicked_selectors, + "navigate_retry_count": navigate_retry_count + 1 + } + + if not result.selector: + print(f"[Node 4] 未找到入口: {result.reason}") + # 找不到也计入重试,继续循环 + return { + "page_html": page_html, + "clicked_selectors": clicked_selectors, + "navigate_retry_count": navigate_retry_count + 1 + } + + print(f"[Node 4] 找到入口: {result.selector}") + print(f"[Node 4] 原因: {result.reason}") + + # 记录点击前的状态 + tabs_before = len(context.pages) + + try: + element = page.locator(result.selector).first + await element.scroll_into_view_if_needed() + await asyncio.sleep(0.2) + await element.hover() + await asyncio.sleep(0.3) + await element.click() + await asyncio.sleep(3) + except Exception as e: + print(f"[Node 4] 点击失败: {e}") + return { + "page_html": page_html, + "clicked_selectors": clicked_selectors + [result.selector], + "navigate_retry_count": navigate_retry_count + 1 + } + + # 检查是否打开新标签页 + tabs_after = len(context.pages) + if tabs_after > tabs_before: + page = context.pages[-1] + print(f"[Node 4] 打开新标签页: {page.url}") + + # 等待页面加载 + try: + await page.wait_for_load_state("networkidle", timeout=10000) + except Exception: + pass + + new_html = await get_clean_html(page) + print(f"[Node 4] 点击后页面: {page.url}") + + return { + "page_html": new_html, + "clicked_selectors": clicked_selectors + [result.selector], + "navigate_retry_count": navigate_retry_count + 1 + } diff --git a/src/search_company_graph/prompts.py b/src/search_company_graph/prompts.py new file mode 100644 index 00000000..179a68b4 --- /dev/null +++ b/src/search_company_graph/prompts.py @@ -0,0 +1,119 @@ +"""LLM 提示词""" + +# Node 2: 从搜索结果提取链接 +EXTRACT_LINKS_PROMPT = """分析以下搜狗搜索结果,找出最可能是 {company_name} 校园招聘 官方页面的链接。 + +## 搜索结果 +{html} + +## 任务 +从上面的搜索结果中提取最可能的官网页面 URL(最多3个),按可能性排序,并去重。 + +## 优先级 +1. 公司官网的招聘频道(如 careers.xxx.com, jobs.xxx.com, xxx.com/careers, talent.xxx.com) +2. 公司官网首页 + +## 排除 +- 新闻、资讯页面 +- 招聘相关但非该公司的页面 +- 第三方招聘地址 +- 和{company_name}公司无关地址 + +## 输出 +仅输出 JSON: +{{ + "urls": ["https://xxx.com/careers"], + "reason": "选择理由" #字数限制15字 +}} + +找不到则: +{{ + "urls": [], + "reason": "原因" #字数限制15字 +}} +""" + +# Node 3: 判断是否为岗位列表页 +VERIFY_RECRUITMENT_LIST_PROMPT = """判断以下页面是否为岗位列表页。 + +## 页面 HTML +{html} + +## 判断标准 +是岗位列表页(举例): +- 展示多个岗位/职位的页面 +- 平铺展示多个岗位的页面(无分页) +- 有列表结构但显示「暂无岗位」的空列表页 + +## 严格标准 +“是岗位列表页”必须满足以下条件之一: + +### 情况1:有岗位数据 +- 页面包含 >= 2 个岗位信息 +- 每个岗位至少有:岗位名称、工作地点/薪资/发布时间之一 +- 岗位以列表/卡片/表格形式展示 + +### 情况2:空列表页(无岗位数据) +必须同时满足以下所有条件: +1. 存在明确的空状态提示,如: + - "暂无岗位"、"无符合条件的职位"、"No results"、"暂无数据" + - 空列表图标 + 提示文字 +2. 存在岗位筛选/搜索功能,如: + - 岗位类型下拉框、地点筛选、关键词搜索框 +3. 页面结构简洁,主体区域明显是用于展示列表 + +### 不是岗位列表页的情况 +- 只有招聘宣传语(如"加入我们"、"企业文化"),无岗位展示区域 +- 只有招聘流程介绍,无具体岗位 +- 只有公司介绍/团队介绍 +- 只有单个岗位的详情页 +- 纯导航页/入口页(需要点击才能看到岗位) + +不是岗位列表页(举例): +- 单个岗位详情页 +- 公司介绍页 +- 招聘宣传页(无岗位列表区域) + +根据以上示例,判断当前页面更接近哪一类。 + +## 输出 +仅输出 JSON: +{{ + "is_recruitment_list": true或false, + "reason": "判断依据" #字数限制20字 +}} +""" + +# Node 4: 找跳转入口 +FIND_RECRUITMENT_ENTRY_PROMPT = """分析以下页面,找出最可能跳转到校园招聘岗位列表的元素。 + +## 页面 HTML +{html} + +## 已尝试过的选择器(避免使用) +{clicked_selectors} + +## 任务 +找到一个最可能通向招聘岗位列表的可点击元素。 + +## 可能的线索 +- 文字:招聘、加入我们、社会招聘、校园招聘、查看职位、岗位列表、人才、career、jobs、join us ... +- 搜索按钮、搜索图标(点击后可能展示岗位列表) +- 导航菜单中的相关项 +- 页面主体区域的按钮或链接 +- 即使没有明确招聘文字,也可能是通向招聘的入口 +- 根据您的理解选择可能连接到岗位列表的线索 + +## 输出 +仅输出 JSON: +{{ + "selector": "CSS选择器", + "reason": "选择原因" #字数限制30字 +}} + +找不到则: +{{ + "selector": null, + "reason": "原因" #字数限制15字 +}} +""" diff --git a/src/search_company_graph/state.py b/src/search_company_graph/state.py new file mode 100644 index 00000000..dad31eae --- /dev/null +++ b/src/search_company_graph/state.py @@ -0,0 +1,31 @@ +"""状态定义""" +from typing import TypedDict +from playwright.async_api import Page + + +class SearchState(TypedDict, total=False): + """搜索公司招聘页面的状态""" + + # 输入 + company_name: str + page: Page + + # Node 1: search_sogou + search_result_html: str + + # Node 2: extract_links + candidate_urls: list[str] # 候选链接列表(最多3个) + current_url_index: int # 当前尝试的索引 + + # Node 3: visit_and_verify + current_url: str # 当前访问的 URL + page_html: str # 当前页面 HTML + + # Node 4: navigate_to_recruitment + clicked_selectors: list[str] # 已点击过的选择器 + navigate_retry_count: int # 页内跳转重试次数(上限10次) + page_changed: bool # 点击后页面是否变化 + + # 结果 + result_url: str # 最终找到的招聘列表页 URL + error: str # 错误信息 diff --git a/test_crawler.py b/test_crawler.py new file mode 100644 index 00000000..0a6887ec --- /dev/null +++ b/test_crawler.py @@ -0,0 +1,62 @@ +"""Crawler 测试脚本""" +import asyncio +from src.crawler import crawl, CrawlerConfig + + +# 中兴招聘测试配置 +# test_config: CrawlerConfig = { +# "url": "https://app.mokahr.com/social-recruitment/zte/47588#/jobs", +# "job_item_selector": ".jobs-list-WmE84RgZxp .container-aOp138AX_X.normal-TBuWTpDMcE.list-oR2doUijv4", +# "item_change_type": "redirect", +# "next_page_selector": ".sd-Pagination-pagination-2kuN2 .sd-Pagination-forward-3z80f", +# "page_change_type": "url_change", +# "field_selectors": { +# "job_title": {"selector": [".title-ROUQFdjmhP"]}, +# "description": {"selector": [".job-description-VvfEUGocNE"]}, +# "location": {"selector": [".info-UcB_mxJq8y span:first-child"]}, +# "company": {"selector": [".basic-info-dB86EjV5uU span:nth-child(2)"]}, +# }, +# "detail_area_selector": None, +# } + +# 美宜佳招聘测试配置 +# test_config: CrawlerConfig = { +# "url": "https://meiyijia.jobs.feishu.cn/social/position/list", +# "job_item_selector": ".listItems__fca8c0 a", +# "item_change_type": "new_tab", +# "next_page_selector": ".pager__fca8c0 .atsx-pagination-next:not(.atsx-pagination-disabled)", +# "page_change_type": "url_change", +# "field_selectors": { +# "job_title": {"selector": [".positionItem-title-text"]}, +# "description": {"selector": [".positionItem-jobDesc"]}, +# "location": {"selector": [".positionItem-subTitle span"]}, +# }, +# "detail_area_selector": None, +# } + +# 三星招聘测试配置 +test_config: CrawlerConfig = { + "url": "https://dearsamsung.zhiye.com/#/samsung/pc/szzw", + "job_item_selector": ".BHGkB li", + "item_change_type": "in_page", + "next_page_selector": "._8x6MD .ant-pagination-next:not([aria-disabled='true']) .ant-pagination-item-link", + "page_change_type": "content_change", + "field_selectors": { + "job_title": {"selector": ["h2"]}, + "description": {"selector": ['.aCl-8 p', '.aCl-8 pre']}, + }, + "detail_area_selector": ".FLf6j", +} + + +async def main(): + results = await crawl(test_config, headless=False) + print(f"\n爬取完成,共 {len(results)} 条数据") + for i, item in enumerate(results): + print(f"\n--- 岗位 {i+1} ---") + for k, v in item.items(): + print(f"{k}: {v}") + + +if __name__ == "__main__": + asyncio.run(main())