岗位清洗,差岗位技能提取

This commit is contained in:
zk
2026-03-18 21:33:07 +08:00
parent f0eec5fe3e
commit 6bddeecb7a
12 changed files with 997 additions and 7 deletions
+274
View File
@@ -0,0 +1,274 @@
---
inclusion: manual
---
# 数据清洗方案
## 总体架构
```
爬虫(公司网络) → app_job_data(原始数据)
Java定时任务读取(多线程)
调用AI API清洗/结构化
写入业务表(bg_company + bg_job + 关联表)
公司信息不完整的 → 调工商API补充
```
所有清洗逻辑放在 `manager` 模块,通过 `@Scheduled` 定时任务触发。
## 讨论分区
整体方案分为四部分逐步讨论:
1. ✅ 岗位清洗触发逻辑
2. ✅ 岗位清洗逻辑
3. ✅ 公司数据触发逻辑
4. ⬜ 公司数据补充逻辑(待定API后补充)
---
## 一、岗位清洗触发逻辑
### 1.1 表结构变更
`app_job_data` 新增字段:
```sql
ALTER TABLE app_job_data ADD COLUMN clean_status TINYINT(1) DEFAULT 0 NOT NULL COMMENT '清洗状态 0=待清洗 1=清洗中 2=已入库 3=已丢弃';
CREATE INDEX idx_clean_status ON app_job_data (clean_status);
```
状态说明:
- 0=待清洗:新爬到的数据,默认值,不影响爬虫原有插入逻辑
- 1=清洗中:定时任务已锁定,正在处理
- 2=已入库:清洗成功,已写入 bg_job
- 3=已丢弃:AI判定为无效数据,不入库
时间记录:不加额外时间字段,利用已有的 `updated_at`ON UPDATE CURRENT_TIMESTAMP),状态变更时自动更新。
### 1.2 两个定时任务
#### 任务A:岗位清洗任务(高频,每5分钟)
1. 批量锁定:`UPDATE app_job_data SET clean_status=1 WHERE clean_status=0 AND is_valid=1 LIMIT N`(原子操作,防止多线程重复捞取)
2. 将锁定的数据丢入线程池,多线程并发调用 AI API 清洗
3. 每条处理完毕后,单独更新 `clean_status` 为 2(已入库)或 3(已丢弃)
4. 单条写入事务:bg_job 入库 + clean_status 更新放在同一个短事务中,保证一致性
#### 任务B:僵尸恢复任务(低频,每30分钟)
处理因发布重启导致卡在"清洗中"的僵尸数据:
```sql
UPDATE app_job_data SET clean_status=0 WHERE clean_status=1 AND updated_at < NOW() - INTERVAL 10 MINUTE
```
一条SQL搞定,将超时10分钟仍在"清洗中"的数据重置为待清洗,下次任务A会重新捞取处理。
### 1.3 去重保障
即使同一条数据被重复清洗(僵尸恢复后重新处理),写入 `bg_job` 时通过 `source_id` 判断是否已存在,存在则跳过,不会产生重复数据。
### 1.4 设计决策记录
| 决策点 | 结论 | 原因 |
|--------|------|------|
| 清洗状态放哪 | app_job_data 加字段 | 同库,简单直接 |
| 是否加"清洗中"状态 | 是 | 多线程并发需要锁定机制 |
| 长事务 vs 短事务 | 短事务(单条) | AI调用耗时长,不能hold连接 |
| 僵尸恢复方式 | 独立低频定时任务 | 避免每次清洗任务都多一次查询,节省性能 |
| 是否加 clean_time 字段 | 否 | updated_at 自动更新,够用 |
| 失败重试 | 僵尸恢复任务自动处理 | clean_status=1 超时后重置为0,自动重试 |
---
## 二、岗位清洗逻辑
### 2.1 前置校验(Java侧,不调AI)
- `description` 为空或长度 < 20 → 直接标记 `clean_status=3`(丢弃),跳过,节省AI调用成本
### 2.2 参考数据准备
应用启动时加载并缓存(定期刷新):
- `bg_job_category` 全量:拼成 `id:name` 文本列表
- `bg_industry` 全量:拼成 `id:name` 文本列表
这两份列表作为 prompt 的一部分传给AI,ID由人工维护为短数字,不使用雪花ID。
地区数据(`bg_china_regions_code`)不传给AI,由Java侧根据AI返回的城市名自行匹配。
### 2.3 AI 调用(单次调用,返回结构化JSON)
#### 输入
- 原始字段:job_title、salary、location、company、experience、education、description
- 参考列表:岗位分类(id:name)、行业(id:name
#### AI 返回 JSON 结构
```json
{
"valid": true,
"title": "Java高级开发工程师",
"salary": "15-25K",
"education": 2,
"minExperience": 3,
"employmentType": 0,
"categoryId": 12,
"industryId": 5,
"description": "1. 负责核心业务系统开发...",
"requirement": "1. 本科及以上学历...",
"bonus": "1. 有分布式系统经验优先...",
"tags": ["数据分析", "产品策略", "团队协作"],
"skillTags": ["Java", "Spring Boot", "MySQL"],
"companyShortName": "字节跳动",
"cities": ["北京", "上海"]
}
```
#### 各字段清洗规则
| 字段 | 来源 | 规则 |
|------|------|------|
| valid | AI综合判断 | 数据是否有效,false则丢弃 |
| title | job_title | 存在则保留;不存在则AI从description归纳生成 |
| salary | salary | 有效则标准化(10-20K / 20K / 面议);无效或空则null |
| education | education + description | 映射为 0=不限 1=大专 2=本科 3=硕士 4=博士 |
| minExperience | experience + description | 提取最低年限数值,不要求则为0 |
| employmentType | description | 判断 0=全职 1=兼职,默认0 |
| categoryId | description + job_title | 必选,从分类列表中选最接近的,不允许返回null |
| industryId | description(任职要求部分) | 仅当明确提到行业经验要求时设置;列表中无完全匹配则选最相似的;未提到则null |
| description | description + experience + education | 提取"岗位职责"部分,保持原文风格,格式化展示 |
| requirement | description + experience + education | 提取"任职要求"部分,保持原文风格,格式化展示 |
| bonus | description + experience + education | 提取"加分项"部分,无则空 |
| tags | description + job_title | 核心职能标签(如数据分析、产品策略、团队协作),最多5个 |
| skillTags | description | 技能关键词(如Java、Spring Boot、MySQL),最多8个 |
| companyShortName | company | 提取简洁的公司简称,去掉地区后缀、招聘后缀、括号内容等,保持"中国平安""字节跳动"风格 |
| cities | location | 提取城市名列表,精确到市级 |
### 2.4 AI 返回后的 Java 处理流程
1. **valid=false** → 更新 `clean_status=3`,结束
2. **公司处理**:按AI清洗后的 `companyShortName``bg_company.short_name`,存在则拿 `company_id`;不存在则创建一条(short_name=companyShortName, status=0待完善),拿新ID
3. **地区处理**`cities` 列表逐个匹配 `bg_china_regions_code`(按name匹配到市级),匹配上的准备写入关联表
4. **去重**:用 `source_id`app_job_data.id)查 `bg_job`,已存在则跳过,更新 `clean_status=2`
5. **写入 bg_job**:组装所有字段,`source_id`=app_job_data.id`source_url`=detail_url`status=0`(上架)
6. **写入 bg_job_region_relation**:岗位ID + 匹配到的region_code,一岗多地区
7. **更新 app_job_data.clean_status=2**
步骤 2-7 放在一个短事务中,保证数据一致性。
### 2.5 设计决策记录
| 决策点 | 结论 | 原因 |
|--------|------|------|
| AI调用次数 | 一次调用返回全部字段 | 减少API调用成本和延迟 |
| 分类/行业列表怎么给AI | 直接传 id:name 文本 | ID人工维护为短数字,token消耗可控 |
| 地区匹配方式 | AI输出城市名,Java侧匹配 | 城市名无歧义,不需要传参考列表 |
| categoryId 是否可空 | 不可空,必须选一个 | 岗位分类是核心维度 |
| industryId 何时设置 | 仅描述中明确提到行业经验时 | 行业经验是任职要求,不是所有岗位都有 |
| tags 定位 | 核心职能标签,最多5个 | 区别于福利标签,体现岗位核心能力要求 |
| skillTags 数量 | 最多8个 | 控制数量,保持精炼 |
| source_id 取值 | app_job_data.id | 简单直接,用于去重 |
| 公司不存在时 | 自动创建 status=0 待完善 | 后续由公司数据补充逻辑完善 |
---
## 三、公司数据触发逻辑
### 3.1 状态扩展
`bg_company.status` 扩展为4个值:
- 0=待完善:岗位清洗时自动创建的公司,只有 short_name
- 1=已完善:工商API补充完成
- 2=禁用:人工标记禁用
- 3=补充中:定时任务已锁定,正在调用工商API
### 3.2 两个定时任务(与岗位清洗同一套模式)
#### 任务C:公司数据补充任务(低频,每小时)
1. 批量锁定(原子操作):
```sql
UPDATE bg_company SET status=3, update_time=NOW() WHERE status=0 LIMIT N
```
⚠️ 锁定时必须同时更新 `update_time`,因为 `bg_company``update_time` 不像 `app_job_data.updated_at` 那样由数据库自动维护,需要 Java 侧手动设值。如果不更新,后续僵尸恢复任务无法正确判断超时。
2. 将锁定的数据丢入线程池,多线程并发调用工商API
3. 每条处理完毕后,回填公司信息,更新 `status=1`(已完善)
4. 工商API查不到或返回异常 → 保持 `status=3`,由僵尸恢复任务重置
#### 任务D:公司僵尸恢复任务(低频,每小时,与任务C错开)
处理因发布重启导致卡在"补充中"的僵尸数据:
```sql
UPDATE bg_company SET status=0 WHERE status=3 AND update_time < NOW() - INTERVAL 10 MINUTE
```
超时10分钟仍在"补充中"的数据重置为待完善,下次任务C会重新捞取处理。
### 3.3 与岗位清洗触发逻辑的对比
| 对比项 | 岗位清洗 | 公司补充 |
|--------|----------|----------|
| 状态字段 | app_job_data.clean_status | bg_company.status |
| 锁定值 | 1=清洗中 | 3=补充中 |
| 完成值 | 2=已入库 / 3=已丢弃 | 1=已完善 |
| 时间字段 | updated_at(数据库自动) | update_timeJava手动设值) |
| 锁定时是否需手动更新时间 | 不需要 | **需要**,否则僵尸恢复无法判断超时 |
| 触发频率 | 每5分钟 | 每小时 |
| 僵尸恢复频率 | 每30分钟 | 每小时(与任务C错开) |
### 3.4 设计决策记录
| 决策点 | 结论 | 原因 |
|--------|------|------|
| 是否与岗位清洗同步触发 | 否,独立定时任务 | 外部API不同,频率不同,失败场景不同 |
| 触发模式 | 复用岗位清洗的"原子锁定+僵尸恢复"模式 | 统一架构,代码可复用 |
| 锁定时是否更新时间 | 是 | bg_company.update_time 非数据库自动维护,不更新则僵尸恢复失效 |
| 补充频率 | 每小时 | 公司数据量少,工商API可能有频率限制 |
---
## 四、公司数据补充逻辑(待定API后补充)
### 4.1 补充流程概要
1.`short_name`(公司简称)调用工商API搜索
2. API返回匹配的企业列表,取最匹配的一条
3. 回填 `bg_company` 各字段,更新 `status=1`
### 4.2 需要回填的字段
| bg_company 字段 | 来源 | 说明 |
|-----------------|------|------|
| name | 工商API | 公司全称 |
| logoUrl | 待定 | 工商API可能不提供,需另外来源 |
| regionCode | 工商API(注册地址) | 匹配 bg_china_regions_code |
| companyType | 工商API | 上市企业、独角兽、国企等 |
| industryId | 工商API(行业分类) | 匹配 bg_industry |
| tags | 工商API / AI | 公司标签,JSON数组 |
| summary | 工商API / AI | 公司简要介绍 |
| description | 工商API | 公司详细描述 |
| foundedYear | 工商API | 成立时间 |
| address | 工商API | 注册地址 |
| scale | 工商API | 企业规模(人数) |
| website | 工商API | 官网地址 |
| financingStage | 工商API / 其他来源 | 融资状态 |
| latestValuation | 工商API / 其他来源 | 最新估值 |
| news | 新闻API(待定) | 新闻动态,JSON数组 |
### 4.3 待定事项
- [ ] 选定工商信息API(天眼查、企查查、爱企查等)
- [ ] 确认API返回字段与 bg_company 的映射关系
- [ ] 新闻动态数据来源(工商API是否包含,还是需要单独的新闻API)
- [ ] logoUrl 来源(工商API是否提供)
- [ ] 匹配到多条结果时的处理策略
- [ ] 查不到结果时的处理策略(保持待完善 or 标记为其他状态)
- [ ] API调用频率限制和成本评估
+9 -4
View File
@@ -45,6 +45,7 @@ offerpie/back-end
│ ├─ aop/ # AOP 日志切面
│ ├─ exception/ # 业务异常统一处理
│ ├─ email/ # 邮件发送抽象(EmailAbility
│ ├─ ai/ # AI 对话能力封装(AiChatAbility、AiChatConfig
│ ├─ wxPay/ # 微信支付相关能力(Js、Native、Transfer 等)
│ ├─ pojo/ # 公共 POJO(统一响应、登录/防重放 token 等)
│ └─ web/ # Spring MVC 全局响应体 advice
@@ -73,7 +74,8 @@ offerpie/back-end
│ ├─ SkillTagMapper.java # 技能标签Mapper
│ ├─ UserJobFavoriteMapper.java # 用户收藏岗位Mapper
│ ├─ UserJobApplicationMapper.java # 用户投递记录Mapper
─ UserJobDislikeMapper.java # 用户不感兴趣记录Mapper
─ UserJobDislikeMapper.java # 用户不感兴趣记录Mapper
│ └─ AppJobDataMapper.java # 爬虫岗位原始数据Mapper
├─ pojo/
│ ├─ po/ # 持久化实体
│ │ ├─ User.java
@@ -93,9 +95,10 @@ offerpie/back-end
│ │ ├─ SkillTag.java # 技能标签表(bg_skill_tag
│ │ ├─ UserJobFavorite.java # 用户收藏岗位表(bg_user_job_favorite
│ │ ├─ UserJobApplication.java # 用户投递记录表(bg_user_job_application
│ │ ─ UserJobDislike.java # 用户不感兴趣记录表(bg_user_job_dislike
│ │ ─ UserJobDislike.java # 用户不感兴趣记录表(bg_user_job_dislike
│ │ └─ AppJobData.java # 爬虫岗位原始数据表(app_job_data
│ └─ vo/ # ViewObjectOssUrlVo 等)
└─ service/ # 业务 ServiceOssService、SmsService 等)
└─ service/ # 业务 ServiceOssService、SmsService、DictCacheService、JobCleanService、JobCleanTransactionService 等)
```
> **设计理念** – 业务实体和 Mapper 位于 `manager`B 端和 C 端共享;C 端特有的注解、切面、权限服务、路由菜单服务位于 `client-api`,避免 B 端误用;`common` 提供统一的技术支撑。
@@ -104,7 +107,7 @@ offerpie/back-end
|------|----------|-----------|
| **client-api** | - 面向终端用户的 REST API <br> - 启动 Spring Boot 应用 <br> - 短信验证码登录(含自动注册、邀请码绑定) <br> - **功能权限校验**:注解 + 切面 + 权限服务(校验、扣减、回退) <br> - **路由菜单**:获取用户有效菜单树 | `ClientApplication``LoginController``RouteMenuController``FuncPermission``FuncPermissionAspect``FuncPermissionService``RouteMenuService``UserRegisterService``RouteMenuVo` |
| **common** | - **统一配置**OSS、Redis、Security、WxPay、Sms 等 <br> - **跨层工具**:HTTP、IP、认证、验证码、Redis Server 等 <br> - **全局拦截/切面**:日志、TraceId、黑名单、SQL 打印 <br> - **统一异常/响应**`GlobalExceptionAdvice``UnifiedResponse` <br> - **业务抽象**:邮件发送、微信支付(Native/JS/Transfer <br> - **公共 POJO**:登录令牌、防重放信息等 | `config/`, `tool/`, `interceptor/`, `aop/`, `exception/`, `email/`, `wxPay/`, `pojo/` |
| **manager** | - **业务实体**`User``OssFile``UserInvite``RouteMenu``FuncPermission``UserRouteMenuStock``UserFuncPermissionStock``UserFuncUsageLog``ChinaRegionsCode``JobCategory``Company``Job``JobRegion``UserJobFavorite``UserJobApplication` <br> - **MyBatis Mapper**`UserMapper``OssFileMapper``UserInviteMapper``RouteMenuMapper``FuncPermissionMapper``UserRouteMenuStockMapper``UserFuncPermissionStockMapper``UserFuncUsageLogMapper``ChinaRegionsCodeMapper``JobCategoryMapper``CompanyMapper``JobMapper``JobRegionRelationMapper``UserJobFavoriteMapper``UserJobApplicationMapper` <br> - **业务 API**:文件上传/下载、健康检查等 <br> - **业务逻辑**:服务层、工具类等 <br> - **既供 B 端 UI(待实现)使用,也供 C 端业务直接调用** | `controller/`, `mapper/`, `pojo/po/`, `pojo/vo/`, `service/`, `constant/` |
| **manager** | - **业务实体**`User``OssFile``UserInvite``RouteMenu``FuncPermission``UserRouteMenuStock``UserFuncPermissionStock``UserFuncUsageLog``ChinaRegionsCode``JobCategory``Company``Job``JobRegionRelation``Industry``SkillTag``UserJobFavorite``UserJobApplication``UserJobDislike``AppJobData` <br> - **MyBatis Mapper**`UserMapper``OssFileMapper``UserInviteMapper``RouteMenuMapper``FuncPermissionMapper``UserRouteMenuStockMapper``UserFuncPermissionStockMapper``UserFuncUsageLogMapper``ChinaRegionsCodeMapper``JobCategoryMapper``CompanyMapper``JobMapper``JobRegionRelationMapper``IndustryMapper``SkillTagMapper``UserJobFavoriteMapper``UserJobApplicationMapper``UserJobDislikeMapper``AppJobDataMapper` <br> - **业务 API**:文件上传/下载、健康检查等 <br> - **业务逻辑**:服务层、工具类等 <br> - **既供 B 端 UI(待实现)使用,也供 C 端业务直接调用** | `controller/`, `mapper/`, `pojo/po/`, `pojo/vo/`, `service/`, `constant/` |
## 3️⃣ 关键业务实体
| 实体 | 所属模块 | 作用概述 |
@@ -130,6 +133,7 @@ offerpie/back-end
| `Industry` | manager | 行业字典表(bg_industry),树形结构,一级/二级分类。 |
| `SkillTag` | manager | 技能标签表(bg_skill_tag),挂在岗位类型下,不分级,用于匹配度计算。 |
| `UserJobDislike` | manager | 用户不感兴趣记录表(bg_user_job_dislike),记录用户对岗位的不感兴趣原因,冗余公司ID/地区编码/行业ID方便推荐过滤。 |
| `AppJobData` | manager | 爬虫岗位原始数据表(app_job_data),存储爬虫抓取的原始岗位数据,供清洗服务读取并写入业务表。 |
## 4️⃣ 权限体系设计
### 整体架构
@@ -169,6 +173,7 @@ offerpie/back-end
| **配置** | `OssConfig`, `RedissonConf`, `SecurityConfig`, `WxPayConfig`, `SmsConfig` | `common/config` |
| **安全** | JWT 过滤器、登录令牌 (`RedisLoginTokenInfo`)、防重放 (`RedisPreventReplayInfo`) | `common/interceptor``common/pojo/interceptor` |
| **邮件** | `EmailAbility`(封装邮件发送) | `common/email` |
| **AI** | `AiChatAbility`OpenAI 兼容多供应商对话)、`AiChatConfig`(供应商配置) | `common/ai` |
| **微信支付** | `WxJsPayAbility`, `WxNativePayAbility`, `WxTransferPayAbility`, `WxPayNotifyController` | `common/wxPay` |
| **全局异常** | `GlobalExceptionAdvice`, `BusinessException`, `BusinessExpCodeEnum` | `common/exception` |
| **日志 & AOP** | `ControllerLogAspect`, `LoggingOriginalRequestFilter`, `SqlLoggerInterceptor` | `common/aop`, `common/interceptor` |
@@ -74,3 +74,17 @@ app:
#开放接口
ignore:
urls: "/public/**"
# AI 多供应商配置,第一个为默认 provider
# base-url 配到版本路径,如 DeepSeek: https://api.deepseek.com/v1,豆包: https://ark.cn-beijing.volces.com/api/v3
ai:
providers:
job-clean:
base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3}
api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02}
model: ${AI_MODEL:doubao-seed-2-0-lite-260215}
# 岗位清洗配置
job-clean:
batch-size: 20
thread-pool-size: 5
@@ -0,0 +1,77 @@
package org.jiayunet.ai;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.tool.HttpTool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* AI 对话能力封装(OpenAI 兼容)
* <p>支持多供应商配置,不传 key 时使用第一个 provider</p>
*
* @author zk
*/
@Slf4j
@Component
public class AiChatAbility {
@Autowired
private AiChatConfig aiChatConfig;
/**
* 使用默认 provider 发送对话
* <p>默认取 providers 配置中的第一个</p>
*/
public String chat(String systemPrompt, String userMessage) {
String defaultKey = aiChatConfig.getProviders().keySet().stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("未配置任何 AI provider"));
return chat(defaultKey, systemPrompt, userMessage);
}
/**
* 使用指定 provider 发送对话
*
* @param providerKey 供应商标识,对应 yml 中 providers 的 key
* @param systemPrompt 系统提示词
* @param userMessage 用户消息
* @return AI 返回的文本内容
*/
public String chat(String providerKey, String systemPrompt, String userMessage) {
AiChatConfig.ProviderConfig config = aiChatConfig.getProviders().get(providerKey);
if (config == null) {
throw new RuntimeException("AI provider 不存在: " + providerKey);
}
String url = config.getBaseUrl() + "/chat/completions";
log.info("AI 请求 URL: {}, model: {}", url, config.getModel());
Map<String, Object> body = new HashMap<>();
body.put("model", config.getModel());
body.put("messages", List.of(
Map.of("role", "system", "content", systemPrompt),
Map.of("role", "user", "content", userMessage)
));
Map<String, String> headers = new HashMap<>();
headers.put("Authorization", "Bearer " + config.getApiKey());
try {
String response = HttpTool.sendJsonPost(body, url, headers);
JsonNode root = HttpTool.objectMapper.readTree(response);
String content = root.path("choices").path(0).path("message").path("content").asText();
if (content == null || content.isBlank()) {
throw new RuntimeException("AI 返回内容为空");
}
return content;
} catch (Exception e) {
log.error("AI 调用失败, provider={}, model={}", providerKey, config.getModel(), e);
throw new RuntimeException("AI 调用失败: " + e.getMessage(), e);
}
}
}
@@ -0,0 +1,33 @@
package org.jiayunet.ai;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* AI 多供应商配置
* <p>读取 app.ai.providers,第一个为默认 provider</p>
*
* @author zk
*/
@Data
@Component
@ConfigurationProperties(prefix = "app.ai")
public class AiChatConfig {
/** 供应商配置,key 为供应商标识,第一个为默认 */
private Map<String, ProviderConfig> providers = new LinkedHashMap<>();
@Data
public static class ProviderConfig {
/** API 地址 */
private String baseUrl;
/** API Key */
private String apiKey;
/** 模型名称 */
private String model;
}
}
@@ -76,7 +76,7 @@ public class HttpTool {
if (response.getStatusLine().getStatusCode() == 200) {
return bodyStr;
}
throw new IOException("Http请求出现错误,响应码:" + response.getStatusLine().getStatusCode());
throw new IOException("Http请求出现错误,响应码:" + response.getStatusLine().getStatusCode() + ",响应体:" + bodyStr);
}
} catch (IOException e) {
@@ -0,0 +1,13 @@
package org.jiayunet.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.jiayunet.pojo.po.AppJobData;
/**
* 爬虫岗位原始数据Mapper
*
* @author zk
*/
@Mapper
public interface AppJobDataMapper extends CommonMapper<AppJobData> {
}
@@ -0,0 +1,79 @@
package org.jiayunet.pojo.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.Instant;
/**
* 爬虫岗位原始数据表(app_job_data
* <p>存储爬虫抓取的原始岗位数据,供清洗服务读取并写入业务表</p>
*
* @author zk
*/
@Data
@TableName(value = "app_job_data")
public class AppJobData {
@TableId(type = IdType.AUTO)
private Long id;
/** 关联爬取任务ID */
private Long taskCrawlId;
/** 职位名称 */
private String jobTitle;
/** 薪资 */
private String salary;
/** 工作地点 */
private String location;
/** 公司名称 */
private String company;
/** 经验要求 */
private String experience;
/** 学历要求 */
private String education;
/** 岗位详情(职责+要求+介绍) */
private String description;
/** 详情页URL */
private String detailUrl;
/** 内容哈希值,用于查重 */
private String contentHash;
/** 数据来源 0=官网 1=平台 */
private Integer sources;
/** 是否独立URL 0=页内展示 1=独立页面 */
private Integer isIndependentUrl;
/** 是否有效 0=无效 1=有效 */
private Integer isValid;
/** 有效期 */
private Instant expireAt;
/** 验证状态 pending=待验证 checking=验证中 checked=已验证 */
private String checkStatus;
/** 清洗状态 0=待清洗 1=清洗中 2=已入库 3=已丢弃 */
private Integer cleanStatus;
/** 上次验证时间 */
private Instant lastCheckAt;
/** 创建时间 */
private Instant createdAt;
/** 更新时间 */
private Instant updatedAt;
}
@@ -67,7 +67,7 @@ public class Company {
/** 新闻动态(JSON数组) */
private String news;
/** 状态 0=待完善 1=已完善 2=禁用 */
/** 状态 0=待完善 1=已完善 2=禁用 3=补充中 */
private Integer status;
/** 创建时间 */
@@ -0,0 +1,124 @@
package org.jiayunet.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.mapper.ChinaRegionsCodeMapper;
import org.jiayunet.mapper.IndustryMapper;
import org.jiayunet.mapper.JobCategoryMapper;
import org.jiayunet.pojo.po.ChinaRegionsCode;
import org.jiayunet.pojo.po.Industry;
import org.jiayunet.pojo.po.JobCategory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 字典数据缓存服务
* <p>启动时加载岗位分类、行业、地区数据到内存,供清洗/推荐等业务使用</p>
* <p>依赖:JobCategoryMapper、IndustryMapper、ChinaRegionsCodeMapper</p>
* <p>使用表:bg_job_category(全量缓存)、bg_industry(全量缓存)、bg_china_regions_code(市级缓存)</p>
*
* @author zk
*/
@Slf4j
@Service
public class DictCacheService {
@Autowired
private JobCategoryMapper jobCategoryMapper;
@Autowired
private IndustryMapper industryMapper;
@Autowired
private ChinaRegionsCodeMapper chinaRegionsCodeMapper;
private List<JobCategory> jobCategoryList;
private List<Industry> industryList;
private List<ChinaRegionsCode> regionList;
/** 岗位分类文本(叶子节点,带父级路径),供 AI prompt 使用 */
private String jobCategoryText;
/** 行业文本(叶子节点,带父级路径),供 AI prompt 使用 */
private String industryText;
/**
* 启动时加载全量字典数据
* <p>分类/行业全量加载用于构建父级路径,文本只取叶子节点</p>
*/
@PostConstruct
public void refresh() {
log.info("开始加载字典缓存...");
jobCategoryList = jobCategoryMapper.selectList(null);
industryList = industryMapper.selectList(null);
// 只缓存省级+市级地区(provinceCode 为 null 是省,provinceCode 不为 null 且 cityCode 为 null 是市)
regionList = chinaRegionsCodeMapper.selectList(
new LambdaQueryWrapper<ChinaRegionsCode>()
.isNull(ChinaRegionsCode::getCityCode)
);
// 构建岗位分类文本:只取三级(叶子),格式 id:name(一级/二级)
Map<Long, String> categoryNameMap = jobCategoryList.stream()
.collect(Collectors.toMap(JobCategory::getId, JobCategory::getName));
jobCategoryText = jobCategoryList.stream()
.filter(c -> c.getLevel() == 3)
.map(c -> {
String parentName = categoryNameMap.getOrDefault(c.getParentId(), "");
String rootName = categoryNameMap.getOrDefault(c.getRootId(), "");
return c.getId() + ":" + c.getName() + "(" + rootName + "/" + parentName + ")";
})
.collect(Collectors.joining(", "));
// 构建行业文本:只取二级(叶子),格式 id:name(一级)
Map<Long, String> industryNameMap = industryList.stream()
.collect(Collectors.toMap(Industry::getId, Industry::getName));
industryText = industryList.stream()
.filter(i -> i.getLevel() == 2)
.map(i -> {
String parentName = industryNameMap.getOrDefault(i.getParentId(), "");
return i.getId() + ":" + i.getName() + "(" + parentName + ")";
})
.collect(Collectors.joining(", "));
long categoryLeafCount = jobCategoryList.stream().filter(c -> c.getLevel() == 3).count();
long industryLeafCount = industryList.stream().filter(i -> i.getLevel() == 2).count();
log.info("字典缓存加载完成: 岗位分类{}条(叶子{}条), 行业{}条(叶子{}条), 地区{}条",
jobCategoryList.size(), categoryLeafCount, industryList.size(), industryLeafCount, regionList.size());
}
/** 获取岗位分类文本(叶子节点,带父级路径,逗号分隔) */
public String getJobCategoryText() {
return jobCategoryText;
}
/** 获取行业文本(叶子节点,带父级路径,逗号分隔) */
public String getIndustryText() {
return industryText;
}
/**
* 根据城市名匹配地区编码
* <p>模糊匹配,如"北京"匹配"北京市"</p>
*
* @param cityName 城市名
* @return region_code,匹配不上返回 null
*/
public String matchRegionCode(String cityName) {
if (cityName == null || cityName.isBlank()) {
return null;
}
String name = cityName.replace("", "").replace("", "").trim();
return regionList.stream()
.filter(r -> r.getName().contains(name) || name.contains(r.getName().replace("", "").replace("", "")))
.map(ChinaRegionsCode::getCode)
.findFirst()
.orElse(null);
}
}
@@ -0,0 +1,248 @@
package org.jiayunet.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.ai.AiChatAbility;
import org.jiayunet.mapper.AppJobDataMapper;
import org.jiayunet.mapper.JobMapper;
import org.jiayunet.pojo.po.AppJobData;
import org.jiayunet.pojo.po.Job;
import org.jiayunet.tool.HttpTool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 岗位清洗服务
* <p>定时从 app_job_data 捞取待清洗数据,调用 AI 清洗后写入业务表</p>
* <p>依赖:AiChatAbilityAI调用)、DictCacheService(字典缓存)、JobCleanTransactionService(事务操作)</p>
* <p>使用表:app_job_data(读取/更新状态)、bg_job(去重查询)</p>
*
* @author zk
*/
@Slf4j
@Service
public class JobCleanService {
@Autowired
private AiChatAbility aiChatAbility;
@Autowired
private DictCacheService dictCacheService;
@Autowired
private JobCleanTransactionService jobCleanTransactionService;
@Autowired
private AppJobDataMapper appJobDataMapper;
@Autowired
private JobMapper jobMapper;
@Value("${app.job-clean.batch-size:20}")
private int batchSize;
@Value("${app.job-clean.thread-pool-size:5}")
private int threadPoolSize;
private ExecutorService executorService;
/** 初始化线程池 */
@javax.annotation.PostConstruct
public void init() {
executorService = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* 定时任务A:岗位清洗(每5分钟)
* <p>1. 批量锁定待清洗数据 2. 多线程并发调用AI清洗 3. 写入业务表</p>
*/
@Scheduled(cron = "0 */1 * * * ?")
public void cleanJob() {
// 批量锁定:原子操作,clean_status 0→1
int locked = appJobDataMapper.update(null,
new LambdaUpdateWrapper<AppJobData>()
.set(AppJobData::getCleanStatus, 1)
.eq(AppJobData::getCleanStatus, 0)
.eq(AppJobData::getIsValid, 1)
.last("LIMIT " + batchSize));
if (locked == 0) {
return;
}
log.info("岗位清洗:锁定{}条数据", locked);
// 查出刚锁定的数据
List<AppJobData> dataList = appJobDataMapper.selectList(
new LambdaQueryWrapper<AppJobData>()
.eq(AppJobData::getCleanStatus, 1)
.eq(AppJobData::getIsValid, 1)
.last("LIMIT " + batchSize));
// 多线程并发处理
for (AppJobData data : dataList) {
executorService.submit(() -> {
try {
cleanOne(data);
} catch (Exception e) {
log.error("岗位清洗异常, id={}", data.getId(), e);
// 异常时保持 clean_status=1,由僵尸恢复任务重置
}
});
}
}
/**
* 定时任务B:僵尸恢复(每30分钟)
* <p>将超时10分钟仍在清洗中的数据重置为待清洗</p>
*/
@Scheduled(cron = "0 */30 * * * ?")
public void recoverZombie() {
int recovered = appJobDataMapper.update(null,
new LambdaUpdateWrapper<AppJobData>()
.set(AppJobData::getCleanStatus, 0)
.eq(AppJobData::getCleanStatus, 1)
.lt(AppJobData::getUpdatedAt, Instant.now().minusSeconds(600)));
if (recovered > 0) {
log.info("僵尸恢复:重置{}条数据", recovered);
}
}
/**
* 清洗单条岗位数据
* <p>1. 前置校验 2. 拼prompt调AI 3. 解析结果 4. 写入业务表</p>
*/
public void cleanOne(AppJobData data) {
// 1. 前置校验
if (data.getDescription() == null || data.getDescription().length() < 20) {
jobCleanTransactionService.updateCleanStatus(data.getId(), 3);
return;
}
// 2. 拼 prompt
String systemPrompt = buildSystemPrompt();
String userMessage = buildUserMessage(data);
// 3. 调用 AI
String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);
// 4. 解析 JSON
try {
// 去掉可能的 markdown 代码块标记
String json = aiResponse.trim();
if (json.startsWith("```")) {
json = json.replaceAll("^```\\w*\\n?", "").replaceAll("\\n?```$", "").trim();
}
JsonNode root = HttpTool.objectMapper.readTree(json);
// valid 校验
if (!root.path("valid").asBoolean(false)) {
jobCleanTransactionService.updateCleanStatus(data.getId(), 3);
return;
}
// 5. 去重检查
String sourceId = String.valueOf(data.getId());
Long existJob = jobMapper.selectCount(
new LambdaQueryWrapper<Job>().eq(Job::getSourceId, sourceId));
if (existJob > 0) {
jobCleanTransactionService.updateCleanStatus(data.getId(), 2);
return;
}
// 6. 公司处理(加锁防并发重复)
String companyShortName = root.path("companyShortName").asText("");
if (companyShortName.isBlank()) {
companyShortName = data.getCompany();
}
Long companyId = jobCleanTransactionService.findOrCreateCompany(companyShortName);
// 7. 地区处理
List<String> regionCodes = new ArrayList<>();
JsonNode citiesNode = root.path("cities");
if (citiesNode.isArray()) {
for (JsonNode city : citiesNode) {
String code = dictCacheService.matchRegionCode(city.asText());
if (code != null) {
regionCodes.add(code);
}
}
}
// 8. 写入业务表(短事务,通过独立Service保证@Transactional生效)
jobCleanTransactionService.saveJobData(root, data, companyId, sourceId, regionCodes);
} catch (Exception e) {
log.error("AI 返回解析失败, id={}, response={}", data.getId(), aiResponse, e);
// 保持 clean_status=1,由僵尸恢复任务重置
}
}
/** 构建系统提示词 */
private String buildSystemPrompt() {
return """
你是一个岗位数据清洗助手。请根据提供的原始岗位数据,提取并结构化为JSON格式。
返回JSON格式要求:
{
"valid": true/false,
"title": "岗位名称",
"salary": "标准化薪资,如10-20K、面议,无效则null",
"education": 0-4的数字(0=不限 1=大专 2=本科 3=硕士 4=博士),
"minExperience": 最低工作年限数字(不要求则0),
"employmentType": 0或1(0=全职 1=兼职,默认0),
"categoryId": 岗位分类ID(必选,从分类列表中选最接近的),
"industryId": 行业ID(仅当明确提到行业经验要求时设置,列表中无完全匹配则选最相似的,未提到则null),
"description": "岗位职责,保持原文风格,格式化展示",
"requirement": "任职要求,保持原文风格,格式化展示",
"bonus": "加分项,无则null",
"tags": ["核心职能标签,最多5个,如数据分析、产品策略"],
"skillTags": ["技能关键词,最多8个,如Java、Spring Boot"],
"companyShortName": "简洁的公司简称,如字节跳动、中国平安",
"cities": ["工作城市列表,精确到市"]
}
规则:
1. description/requirement/bonus 均从原始的 description+experience+education 内容中提取,保持原文风格
2. 岗位标题不存在时,从描述中归纳生成
3. 薪资标准化为 10-20K、20K、面议 等格式,无效或空则null
4. categoryId 必须从分类列表中选一个,不允许为null
5. industryId 仅当描述中明确提到行业经验要求时设置
6. tags 是核心职能标签(如数据分析、团队协作),最多5个
7. skillTags 是技能关键词(如Java、MySQL),最多8个
8. companyShortName 去掉地区后缀、招聘后缀、括号内容,保持简洁
9. 只返回JSON,不要其他内容
""";
}
/** 构建用户消息 */
private String buildUserMessage(AppJobData data) {
StringBuilder sb = new StringBuilder();
sb.append("【原始数据】\n");
sb.append("岗位名称: ").append(nullToEmpty(data.getJobTitle())).append("\n");
sb.append("薪资: ").append(nullToEmpty(data.getSalary())).append("\n");
sb.append("工作地点: ").append(nullToEmpty(data.getLocation())).append("\n");
sb.append("公司: ").append(nullToEmpty(data.getCompany())).append("\n");
sb.append("经验要求: ").append(nullToEmpty(data.getExperience())).append("\n");
sb.append("学历要求: ").append(nullToEmpty(data.getEducation())).append("\n");
sb.append("岗位详情: ").append(nullToEmpty(data.getDescription())).append("\n\n");
sb.append("【岗位分类列表】\n").append(dictCacheService.getJobCategoryText()).append("\n\n");
sb.append("【行业列表】\n").append(dictCacheService.getIndustryText());
return sb.toString();
}
private String nullToEmpty(String s) {
return s == null ? "" : s;
}
}
@@ -0,0 +1,123 @@
package org.jiayunet.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.mapper.AppJobDataMapper;
import org.jiayunet.mapper.CompanyMapper;
import org.jiayunet.mapper.JobMapper;
import org.jiayunet.mapper.JobRegionRelationMapper;
import org.jiayunet.pojo.po.AppJobData;
import org.jiayunet.pojo.po.Company;
import org.jiayunet.pojo.po.Job;
import org.jiayunet.pojo.po.JobRegionRelation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.List;
/**
* 岗位清洗事务服务
* <p>独立出来解决 @Transactional 同类自调用失效问题</p>
* <p>依赖:JobMapper、CompanyMapper、JobRegionRelationMapper、AppJobDataMapper</p>
* <p>使用表:bg_job(写入)、bg_company(查询/创建)、bg_job_region_relation(写入)、app_job_data(更新状态)</p>
*
* @author zk
*/
@Slf4j
@Service
public class JobCleanTransactionService {
@Autowired
private JobMapper jobMapper;
@Autowired
private CompanyMapper companyMapper;
@Autowired
private JobRegionRelationMapper jobRegionRelationMapper;
@Autowired
private AppJobDataMapper appJobDataMapper;
/**
* 写入 bg_job + bg_job_region_relation + 更新 clean_status(短事务)
*/
@Transactional(rollbackFor = Exception.class)
public void saveJobData(JsonNode root, AppJobData data, Long companyId, String sourceId, List<String> regionCodes) {
Job job = new Job();
job.setTitle(root.path("title").asText(""));
job.setCompanyId(companyId);
job.setCategoryId(root.path("categoryId").asLong(0));
job.setEmploymentType(root.path("employmentType").asInt(0));
job.setDescription(root.path("description").asText(""));
job.setRequirement(root.path("requirement").asText(""));
job.setBonus(root.path("bonus").asText(null));
job.setTags(root.path("tags").toString());
job.setSkillTags(root.path("skillTags").toString());
String salary = root.path("salary").asText(null);
job.setSalary("null".equals(salary) ? null : salary);
job.setEducation(root.path("education").asInt(0));
job.setMinExperience(root.path("minExperience").asInt(0));
Long industryId = root.path("industryId").asLong(0);
job.setIndustryId(industryId == 0 ? null : industryId);
job.setSourceUrl(data.getDetailUrl());
job.setSourceId(sourceId);
job.setStatus(0);
job.setCreateTime(Instant.now());
job.setUpdateTime(Instant.now());
jobMapper.insert(job);
// 写入岗位-地区关联
for (String regionCode : regionCodes) {
JobRegionRelation relation = new JobRegionRelation();
relation.setJobId(job.getId());
relation.setRegionCode(regionCode);
relation.setCreateTime(Instant.now());
jobRegionRelationMapper.insert(relation);
}
// 更新清洗状态
updateCleanStatus(data.getId(), 2);
}
/**
* 查找或创建公司(加锁防并发重复)
* <p>按 short_name 查询,不存在则创建一条待完善记录</p>
*/
public synchronized Long findOrCreateCompany(String shortName) {
Company company = companyMapper.selectOne(
new LambdaQueryWrapper<Company>()
.eq(Company::getShortName, shortName)
.last("LIMIT 1"));
if (company != null) {
return company.getId();
}
Company newCompany = new Company();
newCompany.setName(shortName);
newCompany.setShortName(shortName);
newCompany.setStatus(0);
newCompany.setCreateTime(Instant.now());
newCompany.setUpdateTime(Instant.now());
companyMapper.insert(newCompany);
return newCompany.getId();
}
/** 更新清洗状态 */
public void updateCleanStatus(Long id, int status) {
appJobDataMapper.update(null,
new LambdaUpdateWrapper<AppJobData>()
.set(AppJobData::getCleanStatus, status)
.eq(AppJobData::getId, id));
}
}