diff --git a/.gitignore b/.gitignore index e58ef9d..75e8ebe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ */target **/.idea .idea/ -./log -./logs \ No newline at end of file +logs/ +log/ \ No newline at end of file diff --git a/.kiro/steering/数据清洗方案.md b/.kiro/steering/数据清洗方案.md deleted file mode 100644 index 9d25757..0000000 --- a/.kiro/steering/数据清洗方案.md +++ /dev/null @@ -1,202 +0,0 @@ ---- -inclusion: manual ---- - -# 数据清洗方案 - -## 总体架构 - -``` -爬虫(公司网络) → app_job_data(原始数据) - ↓ - Java定时任务读取(多线程) - ↓ - 三次AI调用:结构化 → 专业匹配 → 技能提取 - ↓ - 写入业务表(bg_company + bg_job + 关联表 + bg_skill_tag) - ↓ - 公司信息不完整的 → 调AI补充 -``` - -所有清洗逻辑放在 `manager` 模块,通过 `@Scheduled` 定时任务触发。 - ---- - -## 一、岗位清洗触发逻辑 - -### 1.1 状态管理 - -`app_job_data.clean_status`: -- 0=待清洗:新爬到的数据,默认值 -- 1=清洗中:定时任务已锁定,正在处理 -- 2=已入库:清洗成功,已写入 bg_job -- 3=已丢弃:AI判定为无效数据 - -### 1.2 两个定时任务 - -#### 任务A:岗位清洗(每5分钟) -1. 事务内 `SELECT FOR UPDATE` + `UPDATE clean_status=1` 批量锁定 -2. 丢入线程池,多线程并发处理 -3. 每条独立更新状态 - -#### 任务B:僵尸恢复(每30分钟) -将超时10分钟仍在清洗中的数据重置为待清洗。 - ---- - -## 二、岗位清洗逻辑(三次AI调用) - -### 2.1 前置校验 -- `description` 为空或长度 < 20 → 标记丢弃,跳过 - -### 2.2 参考数据缓存(DictCacheService) - -启动时加载: -- `bg_job_category` 叶子节点(level=3),格式 `id:name(一级/二级)` -- `bg_industry` 叶子节点(level=2),格式 `id:name(一级)` -- `bg_major_category` 叶子节点(level=3),格式 `id:name(一级/二级)` -- `bg_china_regions_code` 省市级,供城市名匹配 - -### 2.3 第一次AI:提取岗位结构化信息 - -#### 输入 -- 原始字段:job_title、salary、location、company、experience、education、description -- 参考列表:岗位分类、行业 - -#### 返回JSON -```json -{ - "valid": true, - "title": "Java高级开发工程师", - "salary": "15-25K", - "education": 2, - "minExperience": 3, - "employmentType": 0, - "categoryId": 12, - "requiredIndustryId": 5, - "description": "岗位职责...", - "requirement": "任职要求...", - "bonus": "加分项...", - "tags": ["数据分析", "产品策略"], - "skillTags": ["Java", "Spring Boot"], - "companyShortName": "字节跳动", - "cities": ["北京", "上海"] -} -``` - -#### Java处理流程 -1. valid=false → 丢弃 -2. source_id 去重检查 -3. 公司处理(查或创建 bg_company) -4. 地区匹配(cities → region_code) -5. 写入 bg_job + bg_job_region_relation + 更新 clean_status=2 - -### 2.4 第二次AI:专业匹配 - -岗位入库后,单独调用AI匹配专业要求。 - -#### 输入 -- 岗位标题、职责、要求 -- 三级专业分类列表(845条,格式 `id:name(一级/二级)`) - -#### 返回JSON -```json -{ - "requiredMajorIds": [101, 203], - "majorSensitivity": 1 -} -``` - -#### 规则 -- requiredMajorIds:从专业列表中选最相关的,最多3个,无明确要求则空数组 -- majorSensitivity:0=不限 1=优先 2=强制 -- majorSensitivity=0 时,requiredMajorIds 应为空数组 -- 结果更新到 bg_job 的 required_major_ids 和 major_sensitivity 字段 - -### 2.5 第三次AI:技能提取 - -自由提取岗位核心技能,不再依赖预定义标签库。 - -#### 输入 -- 岗位标题、职责、要求 - -#### 返回JSON -```json -["java", "spring boot", "mysql", "redis"] -``` - -#### prompt 规则 -- 统一小写字母 -- 尽量简短,使用业界通用缩写 -- 提取范围:技术栈、专业领域知识、行业工具、专业资质能力 -- 不提取纯软技能(沟通、协作、学习能力等) -- 无专业能力要求的岗位(销售、行政等)返回空数组 -- 最多15个 - -#### 技能入库流程 -1. 遍历AI返回的技能名,统一转小写 -2. `INSERT IGNORE INTO bg_skill_tag`(依靠 name 唯一索引去重,ID 由 IdWorker 生成) -3. `SELECT id FROM bg_skill_tag WHERE name=?` 拿到ID -4. 写入 bg_job_skill_tag_relation - -#### 并发安全 -依靠数据库唯一索引保证,不加应用层锁。多线程同时插入相同技能名时,INSERT IGNORE 自动忽略重复。 - -### 2.6 容错设计 -- 第二次、第三次AI调用失败不影响岗位入库 -- 每次调用独立 try-catch,仅日志记录 - ---- - -## 三、公司数据触发逻辑 - -### 3.1 状态 -`bg_company.status`:0=待完善 1=已完善 2=禁用 3=补充中 4=补充失败 - -### 3.2 定时任务 -- 任务C:公司补充(每小时),SELECT FOR UPDATE 锁定 status=0 的数据 -- 任务D:僵尸恢复(每小时,与C错开),重置超时10分钟的 status=3 数据 - ---- - -## 四、公司数据补充逻辑(AI补充) - -### 4.1 流程 -1. 拿 short_name 调AI -2. prompt 附带行业列表,AI直接返回 industryId -3. valid=false → status=4(补充失败) -4. regionCode:AI返回城市名,Java侧匹配 -5. 回填 bg_company 各字段,status=1 - -### 4.2 AI返回JSON -```json -{ - "valid": true, - "name": "北京字节跳动科技有限公司", - "city": "北京", - "companyType": "独角兽", - "industryId": 5, - "tags": ["短视频", "人工智能"], - "summary": "全球领先的内容平台和科技公司", - "description": "字节跳动成立于2012年...", - "foundedYear": "2012", - "address": "北京市海淀区...", - "scale": "10000人以上", - "website": "https://www.bytedance.com", - "financingStage": "已上市", - "latestValuation": "2200亿美元", - "news": ["新闻1", "新闻2", "新闻3"] -} -``` - ---- - -## 五、关键设计决策 - -| 决策点 | 结论 | 原因 | -|--------|------|------| -| 技能标签来源 | AI自由提取,自动入库 bg_skill_tag | 去掉预定义标签限制,覆盖面更广 | -| 技能并发去重 | INSERT IGNORE + 唯一索引 | 不加应用层锁,性能好 | -| 专业匹配 | 单独一次AI调用 | 专业列表845条,和第一次prompt合并会超token | -| AI调用次数 | 三次(结构化 + 专业 + 技能) | 各维度独立,容错互不影响 | -| 公司数据来源 | AI补充,不用工商API | 公司简称查API不精确,AI覆盖率高 | diff --git a/client-api/src/main/resources/application-dev.yml b/client-api/src/main/resources/application-dev.yml index b9ae4b2..78371e1 100644 --- a/client-api/src/main/resources/application-dev.yml +++ b/client-api/src/main/resources/application-dev.yml @@ -97,22 +97,7 @@ app: # base-url 配到版本路径,如 DeepSeek: https://api.deepseek.com/v1,豆包: https://ark.cn-beijing.volces.com/api/v3 ai: providers: - job-clean: - base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3} - api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02} - model: ${AI_MODEL:doubao-seed-2-0-lite-260215} job-recommend: base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3} api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02} - model: ${AI_MODEL:doubao-1-5-lite-32k-250115} - - - # 岗位清洗配置 - job-clean: - batch-size: 60 - thread-pool-size: 60 - - # 公司数据补充配置 - company-clean: - batch-size: 10 - thread-pool-size: 3 \ No newline at end of file + model: ${AI_MODEL:doubao-1-5-lite-32k-250115} \ No newline at end of file diff --git a/client-api/src/main/resources/application-prod.yml b/client-api/src/main/resources/application-prod.yml index 9769e53..6064093 100644 --- a/client-api/src/main/resources/application-prod.yml +++ b/client-api/src/main/resources/application-prod.yml @@ -97,22 +97,7 @@ app: # base-url 配到版本路径,如 DeepSeek: https://api.deepseek.com/v1,豆包: https://ark.cn-beijing.volces.com/api/v3 ai: providers: - job-clean: - base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3} - api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02} - model: ${AI_MODEL:doubao-seed-2-0-lite-260215} job-recommend: base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3} api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02} - model: ${AI_MODEL:doubao-1-5-lite-32k-250115} - - - # 岗位清洗配置 - job-clean: - batch-size: 60 - thread-pool-size: 60 - - # 公司数据补充配置 - company-clean: - batch-size: 10 - thread-pool-size: 3 \ No newline at end of file + model: ${AI_MODEL:doubao-1-5-lite-32k-250115} \ No newline at end of file diff --git a/client-api/src/main/resources/application-test.yml b/client-api/src/main/resources/application-test.yml index b9ae4b2..78371e1 100644 --- a/client-api/src/main/resources/application-test.yml +++ b/client-api/src/main/resources/application-test.yml @@ -97,22 +97,7 @@ app: # base-url 配到版本路径,如 DeepSeek: https://api.deepseek.com/v1,豆包: https://ark.cn-beijing.volces.com/api/v3 ai: providers: - job-clean: - base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3} - api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02} - model: ${AI_MODEL:doubao-seed-2-0-lite-260215} job-recommend: base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3} api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02} - model: ${AI_MODEL:doubao-1-5-lite-32k-250115} - - - # 岗位清洗配置 - job-clean: - batch-size: 60 - thread-pool-size: 60 - - # 公司数据补充配置 - company-clean: - batch-size: 10 - thread-pool-size: 3 \ No newline at end of file + model: ${AI_MODEL:doubao-1-5-lite-32k-250115} \ No newline at end of file diff --git a/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java b/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java deleted file mode 100644 index 2685ed0..0000000 --- a/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.jiayunet.mapper; - -import org.apache.ibatis.annotations.Mapper; -import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.jiayunet.pojo.po.AppJobData; - -import java.util.List; - -/** - * 爬虫岗位原始数据Mapper - * - * @author zk - */ -@Mapper -public interface AppJobDataMapper extends CommonMapper { - - /** - * 查询待清洗数据并加行锁(SELECT ... FOR UPDATE) - *

