"""Parallel résumé extraction.

Runs five independent AI extraction tasks concurrently over the full résumé
text and merges their outputs into a single structured dict.
"""

import asyncio

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

from app.ai.models import LLM
from app.ai.resume_extractor.prompts import (
    PROFILE_PROMPT,
    EDUCATION_PROMPT,
    WORK_PROMPT,
    PROJECT_PROMPT,
    COMPETITION_PROMPT,
)
from app.core.logger import log
from app.tool.json_helper import parse_llm_json


def _build_chain(prompt: str):
    """Build a single extraction chain: prompt -> LLM -> text output.

    Args:
        prompt: System prompt for this extraction task. The résumé text is
            supplied at invocation time through the ``text`` template variable
            of the human message.

    Returns:
        The composed chain (prompt template | model | string output parser).
    """
    return (
        ChatPromptTemplate.from_messages([("system", prompt), ("human", "{text}")])
        | LLM.JIAYU_CLAUDE_SONNET_4_5.create(temperature=0)
        | StrOutputParser()
    )


# Five independent extraction chains, built once at import time.
_profile_chain = _build_chain(PROFILE_PROMPT)
_education_chain = _build_chain(EDUCATION_PROMPT)
_work_chain = _build_chain(WORK_PROMPT)
_project_chain = _build_chain(PROJECT_PROMPT)
_competition_chain = _build_chain(COMPETITION_PROMPT)


def _as_list(value) -> list:
    """Return *value* if it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


async def extract_all(text: str) -> dict:
    """Extract every résumé section concurrently and merge the results.

    The five chains are awaited together with ``asyncio.gather``. Each call
    goes through ``_safe_invoke``, so one failing section does not abort the
    others.

    Args:
        text: Full résumé text passed to every chain as the ``text`` input.

    Returns:
        The parsed profile dict (or an empty dict when the profile result is
        not a dict) with the keys ``education``, ``work``, ``internship``,
        ``project`` and ``competition`` set on it. Each of those five values
        is always a list; any section whose parsed output has an unexpected
        type is replaced by ``[]``.
    """
    log.info("开始5路并行AI提取")
    inp = {"text": text}

    profile, education, work_intern, project, competition = await asyncio.gather(
        _safe_invoke(_profile_chain, inp, "个人信息"),
        _safe_invoke(_education_chain, inp, "教育经历"),
        _safe_invoke(_work_chain, inp, "工作+实习经历"),
        _safe_invoke(_project_chain, inp, "项目经历"),
        _safe_invoke(_competition_chain, inp, "竞赛经历"),
    )

    result = profile if isinstance(profile, dict) else {}

    # The work chain yields one object holding both lists; anything that is
    # not a dict is treated as "no data".
    work_sections = work_intern if isinstance(work_intern, dict) else {}

    result["education"] = _as_list(education)
    # Validate the nested values too: the model may emit null or a non-list
    # under these keys, and callers should get the same list guarantee as for
    # the other sections.
    result["work"] = _as_list(work_sections.get("work"))
    result["internship"] = _as_list(work_sections.get("internship"))
    result["project"] = _as_list(project)
    result["competition"] = _as_list(competition)
    return result


async def _safe_invoke(chain, inp: dict, label: str):
    """Invoke one chain and parse its output, returning an empty value on failure.

    Args:
        chain: Chain to run; must provide an awaitable ``ainvoke``.
        inp: Input mapping handed to ``chain.ainvoke``.
        label: Section name used in the warning log. It also selects the
            fallback value returned on failure.

    Returns:
        Whatever ``parse_llm_json`` returns for the raw model output. If the
        invocation or the parsing raises, a warning is logged and ``{}`` is
        returned when ``label`` contains "个人信息", otherwise ``[]``.
    """
    try:
        raw = await chain.ainvoke(inp)
        return parse_llm_json(raw)
    except Exception as e:
        # Deliberate best-effort: a failed section degrades to "empty" so the
        # remaining sections can still be returned.
        log.warning(f"AI提取[{label}]失败: {e}")
        return {} if "个人信息" in label else []