Files
offerpai_python_ai/app/ai/resume_extractor/extractor.py
T
2026-04-29 15:55:58 +08:00

182 lines
8.4 KiB
Python

"""简历两阶段并行提取
第一阶段:5路并行提取主表信息 + 各子表标识名(极快,输出极短)。
第二阶段:N路并行提取每条子表记录的详情,description 用字母编号引用原文。
最终组装为与原方案完全一致的 dict 结构,上下游无感知。
"""
import asyncio
import time
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from app.ai.models import LLM
from app.ai.resume_extractor.prompts import (
OVERVIEW_PROFILE_PROMPT, OVERVIEW_EDUCATION_PROMPT, OVERVIEW_WORK_PROMPT,
OVERVIEW_PROJECT_PROMPT, OVERVIEW_COMPETITION_PROMPT,
DETAIL_PROFILE_PROMPT, DETAIL_EDUCATION_PROMPT, DETAIL_WORK_PROMPT,
DETAIL_INTERNSHIP_PROMPT, DETAIL_PROJECT_PROMPT, DETAIL_COMPETITION_PROMPT,
)
from app.core.logger import log
from app.tool.json_helper import parse_llm_json
# Model handle shared by every chain in this module; _build_chain calls
# `_LLM_MODEL.create(temperature=0)` on it to obtain the runnable LLM.
_LLM_MODEL = LLM.DOUBAO_LITE_32K
# ==================== 文本编号 ====================
def _gen_alpha(n: int):
    """Yield ``n`` lowercase letter labels: a, b, ..., z, aa, ab, ..., zz, aaa, ...

    Labels follow bijective base-26 numbering (the scheme spreadsheet columns
    use), so they are unique and purely alphabetic for any ``n``. The first
    702 labels (``a`` .. ``zz``) are identical to what the previous
    two-letter formula produced; beyond that the old formula emitted
    non-letter characters such as ``{a``.

    Args:
        n: Number of labels to generate. Values <= 0 yield nothing.

    Yields:
        The labels in order, one string per index.
    """
    first = ord("a")
    for index in range(n):
        label = ""
        remaining = index + 1  # bijective numbering works on 1-based values
        while remaining > 0:
            # Shift by one before dividing so that 'z' does not carry over.
            remaining, digit = divmod(remaining - 1, 26)
            label = chr(first + digit) + label
        yield label
def _number_lines(text: str) -> tuple[dict[str, str], str]:
    """Label every non-blank line of ``text`` with a letter code.

    The text is split on "\n"; lines that are empty or whitespace-only are
    discarded, and the remaining lines are kept verbatim (not stripped).

    Returns:
        A pair ``(line_map, numbered)`` where ``line_map`` maps each letter
        label to its original line and ``numbered`` is the text rebuilt as
        ``[label] line`` rows joined by newlines.
    """
    kept_rows = [row for row in text.split("\n") if row.strip()]

    line_map: dict[str, str] = {}
    tagged_rows: list[str] = []
    for label, row in zip(_gen_alpha(len(kept_rows)), kept_rows):
        line_map[label] = row
        tagged_rows.append(f"[{label}] {row}")

    return line_map, "\n".join(tagged_rows)
def _resolve_desc(line_map: dict[str, str], desc_str: str | None) -> list[str]:
"""将逗号分隔的字母编号字符串解析为原文列表"""
if not desc_str or not isinstance(desc_str, str):
return []
keys = [k.strip() for k in desc_str.split(",") if k.strip()]
return [line_map[k] for k in keys if k in line_map]
# ==================== LLM 调用工具 ====================
def _build_chain(prompt: str):
    """Compose an extraction pipeline: chat prompt -> LLM -> plain string.

    ``prompt`` becomes the system message; the human message is the
    ``{text}`` placeholder, which is filled in at invocation time. The model
    is created with ``temperature=0``.
    """
    messages = [("system", prompt), ("human", "{text}")]
    template = ChatPromptTemplate.from_messages(messages)
    model = _LLM_MODEL.create(temperature=0)
    return template | model | StrOutputParser()
async def _safe_invoke(chain, inp: dict, label: str):
    """Invoke one chain, log how long it took, and never raise.

    Args:
        chain: Runnable exposing an awaitable ``ainvoke``.
        inp: Input mapping handed to ``chain.ainvoke``.
        label: Tag used in the log lines to identify this call.

    Returns:
        Whatever ``parse_llm_json`` produces from the raw model output, or
        ``None`` when the call or the parsing raised an ``Exception``.
    """
    start = time.perf_counter()
    try:
        reply = await chain.ainvoke(inp)
        # The success line is emitted before parsing, so a parse failure logs both lines.
        log.info(f"AI提取[{label}]完成,耗时: {time.perf_counter() - start:.2f}s")
        parsed = parse_llm_json(reply)
    except Exception as e:
        log.warning(f"AI提取[{label}]失败,耗时: {time.perf_counter() - start:.2f}s,错误: {e}")
        parsed = None
    return parsed
