"""求职助手 Agent 对话 + 岗位简历优化 Service 主要功能:查询简历数据,调用 AI 模块完成对话;针对岗位并发优化简历。 依赖:resume_loader(简历统一查询)、customize_resume_store(定制简历存取+构建)、job_agent.chat AI 模块、job_agent.resume_optimizer(岗位简历优化) 使用表:bg_user_resume + 5张子表(通过 resume_loader 查询)、bg_job(查岗位) """ import asyncio import json import time from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.ai.job_agent.chat import agent_chat from app.ai.job_agent.resume_optimizer import optimize_summary, optimize_experience_record from app.core.logger import log from app.models.job import Job from app.schemas.customize_resume import CustomizeResume, Education, Work, Internship, Project, Competition from app.services.resume_loader import ResumeDetail, load_resume_detail from app.services import customize_resume_store class JobAgentChatService: def __init__(self, session: AsyncSession): self.session = session async def chat(self, user_id: int, resume_id: int, message: str, history: list[dict], job_categories: list[str], regions: list[str], industries: list[str]) -> dict: """求职助手对话:查简历 → 序列化 → 调 AI 模块""" detail = await load_resume_detail(self.session, resume_id, user_id) resume_text = self._build_resume_text(detail) return await agent_chat(resume_text, message, history, job_categories, regions, industries) @staticmethod def _build_resume_text(detail: ResumeDetail) -> str: """将简历数据序列化为文本供 AI 使用""" resume = detail.resume parts = [] if resume.name: parts.append(f"姓名:{resume.name}") if resume.target_position: parts.append(f"目标岗位:{resume.target_position}") if resume.skills: parts.append(f"技能:{'、'.join(resume.skills)}") if resume.certificates: parts.append(f"证书:{'、'.join(resume.certificates)}") if resume.summary: parts.append(f"个人概述:{resume.summary}") if detail.education: parts.append("教育经历:") for r in detail.education: parts.append(f" - {r.school or ''} {r.major or ''} {r.degree or ''}") if detail.work: parts.append("工作经历:") for r in detail.work: parts.append(f" - {r.company_name or ''} {r.position or ''}") if detail.internship: parts.append("实习经历:") for r in detail.internship: parts.append(f" - {r.company_name or ''} {r.position or ''}") if detail.project: parts.append("项目经历:") for r in detail.project: parts.append(f" - {r.project_name or ''} {r.role or ''}") if detail.competition: parts.append("竞赛经历:") for r in detail.competition: parts.append(f" - {r.competition_name or ''} {r.award or ''}") return "\n".join(parts) if parts else "暂无简历信息" async def optimize_resume(self, user_id: int, resume_id: int, job_id: int) -> dict: """针对岗位优化简历:查简历+岗位 → 构建定制简历 → 按单条记录并发AI优化 → 存数据库 → 返回""" # 1. 查简历 + 岗位 detail = await load_resume_detail(self.session, resume_id, user_id) job = await self._get_job(job_id) # 2. 构建定制简历 cr = customize_resume_store.build_from_detail(detail) # 3. 构建并发任务(按单条记录粒度) tasks: list[tuple[str, int, object]] = [] job_desc = f"{job.description or ''}\n{job.requirement or ''}" if cr.resume.summary: tasks.append(("summary", 0, optimize_summary(job.title or "", job_desc, cr.resume.summary))) for mod_name, idx, record_json in self._experience_tasks(cr): tasks.append((mod_name, idx, optimize_experience_record(job.title or "", job_desc, record_json))) log.info(f"岗位简历优化开始: {len(tasks)}个并发任务 [job={job_id}, resume={resume_id}]") # 4. 并发执行 if tasks: t0 = time.monotonic() results = await asyncio.gather(*[t[2] for t in tasks], return_exceptions=True) log.info(f"岗位简历优化全部完成, 总耗时={round(time.monotonic() - t0, 2)}s") for (mod_name, idx, _), result in zip(tasks, results): if isinstance(result, Exception): log.warning(f"岗位简历优化[{mod_name}[{idx}]]失败: {result}") continue if result is None: continue self._apply_optimize_result(cr, mod_name, idx, result) # 5. 存数据库 await customize_resume_store.save(user_id, job_id, cr) log.info(f"岗位简历优化已保存 [user={user_id}, job={job_id}]") # 6. 返回 return cr.model_dump(by_alias=True) @staticmethod def _experience_tasks(cr: CustomizeResume) -> list[tuple[str, int, str]]: """构建各子表的 AI 优化任务列表,按单条记录拆分""" result: list[tuple[str, int, str]] = [] for name, items in [("education", cr.education), ("work", cr.work), ("internship", cr.internship), ("project", cr.project), ("competition", cr.competition)]: for idx, item in enumerate(items or []): result.append((name, idx, json.dumps(item.model_dump(by_alias=True), ensure_ascii=False))) return result async def _get_job(self, job_id: int) -> Job: """查岗位""" result = await self.session.execute(select(Job).where(Job.id == job_id)) job = result.scalar_one_or_none() if not job: raise ValueError("岗位不存在") return job @staticmethod def _apply_optimize_result(cr: CustomizeResume, key: str, idx: int, result) -> None: """将 AI 优化结果应用到定制简历(单条记录粒度)""" if key == "summary" and isinstance(result, str): cr.resume.summary = result return model_map = {"education": Education, "work": Work, "internship": Internship, "project": Project, "competition": Competition} list_map = {"education": cr.education, "work": cr.work, "internship": cr.internship, "project": cr.project, "competition": cr.competition} model_cls = model_map.get(key) items = list_map.get(key) if model_cls is None or items is None: log.warning(f"未知优化模块: {key}") return if isinstance(result, dict): try: items[idx] = model_cls.model_validate(result) except (IndexError, Exception) as e: log.warning(f"应用优化结果[{key}[{idx}]]失败: {e}") elif isinstance(result, list) and len(result) == 1 and isinstance(result[0], dict): # 兼容 LLM 偶尔返回单元素数组的情况 try: items[idx] = model_cls.model_validate(result[0]) except (IndexError, Exception) as e: log.warning(f"应用优化结果[{key}[{idx}]]失败(数组兼容): {e}") else: log.warning(f"优化结果格式异常[{key}[{idx}]]: type={type(result).__name__}")