"""Resume diagnosis service.

Pipeline: load resume description data -> parallel AI diagnosis ->
aggregate statistics and grade -> AI summary -> persist to the database.

Depends on: resume_diagnoser (AI diagnosis engine).
Tables: bg_user_resume + 5 child tables (read);
bg_resume_diagnosis_report + issue (write).
"""

import json

from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.logger import log
from app.models.resume_diagnosis_issue import ResumeDiagnosisIssue
from app.models.resume_diagnosis_report import ResumeDiagnosisReport
from app.models.user_resume import UserResume
from app.services.resume_loader import load_resume_detail
from app.tool.snowflake import next_id

# Module key -> Chinese display label (runtime data, sent to the AI and
# used when formatting findings).
_MODULE_LABELS = {
    "summary": "个人概述",
    "education": "教育经历",
    "work": "工作经历",
    "internship": "实习经历",
    "project": "项目经历",
    "competition": "竞赛经历",
}


class ResumeDiagnoseService:
    """Database-facing operations of the resume diagnosis flow."""

    def __init__(self, session: AsyncSession):
        self.session = session

    async def load_resume_data(
        self, resume_id: int, user_id: int
    ) -> tuple[UserResume, list[dict]]:
        """Load the resume and its child records and build the AI task list.

        One task is created for a non-blank ``summary`` and one for every
        child record (education / work / internship / project / competition)
        whose description contains at least one entry with text.

        Returns:
            ``(resume, tasks)``. Keys prefixed with ``_`` in a task are
            bookkeeping used later by ``save_report`` / ``aggregate_results``.
        """
        # NOTE(review): ownership checks, if any, happen inside
        # load_resume_detail -- confirm there.
        detail = await load_resume_detail(self.session, resume_id, user_id)
        resume = detail.resume
        target_position = resume.target_position or ""
        tasks: list[dict] = []

        # The summary lives on the main record, so it has no
        # "_original_description"; _build_optimized_content relies on that
        # to tell summary tasks apart from child-record tasks.
        if resume.summary and resume.summary.strip():
            tasks.append({
                "module_type": "个人概述",
                "target_position": target_position or "未指定",
                "context": f"姓名: {resume.name or '未填写'}",
                "description_text": resume.summary,
                "_module_type_key": "summary",
                "_module_record_id": resume_id,
            })

        # Child tables.
        self._collect_tasks(
            tasks, target_position, "education", detail.education,
            lambda r: f"学校: {r.school or ''}, 专业: {r.major or ''}, 学历: {r.degree or ''}",
        )
        self._collect_tasks(
            tasks, target_position, "work", detail.work,
            lambda r: f"公司: {r.company_name or ''}, 职位: {r.position or ''}",
        )
        self._collect_tasks(
            tasks, target_position, "internship", detail.internship,
            lambda r: f"公司: {r.company_name or ''}, 职位: {r.position or ''}",
        )
        self._collect_tasks(
            tasks, target_position, "project", detail.project,
            lambda r: f"公司: {r.company_name or ''}, 项目: {r.project_name or ''}, 角色: {r.role or ''}",
        )
        self._collect_tasks(
            tasks, target_position, "competition", detail.competition,
            lambda r: f"竞赛: {r.competition_name or ''}, 获奖: {r.award or ''}",
        )
        return resume, tasks

    @staticmethod
    def _collect_tasks(tasks: list[dict], target_position: str, module_type: str,
                       records: list, context_fn):
        """Append one task to ``tasks`` per record that has description text.

        Args:
            tasks: List mutated in place.
            target_position: Target position; empty falls back to "未指定".
            module_type: Key into ``_MODULE_LABELS``.
            records: Child records exposing ``id`` and ``description``.
            context_fn: Callable mapping a record to its context string.
        """
        for record in records:
            desc_text = _build_description_text(record.description)
            if not desc_text:
                continue
            tasks.append({
                "module_type": _MODULE_LABELS[module_type],
                "target_position": target_position or "未指定",
                "context": context_fn(record),
                "description_text": desc_text,
                "_module_type_key": module_type,
                "_module_record_id": record.id,
                "_original_description": record.description,
            })

    async def save_report(self, resume_id: int, user_id: int, grade: str, summary: str,
                          urgent_total: int, important_total: int, expression_total: int,
                          tasks: list[dict], ai_results: list[dict]) -> int:
        """Persist a report row plus one issue row per task that has issues.

        Pure write: grade, summary and totals are computed by the caller.
        The session is flushed, not committed.

        Returns:
            The id of the newly created report.
        """
        report_id = next_id()
        self.session.add(ResumeDiagnosisReport(
            id=report_id,
            resume_id=resume_id,
            user_id=user_id,
            grade=grade,
            summary=summary,
            urgent_total=urgent_total,
            important_total=important_total,
            expression_total=expression_total,
        ))

        # tasks and ai_results are paired by position; zip stops at the
        # shorter of the two.
        for task, ai_result in zip(tasks, ai_results):
            if not _has_issues(ai_result):
                continue
            self.session.add(ResumeDiagnosisIssue(
                id=next_id(),
                report_id=report_id,
                resume_id=resume_id,
                user_id=user_id,
                module_type=task["_module_type_key"],
                module_record_id=task["_module_record_id"],
                finding=ai_result.get("finding", ""),
                importance=ai_result.get("importance", ""),
                suggestion=ai_result.get("suggestion", ""),
                urgent_issues=ai_result.get("urgent_issues"),
                important_issues=ai_result.get("important_issues"),
                expression_issues=ai_result.get("expression_issues"),
                optimized_content=_build_optimized_content(
                    task, ai_result.get("optimized_content")),
                status=0,
                user_feedback=0,
            ))

        await self.session.flush()
        log.info(f"诊断报告保存完成 reportId:{report_id} grade:{grade}")
        return report_id

    async def get_latest_report(self, resume_id: int, user_id: int) -> dict | None:
        """Return the most recent report (by create_time) with all its issues.

        Returns:
            ``{"report": {...}, "issues": [...]}`` or ``None`` when the user
            has no report for this resume. Ids are serialized as strings.
        """
        result = await self.session.execute(
            select(ResumeDiagnosisReport)
            .where(
                ResumeDiagnosisReport.resume_id == resume_id,
                ResumeDiagnosisReport.user_id == user_id,
            )
            .order_by(desc(ResumeDiagnosisReport.create_time))
            .limit(1)
        )
        report = result.scalar_one_or_none()
        if report is None:
            return None

        result = await self.session.execute(
            select(ResumeDiagnosisIssue).where(
                ResumeDiagnosisIssue.report_id == report.id))
        issues = result.scalars().all()

        return {
            "report": {
                "id": str(report.id),
                "resumeId": str(report.resume_id),
                "grade": report.grade,
                "summary": report.summary,
                "urgentTotal": report.urgent_total,
                "importantTotal": report.important_total,
                "expressionTotal": report.expression_total,
                "createTime": report.create_time.strftime("%Y-%m-%d %H:%M:%S")
                if report.create_time else None,
            },
            "issues": [_issue_to_dict(i) for i in issues],
        }

    async def _get_owned_issue(self, issue_id: int, user_id: int) -> ResumeDiagnosisIssue:
        """Fetch the issue with this id belonging to this user.

        Raises:
            ValueError: If no such issue exists for the user.
        """
        result = await self.session.execute(
            select(ResumeDiagnosisIssue).where(
                ResumeDiagnosisIssue.id == issue_id,
                ResumeDiagnosisIssue.user_id == user_id,
            )
        )
        issue = result.scalar_one_or_none()
        if issue is None:
            raise ValueError("诊断问题不存在")
        return issue

    async def resolve_issue(self, issue_id: int, user_id: int) -> None:
        """Mark an issue as handled (status = 1) and flush.

        Raises:
            ValueError: If the issue does not exist for this user.
        """
        issue = await self._get_owned_issue(issue_id, user_id)
        issue.status = 1
        await self.session.flush()

    async def update_feedback(self, issue_id: int, user_id: int, user_feedback: int) -> None:
        """Update only the user's feedback value on an issue and flush.

        Raises:
            ValueError: If the issue does not exist for this user.
        """
        issue = await self._get_owned_issue(issue_id, user_id)
        issue.user_feedback = user_feedback
        await self.session.flush()

    async def get_issue_for_polish(self, issue_id: int, user_id: int) -> dict:
        """Return the context needed to polish an issue's optimized content.

        Raises:
            ValueError: If the issue does not exist for this user.
        """
        issue = await self._get_owned_issue(issue_id, user_id)
        return {
            "module_type": issue.module_type,
            "module_label": _MODULE_LABELS.get(issue.module_type, issue.module_type),
            "optimized_content": issue.optimized_content,
            "is_summary": issue.module_type == "summary",
        }


