Files
offerpai_python_ai/app/services/skill_gap_service.py
T
2026-04-28 11:16:50 +08:00

326 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""技能差距分析 + 定制简历 Service
岗位技能差距分析 → 定制简历生成/查询/编辑/回滚 → AI 对话式编辑。
依赖:skill_gap_analyzer(AI引擎)
使用表:bg_job(读)、bg_user_resume + 5张子表(读)
存储:Redis(定制简历 + 回滚数据)
"""
import asyncio
import json
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.ai.skill_gap_analyzer.analyzer import (
analyze_skill_gap, optimize_summary, optimize_module,
plan_edit, execute_record_edit, execute_record_add,
)
from app.ai.skill_gap_analyzer.prompts import MODULE_SCHEMAS
from app.core.logger import log
from app.schemas.customize_resume import (
CustomizeResume, ResumeProfile, Education, Work, Internship, Project, Competition,
)
from app.models.job import Job
from app.models.user_resume import UserResume
from app.services.resume_loader import ResumeDetail, load_resume_detail, load_default_resume_detail
from app.services import customize_resume_store
# Module name -> Chinese display label. Looked up when building the
# "updated modules" message returned after an AI edit; names without an entry
# are shown as-is by that caller.
_MODULE_LABELS = {
    "resume": "个人简介",
    "education": "教育经历",
    "work": "工作经历",
    "internship": "实习经历",
    "project": "项目经历",
    "competition": "竞赛经历",
}
def _build_resume_json(detail: ResumeDetail) -> str:
"""拼装简历 JSON 字符串供 AI 使用"""
resume = detail.resume
data = {
"skills": resume.skills or [],
"certificates": resume.certificates or [],
"summary": resume.summary or "",
"targetPosition": resume.target_position or "",
}
if detail.education:
data["education"] = [{"school": r.school, "major": r.major, "degree": r.degree, "description": r.description} for r in detail.education]
if detail.work:
data["work"] = [{"companyName": r.company_name, "position": r.position, "description": r.description} for r in detail.work]
if detail.internship:
data["internship"] = [{"companyName": r.company_name, "position": r.position, "description": r.description} for r in detail.internship]
if detail.project:
data["project"] = [{"companyName": r.company_name, "projectName": r.project_name, "role": r.role, "description": r.description} for r in detail.project]
if detail.competition:
data["competition"] = [{"competitionName": r.competition_name, "award": r.award, "description": r.description} for r in detail.competition]
return json.dumps(data, ensure_ascii=False)
class SkillGapService:
    """Skill-gap analysis and job-tailored resume generation / AI editing.

    Database reads go through the ``AsyncSession`` supplied at construction;
    tailored resumes are read and written through ``customize_resume_store``.
    """

    # Modules whose data is a list of records on ``CustomizeResume``. The
    # "resume" module is a single profile object and is handled separately.
    _LIST_MODULES = ("education", "work", "internship", "project", "competition")

    def __init__(self, session: AsyncSession):
        self.session = session

    # ===== Skill-gap analysis =====

    async def analyze_skill_gap(self, user_id: int, job_id: int) -> dict:
        """Compare the user's default resume against a job's skill tags.

        Flow: load default resume -> load job -> AI analysis -> match score.

        Args:
            user_id: Owner of the resume.
            job_id: Job to compare against.

        Returns:
            A dict with ``score`` (0-10, one decimal), ``job``, ``resume`` and
            ``missingSkills`` (see ``_gap_result``).

        Raises:
            ValueError: If the job does not exist.
        """
        # 1. Resume is selected automatically by the loader.
        detail = await load_default_resume_detail(self.session, user_id)
        # 2. Job lookup.
        job = await self._get_job(job_id)
        skill_tags: list[str] = job.skill_tags or []
        # 3. A job without skill tags cannot have a gap -> full score.
        if not skill_tags:
            return self._gap_result(10.0, job, detail.resume, [])
        # 4. Build the AI input.
        resume_json = _build_resume_json(detail)
        # 5. AI analysis. Inside a method body this name resolves to the
        #    module-level imported function, not to this method.
        missing = await analyze_skill_gap(skill_tags, resume_json)
        # 6. Match score.
        score = self._match_score(skill_tags, missing)
        return self._gap_result(score, job, detail.resume, missing)

    @staticmethod
    def _match_score(skill_tags: list[str], missing: list[str]) -> float:
        """Return the share of matched skill tags on a 0-10 scale (1 decimal).

        The missing list comes from an AI call, so it may contain duplicates
        or entries that are not in ``skill_tags``; the score is clamped so it
        can never leave the 0-10 range.
        """
        total = len(skill_tags)
        if not total:
            return 10.0
        raw = (total - len(missing)) / total * 10
        return round(min(10.0, max(0.0, raw)), 1)

    @staticmethod
    def _gap_result(score: float, job: Job, resume: UserResume, missing: list[str]) -> dict:
        """Assemble the response payload of the skill-gap analysis."""
        return {
            "score": score,
            "job": {"jobId": str(job.id), "title": job.title, "skillTags": job.skill_tags or []},
            "resume": {
                "resumeId": str(resume.id),
                "resumeName": resume.resume_name or "",
                "targetPosition": resume.target_position or "",
            },
            "missingSkills": missing,
        }

    # ===== Tailored resume generation =====

    async def generate_customize_resume(self, user_id: int, job_id: int, resume_id: int,
                                        optimize_modules: list[str], add_skills: list[str]) -> None:
        """Generate a tailored resume: load data -> concurrent AI optimisation -> store.

        Args:
            user_id: Owner of the resume.
            job_id: Target job.
            resume_id: Source resume to tailor.
            optimize_modules: Any of ``"summary"``, ``"experience"``,
                ``"skills"``; other values are ignored.
            add_skills: Skills passed to the summary optimiser and, when
                ``"skills"`` is selected, appended to the skill list.

        Raises:
            ValueError: If ``optimize_modules`` is empty or the job is missing.
        """
        if not optimize_modules:
            raise ValueError("请至少选择一个优化模块")
        # 1. Load resume and job.
        detail = await load_resume_detail(self.session, resume_id, user_id)
        job = await self._get_job(job_id)
        # 2. Base tailored resume built from the source resume.
        cr = customize_resume_store.build_from_detail(detail)
        # 3. Collect the AI optimisation coroutines.
        job_title = job.title or ""
        job_desc = f"{job.description or ''}\n{job.requirement or ''}"
        tasks = []
        if "summary" in optimize_modules:
            tasks.append(("summary", optimize_summary(job_title, add_skills, detail.resume.summary or "")))
        if "experience" in optimize_modules:
            for module_name, rows_json in self._experience_tasks(cr, job_title, job_desc):
                tasks.append((module_name, optimize_module(job_title, job_desc, rows_json)))
        # Run them concurrently; one failed module must not sink the others.
        if tasks:
            results = await asyncio.gather(*(coro for _, coro in tasks), return_exceptions=True)
            for (key, _), result in zip(tasks, results):
                if isinstance(result, Exception):
                    log.warning(f"定制简历优化[{key}]失败: {result}")
                    continue
                self._apply_optimize_result(cr, key, result)
        # 4. Append skills (pure in-memory). ``seen`` is updated as we go so a
        #    skill repeated inside add_skills is only appended once.
        if "skills" in optimize_modules and add_skills:
            seen = set(cr.resume.skills)
            for skill in add_skills:
                if skill not in seen:
                    cr.resume.skills.append(skill)
                    seen.add(skill)
        # 5. Persist through customize_resume_store (Redis, per the module docstring).
        await customize_resume_store.save(user_id, job_id, cr)

    @staticmethod
    def _experience_tasks(cr: CustomizeResume, job_title: str, job_desc: str) -> list[tuple[str, str]]:
        """Return ``(module name, rows as JSON)`` for every non-empty list module.

        ``job_title`` and ``job_desc`` are accepted for interface
        compatibility but are not used here.
        """
        pending = []
        for name in SkillGapService._LIST_MODULES:
            items = getattr(cr, name)
            if items:
                rows = [item.model_dump(by_alias=True) for item in items]
                pending.append((name, json.dumps(rows, ensure_ascii=False)))
        return pending

    @staticmethod
    def _apply_optimize_result(cr: CustomizeResume, key: str, result) -> None:
        """Write one AI optimisation result back into the tailored resume.

        Results of an unexpected type are ignored. A list result that fails
        schema validation is logged and skipped, leaving the original records
        in place, which matches how the record edit/add helpers treat bad AI
        output.
        """
        if key == "summary":
            if isinstance(result, str):
                cr.resume.summary = result
            return
        model_cls = SkillGapService._record_model(key)
        if model_cls is None or not isinstance(result, list):
            return
        try:
            validated = [model_cls.model_validate(item) for item in result]
        except Exception as e:
            log.warning(f"应用定制简历优化结果[{key}]失败: {e}")
            return
        setattr(cr, key, validated)

    # ===== AI conversational editing =====

    async def ai_edit_customize_resume(self, user_id: int, job_id: int,
                                       instruction: str, chat_history: list) -> dict:
        """Edit the stored tailored resume from a chat instruction.

        A planning call turns the instruction into atomic operations
        (``delete`` / ``update`` / ``add``). Deletes are applied locally,
        updates and adds are executed as concurrent AI calls, and the result
        is saved.

        Args:
            user_id: Owner of the tailored resume.
            job_id: Job the resume was tailored for.
            instruction: The user's latest instruction.
            chat_history: Previous conversation turns; may be empty or None.

        Returns:
            ``{"type": "message", "message": ...}`` when nothing was changed,
            or ``{"type": "updated", "message": ...}`` after saving.

        Raises:
            ValueError: If no tailored resume is stored or the job is missing.
        """
        # 1. Current tailored resume.
        cr_data = await customize_resume_store.get(user_id, job_id)
        if not cr_data:
            raise ValueError("定制简历不存在,请先生成")
        cr = CustomizeResume.model_validate(cr_data)
        resume_json = cr.model_dump_json(by_alias=True)
        # 2. Job lookup.
        job = await self._get_job(job_id)
        job_title = job.title or ""
        job_desc = f"{job.description or ''}\n{job.requirement or ''}"
        # 3. Planning call (intent recognition + splitting into atomic operations).
        history_str = json.dumps(chat_history, ensure_ascii=False) if chat_history else ""
        plan = await plan_edit(job_title, job_desc, resume_json, history_str, instruction)
        if not plan:
            return {"type": "message", "message": "抱歉,我没有理解你的意思,请再描述一下。"}
        if plan.get("action") == "chat":
            return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")}
        # 4. Operation list.
        operations = plan.get("operations", [])
        if not operations:
            return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")}
        # Only the 10 most recent turns go to the per-record calls. The helper
        # tolerates a None history instead of failing on len(None).
        recent_history_str = self._recent_history_json(chat_history)
        # 5. Dispatch by operation type. Deletes first: they need no AI call.
        for op in operations:
            if op.get("type") == "delete":
                self._apply_delete(cr, op.get("module", ""), op.get("id", ""))
        # Collect update/add coroutines as (type, module, record id, coroutine).
        ai_tasks = []
        for op in operations:
            op_type = op.get("type", "")
            mod_name = op.get("module", "")
            op_instruction = op.get("instruction", "")
            schema = MODULE_SCHEMAS.get(mod_name, "")
            if op_type == "update":
                record_id = op.get("id")
                record_data = self._get_record_data(cr, mod_name, record_id)
                if record_data is not None:
                    ai_tasks.append((
                        "update", mod_name, record_id,
                        execute_record_edit(
                            job_title, job_desc, op_instruction,
                            recent_history_str, schema, record_data,
                        ),
                    ))
            elif op_type == "add":
                ai_tasks.append((
                    "add", mod_name, None,
                    execute_record_add(
                        job_title, job_desc, op_instruction,
                        recent_history_str, schema,
                    ),
                ))
        # Run concurrently; a failed operation is logged and skipped.
        if ai_tasks:
            results = await asyncio.gather(*(task[3] for task in ai_tasks), return_exceptions=True)
            for (op_type, mod_name, record_id, _), result in zip(ai_tasks, results):
                if isinstance(result, Exception):
                    log.warning(f"AI编辑[{op_type}/{mod_name}/{record_id}]失败: {result}")
                    continue
                if result is None:
                    continue
                if op_type == "update":
                    self._apply_record_update(cr, mod_name, record_id, result)
                elif op_type == "add":
                    self._apply_record_add(cr, mod_name, result)
        # 6. Save (the original note says the store keeps a rollback backup).
        await customize_resume_store.save(user_id, job_id, cr)
        # Label of the touched modules: de-duplicated, in first-seen order, and
        # separated so adjacent labels do not run together.
        updated_modules = list(dict.fromkeys(op.get("module", "") for op in operations))
        label = "、".join(_MODULE_LABELS.get(m, m) for m in updated_modules if m)
        return {"type": "updated", "message": f"完成!已更新:{label or '简历内容'}"}

    @staticmethod
    def _recent_history_json(chat_history: list | None, limit: int = 10) -> str:
        """Return the last ``limit`` chat turns as JSON, or "" when there are none."""
        if not chat_history:
            return ""
        return json.dumps(chat_history[-limit:], ensure_ascii=False)

    @staticmethod
    def _module_items(cr: CustomizeResume, mod_name: str):
        """Return the record list of a list module, or None for any other name."""
        if mod_name in SkillGapService._LIST_MODULES:
            return getattr(cr, mod_name)
        return None

    @staticmethod
    def _record_model(mod_name: str):
        """Return the schema class of a list module, or None for any other name."""
        # Built per call so the class body itself does not depend on the
        # schema imports being resolvable at definition time.
        return {
            "education": Education,
            "work": Work,
            "internship": Internship,
            "project": Project,
            "competition": Competition,
        }.get(mod_name)

    @staticmethod
    def _get_record_data(cr: CustomizeResume, mod_name: str, record_id: str | None) -> str | None:
        """Return one record as JSON; for the "resume" module, the whole profile.

        Returns None when ``record_id`` is empty or no record matches (the
        latter is logged).
        """
        if mod_name == "resume":
            return cr.resume.model_dump_json(by_alias=True)
        if not record_id:
            return None
        for item in SkillGapService._module_items(cr, mod_name) or []:
            if item.id == record_id:
                return item.model_dump_json(by_alias=True)
        log.warning(f"未找到记录[{mod_name}/{record_id}]")
        return None

    @staticmethod
    def _apply_delete(cr: CustomizeResume, mod_name: str, record_id: str) -> None:
        """Remove the first record with ``record_id`` from a list module.

        The "resume" module, unknown modules, empty ids and unmatched ids are
        all silent no-ops.
        """
        if not record_id or mod_name == "resume":
            return
        items = SkillGapService._module_items(cr, mod_name)
        if items is None:
            return
        for index, item in enumerate(items):
            if item.id == record_id:
                del items[index]
                break

    @staticmethod
    def _apply_record_update(cr: CustomizeResume, mod_name: str, record_id: str | None, result) -> None:
        """Replace the targeted record (or the whole profile) with the AI result.

        Any failure, typically schema validation of the AI output, is logged
        and the resume is left unchanged.
        """
        try:
            if mod_name == "resume" and isinstance(result, dict):
                cr.resume = ResumeProfile.model_validate(result)
                return
            model_cls = SkillGapService._record_model(mod_name)
            if not model_cls or not isinstance(result, dict) or not record_id:
                return
            items = SkillGapService._module_items(cr, mod_name) or []
            new_item = model_cls.model_validate(result)
            for index, item in enumerate(items):
                if item.id == record_id:
                    items[index] = new_item
                    break
        except Exception as e:
            log.warning(f"应用AI编辑结果[{mod_name}/{record_id}]失败: {e}")

    @staticmethod
    def _apply_record_add(cr: CustomizeResume, mod_name: str, result) -> None:
        """Validate an AI-generated record and append it to its list module.

        Unknown modules and non-dict results are ignored; validation failures
        are logged.
        """
        try:
            model_cls = SkillGapService._record_model(mod_name)
            if not model_cls or not isinstance(result, dict):
                return
            items = SkillGapService._module_items(cr, mod_name)
            items.append(model_cls.model_validate(result))
        except Exception as e:
            log.warning(f"应用AI新增记录[{mod_name}]失败: {e}")

    # ===== Internal helpers =====

    async def _get_job(self, job_id: int) -> Job:
        """Load a job by primary key.

        Raises:
            ValueError: If no job has this id.
        """
        result = await self.session.execute(select(Job).where(Job.id == job_id))
        job = result.scalar_one_or_none()
        if not job:
            raise ValueError("岗位不存在")
        return job