Files

149 lines
6.8 KiB
Python

"""简历两阶段并行提取
第一阶段:5路并行提取主表短字段 + 各子表标识名(极快,输出极短)。
第二阶段:N+1路并行提取每条子表记录的详情(含description原文)+ 个人信息补充(skills/certificates/summary)。
最终组装为与原方案完全一致的 dict 结构,上下游无感知。
"""
import asyncio
import time
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from app.ai.model_config import ResumeExtractorModel
from app.ai.resume_extractor.prompts import (
OVERVIEW_PROFILE_PROMPT, OVERVIEW_EDUCATION_PROMPT, OVERVIEW_WORK_PROMPT,
OVERVIEW_PROJECT_PROMPT, OVERVIEW_COMPETITION_PROMPT,
DETAIL_PROFILE_PROMPT, DETAIL_EDUCATION_PROMPT, DETAIL_WORK_PROMPT,
DETAIL_INTERNSHIP_PROMPT, DETAIL_PROJECT_PROMPT, DETAIL_COMPETITION_PROMPT,
)
from app.core.logger import log
from app.tool.json_helper import parse_llm_json
# ==================== LLM 调用工具 ====================
def _build_chain(prompt: str):
"""构建提取链:prompt → LLM → 文本输出"""
return ChatPromptTemplate.from_messages([("system", prompt), ("human", "{text}")]) | ResumeExtractorModel.PARSE | StrOutputParser()
async def _safe_invoke(chain, inp: dict, label: str):
"""单个链调用,记录耗时,失败返回空"""
start = time.perf_counter()
try:
raw = await chain.ainvoke(inp)
log.info(f"AI提取[{label}]完成,耗时: {time.perf_counter() - start:.2f}s")
return parse_llm_json(raw)
except Exception as e:
log.warning(f"AI提取[{label}]失败,耗时: {time.perf_counter() - start:.2f}s,错误: {e}")
return None
# ==================== 第一阶段:概览 ====================
_overview_profile_chain = _build_chain(OVERVIEW_PROFILE_PROMPT)
_overview_education_chain = _build_chain(OVERVIEW_EDUCATION_PROMPT)
_overview_work_chain = _build_chain(OVERVIEW_WORK_PROMPT)
_overview_project_chain = _build_chain(OVERVIEW_PROJECT_PROMPT)
_overview_competition_chain = _build_chain(OVERVIEW_COMPETITION_PROMPT)
async def _extract_overview(text: str) -> dict:
"""第一阶段:5路并行提取概览信息"""
inp = {"text": text}
profile, edu_names, work_names, proj_names, comp_names = await asyncio.gather(
_safe_invoke(_overview_profile_chain, inp, "概览-个人信息"),
_safe_invoke(_overview_education_chain, inp, "概览-教育"),
_safe_invoke(_overview_work_chain, inp, "概览-工作实习"),
_safe_invoke(_overview_project_chain, inp, "概览-项目"),
_safe_invoke(_overview_competition_chain, inp, "概览-竞赛"),
)
return {
"profile": profile if isinstance(profile, dict) else {},
"education": edu_names if isinstance(edu_names, list) else [],
"work": work_names.get("work", []) if isinstance(work_names, dict) else [],
"internship": work_names.get("internship", []) if isinstance(work_names, dict) else [],
"project": proj_names if isinstance(proj_names, list) else [],
"competition": comp_names if isinstance(comp_names, list) else [],
}
# ==================== 第二阶段:详情 ====================
async def _extract_detail(prompt_tpl: str, name: str, text: str, label: str) -> dict | None:
"""单条子表记录详情提取:用 name 替换 prompt 中的 {name}"""
prompt = prompt_tpl.replace("{name}", name)
chain = _build_chain(prompt)
return await _safe_invoke(chain, {"text": text}, label)
async def _extract_all_details(overview: dict, text: str) -> dict:
"""第二阶段:根据概览结果,N+1路并行提取所有子表记录详情 + 个人信息补充"""
tasks: list = []
task_meta: list[tuple[str, int]] = []
# profile 补充:skills/certificates/summary
tasks.append(_extract_detail(DETAIL_PROFILE_PROMPT, "", text, "详情-个人信息补充"))
task_meta.append(("profile_extra", 0))
for i, name in enumerate(overview["education"]):
tasks.append(_extract_detail(DETAIL_EDUCATION_PROMPT, name, text, f"详情-教育-{name}"))
task_meta.append(("education", i))
for i, name in enumerate(overview["work"]):
tasks.append(_extract_detail(DETAIL_WORK_PROMPT, name, text, f"详情-工作-{name}"))
task_meta.append(("work", i))
for i, name in enumerate(overview["internship"]):
tasks.append(_extract_detail(DETAIL_INTERNSHIP_PROMPT, name, text, f"详情-实习-{name}"))
task_meta.append(("internship", i))
for i, name in enumerate(overview["project"]):
tasks.append(_extract_detail(DETAIL_PROJECT_PROMPT, name, text, f"详情-项目-{name}"))
task_meta.append(("project", i))
for i, name in enumerate(overview["competition"]):
tasks.append(_extract_detail(DETAIL_COMPETITION_PROMPT, name, text, f"详情-竞赛-{name}"))
task_meta.append(("competition", i))
results = await asyncio.gather(*tasks)
details: dict[str, list] = {"profile_extra": [], "education": [], "work": [], "internship": [], "project": [], "competition": []}
for (module, _idx), result in zip(task_meta, results):
details[module].append(result if isinstance(result, dict) else {})
return details
# ==================== 组装 ====================
def _assemble(overview: dict, details: dict) -> dict:
"""将两阶段结果组装为与原方案一致的 dict 结构"""
profile = overview["profile"]
profile_extra = details.get("profile_extra", [{}])[0] if details.get("profile_extra") else {}
profile["skills"] = profile_extra.get("skills") or []
profile["certificates"] = profile_extra.get("certificates") or []
profile["summary"] = profile_extra.get("summary")
result = dict(profile)
for module in ("education", "work", "internship", "project", "competition"):
items = []
for item in details.get(module, []):
if not item.get("description"):
item["description"] = []
items.append(item)
result[module] = items
return result
# ==================== 入口 ====================
async def extract_all(text: str) -> dict:
"""两阶段并行提取简历,返回与原方案完全一致的结构化数据"""
log.info("第一阶段:5路并行概览提取")
overview = await _extract_overview(text)
log.info(f"概览完成 - 教育:{len(overview['education'])} 工作:{len(overview['work'])} 实习:{len(overview['internship'])} 项目:{len(overview['project'])} 竞赛:{len(overview['competition'])}")
total = sum(len(overview[m]) for m in ("education", "work", "internship", "project", "competition"))
log.info(f"第二阶段:{total + 1}路并行详情提取")
details = await _extract_all_details(overview, text)
result = _assemble(overview, details)
log.info("两阶段提取完成,数据组装完毕")
return result