Files
offerpai_backend/.kiro/steering/数据清洗方案.md
T
2026-03-19 15:22:11 +08:00

17 KiB
Raw Blame History

inclusion
inclusion
manual

数据清洗方案

总体架构

爬虫(公司网络) → app_job_data(原始数据)
                      ↓
              Java定时任务读取(多线程)
                      ↓
            调用AI API清洗/结构化
                      ↓
         写入业务表(bg_company + bg_job + 关联表)
                      ↓
            公司信息不完整的 → 调工商API补充

所有清洗逻辑放在 manager 模块,通过 @Scheduled 定时任务触发。

讨论分区

整体方案分为四部分逐步讨论:

  1. 岗位清洗触发逻辑
  2. 岗位清洗逻辑
  3. 公司数据触发逻辑
  4. 公司数据补充逻辑(AI补充)

一、岗位清洗触发逻辑

1.1 表结构变更

app_job_data 新增字段:

ALTER TABLE app_job_data ADD COLUMN clean_status TINYINT(1) DEFAULT 0 NOT NULL COMMENT '清洗状态 0=待清洗 1=清洗中 2=已入库 3=已丢弃';
CREATE INDEX idx_clean_status ON app_job_data (clean_status);

状态说明:

  • 0=待清洗:新爬到的数据,默认值,不影响爬虫原有插入逻辑
  • 1=清洗中:定时任务已锁定,正在处理
  • 2=已入库:清洗成功,已写入 bg_job
  • 3=已丢弃:AI判定为无效数据,不入库

时间记录:不加额外时间字段,利用已有的 updated_atON UPDATE CURRENT_TIMESTAMP),状态变更时自动更新。

1.2 两个定时任务

任务A:岗位清洗任务(高频,每5分钟)

  1. 批量锁定(事务内 SELECT ... FOR UPDATE + UPDATE):先在事务内对 clean_status=0 AND is_valid=1 的行加行锁并查出数据,再更新 clean_status=1,事务提交后释放锁。行锁保证并发安全,其他线程会阻塞直到事务提交
  2. 将锁定的数据丢入线程池,多线程并发调用 AI API 清洗
  3. 每条处理完毕后,单独更新 clean_status 为 2(已入库)或 3(已丢弃)
  4. 单条写入事务:bg_job 入库 + clean_status 更新放在同一个短事务中,保证一致性

任务B:僵尸恢复任务(低频,每30分钟)

处理因发布重启导致卡在"清洗中"的僵尸数据:

UPDATE app_job_data SET clean_status=0 WHERE clean_status=1 AND updated_at < NOW() - INTERVAL 10 MINUTE

一条SQL搞定,将超时10分钟仍在"清洗中"的数据重置为待清洗,下次任务A会重新捞取处理。

1.3 去重保障

即使同一条数据被重复清洗(僵尸恢复后重新处理),写入 bg_job 时通过 source_id 判断是否已存在,存在则跳过,不会产生重复数据。

1.4 设计决策记录

决策点 结论 原因
清洗状态放哪 app_job_data 加字段 同库,简单直接
是否加"清洗中"状态 多线程并发需要锁定机制
长事务 vs 短事务 短事务(单条) AI调用耗时长,不能hold连接
僵尸恢复方式 独立低频定时任务 避免每次清洗任务都多一次查询,节省性能
是否加 clean_time 字段 updated_at 自动更新,够用
失败重试 僵尸恢复任务自动处理 clean_status=1 超时后重置为0,自动重试

二、岗位清洗逻辑

2.1 前置校验(Java侧,不调AI)

  • description 为空或长度 < 20 → 直接标记 clean_status=3(丢弃),跳过,节省AI调用成本

2.2 参考数据准备

应用启动时加载并缓存(定期刷新):

  • bg_job_category 全量:只取叶子节点(level=3),拼成 id:name(一级/二级) 文本列表
  • bg_industry 全量:只取叶子节点(level=2),拼成 id:name(一级) 文本列表
  • bg_skill_tag 全量:按 categoryId 分组缓存为 Map<Long, List<SkillTag>>,供第二次 AI 调用使用

分类和行业列表作为 prompt 的一部分传给AI,ID由人工维护为短数字,不使用雪花ID。