# ==================== 第一阶段:概览 ====================
# Stage-one chains are built once at import time, one per overview prompt,
# in the same order as the prompts listed below.
(
    _overview_profile_chain,
    _overview_education_chain,
    _overview_work_chain,
    _overview_project_chain,
    _overview_competition_chain,
) = map(
    _build_chain,
    (
        OVERVIEW_PROFILE_PROMPT,
        OVERVIEW_EDUCATION_PROMPT,
        OVERVIEW_WORK_PROMPT,
        OVERVIEW_PROJECT_PROMPT,
        OVERVIEW_COMPETITION_PROMPT,
    ),
)
def _as_name_list(value) -> list[str]:
    """Keep only the non-blank string entries of ``value``; anything that is not a list becomes []."""
    if not isinstance(value, list):
        return []
    return [entry for entry in value if isinstance(entry, str) and entry.strip()]


async def _extract_overview(numbered_text: str) -> dict:
    """Stage one: run the five overview extractions concurrently.

    Each call goes through ``_safe_invoke``, so a failed call contributes
    ``None`` and is replaced by an empty default here. The model output is
    untrusted: every name list is normalised to a list of non-blank strings,
    because later code calls ``len()`` on the lists and substitutes each name
    into a prompt with ``str.replace``.

    Args:
        numbered_text: The resume text with ``[label]`` line prefixes.

    Returns:
        A dict with ``profile`` (dict) and the name lists ``education``,
        ``work``, ``internship``, ``project`` and ``competition``. ``work``
        and ``internship`` both come from the single work/internship call.
    """
    inp = {"text": numbered_text}
    profile, edu_names, work_names, proj_names, comp_names = await asyncio.gather(
        _safe_invoke(_overview_profile_chain, inp, "概览-个人信息"),
        _safe_invoke(_overview_education_chain, inp, "概览-教育"),
        _safe_invoke(_overview_work_chain, inp, "概览-工作实习"),
        _safe_invoke(_overview_project_chain, inp, "概览-项目"),
        _safe_invoke(_overview_competition_chain, inp, "概览-竞赛"),
    )
    # The work call is expected to return {"work": [...], "internship": [...]}.
    work_groups = work_names if isinstance(work_names, dict) else {}
    return {
        "profile": profile if isinstance(profile, dict) else {},
        "education": _as_name_list(edu_names),
        "work": _as_name_list(work_groups.get("work")),
        "internship": _as_name_list(work_groups.get("internship")),
        "project": _as_name_list(proj_names),
        "competition": _as_name_list(comp_names),
    }
# ==================== 第二阶段:详情 ====================
async def _extract_detail(prompt_tpl: str, name: str, numbered_text: str, label: str) -> dict | None:
    """Extract the details of one sub-record.

    The literal ``{name}`` marker in ``prompt_tpl`` is replaced with the
    record name, a chain is built from the resulting prompt, and the chain is
    invoked with the full numbered text.

    Args:
        prompt_tpl: Prompt text containing a ``{name}`` marker (a template
            without the marker is used unchanged).
        name: Identifier of the record to extract; coerced to ``str`` because
            it originates from LLM output.
        numbered_text: The resume text with ``[label]`` line prefixes.
        label: Tag used in the log lines for this call.

    Returns:
        The result of ``_safe_invoke`` for this call (``None`` on failure).
    """
    # The filled-in prompt is parsed again as a template by ChatPromptTemplate,
    # so braces inside the name must be doubled to remain literal text instead
    # of being read as (missing) template variables.
    literal_name = str(name).replace("{", "{{").replace("}", "}}")
    prompt = prompt_tpl.replace("{name}", literal_name)
    chain = _build_chain(prompt)
    return await _safe_invoke(chain, {"text": numbered_text}, label)
