Files
offerpai_python_ai/app/services/skill_gap_service.py
T
2026-04-09 18:22:10 +08:00

356 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""技能差距分析 + 定制简历 Service
岗位技能差距分析 → 定制简历生成/查询/编辑/回滚 → AI 对话式编辑。
依赖:skill_gap_analyzer(AI引擎)
使用表:bg_job(读)、bg_user_resume + 5张子表(读)
存储:Redis(定制简历 + 回滚数据)
"""
import asyncio
import json
import random
import string
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from app.ai.skill_gap_analyzer.analyzer import (
analyze_skill_gap, optimize_summary, optimize_module,
plan_edit, execute_module_edit,
)
from app.ai.skill_gap_analyzer.prompts import MODULE_SCHEMAS
from app.core.logger import log
from app.core.redis import redis_client
from app.schemas.skill_gap import (
CustomizeResume, ResumeProfile, Education, Work, Internship, Project, Competition, Paragraph,
)
from app.models.job import Job
from app.models.user_resume import UserResume
from app.models.user_resume_competition import UserResumeCompetition
from app.models.user_resume_education import UserResumeEducation
from app.models.user_resume_internship import UserResumeInternship
from app.models.user_resume_project import UserResumeProject
from app.models.user_resume_work import UserResumeWork
# Redis key prefixes; the user id is appended to build the final key.
CUSTOMIZE_RESUME_KEY_PREFIX = "customize:resume:"
CUSTOMIZE_RESUME_ROLLBACK_KEY_PREFIX = "customize:resume:rollback:"

# Expiry values handed to Redis via ``ex=`` (seconds).
CUSTOMIZE_RESUME_EXPIRE = 60 * 60 * 12  # 12 hours
CUSTOMIZE_RESUME_ROLLBACK_EXPIRE = 60 * 30  # 30 minutes
# Alphabet for generated identifiers: ASCII letters (both cases) plus digits.
_CHARS = string.ascii_letters + string.digits


def _rand_id() -> str:
    """Return a random 8-character alphanumeric identifier.

    Drawn with the ``random`` module, so the value is a plain local
    identifier and not suitable as a security token.
    """
    picked = random.choices(_CHARS, k=8)
    return "".join(picked)
def _build_paragraphs(description: list[dict] | None) -> list[Paragraph]:
    """Convert a stored ``description`` list into ``Paragraph`` objects.

    Each dict entry contributes its ``"text"`` value (empty string when the
    key is absent); the stored id is discarded and replaced by a fresh
    random 8-character id. Entries that are not dicts are skipped, and a
    missing or empty ``description`` yields an empty list.
    """
    paragraphs: list[Paragraph] = []
    for entry in description or []:
        if not isinstance(entry, dict):
            continue
        paragraphs.append(Paragraph(id=_rand_id(), text=entry.get("text", "")))
    return paragraphs
def _build_resume_json(resume: UserResume, edu_list, work_list, intern_list, proj_list, comp_list) -> str:
    """Serialize a resume and its sub-table rows into the JSON string fed to the AI.

    The profile fields (``skills``, ``certificates``, ``summary``,
    ``targetPosition``) are always present, with empty defaults for falsy
    values. Each sub-table section is added only when its row list is
    non-empty. Non-ASCII characters are emitted as-is.
    """
    payload = {
        "skills": resume.skills or [],
        "certificates": resume.certificates or [],
        "summary": resume.summary or "",
        "targetPosition": resume.target_position or "",
    }

    # (JSON key, rows, row -> dict); order here is the key order in the output.
    sections = (
        ("education", edu_list, lambda r: {
            "school": r.school, "major": r.major, "degree": r.degree, "description": r.description}),
        ("work", work_list, lambda r: {
            "companyName": r.company_name, "position": r.position, "description": r.description}),
        ("internship", intern_list, lambda r: {
            "companyName": r.company_name, "position": r.position, "description": r.description}),
        ("project", proj_list, lambda r: {
            "companyName": r.company_name, "projectName": r.project_name, "role": r.role,
            "description": r.description}),
        ("competition", comp_list, lambda r: {
            "competitionName": r.competition_name, "award": r.award, "description": r.description}),
    )
    for section_key, rows, to_dict in sections:
        if rows:
            payload[section_key] = [to_dict(row) for row in rows]

    return json.dumps(payload, ensure_ascii=False)
class SkillGapService:
    """Skill-gap analysis and customized-resume workflow.

    Reads jobs and resumes through the SQLAlchemy async session passed to the
    constructor, delegates analysis / rewriting to the ``skill_gap_analyzer``
    functions, and keeps the customized resume (plus one rollback snapshot)
    in Redis under per-user keys.
    """

    # Modules whose payload is a list of entries; each name is also the
    # attribute name on ``CustomizeResume``.
    _LIST_MODULES = ("education", "work", "internship", "project", "competition")
    # Every module name the AI edit planner may target.
    _EDITABLE_MODULES = ("resume",) + _LIST_MODULES

    def __init__(self, session: AsyncSession):
        """Store the async session used for every database query."""
        self.session = session

    # ===== Skill-gap analysis =====

    async def analyze_skill_gap(self, user_id: int, job_id: int) -> dict:
        """Run the gap analysis: pick resume -> load job -> AI analysis -> score.

        Returns the dict built by ``_gap_result``. A job without skill tags
        scores 10.0 without calling the AI.

        Raises:
            ValueError: if the user has no resume or the job does not exist.
        """
        resume = await self._pick_resume(user_id)
        job = await self._get_job(job_id)

        skill_tags: list[str] = job.skill_tags or []
        if not skill_tags:
            return self._gap_result(10.0, job, resume, [])

        edu, work, intern, proj, comp = await self._load_sub_tables(resume.id)
        resume_json = _build_resume_json(resume, edu, work, intern, proj, comp)

        # Inside a method body this name resolves to the imported analyzer
        # function, not to this method.
        missing = await analyze_skill_gap(skill_tags, resume_json)
        # Repeated entries from the AI must not be counted twice.
        missing = list(dict.fromkeys(missing))

        score = self._match_score(len(skill_tags), len(missing))
        return self._gap_result(score, job, resume, missing)

    @staticmethod
    def _match_score(total: int, missing_count: int) -> float:
        """Return the 0-10 match score, rounded to one decimal.

        The matched count is clamped to ``[0, total]`` so an AI answer that
        lists more missing skills than the job has tags cannot produce a
        negative score. ``total <= 0`` yields 10.0.
        """
        if total <= 0:
            return 10.0
        matched = min(max(total - missing_count, 0), total)
        return round(matched / total * 10, 1)

    @staticmethod
    def _gap_result(score: float, job: Job, resume: UserResume, missing: list[str]) -> dict:
        """Assemble the gap-analysis response; ids are returned as strings."""
        return {
            "score": score,
            "job": {"jobId": str(job.id), "title": job.title, "skillTags": job.skill_tags or []},
            "resume": {
                "resumeId": str(resume.id),
                "resumeName": resume.resume_name or "",
                "targetPosition": resume.target_position or "",
            },
            "missingSkills": missing,
        }

    # ===== Customized resume generation =====

    async def generate_customize_resume(self, user_id: int, job_id: int, resume_id: int,
                                        optimize_modules: list[str], add_skills: list[str]) -> None:
        """Build a customized resume, optimize selected modules with AI, store it in Redis.

        ``optimize_modules`` may contain ``"summary"``, ``"experience"`` (all
        non-empty sub-tables) and ``"skills"`` (append ``add_skills``).
        A failed or malformed AI result for one module is logged and that
        module keeps its original content.

        Raises:
            ValueError: if ``optimize_modules`` is empty, or the resume / job
                does not exist.
        """
        if not optimize_modules:
            raise ValueError("请至少选择一个优化模块")

        resume = await self._get_resume(resume_id, user_id)
        job = await self._get_job(job_id)
        edu_rows, work_rows, intern_rows, proj_rows, comp_rows = await self._load_sub_tables(resume.id)

        cr = self._build_customize_resume(resume, edu_rows, work_rows, intern_rows, proj_rows, comp_rows)

        job_title = job.title or ""
        job_desc = f"{job.description or ''}\n{job.requirement or ''}"

        # (module name, coroutine) pairs, awaited together below.
        tasks = []
        if "summary" in optimize_modules:
            tasks.append(("summary", optimize_summary(job_title, add_skills, resume.summary or "")))
        if "experience" in optimize_modules:
            tasks.extend(
                (module_name, optimize_module(job_title, job_desc, rows_json))
                for module_name, rows_json in self._experience_tasks(cr, job_title, job_desc)
            )

        if tasks:
            results = await asyncio.gather(*(coro for _, coro in tasks), return_exceptions=True)
            for (key, _), result in zip(tasks, results):
                if isinstance(result, BaseException):
                    log.warning(f"定制简历优化[{key}]失败: {result}")
                    continue
                self._apply_optimize_result(cr, key, result)

        if "skills" in optimize_modules and add_skills:
            # Track additions as well, so duplicates inside add_skills are
            # appended only once.
            seen = set(cr.resume.skills)
            for skill in add_skills:
                if skill not in seen:
                    seen.add(skill)
                    cr.resume.skills.append(skill)

        await self._save_customize_resume(user_id, cr)

    @staticmethod
    def _experience_tasks(cr: CustomizeResume, job_title: str, job_desc: str) -> list[tuple[str, str]]:
        """Return ``(module name, JSON of its entries)`` for every non-empty list module.

        ``job_title`` and ``job_desc`` are accepted for interface
        compatibility but are not used here.
        """
        pairs = []
        for name in SkillGapService._LIST_MODULES:
            items = getattr(cr, name)
            if items:
                dumped = [item.model_dump(by_alias=True) for item in items]
                pairs.append((name, json.dumps(dumped, ensure_ascii=False)))
        return pairs

    @staticmethod
    def _replace_list_module(cr: CustomizeResume, mod_name: str, result) -> bool:
        """Validate ``result`` as entries of ``mod_name`` and assign them to ``cr``.

        Returns ``False`` without touching ``cr`` when ``mod_name`` is not a
        list module or ``result`` is not a list. All entries are validated
        before the assignment, so a validation error leaves ``cr`` unchanged
        (the error propagates to the caller).
        """
        models = {
            "education": Education,
            "work": Work,
            "internship": Internship,
            "project": Project,
            "competition": Competition,
        }
        model = models.get(mod_name)
        if model is None or not isinstance(result, list):
            return False
        setattr(cr, mod_name, [model.model_validate(item) for item in result])
        return True

    @staticmethod
    def _apply_optimize_result(cr: CustomizeResume, key: str, result) -> bool:
        """Apply one AI optimization result to ``cr``; return whether it was applied.

        Results of an unexpected type are ignored. Validation errors are
        logged instead of raised so one bad module does not discard the
        other modules' results.
        """
        try:
            if key == "summary":
                if not isinstance(result, str):
                    return False
                cr.resume.summary = result
                return True
            return SkillGapService._replace_list_module(cr, key, result)
        except Exception as e:
            log.warning(f"应用AI优化结果[{key}]失败: {e}")
            return False

    # ===== Query / manual edit / rollback =====

    @staticmethod
    def _resume_key(user_id: int) -> str:
        """Redis key of the user's current customized resume."""
        return f"{CUSTOMIZE_RESUME_KEY_PREFIX}{user_id}"

    @staticmethod
    def _rollback_key(user_id: int) -> str:
        """Redis key of the user's rollback snapshot."""
        return f"{CUSTOMIZE_RESUME_ROLLBACK_KEY_PREFIX}{user_id}"

    async def get_customize_resume(self, user_id: int) -> dict | None:
        """Return the stored customized resume as an alias-keyed dict, or ``None`` if absent."""
        data = await redis_client.get(self._resume_key(user_id))
        if not data:
            return None
        return CustomizeResume.model_validate_json(data).model_dump(by_alias=True)

    async def update_customize_resume(self, user_id: int, data: dict) -> None:
        """Validate ``data`` as a ``CustomizeResume`` and overwrite the stored one.

        No rollback snapshot is written by this method.
        """
        cr = CustomizeResume.model_validate(data)
        await self._save_customize_resume(user_id, cr)

    async def rollback_customize_resume(self, user_id: int) -> None:
        """Restore the rollback snapshot as the current customized resume.

        Raises:
            ValueError: if no rollback snapshot exists.
        """
        rollback_key = self._rollback_key(user_id)
        data = await redis_client.get(rollback_key)
        if not data:
            raise ValueError("没有可回滚的版本")
        # Restore first, delete second: if the delete fails the snapshot is
        # still available.
        await redis_client.set(self._resume_key(user_id), data, ex=CUSTOMIZE_RESUME_EXPIRE)
        await redis_client.delete(rollback_key)

    # ===== Conversational AI editing =====

    async def ai_edit_customize_resume(self, user_id: int, job_id: int,
                                       instruction: str, chat_history: list[dict]) -> dict:
        """Edit the stored customized resume according to a chat instruction.

        Flow: load the current resume -> ask the planner which modules to
        change -> run the per-module edits concurrently -> merge -> save.

        Returns ``{"type": "message", ...}`` when the planner only replies,
        has nothing valid to edit, or no edit could be applied; returns
        ``{"type": "updated", ...}`` after a successful save. The rollback
        snapshot and the stored resume are only written in the latter case.

        Raises:
            ValueError: if no customized resume is stored or the job does
                not exist.
        """
        raw = await redis_client.get(self._resume_key(user_id))
        if not raw:
            raise ValueError("定制简历不存在,请先生成")
        cr = CustomizeResume.model_validate_json(raw)
        resume_json = cr.model_dump_json(by_alias=True)

        job = await self._get_job(job_id)
        job_title = job.title or ""

        history_str = json.dumps(chat_history, ensure_ascii=False) if chat_history else ""
        plan = await plan_edit(job_title, resume_json, history_str, instruction)
        if not plan:
            return {"type": "message", "message": "抱歉,我没有理解你的意思,请再描述一下。"}
        if plan.get("action") == "chat":
            return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")}

        # Keep only well-formed plan entries that target a known module;
        # anything else could never be applied and would waste an AI call.
        edit_tasks = []
        for entry in plan.get("modules") or []:
            if not isinstance(entry, dict):
                continue
            mod_name = entry.get("module", "")
            if mod_name not in self._EDITABLE_MODULES:
                continue
            mod_instr = entry.get("instruction", "")
            schema = MODULE_SCHEMAS.get(mod_name, "")
            mod_data = self._get_module_data(cr, mod_name)
            edit_tasks.append((mod_name, execute_module_edit(job_title, mod_instr, schema, mod_data)))
        if not edit_tasks:
            return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")}

        results = await asyncio.gather(*(coro for _, coro in edit_tasks), return_exceptions=True)

        applied = False
        for (mod_key, _), result in zip(edit_tasks, results):
            if isinstance(result, BaseException):
                log.warning(f"AI编辑模块[{mod_key}]失败: {result}")
                continue
            if result is None:
                continue
            if self._apply_edit_result(cr, mod_key, result):
                applied = True

        if not applied:
            # Nothing changed: keep the existing rollback snapshot and do not
            # claim the resume was updated.
            return {"type": "message", "message": "抱歉,这次修改没有成功,请换个说法再试一次。"}

        # Snapshot the pre-edit version, then store the edited one.
        await redis_client.set(self._rollback_key(user_id), raw, ex=CUSTOMIZE_RESUME_ROLLBACK_EXPIRE)
        await self._save_customize_resume(user_id, cr)
        label = plan.get("updatedModulesLabel", "简历内容")
        return {"type": "updated", "message": f"完成!已更新:{label}"}

    @staticmethod
    def _get_module_data(cr: CustomizeResume, mod_name: str) -> str:
        """Return the JSON text of one module.

        ``"resume"`` yields the profile object; list modules yield a JSON
        array; unknown names yield ``"[]"``.
        """
        if mod_name == "resume":
            return cr.resume.model_dump_json(by_alias=True)
        items = getattr(cr, mod_name) if mod_name in SkillGapService._LIST_MODULES else []
        return json.dumps([item.model_dump(by_alias=True) for item in items], ensure_ascii=False)

    @staticmethod
    def _apply_edit_result(cr: CustomizeResume, mod_name: str, result) -> bool:
        """Apply one AI edit result to ``cr``; return whether it was applied.

        Results of an unexpected type are ignored; validation errors are
        logged and leave the module unchanged.
        """
        try:
            if mod_name == "resume":
                if not isinstance(result, dict):
                    return False
                cr.resume = ResumeProfile.model_validate(result)
                return True
            return SkillGapService._replace_list_module(cr, mod_name, result)
        except Exception as e:
            log.warning(f"应用AI编辑结果[{mod_name}]失败: {e}")
            return False

    # ===== Internal helpers =====

    async def _pick_resume(self, user_id: int) -> UserResume:
        """Pick the user's resume: the default one if any, otherwise the most recently updated.

        Raises:
            ValueError: if the user has no resume at all.
        """
        owned = select(UserResume).where(UserResume.user_id == user_id)
        newest_first = desc(UserResume.update_time)
        # First try resumes flagged as default, then fall back to any resume.
        for stmt in (owned.where(UserResume.is_default == 1), owned):
            result = await self.session.execute(stmt.order_by(newest_first).limit(1))
            resume = result.scalar_one_or_none()
            if resume:
                return resume
        raise ValueError("请先创建简历")

    async def _get_resume(self, resume_id: int, user_id: int) -> UserResume:
        """Load a resume by id, restricted to the given owner.

        Raises:
            ValueError: if no such resume belongs to the user.
        """
        stmt = select(UserResume).where(UserResume.id == resume_id, UserResume.user_id == user_id)
        resume = (await self.session.execute(stmt)).scalar_one_or_none()
        if not resume:
            raise ValueError("简历不存在")
        return resume

    async def _get_job(self, job_id: int) -> Job:
        """Load a job by id.

        Raises:
            ValueError: if the job does not exist.
        """
        job = (await self.session.execute(select(Job).where(Job.id == job_id))).scalar_one_or_none()
        if not job:
            raise ValueError("岗位不存在")
        return job

    async def _load_sub_tables(self, resume_id: int):
        """Load the five resume sub-tables.

        Returns a 5-tuple of row lists in the order
        (education, work, internship, project, competition).
        """
        models = (UserResumeEducation, UserResumeWork, UserResumeInternship,
                  UserResumeProject, UserResumeCompetition)
        tables = []
        # Awaited one after another on purpose: SQLAlchemy documents that a
        # single AsyncSession must not be used by concurrent tasks.
        for model in models:
            result = await self.session.execute(select(model).where(model.resume_id == resume_id))
            tables.append(result.scalars().all())
        return tuple(tables)

    def _build_customize_resume(self, resume: UserResume, edu_rows, work_rows,
                                intern_rows, proj_rows, comp_rows) -> CustomizeResume:
        """Assemble a ``CustomizeResume`` from database rows.

        Falsy column values become empty strings / lists, every entry gets a
        fresh random id, and descriptions are converted by
        ``_build_paragraphs``.
        """
        profile = ResumeProfile(
            avatarUrl=resume.avatar_url or "",
            name=resume.name or "",
            email=resume.email or "",
            mobileNumber=resume.mobile_number or "",
            city=resume.city or "",
            wechatNumber=resume.wechat_number or "",
            portfolioUrl=resume.portfolio_url or "",
            skills=resume.skills or [],
            certificates=resume.certificates or [],
            summary=resume.summary or "",
        )
        education = [
            Education(id=_rand_id(), school=r.school or "", major=r.major or "",
                      degree=r.degree or "", studyType=r.study_type or "",
                      startDate=r.start_date or "", endDate=r.end_date or "",
                      description=_build_paragraphs(r.description))
            for r in edu_rows
        ]
        work = [
            Work(id=_rand_id(), companyName=r.company_name or "", position=r.position or "",
                 startDate=r.start_date or "", endDate=r.end_date or "",
                 description=_build_paragraphs(r.description))
            for r in work_rows
        ]
        internship = [
            Internship(id=_rand_id(), companyName=r.company_name or "", position=r.position or "",
                       startDate=r.start_date or "", endDate=r.end_date or "",
                       description=_build_paragraphs(r.description))
            for r in intern_rows
        ]
        project = [
            Project(id=_rand_id(), companyName=r.company_name or "", projectName=r.project_name or "",
                    role=r.role or "", startDate=r.start_date or "", endDate=r.end_date or "",
                    description=_build_paragraphs(r.description))
            for r in proj_rows
        ]
        competition = [
            Competition(id=_rand_id(), competitionName=r.competition_name or "", award=r.award or "",
                        awardDate=r.award_date or "",
                        description=_build_paragraphs(r.description))
            for r in comp_rows
        ]
        return CustomizeResume(
            resume=profile,
            education=education,
            work=work,
            internship=internship,
            project=project,
            competition=competition,
        )

    @staticmethod
    async def _save_customize_resume(user_id: int, cr: CustomizeResume) -> None:
        """Store the customized resume in Redis as alias-keyed JSON with the standard expiry."""
        key = f"{CUSTOMIZE_RESUME_KEY_PREFIX}{user_id}"
        await redis_client.set(key, cr.model_dump_json(by_alias=True), ex=CUSTOMIZE_RESUME_EXPIRE)