Files
2026-06-02 18:38:36 +08:00

8.4 KiB
Raw Permalink Blame History

inclusion
inclusion
manual

数据清洗方案

总体架构

爬虫(本地网络) → PostgreSQL(app_job_data)
                        ↓
            offerpie_job_cleaner(本地Python服务)
                        ↓
              asyncio协程 + Semaphore信号量并发
                        ↓
              哈希续命 或 三次AI调用
                        ↓
         MySQL(bg_job + bg_company + 关联表 + bg_skill_tag)
                        ↓
            公司信息不完整的 → 调AI补充

本服务部署在本地非公网环境,纯后台定时任务,无HTTP接口。通过APScheduler调度,asyncio协程提供并发能力。


一、岗位清洗

1.1 触发方式

  • 定时:每3分钟触发一次(clean_interval_seconds 可配)
  • 启动时立即执行第一次

1.2 状态管理

app_job_data.clean_statusPostgreSQL):

  • pending = 待清洗:新爬到的数据,默认值
  • cleaning = 清洗中:已锁定,正在处理
  • cleaned = 已清洗:处理完成(入库或续命)
  • discarded = 已丢弃:AI判定无效或描述过短

1.3 批量锁定

SELECT * FROM app_job_data
WHERE clean_status = 'pending'
LIMIT {batch_size}
FOR UPDATE SKIP LOCKED

锁定后立即更新:

UPDATE app_job_data
SET clean_status = 'cleaning', clean_started_at = NOW()
WHERE id = ANY(ids)

使用 FOR UPDATE SKIP LOCKED 而非 FOR UPDATE,避免多实例部署时阻塞等待。

1.4 并发模型

  • 一批 batch_size(默认100)条数据,全部启动协程
  • asyncio.Semaphore(concurrency)(默认50)控制同时调AI的并发数
  • 做完一条立刻让位给下一条,不存在批次内空等

1.5 单条清洗流程

1. 前置校验
   └─ description 为空或长度 < 20 → discarded

2. 哈希去重续命(新增)
   ├─ 用 content_hash 查 bg_job
   ├─ 命中 → UPDATE bg_job SET status=0, create_time=NOW(), update_time=NOW()
   ├─ 更新 PG: cleaned
   └─ return(不调AI,秒完成)

3. 第一次AI:结构化提取
   ├─ 输入:原始字段 + 岗位分类列表 + 行业列表
   ├─ 输出JSONvalid/title/salary/education/categoryId/cities/companyShortName/...
   └─ valid=false → discarded

4. 公司处理
   ├─ 按 short_name 查 bg_company
   ├─ 存在 → 拿 company_id
   └─ 不存在 → 创建一条 status=0 的记录(asyncio.Lock 防并发重复)

6. 地区匹配
   └─ AI返回的 cities → 字典缓存模糊匹配 → region_codes

7. 写入 MySQL
   ├─ INSERT bg_job(带 content_hash、recruit_category、expire_at
   └─ INSERT bg_job_region_relation

8. 更新 PG: cleaned

9. 第二次AI:专业匹配(失败不影响岗位入库)
   ├─ 输入:title + description + requirement + 专业列表(845条)
   └─ 输出:requiredMajorIds + majorSensitivity → UPDATE bg_job

10. 第三次AI:技能提取(失败不影响岗位入库)
    ├─ 输入:title + description + requirement
    └─ 输出:技能名数组 → INSERT IGNORE bg_skill_tag → INSERT bg_job_skill_tag_relation

1.6 哈希续命机制

背景:爬虫每隔7天全量刷新,大部分岗位内容未变。

逻辑

  • app_job_data.content_hash:爬虫入库时对岗位内容生成的哈希值
  • bg_job.content_hash:清洗入库时透传过来
  • 清洗时优先用 hash 查 bg_job,命中说明岗位还活着
  • 续命动作:status=0(重新上架)+ 刷新 create_time/update_time
  • 不调AI,秒级完成,大幅节省时间和API费用

1.7 僵尸恢复(每30分钟)

UPDATE app_job_data
SET clean_status = 'pending', clean_started_at = NULL
WHERE clean_status = 'cleaning'
  AND clean_started_at < NOW() - INTERVAL '10 minutes'

二、公司数据补充

2.1 触发方式

  • 定时:每5分钟触发一次(company_interval_seconds 可配)
  • 启动时立即执行第一次

2.2 状态管理

bg_company.statusMySQL):

  • 0 = 待完善:岗位清洗时创建,只有 short_name
  • 1 = 已完善:AI补充完成
  • 2 = 禁用
  • 3 = 补充中:已锁定,正在处理
  • 4 = 补充失败:AI不认识该公司