async def _extract_all_details(overview: dict, numbered_text: str) -> dict:
    """Stage two: fan out one detail extraction per record found in stage one.

    One extra call fetches the profile supplement (skills / certificates /
    summaryLines). All calls run concurrently; a call that fails in any way
    contributes an empty dict, so each module's result list always has one
    entry per name, in the same order as ``overview[module]``.

    Args:
        overview: Stage-one result holding a name list per module.
        numbered_text: The resume text with ``[label]`` line prefixes.

    Returns:
        ``{"profile_extra": [dict], "education": [...], "work": [...],
        "internship": [...], "project": [...], "competition": [...]}``.
    """
    # (module key, prompt template, label segment), in fan-out order.
    module_specs = (
        ("education", DETAIL_EDUCATION_PROMPT, "教育"),
        ("work", DETAIL_WORK_PROMPT, "工作"),
        ("internship", DETAIL_INTERNSHIP_PROMPT, "实习"),
        ("project", DETAIL_PROJECT_PROMPT, "项目"),
        ("competition", DETAIL_COMPETITION_PROMPT, "竞赛"),
    )

    profile_label = "详情-个人信息补充"
    tasks: list = [_extract_detail(DETAIL_PROFILE_PROMPT, "", numbered_text, profile_label)]
    task_meta: list[tuple[str, str]] = [("profile_extra", profile_label)]  # (module, log label)

    for module, prompt_tpl, segment in module_specs:
        for name in overview.get(module) or []:
            label = f"详情-{segment}-{name}"
            tasks.append(_extract_detail(prompt_tpl, name, numbered_text, label))
            task_meta.append((module, label))

    # return_exceptions=True keeps one broken record from aborting every other
    # extraction; the failure is logged and downgraded to an empty record below.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    details: dict[str, list] = {"profile_extra": []}
    for module, _prompt, _segment in module_specs:
        details[module] = []

    # gather() preserves task order, so results line up with task_meta.
    for (module, label), result in zip(task_meta, results):
        if isinstance(result, BaseException):
            log.warning(f"AI提取[{label}]异常,已按空结果处理,错误: {result!r}")
        details[module].append(result if isinstance(result, dict) else {})
    return details
# ==================== 组装 ====================
def _assemble(overview: dict, details: dict, line_map: dict[str, str]) -> dict:
    """Merge both extraction stages into the final flat result dict.

    The profile fields form the top level of the result. ``skills``,
    ``certificates`` and ``summary`` are taken from the stage-two profile
    supplement, and each module key holds its list of detail records. In every
    record the ``descLines`` label list is replaced by ``description``, the
    list of original lines those labels refer to.

    Neither ``overview`` nor ``details`` is modified; the result is built
    from shallow copies.

    Args:
        overview: Stage-one result; only ``profile`` is read here.
        details: Stage-two result (``profile_extra`` plus one list per module).
        line_map: Mapping of letter label -> original line.

    Returns:
        The assembled resume dict.
    """
    # Copy first so the caller's overview["profile"] stays untouched.
    result = dict(overview.get("profile") or {})

    extras = details.get("profile_extra") or [{}]
    profile_extra = extras[0] if isinstance(extras[0], dict) else {}
    result["skills"] = profile_extra.get("skills") or []
    result["certificates"] = profile_extra.get("certificates") or []
    summary_texts = _resolve_desc(line_map, profile_extra.get("summaryLines"))
    result["summary"] = "\n".join(summary_texts) if summary_texts else None

    for module in ("education", "work", "internship", "project", "competition"):
        records = []
        for item in details.get(module) or []:
            # Work on a copy: popping descLines must not alter the stage-two data.
            record = dict(item) if isinstance(item, dict) else {}
            record["description"] = _resolve_desc(line_map, record.pop("descLines", None))
            records.append(record)
        result[module] = records
    return result
# ==================== 入口 ====================
async def extract_all(text: str) -> dict:
    """Entry point: extract a structured resume dict from raw resume text.

    Steps: label the non-blank lines, run the stage-one overview extraction,
    run the stage-two per-record detail extraction, then assemble both
    results into one dict.

    Args:
        text: Raw resume text.

    Returns:
        The assembled resume dict produced by ``_assemble``.
    """
    line_map, numbered_text = _number_lines(text)
    log.info(f"文本编号完成,共 {len(line_map)}")

    log.info("第一阶段:5路并行概览提取")
    overview = await _extract_overview(numbered_text)
    log.info(f"概览完成 - 教育:{len(overview['education'])} 工作:{len(overview['work'])} 实习:{len(overview['internship'])} 项目:{len(overview['project'])} 竞赛:{len(overview['competition'])}")

    # Count the sub-records that stage two will fan out over.
    total = 0
    for module in ("education", "work", "internship", "project", "competition"):
        total += len(overview[module])
    log.info(f"第二阶段:{total}路并行详情提取")

    details = await _extract_all_details(overview, numbered_text)
    assembled = _assemble(overview, details, line_map)
    log.info("两阶段提取完成,数据组装完毕")
    return assembled