Files
offerpai_python_ai/app/services/resume_diagnose_service.py
2026-04-24 19:16:11 +08:00

275 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""简历诊断 Service
加载简历描述数据 → 并行 AI 诊断 → 统计评级 → AI 汇总评价 → 写入数据库。
依赖:resume_diagnoserAI诊断引擎)
使用表:bg_user_resume + 5张子表(读)、bg_resume_diagnosis_report + issue(写)
"""
import json
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logger import log
from app.models.resume_diagnosis_issue import ResumeDiagnosisIssue
from app.models.resume_diagnosis_report import ResumeDiagnosisReport
from app.models.user_resume import UserResume
from app.services.resume_loader import load_resume_detail
from app.tool.snowflake import next_id
# 模块中文名映射
_MODULE_LABELS = {
"summary": "个人概述", "education": "教育经历", "work": "工作经历",
"internship": "实习经历", "project": "项目经历", "competition": "竞赛经历",
}
class ResumeDiagnoseService:
def __init__(self, session: AsyncSession):
self.session = session
async def load_resume_data(self, resume_id: int, user_id: int) -> tuple[UserResume, list[dict]]:
"""加载简历主表 + 5 张子表数据,组装 AI 任务列表"""
detail = await load_resume_detail(self.session, resume_id, user_id)
resume = detail.resume
target_position = resume.target_position or ""
tasks: list[dict] = []
# summary
if resume.summary and resume.summary.strip():
tasks.append({
"module_type": "个人概述", "target_position": target_position or "未指定",
"context": f"姓名: {resume.name or '未填写'}",
"description_text": resume.summary,
"_module_type_key": "summary", "_module_record_id": resume_id,
})
# 子表
self._collect_tasks(tasks, target_position, "education", detail.education,
lambda r: f"学校: {r.school or ''}, 专业: {r.major or ''}, 学历: {r.degree or ''}")
self._collect_tasks(tasks, target_position, "work", detail.work,
lambda r: f"公司: {r.company_name or ''}, 职位: {r.position or ''}")
self._collect_tasks(tasks, target_position, "internship", detail.internship,
lambda r: f"公司: {r.company_name or ''}, 职位: {r.position or ''}")
self._collect_tasks(tasks, target_position, "project", detail.project,
lambda r: f"公司: {r.company_name or ''}, 项目: {r.project_name or ''}, 角色: {r.role or ''}")
self._collect_tasks(tasks, target_position, "competition", detail.competition,
lambda r: f"竞赛: {r.competition_name or ''}, 获奖: {r.award or ''}")
return resume, tasks
@staticmethod
def _collect_tasks(tasks: list[dict], target_position: str,
module_type: str, records: list, context_fn):
"""将有 description 的记录加入 tasks"""
for record in records:
desc_text = _build_description_text(record.description)
if not desc_text:
continue
tasks.append({
"module_type": _MODULE_LABELS[module_type],
"target_position": target_position or "未指定",
"context": context_fn(record),
"description_text": desc_text,
"_module_type_key": module_type, "_module_record_id": record.id,
"_original_description": record.description,
})
async def save_report(self, resume_id: int, user_id: int, grade: str, summary: str,
urgent_total: int, important_total: int, expression_total: int,
tasks: list[dict], ai_results: list[dict]) -> int:
"""纯写入:接收已算好的 grade、summary、统计数据,写入 report + issues"""
report_id = next_id()
self.session.add(ResumeDiagnosisReport(
id=report_id, resume_id=resume_id, user_id=user_id,
grade=grade, summary=summary,
urgent_total=urgent_total, important_total=important_total, expression_total=expression_total,
))
for task, ai_result in zip(tasks, ai_results):
if not _has_issues(ai_result):
continue
self.session.add(ResumeDiagnosisIssue(
id=next_id(), report_id=report_id, resume_id=resume_id, user_id=user_id,
module_type=task["_module_type_key"], module_record_id=task["_module_record_id"],
finding=ai_result.get("finding", ""), importance=ai_result.get("importance", ""),
suggestion=ai_result.get("suggestion", ""),
urgent_issues=ai_result.get("urgent_issues"), important_issues=ai_result.get("important_issues"),
expression_issues=ai_result.get("expression_issues"),
optimized_content=_build_optimized_content(task, ai_result.get("optimized_content")),
status=0, user_feedback=0,
))
await self.session.flush()
log.info(f"诊断报告保存完成 reportId:{report_id} grade:{grade}")
return report_id
async def get_latest_report(self, resume_id: int, user_id: int) -> dict | None:
"""查询最近一次诊断报告 + 所有 issues"""
result = await self.session.execute(
select(ResumeDiagnosisReport).where(
ResumeDiagnosisReport.resume_id == resume_id, ResumeDiagnosisReport.user_id == user_id,
).order_by(desc(ResumeDiagnosisReport.create_time)).limit(1))
report = result.scalar_one_or_none()
if report is None:
return None
result = await self.session.execute(
select(ResumeDiagnosisIssue).where(ResumeDiagnosisIssue.report_id == report.id))
issues = result.scalars().all()
return {
"report": {
"id": str(report.id), "resumeId": str(report.resume_id),
"grade": report.grade, "summary": report.summary,
"urgentTotal": report.urgent_total, "importantTotal": report.important_total,
"expressionTotal": report.expression_total,
"createTime": report.create_time.strftime("%Y-%m-%d %H:%M:%S") if report.create_time else None,
},
"issues": [_issue_to_dict(i) for i in issues],
}
async def resolve_issue(self, issue_id: int, user_id: int) -> None:
"""标记问题已处理"""
result = await self.session.execute(
select(ResumeDiagnosisIssue).where(
ResumeDiagnosisIssue.id == issue_id, ResumeDiagnosisIssue.user_id == user_id))
issue = result.scalar_one_or_none()
if issue is None:
raise ValueError("诊断问题不存在")
issue.status = 1
await self.session.flush()
async def update_feedback(self, issue_id: int, user_id: int, user_feedback: int) -> None:
"""单独更新用户评价"""
result = await self.session.execute(
select(ResumeDiagnosisIssue).where(
ResumeDiagnosisIssue.id == issue_id, ResumeDiagnosisIssue.user_id == user_id))
issue = result.scalar_one_or_none()
if issue is None:
raise ValueError("诊断问题不存在")
issue.user_feedback = user_feedback
await self.session.flush()
async def get_issue_for_polish(self, issue_id: int, user_id: int) -> dict:
"""获取 issue 润色所需的上下文信息"""
result = await self.session.execute(
select(ResumeDiagnosisIssue).where(
ResumeDiagnosisIssue.id == issue_id, ResumeDiagnosisIssue.user_id == user_id))
issue = result.scalar_one_or_none()
if issue is None:
raise ValueError("诊断问题不存在")
return {
"module_type": issue.module_type,
"module_label": _MODULE_LABELS.get(issue.module_type, issue.module_type),
"optimized_content": issue.optimized_content,
"is_summary": issue.module_type == "summary",
}
# ===== 工具函数 =====
def _build_optimized_content(task: dict, ai_texts: list[str] | None):
"""将 AI 返回的纯文本数组映射回存储格式
- summary 模块:取第一个元素作为纯文本字符串
- 子表模块:用原始 description 的 id + AI 改写的 text 组合成 [{id, text}]
"""
if not ai_texts or not isinstance(ai_texts, list):
return None
original = task.get("_original_description")
if original is None:
# summary 模块,存纯文本
return ai_texts[0] if ai_texts else None
# 子表模块,映射回 [{id, text}]
result = []
for i, item in enumerate(original):
if not isinstance(item, dict):
continue
text = ai_texts[i] if i < len(ai_texts) else item.get("text", "")
result.append({"id": item.get("id"), "text": text})
return result
def _build_description_text(description: list[dict] | None) -> str:
"""子表 description [{id, text}] → JSON 字符串传给 AI(保留 id 以便 AI 返回同格式)"""
if not description:
return ""
valid = [item for item in description if isinstance(item, dict) and item.get("text")]
if not valid:
return ""
return json.dumps(valid, ensure_ascii=False)
def aggregate_results(tasks: list[dict], ai_results: list[dict]) -> dict:
"""统计汇总 + 评级,返回 {grade, urgent_total, important_total, expression_total, has_weak_relevance, all_findings}"""
urgent_total = 0
important_total = 0
expression_total = 0
has_weak_relevance = False
all_findings: list[str] = []
for task, ai_result in zip(tasks, ai_results):
urgent = ai_result.get("urgent_issues", {})
important = ai_result.get("important_issues", {})
expression = ai_result.get("expression_issues", {})
urgent_total += sum(v for v in urgent.values() if isinstance(v, int))
important_total += sum(v for v in important.values() if isinstance(v, int))
expression_total += sum(v for v in expression.values() if isinstance(v, int))
if important.get("weak_relevance", 0) > 0:
has_weak_relevance = True
finding = ai_result.get("finding", "")
if finding and _has_issues(ai_result):
label = _MODULE_LABELS.get(task["_module_type_key"], task["_module_type_key"])
all_findings.append(f"{label}{finding}")
grade = _calc_grade(urgent_total, important_total, expression_total, has_weak_relevance)
return {
"grade": grade, "urgent_total": urgent_total,
"important_total": important_total, "expression_total": expression_total,
"all_findings": "\n".join(all_findings),
}
def _calc_grade(urgent: int, important: int, expression: int, has_weak_relevance: bool) -> str:
"""评级硬算:D → C → B → A
Aurgent=0, important<=1, expression<=1
Burgent=0, important<=3, expression<=2(且不满足A
Curgent=1, 或 important 3-4
Durgent>=2, 或 (important>=4 且 has_weak_relevance)
"""
if urgent >= 2 or (important >= 4 and has_weak_relevance):
return "D"
if urgent == 1 or 3 <= important <= 4:
return "C"
if urgent == 0 and important <= 1 and expression <= 1:
return "A"
if urgent == 0 and important <= 3 and expression <= 2:
return "B"
return "C"
def _has_issues(ai_result: dict) -> bool:
"""判断诊断结果是否存在问题(所有计数都为 0 则无问题)"""
for key in ("urgent_issues", "important_issues", "expression_issues"):
counts = ai_result.get(key, {})
if any(v > 0 for v in counts.values() if isinstance(v, int)):
return True
return False
def _issue_to_dict(issue: ResumeDiagnosisIssue) -> dict:
"""ORM → API 响应字典"""
return {
"id": str(issue.id), "moduleType": issue.module_type,
"moduleRecordId": str(issue.module_record_id),
"finding": issue.finding, "importance": issue.importance, "suggestion": issue.suggestion,
"urgentIssues": issue.urgent_issues, "importantIssues": issue.important_issues,
"expressionIssues": issue.expression_issues, "optimizedContent": issue.optimized_content,
"status": issue.status, "userFeedback": issue.user_feedback,
}