# ===== Helper functions =====


def _is_sent_to_ai(item) -> bool:
    """Return True for description entries that are forwarded to the AI.

    Single source of truth shared by ``_build_description_text`` (what is
    sent) and ``_build_optimized_content`` (how the reply is mapped back).
    """
    return isinstance(item, dict) and bool(item.get("text"))


def _build_optimized_content(task: dict, ai_texts: list[str] | None):
    """Map the AI's plain-text array back to the storage format.

    - Summary task (no ``_original_description``): the first element, as a
      plain string.
    - Child-record task: ``[{id, text}]`` built from the original
      description ids and the rewritten texts.

    The AI only receives entries accepted by ``_is_sent_to_ai``, so its
    reply is aligned with that filtered list, not with the raw description.
    The reply index therefore advances only for entries that were sent;
    other dict entries keep their original text, and non-dict entries are
    dropped. If the reply is shorter than expected, remaining entries keep
    their original text.

    Returns:
        ``None`` when ``ai_texts`` is empty or not a list, otherwise a
        string (summary) or a list of ``{"id", "text"}`` dicts.
    """
    if not ai_texts or not isinstance(ai_texts, list):
        return None

    original = task.get("_original_description")
    if original is None:
        # Summary module: stored as plain text.
        return ai_texts[0]

    result = []
    ai_index = 0
    for item in original:
        if not isinstance(item, dict):
            continue
        if _is_sent_to_ai(item):
            text = ai_texts[ai_index] if ai_index < len(ai_texts) else item["text"]
            ai_index += 1
        else:
            # Never sent to the AI, so there is no rewrite to apply.
            text = item.get("text", "")
        result.append({"id": item.get("id"), "text": text})
    return result


