"""Two-stage parallel resume extraction.

Stage 1 (overview): five LLM calls run concurrently and return the short
top-level profile fields plus the identifying name of every sub-table record
(education, work, internship, project, competition). Outputs are tiny, so this
stage is fast.

Stage 2 (detail): one LLM call per sub-table record (including the verbatim
``description`` text) plus one extra call for the profile supplement
(skills / certificates / summary), all run concurrently.

The results are assembled into the same dict structure the previous
single-pass extractor produced, so upstream and downstream code are unaffected.
"""

import asyncio
import time

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

from app.ai.model_config import ResumeExtractorModel
from app.ai.resume_extractor.prompts import (
    OVERVIEW_PROFILE_PROMPT,
    OVERVIEW_EDUCATION_PROMPT,
    OVERVIEW_WORK_PROMPT,
    OVERVIEW_PROJECT_PROMPT,
    OVERVIEW_COMPETITION_PROMPT,
    DETAIL_PROFILE_PROMPT,
    DETAIL_EDUCATION_PROMPT,
    DETAIL_WORK_PROMPT,
    DETAIL_INTERNSHIP_PROMPT,
    DETAIL_PROJECT_PROMPT,
    DETAIL_COMPETITION_PROMPT,
)
from app.core.logger import log
from app.tool.json_helper import parse_llm_json

# Sub-table modules, in the order they are extracted and emitted.
_SUBTABLE_MODULES = ("education", "work", "internship", "project", "competition")

# module -> (detail prompt template, label fragment used in log messages).
_DETAIL_SPECS = {
    "education": (DETAIL_EDUCATION_PROMPT, "教育"),
    "work": (DETAIL_WORK_PROMPT, "工作"),
    "internship": (DETAIL_INTERNSHIP_PROMPT, "实习"),
    "project": (DETAIL_PROJECT_PROMPT, "项目"),
    "competition": (DETAIL_COMPETITION_PROMPT, "竞赛"),
}


# ==================== LLM invocation helpers ====================


def _build_chain(prompt: str):
    """Build an extraction chain: system ``prompt`` + resume text -> LLM -> raw string.

    ``prompt`` is parsed by ``ChatPromptTemplate`` as an f-string-style
    template, so literal braces in it must already be escaped as ``{{``/``}}``.
    The human message carries the resume through the ``{text}`` variable.
    """
    template = ChatPromptTemplate.from_messages([("system", prompt), ("human", "{text}")])
    return template | ResumeExtractorModel.PARSE | StrOutputParser()


async def _safe_invoke(chain, inp: dict, label: str):
    """Run ``chain`` on ``inp`` and parse its output as JSON.

    Logs the elapsed time either way. Any exception raised by the LLM call or
    by ``parse_llm_json`` is logged as a warning and turned into ``None``, so
    one failing call never aborts a surrounding ``asyncio.gather``.
    """
    start = time.perf_counter()
    try:
        raw = await chain.ainvoke(inp)
        log.info(f"AI提取[{label}]完成,耗时: {time.perf_counter() - start:.2f}s")
        return parse_llm_json(raw)
    except Exception as e:
        log.warning(f"AI提取[{label}]失败,耗时: {time.perf_counter() - start:.2f}s,错误: {e}")
        return None


def _as_name_list(value) -> list[str]:
    """Normalize an overview LLM result into a list of non-blank name strings.

    LLM output is untrusted. A key may be ``null`` or a bare string, or a list
    may contain non-string items. Anything that is not a non-blank ``str``
    inside a list is dropped, so stage 2 can safely use each entry as a name
    and ``len()`` always works.
    """
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, str) and item.strip()]


# ==================== Stage 1: overview ====================

_overview_profile_chain = _build_chain(OVERVIEW_PROFILE_PROMPT)
_overview_education_chain = _build_chain(OVERVIEW_EDUCATION_PROMPT)
_overview_work_chain = _build_chain(OVERVIEW_WORK_PROMPT)
_overview_project_chain = _build_chain(OVERVIEW_PROJECT_PROMPT)
_overview_competition_chain = _build_chain(OVERVIEW_COMPETITION_PROMPT)


