Files
offerpai_python_ai/app/ai/resume_extractor/extractor.py
T
2026-04-23 15:37:26 +08:00

72 lines
2.6 KiB
Python

"""简历并行提取:将完整简历文本拆分为5个AI任务并行提取"""
import asyncio
import re
from json_repair import repair_json
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from app.ai.models import LLM
from app.ai.resume_extractor.prompts import (
PROFILE_PROMPT, EDUCATION_PROMPT, WORK_PROMPT,
PROJECT_PROMPT, COMPETITION_PROMPT,
)
from app.core.logger import log
def _parse_json(text: str) -> object:
    """Parse JSON emitted by the model, tolerating a markdown code-fence wrapper.

    A leading ``` or ```json fence at the very start of the stripped text and
    a trailing ``` fence at the very end are removed; the remainder is handed
    to ``repair_json`` with ``return_objects=True``.

    Args:
        text: Raw model output.

    Returns:
        Whatever ``repair_json`` decodes. Callers in this module receive both
        dicts (profile, work + internship) and lists (education, project,
        competition) from this function, so the result is deliberately not
        annotated as ``dict``; narrow it with ``isinstance`` before use.
    """
    # Only fences anchored at the start / end are stripped; anything else is
    # left for repair_json to cope with.
    without_opening_fence = re.sub(r"^```(?:json)?\s*\n?", "", text.strip())
    without_fences = re.sub(r"\n?```\s*$", "", without_opening_fence)
    return repair_json(without_fences, return_objects=True)
def _build_chain(prompt: str):
    """Build one extraction chain: prompt template -> LLM -> plain-text output.

    Args:
        prompt: System prompt for this extraction task. The human message is
            the ``{text}`` template variable.

    Returns:
        The composed chain (template piped into the model piped into
        ``StrOutputParser``).
    """
    template = ChatPromptTemplate.from_messages(
        [("system", prompt), ("human", "{text}")]
    )
    model = LLM.JIAYU_CLAUDE_SONNET_4_5.create(temperature=0)
    return template | model | StrOutputParser()
# Five independent extraction chains, one per prompt, built once at import time.
(
    _profile_chain,
    _education_chain,
    _work_chain,
    _project_chain,
    _competition_chain,
) = (
    _build_chain(section_prompt)
    for section_prompt in (
        PROFILE_PROMPT,
        EDUCATION_PROMPT,
        WORK_PROMPT,
        PROJECT_PROMPT,
        COMPETITION_PROMPT,
    )
)
async def extract_all(text: str) -> dict:
    """Extract every resume section concurrently and merge the results.

    Five extraction chains (profile, education, work + internship, project,
    competition) are awaited together with ``asyncio.gather``; each one is
    run through ``_safe_invoke``.

    Args:
        text: Full resume text, passed to every chain as the ``text`` input.

    Returns:
        The profile dict (or a new empty dict when the profile result is not
        a dict) with the keys ``education``, ``work``, ``internship``,
        ``project`` and ``competition`` set on it. Each of those keys always
        maps to a list: any section whose extracted value is not a list is
        replaced by an empty list.
    """
    log.info("开始5路并行AI提取")
    inp = {"text": text}
    profile, education, work_intern, project, competition = await asyncio.gather(
        _safe_invoke(_profile_chain, inp, "个人信息"),
        _safe_invoke(_education_chain, inp, "教育经历"),
        _safe_invoke(_work_chain, inp, "工作+实习经历"),
        _safe_invoke(_project_chain, inp, "项目经历"),
        _safe_invoke(_competition_chain, inp, "竞赛经历"),
    )

    def _as_list(value) -> list:
        """Return ``value`` when it is a list, otherwise an empty list."""
        return value if isinstance(value, list) else []

    # The work chain is expected to yield {"work": [...], "internship": [...]};
    # anything else (including the [] fallback on failure) counts as empty.
    if not isinstance(work_intern, dict):
        work_intern = {}

    result = profile if isinstance(profile, dict) else {}
    result["education"] = _as_list(education)
    # Validate the nested values too: the model may emit null or a string
    # for a section the resume does not contain.
    result["work"] = _as_list(work_intern.get("work"))
    result["internship"] = _as_list(work_intern.get("internship"))
    result["project"] = _as_list(project)
    result["competition"] = _as_list(competition)
    return result
async def _safe_invoke(chain, inp: dict, label: str):
    """Run one chain and parse its output, falling back to an empty value.

    Args:
        chain: Object exposing an awaitable ``ainvoke(inp)``.
        inp: Input mapping forwarded to the chain.
        label: Section name used in the warning message; it also selects the
            fallback value.

    Returns:
        The parsed chain output. If invoking or parsing raises an
        ``Exception``, a warning is logged and the fallback is returned: an
        empty dict when ``label`` contains "个人信息", otherwise an empty list.
    """
    try:
        response = await chain.ainvoke(inp)
        parsed = _parse_json(response)
    except Exception as err:
        log.warning(f"AI提取[{label}]失败: {err}")
        if "个人信息" in label:
            return {}
        return []
    return parsed