优化 岗位编辑agent 性能

This commit is contained in:
zk
2026-04-14 12:12:46 +08:00
parent 9bc8bb492c
commit ebffbc8615
4 changed files with 326 additions and 120 deletions
+137 -43
View File
@@ -16,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.ai.skill_gap_analyzer.analyzer import (
analyze_skill_gap, optimize_summary, optimize_module,
plan_edit, execute_module_edit,
plan_edit, execute_record_edit, execute_record_add,
)
from app.ai.skill_gap_analyzer.prompts import MODULE_SCHEMAS
from app.core.logger import log
@@ -40,6 +40,16 @@ CUSTOMIZE_RESUME_ROLLBACK_EXPIRE = 30 * 60 # 30分钟
_CHARS = string.ascii_letters + string.digits
# 模块名 → 中文标签映射
_MODULE_LABELS = {
"resume": "个人简介",
"education": "教育经历",
"work": "工作经历",
"internship": "实习经历",
"project": "项目经历",
"competition": "竞赛经历",
}
def _rand_id() -> str:
"""生成随机8位字符串标识"""
@@ -201,7 +211,7 @@ class SkillGapService:
async def ai_edit_customize_resume(self, user_id: int, job_id: int,
instruction: str, chat_history: list) -> dict:
"""AI 对话式编辑定制简历"""
"""AI 对话式编辑定制简历(原子化操作版)"""
# 1. 取当前定制简历
key = f"{CUSTOMIZE_RESUME_KEY_PREFIX}{user_id}"
raw = await RedisManager.client.get(key)
@@ -211,69 +221,153 @@ class SkillGapService:
resume_json = cr.model_dump_json(by_alias=True)
# 2. 查岗位
job = await self._get_job(job_id)
# 3. 规划 AI
job_desc = f"{job.description or ''}\n{job.requirement or ''}"
# 3. 规划 AI(意图识别 + 操作原子化)
history_str = json.dumps(chat_history, ensure_ascii=False) if chat_history else ""
plan = await plan_edit(job.title or "", resume_json, history_str, instruction)
plan = await plan_edit(job.title or "", job_desc, resume_json, history_str, instruction)
if not plan:
return {"type": "message", "message": "抱歉,我没有理解你的意思,请再描述一下。"}
if plan.get("action") == "chat":
return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")}
# 4. 按模块并发执行修改
modules = plan.get("modules", [])
if not modules:
# 4. 解析操作列表
operations = plan.get("operations", [])
if not operations:
return {"type": "message", "message": plan.get("message", "请再描述一下你的需求。")}
edit_tasks = []
for m in modules:
mod_name = m.get("module", "")
mod_instr = m.get("instruction", "")
# 截取最近10条对话历史
recent_history = chat_history[-10:] if len(chat_history) > 10 else chat_history
recent_history_str = json.dumps(recent_history, ensure_ascii=False) if recent_history else ""
# 5. 按操作类型分发执行
# 先处理 delete(零 AI 开销)
for op in operations:
if op.get("type") == "delete":
self._apply_delete(cr, op.get("module", ""), op.get("id", ""))
# 并发执行 update 和 add
ai_tasks = []
for op in operations:
op_type = op.get("type", "")
mod_name = op.get("module", "")
op_instruction = op.get("instruction", "")
schema = MODULE_SCHEMAS.get(mod_name, "")
mod_data = self._get_module_data(cr, mod_name)
edit_tasks.append((mod_name, execute_module_edit(job.title or "", mod_instr, schema, mod_data)))
keys = [t[0] for t in edit_tasks]
results = await asyncio.gather(*[t[1] for t in edit_tasks], return_exceptions=True)
# 5. 合并结果
for mod_key, result in zip(keys, results):
if isinstance(result, Exception):
log.warning(f"AI编辑模块[{mod_key}]失败: {result}")
continue
if result is None:
continue
self._apply_edit_result(cr, mod_key, result)
if op_type == "update":
record_data = self._get_record_data(cr, mod_name, op.get("id"))
if record_data is not None:
ai_tasks.append((
"update", mod_name, op.get("id"),
execute_record_edit(
job.title or "", job_desc, op_instruction,
recent_history_str, schema, record_data,
),
))
elif op_type == "add":
ai_tasks.append((
"add", mod_name, None,
execute_record_add(
job.title or "", job_desc, op_instruction,
recent_history_str, schema,
),
))
# 并发执行
if ai_tasks:
coros = [t[3] for t in ai_tasks]
results = await asyncio.gather(*coros, return_exceptions=True)
for (op_type, mod_name, record_id, _), result in zip(ai_tasks, results):
if isinstance(result, Exception):
log.warning(f"AI编辑[{op_type}/{mod_name}/{record_id}]失败: {result}")
continue
if result is None:
continue
if op_type == "update":
self._apply_record_update(cr, mod_name, record_id, result)
elif op_type == "add":
self._apply_record_add(cr, mod_name, result)
# 6. 保存回滚 + 新版本
rollback_key = f"{CUSTOMIZE_RESUME_ROLLBACK_KEY_PREFIX}{user_id}"
await RedisManager.client.set(rollback_key, raw, ex=CUSTOMIZE_RESUME_ROLLBACK_EXPIRE)
await self._save_customize_resume(user_id, cr)
label = plan.get("updatedModulesLabel", "简历内容")
return {"type": "updated", "message": f"完成!已更新:{label}"}
# 拼接更新模块标签
updated_modules = list(dict.fromkeys(op.get("module", "") for op in operations))
label = "".join(_MODULE_LABELS.get(m, m) for m in updated_modules if m)
return {"type": "updated", "message": f"完成!已更新:{label or '简历内容'}"}
@staticmethod
def _get_module_data(cr: CustomizeResume, mod_name: str) -> str:
"""获取指定模块的 JSON 数据"""
def _get_record_data(cr: CustomizeResume, mod_name: str, record_id: str | None) -> str | None:
"""获取单条记录的 JSON 数据resume 主表返回整个对象"""
if mod_name == "resume":
return cr.resume.model_dump_json(by_alias=True)
mapping = {"education": cr.education, "work": cr.work, "internship": cr.internship,
"project": cr.project, "competition": cr.competition}
mapping = {
"education": cr.education, "work": cr.work, "internship": cr.internship,
"project": cr.project, "competition": cr.competition,
}
items = mapping.get(mod_name, [])
return json.dumps([item.model_dump(by_alias=True) for item in items], ensure_ascii=False)
if not record_id:
return None
for item in items:
if item.id == record_id:
return item.model_dump_json(by_alias=True)
log.warning(f"未找到记录[{mod_name}/{record_id}]")
return None
@staticmethod
def _apply_edit_result(cr: CustomizeResume, mod_name: str, result) -> None:
"""将 AI 编辑结果应用到定制简历"""
def _apply_delete(cr: CustomizeResume, mod_name: str, record_id: str) -> None:
"""删除指定模块中的一条记录"""
if not record_id or mod_name == "resume":
return
mapping = {
"education": cr.education, "work": cr.work, "internship": cr.internship,
"project": cr.project, "competition": cr.competition,
}
items = mapping.get(mod_name)
if items is not None:
for i, item in enumerate(items):
if item.id == record_id:
items.pop(i)
break
@staticmethod
def _apply_record_update(cr: CustomizeResume, mod_name: str, record_id: str | None, result) -> None:
"""将 AI 修改结果替换回对应记录"""
try:
if mod_name == "resume" and isinstance(result, dict):
cr.resume = ResumeProfile.model_validate(result)
elif mod_name == "education" and isinstance(result, list):
cr.education = [Education.model_validate(item) for item in result]
elif mod_name == "work" and isinstance(result, list):
cr.work = [Work.model_validate(item) for item in result]
elif mod_name == "internship" and isinstance(result, list):
cr.internship = [Internship.model_validate(item) for item in result]
elif mod_name == "project" and isinstance(result, list):
cr.project = [Project.model_validate(item) for item in result]
elif mod_name == "competition" and isinstance(result, list):
cr.competition = [Competition.model_validate(item) for item in result]
return
model_map = {
"education": Education, "work": Work, "internship": Internship,
"project": Project, "competition": Competition,
}
model_cls = model_map.get(mod_name)
if not model_cls or not isinstance(result, dict) or not record_id:
return
list_map = {
"education": cr.education, "work": cr.work, "internship": cr.internship,
"project": cr.project, "competition": cr.competition,
}
items = list_map.get(mod_name, [])
new_item = model_cls.model_validate(result)
for i, item in enumerate(items):
if item.id == record_id:
items[i] = new_item
break
except Exception as e:
log.warning(f"应用AI编辑结果[{mod_name}]失败: {e}")
log.warning(f"应用AI编辑结果[{mod_name}/{record_id}]失败: {e}")
@staticmethod
def _apply_record_add(cr: CustomizeResume, mod_name: str, result) -> None:
"""将 AI 新增的记录追加到对应模块"""
try:
model_map = {
"education": (Education, cr.education),
"work": (Work, cr.work),
"internship": (Internship, cr.internship),
"project": (Project, cr.project),
"competition": (Competition, cr.competition),
}
entry = model_map.get(mod_name)
if not entry or not isinstance(result, dict):
return
model_cls, items = entry
items.append(model_cls.model_validate(result))
except Exception as e:
log.warning(f"应用AI新增记录[{mod_name}]失败: {e}")
# ===== 内部工具方法 =====