async def _extract_overview(text: str) -> dict:
    """Stage 1: run the five overview extractions concurrently.

    Returns a dict with ``"profile"`` (a dict, ``{}`` on failure) and, for each
    sub-table module, a list of record-name strings (``[]`` on failure or on
    malformed output). The work prompt yields both ``"work"`` and
    ``"internship"`` from a single JSON object.
    """
    inp = {"text": text}
    profile, edu_names, work_names, proj_names, comp_names = await asyncio.gather(
        _safe_invoke(_overview_profile_chain, inp, "概览-个人信息"),
        _safe_invoke(_overview_education_chain, inp, "概览-教育"),
        _safe_invoke(_overview_work_chain, inp, "概览-工作实习"),
        _safe_invoke(_overview_project_chain, inp, "概览-项目"),
        _safe_invoke(_overview_competition_chain, inp, "概览-竞赛"),
    )
    if not isinstance(work_names, dict):
        work_names = {}
    return {
        "profile": profile if isinstance(profile, dict) else {},
        "education": _as_name_list(edu_names),
        # .get() may return None when the LLM emits `"work": null`; normalize it.
        "work": _as_name_list(work_names.get("work")),
        "internship": _as_name_list(work_names.get("internship")),
        "project": _as_name_list(proj_names),
        "competition": _as_name_list(comp_names),
    }


# ==================== Stage 2: details ====================


async def _extract_detail(prompt_tpl: str, name: str, text: str, label: str) -> dict | None:
    """Extract the detail of a single sub-table record identified by ``name``.

    ``name`` is substituted for the literal ``{name}`` placeholder in
    ``prompt_tpl``. Returns whatever ``_safe_invoke`` returns, which is the
    parsed JSON or ``None`` on failure.
    """
    # The prompt is later parsed as an f-string template, so braces inside the
    # record name must be escaped. Otherwise "Foo{bar}" becomes a template
    # variable, and a lone "{" raises while the chain is being built.
    safe_name = name.replace("{", "{{").replace("}", "}}")
    chain = _build_chain(prompt_tpl.replace("{name}", safe_name))
    return await _safe_invoke(chain, {"text": text}, label)


async def _extract_all_details(overview: dict, text: str) -> dict:
    """Stage 2: extract every sub-table record plus the profile supplement, concurrently.

    Returns ``{"profile_extra": [...], "education": [...], ...}``. Each list
    holds one dict per requested record, in overview order; failed or
    non-dict results become ``{}``.
    """
    # ``modules[i]`` records which bucket the i-th task's result belongs to.
    modules: list[str] = ["profile_extra"]
    tasks = [_extract_detail(DETAIL_PROFILE_PROMPT, "", text, "详情-个人信息补充")]
    for module in _SUBTABLE_MODULES:
        prompt_tpl, tag = _DETAIL_SPECS[module]
        for name in overview[module]:
            tasks.append(_extract_detail(prompt_tpl, name, text, f"详情-{tag}-{name}"))
            modules.append(module)

    # gather preserves task order, so results line up with ``modules``.
    results = await asyncio.gather(*tasks)

    details: dict[str, list] = {"profile_extra": [], **{m: [] for m in _SUBTABLE_MODULES}}
    for module, result in zip(modules, results):
        details[module].append(result if isinstance(result, dict) else {})
    return details


# ==================== Assembly ====================


def _assemble(overview: dict, details: dict) -> dict:
    """Combine both stages into the legacy flat result dict.

    Profile fields sit at the top level, followed by ``skills``,
    ``certificates`` and ``summary`` from the profile supplement, and then one
    list per sub-table module. Missing or empty ``description`` values are
    normalized to ``[]``. The inputs are not mutated.
    """
    profile_extra = (details.get("profile_extra") or [{}])[0]

    result = dict(overview["profile"])
    result["skills"] = profile_extra.get("skills") or []
    result["certificates"] = profile_extra.get("certificates") or []
    result["summary"] = profile_extra.get("summary")

    for module in _SUBTABLE_MODULES:
        result[module] = [
            {**item, "description": item.get("description") or []}
            for item in details.get(module, [])
        ]
    return result


# ==================== Entry point ====================


async def extract_all(text: str) -> dict:
    """Extract a structured resume from ``text`` using the two-stage parallel scheme.

    Returns the same dict structure as the original single-pass extractor.
    Individual LLM failures are logged and degrade to empty fields rather than
    raising.
    """
    log.info("第一阶段:5路并行概览提取")
    overview = await _extract_overview(text)
    log.info(f"概览完成 - 教育:{len(overview['education'])} 工作:{len(overview['work'])} 实习:{len(overview['internship'])} 项目:{len(overview['project'])} 竞赛:{len(overview['competition'])}")

    total = sum(len(overview[m]) for m in _SUBTABLE_MODULES)
    log.info(f"第二阶段:{total + 1}路并行详情提取")
    details = await _extract_all_details(overview, text)

    result = _assemble(overview, details)
    log.info("两阶段提取完成,数据组装完毕")
    return result