2.3 流程

1. 锁定一批 status=0 的公司(batch_size=20FOR UPDATE SKIP LOCKED
2. 批量更新 status=3
3. 协程并发(Semaphore=10
4. 每条:
   ├─ AI调用:short_name + 行业列表 → 公司详情JSON
   ├─ valid=false → status=4
   └─ valid=true → 回填全部字段 → status=1

2.4 AI返回字段

{
  "valid": true,
  "name": "公司全称",
  "city": "总部城市",
  "companyType": "企业类型",
  "industryId": 行业ID,
  "tags": ["标签"],
  "summary": "一句话简介",
  "description": "详细描述",
  "foundedYear": "成立年份",
  "address": "地址",
  "scale": "规模",
  "website": "官网",
  "financingStage": "融资状态",
  "latestValuation": "估值",
  "news": ["新闻"]
}

2.5 僵尸恢复(每小时)

UPDATE bg_company
SET status = 0, update_time = NOW()
WHERE status = 3
  AND update_time < NOW() - INTERVAL 10 MINUTE

三、岗位下架

3.1 触发方式

  • 每天凌晨2点执行

3.2 逻辑

UPDATE bg_job
SET status = 2, update_time = NOW()
WHERE status = 0
  AND create_time < DATE_SUB(NOW(), INTERVAL {job_expire_days} DAY)
  • job_expire_days 可配,默认7天
  • 配合续命机制:活跃岗位每次刷新会被续命(create_time 刷新),所以不会被误下架
  • 真正过期的岗位(7天内未再被爬到)才会下架

四、字典数据缓存

4.1 加载时机

  • 服务启动时从 MySQL 全量加载到内存

4.2 缓存内容

数据 来源表 格式 用途
岗位分类 bg_job_categorylevel=3 id:name(一级/二级) 第一次AI prompt
行业 bg_industrylevel=2 id:name(一级) 第一次AI prompt + 公司补充
专业分类 bg_major_categorylevel=3 id:name(一级/二级) 第二次AI prompt
地区 bg_china_regions_code(市级) code + name 城市名匹配地区编码

五、关键设计决策

决策点 结论 原因
为什么从 Java 迁到 Python asyncio 协程在 IO 密集场景下内存开销远小于 Java 线程 10万+数据量,50并发协程 vs 50线程
并发控制 Semaphore 信号量 做完一条立刻补一条,吞吐量最大化
哈希续命 content_hash 匹配免AI 7天全量刷新时大部分是重复数据,节省90%+ AI调用
行锁方式 FOR UPDATE SKIP LOCKED 不阻塞,适合未来多实例部署
技能并发去重 INSERT IGNORE + 唯一索引 不加应用层锁,数据库保证
公司创建并发 asyncio.Lock 防止同名公司被多个协程同时插入
AI调用次数 三次(结构化 + 专业 + 技能) 各维度独立,容错互不影响
公司数据来源 AI补充 公司简称查工商API不精确,AI覆盖率高
事务跨库 不做分布式事务 PG和MySQL独立操作,僵尸恢复兜底
岗位下架 create_time + N天 配合续命机制,活跃岗位不会被误下架

六、配置参数

参数 默认值 说明
clean_batch_size 100 每批锁定数据量
clean_concurrency 50 AI并发调用上限
clean_interval_seconds 180 清洗任务间隔(秒)
company_batch_size 20 公司补充每批数量
company_concurrency 10 公司AI并发上限
company_interval_seconds 300 公司补充间隔(秒)
job_expire_days 7 岗位过期天数

所有参数通过 .env 文件配置,修改后重启生效。


七、涉及的表

读/写 说明
app_job_data PG 读+写状态 爬虫原始数据,clean_status 流转
bg_job MySQL 写+改 清洗后岗位(含 content_hash 用于续命)
bg_company MySQL 读+写+改 公司(查或创建 + AI补充)
bg_job_region_relation MySQL 岗位-地区关联
bg_skill_tag MySQL 写(INSERT IGNORE) 技能标签自动入库
bg_job_skill_tag_relation MySQL 岗位-技能关联
bg_job_category MySQL 读(缓存) 岗位分类字典
bg_industry MySQL 读(缓存) 行业字典
bg_major_category MySQL 读(缓存) 专业字典
bg_china_regions_code MySQL 读(缓存) 地区字典