"""技能差距分析 + 定制简历 Service 岗位技能差距分析 → 定制简历生成/查询/编辑/回滚 → AI 对话式编辑。 依赖:skill_gap_analyzer(AI引擎) 使用表:bg_job(读)、bg_user_resume + 5张子表(读) 存储:Redis(定制简历 + 回滚数据) """ import asyncio import json import random import string from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.ai.skill_gap_analyzer.analyzer import ( analyze_skill_gap, optimize_summary, optimize_module, plan_edit, execute_record_edit, execute_record_add, ) from app.ai.skill_gap_analyzer.prompts import MODULE_SCHEMAS from app.core.logger import log from app.schemas.customize_resume import ( CustomizeResume, ResumeProfile, Education, Work, Internship, Project, Competition, Paragraph, ) from app.models.job import Job from app.models.user_resume import UserResume from app.services.resume_loader import ResumeDetail, load_resume_detail, load_default_resume_detail from app.services import customize_resume_store _CHARS = string.ascii_letters + string.digits # 模块名 → 中文标签映射 _MODULE_LABELS = { "resume": "个人简介", "education": "教育经历", "work": "工作经历", "internship": "实习经历", "project": "项目经历", "competition": "竞赛经历", } def _rand_id() -> str: """生成随机8位字符串标识""" return "".join(random.choices(_CHARS, k=8)) def _build_paragraphs(description: list[dict] | None) -> list[Paragraph]: """将数据库 description [{id, text}] 转为 Paragraph 列表,id 用随机8位替换""" if not description: return [] return [Paragraph(id=_rand_id(), text=item.get("text", "")) for item in description if isinstance(item, dict)] def _build_resume_json(detail: ResumeDetail) -> str: """拼装简历 JSON 字符串供 AI 使用""" resume = detail.resume data = { "skills": resume.skills or [], "certificates": resume.certificates or [], "summary": resume.summary or "", "targetPosition": resume.target_position or "", } if detail.education: data["education"] = [{"school": r.school, "major": r.major, "degree": r.degree, "description": r.description} for r in detail.education] if detail.work: data["work"] = [{"companyName": r.company_name, "position": r.position, "description": r.description} for r in detail.work] if detail.internship: data["internship"] = [{"companyName": r.company_name, "position": r.position, "description": r.description} for r in detail.internship] if detail.project: data["project"] = [{"companyName": r.company_name, "projectName": r.project_name, "role": r.role, "description": r.description} for r in detail.project] if detail.competition: data["competition"] = [{"competitionName": r.competition_name, "award": r.award, "description": r.description} for r in detail.competition] return json.dumps(data, ensure_ascii=False) class SkillGapService: def __init__(self, session: AsyncSession): self.session = session # ===== 差距分析 ===== async def analyze_skill_gap(self, user_id: int, job_id: int) -> dict: """差距分析完整流程:查简历 → 查岗位 → AI分析 → 计算匹配分""" # 1. 自动选择简历 detail = await load_default_resume_detail(self.session, user_id) # 2. 查岗位 job = await self._get_job(job_id) skill_tags: list[str] = job.skill_tags or [] # 3. skill_tags 为空 → 满分 if not skill_tags: return self._gap_result(10.0, job, detail.resume, []) # 4. 拼 AI 输入 resume_json = _build_resume_json(detail) # 5. AI 分析 missing = await analyze_skill_gap(skill_tags, resume_json) # 6. 计算匹配分 score = round((len(skill_tags) - len(missing)) / len(skill_tags) * 10, 1) return self._gap_result(score, job, detail.resume, missing) @staticmethod def _gap_result(score: float, job: Job, resume: UserResume, missing: list[str]) -> dict: return { "score": score, "job": {"jobId": str(job.id), "title": job.title, "skillTags": job.skill_tags or []}, "resume": {"resumeId": str(resume.id), "resumeName": resume.resume_name or "", "targetPosition": resume.target_position or ""}, "missingSkills": missing, } # ===== 生成定制简历 ===== async def generate_customize_resume(self, user_id: int, job_id: int, resume_id: int, optimize_modules: list[str], add_skills: list[str]) -> None: """生成定制简历:查数据 → 并发AI优化 → 存Redis""" if not optimize_modules: raise ValueError("请至少选择一个优化模块") # 1. 查简历 + 岗位 detail = await load_resume_detail(self.session, resume_id, user_id) job = await self._get_job(job_id) # 2. 组装基础定制简历 cr = self._build_customize_resume(detail) # 3. 并发 AI 优化 tasks = [] job_desc = f"{job.description or ''}\n{job.requirement or ''}" if "summary" in optimize_modules: tasks.append(("summary", optimize_summary(job.title or "", add_skills, detail.resume.summary or ""))) if "experience" in optimize_modules: for module_name, rows_json in self._experience_tasks(cr, job.title or "", job_desc): tasks.append((module_name, optimize_module(job.title or "", job_desc, rows_json))) # 执行并发 if tasks: keys = [t[0] for t in tasks] results = await asyncio.gather(*[t[1] for t in tasks], return_exceptions=True) for key, result in zip(keys, results): if isinstance(result, Exception): log.warning(f"定制简历优化[{key}]失败: {result}") continue self._apply_optimize_result(cr, key, result) # 4. skills 追加(纯内存操作) if "skills" in optimize_modules and add_skills: existing = set(cr.resume.skills) cr.resume.skills.extend([s for s in add_skills if s not in existing]) # 5. 存 Redis await customize_resume_store.save(user_id, cr) @staticmethod def _experience_tasks(cr: CustomizeResume, job_title: str, job_desc: str) -> list[tuple[str, str]]: """构建各子表的 AI 优化任务列表""" result = [] for name, items in [("education", cr.education), ("work", cr.work), ("internship", cr.internship), ("project", cr.project), ("competition", cr.competition)]: if items: result.append((name, json.dumps([item.model_dump(by_alias=True) for item in items], ensure_ascii=False))) return result @staticmethod def _apply_optimize_result(cr: CustomizeResume, key: str, result) -> None: """将 AI 优化结果应用到定制简历""" if key == "summary" and isinstance(result, str): cr.resume.summary = result elif key == "education" and isinstance(result, list): cr.education = [Education.model_validate(item) for item in result] elif key == "work" and isinstance(result, list): cr.work = [Work.model_validate(item) for item in result] elif key == "internship" and isinstance(result, list): cr.internship = [Internship.model_validate(item) for item in result] elif key == "project" and isinstance(result, list): cr.project = [Project.model_validate(item) for item in result] elif key == "competition" and isinstance(result, list): cr.competition = [Competition.model_validate(item) for item in result] # ===== AI 对话编辑 ===== async def ai_edit_customize_resume(self, user_id: int, job_id: int, instruction: str, chat_history: list) -> dict: """AI 对话式编辑定制简历(原子化操作版)""" # 1. 取当前定制简历 cr_data = await customize_resume_store.get(user_id) if not cr_data: raise ValueError("定制简历不存在,请先生成") cr = CustomizeResume.model_validate(cr_data) resume_json = cr.model_dump_json(by_alias=True) # 2. 查岗位 job = await self._get_job(job_id) job_desc = f"{job.description or ''}\n{job.requirement or ''}" # 3. 规划 AI(意图识别 + 操作原子化) history_str = json.dumps(chat_history, ensure_ascii=False) if chat_history else "无" plan = await plan_edit(job.title or "", job_desc, resume_json, history_str, instruction) if not plan: return {"type": "message", "message": "抱歉,我没有理解你的意思,请再描述一下。"} if plan.get("action") == "chat": return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")} # 4. 解析操作列表 operations = plan.get("operations", []) if not operations: return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")} # 截取最近10条对话历史 recent_history = chat_history[-10:] if len(chat_history) > 10 else chat_history recent_history_str = json.dumps(recent_history, ensure_ascii=False) if recent_history else "无" # 5. 按操作类型分发执行 # 先处理 delete(零 AI 开销) for op in operations: if op.get("type") == "delete": self._apply_delete(cr, op.get("module", ""), op.get("id", "")) # 并发执行 update 和 add ai_tasks = [] for op in operations: op_type = op.get("type", "") mod_name = op.get("module", "") op_instruction = op.get("instruction", "") schema = MODULE_SCHEMAS.get(mod_name, "") if op_type == "update": record_data = self._get_record_data(cr, mod_name, op.get("id")) if record_data is not None: ai_tasks.append(( "update", mod_name, op.get("id"), execute_record_edit( job.title or "", job_desc, op_instruction, recent_history_str, schema, record_data, ), )) elif op_type == "add": ai_tasks.append(( "add", mod_name, None, execute_record_add( job.title or "", job_desc, op_instruction, recent_history_str, schema, ), )) # 并发执行 if ai_tasks: coros = [t[3] for t in ai_tasks] results = await asyncio.gather(*coros, return_exceptions=True) for (op_type, mod_name, record_id, _), result in zip(ai_tasks, results): if isinstance(result, Exception): log.warning(f"AI编辑[{op_type}/{mod_name}/{record_id}]失败: {result}") continue if result is None: continue if op_type == "update": self._apply_record_update(cr, mod_name, record_id, result) elif op_type == "add": self._apply_record_add(cr, mod_name, result) # 6. 保存(自动备份回滚) await customize_resume_store.save(user_id, cr) # 拼接更新模块标签 updated_modules = list(dict.fromkeys(op.get("module", "") for op in operations)) label = "、".join(_MODULE_LABELS.get(m, m) for m in updated_modules if m) return {"type": "updated", "message": f"完成!已更新:{label or '简历内容'}"} @staticmethod def _get_record_data(cr: CustomizeResume, mod_name: str, record_id: str | None) -> str | None: """获取单条记录的 JSON 数据,resume 主表返回整个对象""" if mod_name == "resume": return cr.resume.model_dump_json(by_alias=True) mapping = { "education": cr.education, "work": cr.work, "internship": cr.internship, "project": cr.project, "competition": cr.competition, } items = mapping.get(mod_name, []) if not record_id: return None for item in items: if item.id == record_id: return item.model_dump_json(by_alias=True) log.warning(f"未找到记录[{mod_name}/{record_id}]") return None @staticmethod def _apply_delete(cr: CustomizeResume, mod_name: str, record_id: str) -> None: """删除指定模块中的一条记录""" if not record_id or mod_name == "resume": return mapping = { "education": cr.education, "work": cr.work, "internship": cr.internship, "project": cr.project, "competition": cr.competition, } items = mapping.get(mod_name) if items is not None: for i, item in enumerate(items): if item.id == record_id: items.pop(i) break @staticmethod def _apply_record_update(cr: CustomizeResume, mod_name: str, record_id: str | None, result) -> None: """将 AI 修改结果替换回对应记录""" try: if mod_name == "resume" and isinstance(result, dict): cr.resume = ResumeProfile.model_validate(result) return model_map = { "education": Education, "work": Work, "internship": Internship, "project": Project, "competition": Competition, } model_cls = model_map.get(mod_name) if not model_cls or not isinstance(result, dict) or not record_id: return list_map = { "education": cr.education, "work": cr.work, "internship": cr.internship, "project": cr.project, "competition": cr.competition, } items = list_map.get(mod_name, []) new_item = model_cls.model_validate(result) for i, item in enumerate(items): if item.id == record_id: items[i] = new_item break except Exception as e: log.warning(f"应用AI编辑结果[{mod_name}/{record_id}]失败: {e}") @staticmethod def _apply_record_add(cr: CustomizeResume, mod_name: str, result) -> None: """将 AI 新增的记录追加到对应模块""" try: model_map = { "education": (Education, cr.education), "work": (Work, cr.work), "internship": (Internship, cr.internship), "project": (Project, cr.project), "competition": (Competition, cr.competition), } entry = model_map.get(mod_name) if not entry or not isinstance(result, dict): return model_cls, items = entry items.append(model_cls.model_validate(result)) except Exception as e: log.warning(f"应用AI新增记录[{mod_name}]失败: {e}") # ===== 内部工具方法 ===== async def _get_job(self, job_id: int) -> Job: """查岗位""" result = await self.session.execute(select(Job).where(Job.id == job_id)) job = result.scalar_one_or_none() if not job: raise ValueError("岗位不存在") return job def _build_customize_resume(self, detail: ResumeDetail) -> CustomizeResume: """从 ResumeDetail 组装 CustomizeResume""" resume = detail.resume profile = ResumeProfile( avatarUrl=resume.avatar_url or "", name=resume.name or "", email=resume.email or "", mobileNumber=resume.mobile_number or "", city=resume.city or "", wechatNumber=resume.wechat_number or "", portfolioUrl=resume.portfolio_url or "", skills=resume.skills or [], certificates=resume.certificates or [], summary=resume.summary or "", ) return CustomizeResume( resume=profile, education=[Education(id=_rand_id(), school=r.school or "", major=r.major or "", degree=r.degree or "", studyType=r.study_type or "", startDate=r.start_date or "", endDate=r.end_date or "", description=_build_paragraphs(r.description)) for r in detail.education], work=[Work(id=_rand_id(), companyName=r.company_name or "", position=r.position or "", startDate=r.start_date or "", endDate=r.end_date or "", description=_build_paragraphs(r.description)) for r in detail.work], internship=[Internship(id=_rand_id(), companyName=r.company_name or "", position=r.position or "", startDate=r.start_date or "", endDate=r.end_date or "", description=_build_paragraphs(r.description)) for r in detail.internship], project=[Project(id=_rand_id(), companyName=r.company_name or "", projectName=r.project_name or "", role=r.role or "", startDate=r.start_date or "", endDate=r.end_date or "", description=_build_paragraphs(r.description)) for r in detail.project], competition=[Competition(id=_rand_id(), competitionName=r.competition_name or "", award=r.award or "", awardDate=r.award_date or "", description=_build_paragraphs(r.description)) for r in detail.competition], )