Files
2026-06-02 18:38:36 +08:00

274 lines
8.4 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
---
inclusion: manual
---
# 数据清洗方案
## 总体架构
```
爬虫(本地网络) → PostgreSQL(app_job_data)
offerpie_job_cleaner(本地Python服务)
asyncio协程 + Semaphore信号量并发
哈希续命 或 三次AI调用
MySQL(bg_job + bg_company + 关联表 + bg_skill_tag)
公司信息不完整的 → 调AI补充
```
本服务部署在本地非公网环境,纯后台定时任务,无HTTP接口。通过APScheduler调度,asyncio协程提供并发能力。
---
## 一、岗位清洗
### 1.1 触发方式
- 定时:每3分钟触发一次(`clean_interval_seconds` 可配)
- 启动时立即执行第一次
### 1.2 状态管理
`app_job_data.clean_status`PostgreSQL):
- `pending` = 待清洗:新爬到的数据,默认值
- `cleaning` = 清洗中:已锁定,正在处理
- `cleaned` = 已清洗:处理完成(入库或续命)
- `discarded` = 已丢弃:AI判定无效或描述过短
### 1.3 批量锁定
```sql
SELECT * FROM app_job_data
WHERE clean_status = 'pending'
LIMIT {batch_size}
FOR UPDATE SKIP LOCKED
```
锁定后立即更新:
```sql
UPDATE app_job_data
SET clean_status = 'cleaning', clean_started_at = NOW()
WHERE id = ANY(ids)
```
使用 `FOR UPDATE SKIP LOCKED` 而非 `FOR UPDATE`,避免多实例部署时阻塞等待。
### 1.4 并发模型
- 一批 `batch_size`(默认100)条数据,全部启动协程
- `asyncio.Semaphore(concurrency)`(默认50)控制同时调AI的并发数
- 做完一条立刻让位给下一条,不存在批次内空等
### 1.5 单条清洗流程
```
1. 前置校验
└─ description 为空或长度 < 20 → discarded
2. 哈希去重续命(新增)
├─ 用 content_hash 查 bg_job
├─ 命中 → UPDATE bg_job SET status=0, create_time=NOW(), update_time=NOW()
├─ 更新 PG: cleaned
└─ return(不调AI,秒完成)
3. 第一次AI:结构化提取
├─ 输入:原始字段 + 岗位分类列表 + 行业列表
├─ 输出JSONvalid/title/salary/education/categoryId/cities/companyShortName/...
└─ valid=false → discarded
4. 公司处理
├─ 按 short_name 查 bg_company
├─ 存在 → 拿 company_id
└─ 不存在 → 创建一条 status=0 的记录(asyncio.Lock 防并发重复)
6. 地区匹配
└─ AI返回的 cities → 字典缓存模糊匹配 → region_codes
7. 写入 MySQL
├─ INSERT bg_job(带 content_hash、recruit_category、expire_at
└─ INSERT bg_job_region_relation
8. 更新 PG: cleaned
9. 第二次AI:专业匹配(失败不影响岗位入库)
├─ 输入:title + description + requirement + 专业列表(845条)
└─ 输出:requiredMajorIds + majorSensitivity → UPDATE bg_job
10. 第三次AI:技能提取(失败不影响岗位入库)
├─ 输入:title + description + requirement
└─ 输出:技能名数组 → INSERT IGNORE bg_skill_tag → INSERT bg_job_skill_tag_relation
```
### 1.6 哈希续命机制
**背景**:爬虫每隔7天全量刷新,大部分岗位内容未变。
**逻辑**
- `app_job_data.content_hash`:爬虫入库时对岗位内容生成的哈希值
- `bg_job.content_hash`:清洗入库时透传过来
- 清洗时优先用 hash 查 bg_job,命中说明岗位还活着
- 续命动作:`status=0`(重新上架)+ 刷新 `create_time`/`update_time`
- 不调AI,秒级完成,大幅节省时间和API费用
### 1.7 僵尸恢复(每30分钟)
```sql
UPDATE app_job_data
SET clean_status = 'pending', clean_started_at = NULL
WHERE clean_status = 'cleaning'
AND clean_started_at < NOW() - INTERVAL '10 minutes'
```
---
## 二、公司数据补充
### 2.1 触发方式
- 定时:每5分钟触发一次(`company_interval_seconds` 可配)
- 启动时立即执行第一次
### 2.2 状态管理
`bg_company.status`MySQL):
- 0 = 待完善:岗位清洗时创建,只有 short_name
- 1 = 已完善:AI补充完成
- 2 = 禁用
- 3 = 补充中:已锁定,正在处理
- 4 = 补充失败:AI不认识该公司
### 2.3 流程
```
1. 锁定一批 status=0 的公司(batch_size=20FOR UPDATE SKIP LOCKED
2. 批量更新 status=3
3. 协程并发(Semaphore=10
4. 每条:
├─ AI调用:short_name + 行业列表 → 公司详情JSON
├─ valid=false → status=4
└─ valid=true → 回填全部字段 → status=1
```
### 2.4 AI返回字段
```json
{
"valid": true,
"name": "公司全称",
"city": "总部城市",
"companyType": "企业类型",
"industryId": ID,
"tags": ["标签"],
"summary": "一句话简介",
"description": "详细描述",
"foundedYear": "成立年份",
"address": "地址",
"scale": "规模",
"website": "官网",
"financingStage": "融资状态",
"latestValuation": "估值",
"news": ["新闻"]
}
```
### 2.5 僵尸恢复(每小时)
```sql
UPDATE bg_company
SET status = 0, update_time = NOW()
WHERE status = 3
AND update_time < NOW() - INTERVAL 10 MINUTE
```
---
## 三、岗位下架
### 3.1 触发方式
- 每天凌晨2点执行
### 3.2 逻辑
```sql
UPDATE bg_job
SET status = 2, update_time = NOW()
WHERE status = 0
AND create_time < DATE_SUB(NOW(), INTERVAL {job_expire_days} DAY)
```
- `job_expire_days` 可配,默认7天
- 配合续命机制:活跃岗位每次刷新会被续命(create_time 刷新),所以不会被误下架
- 真正过期的岗位(7天内未再被爬到)才会下架
---
## 四、字典数据缓存
### 4.1 加载时机
- 服务启动时从 MySQL 全量加载到内存
### 4.2 缓存内容
| 数据 | 来源表 | 格式 | 用途 |
|------|--------|------|------|
| 岗位分类 | bg_job_categorylevel=3 | `id:name(一级/二级)` | 第一次AI prompt |
| 行业 | bg_industrylevel=2 | `id:name(一级)` | 第一次AI prompt + 公司补充 |
| 专业分类 | bg_major_categorylevel=3 | `id:name(一级/二级)` | 第二次AI prompt |
| 地区 | bg_china_regions_code(市级) | code + name | 城市名匹配地区编码 |
---
## 五、关键设计决策
| 决策点 | 结论 | 原因 |
|--------|------|------|
| 为什么从 Java 迁到 Python | asyncio 协程在 IO 密集场景下内存开销远小于 Java 线程 | 10万+数据量,50并发协程 vs 50线程 |
| 并发控制 | Semaphore 信号量 | 做完一条立刻补一条,吞吐量最大化 |
| 哈希续命 | content_hash 匹配免AI | 7天全量刷新时大部分是重复数据,节省90%+ AI调用 |
| 行锁方式 | FOR UPDATE SKIP LOCKED | 不阻塞,适合未来多实例部署 |
| 技能并发去重 | INSERT IGNORE + 唯一索引 | 不加应用层锁,数据库保证 |
| 公司创建并发 | asyncio.Lock | 防止同名公司被多个协程同时插入 |
| AI调用次数 | 三次(结构化 + 专业 + 技能) | 各维度独立,容错互不影响 |
| 公司数据来源 | AI补充 | 公司简称查工商API不精确,AI覆盖率高 |
| 事务跨库 | 不做分布式事务 | PG和MySQL独立操作,僵尸恢复兜底 |
| 岗位下架 | create_time + N天 | 配合续命机制,活跃岗位不会被误下架 |
---
## 六、配置参数
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `clean_batch_size` | 100 | 每批锁定数据量 |
| `clean_concurrency` | 50 | AI并发调用上限 |
| `clean_interval_seconds` | 180 | 清洗任务间隔(秒) |
| `company_batch_size` | 20 | 公司补充每批数量 |
| `company_concurrency` | 10 | 公司AI并发上限 |
| `company_interval_seconds` | 300 | 公司补充间隔(秒) |
| `job_expire_days` | 7 | 岗位过期天数 |
所有参数通过 `.env` 文件配置,修改后重启生效。
---
## 七、涉及的表
| 表 | 库 | 读/写 | 说明 |
|----|-----|-------|------|
| `app_job_data` | PG | 读+写状态 | 爬虫原始数据,clean_status 流转 |
| `bg_job` | MySQL | 写+改 | 清洗后岗位(含 content_hash 用于续命) |
| `bg_company` | MySQL | 读+写+改 | 公司(查或创建 + AI补充) |
| `bg_job_region_relation` | MySQL | 写 | 岗位-地区关联 |
| `bg_skill_tag` | MySQL | 写(INSERT IGNORE) | 技能标签自动入库 |
| `bg_job_skill_tag_relation` | MySQL | 写 | 岗位-技能关联 |
| `bg_job_category` | MySQL | 读(缓存) | 岗位分类字典 |
| `bg_industry` | MySQL | 读(缓存) | 行业字典 |
| `bg_major_category` | MySQL | 读(缓存) | 专业字典 |
| `bg_china_regions_code` | MySQL | 读(缓存) | 地区字典 |