添加简历诊断功能

This commit is contained in:
zk
2026-04-07 20:15:43 +08:00
parent 602f226377
commit 8ffcb351a6
10 changed files with 1004 additions and 10 deletions
+257
View File
@@ -0,0 +1,257 @@
"""简历诊断 Service
加载简历描述数据 → 并行 AI 诊断 → 统计评级 → AI 汇总评价 → 写入数据库。
依赖:resume_diagnoserAI诊断引擎)
使用表:bg_user_resume + 5张子表(读)、bg_resume_diagnosis_report + issue(写)
"""
import json
import shortuuid
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logger import log
from app.models.resume_diagnosis_issue import ResumeDiagnosisIssue
from app.models.resume_diagnosis_report import ResumeDiagnosisReport
from app.models.user_resume import UserResume
from app.models.user_resume_competition import UserResumeCompetition
from app.models.user_resume_education import UserResumeEducation
from app.models.user_resume_internship import UserResumeInternship
from app.models.user_resume_project import UserResumeProject
from app.models.user_resume_work import UserResumeWork
from app.tool.snowflake import next_id
# 模块中文名映射
_MODULE_LABELS = {
"summary": "个人概述", "education": "教育经历", "work": "工作经历",
"internship": "实习经历", "project": "项目经历", "competition": "竞赛经历",
}
class ResumeDiagnoseService:
def __init__(self, session: AsyncSession):
self.session = session
async def load_resume_data(self, resume_id: int, user_id: int) -> tuple[UserResume, list[dict]]:
"""加载简历主表 + 5 张子表数据,组装 AI 任务列表"""
result = await self.session.execute(
select(UserResume).where(UserResume.id == resume_id, UserResume.user_id == user_id))
resume = result.scalar_one_or_none()
if resume is None:
raise ValueError("简历不存在")
target_position = resume.target_position or ""
tasks: list[dict] = []
# summary
if resume.summary and resume.summary.strip():
tasks.append({
"module_type": "个人概述", "target_position": target_position or "未指定",
"context": f"姓名: {resume.name or '未填写'}",
"description_text": resume.summary,
"_module_type_key": "summary", "_module_record_id": resume_id,
})
# 子表
await self._collect_tasks(tasks, target_position, "education", UserResumeEducation, resume_id,
lambda r: f"学校: {r.school or ''}, 专业: {r.major or ''}, 学历: {r.degree or ''}")
await self._collect_tasks(tasks, target_position, "work", UserResumeWork, resume_id,
lambda r: f"公司: {r.company_name or ''}, 职位: {r.position or ''}")
await self._collect_tasks(tasks, target_position, "internship", UserResumeInternship, resume_id,
lambda r: f"公司: {r.company_name or ''}, 职位: {r.position or ''}")
await self._collect_tasks(tasks, target_position, "project", UserResumeProject, resume_id,
lambda r: f"公司: {r.company_name or ''}, 项目: {r.project_name or ''}, 角色: {r.role or ''}")
await self._collect_tasks(tasks, target_position, "competition", UserResumeCompetition, resume_id,
lambda r: f"竞赛: {r.competition_name or ''}, 获奖: {r.award or ''}")
return resume, tasks
async def _collect_tasks(self, tasks: list[dict], target_position: str,
module_type: str, model_cls, resume_id: int, context_fn):
"""查询子表记录,将有 description 的记录加入 tasks"""
result = await self.session.execute(select(model_cls).where(model_cls.resume_id == resume_id))
for record in result.scalars().all():
desc_text = _build_description_text(record.description)
if not desc_text:
continue
tasks.append({
"module_type": _MODULE_LABELS[module_type],
"target_position": target_position or "未指定",
"context": context_fn(record),
"description_text": desc_text,
"_module_type_key": module_type, "_module_record_id": record.id,
"_original_description": record.description, # 原始 [{id,text}],用于映射 optimized_content
})
async def save_report(self, resume_id: int, user_id: int, grade: str, summary: str,
urgent_total: int, important_total: int, expression_total: int,
tasks: list[dict], ai_results: list[dict]) -> int:
"""纯写入:接收已算好的 grade、summary、统计数据,写入 report + issues"""
report_id = next_id()
self.session.add(ResumeDiagnosisReport(
id=report_id, resume_id=resume_id, user_id=user_id,
grade=grade, summary=summary,
urgent_total=urgent_total, important_total=important_total, expression_total=expression_total,
))
for task, ai_result in zip(tasks, ai_results):
if not _has_issues(ai_result):
continue
self.session.add(ResumeDiagnosisIssue(
id=next_id(), report_id=report_id, resume_id=resume_id, user_id=user_id,
module_type=task["_module_type_key"], module_record_id=task["_module_record_id"],
finding=ai_result.get("finding", ""), importance=ai_result.get("importance", ""),
suggestion=ai_result.get("suggestion", ""),
urgent_issues=ai_result.get("urgent_issues"), important_issues=ai_result.get("important_issues"),
expression_issues=ai_result.get("expression_issues"),
optimized_content=_build_optimized_content(task, ai_result.get("optimized_content")),
status=0, user_feedback=0,
))
await self.session.flush()
log.info(f"诊断报告保存完成 reportId:{report_id} grade:{grade}")
return report_id
async def get_latest_report(self, resume_id: int, user_id: int) -> dict | None:
"""查询最近一次诊断报告 + 所有 issues"""
result = await self.session.execute(
select(ResumeDiagnosisReport).where(
ResumeDiagnosisReport.resume_id == resume_id, ResumeDiagnosisReport.user_id == user_id,
).order_by(desc(ResumeDiagnosisReport.create_time)).limit(1))
report = result.scalar_one_or_none()
if report is None:
return None
result = await self.session.execute(
select(ResumeDiagnosisIssue).where(ResumeDiagnosisIssue.report_id == report.id))
issues = result.scalars().all()
return {
"report": {
"id": str(report.id), "resumeId": str(report.resume_id),
"grade": report.grade, "summary": report.summary,
"urgentTotal": report.urgent_total, "importantTotal": report.important_total,
"expressionTotal": report.expression_total,
"createTime": report.create_time.strftime("%Y-%m-%d %H:%M:%S") if report.create_time else None,
},
"issues": [_issue_to_dict(i) for i in issues],
}
async def resolve_issue(self, issue_id: int, user_id: int, user_feedback: int) -> None:
"""标记问题已处理 + 用户评价"""
result = await self.session.execute(
select(ResumeDiagnosisIssue).where(
ResumeDiagnosisIssue.id == issue_id, ResumeDiagnosisIssue.user_id == user_id))
issue = result.scalar_one_or_none()
if issue is None:
raise ValueError("诊断问题不存在")
issue.status = 1
issue.user_feedback = user_feedback
await self.session.flush()
# ===== 工具函数 =====
def _build_optimized_content(task: dict, ai_texts: list[str] | None):
"""将 AI 返回的纯文本数组映射回存储格式
- summary 模块:取第一个元素作为纯文本字符串
- 子表模块:用原始 description 的 id + AI 改写的 text 组合成 [{id, text}]
"""
if not ai_texts or not isinstance(ai_texts, list):
return None
original = task.get("_original_description")
if original is None:
# summary 模块,存纯文本
return ai_texts[0] if ai_texts else None
# 子表模块,映射回 [{id, text}]
result = []
for i, item in enumerate(original):
if not isinstance(item, dict):
continue
text = ai_texts[i] if i < len(ai_texts) else item.get("text", "")
result.append({"id": item.get("id"), "text": text})
return result
def _build_description_text(description: list[dict] | None) -> str:
"""子表 description [{id, text}] → JSON 字符串传给 AI(保留 id 以便 AI 返回同格式)"""
if not description:
return ""
valid = [item for item in description if isinstance(item, dict) and item.get("text")]
if not valid:
return ""
return json.dumps(valid, ensure_ascii=False)
def aggregate_results(tasks: list[dict], ai_results: list[dict]) -> dict:
"""统计汇总 + 评级,返回 {grade, urgent_total, important_total, expression_total, has_weak_relevance, all_findings}"""
urgent_total = 0
important_total = 0
expression_total = 0
has_weak_relevance = False
all_findings: list[str] = []
for task, ai_result in zip(tasks, ai_results):
urgent = ai_result.get("urgent_issues", {})
important = ai_result.get("important_issues", {})
expression = ai_result.get("expression_issues", {})
urgent_total += sum(v for v in urgent.values() if isinstance(v, int))
important_total += sum(v for v in important.values() if isinstance(v, int))
expression_total += sum(v for v in expression.values() if isinstance(v, int))
if important.get("weak_relevance", 0) > 0:
has_weak_relevance = True
finding = ai_result.get("finding", "")
if finding and _has_issues(ai_result):
label = _MODULE_LABELS.get(task["_module_type_key"], task["_module_type_key"])
all_findings.append(f"{label}{finding}")
grade = _calc_grade(urgent_total, important_total, expression_total, has_weak_relevance)
return {
"grade": grade, "urgent_total": urgent_total,
"important_total": important_total, "expression_total": expression_total,
"all_findings": "\n".join(all_findings),
}
def _calc_grade(urgent: int, important: int, expression: int, has_weak_relevance: bool) -> str:
"""评级硬算:D → C → B → A
Aurgent=0, important<=1, expression<=1
Burgent=0, important<=3, expression<=2(且不满足A
Curgent=1, 或 important 3-4
Durgent>=2, 或 (important>=4 且 has_weak_relevance)
"""
if urgent >= 2 or (important >= 4 and has_weak_relevance):
return "D"
if urgent == 1 or 3 <= important <= 4:
return "C"
if urgent == 0 and important <= 1 and expression <= 1:
return "A"
if urgent == 0 and important <= 3 and expression <= 2:
return "B"
return "C"
def _has_issues(ai_result: dict) -> bool:
"""判断诊断结果是否存在问题(所有计数都为 0 则无问题)"""
for key in ("urgent_issues", "important_issues", "expression_issues"):
counts = ai_result.get(key, {})
if any(v > 0 for v in counts.values() if isinstance(v, int)):
return True
return False
def _issue_to_dict(issue: ResumeDiagnosisIssue) -> dict:
"""ORM → API 响应字典"""
return {
"id": str(issue.id), "moduleType": issue.module_type,
"moduleRecordId": str(issue.module_record_id),
"finding": issue.finding, "importance": issue.importance, "suggestion": issue.suggestion,
"urgentIssues": issue.urgent_issues, "importantIssues": issue.important_issues,
"expressionIssues": issue.expression_issues, "optimizedContent": issue.optimized_content,
"status": issue.status, "userFeedback": issue.user_feedback,
}