127 lines
6.1 KiB
Python
127 lines
6.1 KiB
Python
"""求职助手 Agent 对话 + 岗位简历优化 Service

主要功能:查询简历数据,调用 AI 模块完成对话;针对岗位并发优化简历。
依赖:resume_loader(简历统一查询)、customize_resume_store(定制简历存取+构建)、job_agent.chat AI 模块、job_agent.resume_optimizer(岗位简历优化)
使用表:bg_user_resume + 5张子表(通过 resume_loader 查询)、bg_job(查岗位)
"""
|
|
|
|
import asyncio
|
|
import json
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.ai.job_agent.chat import agent_chat
|
|
from app.ai.job_agent.resume_optimizer import optimize_summary, optimize_experience
|
|
from app.core.logger import log
|
|
from app.models.job import Job
|
|
from app.schemas.customize_resume import CustomizeResume, Education, Work, Internship, Project, Competition
|
|
from app.services.resume_loader import ResumeDetail, load_resume_detail
|
|
from app.services import customize_resume_store
|
|
|
|
|
|
class JobAgentChatService:
    """Job-assistant service: agent chat and job-targeted resume optimization.

    Both entry points load the resume through ``load_resume_detail`` using
    the session given at construction time; ``optimize_resume`` additionally
    looks up the target job with the same session.
    """

    def __init__(self, session: AsyncSession):
        """Keep the async database session used for resume and job lookups."""
        self.session = session

    async def chat(self, user_id: int, resume_id: int, message: str,
                   history: list[dict], job_categories: list[str],
                   regions: list[str], industries: list[str]) -> dict:
        """Run one chat turn: load the resume, serialize it, call the AI module.

        Args:
            user_id: Passed to ``load_resume_detail`` together with ``resume_id``.
            resume_id: Identifier of the resume to load.
            message: The user's message, forwarded to ``agent_chat``.
            history: Prior conversation turns, forwarded unchanged.
            job_categories: Forwarded unchanged to ``agent_chat``.
            regions: Forwarded unchanged to ``agent_chat``.
            industries: Forwarded unchanged to ``agent_chat``.

        Returns:
            The value returned by ``agent_chat``.
        """
        detail = await load_resume_detail(self.session, resume_id, user_id)
        resume_text = self._build_resume_text(detail)
        return await agent_chat(resume_text, message, history, job_categories, regions, industries)

    @staticmethod
    def _build_resume_text(detail: ResumeDetail) -> str:
        """Serialize resume data into plain text for the AI prompt.

        Falsy scalar fields and empty sections are omitted. When nothing at
        all is available the placeholder "暂无简历信息" is returned.

        Args:
            detail: Object exposing ``resume`` plus the ``education``, ``work``,
                ``internship``, ``project`` and ``competition`` row collections.

        Returns:
            Newline-joined text, one line per field, heading or row.
        """
        resume = detail.resume
        lines: list[str] = []

        # Scalar header fields.
        if resume.name:
            lines.append(f"姓名:{resume.name}")
        if resume.target_position:
            lines.append(f"目标岗位:{resume.target_position}")
        if resume.skills:
            lines.append(f"技能:{'、'.join(resume.skills)}")
        if resume.certificates:
            lines.append(f"证书:{'、'.join(resume.certificates)}")
        if resume.summary:
            lines.append(f"个人概述:{resume.summary}")

        # Experience sections: (heading, rows, attributes rendered per row).
        sections = (
            ("教育经历:", detail.education, ("school", "major", "degree")),
            ("工作经历:", detail.work, ("company_name", "position")),
            ("实习经历:", detail.internship, ("company_name", "position")),
            ("项目经历:", detail.project, ("project_name", "role")),
            ("竞赛经历:", detail.competition, ("competition_name", "award")),
        )
        for heading, rows, fields in sections:
            if not rows:
                continue
            lines.append(heading)
            for row in rows:
                # Missing values render as empty strings so columns stay aligned.
                rendered = " ".join(str(getattr(row, field) or "") for field in fields)
                lines.append(f" - {rendered}")

        return "\n".join(lines) if lines else "暂无简历信息"

    async def optimize_resume(self, user_id: int, resume_id: int, job_id: int) -> dict:
        """Optimize a resume for a job and return the customized resume.

        Flow: load resume and job -> build the customized resume -> run the AI
        optimizations concurrently -> persist -> return.

        The AI step is best-effort: a section whose optimization fails, is
        cancelled, or returns data that does not validate keeps its original
        content and a warning is logged.

        Args:
            user_id: Owner passed to the resume loader and to the store.
            resume_id: Identifier of the resume to optimize.
            job_id: Identifier of the target job.

        Returns:
            ``cr.model_dump(by_alias=True)`` for the customized resume.

        Raises:
            ValueError: If the job does not exist (raised by ``_get_job``).
        """
        # 1. Load the resume and the job.
        detail = await load_resume_detail(self.session, resume_id, user_id)
        job = await self._get_job(job_id)

        # 2. Build the customized resume.
        cr = customize_resume_store.build_from_detail(detail)

        # 3. Schedule the AI optimizations (summary + the five experience sections).
        job_title = job.title or ""
        job_desc = f"{job.description or ''}\n{job.requirement or ''}"
        pending = {}
        if cr.resume.summary:
            pending["summary"] = optimize_summary(job_title, job_desc, cr.resume.summary)
        experience_sections = (
            ("education", cr.education),
            ("work", cr.work),
            ("internship", cr.internship),
            ("project", cr.project),
            ("competition", cr.competition),
        )
        for name, items in experience_sections:
            if items:
                rows_json = json.dumps(
                    [item.model_dump(by_alias=True) for item in items],
                    ensure_ascii=False,
                )
                pending[name] = optimize_experience(job_title, job_desc, rows_json)

        # Run them concurrently; failures come back as values, not raises.
        if pending:
            results = await asyncio.gather(*pending.values(), return_exceptions=True)
            for key, result in zip(pending, results):
                # BaseException, not Exception: a cancelled sub-task is
                # returned as CancelledError, which is not an Exception.
                if isinstance(result, BaseException):
                    log.warning(f"岗位简历优化[{key}]失败: {result}")
                    continue
                try:
                    self._apply_optimize_result(cr, key, result)
                except ValueError as exc:
                    # Validation errors derive from ValueError; keep the
                    # original section rather than failing the whole request.
                    log.warning(f"岗位简历优化[{key}]结果校验失败,保留原内容: {exc}")

        # 4. Persist through customize_resume_store.save.
        # NOTE(review): the original notes disagree on the backend (Redis vs. database) - confirm.
        await customize_resume_store.save(user_id, job_id, cr)

        # 5. Return the serialized customized resume.
        return cr.model_dump(by_alias=True)

    async def _get_job(self, job_id: int) -> Job:
        """Fetch a job by primary key.

        Raises:
            ValueError: If no job with ``job_id`` exists.
        """
        result = await self.session.execute(select(Job).where(Job.id == job_id))
        job = result.scalar_one_or_none()
        if job is None:
            raise ValueError("岗位不存在")
        return job

    @staticmethod
    def _apply_optimize_result(cr: CustomizeResume, key: str, result) -> None:
        """Apply one AI optimization result to the customized resume in place.

        ``"summary"`` expects a ``str``; the experience keys expect a ``list``
        whose items are validated into the matching schema model. A result of
        any other type, or an unknown key, is ignored.

        Args:
            cr: Customized resume to mutate.
            key: Section name: ``"summary"``, ``"education"``, ``"work"``,
                ``"internship"``, ``"project"`` or ``"competition"``.
            result: Value returned by the AI optimizer for that section.

        Raises:
            ValueError: Expected (validation error) when a list item fails
                model validation; the section is left unchanged then.
        """
        if key == "summary":
            if isinstance(result, str):
                cr.resume.summary = result
            return
        if not isinstance(result, list):
            return

        section_models = {
            "education": Education,
            "work": Work,
            "internship": Internship,
            "project": Project,
            "competition": Competition,
        }
        model = section_models.get(key)
        if model is None:
            return
        # Build the full list first so a validation failure leaves the section untouched.
        validated = [model.model_validate(item) for item in result]
        setattr(cr, key, validated)
|