地区数据(bg_china_regions_code)不传给AI,由Java侧根据AI返回的城市名自行匹配。

2.3 AI 调用(单次调用,返回结构化JSON)

输入

  • 原始字段:job_title、salary、location、company、experience、education、description
  • 参考列表:岗位分类(id:name)、行业(id:name

AI 返回 JSON 结构

{
  "valid": true,
  "title": "Java高级开发工程师",
  "salary": "15-25K",
  "education": 2,
  "minExperience": 3,
  "employmentType": 0,
  "categoryId": 12,
  "industryId": 5,
  "description": "1. 负责核心业务系统开发...",
  "requirement": "1. 本科及以上学历...",
  "bonus": "1. 有分布式系统经验优先...",
  "tags": ["数据分析", "产品策略", "团队协作"],
  "skillTags": ["Java", "Spring Boot", "MySQL"],
  "companyShortName": "字节跳动",
  "cities": ["北京", "上海"]
}

各字段清洗规则

字段 来源 规则
valid AI综合判断 数据是否有效,false则丢弃
title job_title 存在则保留;不存在则AI从description归纳生成
salary salary 有效则标准化(10-20K / 20K / 面议);无效或空则null
education education + description 映射为 0=不限 1=大专 2=本科 3=硕士 4=博士
minExperience experience + description 提取最低年限数值,不要求则为0
employmentType description 判断 0=全职 1=兼职,默认0
categoryId description + job_title 必选,从分类列表中选最接近的,不允许返回null
industryId description(任职要求部分) 仅当明确提到行业经验要求时设置;列表中无完全匹配则选最相似的;未提到则null
description description + experience + education 提取"岗位职责"部分,保持原文风格,格式化展示
requirement description + experience + education 提取"任职要求"部分,保持原文风格,格式化展示
bonus description + experience + education 提取"加分项"部分,无则空
tags description + job_title 核心职能标签(如数据分析、产品策略、团队协作),最多5个
skillTags description 技能关键词(如Java、Spring Boot、MySQL),最多8个
companyShortName company 提取简洁的公司简称,去掉地区后缀、招聘后缀、括号内容等,保持"中国平安""字节跳动"风格
cities location 提取城市名列表,精确到市级

2.4 AI 返回后的 Java 处理流程

  1. valid=false → 更新 clean_status=3,结束
  2. 公司处理:按AI清洗后的 companyShortNamebg_company.short_name,存在则拿 company_id;不存在则创建一条(short_name=companyShortName, status=0待完善),拿新ID
  3. 地区处理cities 列表逐个匹配 bg_china_regions_code(按name匹配到市级),匹配上的准备写入关联表
  4. 去重:用 source_idapp_job_data.id)查 bg_job,已存在则跳过,更新 clean_status=2
  5. 写入 bg_job:组装所有字段,source_id=app_job_data.idsource_url=detail_urlstatus=0(上架)
  6. 写入 bg_job_region_relation:岗位ID + 匹配到的region_code,一岗多地区
  7. 更新 app_job_data.clean_status=2

步骤 2-7 放在一个短事务中,保证数据一致性。

2.5 技能标签匹配(第二次 AI 调用)

bg_skill_tag 是预定义的技能标签池,挂在岗位类型下,用于岗位-简历匹配度计算。与 bg_job.skill_tags(自由文本,展示用)是两套东西。

关联表

