"""技能差距分析 + 定制简历 Service 岗位技能差距分析 → 定制简历生成/查询/编辑/回滚 → AI 对话式编辑。 依赖:skill_gap_analyzer(AI引擎) 使用表:bg_job(读)、bg_user_resume + 5张子表(读) 存储:Redis(定制简历 + 回滚数据) """ import asyncio import json import random import string from sqlalchemy import select, desc from sqlalchemy.ext.asyncio import AsyncSession from app.ai.skill_gap_analyzer.analyzer import ( analyze_skill_gap, optimize_summary, optimize_module, plan_edit, execute_module_edit, ) from app.ai.skill_gap_analyzer.prompts import MODULE_SCHEMAS from app.core.logger import log from app.core.redis import RedisManager from app.schemas.skill_gap import ( CustomizeResume, ResumeProfile, Education, Work, Internship, Project, Competition, Paragraph, ) from app.models.job import Job from app.models.user_resume import UserResume from app.models.user_resume_competition import UserResumeCompetition from app.models.user_resume_education import UserResumeEducation from app.models.user_resume_internship import UserResumeInternship from app.models.user_resume_project import UserResumeProject from app.models.user_resume_work import UserResumeWork # Redis 常量 CUSTOMIZE_RESUME_KEY_PREFIX = "customize:resume:" CUSTOMIZE_RESUME_EXPIRE = 12 * 60 * 60 # 12小时 CUSTOMIZE_RESUME_ROLLBACK_KEY_PREFIX = "customize:resume:rollback:" CUSTOMIZE_RESUME_ROLLBACK_EXPIRE = 30 * 60 # 30分钟 _CHARS = string.ascii_letters + string.digits def _rand_id() -> str: """生成随机8位字符串标识""" return "".join(random.choices(_CHARS, k=8)) def _build_paragraphs(description: list[dict] | None) -> list[Paragraph]: """将数据库 description [{id, text}] 转为 Paragraph 列表,id 用随机8位替换""" if not description: return [] return [Paragraph(id=_rand_id(), text=item.get("text", "")) for item in description if isinstance(item, dict)] def _build_resume_json(resume: UserResume, edu_list, work_list, intern_list, proj_list, comp_list) -> str: """拼装简历 JSON 字符串供 AI 使用""" data = { "skills": resume.skills or [], "certificates": resume.certificates or [], "summary": resume.summary or "", "targetPosition": resume.target_position or "", } if edu_list: data["education"] = [{"school": r.school, "major": r.major, "degree": r.degree, "description": r.description} for r in edu_list] if work_list: data["work"] = [{"companyName": r.company_name, "position": r.position, "description": r.description} for r in work_list] if intern_list: data["internship"] = [{"companyName": r.company_name, "position": r.position, "description": r.description} for r in intern_list] if proj_list: data["project"] = [{"companyName": r.company_name, "projectName": r.project_name, "role": r.role, "description": r.description} for r in proj_list] if comp_list: data["competition"] = [{"competitionName": r.competition_name, "award": r.award, "description": r.description} for r in comp_list] return json.dumps(data, ensure_ascii=False) class SkillGapService: def __init__(self, session: AsyncSession): self.session = session # ===== 差距分析 ===== async def analyze_skill_gap(self, user_id: int, job_id: int) -> dict: """差距分析完整流程:查简历 → 查岗位 → AI分析 → 计算匹配分""" # 1. 自动选择简历 resume = await self._pick_resume(user_id) # 2. 查岗位 job = await self._get_job(job_id) skill_tags: list[str] = job.skill_tags or [] # 3. skill_tags 为空 → 满分 if not skill_tags: return self._gap_result(10.0, job, resume, []) # 4. 查子表拼 AI 输入 edu, work, intern, proj, comp = await self._load_sub_tables(resume.id) resume_json = _build_resume_json(resume, edu, work, intern, proj, comp) # 5. AI 分析 missing = await analyze_skill_gap(skill_tags, resume_json) # 6. 计算匹配分 score = round((len(skill_tags) - len(missing)) / len(skill_tags) * 10, 1) return self._gap_result(score, job, resume, missing) @staticmethod def _gap_result(score: float, job: Job, resume: UserResume, missing: list[str]) -> dict: return { "score": score, "job": {"jobId": str(job.id), "title": job.title, "skillTags": job.skill_tags or []}, "resume": {"resumeId": str(resume.id), "resumeName": resume.resume_name or "", "targetPosition": resume.target_position or ""}, "missingSkills": missing, } # ===== 生成定制简历 ===== async def generate_customize_resume(self, user_id: int, job_id: int, resume_id: int, optimize_modules: list[str], add_skills: list[str]) -> None: """生成定制简历:查数据 → 并发AI优化 → 存Redis""" if not optimize_modules: raise ValueError("请至少选择一个优化模块") # 1. 查简历 + 岗位 resume = await self._get_resume(resume_id, user_id) job = await self._get_job(job_id) edu_rows, work_rows, intern_rows, proj_rows, comp_rows = await self._load_sub_tables(resume.id) # 2. 组装基础定制简历 cr = self._build_customize_resume(resume, edu_rows, work_rows, intern_rows, proj_rows, comp_rows) # 3. 并发 AI 优化 tasks = [] job_desc = f"{job.description or ''}\n{job.requirement or ''}" if "summary" in optimize_modules: tasks.append(("summary", optimize_summary(job.title or "", add_skills, resume.summary or ""))) if "experience" in optimize_modules: for module_name, rows_json in self._experience_tasks(cr, job.title or "", job_desc): tasks.append((module_name, optimize_module(job.title or "", job_desc, rows_json))) # 执行并发 if tasks: keys = [t[0] for t in tasks] results = await asyncio.gather(*[t[1] for t in tasks], return_exceptions=True) for key, result in zip(keys, results): if isinstance(result, Exception): log.warning(f"定制简历优化[{key}]失败: {result}") continue self._apply_optimize_result(cr, key, result) # 4. skills 追加(纯内存操作) if "skills" in optimize_modules and add_skills: existing = set(cr.resume.skills) cr.resume.skills.extend([s for s in add_skills if s not in existing]) # 5. 存 Redis await self._save_customize_resume(user_id, cr) @staticmethod def _experience_tasks(cr: CustomizeResume, job_title: str, job_desc: str) -> list[tuple[str, str]]: """构建各子表的 AI 优化任务列表""" result = [] for name, items in [("education", cr.education), ("work", cr.work), ("internship", cr.internship), ("project", cr.project), ("competition", cr.competition)]: if items: result.append((name, json.dumps([item.model_dump(by_alias=True) for item in items], ensure_ascii=False))) return result @staticmethod def _apply_optimize_result(cr: CustomizeResume, key: str, result) -> None: """将 AI 优化结果应用到定制简历""" if key == "summary" and isinstance(result, str): cr.resume.summary = result elif key == "education" and isinstance(result, list): cr.education = [Education.model_validate(item) for item in result] elif key == "work" and isinstance(result, list): cr.work = [Work.model_validate(item) for item in result] elif key == "internship" and isinstance(result, list): cr.internship = [Internship.model_validate(item) for item in result] elif key == "project" and isinstance(result, list): cr.project = [Project.model_validate(item) for item in result] elif key == "competition" and isinstance(result, list): cr.competition = [Competition.model_validate(item) for item in result] # ===== 查询 / 修改 / 回滚 ===== async def get_customize_resume(self, user_id: int) -> dict | None: """查询定制简历""" key = f"{CUSTOMIZE_RESUME_KEY_PREFIX}{user_id}" data = await RedisManager.client.get(key) if data: return CustomizeResume.model_validate_json(data).model_dump(by_alias=True) return None async def update_customize_resume(self, user_id: int, data: dict) -> None: """手动编辑定制简历(整体覆盖)""" cr = CustomizeResume.model_validate(data) await self._save_customize_resume(user_id, cr) async def rollback_customize_resume(self, user_id: int) -> None: """回滚定制简历""" rollback_key = f"{CUSTOMIZE_RESUME_ROLLBACK_KEY_PREFIX}{user_id}" data = await RedisManager.client.get(rollback_key) if not data: raise ValueError("没有可回滚的版本") key = f"{CUSTOMIZE_RESUME_KEY_PREFIX}{user_id}" await RedisManager.client.set(key, data, ex=CUSTOMIZE_RESUME_EXPIRE) await RedisManager.client.delete(rollback_key) # ===== AI 对话编辑 ===== async def ai_edit_customize_resume(self, user_id: int, job_id: int, instruction: str, chat_history: list) -> dict: """AI 对话式编辑定制简历""" # 1. 取当前定制简历 key = f"{CUSTOMIZE_RESUME_KEY_PREFIX}{user_id}" raw = await RedisManager.client.get(key) if not raw: raise ValueError("定制简历不存在,请先生成") cr = CustomizeResume.model_validate_json(raw) resume_json = cr.model_dump_json(by_alias=True) # 2. 查岗位 job = await self._get_job(job_id) # 3. 规划 AI history_str = json.dumps(chat_history, ensure_ascii=False) if chat_history else "无" plan = await plan_edit(job.title or "", resume_json, history_str, instruction) if not plan: return {"type": "message", "message": "抱歉,我没有理解你的意思,请再描述一下。"} if plan.get("action") == "chat": return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")} # 4. 按模块并发执行修改 modules = plan.get("modules", []) if not modules: return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")} edit_tasks = [] for m in modules: mod_name = m.get("module", "") mod_instr = m.get("instruction", "") schema = MODULE_SCHEMAS.get(mod_name, "") mod_data = self._get_module_data(cr, mod_name) edit_tasks.append((mod_name, execute_module_edit(job.title or "", mod_instr, schema, mod_data))) keys = [t[0] for t in edit_tasks] results = await asyncio.gather(*[t[1] for t in edit_tasks], return_exceptions=True) # 5. 合并结果 for mod_key, result in zip(keys, results): if isinstance(result, Exception): log.warning(f"AI编辑模块[{mod_key}]失败: {result}") continue if result is None: continue self._apply_edit_result(cr, mod_key, result) # 6. 保存回滚 + 新版本 rollback_key = f"{CUSTOMIZE_RESUME_ROLLBACK_KEY_PREFIX}{user_id}" await RedisManager.client.set(rollback_key, raw, ex=CUSTOMIZE_RESUME_ROLLBACK_EXPIRE) await self._save_customize_resume(user_id, cr) label = plan.get("updatedModulesLabel", "简历内容") return {"type": "updated", "message": f"完成!已更新:{label}"} @staticmethod def _get_module_data(cr: CustomizeResume, mod_name: str) -> str: """获取指定模块的 JSON 数据""" if mod_name == "resume": return cr.resume.model_dump_json(by_alias=True) mapping = {"education": cr.education, "work": cr.work, "internship": cr.internship, "project": cr.project, "competition": cr.competition} items = mapping.get(mod_name, []) return json.dumps([item.model_dump(by_alias=True) for item in items], ensure_ascii=False) @staticmethod def _apply_edit_result(cr: CustomizeResume, mod_name: str, result) -> None: """将 AI 编辑结果应用到定制简历""" try: if mod_name == "resume" and isinstance(result, dict): cr.resume = ResumeProfile.model_validate(result) elif mod_name == "education" and isinstance(result, list): cr.education = [Education.model_validate(item) for item in result] elif mod_name == "work" and isinstance(result, list): cr.work = [Work.model_validate(item) for item in result] elif mod_name == "internship" and isinstance(result, list): cr.internship = [Internship.model_validate(item) for item in result] elif mod_name == "project" and isinstance(result, list): cr.project = [Project.model_validate(item) for item in result] elif mod_name == "competition" and isinstance(result, list): cr.competition = [Competition.model_validate(item) for item in result] except Exception as e: log.warning(f"应用AI编辑结果[{mod_name}]失败: {e}") # ===== 内部工具方法 ===== async def _pick_resume(self, user_id: int) -> UserResume: """自动选择简历:先查默认,再查最新""" result = await self.session.execute( select(UserResume).where(UserResume.user_id == user_id, UserResume.is_default == 1) .order_by(desc(UserResume.update_time)).limit(1)) resume = result.scalar_one_or_none() if not resume: result = await self.session.execute( select(UserResume).where(UserResume.user_id == user_id) .order_by(desc(UserResume.update_time)).limit(1)) resume = result.scalar_one_or_none() if not resume: raise ValueError("请先创建简历") return resume async def _get_resume(self, resume_id: int, user_id: int) -> UserResume: """查指定简历""" result = await self.session.execute( select(UserResume).where(UserResume.id == resume_id, UserResume.user_id == user_id)) resume = result.scalar_one_or_none() if not resume: raise ValueError("简历不存在") return resume async def _get_job(self, job_id: int) -> Job: """查岗位""" result = await self.session.execute(select(Job).where(Job.id == job_id)) job = result.scalar_one_or_none() if not job: raise ValueError("岗位不存在") return job async def _load_sub_tables(self, resume_id: int): """查询简历5张子表""" edu = (await self.session.execute(select(UserResumeEducation).where(UserResumeEducation.resume_id == resume_id))).scalars().all() work = (await self.session.execute(select(UserResumeWork).where(UserResumeWork.resume_id == resume_id))).scalars().all() intern = (await self.session.execute(select(UserResumeInternship).where(UserResumeInternship.resume_id == resume_id))).scalars().all() proj = (await self.session.execute(select(UserResumeProject).where(UserResumeProject.resume_id == resume_id))).scalars().all() comp = (await self.session.execute(select(UserResumeCompetition).where(UserResumeCompetition.resume_id == resume_id))).scalars().all() return edu, work, intern, proj, comp def _build_customize_resume(self, resume: UserResume, edu_rows, work_rows, intern_rows, proj_rows, comp_rows) -> CustomizeResume: """从数据库记录组装 CustomizeResume""" profile = ResumeProfile( avatarUrl=resume.avatar_url or "", name=resume.name or "", email=resume.email or "", mobileNumber=resume.mobile_number or "", city=resume.city or "", wechatNumber=resume.wechat_number or "", portfolioUrl=resume.portfolio_url or "", skills=resume.skills or [], certificates=resume.certificates or [], summary=resume.summary or "", ) return CustomizeResume( resume=profile, education=[Education(id=_rand_id(), school=r.school or "", major=r.major or "", degree=r.degree or "", studyType=r.study_type or "", startDate=r.start_date or "", endDate=r.end_date or "", description=_build_paragraphs(r.description)) for r in edu_rows], work=[Work(id=_rand_id(), companyName=r.company_name or "", position=r.position or "", startDate=r.start_date or "", endDate=r.end_date or "", description=_build_paragraphs(r.description)) for r in work_rows], internship=[Internship(id=_rand_id(), companyName=r.company_name or "", position=r.position or "", startDate=r.start_date or "", endDate=r.end_date or "", description=_build_paragraphs(r.description)) for r in intern_rows], project=[Project(id=_rand_id(), companyName=r.company_name or "", projectName=r.project_name or "", role=r.role or "", startDate=r.start_date or "", endDate=r.end_date or "", description=_build_paragraphs(r.description)) for r in proj_rows], competition=[Competition(id=_rand_id(), competitionName=r.competition_name or "", award=r.award or "", awardDate=r.award_date or "", description=_build_paragraphs(r.description)) for r in comp_rows], ) @staticmethod async def _save_customize_resume(user_id: int, cr: CustomizeResume) -> None: """存定制简历到 Redis""" key = f"{CUSTOMIZE_RESUME_KEY_PREFIX}{user_id}" await RedisManager.client.set(key, cr.model_dump_json(by_alias=True), ex=CUSTOMIZE_RESUME_EXPIRE)