Files
offerpie_job_cleaner/app/services/job_clean_service.py
T
2026-06-02 19:27:00 +08:00

329 lines
12 KiB
Python

"""岗位清洗服务(协程版)"""
import asyncio
import json
from datetime import datetime
from snowflake import SnowflakeGenerator
from sqlalchemy import text, insert
from app.config import settings
from app.core.database import PgSession, MysqlSession
from app.core.logger import log
from app.ai.model_config import JobCleanModel
from app.ai.prompts import JOB_STRUCTURE_SYSTEM, MAJOR_MATCH_SYSTEM, SKILL_EXTRACT_SYSTEM
from app.models.mysql.job import Job
from app.models.mysql.company import Company
from app.models.mysql.relations import JobRegionRelation, JobSkillTagRelation
from app.services.ai_tool import ai_chat_json
from app.services.dict_cache_service import dict_cache
# 雪花ID生成器
_id_gen = SnowflakeGenerator(instance=1)
# 公司创建锁(防止并发重复插入同一公司)
_company_lock = asyncio.Lock()
async def run_job_clean() -> None:
"""一次批量清洗任务"""
# 1. 从 PG 锁定一批待清洗数据
async with PgSession() as pg:
result = await pg.execute(
text("""
SELECT * FROM app_job_data
WHERE clean_status = 'pending'
LIMIT :limit
FOR UPDATE SKIP LOCKED
"""),
{"limit": settings.clean_batch_size},
)
rows = result.mappings().all()
if not rows:
return
ids = [r["id"] for r in rows]
await pg.execute(
text("""
UPDATE app_job_data
SET clean_status = 'cleaning', clean_started_at = NOW()
WHERE id = ANY(:ids)
"""),
{"ids": ids},
)
await pg.commit()
log.info("岗位清洗:锁定{}条数据", len(rows))
# 2. 协程并发清洗,信号量限流
sem = asyncio.Semaphore(settings.clean_concurrency)
tasks = [_clean_one(sem, dict(r)) for r in rows]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 汇总
errors = sum(1 for r in results if isinstance(r, Exception))
log.info("岗位清洗:本批完成,共{}条,异常{}", len(rows), errors)
async def _clean_one(sem: asyncio.Semaphore, data: dict) -> None:
"""单条岗位清洗"""
async with sem:
try:
await _do_clean(data)
except Exception as e:
log.error("岗位清洗异常, id={}: {}", data["id"], e)
# 保持 cleaning 状态,由僵尸恢复任务重置
async def _do_clean(data: dict) -> None:
"""清洗逻辑"""
data_id = data["id"]
# 前置校验
description = data.get("description") or ""
if len(description) < 20:
log.info("[id={}] 丢弃:描述过短({}字符)", data_id, len(description))
await _update_pg_status(data_id, "discarded")
return
# 哈希去重续命:content_hash 已存在于 bg_job → 说明是重复刷新,直接续命
content_hash = data.get("content_hash")
if content_hash:
async with MysqlSession() as mysql:
row = await mysql.execute(
text("SELECT id FROM bg_job WHERE content_hash = :hash LIMIT 1"),
{"hash": content_hash},
)
existing_id = row.scalar()
if existing_id:
now = datetime.now()
await mysql.execute(
text("UPDATE bg_job SET status = 0, source_url = :url, create_time = :now, update_time = :now WHERE id = :id"),
{"url": data.get("detail_url"), "now": now, "id": existing_id},
)
await mysql.commit()
await _update_pg_status(data_id, "cleaned")
log.info("[id={}] 续命:hash命中,岗位ID={}", data_id, existing_id)
return
# 第一次AI:结构化提取
user_message = _build_user_message(data)
result = await ai_chat_json(JobCleanModel.STRUCTURE, JOB_STRUCTURE_SYSTEM, user_message)
if result is None or not result.get("valid", False):
log.info("[id={}] 丢弃:AI判定无效", data_id)
await _update_pg_status(data_id, "discarded")
return
# 公司处理
company_short_name = result.get("companyShortName") or data.get("company") or ""
company_id = await _find_or_create_company(company_short_name)
# 地区处理
region_codes = []
for city in result.get("cities") or []:
code = dict_cache.match_region_code(city)
if code:
region_codes.append(code)
# 写入 bg_job
job_id = next(_id_gen)
now = datetime.now()
# expire_at 校验:超出合理范围的设为 NULL
expire_at = data.get("expire_at")
if expire_at and isinstance(expire_at, datetime) and expire_at.year > 2035:
expire_at = None
async with MysqlSession() as mysql:
await mysql.execute(
insert(Job).values(
id=job_id,
title=result.get("title", ""),
company_id=company_id,
category_id=result.get("categoryId", 0),
employment_type=result.get("employmentType", 0),
description=result.get("description", ""),
requirement=result.get("requirement", ""),
bonus=result.get("bonus"),
tags=result.get("tags"),
skill_tags=result.get("skillTags"),
salary=result.get("salary"),
education=result.get("education", 0),
min_experience=result.get("minExperience", 0),
required_industry_id=result.get("requiredIndustryId"),
recruit_category=data.get("recruit_category", 3),
expire_at=expire_at,
content_hash=data.get("content_hash"),
source_url=data.get("detail_url"),
source_id=str(data_id),
status=0,
create_time=now,
update_time=now,
)
)
# 写入地区关联
if region_codes:
await mysql.execute(
insert(JobRegionRelation),
[{"id": next(_id_gen), "job_id": job_id, "region_code": code, "create_time": now} for code in region_codes],
)
await mysql.commit()
# 更新 PG 状态
await _update_pg_status(data_id, "cleaned")
log.info("[id={}] 入库成功:{} | 公司={} | 地区={}", data_id, result.get("title"), company_short_name, region_codes)
# 第二次AI:专业匹配(失败不影响)
try:
await _match_major(job_id, result)
log.debug("[id={}] 专业匹配完成", data_id)
except Exception as e:
log.warning("[id={}] 专业匹配失败: {}", data_id, e)
# 第三次AI:技能提取(失败不影响)
try:
await _extract_skill_tags(job_id, result)
log.debug("[id={}] 技能提取完成", data_id)
except Exception as e:
log.warning("[id={}] 技能提取失败: {}", data_id, e)
async def _match_major(job_id: int, result: dict) -> None:
"""第二次AI:专业匹配"""
title = result.get("title", "")
desc = result.get("description", "")
req = result.get("requirement", "")
user_msg = f"【岗位信息】\n标题: {title}\n职责: {desc}\n要求: {req}\n\n【专业分类列表】\n{dict_cache.major_category_text}"
data = await ai_chat_json(JobCleanModel.MAJOR_MATCH, MAJOR_MATCH_SYSTEM, user_msg)
if data is None:
return
major_ids = [mid for mid in (data.get("requiredMajorIds") or []) if mid > 0]
sensitivity = data.get("majorSensitivity", 0)
async with MysqlSession() as mysql:
await mysql.execute(
text("""
UPDATE bg_job SET required_major_ids = :ids, major_sensitivity = :s, update_time = :t
WHERE id = :jid
"""),
{"ids": json.dumps(major_ids) if major_ids else None, "s": sensitivity, "t": datetime.now(), "jid": job_id},
)
await mysql.commit()
async def _extract_skill_tags(job_id: int, result: dict) -> None:
"""第三次AI:技能提取"""
title = result.get("title", "")
desc = result.get("description", "")
req = result.get("requirement", "")
user_msg = f"【岗位信息】\n标题: {title}\n职责: {desc}\n要求: {req}"
skills = await ai_chat_json(JobCleanModel.SKILL_EXTRACT, SKILL_EXTRACT_SYSTEM, user_msg)
if not skills or not isinstance(skills, list):
return
now = datetime.now()
tag_ids = []
for name in skills:
name = str(name).strip().lower()
if not name or len(name) > 50:
continue
# 每个 tag 单独 session,避免死锁
real_id = await _find_or_create_skill_tag(name)
if real_id and real_id not in tag_ids:
tag_ids.append(real_id)
if tag_ids:
async with MysqlSession() as mysql:
await mysql.execute(
insert(JobSkillTagRelation),
[{"id": next(_id_gen), "job_id": job_id, "skill_tag_id": tid, "create_time": now} for tid in tag_ids],
)
await mysql.commit()
async def _find_or_create_skill_tag(name: str) -> int | None:
"""查找或创建技能标签(单独事务,避免死锁)"""
async with MysqlSession() as mysql:
tag_id = next(_id_gen)
await mysql.execute(
text("INSERT IGNORE INTO bg_skill_tag (id, name) VALUES (:id, :name)"),
{"id": tag_id, "name": name},
)
await mysql.commit()
async with MysqlSession() as mysql:
row = await mysql.execute(
text("SELECT id FROM bg_skill_tag WHERE name = :name LIMIT 1"),
{"name": name},
)
return row.scalar()
async def _find_or_create_company(short_name: str) -> int:
"""查找或创建公司(加锁防并发重复)"""
async with _company_lock:
async with MysqlSession() as mysql:
row = await mysql.execute(
text("SELECT id FROM bg_company WHERE short_name = :name LIMIT 1"),
{"name": short_name},
)
existing = row.scalar()
if existing:
return existing
company_id = next(_id_gen)
now = datetime.now()
await mysql.execute(
insert(Company).values(
id=company_id,
name=short_name,
short_name=short_name,
status=0,
create_time=now,
update_time=now,
)
)
await mysql.commit()
return company_id
async def _update_pg_status(data_id: int, status: str) -> None:
"""更新 PG 清洗状态"""
async with PgSession() as pg:
if status == "cleaned":
await pg.execute(
text("UPDATE app_job_data SET clean_status = :s, cleaned_at = NOW() WHERE id = :id"),
{"s": status, "id": data_id},
)
else:
await pg.execute(
text("UPDATE app_job_data SET clean_status = :s WHERE id = :id"),
{"s": status, "id": data_id},
)
await pg.commit()
def _build_user_message(data: dict) -> str:
"""构建第一次AI的用户消息"""
parts = [
"【原始数据】",
f"岗位名称: {data.get('job_title') or ''}",
f"薪资: {data.get('salary') or ''}",
f"工作地点: {data.get('location') or ''}",
f"公司: {data.get('company') or ''}",
f"经验要求: {data.get('experience') or ''}",
f"学历要求: {data.get('education') or ''}",
f"岗位详情: {data.get('description') or ''}",
"",
f"【岗位分类列表】\n{dict_cache.job_category_text}",
"",
f"【行业列表】\n{dict_cache.industry_text}",
]
return "\n".join(parts)