Compare commits

..

2 Commits

Author SHA1 Message Date
zk e0cb4493e3 删除数据清洗相关代码 2026-06-02 17:08:47 +08:00
zk b9d8c2724a 修海数据清洗相关代码 2026-06-02 14:45:05 +08:00
13 changed files with 11 additions and 1290 deletions
+2 -2
View File
@@ -1,5 +1,5 @@
*/target
**/.idea
.idea/
./log
./logs
logs/
log/
-202
View File
@@ -1,202 +0,0 @@
---
inclusion: manual
---
# 数据清洗方案
## 总体架构
```
爬虫(公司网络) → app_job_data(原始数据)
Java定时任务读取(多线程)
三次AI调用:结构化 → 专业匹配 → 技能提取
写入业务表(bg_company + bg_job + 关联表 + bg_skill_tag)
公司信息不完整的 → 调AI补充
```
所有清洗逻辑放在 `manager` 模块,通过 `@Scheduled` 定时任务触发。
---
## 一、岗位清洗触发逻辑
### 1.1 状态管理
`app_job_data.clean_status`
- 0=待清洗:新爬到的数据,默认值
- 1=清洗中:定时任务已锁定,正在处理
- 2=已入库:清洗成功,已写入 bg_job
- 3=已丢弃:AI判定为无效数据
### 1.2 两个定时任务
#### 任务A:岗位清洗(每5分钟)
1. 事务内 `SELECT FOR UPDATE` + `UPDATE clean_status=1` 批量锁定
2. 丢入线程池,多线程并发处理
3. 每条独立更新状态
#### 任务B:僵尸恢复(每30分钟)
将超时10分钟仍在清洗中的数据重置为待清洗。
---
## 二、岗位清洗逻辑(三次AI调用)
### 2.1 前置校验
- `description` 为空或长度 < 20 → 标记丢弃,跳过
### 2.2 参考数据缓存(DictCacheService
启动时加载:
- `bg_job_category` 叶子节点(level=3),格式 `id:name(一级/二级)`
- `bg_industry` 叶子节点(level=2),格式 `id:name(一级)`
- `bg_major_category` 叶子节点(level=3),格式 `id:name(一级/二级)`
- `bg_china_regions_code` 省市级,供城市名匹配
### 2.3 第一次AI:提取岗位结构化信息
#### 输入
- 原始字段:job_title、salary、location、company、experience、education、description
- 参考列表:岗位分类、行业
#### 返回JSON
```json
{
"valid": true,
"title": "Java高级开发工程师",
"salary": "15-25K",
"education": 2,
"minExperience": 3,
"employmentType": 0,
"categoryId": 12,
"requiredIndustryId": 5,
"description": "岗位职责...",
"requirement": "任职要求...",
"bonus": "加分项...",
"tags": ["数据分析", "产品策略"],
"skillTags": ["Java", "Spring Boot"],
"companyShortName": "字节跳动",
"cities": ["北京", "上海"]
}
```
#### Java处理流程
1. valid=false → 丢弃
2. source_id 去重检查
3. 公司处理(查或创建 bg_company
4. 地区匹配(cities → region_code
5. 写入 bg_job + bg_job_region_relation + 更新 clean_status=2
### 2.4 第二次AI:专业匹配
岗位入库后,单独调用AI匹配专业要求。
#### 输入
- 岗位标题、职责、要求
- 三级专业分类列表(845条,格式 `id:name(一级/二级)`
#### 返回JSON
```json
{
"requiredMajorIds": [101, 203],
"majorSensitivity": 1
}
```
#### 规则
- requiredMajorIds:从专业列表中选最相关的,最多3个,无明确要求则空数组
- majorSensitivity0=不限 1=优先 2=强制
- majorSensitivity=0 时,requiredMajorIds 应为空数组
- 结果更新到 bg_job 的 required_major_ids 和 major_sensitivity 字段
### 2.5 第三次AI:技能提取
自由提取岗位核心技能,不再依赖预定义标签库。
#### 输入
- 岗位标题、职责、要求
#### 返回JSON
```json
["java", "spring boot", "mysql", "redis"]
```
#### prompt 规则
- 统一小写字母
- 尽量简短,使用业界通用缩写
- 提取范围:技术栈、专业领域知识、行业工具、专业资质能力
- 不提取纯软技能(沟通、协作、学习能力等)
- 无专业能力要求的岗位(销售、行政等)返回空数组
- 最多15个
#### 技能入库流程
1. 遍历AI返回的技能名,统一转小写
2. `INSERT IGNORE INTO bg_skill_tag`(依靠 name 唯一索引去重,ID 由 IdWorker 生成)
3. `SELECT id FROM bg_skill_tag WHERE name=?` 拿到ID
4. 写入 bg_job_skill_tag_relation
#### 并发安全
依靠数据库唯一索引保证,不加应用层锁。多线程同时插入相同技能名时,INSERT IGNORE 自动忽略重复。
### 2.6 容错设计
- 第二次、第三次AI调用失败不影响岗位入库
- 每次调用独立 try-catch,仅日志记录
---
## 三、公司数据触发逻辑
### 3.1 状态
`bg_company.status`:0=待完善 1=已完善 2=禁用 3=补充中 4=补充失败
### 3.2 定时任务
- 任务C:公司补充(每小时),SELECT FOR UPDATE 锁定 status=0 的数据
- 任务D:僵尸恢复(每小时,与C错开),重置超时10分钟的 status=3 数据
---
## 四、公司数据补充逻辑(AI补充)
### 4.1 流程
1. 拿 short_name 调AI
2. prompt 附带行业列表,AI直接返回 industryId
3. valid=false → status=4(补充失败)
4. regionCodeAI返回城市名,Java侧匹配
5. 回填 bg_company 各字段,status=1
### 4.2 AI返回JSON
```json
{
"valid": true,
"name": "北京字节跳动科技有限公司",
"city": "北京",
"companyType": "独角兽",
"industryId": 5,
"tags": ["短视频", "人工智能"],
"summary": "全球领先的内容平台和科技公司",
"description": "字节跳动成立于2012年...",
"foundedYear": "2012",
"address": "北京市海淀区...",
"scale": "10000人以上",
"website": "https://www.bytedance.com",
"financingStage": "已上市",
"latestValuation": "2200亿美元",
"news": ["新闻1", "新闻2", "新闻3"]
}
```
---
## 五、关键设计决策
| 决策点 | 结论 | 原因 |
|--------|------|------|
| 技能标签来源 | AI自由提取,自动入库 bg_skill_tag | 去掉预定义标签限制,覆盖面更广 |
| 技能并发去重 | INSERT IGNORE + 唯一索引 | 不加应用层锁,性能好 |
| 专业匹配 | 单独一次AI调用 | 专业列表845条,和第一次prompt合并会超token |
| AI调用次数 | 三次(结构化 + 专业 + 技能) | 各维度独立,容错互不影响 |
| 公司数据来源 | AI补充,不用工商API | 公司简称查API不精确,AI覆盖率高 |
@@ -97,22 +97,7 @@ app:
# base-url 配到版本路径,如 DeepSeek: https://api.deepseek.com/v1,豆包: https://ark.cn-beijing.volces.com/api/v3
ai:
providers:
job-clean:
base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3}
api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02}
model: ${AI_MODEL:doubao-seed-2-0-lite-260215}
job-recommend:
base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3}
api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02}
model: ${AI_MODEL:doubao-1-5-lite-32k-250115}
# 岗位清洗配置
job-clean:
batch-size: 60
thread-pool-size: 60
# 公司数据补充配置
company-clean:
batch-size: 10
thread-pool-size: 3
model: ${AI_MODEL:doubao-1-5-lite-32k-250115}
@@ -97,22 +97,7 @@ app:
# base-url 配到版本路径,如 DeepSeek: https://api.deepseek.com/v1,豆包: https://ark.cn-beijing.volces.com/api/v3
ai:
providers:
job-clean:
base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3}
api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02}
model: ${AI_MODEL:doubao-seed-2-0-lite-260215}
job-recommend:
base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3}
api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02}
model: ${AI_MODEL:doubao-1-5-lite-32k-250115}
# 岗位清洗配置
job-clean:
batch-size: 60
thread-pool-size: 60
# 公司数据补充配置
company-clean:
batch-size: 10
thread-pool-size: 3
model: ${AI_MODEL:doubao-1-5-lite-32k-250115}
@@ -97,22 +97,7 @@ app:
# base-url 配到版本路径,如 DeepSeek: https://api.deepseek.com/v1,豆包: https://ark.cn-beijing.volces.com/api/v3
ai:
providers:
job-clean:
base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3}
api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02}
model: ${AI_MODEL:doubao-seed-2-0-lite-260215}
job-recommend:
base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3}
api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02}
model: ${AI_MODEL:doubao-1-5-lite-32k-250115}
# 岗位清洗配置
job-clean:
batch-size: 60
thread-pool-size: 60
# 公司数据补充配置
company-clean:
batch-size: 10
thread-pool-size: 3
model: ${AI_MODEL:doubao-1-5-lite-32k-250115}
@@ -1,24 +0,0 @@
package org.jiayunet.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.jiayunet.pojo.po.AppJobData;
import java.util.List;
/**
* 爬虫岗位原始数据Mapper
*
* @author zk
*/
@Mapper
public interface AppJobDataMapper extends CommonMapper<AppJobData> {
/**
* 查询待清洗数据并加行锁(SELECT ... FOR UPDATE
* <p>必须在事务内调用,配合状态更新实现原子锁定</p>
*/
@Select("SELECT * FROM app_job_data WHERE clean_status = 0 AND is_valid = 1 LIMIT #{limit} FOR UPDATE")
List<AppJobData> selectForUpdate(@Param("limit") int limit);
}
@@ -1,79 +0,0 @@
package org.jiayunet.pojo.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.Instant;
/**
* 爬虫岗位原始数据表(app_job_data
* <p>存储爬虫抓取的原始岗位数据,供清洗服务读取并写入业务表</p>
*
* @author zk
*/
@Data
@TableName(value = "app_job_data")
public class AppJobData {
@TableId(type = IdType.AUTO)
private Long id;
/** 关联爬取任务ID */
private Long taskCrawlId;
/** 职位名称 */
private String jobTitle;
/** 薪资 */
private String salary;
/** 工作地点 */
private String location;
/** 公司名称 */
private String company;
/** 经验要求 */
private String experience;
/** 学历要求 */
private String education;
/** 岗位详情(职责+要求+介绍) */
private String description;
/** 详情页URL */
private String detailUrl;
/** 内容哈希值,用于查重 */
private String contentHash;
/** 数据来源 0=官网 1=平台 */
private Integer sources;
/** 是否独立URL 0=页内展示 1=独立页面 */
private Integer isIndependentUrl;
/** 是否有效 0=无效 1=有效 */
private Integer isValid;
/** 有效期 */
private Instant expireAt;
/** 验证状态 pending=待验证 checking=验证中 checked=已验证 */
private String checkStatus;
/** 清洗状态 0=待清洗 1=清洗中 2=已入库 3=已丢弃 */
private Integer cleanStatus;
/** 上次验证时间 */
private Instant lastCheckAt;
/** 创建时间 */
private Instant createdAt;
/** 更新时间 */
private Instant updatedAt;
}
@@ -77,6 +77,12 @@ public class Job {
/** 爬虫原始数据ID,用于去重 */
private String sourceId;
/** 招聘分类 0=校招 1=实习 2=社招 3=其他 */
private Integer recruitCategory;
/** 发布日期 */
private Instant expireAt;
/** 状态 0=上架 1=下架 2=已失效 */
private Integer status;
@@ -1,184 +0,0 @@
package org.jiayunet.service;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.ai.AiChatAbility;
import org.jiayunet.mapper.CompanyMapper;
import org.jiayunet.pojo.po.Company;
import org.jiayunet.tool.HttpTool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 公司数据补充服务
* <p>定时从 bg_company 捞取待完善数据,调用 AI 补充公司信息</p>
* <p>依赖:AiChatAbilityAI调用)、DictCacheService(行业列表/地区匹配)、CompanyCleanTransactionService(事务操作)</p>
* <p>使用表:bg_company(读取/更新)</p>
*
* @author zk
*/
@Slf4j
@Service
public class CompanyCleanService {
@Autowired
private AiChatAbility aiChatAbility;
@Autowired
private DictCacheService dictCacheService;
@Autowired
private CompanyCleanTransactionService companyCleanTransactionService;
@Autowired
private CompanyMapper companyMapper;
@Value("${app.company-clean.batch-size:10}")
private int batchSize;
@Value("${app.company-clean.thread-pool-size:3}")
private int threadPoolSize;
private ExecutorService executorService;
/** 初始化线程池 */
@javax.annotation.PostConstruct
public void init() {
executorService = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* 任务C:公司数据补充(每小时)
* <p>1. 批量锁定待完善公司 2. 多线程并发调用AI补充 3. 回填数据</p>
*/
@Scheduled(cron = "0 */1 * * * ?")
public void cleanCompany() {
List<Company> companyList = companyCleanTransactionService.lockBatch(batchSize);
if (companyList.isEmpty()) {
return;
}
log.info("公司补充:锁定{}条数据", companyList.size());
// 多线程并发处理
for (Company company : companyList) {
executorService.submit(() -> {
try {
cleanOne(company);
} catch (Exception e) {
log.error("公司补充异常, id={}, shortName={}", company.getId(), company.getShortName(), e);
}
});
}
}
/**
* 任务D:公司僵尸恢复(每小时,与任务C错开30分钟)
* <p>将超时10分钟仍在补充中的数据重置为待完善</p>
*/
@Scheduled(cron = "0 30 */1 * * ?")
public void recoverZombie() {
int recovered = companyMapper.update(null,
new LambdaUpdateWrapper<Company>()
.set(Company::getStatus, 0)
.eq(Company::getStatus, 3)
.lt(Company::getUpdateTime, Instant.now().minusSeconds(600)));
if (recovered > 0) {
log.info("公司僵尸恢复:重置{}条数据", recovered);
}
}
/**
* 补充单条公司数据
* <p>1. 拼prompt调AI 2. 解析结果 3. 回填数据</p>
*/
public void cleanOne(Company company) {
log.info("公司补充开始, id={}, shortName={}", company.getId(), company.getShortName());
String systemPrompt = buildSystemPrompt();
String userMessage = buildUserMessage(company.getShortName());
String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);
try {
// 去掉可能的 markdown 代码块标记
String json = aiResponse.trim();
if (json.startsWith("```")) {
json = json.replaceAll("^```\\w*\\n?", "").replaceAll("\\n?```$", "").trim();
}
// 清除控制字符(Tab等),保留换行符
json = json.replaceAll("[\\x00-\\x08\\x0B\\x0C\\x0E-\\x1F]", "");
JsonNode root = HttpTool.objectMapper.readTree(json);
// valid 校验:AI不认识该公司
if (!root.path("valid").asBoolean(false)) {
log.info("公司补充:AI不认识该公司, id={}, shortName={}", company.getId(), company.getShortName());
companyCleanTransactionService.updateCompanyStatus(company.getId(), 4);
return;
}
// 回填数据
companyCleanTransactionService.saveCompanyData(root, company);
log.info("公司补充完成, id={}, shortName={}", company.getId(), company.getShortName());
} catch (Exception e) {
log.error("公司AI返回解析失败, id={}, shortName={}, response={}",
company.getId(), company.getShortName(), aiResponse, e);
// 保持 status=3,由僵尸恢复任务重置
}
}
/** 构建系统提示词 */
private String buildSystemPrompt() {
return """
你是一个企业信息补充助手。根据提供的公司简称,补充该公司的详细信息。
返回JSON格式要求:
{
"valid": true/false,
"name": "公司全称",
"city": "总部所在城市,精确到市",
"companyType": "企业类型",
"industryId": 行业ID,
"tags": ["公司标签,最多5个"],
"summary": "一句话简介,100字以内",
"description": "公司详细描述,500字以内",
"foundedYear": "成立年份",
"address": "总部/注册地址",
"scale": "企业规模",
"website": "官网地址",
"financingStage": "融资状态",
"latestValuation": "最新估值",
"news": ["相关新闻,最多3条,每条50字以内"]
}
规则:
1. 如果不认识该公司,返回 {"valid": false}
2. name 根据公司简称推断完整的企业注册名称
3. companyType 取值:上市企业、独角兽、国企、央企、民营企业、外资企业、合资企业、事业单位、其他
4. industryId 必须从给定行业列表中选择,不确定则null
5. scale 取值:少于50人、50-150人、150-500人、500-1000人、1000-5000人、5000-10000人、10000人以上
6. tags 体现公司核心业务特征,最多5个
7. news 基于你的知识提供该公司最新的3条相关新闻,每条50字以内
8. latestValuation 知道就提供,不知道则null
9. 不确定的字段返回null,不要编造
10. 字符串值中不允许出现Tab、换行等控制字符
11. 只返回JSON,不要其他内容
""";
}
/** 构建用户消息 */
private String buildUserMessage(String shortName) {
return "【公司简称】\n" + shortName +
"\n\n【行业列表】\n" + dictCacheService.getIndustryText();
}
}
@@ -1,165 +0,0 @@
package org.jiayunet.service;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.mapper.CompanyMapper;
import org.jiayunet.pojo.po.Company;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
* 公司数据补充事务服务
* <p>独立出来解决 @Transactional 同类自调用失效问题</p>
* <p>依赖:CompanyMapper、DictCacheService(地区匹配、行业校验)</p>
* <p>使用表:bg_company(更新)</p>
*
* @author zk
*/
@Slf4j
@Service
public class CompanyCleanTransactionService {
@Autowired
private CompanyMapper companyMapper;
@Autowired
private DictCacheService dictCacheService;
/**
* 回填公司数据(事务)
* <p>1. 解析AI返回的各字段 2. 地区匹配 3. 行业校验 4. 更新bg_company</p>
*/
@Transactional(rollbackFor = Exception.class)
public void saveCompanyData(JsonNode root, Company company) {
// nameAI根据shortName推断的全称
String name = root.path("name").asText(null);
if (name != null && !name.isBlank()) {
company.setName(name);
}
// regionCodeAI返回城市名,Java侧匹配
String city = root.path("city").asText(null);
if (city != null && !city.isBlank()) {
String regionCode = dictCacheService.matchRegionCode(city);
company.setRegionCode(regionCode);
}
// companyType
String companyType = root.path("companyType").asText(null);
if (companyType != null && !"null".equals(companyType)) {
company.setCompanyType(companyType);
}
// industryId:校验是否存在于行业列表中
long industryId = root.path("industryId").asLong(0);
company.setIndustryId(industryId > 0 ? industryId : null);
// tagsJSON数组
JsonNode tagsNode = root.path("tags");
if (tagsNode.isArray() && !tagsNode.isEmpty()) {
List<String> tags = new ArrayList<>();
tagsNode.forEach(node -> tags.add(node.asText()));
company.setTags(tags);
}
// summary
String summary = root.path("summary").asText(null);
if (summary != null && !"null".equals(summary)) {
company.setSummary(summary);
}
// description
String description = root.path("description").asText(null);
if (description != null && !"null".equals(description)) {
company.setDescription(description);
}
// foundedYear
String foundedYear = root.path("foundedYear").asText(null);
if (foundedYear != null && !"null".equals(foundedYear)) {
company.setFoundedYear(foundedYear);
}
// address
String address = root.path("address").asText(null);
if (address != null && !"null".equals(address)) {
company.setAddress(address);
}
// scale
String scale = root.path("scale").asText(null);
if (scale != null && !"null".equals(scale)) {
company.setScale(scale);
}
// website
String website = root.path("website").asText(null);
if (website != null && !"null".equals(website)) {
company.setWebsite(website);
}
// financingStage
String financingStage = root.path("financingStage").asText(null);
if (financingStage != null && !"null".equals(financingStage)) {
company.setFinancingStage(financingStage);
}
// latestValuation
String latestValuation = root.path("latestValuation").asText(null);
if (latestValuation != null && !"null".equals(latestValuation)) {
company.setLatestValuation(latestValuation);
}
// newsJSON数组
JsonNode newsNode = root.path("news");
if (newsNode.isArray() && !newsNode.isEmpty()) {
List<String> news = new ArrayList<>();
newsNode.forEach(node -> news.add(node.asText()));
company.setNews(news);
}
// 更新状态和时间
company.setStatus(1);
company.setUpdateTime(Instant.now());
companyMapper.updateById(company);
}
/** 更新公司状态 */
public void updateCompanyStatus(Long id, int status) {
companyMapper.update(null,
new LambdaUpdateWrapper<Company>()
.set(Company::getStatus, status)
.set(Company::getUpdateTime, Instant.now())
.eq(Company::getId, id));
}
/**
* 原子锁定一批待完善公司(事务内 SELECT FOR UPDATE + UPDATE 状态)
* <p>行锁保证并发安全,其他线程会阻塞直到事务提交</p>
*
* @return 锁定成功的公司列表,可能为空
*/
@Transactional(rollbackFor = Exception.class)
public List<Company> lockBatch(int batchSize) {
List<Company> companyList = companyMapper.selectForUpdate(batchSize);
if (companyList.isEmpty()) {
return companyList;
}
List<Long> ids = companyList.stream().map(Company::getId).toList();
companyMapper.update(null,
new LambdaUpdateWrapper<Company>()
.set(Company::getStatus, 3)
.set(Company::getUpdateTime, Instant.now())
.in(Company::getId, ids));
return companyList;
}
}
@@ -116,15 +116,6 @@ public class DictCacheService {
regionList.size(), majorCategoryList.size(), majorLeafCount);
}
/** 获取岗位分类文本(叶子节点,带父级路径,逗号分隔) */
public String getJobCategoryText() {
return jobCategoryText;
}
/** 获取行业文本(叶子节点,带父级路径,逗号分隔) */
public String getIndustryText() {
return industryText;
}
/** 获取专业分类文本(三级叶子节点,带父级路径,逗号分隔) */
public String getMajorCategoryText() {
@@ -1,384 +0,0 @@
package org.jiayunet.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.ai.AiChatAbility;
import org.jiayunet.ai.AiResponseCleanTool;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import org.jiayunet.mapper.AppJobDataMapper;
import org.jiayunet.mapper.JobMapper;
import org.jiayunet.mapper.SkillTagMapper;
import org.jiayunet.pojo.po.AppJobData;
import org.jiayunet.pojo.po.Job;
import org.jiayunet.pojo.po.SkillTag;
import org.jiayunet.tool.HttpTool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 岗位清洗服务
* <p>定时从 app_job_data 捞取待清洗数据,调用 AI 清洗后写入业务表</p>
* <p>依赖:AiChatAbilityAI调用)、DictCacheService(字典缓存)、JobCleanTransactionService(事务操作)</p>
* <p>使用表:app_job_data(读取/更新状态)、bg_job(去重查询/更新专业)、bg_skill_tag(技能入库)</p>
*
* @author zk
*/
@Slf4j
@Service
public class JobCleanService {
@Autowired
private AiChatAbility aiChatAbility;
@Autowired
private DictCacheService dictCacheService;
@Autowired
private JobCleanTransactionService jobCleanTransactionService;
@Autowired
private AppJobDataMapper appJobDataMapper;
@Autowired
private JobMapper jobMapper;
@Autowired
private SkillTagMapper skillTagMapper;
@Value("${app.job-clean.batch-size:20}")
private int batchSize;
@Value("${app.job-clean.thread-pool-size:5}")
private int threadPoolSize;
private ExecutorService executorService;
/** 初始化线程池 */
@javax.annotation.PostConstruct
public void init() {
executorService = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* 定时任务A:岗位清洗(每3分钟)
* <p>1. 批量锁定待清洗数据 2. 多线程并发调用AI清洗 3. 写入业务表</p>
*/
@Scheduled(cron = "0 */3 * * * ?")
public void cleanJob() {
List<AppJobData> dataList = jobCleanTransactionService.lockBatch(batchSize);
if (dataList.isEmpty()) {
return;
}
log.info("岗位清洗:锁定{}条数据", dataList.size());
// 多线程并发处理
for (AppJobData data : dataList) {
executorService.submit(() -> {
try {
cleanOne(data);
} catch (Exception e) {
log.error("岗位清洗异常, id={}", data.getId(), e);
// 异常时保持 clean_status=1,由僵尸恢复任务重置
}
});
}
}
/**
* 定时任务B:僵尸恢复(每30分钟)
* <p>将超时10分钟仍在清洗中的数据重置为待清洗</p>
*/
@Scheduled(cron = "0 */30 * * * ?")
public void recoverZombie() {
int recovered = appJobDataMapper.update(null,
new LambdaUpdateWrapper<AppJobData>()
.set(AppJobData::getCleanStatus, 0)
.eq(AppJobData::getCleanStatus, 1)
.lt(AppJobData::getUpdatedAt, Instant.now().minusSeconds(600)));
if (recovered > 0) {
log.info("僵尸恢复:重置{}条数据", recovered);
}
}
/**
* 清洗单条岗位数据
* <p>1. 前置校验 2. 第一次AI提取结构化信息 3. 写入业务表 4. 第二次AI匹配专业 5. 第三次AI提取技能</p>
*/
public void cleanOne(AppJobData data) {
// 1. 前置校验
if (data.getDescription() == null || data.getDescription().length() < 20) {
jobCleanTransactionService.updateCleanStatus(data.getId(), 3);
return;
}
// 2. 第一次AI:提取岗位结构化信息
String systemPrompt = buildSystemPrompt();
String userMessage = buildUserMessage(data);
String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);
// 3. 解析JSON
try {
String json = AiResponseCleanTool.clean(aiResponse);
JsonNode root = HttpTool.objectMapper.readTree(json);
// valid 校验
if (!root.path("valid").asBoolean(false)) {
jobCleanTransactionService.updateCleanStatus(data.getId(), 3);
return;
}
// 4. 去重检查
String sourceId = String.valueOf(data.getId());
Long existJob = jobMapper.selectCount(new LambdaQueryWrapper<Job>().eq(Job::getSourceId, sourceId));
if (existJob > 0) {
jobCleanTransactionService.updateCleanStatus(data.getId(), 2);
return;
}
// 5. 公司处理(加锁防并发重复)
String companyShortName = root.path("companyShortName").asText("");
if (companyShortName.isBlank()) {
companyShortName = data.getCompany();
}
Long companyId = jobCleanTransactionService.findOrCreateCompany(companyShortName);
// 6. 地区处理
List<String> regionCodes = new ArrayList<>();
JsonNode citiesNode = root.path("cities");
if (citiesNode.isArray()) {
for (JsonNode city : citiesNode) {
String code = dictCacheService.matchRegionCode(city.asText());
if (code != null) {
regionCodes.add(code);
}
}
}
// 7. 写入业务表
jobCleanTransactionService.saveJobData(root, data, companyId, sourceId, regionCodes);
// 拿到刚插入的 job
Job insertedJob = jobMapper.selectOne(new LambdaQueryWrapper<Job>().eq(Job::getSourceId, sourceId).last("LIMIT 1"));
if (insertedJob == null) {
return;
}
// 8. 第二次AI:专业匹配(失败不影响岗位入库)
try {
String title = root.path("title").asText("");
String desc = root.path("description").asText("");
String req = root.path("requirement").asText("");
matchMajor(insertedJob.getId(), title, desc, req);
} catch (Exception ex) {
log.warn("专业匹配失败, id={}", data.getId(), ex);
}
// 9. 第三次AI:技能提取(失败不影响岗位入库)
try {
String title = root.path("title").asText("");
String desc = root.path("description").asText("");
String req = root.path("requirement").asText("");
extractSkillTags(insertedJob.getId(), title, desc, req);
} catch (Exception ex) {
log.warn("技能提取失败, id={}", data.getId(), ex);
}
} catch (Exception e) {
log.error("AI 返回解析失败, id={}, response={}", data.getId(), aiResponse, e);
}
}
/**
* 第二次AI调用:匹配专业 + 专业敏感度
* <p>传入岗位信息和三级专业分类列表,AI返回 requiredMajorIds + majorSensitivity → 更新 bg_job</p>
*/
private void matchMajor(Long jobId, String title, String description, String requirement) {
String systemPrompt = """
你是一个岗位专业匹配助手。根据岗位信息,判断该岗位对专业的要求。
返回JSON格式:
{
"requiredMajorIds": [专业ID数组,从专业列表中选择最相关的,最多3个,无明确要求则空数组],
"majorSensitivity": 0-2的数字(0=专业不限 1=优先相关专业 2=强制要求专业)
}
规则:
1. 只能从给定专业列表中选择ID
2. 根据岗位描述判断专业敏感度:明确写"XX专业"→2,写"相关专业优先"→1,未提及→0
3. majorSensitivity为0时,requiredMajorIds应为空数组
4. 只返回JSON,不要其他内容
""";
String userMessage = "【岗位信息】\n标题: " + title + "\n职责: " + description + "\n要求: " + requirement +
"\n\n【专业分类列表】\n" + dictCacheService.getMajorCategoryText();
String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);
String json = AiResponseCleanTool.clean(aiResponse);
try {
JsonNode root = HttpTool.objectMapper.readTree(json);
int majorSensitivity = root.path("majorSensitivity").asInt(0);
List<Long> majorIds = new ArrayList<>();
JsonNode idsNode = root.path("requiredMajorIds");
if (idsNode.isArray()) {
for (JsonNode node : idsNode) {
long id = node.asLong(0);
if (id > 0) {
majorIds.add(id);
}
}
}
// 更新 bg_job
jobCleanTransactionService.updateJobMajor(jobId, majorIds.isEmpty() ? null : majorIds, majorSensitivity);
} catch (Exception e) {
log.warn("专业匹配AI返回解析失败: {}", json, e);
}
}
/**
* 第三次AI调用:自由提取技能标签
* <p>AI返回技能名数组 → INSERT IGNORE 入 bg_skill_tag → 查ID → 写关联表</p>
*/
private void extractSkillTags(Long jobId, String title, String description, String requirement) {
String systemPrompt = """
你是一个技能提取助手。根据岗位信息,提取该岗位要求的核心专业能力和工具技能。
返回JSON数组格式,如:["java", "spring boot", "mysql", "redis"]
规则:
1. 统一使用小写字母
2. 只保留核心词,去掉多余修饰(如"plc编程""plc""c语言""c""cad制图""cad"
3. 同一技能只保留最具体的表述,不要同时出现上位词和下位词(如有"机械设计"就不要再出"机械"
4. 提取范围包括:技术栈、专业领域知识、行业工具、专业资质能力等
5. 不提取纯软技能(如沟通能力、团队协作、学习能力、积极主动)
6. 不提取过于宽泛的标签(如"办公软件""windows"
7. 如果岗位完全没有专业能力要求(纯看态度和素质),返回空数组 []
8. 最多15个,按重要性排序
9. 只返回JSON数组,不要其他内容
示例1(技术岗):
输入:需要熟悉Java、Spring Boot框架,了解MySQL数据库和Redis缓存,会PLC编程
输出:["java", "spring boot", "mysql", "redis", "plc"]
示例2(财务岗):
输入:负责费用管理与审核,月度经营利润分析,要求财务管理、会计学相关专业
输出:["财务管理", "会计", "经营分析"]
示例3(制造岗):
输入:负责模具开发,熟悉CAD制图,了解注塑成型工艺,有SolidWorks经验
输出:["模具", "cad", "注塑", "solidworks"]
示例4(纯素质岗):
输入:具备较强的沟通能力和创新意识,积极主动,专业不限
输出:[]
""";
String userMessage = "【岗位信息】\n标题: " + title + "\n职责: " + description + "\n要求: " + requirement;
String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);
String json = AiResponseCleanTool.clean(aiResponse);
try {
JsonNode arrayNode = HttpTool.objectMapper.readTree(json);
if (!arrayNode.isArray() || arrayNode.isEmpty()) {
return;
}
List<Long> skillTagIds = new ArrayList<>();
for (JsonNode node : arrayNode) {
String skillName = node.asText("").trim().toLowerCase();
if (skillName.isBlank() || skillName.length() > 50) {
continue;
}
// INSERT IGNORE + SELECT 获取ID
Long tagId = findOrCreateSkillTag(skillName);
if (tagId != null && !skillTagIds.contains(tagId)) {
skillTagIds.add(tagId);
}
}
if (!skillTagIds.isEmpty()) {
jobCleanTransactionService.saveSkillTagRelations(jobId, skillTagIds);
}
} catch (Exception e) {
log.warn("技能提取AI返回解析失败: {}", json, e);
}
}
/**
* 查找或创建技能标签(依靠数据库唯一索引保证并发安全)
* <p>INSERT IGNORE 后 SELECT,避免加锁</p>
*/
private Long findOrCreateSkillTag(String name) {
// 先尝试插入(忽略重复),ID 由 IdWorker 生成
skillTagMapper.insertIgnore(IdWorker.getId(), name);
// 再查询拿ID
SkillTag tag = skillTagMapper.selectOne(new LambdaQueryWrapper<SkillTag>().eq(SkillTag::getName, name).last("LIMIT 1"));
return tag != null ? tag.getId() : null;
}
/** 构建第一次AI的系统提示词 */
private String buildSystemPrompt() {
return """
你是一个岗位数据清洗助手。请根据提供的原始岗位数据,提取并结构化为JSON格式。
返回JSON格式要求:
{
"valid": true/false,
"title": "岗位名称",
"salary": "标准化薪资,如10-20K、面议,无效则null",
"education": 0-4的数字(0=不限 1=大专 2=本科 3=硕士 4=博士),
"minExperience": 最低工作年限数字(不要求则0),
"employmentType": 0或1(0=全职 1=兼职,默认0),
"categoryId": 岗位分类ID(必选,从分类列表中选最接近的),
"requiredIndustryId": 行业ID(仅当明确提到行业经验要求时设置,列表中无完全匹配则选最相似的,未提到则null),
"description": "岗位职责,保持原文风格,格式化展示",
"requirement": "任职要求,保持原文风格,格式化展示",
"bonus": "加分项,无则null",
"tags": ["核心职能标签,最多5个,如数据分析、产品策略"],
"skillTags": ["技能关键词,最多8个,如Java、Spring Boot"],
"companyShortName": "简洁的公司简称,如字节跳动、中国平安",
"cities": ["工作城市列表,精确到市"]
}
规则:
1. description/requirement/bonus 均从原始的 description+experience+education 内容中提取,保持原文风格
2. 岗位标题不存在时,从描述中归纳生成
3. 薪资标准化为 10-20K、20K、面议 等格式,无效或空则null
4. categoryId 必须从分类列表中选一个,不允许为null
5. requiredIndustryId 仅当描述中明确提到行业经验要求时设置
6. tags 是核心职能标签(如数据分析、团队协作),最多5个
7. skillTags 是技能关键词(如Java、MySQL),最多8个
8. companyShortName 去掉地区后缀、招聘后缀、括号内容,保持简洁
9. 字符串值中不允许出现Tab、换行等控制字符,用空格或中文标点替代
10. 只返回JSON,不要其他内容
""";
}
/** 构建第一次AI的用户消息 */
private String buildUserMessage(AppJobData data) {
StringBuilder sb = new StringBuilder();
sb.append("【原始数据】\n");
sb.append("岗位名称: ").append(nullToEmpty(data.getJobTitle())).append("\n");
sb.append("薪资: ").append(nullToEmpty(data.getSalary())).append("\n");
sb.append("工作地点: ").append(nullToEmpty(data.getLocation())).append("\n");
sb.append("公司: ").append(nullToEmpty(data.getCompany())).append("\n");
sb.append("经验要求: ").append(nullToEmpty(data.getExperience())).append("\n");
sb.append("学历要求: ").append(nullToEmpty(data.getEducation())).append("\n");
sb.append("岗位详情: ").append(nullToEmpty(data.getDescription())).append("\n\n");
sb.append("【岗位分类列表】\n").append(dictCacheService.getJobCategoryText()).append("\n\n");
sb.append("【行业列表】\n").append(dictCacheService.getIndustryText());
return sb.toString();
}
private String nullToEmpty(String s) {
return s == null ? "" : s;
}
}
@@ -1,193 +0,0 @@
package org.jiayunet.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.mapper.AppJobDataMapper;
import org.jiayunet.mapper.CompanyMapper;
import org.jiayunet.mapper.JobMapper;
import org.jiayunet.mapper.JobRegionRelationMapper;
import org.jiayunet.mapper.JobSkillTagRelationMapper;
import org.jiayunet.pojo.po.AppJobData;
import org.jiayunet.pojo.po.Company;
import org.jiayunet.pojo.po.Job;
import org.jiayunet.pojo.po.JobRegionRelation;
import org.jiayunet.pojo.po.JobSkillTagRelation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.List;
/**
* 岗位清洗事务服务
* <p>独立出来解决 @Transactional 同类自调用失效问题</p>
* <p>依赖:JobMapper、CompanyMapper、JobRegionRelationMapper、JobSkillTagRelationMapper、AppJobDataMapper</p>
* <p>使用表:bg_job(写入/更新)、bg_company(查询/创建)、bg_job_region_relation(写入)、bg_job_skill_tag_relation(写入)、app_job_data(更新状态)</p>
*
* @author zk
*/
@Slf4j
@Service
public class JobCleanTransactionService {
@Autowired
private JobMapper jobMapper;
@Autowired
private CompanyMapper companyMapper;
@Autowired
private JobRegionRelationMapper jobRegionRelationMapper;
@Autowired
private AppJobDataMapper appJobDataMapper;
@Autowired
private JobSkillTagRelationMapper jobSkillTagRelationMapper;
/**
* 写入 bg_job + bg_job_region_relation + 更新 clean_status(短事务)
*/
@Transactional(rollbackFor = Exception.class)
public void saveJobData(JsonNode root, AppJobData data, Long companyId, String sourceId, List<String> regionCodes) {
Job job = new Job();
job.setTitle(root.path("title").asText(""));
job.setCompanyId(companyId);
job.setCategoryId(root.path("categoryId").asLong(0));
job.setEmploymentType(root.path("employmentType").asInt(0));
job.setDescription(root.path("description").asText(""));
job.setRequirement(root.path("requirement").asText(""));
job.setBonus(root.path("bonus").asText(null));
// 解析 JSON 数组为 List<String>
JsonNode tagsNode = root.path("tags");
if (tagsNode.isArray()) {
List<String> tags = new java.util.ArrayList<>();
tagsNode.forEach(node -> tags.add(node.asText()));
job.setTags(tags);
}
JsonNode skillTagsNode = root.path("skillTags");
if (skillTagsNode.isArray()) {
List<String> skillTags = new java.util.ArrayList<>();
skillTagsNode.forEach(node -> skillTags.add(node.asText()));
job.setSkillTags(skillTags);
}
String salary = root.path("salary").asText(null);
job.setSalary("null".equals(salary) ? null : salary);
job.setEducation(root.path("education").asInt(0));
job.setMinExperience(root.path("minExperience").asInt(0));
Long requiredIndustryId = root.path("requiredIndustryId").asLong(0);
job.setRequiredIndustryId(requiredIndustryId == 0 ? null : requiredIndustryId);
job.setSourceUrl(data.getDetailUrl());
job.setSourceId(sourceId);
job.setStatus(0);
job.setCreateTime(Instant.now());
job.setUpdateTime(Instant.now());
jobMapper.insert(job);
// 写入岗位-地区关联(批量插入)
if (!regionCodes.isEmpty()) {
List<JobRegionRelation> relations = regionCodes.stream().map(regionCode -> {
JobRegionRelation relation = new JobRegionRelation();
relation.setJobId(job.getId());
relation.setRegionCode(regionCode);
relation.setCreateTime(Instant.now());
return relation;
}).toList();
jobRegionRelationMapper.batchInsert(relations);
}
// 更新清洗状态
updateCleanStatus(data.getId(), 2);
}
/**
* 查找或创建公司(加锁防并发重复)
* <p>按 short_name 查询,不存在则创建一条待完善记录</p>
*/
public synchronized Long findOrCreateCompany(String shortName) {
Company company = companyMapper.selectOne(
new LambdaQueryWrapper<Company>()
.eq(Company::getShortName, shortName)
.last("LIMIT 1"));
if (company != null) {
return company.getId();
}
Company newCompany = new Company();
newCompany.setName(shortName);
newCompany.setShortName(shortName);
newCompany.setStatus(0);
newCompany.setCreateTime(Instant.now());
newCompany.setUpdateTime(Instant.now());
companyMapper.insert(newCompany);
return newCompany.getId();
}
/**
* 写入岗位-技能标签关联(批量插入)
*/
@Transactional(rollbackFor = Exception.class)
public void saveSkillTagRelations(Long jobId, List<Long> skillTagIds) {
List<JobSkillTagRelation> relations = skillTagIds.stream().map(skillTagId -> {
JobSkillTagRelation relation = new JobSkillTagRelation();
relation.setJobId(jobId);
relation.setSkillTagId(skillTagId);
relation.setCreateTime(Instant.now());
return relation;
}).toList();
jobSkillTagRelationMapper.batchInsert(relations);
}
/**
* 更新岗位的专业要求和专业敏感度
*/
public void updateJobMajor(Long jobId, List<Long> requiredMajorIds, Integer majorSensitivity) {
Job job = new Job();
job.setId(jobId);
job.setRequiredMajorIds(requiredMajorIds);
job.setMajorSensitivity(majorSensitivity);
job.setUpdateTime(Instant.now());
jobMapper.updateById(job);
}
/** 更新清洗状态 */
public void updateCleanStatus(Long id, int status) {
appJobDataMapper.update(null,
new LambdaUpdateWrapper<AppJobData>()
.set(AppJobData::getCleanStatus, status)
.eq(AppJobData::getId, id));
}
/**
* 原子锁定一批待清洗数据(事务内 SELECT FOR UPDATE + UPDATE 状态)
* <p>行锁保证并发安全,其他线程会阻塞直到事务提交</p>
*
* @return 锁定成功的数据列表,可能为空
*/
@Transactional(rollbackFor = Exception.class)
public List<AppJobData> lockBatch(int batchSize) {
List<AppJobData> dataList = appJobDataMapper.selectForUpdate(batchSize);
if (dataList.isEmpty()) {
return dataList;
}
List<Long> ids = dataList.stream().map(AppJobData::getId).toList();
appJobDataMapper.update(null,
new LambdaUpdateWrapper<AppJobData>()
.set(AppJobData::getCleanStatus, 1)
.in(AppJobData::getId, ids));
return dataList;
}
}