diff --git a/.kiro/steering/数据清洗方案.md b/.kiro/steering/数据清洗方案.md index 8e6fdf4..742a4a6 100644 --- a/.kiro/steering/数据清洗方案.md +++ b/.kiro/steering/数据清洗方案.md @@ -26,7 +26,7 @@ inclusion: manual 1. ✅ 岗位清洗触发逻辑 2. ✅ 岗位清洗逻辑 3. ✅ 公司数据触发逻辑 -4. ⬜ 公司数据补充逻辑(待定API后补充) +4. ✅ 公司数据补充逻辑(AI补充) --- @@ -53,7 +53,7 @@ CREATE INDEX idx_clean_status ON app_job_data (clean_status); #### 任务A:岗位清洗任务(高频,每5分钟) -1. 批量锁定:`UPDATE app_job_data SET clean_status=1 WHERE clean_status=0 AND is_valid=1 LIMIT N`(原子操作,防止多线程重复捞取) +1. 批量锁定(事务内 `SELECT ... FOR UPDATE` + `UPDATE`):先在事务内对 `clean_status=0 AND is_valid=1` 的行加行锁并查出数据,再更新 `clean_status=1`,事务提交后释放锁。行锁保证并发安全,其他线程会阻塞直到事务提交 2. 将锁定的数据丢入线程池,多线程并发调用 AI API 清洗 3. 每条处理完毕后,单独更新 `clean_status` 为 2(已入库)或 3(已丢弃) 4. 单条写入事务:bg_job 入库 + clean_status 更新放在同一个短事务中,保证一致性 @@ -221,25 +221,29 @@ CREATE TABLE bg_job_skill_tag_relation ( ### 3.1 状态扩展 -`bg_company.status` 扩展为4个值: +`bg_company.status` 扩展为5个值: - 0=待完善:岗位清洗时自动创建的公司,只有 short_name -- 1=已完善:工商API补充完成 +- 1=已完善:AI补充完成 - 2=禁用:人工标记禁用 -- 3=补充中:定时任务已锁定,正在调用工商API +- 3=补充中:定时任务已锁定,正在调用AI +- 4=补充失败:AI明确不认识该公司,不再自动重试 ### 3.2 两个定时任务(与岗位清洗同一套模式) #### 任务C:公司数据补充任务(低频,每小时) -1. 批量锁定(原子操作): +1. 批量锁定(事务内 `SELECT ... FOR UPDATE` + `UPDATE`): ```sql -UPDATE bg_company SET status=3, update_time=NOW() WHERE status=0 LIMIT N +-- 事务内执行,行锁保证并发安全 +SELECT * FROM bg_company WHERE status=0 LIMIT N FOR UPDATE; +UPDATE bg_company SET status=3, update_time=NOW() WHERE id IN (...); ``` ⚠️ 锁定时必须同时更新 `update_time`,因为 `bg_company` 的 `update_time` 不像 `app_job_data.updated_at` 那样由数据库自动维护,需要 Java 侧手动设值。如果不更新,后续僵尸恢复任务无法正确判断超时。 -2. 将锁定的数据丢入线程池,多线程并发调用工商API +2. 将锁定的数据丢入线程池,多线程并发调用AI补充 3. 每条处理完毕后,回填公司信息,更新 `status=1`(已完善) -4. 工商API查不到或返回异常 → 保持 `status=3`,由僵尸恢复任务重置 +4. AI明确不认识该公司(valid=false)→ 更新 `status=4`(补充失败) +5. AI调用异常或解析失败 → 保持 `status=3`,由僵尸恢复任务重置 #### 任务D:公司僵尸恢复任务(低频,每小时,与任务C错开) @@ -257,7 +261,7 @@ UPDATE bg_company SET status=0 WHERE status=3 AND update_time < NOW() - INTERVAL |--------|----------|----------| | 状态字段 | app_job_data.clean_status | bg_company.status | | 锁定值 | 1=清洗中 | 3=补充中 | -| 完成值 | 2=已入库 / 3=已丢弃 | 1=已完善 | +| 完成值 | 2=已入库 / 3=已丢弃 | 1=已完善 / 4=补充失败 | | 时间字段 | updated_at(数据库自动) | update_time(Java手动设值) | | 锁定时是否需手动更新时间 | 不需要 | **需要**,否则僵尸恢复无法判断超时 | | 触发频率 | 每5分钟 | 每小时 | @@ -267,47 +271,98 @@ UPDATE bg_company SET status=0 WHERE status=3 AND update_time < NOW() - INTERVAL | 决策点 | 结论 | 原因 | |--------|------|------| -| 是否与岗位清洗同步触发 | 否,独立定时任务 | 外部API不同,频率不同,失败场景不同 | -| 触发模式 | 复用岗位清洗的"原子锁定+僵尸恢复"模式 | 统一架构,代码可复用 | +| 是否与岗位清洗同步触发 | 否,独立定时任务 | AI调用场景不同,频率不同 | +| 触发模式 | 复用岗位清洗的"SELECT FOR UPDATE + 僵尸恢复"模式 | 统一架构,行锁保证并发安全 | | 锁定时是否更新时间 | 是 | bg_company.update_time 非数据库自动维护,不更新则僵尸恢复失效 | -| 补充频率 | 每小时 | 公司数据量少,工商API可能有频率限制 | +| 补充频率 | 每小时 | 公司数据量少,AI调用成本可控 | --- -## 四、公司数据补充逻辑(待定API后补充) +## 四、公司数据补充逻辑(AI补充) -### 4.1 补充流程概要 +### 4.1 方案说明 -1. 用 `short_name`(公司简称)调用工商API搜索 -2. API返回匹配的企业列表,取最匹配的一条 -3. 回填 `bg_company` 各字段,更新 `status=1` +原计划使用工商API(天眼查等)查询公司信息,但考虑到: +- 公司简称不能很好地查询到精确数据 +- 用AI生成完整企业名字再查API,准确性也无法保证 +- 我们的公司数据来源于招聘平台,基本都是有一定规模的企业,AI训练数据覆盖率高 -### 4.2 需要回填的字段 +因此改为直接使用AI补充公司数据,一次调用返回全部字段。 -| bg_company 字段 | 来源 | 说明 | -|-----------------|------|------| -| name | 工商API | 公司全称 | -| logoUrl | 待定 | 工商API可能不提供,需另外来源 | -| regionCode | 工商API(注册地址) | 匹配 bg_china_regions_code | -| companyType | 工商API | 上市企业、独角兽、国企等 | -| industryId | 工商API(行业分类) | 匹配 bg_industry | -| tags | 工商API / AI | 公司标签,JSON数组 | -| summary | 工商API / AI | 公司简要介绍 | -| description | 工商API | 公司详细描述 | -| foundedYear | 工商API | 成立时间 | -| address | 工商API | 注册地址 | -| scale | 工商API | 企业规模(人数) | -| website | 工商API | 官网地址 | -| financingStage | 工商API / 其他来源 | 融资状态 | -| latestValuation | 工商API / 其他来源 | 最新估值 | -| news | 新闻API(待定) | 新闻动态,JSON数组 | +### 4.2 补充流程 -### 4.3 待定事项 +1. 拿 `short_name`(公司简称),拼 prompt 调 AI +2. prompt 中附带行业列表(`DictCacheService.getIndustryText()`),让 AI 直接返回 `industryId` +3. AI 返回结构化 JSON,包含 `valid` 字段判断是否认识该公司 +4. `valid=false` → 更新 `status=4`(补充失败),结束 +5. `regionCode`:AI 返回城市名,Java 侧 `DictCacheService.matchRegionCode` 匹配 +6. 解析 JSON,回填 `bg_company` 各字段 +7. 更新 `status=1`(已完善) -- [ ] 选定工商信息API(天眼查、企查查、爱企查等) -- [ ] 确认API返回字段与 bg_company 的映射关系 -- [ ] 新闻动态数据来源(工商API是否包含,还是需要单独的新闻API) -- [ ] logoUrl 来源(工商API是否提供) -- [ ] 匹配到多条结果时的处理策略 -- [ ] 查不到结果时的处理策略(保持待完善 or 标记为其他状态) -- [ ] API调用频率限制和成本评估 +### 4.3 AI 返回 JSON 结构 + +```json +{ + "valid": true, + "name": "北京字节跳动科技有限公司", + "city": "北京", + "companyType": "独角兽", + "industryId": 5, + "tags": ["短视频", "人工智能", "社交平台"], + "summary": "全球领先的内容平台和科技公司,旗下拥有抖音、今日头条等产品", + "description": "字节跳动成立于2012年,是一家以技术驱动的全球化互联网公司...", + "foundedYear": "2012", + "address": "北京市海淀区北三环西路27号", + "scale": "10000人以上", + "website": "https://www.bytedance.com", + "financingStage": "已上市", + "latestValuation": "2200亿美元", + "news": [ + "字节跳动2024年营收突破1200亿美元创历史新高", + "TikTok全球月活用户突破15亿大关", + "字节跳动加大AI大模型研发投入布局人工智能赛道" + ] +} +``` + +### 4.4 各字段补充规则 + +| bg_company 字段 | AI返回字段 | 规则 | +|-----------------|-----------|------| +| name | name | 公司全称,AI不确定则null | +| logoUrl | — | AI无法提供,留空 | +| regionCode | city | AI返回城市名,Java侧matchRegionCode匹配 | +| companyType | companyType | 上市企业、独角兽、国企、民营企业、外资企业等 | +| industryId | industryId | 从行业列表中选,不确定则null | +| tags | tags | 公司标签,JSON数组,最多5个 | +| summary | summary | 一句话简介,100字以内 | +| description | description | 公司详细描述,500字以内 | +| foundedYear | foundedYear | 成立年份,如"2012" | +| address | address | 注册/总部地址 | +| scale | scale | 企业规模,如"1000-5000人"、"10000人以上" | +| website | website | 官网地址 | +| financingStage | financingStage | 融资状态,如"A轮"、"已上市"、"不需要融资" | +| latestValuation | latestValuation | 最新估值,AI知道就给,不知道null | +| news | news | 3条相关新闻,每条50字以内,JSON数组 | + +### 4.5 prompt 规则 + +- AI 不认识该公司 → `valid=false`,其他字段不需要返回 +- 不确定的字段返回 null,不要编造 +- `industryId` 必须从给定行业列表中选择 +- `news` 最多3条,每条50字以内,基于AI知识库中最新的信息 +- `tags` 最多5个,体现公司核心业务特征 +- 字符串值中不允许出现Tab、换行等控制字符 +- 只返回JSON,不要其他内容 + +### 4.6 设计决策记录 + +| 决策点 | 结论 | 原因 | +|--------|------|------| +| 数据来源 | AI补充,不用工商API | 公司简称查API不精确,AI对招聘平台企业覆盖率高 | +| AI不认识的公司 | status=4(补充失败),不再自动重试 | 避免无限重试浪费AI调用 | +| logoUrl | 留空 | AI无法提供图片URL | +| news 时效性 | 不要求实时,取AI知识库内最新的3条 | 求职场景不需要实时新闻 | +| latestValuation | AI知道就给,不知道null | 大部分招聘企业AI有数据 | +| regionCode 匹配方式 | AI返回城市名,Java侧匹配 | 复用已有的matchRegionCode逻辑 | +| industryId 匹配方式 | prompt带行业列表,AI直接返回ID | 复用已有的行业列表文本 | diff --git a/.kiro/steering/项目结构说明.md b/.kiro/steering/项目结构说明.md index 3890cd7..d868b94 100644 --- a/.kiro/steering/项目结构说明.md +++ b/.kiro/steering/项目结构说明.md @@ -100,7 +100,7 @@ offerpie/back-end │ │ ├─ UserJobDislike.java # 用户不感兴趣记录表(bg_user_job_dislike) │ │ └─ AppJobData.java # 爬虫岗位原始数据表(app_job_data) │ └─ vo/ # ViewObject(OssUrlVo 等) - └─ service/ # 业务 Service(OssService、SmsService、DictCacheService、JobCleanService、JobCleanTransactionService 等) + └─ service/ # 业务 Service(OssService、SmsService、DictCacheService、JobCleanService、JobCleanTransactionService、CompanyCleanService、CompanyCleanTransactionService 等) ``` > **设计理念** – 业务实体和 Mapper 位于 `manager`,B 端和 C 端共享;C 端特有的注解、切面、权限服务、路由菜单服务位于 `client-api`,避免 B 端误用;`common` 提供统一的技术支撑。 diff --git a/client-api/src/main/resources/application-local.yml b/client-api/src/main/resources/application-local.yml index ecd9045..b947f69 100644 --- a/client-api/src/main/resources/application-local.yml +++ b/client-api/src/main/resources/application-local.yml @@ -87,4 +87,9 @@ app: # 岗位清洗配置 job-clean: batch-size: 20 - thread-pool-size: 5 \ No newline at end of file + thread-pool-size: 5 + + # 公司数据补充配置 + company-clean: + batch-size: 10 + thread-pool-size: 3 \ No newline at end of file diff --git a/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java b/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java index 675e23c..5ee38a3 100644 --- a/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java +++ b/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java @@ -1,8 +1,12 @@ package org.jiayunet.mapper; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; import org.jiayunet.pojo.po.AppJobData; +import java.util.List; + /** * 爬虫岗位原始数据Mapper * @@ -10,4 +14,11 @@ import org.jiayunet.pojo.po.AppJobData; */ @Mapper public interface AppJobDataMapper extends CommonMapper { + + /** + * 查询待清洗数据并加行锁(SELECT ... FOR UPDATE) + *

