17 KiB
inclusion
| inclusion |
|---|
| manual |
数据清洗方案
总体架构
爬虫(公司网络) → app_job_data(原始数据)
↓
Java定时任务读取(多线程)
↓
调用AI API清洗/结构化
↓
写入业务表(bg_company + bg_job + 关联表)
↓
公司信息不完整的 → 调工商API补充
所有清洗逻辑放在 manager 模块,通过 @Scheduled 定时任务触发。
讨论分区
整体方案分为四部分逐步讨论:
- ✅ 岗位清洗触发逻辑
- ✅ 岗位清洗逻辑
- ✅ 公司数据触发逻辑
- ✅ 公司数据补充逻辑(AI补充)
一、岗位清洗触发逻辑
1.1 表结构变更
app_job_data 新增字段:
ALTER TABLE app_job_data ADD COLUMN clean_status TINYINT(1) DEFAULT 0 NOT NULL COMMENT '清洗状态 0=待清洗 1=清洗中 2=已入库 3=已丢弃';
CREATE INDEX idx_clean_status ON app_job_data (clean_status);
状态说明:
- 0=待清洗:新爬到的数据,默认值,不影响爬虫原有插入逻辑
- 1=清洗中:定时任务已锁定,正在处理
- 2=已入库:清洗成功,已写入 bg_job
- 3=已丢弃:AI判定为无效数据,不入库
时间记录:不加额外时间字段,利用已有的 updated_at(ON UPDATE CURRENT_TIMESTAMP),状态变更时自动更新。
1.2 两个定时任务
任务A:岗位清洗任务(高频,每5分钟)
- 批量锁定(事务内
SELECT ... FOR UPDATE+UPDATE):先在事务内对clean_status=0 AND is_valid=1的行加行锁并查出数据,再更新clean_status=1,事务提交后释放锁。行锁保证并发安全,其他线程会阻塞直到事务提交 - 将锁定的数据丢入线程池,多线程并发调用 AI API 清洗
- 每条处理完毕后,单独更新
clean_status为 2(已入库)或 3(已丢弃) - 单条写入事务:bg_job 入库 + clean_status 更新放在同一个短事务中,保证一致性
任务B:僵尸恢复任务(低频,每30分钟)
处理因发布重启导致卡在"清洗中"的僵尸数据:
UPDATE app_job_data SET clean_status=0 WHERE clean_status=1 AND updated_at < NOW() - INTERVAL 10 MINUTE
一条SQL搞定,将超时10分钟仍在"清洗中"的数据重置为待清洗,下次任务A会重新捞取处理。
1.3 去重保障
即使同一条数据被重复清洗(僵尸恢复后重新处理),写入 bg_job 时通过 source_id 判断是否已存在,存在则跳过,不会产生重复数据。
1.4 设计决策记录
| 决策点 | 结论 | 原因 |
|---|---|---|
| 清洗状态放哪 | app_job_data 加字段 | 同库,简单直接 |
| 是否加"清洗中"状态 | 是 | 多线程并发需要锁定机制 |
| 长事务 vs 短事务 | 短事务(单条) | AI调用耗时长,不能hold连接 |
| 僵尸恢复方式 | 独立低频定时任务 | 避免每次清洗任务都多一次查询,节省性能 |
| 是否加 clean_time 字段 | 否 | updated_at 自动更新,够用 |
| 失败重试 | 僵尸恢复任务自动处理 | clean_status=1 超时后重置为0,自动重试 |
二、岗位清洗逻辑
2.1 前置校验(Java侧,不调AI)
description为空或长度 < 20 → 直接标记clean_status=3(丢弃),跳过,节省AI调用成本
2.2 参考数据准备
应用启动时加载并缓存(定期刷新):
bg_job_category全量:只取叶子节点(level=3),拼成id:name(一级/二级)文本列表bg_industry全量:只取叶子节点(level=2),拼成id:name(一级)文本列表bg_skill_tag全量:按categoryId分组缓存为Map<Long, List<SkillTag>>,供第二次 AI 调用使用
分类和行业列表作为 prompt 的一部分传给AI,ID由人工维护为短数字,不使用雪花ID。
地区数据(bg_china_regions_code)不传给AI,由Java侧根据AI返回的城市名自行匹配。
2.3 AI 调用(单次调用,返回结构化JSON)
输入
- 原始字段:job_title、salary、location、company、experience、education、description
- 参考列表:岗位分类(id:name)、行业(id:name)
AI 返回 JSON 结构
{
"valid": true,
"title": "Java高级开发工程师",
"salary": "15-25K",
"education": 2,
"minExperience": 3,
"employmentType": 0,
"categoryId": 12,
"requiredIndustryId": 5,
"description": "1. 负责核心业务系统开发...",
"requirement": "1. 本科及以上学历...",
"bonus": "1. 有分布式系统经验优先...",
"tags": ["数据分析", "产品策略", "团队协作"],
"skillTags": ["Java", "Spring Boot", "MySQL"],
"companyShortName": "字节跳动",
"cities": ["北京", "上海"]
}
各字段清洗规则
| 字段 | 来源 | 规则 |
|---|---|---|
| valid | AI综合判断 | 数据是否有效,false则丢弃 |
| title | job_title | 存在则保留;不存在则AI从description归纳生成 |
| salary | salary | 有效则标准化(10-20K / 20K / 面议);无效或空则null |
| education | education + description | 映射为 0=不限 1=大专 2=本科 3=硕士 4=博士 |
| minExperience | experience + description | 提取最低年限数值,不要求则为0 |
| employmentType | description | 判断 0=全职 1=兼职,默认0 |
| categoryId | description + job_title | 必选,从分类列表中选最接近的,不允许返回null |
| requiredIndustryId | description(任职要求部分) | 仅当明确提到行业经验要求时设置;列表中无完全匹配则选最相似的;未提到则null |
| description | description + experience + education | 提取"岗位职责"部分,保持原文风格,格式化展示 |
| requirement | description + experience + education | 提取"任职要求"部分,保持原文风格,格式化展示 |
| bonus | description + experience + education | 提取"加分项"部分,无则空 |
| tags | description + job_title | 核心职能标签(如数据分析、产品策略、团队协作),最多5个 |
| skillTags | description | 技能关键词(如Java、Spring Boot、MySQL),最多8个 |
| companyShortName | company | 提取简洁的公司简称,去掉地区后缀、招聘后缀、括号内容等,保持"中国平安""字节跳动"风格 |
| cities | location | 提取城市名列表,精确到市级 |
2.4 AI 返回后的 Java 处理流程
- valid=false → 更新
clean_status=3,结束 - 公司处理:按AI清洗后的
companyShortName查bg_company.short_name,存在则拿company_id;不存在则创建一条(short_name=companyShortName, status=0待完善),拿新ID - 地区处理:
cities列表逐个匹配bg_china_regions_code(按name匹配到市级),匹配上的准备写入关联表 - 去重:用
source_id(app_job_data.id)查bg_job,已存在则跳过,更新clean_status=2 - 写入 bg_job:组装所有字段,
source_id=app_job_data.id,source_url=detail_url,status=0(上架) - 写入 bg_job_region_relation:岗位ID + 匹配到的region_code,一岗多地区
- 更新 app_job_data.clean_status=2
步骤 2-7 放在一个短事务中,保证数据一致性。
2.5 技能标签匹配(第二次 AI 调用)
bg_skill_tag 是预定义的技能标签池,挂在岗位类型下,用于岗位-简历匹配度计算。与 bg_job.skill_tags(自由文本,展示用)是两套东西。
关联表
CREATE TABLE bg_job_skill_tag_relation (
id BIGINT NOT NULL,
job_id BIGINT NOT NULL COMMENT '岗位ID',
skill_tag_id BIGINT NOT NULL COMMENT '技能标签ID',
create_time DATETIME NOT NULL COMMENT '创建时间',
PRIMARY KEY (id),
INDEX idx_job_id (job_id),
INDEX idx_skill_tag_id (skill_tag_id),
UNIQUE INDEX uk_job_skill (job_id, skill_tag_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='岗位-技能标签关联表';
流程
- 第一次 AI 调用完成后,拿到
categoryId - 从缓存取该
categoryId下的 skillTag 列表,为空则跳过 - 发起第二次 AI 调用:传岗位标题+职责+要求 + 该分类下的标签列表(
id:name),AI 返回匹配的标签 ID 数组 - 校验返回的 ID 确实存在于该 categoryId 的标签池中(防幻觉)
- 写入
bg_job_skill_tag_relation(在 saveJobData 事务中一并写入) - 第二次 AI 调用失败不影响岗位入库,仅日志记录
prompt 规则
- 只能从给定列表中选择,不允许自创
- 不限制个数,不重复即可
- 只返回 ID 数组,如
[1, 3, 7]
2.6 设计决策记录
| 决策点 | 结论 | 原因 |
|---|---|---|
| AI调用次数 | 一次调用返回全部字段 | 减少API调用成本和延迟 |
| 分类/行业列表怎么给AI | 直接传 id:name 文本 | ID人工维护为短数字,token消耗可控 |
| 地区匹配方式 | AI输出城市名,Java侧匹配 | 城市名无歧义,不需要传参考列表 |
| categoryId 是否可空 | 不可空,必须选一个 | 岗位分类是核心维度 |
| requiredIndustryId 何时设置 | 仅描述中明确提到行业经验要求时 | 行业经验是任职要求,不是所有岗位都有 |
| tags 定位 | 核心职能标签,最多5个 | 区别于福利标签,体现岗位核心能力要求 |
| skillTags 数量 | 最多8个 | 控制数量,保持精炼 |
| source_id 取值 | app_job_data.id | 简单直接,用于去重 |
| 公司不存在时 | 自动创建 status=0 待完善 | 后续由公司数据补充逻辑完善 |
| skillTag 存储方式 | 独立关联表 bg_job_skill_tag_relation | 百万级数据量,JSON查询性能差 |
| skillTag 匹配时机 | 第二次 AI 调用,岗位入库后 | 需要先确定 categoryId 才能缩小标签范围 |
| skillTag 与 skill_tags 的关系 | 两套独立数据 | skill_tags 是展示用自由文本,skillTag 是计算用预定义标签 |
| 第二次 AI 失败是否影响入库 | 不影响 | 标签匹配是增强功能,不阻断主流程 |
三、公司数据触发逻辑
3.1 状态扩展
bg_company.status 扩展为5个值:
- 0=待完善:岗位清洗时自动创建的公司,只有 short_name
- 1=已完善:AI补充完成
- 2=禁用:人工标记禁用
- 3=补充中:定时任务已锁定,正在调用AI
- 4=补充失败:AI明确不认识该公司,不再自动重试
3.2 两个定时任务(与岗位清洗同一套模式)
任务C:公司数据补充任务(低频,每小时)
- 批量锁定(事务内
SELECT ... FOR UPDATE+UPDATE):
-- 事务内执行,行锁保证并发安全
SELECT * FROM bg_company WHERE status=0 LIMIT N FOR UPDATE;
UPDATE bg_company SET status=3, update_time=NOW() WHERE id IN (...);
⚠️ 锁定时必须同时更新 update_time,因为 bg_company 的 update_time 不像 app_job_data.updated_at 那样由数据库自动维护,需要 Java 侧手动设值。如果不更新,后续僵尸恢复任务无法正确判断超时。
- 将锁定的数据丢入线程池,多线程并发调用AI补充
- 每条处理完毕后,回填公司信息,更新
status=1(已完善) - AI明确不认识该公司(valid=false)→ 更新
status=4(补充失败) - AI调用异常或解析失败 → 保持
status=3,由僵尸恢复任务重置
任务D:公司僵尸恢复任务(低频,每小时,与任务C错开)
处理因发布重启导致卡在"补充中"的僵尸数据:
UPDATE bg_company SET status=0 WHERE status=3 AND update_time < NOW() - INTERVAL 10 MINUTE
超时10分钟仍在"补充中"的数据重置为待完善,下次任务C会重新捞取处理。
3.3 与岗位清洗触发逻辑的对比
| 对比项 | 岗位清洗 | 公司补充 |
|---|---|---|
| 状态字段 | app_job_data.clean_status | bg_company.status |
| 锁定值 | 1=清洗中 | 3=补充中 |
| 完成值 | 2=已入库 / 3=已丢弃 | 1=已完善 / 4=补充失败 |
| 时间字段 | updated_at(数据库自动) | update_time(Java手动设值) |
| 锁定时是否需手动更新时间 | 不需要 | 需要,否则僵尸恢复无法判断超时 |
| 触发频率 | 每5分钟 | 每小时 |
| 僵尸恢复频率 | 每30分钟 | 每小时(与任务C错开) |
3.4 设计决策记录
| 决策点 | 结论 | 原因 |
|---|---|---|
| 是否与岗位清洗同步触发 | 否,独立定时任务 | AI调用场景不同,频率不同 |
| 触发模式 | 复用岗位清洗的"SELECT FOR UPDATE + 僵尸恢复"模式 | 统一架构,行锁保证并发安全 |
| 锁定时是否更新时间 | 是 | bg_company.update_time 非数据库自动维护,不更新则僵尸恢复失效 |
| 补充频率 | 每小时 | 公司数据量少,AI调用成本可控 |
四、公司数据补充逻辑(AI补充)
4.1 方案说明
原计划使用工商API(天眼查等)查询公司信息,但考虑到:
- 公司简称不能很好地查询到精确数据
- 用AI生成完整企业名字再查API,准确性也无法保证
- 我们的公司数据来源于招聘平台,基本都是有一定规模的企业,AI训练数据覆盖率高
因此改为直接使用AI补充公司数据,一次调用返回全部字段。
4.2 补充流程
- 拿
short_name(公司简称),拼 prompt 调 AI - prompt 中附带行业列表(
DictCacheService.getIndustryText()),让 AI 直接返回industryId - AI 返回结构化 JSON,包含
valid字段判断是否认识该公司 valid=false→ 更新status=4(补充失败),结束regionCode:AI 返回城市名,Java 侧DictCacheService.matchRegionCode匹配- 解析 JSON,回填
bg_company各字段 - 更新
status=1(已完善)
4.3 AI 返回 JSON 结构
{
"valid": true,
"name": "北京字节跳动科技有限公司",
"city": "北京",
"companyType": "独角兽",
"industryId": 5,
"tags": ["短视频", "人工智能", "社交平台"],
"summary": "全球领先的内容平台和科技公司,旗下拥有抖音、今日头条等产品",
"description": "字节跳动成立于2012年,是一家以技术驱动的全球化互联网公司...",
"foundedYear": "2012",
"address": "北京市海淀区北三环西路27号",
"scale": "10000人以上",
"website": "https://www.bytedance.com",
"financingStage": "已上市",
"latestValuation": "2200亿美元",
"news": [
"字节跳动2024年营收突破1200亿美元创历史新高",
"TikTok全球月活用户突破15亿大关",
"字节跳动加大AI大模型研发投入布局人工智能赛道"
]
}
4.4 各字段补充规则
| bg_company 字段 | AI返回字段 | 规则 |
|---|---|---|
| name | name | 公司全称,AI不确定则null |
| logoUrl | — | AI无法提供,留空 |
| regionCode | city | AI返回城市名,Java侧matchRegionCode匹配 |
| companyType | companyType | 上市企业、独角兽、国企、民营企业、外资企业等 |
| industryId | industryId | 从行业列表中选,不确定则null |
| tags | tags | 公司标签,JSON数组,最多5个 |
| summary | summary | 一句话简介,100字以内 |
| description | description | 公司详细描述,500字以内 |
| foundedYear | foundedYear | 成立年份,如"2012" |
| address | address | 注册/总部地址 |
| scale | scale | 企业规模,如"1000-5000人"、"10000人以上" |
| website | website | 官网地址 |
| financingStage | financingStage | 融资状态,如"A轮"、"已上市"、"不需要融资" |
| latestValuation | latestValuation | 最新估值,AI知道就给,不知道null |
| news | news | 3条相关新闻,每条50字以内,JSON数组 |
4.5 prompt 规则
- AI 不认识该公司 →
valid=false,其他字段不需要返回 - 不确定的字段返回 null,不要编造
industryId必须从给定行业列表中选择news最多3条,每条50字以内,基于AI知识库中最新的信息tags最多5个,体现公司核心业务特征- 字符串值中不允许出现Tab、换行等控制字符
- 只返回JSON,不要其他内容
4.6 设计决策记录
| 决策点 | 结论 | 原因 |
|---|---|---|
| 数据来源 | AI补充,不用工商API | 公司简称查API不精确,AI对招聘平台企业覆盖率高 |
| AI不认识的公司 | status=4(补充失败),不再自动重试 | 避免无限重试浪费AI调用 |
| logoUrl | 留空 | AI无法提供图片URL |
| news 时效性 | 不要求实时,取AI知识库内最新的3条 | 求职场景不需要实时新闻 |
| latestValuation | AI知道就给,不知道null | 大部分招聘企业AI有数据 |
| regionCode 匹配方式 | AI返回城市名,Java侧匹配 | 复用已有的matchRegionCode逻辑 |
| industryId 匹配方式 | prompt带行业列表,AI直接返回ID | 复用已有的行业列表文本 |