必须在事务内调用,配合状态更新实现原子锁定

- */ - @Select("SELECT * FROM app_job_data WHERE clean_status = 'pending' LIMIT #{limit} FOR UPDATE") - List selectForUpdate(@Param("limit") int limit); -} diff --git a/manager/src/main/java/org/jiayunet/pojo/po/AppJobData.java b/manager/src/main/java/org/jiayunet/pojo/po/AppJobData.java deleted file mode 100644 index 9f0a608..0000000 --- a/manager/src/main/java/org/jiayunet/pojo/po/AppJobData.java +++ /dev/null @@ -1,78 +0,0 @@ -package org.jiayunet.pojo.po; - -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableField; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; -import lombok.Data; - -import java.time.Instant; - -/** - * 爬虫岗位原始数据表(app_job_data) - *

存储爬虫抓取的原始岗位数据,供清洗服务读取并写入业务表

- * - * @author zk - */ -@Data -@TableName(value = "app_job_data") -public class AppJobData { - - @TableId(type = IdType.AUTO) - private Long id; - - /** 关联urllistid */ - @TableField("urllistid") - private Long urllistId; - - /** 职位名称 */ - private String jobTitle; - - /** 薪资 */ - private String salary; - - /** 工作地点 */ - private String location; - - /** 公司名称 */ - private String company; - - /** 经验要求 */ - private String experience; - - /** 学历要求 */ - private String education; - - /** 岗位详情(职责+要求+介绍) */ - private String description; - - /** 详情页URL */ - private String detailUrl; - - /** 招聘分类: 0=校招, 1=实习, 2=社招, 3=其他 */ - private Integer recruitCategory; - - /** 内容哈希值,用于查重 */ - private String contentHash; - - /** 数据来源 0=官网 1=平台 */ - private Integer sources; - - /** 发布日期 */ - private Instant expireAt; - - /** 创建时间 */ - private Instant createdAt; - - /** 更新时间 */ - private Instant updatedAt; - - /** 清洗状态: pending=待清洗 cleaning=清洗中 cleaned=已清洗 discarded=已丢弃 */ - private String cleanStatus; - - /** 清洗开始时间 */ - private Instant cleanStartedAt; - - /** 清洗完成时间 */ - private Instant cleanedAt; -} diff --git a/manager/src/main/java/org/jiayunet/service/CompanyCleanService.java b/manager/src/main/java/org/jiayunet/service/CompanyCleanService.java deleted file mode 100644 index 7ad5361..0000000 --- a/manager/src/main/java/org/jiayunet/service/CompanyCleanService.java +++ /dev/null @@ -1,184 +0,0 @@ -package org.jiayunet.service; - -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import com.fasterxml.jackson.databind.JsonNode; -import lombok.extern.slf4j.Slf4j; -import org.jiayunet.ai.AiChatAbility; -import org.jiayunet.mapper.CompanyMapper; -import org.jiayunet.pojo.po.Company; -import org.jiayunet.tool.HttpTool; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; - -import java.time.Instant; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * 公司数据补充服务 - *

