Compare commits: 9a58722282...e0cb4493e3 (2 commits)

| Author | SHA1 | Date |
|---|---|---|
| | e0cb4493e3 | |
| | b9d8c2724a | |

+2 -2
@@ -1,5 +1,5 @@
 */target
 **/.idea
 .idea/
-./log
-./logs
+logs/
+log/
@@ -1,202 +0,0 @@
---
inclusion: manual
---

# Data Cleaning Design

## Overall Architecture

```
Crawler (company network) → app_job_data (raw data)
        ↓
Java scheduled task reads the data (multi-threaded)
        ↓
Three AI calls: structuring → major matching → skill extraction
        ↓
Write to business tables (bg_company + bg_job + relation tables + bg_skill_tag)
        ↓
Companies with incomplete information → call the AI to fill in the gaps
```

All cleaning logic lives in the `manager` module and is triggered by `@Scheduled` tasks.

---

## 1. Job Cleaning Trigger Logic

### 1.1 State Management

`app_job_data.clean_status`:
- 0 = pending: newly crawled data (default value)
- 1 = cleaning: locked by the scheduled task and being processed
- 2 = stored: cleaned successfully and written to bg_job
- 3 = discarded: judged invalid by the AI

### 1.2 Two Scheduled Tasks

#### Task A: Job Cleaning (every 5 minutes)
1. Inside a transaction, lock a batch with `SELECT FOR UPDATE` + `UPDATE clean_status=1`
2. Submit the batch to a thread pool for concurrent processing
3. Each record updates its own status independently

#### Task B: Zombie Recovery (every 30 minutes)
Reset records that have been stuck in the cleaning state for more than 10 minutes back to pending.

---

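The status codes and the zombie-recovery rule above can be sketched in plain Java. This is only an illustration under assumptions: the names `CleanStatus`, `ZombieRecovery`, and `recover` are hypothetical and do not appear in the project, and the real task performs the reset with a single SQL `UPDATE` rather than per record.

```java
import java.time.Duration;
import java.time.Instant;

/** Status codes of app_job_data.clean_status as listed in section 1.1 (enum name is hypothetical). */
enum CleanStatus {
    PENDING(0), CLEANING(1), STORED(2), DISCARDED(3);

    final int code;

    CleanStatus(int code) {
        this.code = code;
    }

    /** Maps a database code back to the enum constant. */
    static CleanStatus fromCode(int code) {
        for (CleanStatus s : values()) {
            if (s.code == code) {
                return s;
            }
        }
        throw new IllegalArgumentException("Unknown clean_status: " + code);
    }
}

/** Hypothetical helper mirroring Task B for a single record. */
final class ZombieRecovery {
    /** Records that stay in CLEANING longer than this are treated as zombies. */
    static final Duration TIMEOUT = Duration.ofMinutes(10);

    /** Returns the status a record should have after one recovery pass. */
    static CleanStatus recover(CleanStatus status, Instant updatedAt, Instant now) {
        boolean timedOut = updatedAt.isBefore(now.minus(TIMEOUT));
        return (status == CleanStatus.CLEANING && timedOut) ? CleanStatus.PENDING : status;
    }
}
```

Only records in the cleaning state are touched, so stored and discarded records keep their final status no matter how old they are.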
## 2. Job Cleaning Logic (Three AI Calls)

### 2.1 Pre-validation
- `description` is empty or shorter than 20 characters → mark as discarded and skip

### 2.2 Reference Data Cache (DictCacheService)

Loaded at startup:
- `bg_job_category` leaf nodes (level=3), format `id:name(level 1/level 2)`
- `bg_industry` leaf nodes (level=2), format `id:name(level 1)`
- `bg_major_category` leaf nodes (level=3), format `id:name(level 1/level 2)`
- `bg_china_regions_code` province-level and city-level entries, used to match city names

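A minimal sketch of the pre-validation rule and the `id:name(parent path)` text format follows. The class and method names (`CleanPrecheck`, `DictTextFormat`, `formatLeaf`, `joinLeaves`) are hypothetical; the comma separator is taken from the DictCacheService comments, and the real service builds these strings from database rows.

```java
import java.util.List;

/** Hypothetical helper for the 2.1 pre-validation rule. */
final class CleanPrecheck {
    static final int MIN_DESCRIPTION_LENGTH = 20;

    /** True when the raw description is missing or too short to be worth an AI call. */
    static boolean shouldDiscard(String description) {
        return description == null || description.length() < MIN_DESCRIPTION_LENGTH;
    }
}

/** Hypothetical helper for the 2.2 reference-list text format. */
final class DictTextFormat {
    /** Formats one leaf node as id:name(parent1/parent2). */
    static String formatLeaf(long id, String name, List<String> parentNames) {
        return id + ":" + name + "(" + String.join("/", parentNames) + ")";
    }

    /** Joins formatted leaves into the single comma-separated string sent in the prompt. */
    static String joinLeaves(List<String> formattedLeaves) {
        return String.join(",", formattedLeaves);
    }
}
```

For example, `DictTextFormat.formatLeaf(12, "Backend", List.of("Technology", "Development"))` produces `12:Backend(Technology/Development)`.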
### 2.3 First AI Call: Extract Structured Job Information

#### Input
- Raw fields: job_title, salary, location, company, experience, education, description
- Reference lists: job categories, industries

#### Returned JSON
```json
{
  "valid": true,
  "title": "Senior Java Development Engineer",
  "salary": "15-25K",
  "education": 2,
  "minExperience": 3,
  "employmentType": 0,
  "categoryId": 12,
  "requiredIndustryId": 5,
  "description": "Job responsibilities...",
  "requirement": "Job requirements...",
  "bonus": "Nice-to-have qualifications...",
  "tags": ["data analysis", "product strategy"],
  "skillTags": ["Java", "Spring Boot"],
  "companyShortName": "字节跳动",
  "cities": ["北京", "上海"]
}
```

#### Java Processing Flow
1. valid=false → discard
2. Deduplicate by source_id
3. Company handling (find or create a bg_company row)
4. Region matching (cities → region_code)
5. Write bg_job + bg_job_region_relation and set clean_status=2

### 2.4 Second AI Call: Major Matching

After the job has been stored, a separate AI call matches its major (field of study) requirements.

#### Input
- Job title, responsibilities, and requirements
- The level-3 major category list (845 entries, format `id:name(level 1/level 2)`)

#### Returned JSON
```json
{
  "requiredMajorIds": [101, 203],
  "majorSensitivity": 1
}
```

#### Rules
- requiredMajorIds: pick the most relevant entries from the major list, at most 3; return an empty array when there is no explicit requirement
- majorSensitivity: 0 = no restriction, 1 = preferred, 2 = mandatory
- When majorSensitivity=0, requiredMajorIds should be an empty array
- The result is written to the required_major_ids and major_sensitivity columns of bg_job

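The rules above are instructions to the model, so a defensive check on the Java side may be useful before writing to bg_job. The sketch below is an assumption, not the project's code: `MajorMatchRules` and `normalizeMajorIds` are hypothetical names, and the de-duplication step is an extra safeguard that the original rules do not mention.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/** Hypothetical post-processing of the second AI call's result. */
final class MajorMatchRules {
    static final int MAX_MAJOR_IDS = 3;

    /**
     * Enforces the 2.4 rules: an empty list when sensitivity is 0 (no restriction),
     * otherwise at most three distinct IDs in the order the AI returned them.
     */
    static List<Long> normalizeMajorIds(List<Long> requiredMajorIds, int majorSensitivity) {
        if (majorSensitivity == 0 || requiredMajorIds == null) {
            return List.of();
        }
        // LinkedHashSet drops duplicates while keeping the AI's ordering.
        List<Long> distinct = new ArrayList<>(new LinkedHashSet<>(requiredMajorIds));
        return List.copyOf(distinct.subList(0, Math.min(MAX_MAJOR_IDS, distinct.size())));
    }
}
```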
### 2.5 Third AI Call: Skill Extraction

Core job skills are extracted freely; the predefined tag library is no longer required.

#### Input
- Job title, responsibilities, and requirements

#### Returned JSON
```json
["java", "spring boot", "mysql", "redis"]
```

#### Prompt Rules
- Use lowercase letters throughout
- Keep names short and use common industry abbreviations
- Extraction scope: technology stack, domain knowledge, industry tools, professional qualifications
- Do not extract pure soft skills (communication, teamwork, learning ability, etc.)
- Return an empty array for jobs without professional skill requirements (sales, administration, etc.)
- At most 15 entries

#### Skill Storage Flow
1. Iterate over the skill names returned by the AI and convert each to lowercase
2. `INSERT IGNORE INTO bg_skill_tag` (deduplicated by the unique index on name; IDs are generated by IdWorker)
3. `SELECT id FROM bg_skill_tag WHERE name=?` to obtain the ID
4. Write to bg_job_skill_tag_relation

#### Concurrency Safety
Safety relies on the database unique index, with no application-level lock. When several threads insert the same skill name at once, INSERT IGNORE silently skips the duplicates.

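The storage flow can be imitated in memory to show why no application-level lock is needed. In this sketch a `ConcurrentHashMap` stands in for the unique index on `bg_skill_tag.name`: `putIfAbsent` plays the role of `INSERT IGNORE` and `get` plays the role of the follow-up `SELECT`. The class `SkillTagStore`, its methods, the counter-based IDs, and the whitespace trimming are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory stand-in for bg_skill_tag with a unique index on name. */
final class SkillTagStore {
    static final int MAX_SKILLS = 15;

    private final ConcurrentHashMap<String, Long> idByName = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong(1);

    /** Lowercases, trims, de-duplicates, and caps the AI's skill list at 15 entries. */
    static List<String> normalize(List<String> rawNames) {
        Set<String> seen = new LinkedHashSet<>();
        for (String raw : rawNames) {
            if (raw == null) {
                continue;
            }
            String name = raw.trim().toLowerCase(Locale.ROOT);
            if (!name.isEmpty()) {
                seen.add(name);
            }
        }
        List<String> result = new ArrayList<>(seen);
        return List.copyOf(result.subList(0, Math.min(MAX_SKILLS, result.size())));
    }

    /** "INSERT IGNORE" followed by "SELECT id": concurrent callers all get the same ID. */
    long insertIgnoreAndGetId(String name) {
        // A fresh ID is generated even if the insert is ignored, as in the original flow.
        idByName.putIfAbsent(name, nextId.getAndIncrement());
        return idByName.get(name);
    }
}
```

Whichever thread wins the insert, every caller reads back the same ID afterwards, which is the property the relation table depends on.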
### 2.6 Fault Tolerance
- A failure of the second or third AI call does not affect storing the job
- Each call has its own try-catch, and failures are only logged

---

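One way to picture the fault-tolerance rule is a runner that wraps every optional step in its own try-catch. This is a sketch only: `BestEffortSteps` and `runAll` are hypothetical names, and the project itself writes the two try-catch blocks inline and logs through SLF4J rather than `System.err`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical runner for optional post-processing steps. */
final class BestEffortSteps {
    /**
     * Runs every step in its own try-catch so one failure never stops the others.
     * Returns the names of the steps that failed, in execution order.
     */
    static List<String> runAll(Map<String, Runnable> steps) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Runnable> step : steps.entrySet()) {
            try {
                step.getValue().run();
            } catch (Exception e) {
                // Log only; the job row written earlier stays in place.
                System.err.println("Step failed: " + step.getKey() + " (" + e.getMessage() + ")");
                failed.add(step.getKey());
            }
        }
        return failed;
    }
}
```

Passing a `LinkedHashMap` keeps the steps in insertion order, so major matching still runs before skill extraction.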
## 3. Company Data Trigger Logic

### 3.1 Status
`bg_company.status`: 0 = incomplete, 1 = complete, 2 = disabled, 3 = enrichment in progress, 4 = enrichment failed

### 3.2 Scheduled Tasks
- Task C: company enrichment (hourly); locks rows with status=0 via SELECT FOR UPDATE
- Task D: zombie recovery (hourly, offset from Task C); resets status=3 rows that have been stuck for more than 10 minutes

---

## 4. Company Data Enrichment Logic (AI Enrichment)

### 4.1 Flow
1. Call the AI with short_name
2. The prompt includes the industry list, so the AI returns industryId directly
3. valid=false → status=4 (enrichment failed)
4. regionCode: the AI returns a city name, which is matched on the Java side
5. Write the fields back to bg_company and set status=1

### 4.2 JSON Returned by the AI
```json
{
  "valid": true,
  "name": "北京字节跳动科技有限公司",
  "city": "北京",
  "companyType": "独角兽",
  "industryId": 5,
  "tags": ["short video", "artificial intelligence"],
  "summary": "A world-leading content platform and technology company",
  "description": "ByteDance was founded in 2012...",
  "foundedYear": "2012",
  "address": "Haidian District, Beijing...",
  "scale": "10000人以上",
  "website": "https://www.bytedance.com",
  "financingStage": "已上市",
  "latestValuation": "USD 220 billion",
  "news": ["News item 1", "News item 2", "News item 3"]
}
```

---

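## 5. Key Design Decisions

| Decision | Conclusion | Rationale |
|--------|------|------|
| Skill tag source | Extracted freely by the AI and stored automatically in bg_skill_tag | Removes the predefined-tag restriction and gives broader coverage |
| Concurrent skill deduplication | INSERT IGNORE + unique index | No application-level lock, good performance |
| Major matching | A separate AI call | The major list has 845 entries; merging it into the first prompt would exceed the token limit |
| Number of AI calls | Three (structuring + major + skills) | Each dimension is independent, so failures do not affect one another |
| Company data source | AI enrichment rather than a business-registry API | Looking up a short name through the API is imprecise, while the AI has high coverage |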
@@ -97,22 +97,7 @@ app:
   # Set base-url to the versioned path, e.g. DeepSeek: https://api.deepseek.com/v1, Doubao: https://ark.cn-beijing.volces.com/api/v3
   ai:
     providers:
-      job-clean:
-        base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3}
-        api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02}
-        model: ${AI_MODEL:doubao-seed-2-0-lite-260215}
       job-recommend:
         base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3}
         api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02}
         model: ${AI_MODEL:doubao-1-5-lite-32k-250115}
-
-
-  # Job cleaning configuration
-  job-clean:
-    batch-size: 60
-    thread-pool-size: 60
-
-  # Company data enrichment configuration
-  company-clean:
-    batch-size: 10
-    thread-pool-size: 3
@@ -1,24 +0,0 @@
package org.jiayunet.mapper;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.jiayunet.pojo.po.AppJobData;

import java.util.List;

/**
 * Mapper for raw job data collected by the crawler
 *
 * @author zk
 */
@Mapper
public interface AppJobDataMapper extends CommonMapper<AppJobData> {

    /**
     * Select records pending cleaning and take row locks (SELECT ... FOR UPDATE)
     * <p>Must be called inside a transaction and combined with a status update to lock the batch atomically</p>
     */
    @Select("SELECT * FROM app_job_data WHERE clean_status = 0 AND is_valid = 1 LIMIT #{limit} FOR UPDATE")
    List<AppJobData> selectForUpdate(@Param("limit") int limit);
}
@@ -1,79 +0,0 @@
package org.jiayunet.pojo.po;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.Instant;

/**
 * Raw crawler job data table (app_job_data)
 * <p>Stores the raw job data fetched by the crawler; the cleaning service reads it and writes to the business tables</p>
 *
 * @author zk
 */
@Data
@TableName(value = "app_job_data")
public class AppJobData {

    @TableId(type = IdType.AUTO)
    private Long id;

    /** ID of the associated crawl task */
    private Long taskCrawlId;

    /** Job title */
    private String jobTitle;

    /** Salary */
    private String salary;

    /** Work location */
    private String location;

    /** Company name */
    private String company;

    /** Experience requirement */
    private String experience;

    /** Education requirement */
    private String education;

    /** Job details (responsibilities + requirements + introduction) */
    private String description;

    /** Detail page URL */
    private String detailUrl;

    /** Content hash, used for duplicate detection */
    private String contentHash;

    /** Data source: 0=official website, 1=platform */
    private Integer sources;

    /** Whether the job has its own URL: 0=shown within a page, 1=standalone page */
    private Integer isIndependentUrl;

    /** Validity flag: 0=invalid, 1=valid */
    private Integer isValid;

    /** Expiry time */
    private Instant expireAt;

    /** Verification status: pending, checking, checked */
    private String checkStatus;

    /** Cleaning status: 0=pending, 1=cleaning, 2=stored, 3=discarded */
    private Integer cleanStatus;

    /** Time of the last verification */
    private Instant lastCheckAt;

    /** Creation time */
    private Instant createdAt;

    /** Update time */
    private Instant updatedAt;
}
@@ -77,6 +77,12 @@ public class Job {
     /** Raw crawler record ID, used for deduplication */
     private String sourceId;
 
+    /** Recruitment category: 0=campus, 1=internship, 2=experienced hire, 3=other */
+    private Integer recruitCategory;
+
+    /** Publish date */
+    private Instant expireAt;
+
     /** Status: 0=listed, 1=delisted, 2=expired */
     private Integer status;
 
@@ -1,184 +0,0 @@
package org.jiayunet.service;

import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.ai.AiChatAbility;
import org.jiayunet.mapper.CompanyMapper;
import org.jiayunet.pojo.po.Company;
import org.jiayunet.tool.HttpTool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Company data enrichment service
 * <p>Periodically fetches incomplete rows from bg_company and calls the AI to fill in company information</p>
 * <p>Dependencies: AiChatAbility (AI calls), DictCacheService (industry list / region matching), CompanyCleanTransactionService (transactional operations)</p>
 * <p>Tables used: bg_company (read/update)</p>
 *
 * @author zk
 */
@Slf4j
@Service
public class CompanyCleanService {

    @Autowired
    private AiChatAbility aiChatAbility;

    @Autowired
    private DictCacheService dictCacheService;

    @Autowired
    private CompanyCleanTransactionService companyCleanTransactionService;

    @Autowired
    private CompanyMapper companyMapper;

    @Value("${app.company-clean.batch-size:10}")
    private int batchSize;

    @Value("${app.company-clean.thread-pool-size:3}")
    private int threadPoolSize;

    private ExecutorService executorService;

    /** Initialize the thread pool */
    @javax.annotation.PostConstruct
    public void init() {
        executorService = Executors.newFixedThreadPool(threadPoolSize);
    }

    /**
     * Task C: company data enrichment (hourly)
     * <p>1. Lock a batch of incomplete companies 2. Call the AI concurrently from multiple threads 3. Write the data back</p>
     */
    @Scheduled(cron = "0 0 */1 * * ?")
    public void cleanCompany() {
        List<Company> companyList = companyCleanTransactionService.lockBatch(batchSize);
        if (companyList.isEmpty()) {
            return;
        }
        log.info("Company enrichment: locked {} rows", companyList.size());

        // Process concurrently on the thread pool
        for (Company company : companyList) {
            executorService.submit(() -> {
                try {
                    cleanOne(company);
                } catch (Exception e) {
                    log.error("Company enrichment error, id={}, shortName={}", company.getId(), company.getShortName(), e);
                }
            });
        }
    }

    /**
     * Task D: company zombie recovery (hourly, offset 30 minutes from Task C)
     * <p>Resets rows that have been in the enrichment-in-progress state for more than 10 minutes back to incomplete</p>
     */
    @Scheduled(cron = "0 30 */1 * * ?")
    public void recoverZombie() {
        int recovered = companyMapper.update(null,
                new LambdaUpdateWrapper<Company>()
                        .set(Company::getStatus, 0)
                        .eq(Company::getStatus, 3)
                        .lt(Company::getUpdateTime, Instant.now().minusSeconds(600)));

        if (recovered > 0) {
            log.info("Company zombie recovery: reset {} rows", recovered);
        }
    }

    /**
     * Enrich a single company record
     * <p>1. Build the prompt and call the AI 2. Parse the result 3. Write the data back</p>
     */
    public void cleanOne(Company company) {
        log.info("Company enrichment started, id={}, shortName={}", company.getId(), company.getShortName());

        String systemPrompt = buildSystemPrompt();
        String userMessage = buildUserMessage(company.getShortName());

        String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);

        try {
            // Strip a possible markdown code-fence wrapper
            String json = aiResponse.trim();
            if (json.startsWith("```")) {
                json = json.replaceAll("^```\\w*\\n?", "").replaceAll("\\n?```$", "").trim();
            }
            // Strip control characters; the pattern leaves tab (\x09), line feed (\x0A), and carriage return (\x0D) in place
            json = json.replaceAll("[\\x00-\\x08\\x0B\\x0C\\x0E-\\x1F]", "");

            JsonNode root = HttpTool.objectMapper.readTree(json);

            // valid check: the AI does not recognize this company
            if (!root.path("valid").asBoolean(false)) {
                log.info("Company enrichment: AI does not recognize the company, id={}, shortName={}", company.getId(), company.getShortName());
                companyCleanTransactionService.updateCompanyStatus(company.getId(), 4);
                return;
            }

            // Write the data back
            companyCleanTransactionService.saveCompanyData(root, company);
            log.info("Company enrichment finished, id={}, shortName={}", company.getId(), company.getShortName());

        } catch (Exception e) {
            log.error("Failed to parse the AI response for company, id={}, shortName={}, response={}",
                    company.getId(), company.getShortName(), aiResponse, e);
            // Keep status=3; the zombie-recovery task will reset it
        }
    }

    /** Build the system prompt */
    private String buildSystemPrompt() {
        return """
                You are a company information enrichment assistant. Given a company short name, fill in the company's details.

                Required JSON format:
                {
                "valid": true/false,
                "name": "full company name",
                "city": "headquarters city, at city granularity",
                "companyType": "company type",
                "industryId": industry ID,
                "tags": ["company tags, at most 5"],
                "summary": "one-sentence summary, within 100 characters",
                "description": "detailed company description, within 500 characters",
                "foundedYear": "year founded",
                "address": "headquarters / registered address",
                "scale": "company size",
                "website": "official website",
                "financingStage": "financing stage",
                "latestValuation": "latest valuation",
                "news": ["related news, at most 3 items, each within 50 characters"]
                }

                Rules:
                1. If you do not recognize the company, return {"valid": false}
                2. name: infer the full registered company name from the short name
                3. companyType must be one of: 上市企业、独角兽、国企、央企、民营企业、外资企业、合资企业、事业单位、其他
                4. industryId must be chosen from the given industry list; use null if unsure
                5. scale must be one of: 少于50人、50-150人、150-500人、500-1000人、1000-5000人、5000-10000人、10000人以上
                6. tags reflect the company's core business characteristics, at most 5
                7. news: based on your knowledge, provide the 3 most recent news items about the company, each within 50 characters
                8. latestValuation: provide it if known, otherwise null
                9. Return null for fields you are unsure about; do not make anything up
                10. String values must not contain control characters such as tabs or line breaks
                11. Return only the JSON, nothing else
                """;
    }

    /** Build the user message */
    private String buildUserMessage(String shortName) {
        return "[Company short name]\n" + shortName +
                "\n\n[Industry list]\n" + dictCacheService.getIndustryText();
    }
}
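The response clean-up step in `cleanOne` can be tried in isolation. The sketch below reuses the same two regular expressions as the service; only the wrapper class `AiResponseText` and its `clean` method are hypothetical names introduced here, and how the project's own `AiResponseCleanTool` behaves is not shown in this diff.

```java
/** Hypothetical standalone version of the clean-up performed before JSON parsing. */
final class AiResponseText {
    /**
     * Removes a surrounding markdown code fence (with an optional language tag)
     * and strips control characters other than tab, line feed, and carriage return.
     */
    static String clean(String aiResponse) {
        String json = aiResponse.trim();
        if (json.startsWith("```")) {
            json = json.replaceAll("^```\\w*\\n?", "").replaceAll("\\n?```$", "").trim();
        }
        return json.replaceAll("[\\x00-\\x08\\x0B\\x0C\\x0E-\\x1F]", "");
    }
}
```

A fenced reply whose body is `{"valid": true}` comes back as the bare JSON object, while an unfenced reply is only trimmed and stripped of control characters.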
@@ -1,165 +0,0 @@
package org.jiayunet.service;

import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.mapper.CompanyMapper;
import org.jiayunet.pojo.po.Company;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Transactional service for company data enrichment
 * <p>Split out to avoid @Transactional being bypassed by self-invocation within the same class</p>
 * <p>Dependencies: CompanyMapper, DictCacheService (region matching, industry validation)</p>
 * <p>Tables used: bg_company (update)</p>
 *
 * @author zk
 */
@Slf4j
@Service
public class CompanyCleanTransactionService {

    @Autowired
    private CompanyMapper companyMapper;

    @Autowired
    private DictCacheService dictCacheService;

    /**
     * Write company data back (transactional)
     * <p>1. Parse the fields returned by the AI 2. Match the region 3. Validate the industry 4. Update bg_company</p>
     */
    @Transactional(rollbackFor = Exception.class)
    public void saveCompanyData(JsonNode root, Company company) {
        // name: the full name the AI inferred from shortName
        String name = root.path("name").asText(null);
        if (name != null && !name.isBlank()) {
            company.setName(name);
        }

        // regionCode: the AI returns a city name, matched on the Java side
        String city = root.path("city").asText(null);
        if (city != null && !city.isBlank()) {
            String regionCode = dictCacheService.matchRegionCode(city);
            company.setRegionCode(regionCode);
        }

        // companyType
        String companyType = root.path("companyType").asText(null);
        if (companyType != null && !"null".equals(companyType)) {
            company.setCompanyType(companyType);
        }

        // industryId: only positive values are kept (membership in the industry list is not checked here)
        long industryId = root.path("industryId").asLong(0);
        company.setIndustryId(industryId > 0 ? industryId : null);

        // tags: JSON array
        JsonNode tagsNode = root.path("tags");
        if (tagsNode.isArray() && !tagsNode.isEmpty()) {
            List<String> tags = new ArrayList<>();
            tagsNode.forEach(node -> tags.add(node.asText()));
            company.setTags(tags);
        }

        // summary
        String summary = root.path("summary").asText(null);
        if (summary != null && !"null".equals(summary)) {
            company.setSummary(summary);
        }

        // description
        String description = root.path("description").asText(null);
        if (description != null && !"null".equals(description)) {
            company.setDescription(description);
        }

        // foundedYear
        String foundedYear = root.path("foundedYear").asText(null);
        if (foundedYear != null && !"null".equals(foundedYear)) {
            company.setFoundedYear(foundedYear);
        }

        // address
        String address = root.path("address").asText(null);
        if (address != null && !"null".equals(address)) {
            company.setAddress(address);
        }

        // scale
        String scale = root.path("scale").asText(null);
        if (scale != null && !"null".equals(scale)) {
            company.setScale(scale);
        }

        // website
        String website = root.path("website").asText(null);
        if (website != null && !"null".equals(website)) {
            company.setWebsite(website);
        }

        // financingStage
        String financingStage = root.path("financingStage").asText(null);
        if (financingStage != null && !"null".equals(financingStage)) {
            company.setFinancingStage(financingStage);
        }

        // latestValuation
        String latestValuation = root.path("latestValuation").asText(null);
        if (latestValuation != null && !"null".equals(latestValuation)) {
            company.setLatestValuation(latestValuation);
        }

        // news: JSON array
        JsonNode newsNode = root.path("news");
        if (newsNode.isArray() && !newsNode.isEmpty()) {
            List<String> news = new ArrayList<>();
            newsNode.forEach(node -> news.add(node.asText()));
            company.setNews(news);
        }

        // Update status and timestamp
        company.setStatus(1);
        company.setUpdateTime(Instant.now());

        companyMapper.updateById(company);
    }

    /** Update the company status */
    public void updateCompanyStatus(Long id, int status) {
        companyMapper.update(null,
                new LambdaUpdateWrapper<Company>()
                        .set(Company::getStatus, status)
                        .set(Company::getUpdateTime, Instant.now())
                        .eq(Company::getId, id));
    }

    /**
     * Atomically lock a batch of incomplete companies (SELECT FOR UPDATE + status UPDATE inside one transaction)
     * <p>Row locks provide concurrency safety; other threads block until the transaction commits</p>
     *
     * @return the companies that were locked, possibly empty
     */
    @Transactional(rollbackFor = Exception.class)
    public List<Company> lockBatch(int batchSize) {
        List<Company> companyList = companyMapper.selectForUpdate(batchSize);
        if (companyList.isEmpty()) {
            return companyList;
        }

        List<Long> ids = companyList.stream().map(Company::getId).toList();
        companyMapper.update(null,
                new LambdaUpdateWrapper<Company>()
                        .set(Company::getStatus, 3)
                        .set(Company::getUpdateTime, Instant.now())
                        .in(Company::getId, ids));

        return companyList;
    }
}
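The select-then-mark pattern in `lockBatch` can be illustrated without a database. In this sketch a `synchronized` method stands in for the transaction that holds the row locks, so two callers can never claim the same ID. `BatchClaimer` and its members are hypothetical; the real implementation relies on `SELECT ... FOR UPDATE` and a status `UPDATE` in one transaction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory analogue of lockBatch. */
final class BatchClaimer {
    static final int PENDING = 0;
    static final int IN_PROGRESS = 3;

    private final Map<Long, Integer> statusById = new LinkedHashMap<>();

    /** Registers a new row in the pending state. */
    synchronized void add(long id) {
        statusById.put(id, PENDING);
    }

    /** Selects up to batchSize pending IDs and marks them in progress in one critical section. */
    synchronized List<Long> lockBatch(int batchSize) {
        List<Long> claimed = new ArrayList<>();
        for (Map.Entry<Long, Integer> row : statusById.entrySet()) {
            if (claimed.size() >= batchSize) {
                break;
            }
            if (row.getValue() == PENDING) {
                row.setValue(IN_PROGRESS);
                claimed.add(row.getKey());
            }
        }
        return claimed;
    }
}
```

Because selection and marking happen under the same lock, successive calls return disjoint batches, which is what keeps two scheduler runs from processing the same company twice.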
@@ -116,15 +116,6 @@ public class DictCacheService {
                 regionList.size(), majorCategoryList.size(), majorLeafCount);
     }
 
-    /** Get the job category text (leaf nodes with parent path, comma-separated) */
-    public String getJobCategoryText() {
-        return jobCategoryText;
-    }
-
-    /** Get the industry text (leaf nodes with parent path, comma-separated) */
-    public String getIndustryText() {
-        return industryText;
-    }
 
     /** Get the major category text (level-3 leaf nodes with parent path, comma-separated) */
     public String getMajorCategoryText() {
@@ -1,384 +0,0 @@
package org.jiayunet.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.ai.AiChatAbility;
import org.jiayunet.ai.AiResponseCleanTool;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import org.jiayunet.mapper.AppJobDataMapper;
import org.jiayunet.mapper.JobMapper;
import org.jiayunet.mapper.SkillTagMapper;
import org.jiayunet.pojo.po.AppJobData;
import org.jiayunet.pojo.po.Job;
import org.jiayunet.pojo.po.SkillTag;
import org.jiayunet.tool.HttpTool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Job cleaning service
 * <p>Periodically fetches pending rows from app_job_data, cleans them with the AI, and writes them to the business tables</p>
 * <p>Dependencies: AiChatAbility (AI calls), DictCacheService (dictionary cache), JobCleanTransactionService (transactional operations)</p>
 * <p>Tables used: app_job_data (read / status update), bg_job (dedup query / major update), bg_skill_tag (skill storage)</p>
 *
 * @author zk
 */
@Slf4j
@Service
public class JobCleanService {

    @Autowired
    private AiChatAbility aiChatAbility;

    @Autowired
    private DictCacheService dictCacheService;

    @Autowired
    private JobCleanTransactionService jobCleanTransactionService;

    @Autowired
    private AppJobDataMapper appJobDataMapper;

    @Autowired
    private JobMapper jobMapper;

    @Autowired
    private SkillTagMapper skillTagMapper;

    @Value("${app.job-clean.batch-size:20}")
    private int batchSize;

    @Value("${app.job-clean.thread-pool-size:5}")
    private int threadPoolSize;

    private ExecutorService executorService;

    /** Initialize the thread pool */
    @javax.annotation.PostConstruct
    public void init() {
        executorService = Executors.newFixedThreadPool(threadPoolSize);
    }

    /**
     * Scheduled task A: job cleaning (every 3 minutes)
     * <p>1. Lock a batch of pending rows 2. Call the AI concurrently from multiple threads 3. Write to the business tables</p>
     */
    @Scheduled(cron = "0 */3 * * * ?")
    public void cleanJob() {
        List<AppJobData> dataList = jobCleanTransactionService.lockBatch(batchSize);
        if (dataList.isEmpty()) {
            return;
        }
        log.info("Job cleaning: locked {} rows", dataList.size());

        // Process concurrently on the thread pool
        for (AppJobData data : dataList) {
            executorService.submit(() -> {
                try {
                    cleanOne(data);
                } catch (Exception e) {
                    log.error("Job cleaning error, id={}", data.getId(), e);
                    // On error keep clean_status=1; the zombie-recovery task will reset it
                }
            });
        }
    }

    /**
     * Scheduled task B: zombie recovery (every 30 minutes)
     * <p>Resets rows that have been in the cleaning state for more than 10 minutes back to pending</p>
     */
    @Scheduled(cron = "0 */30 * * * ?")
    public void recoverZombie() {
        int recovered = appJobDataMapper.update(null,
                new LambdaUpdateWrapper<AppJobData>()
                        .set(AppJobData::getCleanStatus, 0)
                        .eq(AppJobData::getCleanStatus, 1)
                        .lt(AppJobData::getUpdatedAt, Instant.now().minusSeconds(600)));

        if (recovered > 0) {
            log.info("Zombie recovery: reset {} rows", recovered);
        }
    }

    /**
     * Cleans a single job record.
     * <p>1. Pre-check 2. First AI call extracts structured information 3. Write to the business tables
     * 4. Second AI call matches majors 5. Third AI call extracts skills</p>
     */
    public void cleanOne(AppJobData data) {
        // 1. Pre-check: discard rows whose description is missing or shorter than 20 characters
        if (data.getDescription() == null || data.getDescription().length() < 20) {
            jobCleanTransactionService.updateCleanStatus(data.getId(), 3);
            return;
        }

        // 2. First AI call: extract structured job information
        String systemPrompt = buildSystemPrompt();
        String userMessage = buildUserMessage(data);
        String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);

        // 3. Parse the JSON
        try {
            String json = AiResponseCleanTool.clean(aiResponse);
            JsonNode root = HttpTool.objectMapper.readTree(json);

            // Validity check: a missing or false "valid" field discards the row
            if (!root.path("valid").asBoolean(false)) {
                jobCleanTransactionService.updateCleanStatus(data.getId(), 3);
                return;
            }

            // 4. Duplicate check
            String sourceId = String.valueOf(data.getId());
            Long existJob = jobMapper.selectCount(new LambdaQueryWrapper<Job>().eq(Job::getSourceId, sourceId));
            if (existJob > 0) {
                jobCleanTransactionService.updateCleanStatus(data.getId(), 2);
                return;
            }

            // 5. Company handling (locked to prevent concurrent duplicates)
            String companyShortName = root.path("companyShortName").asText("");
            if (companyShortName.isBlank()) {
                companyShortName = data.getCompany();
            }
            Long companyId = jobCleanTransactionService.findOrCreateCompany(companyShortName);

            // 6. Region handling
            List<String> regionCodes = new ArrayList<>();
            JsonNode citiesNode = root.path("cities");
            if (citiesNode.isArray()) {
                for (JsonNode city : citiesNode) {
                    String code = dictCacheService.matchRegionCode(city.asText());
                    if (code != null) {
                        regionCodes.add(code);
                    }
                }
            }

            // 7. Write to the business tables
            jobCleanTransactionService.saveJobData(root, data, companyId, sourceId, regionCodes);

            // Fetch the job that was just inserted
            Job insertedJob = jobMapper.selectOne(new LambdaQueryWrapper<Job>().eq(Job::getSourceId, sourceId).last("LIMIT 1"));
            if (insertedJob == null) {
                return;
            }

            // 8. Second AI call: major matching (a failure does not affect the stored job)
            try {
                String title = root.path("title").asText("");
                String desc = root.path("description").asText("");
                String req = root.path("requirement").asText("");
                matchMajor(insertedJob.getId(), title, desc, req);
            } catch (Exception ex) {
                log.warn("Major matching failed, id={}", data.getId(), ex);
            }

            // 9. Third AI call: skill extraction (a failure does not affect the stored job)
            try {
                String title = root.path("title").asText("");
                String desc = root.path("description").asText("");
                String req = root.path("requirement").asText("");
                extractSkillTags(insertedJob.getId(), title, desc, req);
            } catch (Exception ex) {
                log.warn("Skill extraction failed, id={}", data.getId(), ex);
            }

        } catch (Exception e) {
            log.error("Failed to parse AI response, id={}, response={}", data.getId(), aiResponse, e);
        }
    }

    /**
     * Second AI call: match majors and major sensitivity.
     * <p>Passes the job information and the level-3 major category list; the AI returns
     * requiredMajorIds and majorSensitivity, which are then written to bg_job.</p>
     */
    private void matchMajor(Long jobId, String title, String description, String requirement) {
        String systemPrompt = """
                You are a job-to-major matching assistant. Based on the job information, determine what the job requires in terms of academic major.
                Return JSON in this format:
                {
                    "requiredMajorIds": [array of major IDs, the most relevant ones chosen from the major list, at most 3; empty array if there is no explicit requirement],
                    "majorSensitivity": a number from 0 to 2 (0 = any major, 1 = related majors preferred, 2 = specific major required)
                }
                Rules:
                1. IDs may only be chosen from the given major list
                2. Judge major sensitivity from the job description: explicitly names "XX major" → 2, says "related majors preferred" → 1, not mentioned → 0
                3. When majorSensitivity is 0, requiredMajorIds should be an empty array
                4. Return only the JSON, nothing else
                """;

        String userMessage = "[Job information]\nTitle: " + title + "\nResponsibilities: " + description + "\nRequirements: " + requirement +
                "\n\n[Major category list]\n" + dictCacheService.getMajorCategoryText();

        String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);
        String json = AiResponseCleanTool.clean(aiResponse);

        try {
            JsonNode root = HttpTool.objectMapper.readTree(json);
            int majorSensitivity = root.path("majorSensitivity").asInt(0);

            // Keep only positive IDs; anything that does not parse as a number becomes 0 and is skipped
            List<Long> majorIds = new ArrayList<>();
            JsonNode idsNode = root.path("requiredMajorIds");
            if (idsNode.isArray()) {
                for (JsonNode node : idsNode) {
                    long id = node.asLong(0);
                    if (id > 0) {
                        majorIds.add(id);
                    }
                }
            }

            // Update bg_job
            jobCleanTransactionService.updateJobMajor(jobId, majorIds.isEmpty() ? null : majorIds, majorSensitivity);
        } catch (Exception e) {
            log.warn("Failed to parse major-matching AI response: {}", json, e);
        }
    }

    /**
     * Third AI call: free-form skill tag extraction.
     * <p>The AI returns an array of skill names; each one is INSERT IGNOREd into bg_skill_tag,
     * its ID is looked up, and the relation table is written.</p>
     */
    private void extractSkillTags(Long jobId, String title, String description, String requirement) {
        String systemPrompt = """
                You are a skill extraction assistant. Based on the job information, extract the core professional competencies and tool skills the job requires.
                Return a JSON array, for example: ["java", "spring boot", "mysql", "redis"]
                Rules:
                1. Use lowercase letters throughout
                2. Keep only the core term and drop redundant modifiers (e.g. "plc programming" → "plc", "c language" → "c", "cad drafting" → "cad")
                3. Keep only the most specific wording for a skill; do not output both a broader and a narrower term (e.g. if "mechanical design" is present, do not also output "mechanical")
                4. The extraction scope includes: tech stack, domain knowledge, industry tools, professional qualifications, and so on
                5. Do not extract pure soft skills (e.g. communication, teamwork, learning ability, proactiveness)
                6. Do not extract overly broad tags (e.g. "office software", "windows")
                7. If the job has no professional skill requirements at all (attitude and character only), return an empty array []
                8. At most 15, ordered by importance
                9. Return only the JSON array, nothing else
                Example 1 (technical role):
                Input: Must be familiar with Java and the Spring Boot framework, know MySQL and Redis caching, and be able to program PLCs
                Output: ["java", "spring boot", "mysql", "redis", "plc"]
                Example 2 (finance role):
                Input: Responsible for expense management and review and monthly operating-profit analysis; requires a major in financial management or accounting
                Output: ["financial management", "accounting", "business analysis"]
                Example 3 (manufacturing role):
                Input: Responsible for mold development, familiar with CAD drafting, knows injection molding processes, has SolidWorks experience
                Output: ["mold", "cad", "injection molding", "solidworks"]
                Example 4 (soft-skills-only role):
                Input: Strong communication skills and innovative thinking, proactive, any major
                Output: []
                """;

        String userMessage = "[Job information]\nTitle: " + title + "\nResponsibilities: " + description + "\nRequirements: " + requirement;

        String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);
        String json = AiResponseCleanTool.clean(aiResponse);

        try {
            JsonNode arrayNode = HttpTool.objectMapper.readTree(json);
            if (!arrayNode.isArray() || arrayNode.isEmpty()) {
                return;
            }

            List<Long> skillTagIds = new ArrayList<>();
            for (JsonNode node : arrayNode) {
                // Normalize the name and skip blank or over-long values
                String skillName = node.asText("").trim().toLowerCase();
                if (skillName.isBlank() || skillName.length() > 50) {
                    continue;
                }

                // INSERT IGNORE + SELECT to obtain the ID
                Long tagId = findOrCreateSkillTag(skillName);
                if (tagId != null && !skillTagIds.contains(tagId)) {
                    skillTagIds.add(tagId);
                }
            }

            if (!skillTagIds.isEmpty()) {
                jobCleanTransactionService.saveSkillTagRelations(jobId, skillTagIds);
            }
        } catch (Exception e) {
            log.warn("Failed to parse skill-extraction AI response: {}", json, e);
        }
    }

    /**
     * Finds or creates a skill tag (concurrency safety relies on the database unique index).
     * <p>INSERT IGNORE followed by SELECT, which avoids explicit locking.</p>
     */
    private Long findOrCreateSkillTag(String name) {
        // Try the insert first (duplicates are ignored); the ID is generated by IdWorker
        skillTagMapper.insertIgnore(IdWorker.getId(), name);

        // Then query for the ID
        SkillTag tag = skillTagMapper.selectOne(new LambdaQueryWrapper<SkillTag>().eq(SkillTag::getName, name).last("LIMIT 1"));
        return tag != null ? tag.getId() : null;
    }

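    /** Builds the system prompt for the first AI call. */
    private String buildSystemPrompt() {
        return """
                You are a job data cleaning assistant. Based on the raw job data provided, extract it and structure it as JSON.

                Required JSON format:
                {
                    "valid": true/false,
                    "title": "job title",
                    "salary": "normalized salary, e.g. 10-20K or negotiable; null if invalid",
                    "education": a number from 0 to 4 (0 = no requirement, 1 = associate degree, 2 = bachelor's, 3 = master's, 4 = doctorate),
                    "minExperience": minimum years of work experience as a number (0 if not required),
                    "employmentType": 0 or 1 (0 = full-time, 1 = part-time, default 0),
                    "categoryId": job category ID (required; choose the closest one from the category list),
                    "requiredIndustryId": industry ID (set only when industry experience is explicitly required; if the list has no exact match choose the most similar one; null if not mentioned),
                    "description": "job responsibilities, keeping the style of the original text, formatted for display",
                    "requirement": "job requirements, keeping the style of the original text, formatted for display",
                    "bonus": "nice-to-haves, null if none",
                    "tags": ["core function tags, at most 5, e.g. data analysis, product strategy"],
                    "skillTags": ["skill keywords, at most 8, e.g. Java, Spring Boot"],
                    "companyShortName": "concise company short name, e.g. 字节跳动, 中国平安",
                    "cities": ["list of work cities, precise to the city level"]
                }

                Rules:
                1. description/requirement/bonus are all extracted from the raw description+experience+education content, keeping the style of the original text
                2. If the job title is missing, derive one from the description
                3. Normalize salary to formats such as 10-20K, 20K, or negotiable; null if invalid or empty
                4. categoryId must be chosen from the category list and must not be null
                5. Set requiredIndustryId only when the description explicitly mentions an industry experience requirement
                6. tags are core function tags (e.g. data analysis, teamwork), at most 5
                7. skillTags are skill keywords (e.g. Java, MySQL), at most 8
                8. For companyShortName, strip regional suffixes, recruitment suffixes, and parenthesized content; keep it concise
                9. String values must not contain control characters such as tabs or line breaks; use spaces or Chinese punctuation instead
                10. Return only the JSON, nothing else
                """;
    }
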
    /** Builds the user message for the first AI call. */
    private String buildUserMessage(AppJobData data) {
        StringBuilder sb = new StringBuilder();
        sb.append("[Raw data]\n");
        sb.append("Job title: ").append(nullToEmpty(data.getJobTitle())).append("\n");
        sb.append("Salary: ").append(nullToEmpty(data.getSalary())).append("\n");
        sb.append("Location: ").append(nullToEmpty(data.getLocation())).append("\n");
        sb.append("Company: ").append(nullToEmpty(data.getCompany())).append("\n");
        sb.append("Experience required: ").append(nullToEmpty(data.getExperience())).append("\n");
        sb.append("Education required: ").append(nullToEmpty(data.getEducation())).append("\n");
        sb.append("Job details: ").append(nullToEmpty(data.getDescription())).append("\n\n");
        sb.append("[Job category list]\n").append(dictCacheService.getJobCategoryText()).append("\n\n");
        sb.append("[Industry list]\n").append(dictCacheService.getIndustryText());
        return sb.toString();
    }

    private String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}
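Review note on `extractSkillTags`: the loop there trims and lowercases each skill name, skips blank or over-long values, and avoids duplicate tag IDs. The sketch below isolates that normalization as a pure function so it can be checked without the database. `SkillTagNormalizer` and `normalize` are hypothetical names that do not exist in the removed file. Two choices here are assumptions rather than the original behaviour: it deduplicates on the normalized name instead of on the tag ID, and it uses `Locale.ROOT` where the original calls `toLowerCase()` with the default locale.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Hypothetical helper, not part of the removed file. */
final class SkillTagNormalizer {

    /** Same upper bound as the length check in extractSkillTags. */
    static final int MAX_LENGTH = 50;

    private SkillTagNormalizer() {
    }

    /**
     * Trims and lowercases each name, drops null, blank, or over-long values,
     * and removes duplicates while keeping the model's ordering.
     */
    static List<String> normalize(List<String> rawNames) {
        Set<String> seen = new LinkedHashSet<>();
        for (String raw : rawNames) {
            if (raw == null) {
                continue;
            }
            // Locale.ROOT is an assumption; it keeps lowercasing independent of the server locale
            String name = raw.trim().toLowerCase(Locale.ROOT);
            if (name.isBlank() || name.length() > MAX_LENGTH) {
                continue;
            }
            seen.add(name);
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(normalize(Arrays.asList(" Java ", "java", "", "Spring Boot")));
    }
}
```

Deduplicating on the name before touching the database would also avoid one `INSERT IGNORE` round trip per repeated name.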
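Review note on `matchMajor`: the prompt asks for at most 3 IDs, a `majorSensitivity` between 0 and 2, and an empty ID list when the sensitivity is 0, but the parsing code only filters out non-positive IDs and otherwise stores whatever the model returns. A minimal sketch of enforcing that contract after parsing follows. `MajorMatchSanitizer`, `MajorMatch`, and `sanitize` are hypothetical names, and clamping out-of-range sensitivities is just one possible policy (treating them as 0 would be another).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of the removed file. */
final class MajorMatchSanitizer {

    /** Upper bound stated in the matchMajor prompt. */
    static final int MAX_MAJORS = 3;

    /** Sanitized result: an immutable ID list plus a sensitivity in 0..2. */
    record MajorMatch(List<Long> majorIds, int sensitivity) {
    }

    private MajorMatchSanitizer() {
    }

    static MajorMatch sanitize(List<Long> rawIds, int rawSensitivity) {
        // Assumption: out-of-range values are clamped into 0..2 rather than rejected
        int sensitivity = Math.max(0, Math.min(2, rawSensitivity));

        List<Long> ids = new ArrayList<>();
        // Prompt rule 3: sensitivity 0 means no required majors
        if (sensitivity > 0 && rawIds != null) {
            for (Long id : rawIds) {
                boolean usable = id != null && id > 0 && !ids.contains(id);
                if (usable && ids.size() < MAX_MAJORS) {
                    ids.add(id);
                }
            }
        }
        return new MajorMatch(List.copyOf(ids), sensitivity);
    }

    public static void main(String[] args) {
        System.out.println(sanitize(Arrays.asList(5L, 0L, 5L, 7L), 2));
    }
}
```

This sketch does not verify that the IDs actually exist in the major list; that check would need the cached dictionary data.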
@@ -1,193 +0,0 @@
package org.jiayunet.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.jiayunet.mapper.AppJobDataMapper;
import org.jiayunet.mapper.CompanyMapper;
import org.jiayunet.mapper.JobMapper;
import org.jiayunet.mapper.JobRegionRelationMapper;
import org.jiayunet.mapper.JobSkillTagRelationMapper;
import org.jiayunet.pojo.po.AppJobData;
import org.jiayunet.pojo.po.Company;
import org.jiayunet.pojo.po.Job;
import org.jiayunet.pojo.po.JobRegionRelation;
import org.jiayunet.pojo.po.JobSkillTagRelation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.Instant;
import java.util.List;

/**
 * Transaction service for job cleaning.
 * <p>Split into its own bean because @Transactional does not take effect on self-invocation within the same class.</p>
 * <p>Dependencies: JobMapper, CompanyMapper, JobRegionRelationMapper, JobSkillTagRelationMapper, AppJobDataMapper</p>
 * <p>Tables used: bg_job (insert/update), bg_company (query/create), bg_job_region_relation (insert), bg_job_skill_tag_relation (insert), app_job_data (status update)</p>
 *
 * @author zk
 */
@Slf4j
@Service
public class JobCleanTransactionService {

    @Autowired
    private JobMapper jobMapper;

    @Autowired
    private CompanyMapper companyMapper;

    @Autowired
    private JobRegionRelationMapper jobRegionRelationMapper;

    @Autowired
    private AppJobDataMapper appJobDataMapper;

    @Autowired
    private JobSkillTagRelationMapper jobSkillTagRelationMapper;

    /**
     * Writes bg_job and bg_job_region_relation and updates clean_status (short transaction).
     */
    @Transactional(rollbackFor = Exception.class)
    public void saveJobData(JsonNode root, AppJobData data, Long companyId, String sourceId, List<String> regionCodes) {
        Job job = new Job();
        job.setTitle(root.path("title").asText(""));
        job.setCompanyId(companyId);
        job.setCategoryId(root.path("categoryId").asLong(0));
        job.setEmploymentType(root.path("employmentType").asInt(0));
        job.setDescription(root.path("description").asText(""));
        job.setRequirement(root.path("requirement").asText(""));
        job.setBonus(root.path("bonus").asText(null));

        // Parse the JSON arrays into List<String>
        JsonNode tagsNode = root.path("tags");
        if (tagsNode.isArray()) {
            List<String> tags = new java.util.ArrayList<>();
            tagsNode.forEach(node -> tags.add(node.asText()));
            job.setTags(tags);
        }

        JsonNode skillTagsNode = root.path("skillTags");
        if (skillTagsNode.isArray()) {
            List<String> skillTags = new java.util.ArrayList<>();
            skillTagsNode.forEach(node -> skillTags.add(node.asText()));
            job.setSkillTags(skillTags);
        }

        // The model may return the literal string "null"; store a real null in that case
        String salary = root.path("salary").asText(null);
        job.setSalary("null".equals(salary) ? null : salary);

        job.setEducation(root.path("education").asInt(0));
        job.setMinExperience(root.path("minExperience").asInt(0));

        // 0 means the field was missing, null, or not numeric
        Long requiredIndustryId = root.path("requiredIndustryId").asLong(0);
        job.setRequiredIndustryId(requiredIndustryId == 0 ? null : requiredIndustryId);

        job.setSourceUrl(data.getDetailUrl());
        job.setSourceId(sourceId);
        job.setStatus(0);
        job.setCreateTime(Instant.now());
        job.setUpdateTime(Instant.now());

        jobMapper.insert(job);

        // Write the job-region relations (batch insert)
        if (!regionCodes.isEmpty()) {
            List<JobRegionRelation> relations = regionCodes.stream().map(regionCode -> {
                JobRegionRelation relation = new JobRegionRelation();
                relation.setJobId(job.getId());
                relation.setRegionCode(regionCode);
                relation.setCreateTime(Instant.now());
                return relation;
            }).toList();
            jobRegionRelationMapper.batchInsert(relations);
        }

        // Update the cleaning status
        updateCleanStatus(data.getId(), 2);
    }

    /**
     * Finds or creates a company (locked to prevent concurrent duplicates).
     * <p>Looks up by short_name; if no row exists, creates a record to be completed later.</p>
     * <p>Note: {@code synchronized} only serializes callers within a single JVM.</p>
     */
    public synchronized Long findOrCreateCompany(String shortName) {
        Company company = companyMapper.selectOne(
                new LambdaQueryWrapper<Company>()
                        .eq(Company::getShortName, shortName)
                        .last("LIMIT 1"));

        if (company != null) {
            return company.getId();
        }

        Company newCompany = new Company();
        newCompany.setName(shortName);
        newCompany.setShortName(shortName);
        newCompany.setStatus(0);
        newCompany.setCreateTime(Instant.now());
        newCompany.setUpdateTime(Instant.now());
        companyMapper.insert(newCompany);
        return newCompany.getId();
    }

    /**
     * Writes the job-skill tag relations (batch insert).
     */
    @Transactional(rollbackFor = Exception.class)
    public void saveSkillTagRelations(Long jobId, List<Long> skillTagIds) {
        List<JobSkillTagRelation> relations = skillTagIds.stream().map(skillTagId -> {
            JobSkillTagRelation relation = new JobSkillTagRelation();
            relation.setJobId(jobId);
            relation.setSkillTagId(skillTagId);
            relation.setCreateTime(Instant.now());
            return relation;
        }).toList();
        jobSkillTagRelationMapper.batchInsert(relations);
    }

    /**
     * Updates the job's required majors and major sensitivity.
     */
    public void updateJobMajor(Long jobId, List<Long> requiredMajorIds, Integer majorSensitivity) {
        Job job = new Job();
        job.setId(jobId);
        job.setRequiredMajorIds(requiredMajorIds);
        job.setMajorSensitivity(majorSensitivity);
        job.setUpdateTime(Instant.now());
        jobMapper.updateById(job);
    }

    /** Updates the cleaning status. */
    public void updateCleanStatus(Long id, int status) {
        appJobDataMapper.update(null,
                new LambdaUpdateWrapper<AppJobData>()
                        .set(AppJobData::getCleanStatus, status)
                        .eq(AppJobData::getId, id));
    }

    /**
     * Atomically locks a batch of pending rows (SELECT FOR UPDATE plus a status UPDATE inside one transaction).
     * <p>Row locks provide concurrency safety; other threads block until the transaction commits.</p>
     *
     * @return the rows that were locked, possibly empty
     */
    @Transactional(rollbackFor = Exception.class)
    public List<AppJobData> lockBatch(int batchSize) {
        List<AppJobData> dataList = appJobDataMapper.selectForUpdate(batchSize);
        if (dataList.isEmpty()) {
            return dataList;
        }

        // Mark every selected row as "cleaning" (1) before the transaction commits
        List<Long> ids = dataList.stream().map(AppJobData::getId).toList();
        appJobDataMapper.update(null,
                new LambdaUpdateWrapper<AppJobData>()
                        .set(AppJobData::getCleanStatus, 1)
                        .in(AppJobData::getId, ids));

        return dataList;
    }
}
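Review note on the status codes: `updateCleanStatus`, `lockBatch`, and `recoverZombie` pass the literals 0 to 3 directly. The sketch below names them, using the meanings given in the cleaning plan (0 pending, 1 cleaning, 2 stored, 3 discarded), and expresses the 10-minute zombie rule as a pure predicate. `CleanStatus`, `ZombiePolicy`, and `isZombie` are hypothetical names; the removed code does not define them.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical enum for app_job_data.clean_status, not part of the removed file. */
enum CleanStatus {
    PENDING(0), CLEANING(1), STORED(2), DISCARDED(3);

    private final int code;

    CleanStatus(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    /** Maps a database value back to the enum, rejecting unknown codes. */
    static CleanStatus fromCode(int code) {
        for (CleanStatus status : values()) {
            if (status.code == code) {
                return status;
            }
        }
        throw new IllegalArgumentException("Unknown clean_status: " + code);
    }
}

/** Hypothetical holder for the rule that recoverZombie applies in SQL. */
final class ZombiePolicy {

    /** Same 600-second threshold as recoverZombie. */
    static final Duration TIMEOUT = Duration.ofMinutes(10);

    private ZombiePolicy() {
    }

    /** True when a row is still "cleaning" and was last updated before now minus TIMEOUT. */
    static boolean isZombie(CleanStatus status, Instant updatedAt, Instant now) {
        return status == CleanStatus.CLEANING && updatedAt.isBefore(now.minus(TIMEOUT));
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        System.out.println(isZombie(CleanStatus.CLEANING, now.minus(Duration.ofMinutes(11)), now));
    }
}
```

Passing `now` in as a parameter keeps the predicate deterministic, which makes the threshold easy to unit-test.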