From 6967e4ba54783e8f67ba9223f2b9dde77d14d43b Mon Sep 17 00:00:00 2001 From: zk Date: Tue, 2 Jun 2026 18:38:36 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85=E6=96=87=E6=A1=A3=EF=BC=8C?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B8=85=E6=B4=97=E6=96=B9=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .kiro/steering/清洗方案.md | 273 ++++++++++++++++++++++++++++++ app/models/mysql/company.py | 38 ++--- app/models/mysql/job.py | 43 ++--- app/models/mysql/relations.py | 16 +- app/models/mysql/skill_tag.py | 4 +- app/models/pg/app_job_data.py | 36 ++-- app/services/job_clean_service.py | 33 ++-- 7 files changed, 363 insertions(+), 80 deletions(-) create mode 100644 .kiro/steering/清洗方案.md diff --git a/.kiro/steering/清洗方案.md b/.kiro/steering/清洗方案.md new file mode 100644 index 0000000..68063fe --- /dev/null +++ b/.kiro/steering/清洗方案.md @@ -0,0 +1,273 @@ +--- +inclusion: manual +--- + +# 数据清洗方案 + +## 总体架构 + +``` +爬虫(本地网络) → PostgreSQL(app_job_data) + ↓ + offerpie_job_cleaner(本地Python服务) + ↓ + asyncio协程 + Semaphore信号量并发 + ↓ + 哈希续命 或 三次AI调用 + ↓ + MySQL(bg_job + bg_company + 关联表 + bg_skill_tag) + ↓ + 公司信息不完整的 → 调AI补充 +``` + +本服务部署在本地非公网环境,纯后台定时任务,无HTTP接口。通过APScheduler调度,asyncio协程提供并发能力。 + +--- + +## 一、岗位清洗 + +### 1.1 触发方式 + +- 定时:每3分钟触发一次(`clean_interval_seconds` 可配) +- 启动时立即执行第一次 + +### 1.2 状态管理 + +`app_job_data.clean_status`(PostgreSQL): +- `pending` = 待清洗:新爬到的数据,默认值 +- `cleaning` = 清洗中:已锁定,正在处理 +- `cleaned` = 已清洗:处理完成(入库或续命) +- `discarded` = 已丢弃:AI判定无效或描述过短 + +### 1.3 批量锁定 + +```sql +SELECT * FROM app_job_data +WHERE clean_status = 'pending' +LIMIT {batch_size} +FOR UPDATE SKIP LOCKED +``` + +锁定后立即更新: +```sql +UPDATE app_job_data +SET clean_status = 'cleaning', clean_started_at = NOW() +WHERE id = ANY(ids) +``` + +使用 `FOR UPDATE SKIP LOCKED` 而非 `FOR UPDATE`,避免多实例部署时阻塞等待。 + +### 1.4 并发模型 + +- 一批 `batch_size`(默认100)条数据,全部启动协程 +- `asyncio.Semaphore(concurrency)`(默认50)控制同时调AI的并发数 +- 做完一条立刻让位给下一条,不存在批次内空等 + +### 1.5 单条清洗流程 + +``` 
+1. 前置校验 + └─ description 为空或长度 < 20 → discarded + +2. 哈希去重续命(新增) + ├─ 用 content_hash 查 bg_job + ├─ 命中 → UPDATE bg_job SET status=0, create_time=NOW(), update_time=NOW() + ├─ 更新 PG: cleaned + └─ return(不调AI,秒完成) + +3. 第一次AI:结构化提取 + ├─ 输入:原始字段 + 岗位分类列表 + 行业列表 + ├─ 输出JSON:valid/title/salary/education/categoryId/cities/companyShortName/... + └─ valid=false → discarded + +4. 公司处理 + ├─ 按 short_name 查 bg_company + ├─ 存在 → 拿 company_id + └─ 不存在 → 创建一条 status=0 的记录(asyncio.Lock 防并发重复) + +5. 地区匹配 + └─ AI返回的 cities → 字典缓存模糊匹配 → region_codes + +6. 写入 MySQL + ├─ INSERT bg_job(带 content_hash、recruit_category、expire_at) + └─ INSERT bg_job_region_relation + +7. 更新 PG: cleaned + +8. 第二次AI:专业匹配(失败不影响岗位入库) + ├─ 输入:title + description + requirement + 专业列表(845条) + └─ 输出:requiredMajorIds + majorSensitivity → UPDATE bg_job + +9. 第三次AI:技能提取(失败不影响岗位入库) + ├─ 输入:title + description + requirement + └─ 输出:技能名数组 → INSERT IGNORE bg_skill_tag → INSERT bg_job_skill_tag_relation +``` + +### 1.6 哈希续命机制 + +**背景**:爬虫每隔7天全量刷新,大部分岗位内容未变。 + +**逻辑**: +- `app_job_data.content_hash`:爬虫入库时对岗位内容生成的哈希值 +- `bg_job.content_hash`:清洗入库时透传过来 +- 清洗时优先用 hash 查 bg_job,命中说明岗位还活着 +- 续命动作:`status=0`(重新上架)+ 刷新 `create_time`/`update_time` +- 不调AI,秒级完成,大幅节省时间和API费用 + +### 1.7 僵尸恢复(每30分钟) + +```sql +UPDATE app_job_data +SET clean_status = 'pending', clean_started_at = NULL +WHERE clean_status = 'cleaning' + AND clean_started_at < NOW() - INTERVAL '10 minutes' +``` + +--- + +## 二、公司数据补充 + +### 2.1 触发方式 + +- 定时:每5分钟触发一次(`company_interval_seconds` 可配) +- 启动时立即执行第一次 + +### 2.2 状态管理 + +`bg_company.status`(MySQL): +- 0 = 待完善:岗位清洗时创建,只有 short_name +- 1 = 已完善:AI补充完成 +- 2 = 禁用 +- 3 = 补充中:已锁定,正在处理 +- 4 = 补充失败:AI不认识该公司 + +### 2.3 流程 + +``` +1. 锁定一批 status=0 的公司(batch_size=20,FOR UPDATE SKIP LOCKED) +2. 批量更新 status=3 +3. 协程并发(Semaphore=10) +4. 
每条: + ├─ AI调用:short_name + 行业列表 → 公司详情JSON + ├─ valid=false → status=4 + └─ valid=true → 回填全部字段 → status=1 +``` + +### 2.4 AI返回字段 + +```json +{ + "valid": true, + "name": "公司全称", + "city": "总部城市", + "companyType": "企业类型", + "industryId": 行业ID, + "tags": ["标签"], + "summary": "一句话简介", + "description": "详细描述", + "foundedYear": "成立年份", + "address": "地址", + "scale": "规模", + "website": "官网", + "financingStage": "融资状态", + "latestValuation": "估值", + "news": ["新闻"] +} +``` + +### 2.5 僵尸恢复(每小时) + +```sql +UPDATE bg_company +SET status = 0, update_time = NOW() +WHERE status = 3 + AND update_time < NOW() - INTERVAL 10 MINUTE +``` + +--- + +## 三、岗位下架 + +### 3.1 触发方式 + +- 每天凌晨2点执行 + +### 3.2 逻辑 + +```sql +UPDATE bg_job +SET status = 2, update_time = NOW() +WHERE status = 0 + AND create_time < DATE_SUB(NOW(), INTERVAL {job_expire_days} DAY) +``` + +- `job_expire_days` 可配,默认7天 +- 配合续命机制:活跃岗位每次刷新会被续命(create_time 刷新),所以不会被误下架 +- 真正过期的岗位(7天内未再被爬到)才会下架 + +--- + +## 四、字典数据缓存 + +### 4.1 加载时机 + +- 服务启动时从 MySQL 全量加载到内存 + +### 4.2 缓存内容 + +| 数据 | 来源表 | 格式 | 用途 | +|------|--------|------|------| +| 岗位分类 | bg_job_category(level=3) | `id:name(一级/二级)` | 第一次AI prompt | +| 行业 | bg_industry(level=2) | `id:name(一级)` | 第一次AI prompt + 公司补充 | +| 专业分类 | bg_major_category(level=3) | `id:name(一级/二级)` | 第二次AI prompt | +| 地区 | bg_china_regions_code(市级) | code + name | 城市名匹配地区编码 | + +--- + +## 五、关键设计决策 + +| 决策点 | 结论 | 原因 | +|--------|------|------| +| 为什么从 Java 迁到 Python | asyncio 协程在 IO 密集场景下内存开销远小于 Java 线程 | 10万+数据量,50并发协程 vs 50线程 | +| 并发控制 | Semaphore 信号量 | 做完一条立刻补一条,吞吐量最大化 | +| 哈希续命 | content_hash 匹配免AI | 7天全量刷新时大部分是重复数据,节省90%+ AI调用 | +| 行锁方式 | FOR UPDATE SKIP LOCKED | 不阻塞,适合未来多实例部署 | +| 技能并发去重 | INSERT IGNORE + 唯一索引 | 不加应用层锁,数据库保证 | +| 公司创建并发 | asyncio.Lock | 防止同名公司被多个协程同时插入 | +| AI调用次数 | 三次(结构化 + 专业 + 技能) | 各维度独立,容错互不影响 | +| 公司数据来源 | AI补充 | 公司简称查工商API不精确,AI覆盖率高 | +| 事务跨库 | 不做分布式事务 | PG和MySQL独立操作,僵尸恢复兜底 | +| 岗位下架 | create_time + N天 | 配合续命机制,活跃岗位不会被误下架 | + +--- + +## 六、配置参数 + +| 参数 | 默认值 | 说明 | 
+|------|--------|------| +| `clean_batch_size` | 100 | 每批锁定数据量 | +| `clean_concurrency` | 50 | AI并发调用上限 | +| `clean_interval_seconds` | 180 | 清洗任务间隔(秒) | +| `company_batch_size` | 20 | 公司补充每批数量 | +| `company_concurrency` | 10 | 公司AI并发上限 | +| `company_interval_seconds` | 300 | 公司补充间隔(秒) | +| `job_expire_days` | 7 | 岗位过期天数 | + +所有参数通过 `.env` 文件配置,修改后重启生效。 + +--- + +## 七、涉及的表 + +| 表 | 库 | 读/写 | 说明 | +|----|-----|-------|------| +| `app_job_data` | PG | 读+写状态 | 爬虫原始数据,clean_status 流转 | +| `bg_job` | MySQL | 写+改 | 清洗后岗位(含 content_hash 用于续命) | +| `bg_company` | MySQL | 读+写+改 | 公司(查或创建 + AI补充) | +| `bg_job_region_relation` | MySQL | 写 | 岗位-地区关联 | +| `bg_skill_tag` | MySQL | 写(INSERT IGNORE) | 技能标签自动入库 | +| `bg_job_skill_tag_relation` | MySQL | 写 | 岗位-技能关联 | +| `bg_job_category` | MySQL | 读(缓存) | 岗位分类字典 | +| `bg_industry` | MySQL | 读(缓存) | 行业字典 | +| `bg_major_category` | MySQL | 读(缓存) | 专业字典 | +| `bg_china_regions_code` | MySQL | 读(缓存) | 地区字典 | diff --git a/app/models/mysql/company.py b/app/models/mysql/company.py index 4bc7d3a..1a147f0 100644 --- a/app/models/mysql/company.py +++ b/app/models/mysql/company.py @@ -14,23 +14,23 @@ class Company(MysqlBase): __tablename__ = "bg_company" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True) - name: Mapped[Optional[str]] = mapped_column(String(255)) - short_name: Mapped[str] = mapped_column(String(128), nullable=False) - logo_url: Mapped[Optional[str]] = mapped_column(String(512)) - region_code: Mapped[Optional[str]] = mapped_column(String(20)) - company_type: Mapped[Optional[str]] = mapped_column(String(32)) - industry_id: Mapped[Optional[int]] = mapped_column(BigInteger) - tags: Mapped[Optional[list]] = mapped_column(JSON) - summary: Mapped[Optional[str]] = mapped_column(String(512)) - description: Mapped[Optional[str]] = mapped_column(Text) - founded_year: Mapped[Optional[str]] = mapped_column(String(10)) - address: Mapped[Optional[str]] = mapped_column(String(255)) - scale: Mapped[Optional[str]] = 
mapped_column(String(32)) - website: Mapped[Optional[str]] = mapped_column(String(255)) - financing_stage: Mapped[Optional[str]] = mapped_column(String(32)) - latest_valuation: Mapped[Optional[str]] = mapped_column(String(64)) - news: Mapped[Optional[list]] = mapped_column(JSON) + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)") + name: Mapped[Optional[str]] = mapped_column(String(255), comment="公司全称") + short_name: Mapped[str] = mapped_column(String(128), nullable=False, comment="公司简称") + logo_url: Mapped[Optional[str]] = mapped_column(String(512), comment="Logo地址") + region_code: Mapped[Optional[str]] = mapped_column(String(20), comment="地区编码") + company_type: Mapped[Optional[str]] = mapped_column(String(32), comment="企业类型:上市企业/独角兽/国企等") + industry_id: Mapped[Optional[int]] = mapped_column(BigInteger, comment="行业ID") + tags: Mapped[Optional[list]] = mapped_column(JSON, comment="公司标签JSON数组") + summary: Mapped[Optional[str]] = mapped_column(String(512), comment="一句话简介") + description: Mapped[Optional[str]] = mapped_column(Text, comment="公司详细描述") + founded_year: Mapped[Optional[str]] = mapped_column(String(10), comment="成立年份") + address: Mapped[Optional[str]] = mapped_column(String(255), comment="总部地址") + scale: Mapped[Optional[str]] = mapped_column(String(32), comment="企业规模") + website: Mapped[Optional[str]] = mapped_column(String(255), comment="官网地址") + financing_stage: Mapped[Optional[str]] = mapped_column(String(32), comment="融资状态") + latest_valuation: Mapped[Optional[str]] = mapped_column(String(64), comment="最新估值") + news: Mapped[Optional[list]] = mapped_column(JSON, comment="相关新闻JSON数组") status: Mapped[int] = mapped_column(Integer, default=0, comment="0=待完善 1=已完善 2=禁用 3=补充中 4=补充失败") - create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) - update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) + create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间") + 
update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="更新时间") diff --git a/app/models/mysql/job.py b/app/models/mysql/job.py index a12682a..a2e2e49 100644 --- a/app/models/mysql/job.py +++ b/app/models/mysql/job.py @@ -14,26 +14,27 @@ class Job(MysqlBase): __tablename__ = "bg_job" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True) - title: Mapped[str] = mapped_column(String(255), nullable=False) - company_id: Mapped[int] = mapped_column(BigInteger, nullable=False) - category_id: Mapped[int] = mapped_column(BigInteger, nullable=False) - employment_type: Mapped[int] = mapped_column(Integer, default=0) - description: Mapped[Optional[str]] = mapped_column(Text) - requirement: Mapped[Optional[str]] = mapped_column(Text) - bonus: Mapped[Optional[str]] = mapped_column(Text) - tags: Mapped[Optional[list]] = mapped_column(JSON) - skill_tags: Mapped[Optional[list]] = mapped_column(JSON) - salary: Mapped[Optional[str]] = mapped_column(String(64)) - education: Mapped[int] = mapped_column(Integer, default=0) - min_experience: Mapped[int] = mapped_column(Integer, default=0) - required_industry_id: Mapped[Optional[int]] = mapped_column(BigInteger) - required_major_ids: Mapped[Optional[list]] = mapped_column(JSON) - major_sensitivity: Mapped[Optional[int]] = mapped_column(Integer) - source_url: Mapped[Optional[str]] = mapped_column(String(1024)) - source_id: Mapped[Optional[str]] = mapped_column(String(64)) - recruit_category: Mapped[Optional[int]] = mapped_column(Integer, comment="招聘分类: 0=校招, 1=实习, 2=社招, 3=其他") + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)") + title: Mapped[str] = mapped_column(String(255), nullable=False, comment="岗位名称") + company_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联公司ID") + category_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="岗位类型ID") + employment_type: Mapped[int] = mapped_column(Integer, default=0, comment="0=全职 1=兼职") + 
description: Mapped[Optional[str]] = mapped_column(Text, comment="岗位职责") + requirement: Mapped[Optional[str]] = mapped_column(Text, comment="任职要求") + bonus: Mapped[Optional[str]] = mapped_column(Text, comment="加分项") + tags: Mapped[Optional[list]] = mapped_column(JSON, comment="岗位标签JSON数组") + skill_tags: Mapped[Optional[list]] = mapped_column(JSON, comment="技能标签JSON数组") + salary: Mapped[Optional[str]] = mapped_column(String(64), comment="薪资描述,如15-25K") + education: Mapped[int] = mapped_column(Integer, default=0, comment="学历要求 0=不限 1=大专 2=本科 3=硕士 4=博士") + min_experience: Mapped[int] = mapped_column(Integer, default=0, comment="最低工作年限,0=不要求") + required_industry_id: Mapped[Optional[int]] = mapped_column(BigInteger, comment="要求的行业经验ID") + required_major_ids: Mapped[Optional[list]] = mapped_column(JSON, comment="要求专业ID数组") + major_sensitivity: Mapped[Optional[int]] = mapped_column(Integer, comment="专业敏感度 0=不限 1=优先 2=强制") + source_url: Mapped[Optional[str]] = mapped_column(String(1024), comment="来源链接") + source_id: Mapped[Optional[str]] = mapped_column(String(64), comment="爬虫原始数据ID,用于去重") + content_hash: Mapped[Optional[str]] = mapped_column(String(64), comment="内容哈希,用于去重续命") + recruit_category: Mapped[Optional[int]] = mapped_column(Integer, comment="招聘分类: 0=校招 1=实习 2=社招 3=其他") expire_at: Mapped[Optional[datetime]] = mapped_column(DateTime, comment="发布日期") status: Mapped[int] = mapped_column(Integer, default=0, comment="0=上架 1=下架 2=已失效") - create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) - update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) + create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间") + update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="更新时间") diff --git a/app/models/mysql/relations.py b/app/models/mysql/relations.py index ee70376..b35a2f1 100644 --- a/app/models/mysql/relations.py +++ b/app/models/mysql/relations.py @@ -13,10 +13,10 @@ class 
JobRegionRelation(MysqlBase): __tablename__ = "bg_job_region_relation" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) - job_id: Mapped[int] = mapped_column(BigInteger, nullable=False) - region_code: Mapped[str] = mapped_column(String(20), nullable=False) - create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)") + job_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="岗位ID") + region_code: Mapped[str] = mapped_column(String(20), nullable=False, comment="地区编码") + create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间") class JobSkillTagRelation(MysqlBase): @@ -24,7 +24,7 @@ class JobSkillTagRelation(MysqlBase): __tablename__ = "bg_job_skill_tag_relation" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) - job_id: Mapped[int] = mapped_column(BigInteger, nullable=False) - skill_tag_id: Mapped[int] = mapped_column(BigInteger, nullable=False) - create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)") + job_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="岗位ID") + skill_tag_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="技能标签ID") + create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间") diff --git a/app/models/mysql/skill_tag.py b/app/models/mysql/skill_tag.py index e612a41..9f833b6 100644 --- a/app/models/mysql/skill_tag.py +++ b/app/models/mysql/skill_tag.py @@ -11,5 +11,5 @@ class SkillTag(MysqlBase): __tablename__ = "bg_skill_tag" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True) - name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False) + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)") + name: Mapped[str] = 
mapped_column(String(100), unique=True, nullable=False, comment="标签名称(唯一索引)") diff --git a/app/models/pg/app_job_data.py b/app/models/pg/app_job_data.py index 7ffd87c..d7fe3ab 100644 --- a/app/models/pg/app_job_data.py +++ b/app/models/pg/app_job_data.py @@ -3,7 +3,7 @@ from datetime import datetime from typing import Optional -from sqlalchemy import BigInteger, DateTime, Integer, SmallInteger, String, Text +from sqlalchemy import BigInteger, DateTime, SmallInteger, String, Text from sqlalchemy.orm import Mapped, mapped_column from app.core.database import PgBase @@ -14,22 +14,22 @@ class AppJobData(PgBase): __tablename__ = "app_job_data" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True, comment="自增主键") urllistid: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联urllistid") - job_title: Mapped[Optional[str]] = mapped_column(String(255)) - salary: Mapped[Optional[str]] = mapped_column(String(128)) - location: Mapped[Optional[str]] = mapped_column(String(2048)) - company: Mapped[Optional[str]] = mapped_column(String(255), comment="公司名字") - experience: Mapped[Optional[str]] = mapped_column(String(64)) - education: Mapped[Optional[str]] = mapped_column(String(64)) - description: Mapped[str] = mapped_column(Text, nullable=False) - detail_url: Mapped[str] = mapped_column(String(1024), nullable=False) - recruit_category: Mapped[int] = mapped_column(SmallInteger, default=3, nullable=False, comment="招聘分类: 0=校招, 1=实习, 2=社招, 3=其他") - content_hash: Mapped[str] = mapped_column(String(64), nullable=False) - sources: Mapped[int] = mapped_column(SmallInteger, default=0, nullable=False) + job_title: Mapped[Optional[str]] = mapped_column(String(255), comment="职位名称") + salary: Mapped[Optional[str]] = mapped_column(String(128), comment="薪资") + location: Mapped[Optional[str]] = mapped_column(String(2048), comment="工作地点") + company: 
Mapped[Optional[str]] = mapped_column(String(255), comment="公司名称") + experience: Mapped[Optional[str]] = mapped_column(String(64), comment="经验要求") + education: Mapped[Optional[str]] = mapped_column(String(64), comment="学历要求") + description: Mapped[str] = mapped_column(Text, nullable=False, comment="岗位详情(职责+要求+介绍)") + detail_url: Mapped[str] = mapped_column(String(1024), nullable=False, comment="详情页URL") + recruit_category: Mapped[int] = mapped_column(SmallInteger, default=3, nullable=False, comment="招聘分类: 0=校招 1=实习 2=社招 3=其他") + content_hash: Mapped[str] = mapped_column(String(64), nullable=False, comment="内容哈希,用于去重") + sources: Mapped[int] = mapped_column(SmallInteger, default=0, nullable=False, comment="数据来源 0=官网 1=平台") expire_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="发布日期") - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) - updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) - clean_status: Mapped[str] = mapped_column(String(20), default="pending", nullable=False, comment="pending/cleaning/cleaned/discarded") - clean_started_at: Mapped[Optional[datetime]] = mapped_column(DateTime) - cleaned_at: Mapped[Optional[datetime]] = mapped_column(DateTime) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间") + updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="更新时间") + clean_status: Mapped[str] = mapped_column(String(20), default="pending", nullable=False, comment="清洗状态: pending/cleaning/cleaned/discarded") + clean_started_at: Mapped[Optional[datetime]] = mapped_column(DateTime, comment="清洗开始时间") + cleaned_at: Mapped[Optional[datetime]] = mapped_column(DateTime, comment="清洗完成时间") diff --git a/app/services/job_clean_service.py b/app/services/job_clean_service.py index 9769d64..993ace1 100644 --- a/app/services/job_clean_service.py +++ b/app/services/job_clean_service.py @@ -86,6 +86,26 @@ async def _do_clean(data: dict) -> None: await 
_update_pg_status(data_id, "discarded") return + # 哈希去重续命:content_hash 已存在于 bg_job → 说明是重复刷新,直接续命 + content_hash = data.get("content_hash") + if content_hash: + async with MysqlSession() as mysql: + row = await mysql.execute( + text("SELECT id FROM bg_job WHERE content_hash = :hash LIMIT 1"), + {"hash": content_hash}, + ) + existing_id = row.scalar() + if existing_id: + now = datetime.now() + await mysql.execute( + text("UPDATE bg_job SET status = 0, create_time = :now, update_time = :now WHERE id = :id"), + {"now": now, "id": existing_id}, + ) + await mysql.commit() + await _update_pg_status(data_id, "cleaned") + log.info("[id={}] 续命:hash命中,岗位ID={}", data_id, existing_id) + return + # 第一次AI:结构化提取 user_message = _build_user_message(data) result = await ai_chat_json(JobCleanModel.STRUCTURE, JOB_STRUCTURE_SYSTEM, user_message) @@ -94,18 +114,6 @@ async def _do_clean(data: dict) -> None: await _update_pg_status(data_id, "discarded") return - # 去重检查 - source_id = str(data_id) - async with MysqlSession() as mysql: - existing = await mysql.execute( - text("SELECT COUNT(*) AS cnt FROM bg_job WHERE source_id = :sid"), - {"sid": source_id}, - ) - if existing.scalar() > 0: - log.info("[id={}] 跳过:已入库(去重)", data_id) - await _update_pg_status(data_id, "cleaned") - return - # 公司处理 company_short_name = result.get("companyShortName") or data.get("company") or "" company_id = await _find_or_create_company(company_short_name) @@ -139,6 +147,7 @@ async def _do_clean(data: dict) -> None: required_industry_id=result.get("requiredIndustryId"), recruit_category=data.get("recruit_category", 3), expire_at=data.get("expire_at"), + content_hash=data.get("content_hash"), source_url=data.get("detail_url"), source_id=source_id, status=0,