def _build_description_text(description: list[dict] | None) -> str:
    """Serialize a child-record description ``[{id, text}]`` to JSON for the AI.

    Entries that are not dicts or have no text are dropped. Returns ``""``
    when nothing remains.
    """
    if not description:
        return ""
    valid = [item for item in description if _is_sent_to_ai(item)]
    if not valid:
        return ""
    return json.dumps(valid, ensure_ascii=False)


def _issue_counts(ai_result: dict, key: str) -> dict:
    """Return the counts dict stored under ``key``, or ``{}``.

    ``dict.get(key, {})`` only falls back when the key is absent; a key
    present with ``None`` (or any non-dict value) would otherwise crash on
    ``.values()``.
    """
    counts = ai_result.get(key)
    return counts if isinstance(counts, dict) else {}


def _sum_int_counts(counts: dict) -> int:
    """Sum the integer values of a counts dict, ignoring everything else."""
    return sum(v for v in counts.values() if isinstance(v, int))


def aggregate_results(tasks: list[dict], ai_results: list[dict]) -> dict:
    """Aggregate per-task AI results into totals, a grade and findings.

    Returns:
        ``{grade, urgent_total, important_total, expression_total,
        has_weak_relevance, all_findings}`` where ``all_findings`` is a
        newline-joined string of "【label】finding" lines for tasks that
        have at least one issue.
    """
    urgent_total = 0
    important_total = 0
    expression_total = 0
    has_weak_relevance = False
    all_findings: list[str] = []

    for task, ai_result in zip(tasks, ai_results):
        urgent = _issue_counts(ai_result, "urgent_issues")
        important = _issue_counts(ai_result, "important_issues")
        expression = _issue_counts(ai_result, "expression_issues")

        urgent_total += _sum_int_counts(urgent)
        important_total += _sum_int_counts(important)
        expression_total += _sum_int_counts(expression)

        # Guard the comparison: AI output may carry null or a string here.
        weak = important.get("weak_relevance", 0)
        if isinstance(weak, (int, float)) and weak > 0:
            has_weak_relevance = True

        finding = ai_result.get("finding", "")
        if finding and _has_issues(ai_result):
            key = task["_module_type_key"]
            label = _MODULE_LABELS.get(key, key)
            all_findings.append(f"【{label}】{finding}")

    grade = _calc_grade(urgent_total, important_total, expression_total,
                        has_weak_relevance)
    return {
        "grade": grade,
        "urgent_total": urgent_total,
        "important_total": important_total,
        "expression_total": expression_total,
        "has_weak_relevance": has_weak_relevance,
        "all_findings": "\n".join(all_findings),
    }


def _calc_grade(urgent: int, important: int, expression: int,
                has_weak_relevance: bool) -> str:
    """Compute the grade from the totals; rules are evaluated in this order.

    1. D: urgent >= 2, or (important >= 4 and has_weak_relevance)
    2. C: urgent == 1, or 3 <= important <= 4
    3. A: urgent == 0 and important <= 1 and expression <= 1
    4. B: urgent == 0 and important <= 3 and expression <= 2
    5. C: anything else

    Because rule 2 runs before rule 4, important == 3 is graded C, so B is
    only reachable with important <= 2.
    """
    if urgent >= 2 or (important >= 4 and has_weak_relevance):
        return "D"
    if urgent == 1 or 3 <= important <= 4:
        return "C"
    if urgent == 0 and important <= 1 and expression <= 1:
        return "A"
    if urgent == 0 and important <= 3 and expression <= 2:
        return "B"
    return "C"


def _has_issues(ai_result: dict) -> bool:
    """Return True if any integer count in any issue category is positive."""
    for key in ("urgent_issues", "important_issues", "expression_issues"):
        counts = _issue_counts(ai_result, key)
        if any(v > 0 for v in counts.values() if isinstance(v, int)):
            return True
    return False


def _issue_to_dict(issue: ResumeDiagnosisIssue) -> dict:
    """Convert an issue ORM row to the camelCase API response dict.

    Ids are serialized as strings.
    """
    return {
        "id": str(issue.id),
        "moduleType": issue.module_type,
        "moduleRecordId": str(issue.module_record_id),
        "finding": issue.finding,
        "importance": issue.importance,
        "suggestion": issue.suggestion,
        "urgentIssues": issue.urgent_issues,
        "importantIssues": issue.important_issues,
        "expressionIssues": issue.expression_issues,
        "optimizedContent": issue.optimized_content,
        "status": issue.status,
        "userFeedback": issue.user_feedback,
    }