8.4 KiB
8.4 KiB
inclusion
| inclusion |
|---|
| manual |
数据清洗方案
总体架构
爬虫(本地网络) → PostgreSQL(app_job_data)
↓
offerpie_job_cleaner(本地Python服务)
↓
asyncio协程 + Semaphore信号量并发
↓
哈希续命 或 三次AI调用
↓
MySQL(bg_job + bg_company + 关联表 + bg_skill_tag)
↓
公司信息不完整的 → 调AI补充
本服务部署在本地非公网环境,纯后台定时任务,无HTTP接口。通过APScheduler调度,asyncio协程提供并发能力。
一、岗位清洗
1.1 触发方式
- 定时:每3分钟触发一次(
clean_interval_seconds可配) - 启动时立即执行第一次
1.2 状态管理
app_job_data.clean_status(PostgreSQL):
pending= 待清洗:新爬到的数据,默认值cleaning= 清洗中:已锁定,正在处理cleaned= 已清洗:处理完成(入库或续命)discarded= 已丢弃:AI判定无效或描述过短
1.3 批量锁定
SELECT * FROM app_job_data
WHERE clean_status = 'pending'
LIMIT {batch_size}
FOR UPDATE SKIP LOCKED
锁定后立即更新:
UPDATE app_job_data
SET clean_status = 'cleaning', clean_started_at = NOW()
WHERE id = ANY(ids)
使用 FOR UPDATE SKIP LOCKED 而非 FOR UPDATE,避免多实例部署时阻塞等待。
1.4 并发模型
- 一批
batch_size(默认100)条数据,全部启动协程 asyncio.Semaphore(concurrency)(默认50)控制同时调AI的并发数- 做完一条立刻让位给下一条,不存在批次内空等
1.5 单条清洗流程
1. 前置校验
└─ description 为空或长度 < 20 → discarded
2. 哈希去重续命(新增)
├─ 用 content_hash 查 bg_job
├─ 命中 → UPDATE bg_job SET status=0, create_time=NOW(), update_time=NOW()
├─ 更新 PG: cleaned
└─ return(不调AI,秒完成)
3. 第一次AI:结构化提取
├─ 输入:原始字段 + 岗位分类列表 + 行业列表
├─ 输出JSON:valid/title/salary/education/categoryId/cities/companyShortName/...
└─ valid=false → discarded
4. 公司处理
├─ 按 short_name 查 bg_company
├─ 存在 → 拿 company_id
└─ 不存在 → 创建一条 status=0 的记录(asyncio.Lock 防并发重复)
6. 地区匹配
└─ AI返回的 cities → 字典缓存模糊匹配 → region_codes
7. 写入 MySQL
├─ INSERT bg_job(带 content_hash、recruit_category、expire_at)
└─ INSERT bg_job_region_relation
8. 更新 PG: cleaned
9. 第二次AI:专业匹配(失败不影响岗位入库)
├─ 输入:title + description + requirement + 专业列表(845条)
└─ 输出:requiredMajorIds + majorSensitivity → UPDATE bg_job
10. 第三次AI:技能提取(失败不影响岗位入库)
├─ 输入:title + description + requirement
└─ 输出:技能名数组 → INSERT IGNORE bg_skill_tag → INSERT bg_job_skill_tag_relation
1.6 哈希续命机制
背景:爬虫每隔7天全量刷新,大部分岗位内容未变。
逻辑:
app_job_data.content_hash:爬虫入库时对岗位内容生成的哈希值bg_job.content_hash:清洗入库时透传过来- 清洗时优先用 hash 查 bg_job,命中说明岗位还活着
- 续命动作:
status=0(重新上架)+ 刷新create_time/update_time - 不调AI,秒级完成,大幅节省时间和API费用
1.7 僵尸恢复(每30分钟)
UPDATE app_job_data
SET clean_status = 'pending', clean_started_at = NULL
WHERE clean_status = 'cleaning'
AND clean_started_at < NOW() - INTERVAL '10 minutes'
二、公司数据补充
2.1 触发方式
- 定时:每5分钟触发一次(
company_interval_seconds可配) - 启动时立即执行第一次
2.2 状态管理
bg_company.status(MySQL):
- 0 = 待完善:岗位清洗时创建,只有 short_name
- 1 = 已完善:AI补充完成
- 2 = 禁用
- 3 = 补充中:已锁定,正在处理
- 4 = 补充失败:AI不认识该公司
2.3 流程
1. 锁定一批 status=0 的公司(batch_size=20,FOR UPDATE SKIP LOCKED)
2. 批量更新 status=3
3. 协程并发(Semaphore=10)
4. 每条:
├─ AI调用:short_name + 行业列表 → 公司详情JSON
├─ valid=false → status=4
└─ valid=true → 回填全部字段 → status=1
2.4 AI返回字段
{
"valid": true,
"name": "公司全称",
"city": "总部城市",
"companyType": "企业类型",
"industryId": 行业ID,
"tags": ["标签"],
"summary": "一句话简介",
"description": "详细描述",
"foundedYear": "成立年份",
"address": "地址",
"scale": "规模",
"website": "官网",
"financingStage": "融资状态",
"latestValuation": "估值",
"news": ["新闻"]
}
2.5 僵尸恢复(每小时)
UPDATE bg_company
SET status = 0, update_time = NOW()
WHERE status = 3
AND update_time < NOW() - INTERVAL 10 MINUTE
三、岗位下架
3.1 触发方式
- 每天凌晨2点执行
3.2 逻辑
UPDATE bg_job
SET status = 2, update_time = NOW()
WHERE status = 0
AND create_time < DATE_SUB(NOW(), INTERVAL {job_expire_days} DAY)
job_expire_days可配,默认7天- 配合续命机制:活跃岗位每次刷新会被续命(create_time 刷新),所以不会被误下架
- 真正过期的岗位(7天内未再被爬到)才会下架
四、字典数据缓存
4.1 加载时机
- 服务启动时从 MySQL 全量加载到内存
4.2 缓存内容
| 数据 | 来源表 | 格式 | 用途 |
|---|---|---|---|
| 岗位分类 | bg_job_category(level=3) | id:name(一级/二级) |
第一次AI prompt |
| 行业 | bg_industry(level=2) | id:name(一级) |
第一次AI prompt + 公司补充 |
| 专业分类 | bg_major_category(level=3) | id:name(一级/二级) |
第二次AI prompt |
| 地区 | bg_china_regions_code(市级) | code + name | 城市名匹配地区编码 |
五、关键设计决策
| 决策点 | 结论 | 原因 |
|---|---|---|
| 为什么从 Java 迁到 Python | asyncio 协程在 IO 密集场景下内存开销远小于 Java 线程 | 10万+数据量,50并发协程 vs 50线程 |
| 并发控制 | Semaphore 信号量 | 做完一条立刻补一条,吞吐量最大化 |
| 哈希续命 | content_hash 匹配免AI | 7天全量刷新时大部分是重复数据,节省90%+ AI调用 |
| 行锁方式 | FOR UPDATE SKIP LOCKED | 不阻塞,适合未来多实例部署 |
| 技能并发去重 | INSERT IGNORE + 唯一索引 | 不加应用层锁,数据库保证 |
| 公司创建并发 | asyncio.Lock | 防止同名公司被多个协程同时插入 |
| AI调用次数 | 三次(结构化 + 专业 + 技能) | 各维度独立,容错互不影响 |
| 公司数据来源 | AI补充 | 公司简称查工商API不精确,AI覆盖率高 |
| 事务跨库 | 不做分布式事务 | PG和MySQL独立操作,僵尸恢复兜底 |
| 岗位下架 | create_time + N天 | 配合续命机制,活跃岗位不会被误下架 |
六、配置参数
| 参数 | 默认值 | 说明 |
|---|---|---|
clean_batch_size |
100 | 每批锁定数据量 |
clean_concurrency |
50 | AI并发调用上限 |
clean_interval_seconds |
180 | 清洗任务间隔(秒) |
company_batch_size |
20 | 公司补充每批数量 |
company_concurrency |
10 | 公司AI并发上限 |
company_interval_seconds |
300 | 公司补充间隔(秒) |
job_expire_days |
7 | 岗位过期天数 |
所有参数通过 .env 文件配置,修改后重启生效。
七、涉及的表
| 表 | 库 | 读/写 | 说明 |
|---|---|---|---|
app_job_data |
PG | 读+写状态 | 爬虫原始数据,clean_status 流转 |
bg_job |
MySQL | 写+改 | 清洗后岗位(含 content_hash 用于续命) |
bg_company |
MySQL | 读+写+改 | 公司(查或创建 + AI补充) |
bg_job_region_relation |
MySQL | 写 | 岗位-地区关联 |
bg_skill_tag |
MySQL | 写(INSERT IGNORE) | 技能标签自动入库 |
bg_job_skill_tag_relation |
MySQL | 写 | 岗位-技能关联 |
bg_job_category |
MySQL | 读(缓存) | 岗位分类字典 |
bg_industry |
MySQL | 读(缓存) | 行业字典 |
bg_major_category |
MySQL | 读(缓存) | 专业字典 |
bg_china_regions_code |
MySQL | 读(缓存) | 地区字典 |