Files
2026-05-13 15:41:56 +08:00

356 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""技能差距分析 + 定制简历 Service
岗位技能差距分析 → 定制简历生成/查询/编辑/回滚 → AI 对话式编辑。
依赖:skill_gap_analyzer(AI引擎)
使用表:bg_job(读)、bg_user_resume + 5张子表(读)
存储:Redis(定制简历 + 回滚数据)
"""
import asyncio
import json
import time
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.ai.skill_gap_analyzer.analyzer import (
analyze_skill_gap, optimize_summary, optimize_module,
plan_edit, execute_record_edit, execute_record_add,
)
from app.ai.skill_gap_analyzer.prompts import MODULE_SCHEMAS
from app.core.logger import log
from app.schemas.customize_resume import (
CustomizeResume, ResumeProfile, Education, Work, Internship, Project, Competition,
)
from app.models.job import Job
from app.models.user_resume import UserResume
from app.services.resume_loader import ResumeDetail, load_resume_detail, load_default_resume_detail
from app.services import customize_resume_store
# Module key -> Chinese display label; used when building the
# "updated modules" summary returned after an AI edit.
_MODULE_LABELS: dict[str, str] = {
    "resume": "个人简介",
    "education": "教育经历",
    "work": "工作经历",
    "internship": "实习经历",
    "project": "项目经历",
    "competition": "竞赛经历",
}
def _build_resume_json(detail: ResumeDetail) -> str:
    """Serialize the resume fields the AI needs into a JSON string.

    The profile-level keys (``skills``, ``certificates``, ``summary``,
    ``targetPosition``) are always present, with empty defaults for falsy
    values. Each record section is added only when it has at least one
    record, and only the listed attributes of each record are emitted.
    Non-ASCII text is written as-is (``ensure_ascii=False``).
    """
    profile = detail.resume
    payload = {
        "skills": profile.skills or [],
        "certificates": profile.certificates or [],
        "summary": profile.summary or "",
        "targetPosition": profile.target_position or "",
    }

    # (output key, records, {JSON key: record attribute}) in output order.
    sections = (
        ("education", detail.education, {
            "school": "school", "major": "major",
            "degree": "degree", "description": "description",
        }),
        ("work", detail.work, {
            "companyName": "company_name", "position": "position",
            "description": "description",
        }),
        ("internship", detail.internship, {
            "companyName": "company_name", "position": "position",
            "description": "description",
        }),
        ("project", detail.project, {
            "companyName": "company_name", "projectName": "project_name",
            "role": "role", "description": "description",
        }),
        ("competition", detail.competition, {
            "competitionName": "competition_name", "award": "award",
            "description": "description",
        }),
    )
    for section, records, fields in sections:
        if not records:
            continue  # empty sections are left out of the payload entirely
        payload[section] = [
            {json_key: getattr(record, attr) for json_key, attr in fields.items()}
            for record in records
        ]

    return json.dumps(payload, ensure_ascii=False)
class SkillGapService:
def __init__(self, session: AsyncSession):
self.session = session
# ===== 差距分析 =====
async def analyze_skill_gap(self, user_id: int, job_id: int) -> dict:
"""差距分析完整流程:查简历 → 查岗位 → AI分析 → 计算匹配分"""
# 1. 自动选择简历
detail = await load_default_resume_detail(self.session, user_id)
# 2. 查岗位
job = await self._get_job(job_id)
skill_tags: list[str] = job.skill_tags or []
# 3. skill_tags 为空 → 满分
if not skill_tags:
return self._gap_result(10.0, job, detail.resume, [])
# 4. 拼 AI 输入
resume_json = _build_resume_json(detail)
# 5. AI 分析
missing = await analyze_skill_gap(skill_tags, resume_json)
# 6. 计算匹配分
score = round((len(skill_tags) - len(missing)) / len(skill_tags) * 10, 1)
return self._gap_result(score, job, detail.resume, missing)
@staticmethod
def _gap_result(score: float, job: Job, resume: UserResume, missing: list[str]) -> dict:
return {
"score": score,
"job": {"jobId": str(job.id), "title": job.title, "skillTags": job.skill_tags or []},
"resume": {"resumeId": str(resume.id), "resumeName": resume.resume_name or "", "targetPosition": resume.target_position or ""},
"missingSkills": missing,
}
# ===== 生成定制简历 =====
async def generate_customize_resume(self, user_id: int, job_id: int, resume_id: int,
optimize_modules: list[str], add_skills: list[str]) -> None:
"""生成定制简历:查数据 → 按单条记录并发AI优化 → 存数据库"""
if not optimize_modules:
raise ValueError("请至少选择一个优化模块")
# 1. 查简历 + 岗位
detail = await load_resume_detail(self.session, resume_id, user_id)
job = await self._get_job(job_id)
# 2. 组装基础定制简历
cr = customize_resume_store.build_from_detail(detail)
# 3. 构建并发任务(按单条记录粒度)
job_desc = f"{job.description or ''}\n{job.requirement or ''}"
tasks: list[tuple[str, int, object]] = []
if "summary" in optimize_modules:
tasks.append(("summary", 0, optimize_summary(job.title or "", add_skills, detail.resume.summary or "")))
if "experience" in optimize_modules:
for mod_name, idx, record_json in self._experience_tasks(cr):
tasks.append((mod_name, idx, optimize_module(job.title or "", job_desc, record_json)))
log.info(f"定制简历优化开始: {len(tasks)}个并发任务 [modules={optimize_modules}, job={job_id}, resume={resume_id}]")
# 4. 并发执行
if tasks:
t0 = time.monotonic()
results = await asyncio.gather(*[t[2] for t in tasks], return_exceptions=True)
log.info(f"定制简历优化全部完成, 总耗时={round(time.monotonic() - t0, 2)}s")
for (mod_name, idx, _), result in zip(tasks, results):
if isinstance(result, Exception) or result is None:
continue
self._apply_optimize_result(cr, mod_name, idx, result)
# 5. skills 追加
if "skills" in optimize_modules and add_skills:
existing = set(cr.resume.skills)
new_skills = [s for s in add_skills if s not in existing]
cr.resume.skills.extend(new_skills)
if new_skills:
log.info(f"定制简历追加技能: {new_skills}")
# 6. 存数据库
await customize_resume_store.save(user_id, job_id, cr)
log.info(f"定制简历已保存 [user={user_id}, job={job_id}]")
@staticmethod
def _experience_tasks(cr: CustomizeResume) -> list[tuple[str, int, str]]:
"""构建各子表的 AI 优化任务列表,按单条记录拆分"""
result: list[tuple[str, int, str]] = []
for name, items in [("education", cr.education), ("work", cr.work), ("internship", cr.internship),
("project", cr.project), ("competition", cr.competition)]:
for idx, item in enumerate(items or []):
result.append((name, idx, json.dumps(item.model_dump(by_alias=True), ensure_ascii=False)))
return result
@staticmethod
def _apply_optimize_result(cr: CustomizeResume, key: str, idx: int, result) -> None:
"""将 AI 优化结果应用到定制简历(单条记录粒度)"""
if key == "summary" and isinstance(result, str):
cr.resume.summary = result
return
model_map = {"education": Education, "work": Work, "internship": Internship, "project": Project, "competition": Competition}
list_map = {"education": cr.education, "work": cr.work, "internship": cr.internship, "project": cr.project, "competition": cr.competition}
model_cls = model_map.get(key)
items = list_map.get(key)
if model_cls is None or items is None:
log.warning(f"未知优化模块: {key}")
return
if isinstance(result, dict):
try:
items[idx] = model_cls.model_validate(result)
except (IndexError, Exception) as e:
log.warning(f"应用优化结果[{key}[{idx}]]失败: {e}")
elif isinstance(result, list) and len(result) == 1 and isinstance(result[0], dict):
# 兼容 LLM 偶尔返回单元素数组的情况
try:
items[idx] = model_cls.model_validate(result[0])
except (IndexError, Exception) as e:
log.warning(f"应用优化结果[{key}[{idx}]]失败(数组兼容): {e}")
else:
log.warning(f"优化结果格式异常[{key}[{idx}]]: type={type(result).__name__}")
# ===== AI 对话编辑 =====
async def ai_edit_customize_resume(self, user_id: int, job_id: int,
instruction: str, chat_history: list) -> dict:
"""AI 对话式编辑定制简历(原子化操作版)"""
log.info(f"AI编辑定制简历开始 [user={user_id}, job={job_id}, instruction={instruction[:50]}]")
t_start = time.monotonic()
# 1. 取当前定制简历
cr_data = await customize_resume_store.get(user_id, job_id)
if not cr_data:
raise ValueError("定制简历不存在,请先生成")
cr = CustomizeResume.model_validate(cr_data)
resume_json = cr.model_dump_json(by_alias=True)
# 2. 查岗位
job = await self._get_job(job_id)
job_desc = f"{job.description or ''}\n{job.requirement or ''}"
# 3. 规划 AI(意图识别 + 操作原子化)
history_str = json.dumps(chat_history, ensure_ascii=False) if chat_history else ""
t0 = time.monotonic()
plan = await plan_edit(job.title or "", job_desc, resume_json, history_str, instruction)
log.info(f"AI编辑规划完成 ({round(time.monotonic() - t0, 2)}s), plan_action={plan.get('action') if plan else None} , 详情计划:{plan}")
if not plan:
return {"type": "message", "message": "抱歉,我没有理解你的意思,请再描述一下。"}
if plan.get("action") == "chat":
return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")}
# 4. 解析操作列表
operations = plan.get("operations", [])
if not operations:
return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")}
log.info(f"AI编辑操作列表: {len(operations)}个操作 {[op.get('type') + '/' + op.get('module', '') for op in operations]}")
# 截取最近10条对话历史
recent_history = chat_history[-10:] if len(chat_history) > 10 else chat_history
recent_history_str = json.dumps(recent_history, ensure_ascii=False) if recent_history else ""
# 5. 按操作类型分发执行
# 先处理 delete(零 AI 开销)
delete_count = 0
for op in operations:
if op.get("type") == "delete":
self._apply_delete(cr, op.get("module", ""), op.get("id", ""))
delete_count += 1
if delete_count:
log.info(f"AI编辑删除操作完成: {delete_count}条记录")
# 并发执行 update 和 add
ai_tasks = []
for op in operations:
op_type = op.get("type", "")
mod_name = op.get("module", "")
op_instruction = op.get("instruction", "")
schema = MODULE_SCHEMAS.get(mod_name, "")
if op_type == "update":
record_data = self._get_record_data(cr, mod_name, op.get("id"))
if record_data is not None:
ai_tasks.append((
"update", mod_name, op.get("id"),
execute_record_edit(
job.title or "", job_desc, op_instruction,
recent_history_str, schema, record_data,
),
))
elif op_type == "add":
ai_tasks.append((
"add", mod_name, None,
execute_record_add(
job.title or "", job_desc, op_instruction,
recent_history_str, schema,
),
))
# 并发执行
if ai_tasks:
log.info(f"AI编辑执行开始: {len(ai_tasks)}个并发任务")
t0 = time.monotonic()
coros = [t[3] for t in ai_tasks]
results = await asyncio.gather(*coros, return_exceptions=True)
log.info(f"AI编辑执行完成, 耗时={round(time.monotonic() - t0, 2)}s")
for (op_type, mod_name, record_id, _), result in zip(ai_tasks, results):
if isinstance(result, Exception):
log.warning(f"AI编辑[{op_type}/{mod_name}/{record_id}]失败: {result}")
continue
if result is None:
continue
if op_type == "update":
self._apply_record_update(cr, mod_name, record_id, result)
elif op_type == "add":
self._apply_record_add(cr, mod_name, result)
# 6. 保存(自动备份回滚)
await customize_resume_store.save(user_id, job_id, cr)
# 拼接更新模块标签
updated_modules = list(dict.fromkeys(op.get("module", "") for op in operations))
label = "".join(_MODULE_LABELS.get(m, m) for m in updated_modules if m)
log.info(f"AI编辑定制简历完成 [user={user_id}, job={job_id}], 总耗时={round(time.monotonic() - t_start, 2)}s, 更新模块={label}")
return {"type": "updated", "message": f"完成!已更新:{label or '简历内容'}"}
@staticmethod
def _get_record_data(cr: CustomizeResume, mod_name: str, record_id: str | None) -> str | None:
"""获取单条记录的 JSON 数据,resume 主表返回整个对象"""
if mod_name == "resume":
return cr.resume.model_dump_json(by_alias=True)
mapping = {
"education": cr.education, "work": cr.work, "internship": cr.internship,
"project": cr.project, "competition": cr.competition,
}
items = mapping.get(mod_name, [])
if not record_id:
return None
for item in items:
if item.id == record_id:
return item.model_dump_json(by_alias=True)
log.warning(f"未找到记录[{mod_name}/{record_id}]")
return None
@staticmethod
def _apply_delete(cr: CustomizeResume, mod_name: str, record_id: str) -> None:
"""删除指定模块中的一条记录"""
if not record_id or mod_name == "resume":
return
mapping = {
"education": cr.education, "work": cr.work, "internship": cr.internship,
"project": cr.project, "competition": cr.competition,
}
items = mapping.get(mod_name)
if items is not None:
for i, item in enumerate(items):
if item.id == record_id:
items.pop(i)
break
@staticmethod
def _apply_record_update(cr: CustomizeResume, mod_name: str, record_id: str | None, result) -> None:
"""将 AI 修改结果替换回对应记录"""
try:
if mod_name == "resume" and isinstance(result, dict):
cr.resume = ResumeProfile.model_validate(result)
return
model_map = {
"education": Education, "work": Work, "internship": Internship,
"project": Project, "competition": Competition,
}
model_cls = model_map.get(mod_name)
if not model_cls or not isinstance(result, dict) or not record_id:
return
list_map = {
"education": cr.education, "work": cr.work, "internship": cr.internship,
"project": cr.project, "competition": cr.competition,
}
items = list_map.get(mod_name, [])
new_item = model_cls.model_validate(result)
for i, item in enumerate(items):
if item.id == record_id:
items[i] = new_item
break
except Exception as e:
log.warning(f"应用AI编辑结果[{mod_name}/{record_id}]失败: {e}")
@staticmethod
def _apply_record_add(cr: CustomizeResume, mod_name: str, result) -> None:
"""将 AI 新增的记录追加到对应模块"""
try:
model_map = {
"education": (Education, cr.education),
"work": (Work, cr.work),
"internship": (Internship, cr.internship),
"project": (Project, cr.project),
"competition": (Competition, cr.competition),
}
entry = model_map.get(mod_name)
if not entry or not isinstance(result, dict):
return
model_cls, items = entry
items.append(model_cls.model_validate(result))
except Exception as e:
log.warning(f"应用AI新增记录[{mod_name}]失败: {e}")
# ===== 内部工具方法 =====
async def _get_job(self, job_id: int) -> Job:
"""查岗位"""
result = await self.session.execute(select(Job).where(Job.id == job_id))
job = result.scalar_one_or_none()
if not job:
raise ValueError("岗位不存在")
return job