# Listing metadata: 151 lines, 6.8 KiB, Python
"""简历两阶段并行提取
|
|
|
|
第一阶段:5路并行提取主表短字段 + 各子表标识名(极快,输出极短)。
|
|
第二阶段:N+1路并行提取每条子表记录的详情(含description原文)+ 个人信息补充(skills/certificates/summary)。
|
|
最终组装为与原方案完全一致的 dict 结构,上下游无感知。
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
|
|
from langchain_core.output_parsers import StrOutputParser
|
|
from langchain_core.prompts import ChatPromptTemplate
|
|
|
|
from app.ai.models import LLM
|
|
from app.ai.resume_extractor.prompts import (
|
|
OVERVIEW_PROFILE_PROMPT, OVERVIEW_EDUCATION_PROMPT, OVERVIEW_WORK_PROMPT,
|
|
OVERVIEW_PROJECT_PROMPT, OVERVIEW_COMPETITION_PROMPT,
|
|
DETAIL_PROFILE_PROMPT, DETAIL_EDUCATION_PROMPT, DETAIL_WORK_PROMPT,
|
|
DETAIL_INTERNSHIP_PROMPT, DETAIL_PROJECT_PROMPT, DETAIL_COMPETITION_PROMPT,
|
|
)
|
|
from app.core.logger import log
|
|
from app.tool.json_helper import parse_llm_json
|
|
|
|
# Model used by every extraction chain built in this module (see _build_chain).
_LLM_MODEL = LLM.DOUBAO_LITE_32K
|
|
|
|
|
|
# ==================== LLM 调用工具 ====================
|
|
|
|
def _build_chain(prompt: str):
    """Build an extraction chain: prompt template -> LLM -> plain-text output.

    Args:
        prompt: Text of the system message. The human message is the template
            ``{text}``, so the chain is invoked with a ``text`` key.

    Returns:
        The runnable obtained by piping the chat prompt into the model created
        from ``_LLM_MODEL`` (temperature 0) and then into ``StrOutputParser``.
    """
    messages = [("system", prompt), ("human", "{text}")]
    template = ChatPromptTemplate.from_messages(messages)
    model = _LLM_MODEL.create(temperature=0)
    return template | model | StrOutputParser()
|
|
|
|
|
|
async def _safe_invoke(chain, inp: dict, label: str):
    """Run one chain, log how long it took, and return ``None`` on any failure.

    Args:
        chain: Object exposing an awaitable ``ainvoke(inp)``.
        inp: Input mapping handed to ``chain.ainvoke``.
        label: Tag inserted into the log messages.

    Returns:
        Whatever ``parse_llm_json`` returns for the raw chain output, or
        ``None`` when the call or the parsing raises ``Exception``.
    """
    started_at = time.perf_counter()
    try:
        raw_output = await chain.ainvoke(inp)
        # The success line is logged before parsing, so it reports LLM latency only.
        elapsed = time.perf_counter() - started_at
        log.info(f"AI提取[{label}]完成,耗时: {elapsed:.2f}s")
        return parse_llm_json(raw_output)
    except Exception as exc:
        # Best-effort by design: a failed extraction degrades to None instead of raising.
        elapsed = time.perf_counter() - started_at
        log.warning(f"AI提取[{label}]失败,耗时: {elapsed:.2f}s,错误: {exc}")
        return None
|
|
|
|
|
|
# ==================== 第一阶段:概览 ====================
|
|
|
|
# Stage-one chains are built once at import time, one per overview prompt,
# and reused by _extract_overview for every call.
(
    _overview_profile_chain,
    _overview_education_chain,
    _overview_work_chain,
    _overview_project_chain,
    _overview_competition_chain,
) = map(
    _build_chain,
    (
        OVERVIEW_PROFILE_PROMPT,
        OVERVIEW_EDUCATION_PROMPT,
        OVERVIEW_WORK_PROMPT,
        OVERVIEW_PROJECT_PROMPT,
        OVERVIEW_COMPETITION_PROMPT,
    ),
)
|
|
|
|
|
|
def _as_list(value) -> list:
    """Return ``value`` unchanged when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


async def _extract_overview(text: str) -> dict:
    """Stage one: run the five overview chains concurrently.

    Args:
        text: Resume text passed to every chain under the ``text`` key.

    Returns:
        A dict with the keys ``profile`` (dict) and ``education``, ``work``,
        ``internship``, ``project``, ``competition`` (lists). Any chain result
        of an unexpected type, including ``None`` from a failed call, is
        replaced by an empty dict/list so later stages can always call
        ``len()`` on and iterate over the name collections.
    """
    inp = {"text": text}
    profile, edu_names, work_names, proj_names, comp_names = await asyncio.gather(
        _safe_invoke(_overview_profile_chain, inp, "概览-个人信息"),
        _safe_invoke(_overview_education_chain, inp, "概览-教育"),
        _safe_invoke(_overview_work_chain, inp, "概览-工作实习"),
        _safe_invoke(_overview_project_chain, inp, "概览-项目"),
        _safe_invoke(_overview_competition_chain, inp, "概览-竞赛"),
    )

    # The work/internship chain answers with one dict holding both name lists.
    if not isinstance(work_names, dict):
        work_names = {}

    return {
        "profile": profile if isinstance(profile, dict) else {},
        "education": _as_list(edu_names),
        # A key that is present but null (or any non-list) must not leak through:
        # callers take len() of, and enumerate, these values.
        "work": _as_list(work_names.get("work")),
        "internship": _as_list(work_names.get("internship")),
        "project": _as_list(proj_names),
        "competition": _as_list(comp_names),
    }