CREATE TABLE bg_job_skill_tag_relation (
    id BIGINT NOT NULL,
    job_id BIGINT NOT NULL COMMENT '岗位ID',
    skill_tag_id BIGINT NOT NULL COMMENT '技能标签ID',
    create_time DATETIME NOT NULL COMMENT '创建时间',
    PRIMARY KEY (id),
    INDEX idx_job_id (job_id),
    INDEX idx_skill_tag_id (skill_tag_id),
    UNIQUE INDEX uk_job_skill (job_id, skill_tag_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='岗位-技能标签关联表';

流程

  1. 第一次 AI 调用完成后,拿到 categoryId
  2. 从缓存取该 categoryId 下的 skillTag 列表,为空则跳过
  3. 发起第二次 AI 调用:传岗位标题+职责+要求 + 该分类下的标签列表(id:name),AI 返回匹配的标签 ID 数组
  4. 校验返回的 ID 确实存在于该 categoryId 的标签池中(防幻觉)
  5. 写入 bg_job_skill_tag_relation(在 saveJobData 事务中一并写入)
  6. 第二次 AI 调用失败不影响岗位入库,仅日志记录

prompt 规则

  • 只能从给定列表中选择,不允许自创
  • 不限制个数,不重复即可
  • 只返回 ID 数组,如 [1, 3, 7]

2.6 设计决策记录

决策点 结论 原因
AI调用次数 一次调用返回全部字段 减少API调用成本和延迟
分类/行业列表怎么给AI 直接传 id:name 文本 ID人工维护为短数字,token消耗可控
地区匹配方式 AI输出城市名,Java侧匹配 城市名无歧义,不需要传参考列表
categoryId 是否可空 不可空,必须选一个 岗位分类是核心维度
industryId 何时设置 仅描述中明确提到行业经验时 行业经验是任职要求,不是所有岗位都有
tags 定位 核心职能标签,最多5个 区别于福利标签,体现岗位核心能力要求
skillTags 数量 最多8个 控制数量,保持精炼
source_id 取值 app_job_data.id 简单直接,用于去重
公司不存在时 自动创建 status=0 待完善 后续由公司数据补充逻辑完善
skillTag 存储方式 独立关联表 bg_job_skill_tag_relation 百万级数据量,JSON查询性能差
skillTag 匹配时机 第二次 AI 调用,岗位入库后 需要先确定 categoryId 才能缩小标签范围
skillTag 与 skill_tags 的关系 两套独立数据 skill_tags 是展示用自由文本,skillTag 是计算用预定义标签
第二次 AI 失败是否影响入库 不影响 标签匹配是增强功能,不阻断主流程

三、公司数据触发逻辑

3.1 状态扩展

bg_company.status 扩展为5个值:

  • 0=待完善:岗位清洗时自动创建的公司,只有 short_name
  • 1=已完善:AI补充完成
  • 2=禁用:人工标记禁用
  • 3=补充中:定时任务已锁定,正在调用AI
  • 4=补充失败:AI明确不认识该公司,不再自动重试

3.2 两个定时任务(与岗位清洗同一套模式)

任务C:公司数据补充任务(低频,每小时)

  1. 批量锁定(事务内 SELECT ... FOR UPDATE + UPDATE):
-- 事务内执行,行锁保证并发安全
SELECT * FROM bg_company WHERE status=0 LIMIT N FOR UPDATE;
UPDATE bg_company SET status=3, update_time=NOW() WHERE id IN (...);

⚠️ 锁定时必须同时更新 update_time,因为 bg_companyupdate_time 不像 app_job_data.updated_at 那样由数据库自动维护,需要 Java 侧手动设值。如果不更新,后续僵尸恢复任务无法正确判断超时。

  1. 将锁定的数据丢入线程池,多线程并发调用AI补充
  2. 每条处理完毕后,回填公司信息,更新 status=1(已完善)
  3. AI明确不认识该公司(valid=false)→ 更新 status=4(补充失败)
  4. AI调用异常或解析失败 → 保持 status=3,由僵尸恢复任务重置

任务D:公司僵尸恢复任务(低频,每小时,与任务C错开)

处理因发布重启导致卡在"补充中"的僵尸数据:

UPDATE bg_company SET status=0 WHERE status=3 AND update_time < NOW() - INTERVAL 10 MINUTE

超时10分钟仍在"补充中"的数据重置为待完善,下次任务C会重新捞取处理。

3.3 与岗位清洗触发逻辑的对比

对比项 岗位清洗 公司补充
状态字段 app_job_data.clean_status bg_company.status
锁定值 1=清洗中 3=补充中
完成值 2=已入库 / 3=已丢弃 1=已完善 / 4=补充失败
时间字段 updated_at(数据库自动) update_timeJava手动设值)
锁定时是否需手动更新时间 不需要 需要,否则僵尸恢复无法判断超时
触发频率 每5分钟 每小时
僵尸恢复频率 每30分钟 每小时(与任务C错开)

3.4 设计决策记录

决策点 结论 原因
是否与岗位清洗同步触发 否,独立定时任务 AI调用场景不同,频率不同
触发模式 复用岗位清洗的"SELECT FOR UPDATE + 僵尸恢复"模式 统一架构,行锁保证并发安全
锁定时是否更新时间 bg_company.update_time 非数据库自动维护,不更新则僵尸恢复失效
补充频率 每小时 公司数据量少,AI调用成本可控

四、公司数据补充逻辑(AI补充)

4.1 方案说明

原计划使用工商API(天眼查等)查询公司信息,但考虑到:

  • 公司简称不能很好地查询到精确数据
  • 用AI生成完整企业名字再查API,准确性也无法保证
  • 我们的公司数据来源于招聘平台,基本都是有一定规模的企业,AI训练数据覆盖率高

因此改为直接使用AI补充公司数据,一次调用返回全部字段。

4.2 补充流程

  1. short_name(公司简称),拼 prompt 调 AI
  2. prompt 中附带行业列表(DictCacheService.getIndustryText()),让 AI 直接返回 industryId
  3. AI 返回结构化 JSON,包含 valid 字段判断是否认识该公司
  4. valid=false → 更新 status=4(补充失败),结束
  5. regionCodeAI 返回城市名,Java 侧 DictCacheService.matchRegionCode 匹配
  6. 解析 JSON,回填 bg_company 各字段
  7. 更新 status=1(已完善)

4.3 AI 返回 JSON 结构

{
  "valid": true,
  "name": "北京字节跳动科技有限公司",
  "city": "北京",
  "companyType": "独角兽",
  "industryId": 5,
  "tags": ["短视频", "人工智能", "社交平台"],
  "summary": "全球领先的内容平台和科技公司,旗下拥有抖音、今日头条等产品",
  "description": "字节跳动成立于2012年,是一家以技术驱动的全球化互联网公司...",
  "foundedYear": "2012",
  "address": "北京市海淀区北三环西路27号",
  "scale": "10000人以上",
  "website": "https://www.bytedance.com",
  "financingStage": "已上市",
  "latestValuation": "2200亿美元",
  "news": [
    "字节跳动2024年营收突破1200亿美元创历史新高",
    "TikTok全球月活用户突破15亿大关",
    "字节跳动加大AI大模型研发投入布局人工智能赛道"
  ]
}

4.4 各字段补充规则

bg_company 字段 AI返回字段 规则
name name 公司全称,AI不确定则null
logoUrl AI无法提供,留空
regionCode city AI返回城市名,Java侧matchRegionCode匹配
companyType companyType 上市企业、独角兽、国企、民营企业、外资企业等
industryId industryId 从行业列表中选,不确定则null
tags tags 公司标签,JSON数组,最多5个
summary summary 一句话简介,100字以内
description description 公司详细描述,500字以内
foundedYear foundedYear 成立年份,如"2012"
address address 注册/总部地址
scale scale 企业规模,如"1000-5000人"、"10000人以上"
website website 官网地址
financingStage financingStage 融资状态,如"A轮"、"已上市"、"不需要融资"
latestValuation latestValuation 最新估值,AI知道就给,不知道null
news news 3条相关新闻,每条50字以内,JSON数组

4.5 prompt 规则

  • AI 不认识该公司 → valid=false,其他字段不需要返回
  • 不确定的字段返回 null,不要编造
  • industryId 必须从给定行业列表中选择
  • news 最多3条,每条50字以内,基于AI知识库中最新的信息
  • tags 最多5个,体现公司核心业务特征
  • 字符串值中不允许出现Tab、换行等控制字符
  • 只返回JSON,不要其他内容

4.6 设计决策记录

决策点 结论 原因
数据来源 AI补充,不用工商API 公司简称查API不精确,AI对招聘平台企业覆盖率高
AI不认识的公司 status=4(补充失败),不再自动重试 避免无限重试浪费AI调用
logoUrl 留空 AI无法提供图片URL
news 时效性 不要求实时,取AI知识库内最新的3条 求职场景不需要实时新闻
latestValuation AI知道就给,不知道null 大部分招聘企业AI有数据
regionCode 匹配方式 AI返回城市名,Java侧匹配 复用已有的matchRegionCode逻辑
industryId 匹配方式 prompt带行业列表,AI直接返回ID 复用已有的行业列表文本