必须在事务内调用,配合状态更新实现原子锁定

+ */ + @Select("SELECT * FROM app_job_data WHERE clean_status = 0 AND is_valid = 1 LIMIT #{limit} FOR UPDATE") + List selectForUpdate(@Param("limit") int limit); } diff --git a/manager/src/main/java/org/jiayunet/mapper/CompanyMapper.java b/manager/src/main/java/org/jiayunet/mapper/CompanyMapper.java index e0e691b..6a62238 100644 --- a/manager/src/main/java/org/jiayunet/mapper/CompanyMapper.java +++ b/manager/src/main/java/org/jiayunet/mapper/CompanyMapper.java @@ -1,8 +1,12 @@ package org.jiayunet.mapper; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; import org.jiayunet.pojo.po.Company; +import java.util.List; + /** * 公司Mapper * @@ -10,4 +14,11 @@ import org.jiayunet.pojo.po.Company; */ @Mapper public interface CompanyMapper extends CommonMapper { + + /** + * 查询待完善公司并加行锁(SELECT ... FOR UPDATE) + *

必须在事务内调用,配合状态更新实现原子锁定

+ */ + @Select("SELECT * FROM bg_company WHERE status = 0 LIMIT #{limit} FOR UPDATE") + List selectForUpdate(@Param("limit") int limit); } diff --git a/manager/src/main/java/org/jiayunet/pojo/po/Company.java b/manager/src/main/java/org/jiayunet/pojo/po/Company.java index 976d8f4..dabd2a4 100644 --- a/manager/src/main/java/org/jiayunet/pojo/po/Company.java +++ b/manager/src/main/java/org/jiayunet/pojo/po/Company.java @@ -67,7 +67,7 @@ public class Company { /** 新闻动态(JSON数组) */ private String news; - /** 状态 0=待完善 1=已完善 2=禁用 3=补充中 */ + /** 状态 0=待完善 1=已完善 2=禁用 3=补充中 4=补充失败 */ private Integer status; /** 创建时间 */ diff --git a/manager/src/main/java/org/jiayunet/service/CompanyCleanService.java b/manager/src/main/java/org/jiayunet/service/CompanyCleanService.java new file mode 100644 index 0000000..7ad5361 --- /dev/null +++ b/manager/src/main/java/org/jiayunet/service/CompanyCleanService.java @@ -0,0 +1,184 @@ +package org.jiayunet.service; + +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import org.jiayunet.ai.AiChatAbility; +import org.jiayunet.mapper.CompanyMapper; +import org.jiayunet.pojo.po.Company; +import org.jiayunet.tool.HttpTool; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.time.Instant; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 公司数据补充服务 + *

定时从 bg_company 捞取待完善数据,调用 AI 补充公司信息

+ *

依赖:AiChatAbility(AI调用)、DictCacheService(行业列表/地区匹配)、CompanyCleanTransactionService(事务操作)

+ *

使用表:bg_company(读取/更新)

+ * + * @author zk + */ +@Slf4j +@Service +public class CompanyCleanService { + + @Autowired + private AiChatAbility aiChatAbility; + + @Autowired + private DictCacheService dictCacheService; + + @Autowired + private CompanyCleanTransactionService companyCleanTransactionService; + + @Autowired + private CompanyMapper companyMapper; + + @Value("${app.company-clean.batch-size:10}") + private int batchSize; + + @Value("${app.company-clean.thread-pool-size:3}") + private int threadPoolSize; + + private ExecutorService executorService; + + /** 初始化线程池 */ + @javax.annotation.PostConstruct + public void init() { + executorService = Executors.newFixedThreadPool(threadPoolSize); + } + + /** + * 任务C:公司数据补充(每小时) + *

1. 批量锁定待完善公司 2. 多线程并发调用AI补充 3. 回填数据

+ */ + @Scheduled(cron = "0 */1 * * * ?") + public void cleanCompany() { + List companyList = companyCleanTransactionService.lockBatch(batchSize); + if (companyList.isEmpty()) { + return; + } + log.info("公司补充:锁定{}条数据", companyList.size()); + + // 多线程并发处理 + for (Company company : companyList) { + executorService.submit(() -> { + try { + cleanOne(company); + } catch (Exception e) { + log.error("公司补充异常, id={}, shortName={}", company.getId(), company.getShortName(), e); + } + }); + } + } + + /** + * 任务D:公司僵尸恢复(每小时,与任务C错开30分钟) + *

将超时10分钟仍在补充中的数据重置为待完善

+ */ + @Scheduled(cron = "0 30 */1 * * ?") + public void recoverZombie() { + int recovered = companyMapper.update(null, + new LambdaUpdateWrapper() + .set(Company::getStatus, 0) + .eq(Company::getStatus, 3) + .lt(Company::getUpdateTime, Instant.now().minusSeconds(600))); + + if (recovered > 0) { + log.info("公司僵尸恢复:重置{}条数据", recovered); + } + } + + /** + * 补充单条公司数据 + *

1. 拼prompt调AI 2. 解析结果 3. 回填数据

+ */ + public void cleanOne(Company company) { + log.info("公司补充开始, id={}, shortName={}", company.getId(), company.getShortName()); + + String systemPrompt = buildSystemPrompt(); + String userMessage = buildUserMessage(company.getShortName()); + + String aiResponse = aiChatAbility.chat(systemPrompt, userMessage); + + try { + // 去掉可能的 markdown 代码块标记 + String json = aiResponse.trim(); + if (json.startsWith("```")) { + json = json.replaceAll("^```\\w*\\n?", "").replaceAll("\\n?```$", "").trim(); + } + // 清除控制字符(Tab等),保留换行符 + json = json.replaceAll("[\\x00-\\x08\\x0B\\x0C\\x0E-\\x1F]", ""); + + JsonNode root = HttpTool.objectMapper.readTree(json); + + // valid 校验:AI不认识该公司 + if (!root.path("valid").asBoolean(false)) { + log.info("公司补充:AI不认识该公司, id={}, shortName={}", company.getId(), company.getShortName()); + companyCleanTransactionService.updateCompanyStatus(company.getId(), 4); + return; + } + + // 回填数据 + companyCleanTransactionService.saveCompanyData(root, company); + log.info("公司补充完成, id={}, shortName={}", company.getId(), company.getShortName()); + + } catch (Exception e) { + log.error("公司AI返回解析失败, id={}, shortName={}, response={}", + company.getId(), company.getShortName(), aiResponse, e); + // 保持 status=3,由僵尸恢复任务重置 + } + } + + /** 构建系统提示词 */ + private String buildSystemPrompt() { + return """ + 你是一个企业信息补充助手。根据提供的公司简称,补充该公司的详细信息。 + + 返回JSON格式要求: + { + "valid": true/false, + "name": "公司全称", + "city": "总部所在城市,精确到市", + "companyType": "企业类型", + "industryId": 行业ID, + "tags": ["公司标签,最多5个"], + "summary": "一句话简介,100字以内", + "description": "公司详细描述,500字以内", + "foundedYear": "成立年份", + "address": "总部/注册地址", + "scale": "企业规模", + "website": "官网地址", + "financingStage": "融资状态", + "latestValuation": "最新估值", + "news": ["相关新闻,最多3条,每条50字以内"] + } + + 规则: + 1. 如果不认识该公司,返回 {"valid": false} + 2. name 根据公司简称推断完整的企业注册名称 + 3. companyType 取值:上市企业、独角兽、国企、央企、民营企业、外资企业、合资企业、事业单位、其他 + 4. industryId 必须从给定行业列表中选择,不确定则null + 5. scale 取值:少于50人、50-150人、150-500人、500-1000人、1000-5000人、5000-10000人、10000人以上 + 6. tags 体现公司核心业务特征,最多5个 + 7. news 基于你的知识提供该公司最新的3条相关新闻,每条50字以内 + 8. latestValuation 知道就提供,不知道则null + 9. 不确定的字段返回null,不要编造 + 10. 字符串值中不允许出现Tab、换行等控制字符 + 11. 只返回JSON,不要其他内容 + """; + } + + /** 构建用户消息 */ + private String buildUserMessage(String shortName) { + return "【公司简称】\n" + shortName + + "\n\n【行业列表】\n" + dictCacheService.getIndustryText(); + } +} diff --git a/manager/src/main/java/org/jiayunet/service/CompanyCleanTransactionService.java b/manager/src/main/java/org/jiayunet/service/CompanyCleanTransactionService.java new file mode 100644 index 0000000..e9f887e --- /dev/null +++ b/manager/src/main/java/org/jiayunet/service/CompanyCleanTransactionService.java @@ -0,0 +1,160 @@ +package org.jiayunet.service; + +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import org.jiayunet.mapper.CompanyMapper; +import org.jiayunet.pojo.po.Company; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.Instant; +import java.util.List; + +/** + * 公司数据补充事务服务 + *

独立出来解决 @Transactional 同类自调用失效问题

+ *

依赖:CompanyMapper、DictCacheService(地区匹配、行业校验)

+ *

使用表:bg_company(更新)

+ * + * @author zk + */ +@Slf4j +@Service +public class CompanyCleanTransactionService { + + @Autowired + private CompanyMapper companyMapper; + + @Autowired + private DictCacheService dictCacheService; + + /** + * 回填公司数据(事务) + *

1. 解析AI返回的各字段 2. 地区匹配 3. 行业校验 4. 更新bg_company

+ */ + @Transactional(rollbackFor = Exception.class) + public void saveCompanyData(JsonNode root, Company company) { + // name:AI根据shortName推断的全称 + String name = root.path("name").asText(null); + if (name != null && !name.isBlank()) { + company.setName(name); + } + + // regionCode:AI返回城市名,Java侧匹配 + String city = root.path("city").asText(null); + if (city != null && !city.isBlank()) { + String regionCode = dictCacheService.matchRegionCode(city); + company.setRegionCode(regionCode); + } + + // companyType + String companyType = root.path("companyType").asText(null); + if (companyType != null && !"null".equals(companyType)) { + company.setCompanyType(companyType); + } + + // industryId:校验是否存在于行业列表中 + long industryId = root.path("industryId").asLong(0); + company.setIndustryId(industryId > 0 ? industryId : null); + + // tags:JSON数组 + JsonNode tagsNode = root.path("tags"); + if (tagsNode.isArray() && !tagsNode.isEmpty()) { + company.setTags(tagsNode.toString()); + } + + // summary + String summary = root.path("summary").asText(null); + if (summary != null && !"null".equals(summary)) { + company.setSummary(summary); + } + + // description + String description = root.path("description").asText(null); + if (description != null && !"null".equals(description)) { + company.setDescription(description); + } + + // foundedYear + String foundedYear = root.path("foundedYear").asText(null); + if (foundedYear != null && !"null".equals(foundedYear)) { + company.setFoundedYear(foundedYear); + } + + // address + String address = root.path("address").asText(null); + if (address != null && !"null".equals(address)) { + company.setAddress(address); + } + + // scale + String scale = root.path("scale").asText(null); + if (scale != null && !"null".equals(scale)) { + company.setScale(scale); + } + + // website + String website = root.path("website").asText(null); + if (website != null && !"null".equals(website)) { + company.setWebsite(website); + } + + // financingStage + String financingStage = root.path("financingStage").asText(null); + if (financingStage != null && !"null".equals(financingStage)) { + company.setFinancingStage(financingStage); + } + + // latestValuation + String latestValuation = root.path("latestValuation").asText(null); + if (latestValuation != null && !"null".equals(latestValuation)) { + company.setLatestValuation(latestValuation); + } + + // news:JSON数组 + JsonNode newsNode = root.path("news"); + if (newsNode.isArray() && !newsNode.isEmpty()) { + company.setNews(newsNode.toString()); + } + + // 更新状态和时间 + company.setStatus(1); + company.setUpdateTime(Instant.now()); + + companyMapper.updateById(company); + } + + /** 更新公司状态 */ + public void updateCompanyStatus(Long id, int status) { + companyMapper.update(null, + new LambdaUpdateWrapper() + .set(Company::getStatus, status) + .set(Company::getUpdateTime, Instant.now()) + .eq(Company::getId, id)); + } + + /** + * 原子锁定一批待完善公司(事务内 SELECT FOR UPDATE + UPDATE 状态) + *

行锁保证并发安全,其他线程会阻塞直到事务提交

+ * + * @return 锁定成功的公司列表,可能为空 + */ + @Transactional(rollbackFor = Exception.class) + public List lockBatch(int batchSize) { + List companyList = companyMapper.selectForUpdate(batchSize); + if (companyList.isEmpty()) { + return companyList; + } + + List ids = companyList.stream().map(Company::getId).toList(); + companyMapper.update(null, + new LambdaUpdateWrapper() + .set(Company::getStatus, 3) + .set(Company::getUpdateTime, Instant.now()) + .in(Company::getId, ids)); + + return companyList; + } +} diff --git a/manager/src/main/java/org/jiayunet/service/JobCleanService.java b/manager/src/main/java/org/jiayunet/service/JobCleanService.java index a77ab6e..6144098 100644 --- a/manager/src/main/java/org/jiayunet/service/JobCleanService.java +++ b/manager/src/main/java/org/jiayunet/service/JobCleanService.java @@ -69,25 +69,11 @@ public class JobCleanService { */ @Scheduled(cron = "0 */5 * * * ?") public void cleanJob() { - // 批量锁定:原子操作,clean_status 0→1 - int locked = appJobDataMapper.update(null, - new LambdaUpdateWrapper() - .set(AppJobData::getCleanStatus, 1) - .eq(AppJobData::getCleanStatus, 0) - .eq(AppJobData::getIsValid, 1) - .last("LIMIT " + batchSize)); - - if (locked == 0) { + List dataList = jobCleanTransactionService.lockBatch(batchSize); + if (dataList.isEmpty()) { return; } - log.info("岗位清洗:锁定{}条数据", locked); - - // 查出刚锁定的数据 - List dataList = appJobDataMapper.selectList( - new LambdaQueryWrapper() - .eq(AppJobData::getCleanStatus, 1) - .eq(AppJobData::getIsValid, 1) - .last("LIMIT " + batchSize)); + log.info("岗位清洗:锁定{}条数据", dataList.size()); // 多线程并发处理 for (AppJobData data : dataList) { diff --git a/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java b/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java index f2ed5c4..45b23e9 100644 --- a/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java +++ b/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java @@ -143,4 +143,26 @@ public class JobCleanTransactionService { .set(AppJobData::getCleanStatus, status) .eq(AppJobData::getId, id)); } + + /** + * 原子锁定一批待清洗数据(事务内 SELECT FOR UPDATE + UPDATE 状态) + *

行锁保证并发安全,其他线程会阻塞直到事务提交

+ * + * @return 锁定成功的数据列表,可能为空 + */ + @Transactional(rollbackFor = Exception.class) + public List lockBatch(int batchSize) { + List dataList = appJobDataMapper.selectForUpdate(batchSize); + if (dataList.isEmpty()) { + return dataList; + } + + List ids = dataList.stream().map(AppJobData::getId).toList(); + appJobDataMapper.update(null, + new LambdaUpdateWrapper() + .set(AppJobData::getCleanStatus, 1) + .in(AppJobData::getId, ids)); + + return dataList; + } }