|
|
|
|
|
|
# ==================== 第二阶段:详情 ====================
|
|
|
|
async def _extract_detail(prompt_tpl: str, name: str, text: str, label: str) -> dict | None:
    """Extract the details of a single sub-table record.

    Args:
        prompt_tpl: Prompt text in which the literal ``{name}`` marks where the
            record name goes.
        name: Record name substituted for ``{name}``. Non-string values are
            converted with ``str()``.
        text: Resume text passed to the chain under the ``text`` key.
        label: Tag used in the log messages of the underlying call.

    Returns:
        The result of ``_safe_invoke`` for the chain built from the filled-in
        prompt (``None`` when that call fails).
    """
    # A non-string name would make str.replace raise TypeError here, outside
    # the try/except in _safe_invoke.
    name_text = name if isinstance(name, str) else str(name)
    # The filled-in prompt is parsed again as a prompt template by _build_chain
    # (ChatPromptTemplate's default f-string format), where single braces denote
    # variables. Doubling them keeps braces in the name literal.
    safe_name = name_text.replace("{", "{{").replace("}", "}}")
    prompt = prompt_tpl.replace("{name}", safe_name)
    chain = _build_chain(prompt)
    return await _safe_invoke(chain, {"text": text}, label)
|
|
|
|
|
|
async def _extract_all_details(overview: dict, text: str) -> dict:
    """Stage two: fetch every record's details plus the profile supplement concurrently.

    Args:
        overview: Mapping with the keys ``education``, ``work``, ``internship``,
            ``project`` and ``competition``, each an iterable of record names.
        text: Resume text forwarded to every detail call.

    Returns:
        A dict keyed by ``profile_extra`` and the five module names. Each value
        lists that module's results in the order the names were given; a result
        that is not a dict is replaced by ``{}``.
    """
    # (module key, prompt template, label prefix) in scheduling order.
    sections = (
        ("education", DETAIL_EDUCATION_PROMPT, "详情-教育-"),
        ("work", DETAIL_WORK_PROMPT, "详情-工作-"),
        ("internship", DETAIL_INTERNSHIP_PROMPT, "详情-实习-"),
        ("project", DETAIL_PROJECT_PROMPT, "详情-项目-"),
        ("competition", DETAIL_COMPETITION_PROMPT, "详情-竞赛-"),
    )

    # The profile supplement (skills/certificates/summary) always goes first.
    pending = [_extract_detail(DETAIL_PROFILE_PROMPT, "", text, "详情-个人信息补充")]
    owners = ["profile_extra"]

    for module, prompt_tpl, prefix in sections:
        for name in overview[module]:
            pending.append(_extract_detail(prompt_tpl, name, text, f"{prefix}{name}"))
            owners.append(module)

    # gather() returns results in submission order, so owners[i] names the
    # module that outcomes[i] belongs to.
    outcomes = await asyncio.gather(*pending)

    details: dict[str, list] = {
        "profile_extra": [],
        "education": [],
        "work": [],
        "internship": [],
        "project": [],
        "competition": [],
    }
    for module, outcome in zip(owners, outcomes):
        details[module].append(outcome if isinstance(outcome, dict) else {})
    return details
|
|
|
|
|
|
# ==================== 组装 ====================
|
|
|
|
def _assemble(overview: dict, details: dict) -> dict:
|
|
"""将两阶段结果组装为与原方案一致的 dict 结构"""
|
|
profile = overview["profile"]
|
|
profile_extra = details.get("profile_extra", [{}])[0] if details.get("profile_extra") else {}
|
|
profile["skills"] = profile_extra.get("skills") or []
|
|
profile["certificates"] = profile_extra.get("certificates") or []
|
|
profile["summary"] = profile_extra.get("summary")
|
|
result = dict(profile)
|
|
for module in ("education", "work", "internship", "project", "competition"):
|
|
items = []
|
|
for item in details.get(module, []):
|
|
if not item.get("description"):
|
|
item["description"] = []
|
|
items.append(item)
|
|
result[module] = items
|
|
return result
|
|
|
|
|
|
# ==================== 入口 ====================
|
|
|
|
async def extract_all(text: str) -> dict:
    """Entry point: run both extraction stages and return the assembled dict.

    Args:
        text: Resume text to extract from.

    Returns:
        The dict produced by ``_assemble`` from the stage-one overview and the
        stage-two details.
    """
    log.info("第一阶段:5路并行概览提取")
    overview = await _extract_overview(text)

    modules = ("education", "work", "internship", "project", "competition")
    edu_n, work_n, intern_n, proj_n, comp_n = (len(overview[m]) for m in modules)
    log.info(f"概览完成 - 教育:{edu_n} 工作:{work_n} 实习:{intern_n} 项目:{proj_n} 竞赛:{comp_n}")

    # One call per sub-table record, plus one for the profile supplement.
    record_total = edu_n + work_n + intern_n + proj_n + comp_n
    log.info(f"第二阶段:{record_total + 1}路并行详情提取")
    details = await _extract_all_details(overview, text)

    assembled = _assemble(overview, details)
    log.info("两阶段提取完成,数据组装完毕")
    return assembled
|