--- inclusion: manual --- # 数据清洗方案 ## 总体架构 ``` 爬虫(本地网络) → PostgreSQL(app_job_data) ↓ offerpie_job_cleaner(本地Python服务) ↓ asyncio协程 + Semaphore信号量并发 ↓ 哈希续命 或 三次AI调用 ↓ MySQL(bg_job + bg_company + 关联表 + bg_skill_tag) ↓ 公司信息不完整的 → 调AI补充 ``` 本服务部署在本地非公网环境,纯后台定时任务,无HTTP接口。通过APScheduler调度,asyncio协程提供并发能力。 --- ## 一、岗位清洗 ### 1.1 触发方式 - 定时:每3分钟触发一次(`clean_interval_seconds` 可配) - 启动时立即执行第一次 ### 1.2 状态管理 `app_job_data.clean_status`(PostgreSQL): - `pending` = 待清洗:新爬到的数据,默认值 - `cleaning` = 清洗中:已锁定,正在处理 - `cleaned` = 已清洗:处理完成(入库或续命) - `discarded` = 已丢弃:AI判定无效或描述过短 ### 1.3 批量锁定 ```sql SELECT * FROM app_job_data WHERE clean_status = 'pending' LIMIT {batch_size} FOR UPDATE SKIP LOCKED ``` 锁定后立即更新: ```sql UPDATE app_job_data SET clean_status = 'cleaning', clean_started_at = NOW() WHERE id = ANY(ids) ``` 使用 `FOR UPDATE SKIP LOCKED` 而非 `FOR UPDATE`,避免多实例部署时阻塞等待。 ### 1.4 并发模型 - 一批 `batch_size`(默认100)条数据,全部启动协程 - `asyncio.Semaphore(concurrency)`(默认50)控制同时调AI的并发数 - 做完一条立刻让位给下一条,不存在批次内空等 ### 1.5 单条清洗流程 ``` 1. 前置校验 └─ description 为空或长度 < 20 → discarded 2. 哈希去重续命(新增) ├─ 用 content_hash 查 bg_job ├─ 命中 → UPDATE bg_job SET status=0, create_time=NOW(), update_time=NOW() ├─ 更新 PG: cleaned └─ return(不调AI,秒完成) 3. 第一次AI:结构化提取 ├─ 输入:原始字段 + 岗位分类列表 + 行业列表 ├─ 输出JSON:valid/title/salary/education/categoryId/cities/companyShortName/... └─ valid=false → discarded 4. 公司处理 ├─ 按 short_name 查 bg_company ├─ 存在 → 拿 company_id └─ 不存在 → 创建一条 status=0 的记录(asyncio.Lock 防并发重复) 6. 地区匹配 └─ AI返回的 cities → 字典缓存模糊匹配 → region_codes 7. 写入 MySQL ├─ INSERT bg_job(带 content_hash、recruit_category、expire_at) └─ INSERT bg_job_region_relation 8. 更新 PG: cleaned 9. 第二次AI:专业匹配(失败不影响岗位入库) ├─ 输入:title + description + requirement + 专业列表(845条) └─ 输出:requiredMajorIds + majorSensitivity → UPDATE bg_job 10. 第三次AI:技能提取(失败不影响岗位入库) ├─ 输入:title + description + requirement └─ 输出:技能名数组 → INSERT IGNORE bg_skill_tag → INSERT bg_job_skill_tag_relation ``` ### 1.6 哈希续命机制 **背景**:爬虫每隔7天全量刷新,大部分岗位内容未变。 **逻辑**: - `app_job_data.content_hash`:爬虫入库时对岗位内容生成的哈希值 - `bg_job.content_hash`:清洗入库时透传过来 - 清洗时优先用 hash 查 bg_job,命中说明岗位还活着 - 续命动作:`status=0`(重新上架)+ 刷新 `create_time`/`update_time` - 不调AI,秒级完成,大幅节省时间和API费用 ### 1.7 僵尸恢复(每30分钟) ```sql UPDATE app_job_data SET clean_status = 'pending', clean_started_at = NULL WHERE clean_status = 'cleaning' AND clean_started_at < NOW() - INTERVAL '10 minutes' ``` --- ## 二、公司数据补充 ### 2.1 触发方式 - 定时:每5分钟触发一次(`company_interval_seconds` 可配) - 启动时立即执行第一次 ### 2.2 状态管理 `bg_company.status`(MySQL): - 0 = 待完善:岗位清洗时创建,只有 short_name - 1 = 已完善:AI补充完成 - 2 = 禁用 - 3 = 补充中:已锁定,正在处理 - 4 = 补充失败:AI不认识该公司 ### 2.3 流程 ``` 1. 锁定一批 status=0 的公司(batch_size=20,FOR UPDATE SKIP LOCKED) 2. 批量更新 status=3 3. 协程并发(Semaphore=10) 4. 每条: ├─ AI调用:short_name + 行业列表 → 公司详情JSON ├─ valid=false → status=4 └─ valid=true → 回填全部字段 → status=1 ``` ### 2.4 AI返回字段 ```json { "valid": true, "name": "公司全称", "city": "总部城市", "companyType": "企业类型", "industryId": 行业ID, "tags": ["标签"], "summary": "一句话简介", "description": "详细描述", "foundedYear": "成立年份", "address": "地址", "scale": "规模", "website": "官网", "financingStage": "融资状态", "latestValuation": "估值", "news": ["新闻"] } ``` ### 2.5 僵尸恢复(每小时) ```sql UPDATE bg_company SET status = 0, update_time = NOW() WHERE status = 3 AND update_time < NOW() - INTERVAL 10 MINUTE ``` --- ## 三、岗位下架 ### 3.1 触发方式 - 每天凌晨2点执行 ### 3.2 逻辑 ```sql UPDATE bg_job SET status = 2, update_time = NOW() WHERE status = 0 AND create_time < DATE_SUB(NOW(), INTERVAL {job_expire_days} DAY) ``` - `job_expire_days` 可配,默认7天 - 配合续命机制:活跃岗位每次刷新会被续命(create_time 刷新),所以不会被误下架 - 真正过期的岗位(7天内未再被爬到)才会下架 --- ## 四、字典数据缓存 ### 4.1 加载时机 - 服务启动时从 MySQL 全量加载到内存 ### 4.2 缓存内容 | 数据 | 来源表 | 格式 | 用途 | |------|--------|------|------| | 岗位分类 | bg_job_category(level=3) | `id:name(一级/二级)` | 第一次AI prompt | | 行业 | bg_industry(level=2) | `id:name(一级)` | 第一次AI prompt + 公司补充 | | 专业分类 | bg_major_category(level=3) | `id:name(一级/二级)` | 第二次AI prompt | | 地区 | bg_china_regions_code(市级) | code + name | 城市名匹配地区编码 | --- ## 五、关键设计决策 | 决策点 | 结论 | 原因 | |--------|------|------| | 为什么从 Java 迁到 Python | asyncio 协程在 IO 密集场景下内存开销远小于 Java 线程 | 10万+数据量,50并发协程 vs 50线程 | | 并发控制 | Semaphore 信号量 | 做完一条立刻补一条,吞吐量最大化 | | 哈希续命 | content_hash 匹配免AI | 7天全量刷新时大部分是重复数据,节省90%+ AI调用 | | 行锁方式 | FOR UPDATE SKIP LOCKED | 不阻塞,适合未来多实例部署 | | 技能并发去重 | INSERT IGNORE + 唯一索引 | 不加应用层锁,数据库保证 | | 公司创建并发 | asyncio.Lock | 防止同名公司被多个协程同时插入 | | AI调用次数 | 三次(结构化 + 专业 + 技能) | 各维度独立,容错互不影响 | | 公司数据来源 | AI补充 | 公司简称查工商API不精确,AI覆盖率高 | | 事务跨库 | 不做分布式事务 | PG和MySQL独立操作,僵尸恢复兜底 | | 岗位下架 | create_time + N天 | 配合续命机制,活跃岗位不会被误下架 | --- ## 六、配置参数 | 参数 | 默认值 | 说明 | |------|--------|------| | `clean_batch_size` | 100 | 每批锁定数据量 | | `clean_concurrency` | 50 | AI并发调用上限 | | `clean_interval_seconds` | 180 | 清洗任务间隔(秒) | | `company_batch_size` | 20 | 公司补充每批数量 | | `company_concurrency` | 10 | 公司AI并发上限 | | `company_interval_seconds` | 300 | 公司补充间隔(秒) | | `job_expire_days` | 7 | 岗位过期天数 | 所有参数通过 `.env` 文件配置,修改后重启生效。 --- ## 七、涉及的表 | 表 | 库 | 读/写 | 说明 | |----|-----|-------|------| | `app_job_data` | PG | 读+写状态 | 爬虫原始数据,clean_status 流转 | | `bg_job` | MySQL | 写+改 | 清洗后岗位(含 content_hash 用于续命) | | `bg_company` | MySQL | 读+写+改 | 公司(查或创建 + AI补充) | | `bg_job_region_relation` | MySQL | 写 | 岗位-地区关联 | | `bg_skill_tag` | MySQL | 写(INSERT IGNORE) | 技能标签自动入库 | | `bg_job_skill_tag_relation` | MySQL | 写 | 岗位-技能关联 | | `bg_job_category` | MySQL | 读(缓存) | 岗位分类字典 | | `bg_industry` | MySQL | 读(缓存) | 行业字典 | | `bg_major_category` | MySQL | 读(缓存) | 专业字典 | | `bg_china_regions_code` | MySQL | 读(缓存) | 地区字典 |