定时从 bg_company 捞取待完善数据,调用 AI 补充公司信息

- *

依赖:AiChatAbility(AI调用)、DictCacheService(行业列表/地区匹配)、CompanyCleanTransactionService(事务操作)

- *

使用表:bg_company(读取/更新)

- * - * @author zk - */ -@Slf4j -@Service -public class CompanyCleanService { - - @Autowired - private AiChatAbility aiChatAbility; - - @Autowired - private DictCacheService dictCacheService; - - @Autowired - private CompanyCleanTransactionService companyCleanTransactionService; - - @Autowired - private CompanyMapper companyMapper; - - @Value("${app.company-clean.batch-size:10}") - private int batchSize; - - @Value("${app.company-clean.thread-pool-size:3}") - private int threadPoolSize; - - private ExecutorService executorService; - - /** 初始化线程池 */ - @javax.annotation.PostConstruct - public void init() { - executorService = Executors.newFixedThreadPool(threadPoolSize); - } - - /** - * 任务C:公司数据补充(每小时) - *

1. 批量锁定待完善公司 2. 多线程并发调用AI补充 3. 回填数据

- */ - @Scheduled(cron = "0 */1 * * * ?") - public void cleanCompany() { - List companyList = companyCleanTransactionService.lockBatch(batchSize); - if (companyList.isEmpty()) { - return; - } - log.info("公司补充:锁定{}条数据", companyList.size()); - - // 多线程并发处理 - for (Company company : companyList) { - executorService.submit(() -> { - try { - cleanOne(company); - } catch (Exception e) { - log.error("公司补充异常, id={}, shortName={}", company.getId(), company.getShortName(), e); - } - }); - } - } - - /** - * 任务D:公司僵尸恢复(每小时,与任务C错开30分钟) - *

将超时10分钟仍在补充中的数据重置为待完善

- */ - @Scheduled(cron = "0 30 */1 * * ?") - public void recoverZombie() { - int recovered = companyMapper.update(null, - new LambdaUpdateWrapper() - .set(Company::getStatus, 0) - .eq(Company::getStatus, 3) - .lt(Company::getUpdateTime, Instant.now().minusSeconds(600))); - - if (recovered > 0) { - log.info("公司僵尸恢复:重置{}条数据", recovered); - } - } - - /** - * 补充单条公司数据 - *

1. 拼prompt调AI 2. 解析结果 3. 回填数据

- */ - public void cleanOne(Company company) { - log.info("公司补充开始, id={}, shortName={}", company.getId(), company.getShortName()); - - String systemPrompt = buildSystemPrompt(); - String userMessage = buildUserMessage(company.getShortName()); - - String aiResponse = aiChatAbility.chat(systemPrompt, userMessage); - - try { - // 去掉可能的 markdown 代码块标记 - String json = aiResponse.trim(); - if (json.startsWith("```")) { - json = json.replaceAll("^```\\w*\\n?", "").replaceAll("\\n?```$", "").trim(); - } - // 清除控制字符(Tab等),保留换行符 - json = json.replaceAll("[\\x00-\\x08\\x0B\\x0C\\x0E-\\x1F]", ""); - - JsonNode root = HttpTool.objectMapper.readTree(json); - - // valid 校验:AI不认识该公司 - if (!root.path("valid").asBoolean(false)) { - log.info("公司补充:AI不认识该公司, id={}, shortName={}", company.getId(), company.getShortName()); - companyCleanTransactionService.updateCompanyStatus(company.getId(), 4); - return; - } - - // 回填数据 - companyCleanTransactionService.saveCompanyData(root, company); - log.info("公司补充完成, id={}, shortName={}", company.getId(), company.getShortName()); - - } catch (Exception e) { - log.error("公司AI返回解析失败, id={}, shortName={}, response={}", - company.getId(), company.getShortName(), aiResponse, e); - // 保持 status=3,由僵尸恢复任务重置 - } - } - - /** 构建系统提示词 */ - private String buildSystemPrompt() { - return """ - 你是一个企业信息补充助手。根据提供的公司简称,补充该公司的详细信息。 - - 返回JSON格式要求: - { - "valid": true/false, - "name": "公司全称", - "city": "总部所在城市,精确到市", - "companyType": "企业类型", - "industryId": 行业ID, - "tags": ["公司标签,最多5个"], - "summary": "一句话简介,100字以内", - "description": "公司详细描述,500字以内", - "foundedYear": "成立年份", - "address": "总部/注册地址", - "scale": "企业规模", - "website": "官网地址", - "financingStage": "融资状态", - "latestValuation": "最新估值", - "news": ["相关新闻,最多3条,每条50字以内"] - } - - 规则: - 1. 如果不认识该公司,返回 {"valid": false} - 2. name 根据公司简称推断完整的企业注册名称 - 3. companyType 取值:上市企业、独角兽、国企、央企、民营企业、外资企业、合资企业、事业单位、其他 - 4. industryId 必须从给定行业列表中选择,不确定则null - 5. scale 取值:少于50人、50-150人、150-500人、500-1000人、1000-5000人、5000-10000人、10000人以上 - 6. tags 体现公司核心业务特征,最多5个 - 7. news 基于你的知识提供该公司最新的3条相关新闻,每条50字以内 - 8. latestValuation 知道就提供,不知道则null - 9. 不确定的字段返回null,不要编造 - 10. 字符串值中不允许出现Tab、换行等控制字符 - 11. 只返回JSON,不要其他内容 - """; - } - - /** 构建用户消息 */ - private String buildUserMessage(String shortName) { - return "【公司简称】\n" + shortName + - "\n\n【行业列表】\n" + dictCacheService.getIndustryText(); - } -} diff --git a/manager/src/main/java/org/jiayunet/service/CompanyCleanTransactionService.java b/manager/src/main/java/org/jiayunet/service/CompanyCleanTransactionService.java deleted file mode 100644 index 31ab1ab..0000000 --- a/manager/src/main/java/org/jiayunet/service/CompanyCleanTransactionService.java +++ /dev/null @@ -1,165 +0,0 @@ -package org.jiayunet.service; - -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import com.fasterxml.jackson.databind.JsonNode; -import lombok.extern.slf4j.Slf4j; -import org.jiayunet.mapper.CompanyMapper; -import org.jiayunet.pojo.po.Company; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; - -/** - * 公司数据补充事务服务 - *

独立出来解决 @Transactional 同类自调用失效问题

- *

依赖:CompanyMapper、DictCacheService(地区匹配、行业校验)

- *

使用表:bg_company(更新)

- * - * @author zk - */ -@Slf4j -@Service -public class CompanyCleanTransactionService { - - @Autowired - private CompanyMapper companyMapper; - - @Autowired - private DictCacheService dictCacheService; - - /** - * 回填公司数据(事务) - *

1. 解析AI返回的各字段 2. 地区匹配 3. 行业校验 4. 更新bg_company

- */ - @Transactional(rollbackFor = Exception.class) - public void saveCompanyData(JsonNode root, Company company) { - // name:AI根据shortName推断的全称 - String name = root.path("name").asText(null); - if (name != null && !name.isBlank()) { - company.setName(name); - } - - // regionCode:AI返回城市名,Java侧匹配 - String city = root.path("city").asText(null); - if (city != null && !city.isBlank()) { - String regionCode = dictCacheService.matchRegionCode(city); - company.setRegionCode(regionCode); - } - - // companyType - String companyType = root.path("companyType").asText(null); - if (companyType != null && !"null".equals(companyType)) { - company.setCompanyType(companyType); - } - - // industryId:校验是否存在于行业列表中 - long industryId = root.path("industryId").asLong(0); - company.setIndustryId(industryId > 0 ? industryId : null); - - // tags:JSON数组 - JsonNode tagsNode = root.path("tags"); - if (tagsNode.isArray() && !tagsNode.isEmpty()) { - List tags = new ArrayList<>(); - tagsNode.forEach(node -> tags.add(node.asText())); - company.setTags(tags); - } - - // summary - String summary = root.path("summary").asText(null); - if (summary != null && !"null".equals(summary)) { - company.setSummary(summary); - } - - // description - String description = root.path("description").asText(null); - if (description != null && !"null".equals(description)) { - company.setDescription(description); - } - - // foundedYear - String foundedYear = root.path("foundedYear").asText(null); - if (foundedYear != null && !"null".equals(foundedYear)) { - company.setFoundedYear(foundedYear); - } - - // address - String address = root.path("address").asText(null); - if (address != null && !"null".equals(address)) { - company.setAddress(address); - } - - // scale - String scale = root.path("scale").asText(null); - if (scale != null && !"null".equals(scale)) { - company.setScale(scale); - } - - // website - String website = root.path("website").asText(null); - if (website != null && !"null".equals(website)) { - company.setWebsite(website); - } - - // financingStage - String financingStage = root.path("financingStage").asText(null); - if (financingStage != null && !"null".equals(financingStage)) { - company.setFinancingStage(financingStage); - } - - // latestValuation - String latestValuation = root.path("latestValuation").asText(null); - if (latestValuation != null && !"null".equals(latestValuation)) { - company.setLatestValuation(latestValuation); - } - - // news:JSON数组 - JsonNode newsNode = root.path("news"); - if (newsNode.isArray() && !newsNode.isEmpty()) { - List news = new ArrayList<>(); - newsNode.forEach(node -> news.add(node.asText())); - company.setNews(news); - } - - // 更新状态和时间 - company.setStatus(1); - company.setUpdateTime(Instant.now()); - - companyMapper.updateById(company); - } - - /** 更新公司状态 */ - public void updateCompanyStatus(Long id, int status) { - companyMapper.update(null, - new LambdaUpdateWrapper() - .set(Company::getStatus, status) - .set(Company::getUpdateTime, Instant.now()) - .eq(Company::getId, id)); - } - - /** - * 原子锁定一批待完善公司(事务内 SELECT FOR UPDATE + UPDATE 状态) - *

行锁保证并发安全,其他线程会阻塞直到事务提交

- * - * @return 锁定成功的公司列表,可能为空 - */ - @Transactional(rollbackFor = Exception.class) - public List lockBatch(int batchSize) { - List companyList = companyMapper.selectForUpdate(batchSize); - if (companyList.isEmpty()) { - return companyList; - } - - List ids = companyList.stream().map(Company::getId).toList(); - companyMapper.update(null, - new LambdaUpdateWrapper() - .set(Company::getStatus, 3) - .set(Company::getUpdateTime, Instant.now()) - .in(Company::getId, ids)); - - return companyList; - } -} diff --git a/manager/src/main/java/org/jiayunet/service/DictCacheService.java b/manager/src/main/java/org/jiayunet/service/DictCacheService.java index 4f30aae..df52415 100644 --- a/manager/src/main/java/org/jiayunet/service/DictCacheService.java +++ b/manager/src/main/java/org/jiayunet/service/DictCacheService.java @@ -116,15 +116,6 @@ public class DictCacheService { regionList.size(), majorCategoryList.size(), majorLeafCount); } - /** 获取岗位分类文本(叶子节点,带父级路径,逗号分隔) */ - public String getJobCategoryText() { - return jobCategoryText; - } - - /** 获取行业文本(叶子节点,带父级路径,逗号分隔) */ - public String getIndustryText() { - return industryText; - } /** 获取专业分类文本(三级叶子节点,带父级路径,逗号分隔) */ public String getMajorCategoryText() { diff --git a/manager/src/main/java/org/jiayunet/service/JobCleanService.java b/manager/src/main/java/org/jiayunet/service/JobCleanService.java deleted file mode 100644 index 3fac16d..0000000 --- a/manager/src/main/java/org/jiayunet/service/JobCleanService.java +++ /dev/null @@ -1,385 +0,0 @@ -package org.jiayunet.service; - -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import com.fasterxml.jackson.databind.JsonNode; -import lombok.extern.slf4j.Slf4j; -import org.jiayunet.ai.AiChatAbility; -import org.jiayunet.ai.AiResponseCleanTool; -import com.baomidou.mybatisplus.core.toolkit.IdWorker; -import org.jiayunet.mapper.AppJobDataMapper; -import org.jiayunet.mapper.JobMapper; -import org.jiayunet.mapper.SkillTagMapper; -import org.jiayunet.pojo.po.AppJobData; -import org.jiayunet.pojo.po.Job; -import org.jiayunet.pojo.po.SkillTag; -import org.jiayunet.tool.HttpTool; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * 岗位清洗服务 - *

定时从 app_job_data 捞取待清洗数据,调用 AI 清洗后写入业务表

- *

依赖:AiChatAbility(AI调用)、DictCacheService(字典缓存)、JobCleanTransactionService(事务操作)

- *

使用表:app_job_data(读取/更新状态)、bg_job(去重查询/更新专业)、bg_skill_tag(技能入库)

- * - * @author zk - */ -@Slf4j -@Service -public class JobCleanService { - - @Autowired - private AiChatAbility aiChatAbility; - - @Autowired - private DictCacheService dictCacheService; - - @Autowired - private JobCleanTransactionService jobCleanTransactionService; - - @Autowired - private AppJobDataMapper appJobDataMapper; - - @Autowired - private JobMapper jobMapper; - - @Autowired - private SkillTagMapper skillTagMapper; - - @Value("${app.job-clean.batch-size:20}") - private int batchSize; - - @Value("${app.job-clean.thread-pool-size:5}") - private int threadPoolSize; - - private ExecutorService executorService; - - /** 初始化线程池 */ - @javax.annotation.PostConstruct - public void init() { - executorService = Executors.newFixedThreadPool(threadPoolSize); - } - - /** - * 定时任务A:岗位清洗(每3分钟) - *

1. 批量锁定待清洗数据 2. 多线程并发调用AI清洗 3. 写入业务表

- */ - @Scheduled(cron = "0 */3 * * * ?") - public void cleanJob() { - List dataList = jobCleanTransactionService.lockBatch(batchSize); - if (dataList.isEmpty()) { - return; - } - log.info("岗位清洗:锁定{}条数据", dataList.size()); - - // 多线程并发处理 - for (AppJobData data : dataList) { - executorService.submit(() -> { - try { - cleanOne(data); - } catch (Exception e) { - log.error("岗位清洗异常, id={}", data.getId(), e); - // 异常时保持 clean_status=1,由僵尸恢复任务重置 - } - }); - } - } - - /** - * 定时任务B:僵尸恢复(每30分钟) - *

将超时10分钟仍在清洗中的数据重置为待清洗

- */ - @Scheduled(cron = "0 */30 * * * ?") - public void recoverZombie() { - int recovered = appJobDataMapper.update(null, - new LambdaUpdateWrapper() - .set(AppJobData::getCleanStatus, "pending") - .set(AppJobData::getCleanStartedAt, null) - .eq(AppJobData::getCleanStatus, "cleaning") - .lt(AppJobData::getCleanStartedAt, Instant.now().minusSeconds(600))); - - if (recovered > 0) { - log.info("僵尸恢复:重置{}条数据", recovered); - } - } - - /** - * 清洗单条岗位数据 - *

1. 前置校验 2. 第一次AI提取结构化信息 3. 写入业务表 4. 第二次AI匹配专业 5. 第三次AI提取技能

- */ - public void cleanOne(AppJobData data) { - // 1. 前置校验 - if (data.getDescription() == null || data.getDescription().length() < 20) { - jobCleanTransactionService.updateCleanStatus(data.getId(), "discarded"); - return; - } - - // 2. 第一次AI:提取岗位结构化信息 - String systemPrompt = buildSystemPrompt(); - String userMessage = buildUserMessage(data); - String aiResponse = aiChatAbility.chat(systemPrompt, userMessage); - - // 3. 解析JSON - try { - String json = AiResponseCleanTool.clean(aiResponse); - JsonNode root = HttpTool.objectMapper.readTree(json); - - // valid 校验 - if (!root.path("valid").asBoolean(false)) { - jobCleanTransactionService.updateCleanStatus(data.getId(), "discarded"); - return; - } - - // 4. 去重检查 - String sourceId = String.valueOf(data.getId()); - Long existJob = jobMapper.selectCount(new LambdaQueryWrapper().eq(Job::getSourceId, sourceId)); - if (existJob > 0) { - jobCleanTransactionService.updateCleanStatus(data.getId(), "cleaned"); - return; - } - - // 5. 公司处理(加锁防并发重复) - String companyShortName = root.path("companyShortName").asText(""); - if (companyShortName.isBlank()) { - companyShortName = data.getCompany(); - } - Long companyId = jobCleanTransactionService.findOrCreateCompany(companyShortName); - - // 6. 地区处理 - List regionCodes = new ArrayList<>(); - JsonNode citiesNode = root.path("cities"); - if (citiesNode.isArray()) { - for (JsonNode city : citiesNode) { - String code = dictCacheService.matchRegionCode(city.asText()); - if (code != null) { - regionCodes.add(code); - } - } - } - - // 7. 写入业务表 - jobCleanTransactionService.saveJobData(root, data, companyId, sourceId, regionCodes); - - // 拿到刚插入的 job - Job insertedJob = jobMapper.selectOne(new LambdaQueryWrapper().eq(Job::getSourceId, sourceId).last("LIMIT 1")); - if (insertedJob == null) { - return; - } - - // 8. 第二次AI:专业匹配(失败不影响岗位入库) - try { - String title = root.path("title").asText(""); - String desc = root.path("description").asText(""); - String req = root.path("requirement").asText(""); - matchMajor(insertedJob.getId(), title, desc, req); - } catch (Exception ex) { - log.warn("专业匹配失败, id={}", data.getId(), ex); - } - - // 9. 第三次AI:技能提取(失败不影响岗位入库) - try { - String title = root.path("title").asText(""); - String desc = root.path("description").asText(""); - String req = root.path("requirement").asText(""); - extractSkillTags(insertedJob.getId(), title, desc, req); - } catch (Exception ex) { - log.warn("技能提取失败, id={}", data.getId(), ex); - } - - } catch (Exception e) { - log.error("AI 返回解析失败, id={}, response={}", data.getId(), aiResponse, e); - } - } - - /** - * 第二次AI调用:匹配专业 + 专业敏感度 - *

传入岗位信息和三级专业分类列表,AI返回 requiredMajorIds + majorSensitivity → 更新 bg_job

- */ - private void matchMajor(Long jobId, String title, String description, String requirement) { - String systemPrompt = """ - 你是一个岗位专业匹配助手。根据岗位信息,判断该岗位对专业的要求。 - 返回JSON格式: - { - "requiredMajorIds": [专业ID数组,从专业列表中选择最相关的,最多3个,无明确要求则空数组], - "majorSensitivity": 0-2的数字(0=专业不限 1=优先相关专业 2=强制要求专业) - } - 规则: - 1. 只能从给定专业列表中选择ID - 2. 根据岗位描述判断专业敏感度:明确写"XX专业"→2,写"相关专业优先"→1,未提及→0 - 3. majorSensitivity为0时,requiredMajorIds应为空数组 - 4. 只返回JSON,不要其他内容 - """; - - String userMessage = "【岗位信息】\n标题: " + title + "\n职责: " + description + "\n要求: " + requirement + - "\n\n【专业分类列表】\n" + dictCacheService.getMajorCategoryText(); - - String aiResponse = aiChatAbility.chat(systemPrompt, userMessage); - String json = AiResponseCleanTool.clean(aiResponse); - - try { - JsonNode root = HttpTool.objectMapper.readTree(json); - int majorSensitivity = root.path("majorSensitivity").asInt(0); - - List majorIds = new ArrayList<>(); - JsonNode idsNode = root.path("requiredMajorIds"); - if (idsNode.isArray()) { - for (JsonNode node : idsNode) { - long id = node.asLong(0); - if (id > 0) { - majorIds.add(id); - } - } - } - - // 更新 bg_job - jobCleanTransactionService.updateJobMajor(jobId, majorIds.isEmpty() ? null : majorIds, majorSensitivity); - } catch (Exception e) { - log.warn("专业匹配AI返回解析失败: {}", json, e); - } - } - - /** - * 第三次AI调用:自由提取技能标签 - *

AI返回技能名数组 → INSERT IGNORE 入 bg_skill_tag → 查ID → 写关联表

- */ - private void extractSkillTags(Long jobId, String title, String description, String requirement) { - String systemPrompt = """ - 你是一个技能提取助手。根据岗位信息,提取该岗位要求的核心专业能力和工具技能。 - 返回JSON数组格式,如:["java", "spring boot", "mysql", "redis"] - 规则: - 1. 统一使用小写字母 - 2. 只保留核心词,去掉多余修饰(如"plc编程"→"plc","c语言"→"c","cad制图"→"cad") - 3. 同一技能只保留最具体的表述,不要同时出现上位词和下位词(如有"机械设计"就不要再出"机械") - 4. 提取范围包括:技术栈、专业领域知识、行业工具、专业资质能力等 - 5. 不提取纯软技能(如沟通能力、团队协作、学习能力、积极主动) - 6. 不提取过于宽泛的标签(如"办公软件"、"windows") - 7. 如果岗位完全没有专业能力要求(纯看态度和素质),返回空数组 [] - 8. 最多15个,按重要性排序 - 9. 只返回JSON数组,不要其他内容 - 示例1(技术岗): - 输入:需要熟悉Java、Spring Boot框架,了解MySQL数据库和Redis缓存,会PLC编程 - 输出:["java", "spring boot", "mysql", "redis", "plc"] - 示例2(财务岗): - 输入:负责费用管理与审核,月度经营利润分析,要求财务管理、会计学相关专业 - 输出:["财务管理", "会计", "经营分析"] - 示例3(制造岗): - 输入:负责模具开发,熟悉CAD制图,了解注塑成型工艺,有SolidWorks经验 - 输出:["模具", "cad", "注塑", "solidworks"] - 示例4(纯素质岗): - 输入:具备较强的沟通能力和创新意识,积极主动,专业不限 - 输出:[] - """; - - String userMessage = "【岗位信息】\n标题: " + title + "\n职责: " + description + "\n要求: " + requirement; - - String aiResponse = aiChatAbility.chat(systemPrompt, userMessage); - String json = AiResponseCleanTool.clean(aiResponse); - - try { - JsonNode arrayNode = HttpTool.objectMapper.readTree(json); - if (!arrayNode.isArray() || arrayNode.isEmpty()) { - return; - } - - List skillTagIds = new ArrayList<>(); - for (JsonNode node : arrayNode) { - String skillName = node.asText("").trim().toLowerCase(); - if (skillName.isBlank() || skillName.length() > 50) { - continue; - } - - // INSERT IGNORE + SELECT 获取ID - Long tagId = findOrCreateSkillTag(skillName); - if (tagId != null && !skillTagIds.contains(tagId)) { - skillTagIds.add(tagId); - } - } - - if (!skillTagIds.isEmpty()) { - jobCleanTransactionService.saveSkillTagRelations(jobId, skillTagIds); - } - } catch (Exception e) { - log.warn("技能提取AI返回解析失败: {}", json, e); - } - } - - /** - * 查找或创建技能标签(依靠数据库唯一索引保证并发安全) - *

INSERT IGNORE 后 SELECT,避免加锁

- */ - private Long findOrCreateSkillTag(String name) { - // 先尝试插入(忽略重复),ID 由 IdWorker 生成 - skillTagMapper.insertIgnore(IdWorker.getId(), name); - - // 再查询拿ID - SkillTag tag = skillTagMapper.selectOne(new LambdaQueryWrapper().eq(SkillTag::getName, name).last("LIMIT 1")); - return tag != null ? tag.getId() : null; - } - - /** 构建第一次AI的系统提示词 */ - private String buildSystemPrompt() { - return """ - 你是一个岗位数据清洗助手。请根据提供的原始岗位数据,提取并结构化为JSON格式。 - - 返回JSON格式要求: - { - "valid": true/false, - "title": "岗位名称", - "salary": "标准化薪资,如10-20K、面议,无效则null", - "education": 0-4的数字(0=不限 1=大专 2=本科 3=硕士 4=博士), - "minExperience": 最低工作年限数字(不要求则0), - "employmentType": 0或1(0=全职 1=兼职,默认0), - "categoryId": 岗位分类ID(必选,从分类列表中选最接近的), - "requiredIndustryId": 行业ID(仅当明确提到行业经验要求时设置,列表中无完全匹配则选最相似的,未提到则null), - "description": "岗位职责,保持原文风格,格式化展示", - "requirement": "任职要求,保持原文风格,格式化展示", - "bonus": "加分项,无则null", - "tags": ["核心职能标签,最多5个,如数据分析、产品策略"], - "skillTags": ["技能关键词,最多8个,如Java、Spring Boot"], - "companyShortName": "简洁的公司简称,如字节跳动、中国平安", - "cities": ["工作城市列表,精确到市"] - } - - 规则: - 1. description/requirement/bonus 均从原始的 description+experience+education 内容中提取,保持原文风格 - 2. 岗位标题不存在时,从描述中归纳生成 - 3. 薪资标准化为 10-20K、20K、面议 等格式,无效或空则null - 4. categoryId 必须从分类列表中选一个,不允许为null - 5. requiredIndustryId 仅当描述中明确提到行业经验要求时设置 - 6. tags 是核心职能标签(如数据分析、团队协作),最多5个 - 7. skillTags 是技能关键词(如Java、MySQL),最多8个 - 8. companyShortName 去掉地区后缀、招聘后缀、括号内容,保持简洁 - 9. 字符串值中不允许出现Tab、换行等控制字符,用空格或中文标点替代 - 10. 只返回JSON,不要其他内容 - """; - } - - /** 构建第一次AI的用户消息 */ - private String buildUserMessage(AppJobData data) { - StringBuilder sb = new StringBuilder(); - sb.append("【原始数据】\n"); - sb.append("岗位名称: ").append(nullToEmpty(data.getJobTitle())).append("\n"); - sb.append("薪资: ").append(nullToEmpty(data.getSalary())).append("\n"); - sb.append("工作地点: ").append(nullToEmpty(data.getLocation())).append("\n"); - sb.append("公司: ").append(nullToEmpty(data.getCompany())).append("\n"); - sb.append("经验要求: ").append(nullToEmpty(data.getExperience())).append("\n"); - sb.append("学历要求: ").append(nullToEmpty(data.getEducation())).append("\n"); - sb.append("岗位详情: ").append(nullToEmpty(data.getDescription())).append("\n\n"); - sb.append("【岗位分类列表】\n").append(dictCacheService.getJobCategoryText()).append("\n\n"); - sb.append("【行业列表】\n").append(dictCacheService.getIndustryText()); - return sb.toString(); - } - - private String nullToEmpty(String s) { - return s == null ? "" : s; - } -} diff --git a/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java b/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java deleted file mode 100644 index da65ab7..0000000 --- a/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java +++ /dev/null @@ -1,203 +0,0 @@ -package org.jiayunet.service; - -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import com.fasterxml.jackson.databind.JsonNode; -import lombok.extern.slf4j.Slf4j; -import org.jiayunet.mapper.AppJobDataMapper; -import org.jiayunet.mapper.CompanyMapper; -import org.jiayunet.mapper.JobMapper; -import org.jiayunet.mapper.JobRegionRelationMapper; -import org.jiayunet.mapper.JobSkillTagRelationMapper; -import org.jiayunet.pojo.po.AppJobData; -import org.jiayunet.pojo.po.Company; -import org.jiayunet.pojo.po.Job; -import org.jiayunet.pojo.po.JobRegionRelation; -import org.jiayunet.pojo.po.JobSkillTagRelation; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.time.Instant; -import java.util.List; - -/** - * 岗位清洗事务服务 - *

独立出来解决 @Transactional 同类自调用失效问题

- *

依赖:JobMapper、CompanyMapper、JobRegionRelationMapper、JobSkillTagRelationMapper、AppJobDataMapper

- *

使用表:bg_job(写入/更新)、bg_company(查询/创建)、bg_job_region_relation(写入)、bg_job_skill_tag_relation(写入)、app_job_data(更新状态)

- * - * @author zk - */ -@Slf4j -@Service -public class JobCleanTransactionService { - - @Autowired - private JobMapper jobMapper; - - @Autowired - private CompanyMapper companyMapper; - - @Autowired - private JobRegionRelationMapper jobRegionRelationMapper; - - @Autowired - private AppJobDataMapper appJobDataMapper; - - @Autowired - private JobSkillTagRelationMapper jobSkillTagRelationMapper; - - /** - * 写入 bg_job + bg_job_region_relation + 更新 clean_status(短事务) - */ - @Transactional(rollbackFor = Exception.class) - public void saveJobData(JsonNode root, AppJobData data, Long companyId, String sourceId, List regionCodes) { - Job job = new Job(); - job.setTitle(root.path("title").asText("")); - job.setCompanyId(companyId); - job.setCategoryId(root.path("categoryId").asLong(0)); - job.setEmploymentType(root.path("employmentType").asInt(0)); - job.setDescription(root.path("description").asText("")); - job.setRequirement(root.path("requirement").asText("")); - job.setBonus(root.path("bonus").asText(null)); - - // 解析 JSON 数组为 List - JsonNode tagsNode = root.path("tags"); - if (tagsNode.isArray()) { - List tags = new java.util.ArrayList<>(); - tagsNode.forEach(node -> tags.add(node.asText())); - job.setTags(tags); - } - - JsonNode skillTagsNode = root.path("skillTags"); - if (skillTagsNode.isArray()) { - List skillTags = new java.util.ArrayList<>(); - skillTagsNode.forEach(node -> skillTags.add(node.asText())); - job.setSkillTags(skillTags); - } - - String salary = root.path("salary").asText(null); - job.setSalary("null".equals(salary) ? null : salary); - - job.setEducation(root.path("education").asInt(0)); - job.setMinExperience(root.path("minExperience").asInt(0)); - - Long requiredIndustryId = root.path("requiredIndustryId").asLong(0); - job.setRequiredIndustryId(requiredIndustryId == 0 ? null : requiredIndustryId); - - // 从原始数据透传 recruit_category 和 expire_at - job.setRecruitCategory(data.getRecruitCategory()); - job.setExpireAt(data.getExpireAt()); - - job.setSourceUrl(data.getDetailUrl()); - job.setSourceId(sourceId); - job.setStatus(0); - job.setCreateTime(Instant.now()); - job.setUpdateTime(Instant.now()); - - jobMapper.insert(job); - - // 写入岗位-地区关联(批量插入) - if (!regionCodes.isEmpty()) { - List relations = regionCodes.stream().map(regionCode -> { - JobRegionRelation relation = new JobRegionRelation(); - relation.setJobId(job.getId()); - relation.setRegionCode(regionCode); - relation.setCreateTime(Instant.now()); - return relation; - }).toList(); - jobRegionRelationMapper.batchInsert(relations); - } - - // 更新清洗状态为已清洗 - updateCleanStatus(data.getId(), "cleaned"); - } - - /** - * 查找或创建公司(加锁防并发重复) - *

按 short_name 查询,不存在则创建一条待完善记录

- */ - public synchronized Long findOrCreateCompany(String shortName) { - Company company = companyMapper.selectOne( - new LambdaQueryWrapper() - .eq(Company::getShortName, shortName) - .last("LIMIT 1")); - - if (company != null) { - return company.getId(); - } - - Company newCompany = new Company(); - newCompany.setName(shortName); - newCompany.setShortName(shortName); - newCompany.setStatus(0); - newCompany.setCreateTime(Instant.now()); - newCompany.setUpdateTime(Instant.now()); - companyMapper.insert(newCompany); - return newCompany.getId(); - } - - /** - * 写入岗位-技能标签关联(批量插入) - */ - @Transactional(rollbackFor = Exception.class) - public void saveSkillTagRelations(Long jobId, List skillTagIds) { - List relations = skillTagIds.stream().map(skillTagId -> { - JobSkillTagRelation relation = new JobSkillTagRelation(); - relation.setJobId(jobId); - relation.setSkillTagId(skillTagId); - relation.setCreateTime(Instant.now()); - return relation; - }).toList(); - jobSkillTagRelationMapper.batchInsert(relations); - } - - /** - * 更新岗位的专业要求和专业敏感度 - */ - public void updateJobMajor(Long jobId, List requiredMajorIds, Integer majorSensitivity) { - Job job = new Job(); - job.setId(jobId); - job.setRequiredMajorIds(requiredMajorIds); - job.setMajorSensitivity(majorSensitivity); - job.setUpdateTime(Instant.now()); - jobMapper.updateById(job); - } - - /** 更新清洗状态 */ - public void updateCleanStatus(Long id, String status) { - LambdaUpdateWrapper wrapper = new LambdaUpdateWrapper() - .set(AppJobData::getCleanStatus, status) - .eq(AppJobData::getId, id); - - if ("cleaned".equals(status)) { - wrapper.set(AppJobData::getCleanedAt, Instant.now()); - } - - appJobDataMapper.update(null, wrapper); - } - - /** - * 原子锁定一批待清洗数据(事务内 SELECT FOR UPDATE + UPDATE 状态) - *

行锁保证并发安全,其他线程会阻塞直到事务提交

- * - * @return 锁定成功的数据列表,可能为空 - */ - @Transactional(rollbackFor = Exception.class) - public List lockBatch(int batchSize) { - List dataList = appJobDataMapper.selectForUpdate(batchSize); - if (dataList.isEmpty()) { - return dataList; - } - - List ids = dataList.stream().map(AppJobData::getId).toList(); - appJobDataMapper.update(null, - new LambdaUpdateWrapper() - .set(AppJobData::getCleanStatus, "cleaning") - .set(AppJobData::getCleanStartedAt, Instant.now()) - .in(AppJobData::getId, ids)); - - return dataList; - } -}