添加公司信息处理逻辑
This commit is contained in:
+99
-44
@@ -26,7 +26,7 @@ inclusion: manual
|
||||
1. ✅ 岗位清洗触发逻辑
|
||||
2. ✅ 岗位清洗逻辑
|
||||
3. ✅ 公司数据触发逻辑
|
||||
4. ⬜ 公司数据补充逻辑(待定API后补充)
|
||||
4. ✅ 公司数据补充逻辑(AI补充)
|
||||
|
||||
---
|
||||
|
||||
@@ -53,7 +53,7 @@ CREATE INDEX idx_clean_status ON app_job_data (clean_status);
|
||||
|
||||
#### 任务A:岗位清洗任务(高频,每5分钟)
|
||||
|
||||
1. 批量锁定:`UPDATE app_job_data SET clean_status=1 WHERE clean_status=0 AND is_valid=1 LIMIT N`(原子操作,防止多线程重复捞取)
|
||||
1. 批量锁定(事务内 `SELECT ... FOR UPDATE` + `UPDATE`):先在事务内对 `clean_status=0 AND is_valid=1` 的行加行锁并查出数据,再更新 `clean_status=1`,事务提交后释放锁。行锁保证并发安全,其他线程会阻塞直到事务提交
|
||||
2. 将锁定的数据丢入线程池,多线程并发调用 AI API 清洗
|
||||
3. 每条处理完毕后,单独更新 `clean_status` 为 2(已入库)或 3(已丢弃)
|
||||
4. 单条写入事务:bg_job 入库 + clean_status 更新放在同一个短事务中,保证一致性
|
||||
@@ -221,25 +221,29 @@ CREATE TABLE bg_job_skill_tag_relation (
|
||||
|
||||
### 3.1 状态扩展
|
||||
|
||||
`bg_company.status` 扩展为4个值:
|
||||
`bg_company.status` 扩展为5个值:
|
||||
- 0=待完善:岗位清洗时自动创建的公司,只有 short_name
|
||||
- 1=已完善:工商API补充完成
|
||||
- 1=已完善:AI补充完成
|
||||
- 2=禁用:人工标记禁用
|
||||
- 3=补充中:定时任务已锁定,正在调用工商API
|
||||
- 3=补充中:定时任务已锁定,正在调用AI
|
||||
- 4=补充失败:AI明确不认识该公司,不再自动重试
|
||||
|
||||
### 3.2 两个定时任务(与岗位清洗同一套模式)
|
||||
|
||||
#### 任务C:公司数据补充任务(低频,每小时)
|
||||
|
||||
1. 批量锁定(原子操作):
|
||||
1. 批量锁定(事务内 `SELECT ... FOR UPDATE` + `UPDATE`):
|
||||
```sql
|
||||
UPDATE bg_company SET status=3, update_time=NOW() WHERE status=0 LIMIT N
|
||||
-- 事务内执行,行锁保证并发安全
|
||||
SELECT * FROM bg_company WHERE status=0 LIMIT N FOR UPDATE;
|
||||
UPDATE bg_company SET status=3, update_time=NOW() WHERE id IN (...);
|
||||
```
|
||||
⚠️ 锁定时必须同时更新 `update_time`,因为 `bg_company` 的 `update_time` 不像 `app_job_data.updated_at` 那样由数据库自动维护,需要 Java 侧手动设值。如果不更新,后续僵尸恢复任务无法正确判断超时。
|
||||
|
||||
2. 将锁定的数据丢入线程池,多线程并发调用工商API
|
||||
2. 将锁定的数据丢入线程池,多线程并发调用AI补充
|
||||
3. 每条处理完毕后,回填公司信息,更新 `status=1`(已完善)
|
||||
4. 工商API查不到或返回异常 → 保持 `status=3`,由僵尸恢复任务重置
|
||||
4. AI明确不认识该公司(valid=false)→ 更新 `status=4`(补充失败)
|
||||
5. AI调用异常或解析失败 → 保持 `status=3`,由僵尸恢复任务重置
|
||||
|
||||
#### 任务D:公司僵尸恢复任务(低频,每小时,与任务C错开)
|
||||
|
||||
@@ -257,7 +261,7 @@ UPDATE bg_company SET status=0 WHERE status=3 AND update_time < NOW() - INTERVAL
|
||||
|--------|----------|----------|
|
||||
| 状态字段 | app_job_data.clean_status | bg_company.status |
|
||||
| 锁定值 | 1=清洗中 | 3=补充中 |
|
||||
| 完成值 | 2=已入库 / 3=已丢弃 | 1=已完善 |
|
||||
| 完成值 | 2=已入库 / 3=已丢弃 | 1=已完善 / 4=补充失败 |
|
||||
| 时间字段 | updated_at(数据库自动) | update_time(Java手动设值) |
|
||||
| 锁定时是否需手动更新时间 | 不需要 | **需要**,否则僵尸恢复无法判断超时 |
|
||||
| 触发频率 | 每5分钟 | 每小时 |
|
||||
@@ -267,47 +271,98 @@ UPDATE bg_company SET status=0 WHERE status=3 AND update_time < NOW() - INTERVAL
|
||||
|
||||
| 决策点 | 结论 | 原因 |
|
||||
|--------|------|------|
|
||||
| 是否与岗位清洗同步触发 | 否,独立定时任务 | 外部API不同,频率不同,失败场景不同 |
|
||||
| 触发模式 | 复用岗位清洗的"原子锁定+僵尸恢复"模式 | 统一架构,代码可复用 |
|
||||
| 是否与岗位清洗同步触发 | 否,独立定时任务 | AI调用场景不同,频率不同 |
|
||||
| 触发模式 | 复用岗位清洗的"SELECT FOR UPDATE + 僵尸恢复"模式 | 统一架构,行锁保证并发安全 |
|
||||
| 锁定时是否更新时间 | 是 | bg_company.update_time 非数据库自动维护,不更新则僵尸恢复失效 |
|
||||
| 补充频率 | 每小时 | 公司数据量少,工商API可能有频率限制 |
|
||||
| 补充频率 | 每小时 | 公司数据量少,AI调用成本可控 |
|
||||
|
||||
---
|
||||
|
||||
## 四、公司数据补充逻辑(待定API后补充)
|
||||
## 四、公司数据补充逻辑(AI补充)
|
||||
|
||||
### 4.1 补充流程概要
|
||||
### 4.1 方案说明
|
||||
|
||||
1. 用 `short_name`(公司简称)调用工商API搜索
|
||||
2. API返回匹配的企业列表,取最匹配的一条
|
||||
3. 回填 `bg_company` 各字段,更新 `status=1`
|
||||
原计划使用工商API(天眼查等)查询公司信息,但考虑到:
|
||||
- 公司简称不能很好地查询到精确数据
|
||||
- 用AI生成完整企业名字再查API,准确性也无法保证
|
||||
- 我们的公司数据来源于招聘平台,基本都是有一定规模的企业,AI训练数据覆盖率高
|
||||
|
||||
### 4.2 需要回填的字段
|
||||
因此改为直接使用AI补充公司数据,一次调用返回全部字段。
|
||||
|
||||
| bg_company 字段 | 来源 | 说明 |
|
||||
|-----------------|------|------|
|
||||
| name | 工商API | 公司全称 |
|
||||
| logoUrl | 待定 | 工商API可能不提供,需另外来源 |
|
||||
| regionCode | 工商API(注册地址) | 匹配 bg_china_regions_code |
|
||||
| companyType | 工商API | 上市企业、独角兽、国企等 |
|
||||
| industryId | 工商API(行业分类) | 匹配 bg_industry |
|
||||
| tags | 工商API / AI | 公司标签,JSON数组 |
|
||||
| summary | 工商API / AI | 公司简要介绍 |
|
||||
| description | 工商API | 公司详细描述 |
|
||||
| foundedYear | 工商API | 成立时间 |
|
||||
| address | 工商API | 注册地址 |
|
||||
| scale | 工商API | 企业规模(人数) |
|
||||
| website | 工商API | 官网地址 |
|
||||
| financingStage | 工商API / 其他来源 | 融资状态 |
|
||||
| latestValuation | 工商API / 其他来源 | 最新估值 |
|
||||
| news | 新闻API(待定) | 新闻动态,JSON数组 |
|
||||
### 4.2 补充流程
|
||||
|
||||
### 4.3 待定事项
|
||||
1. 拿 `short_name`(公司简称),拼 prompt 调 AI
|
||||
2. prompt 中附带行业列表(`DictCacheService.getIndustryText()`),让 AI 直接返回 `industryId`
|
||||
3. AI 返回结构化 JSON,包含 `valid` 字段判断是否认识该公司
|
||||
4. `valid=false` → 更新 `status=4`(补充失败),结束
|
||||
5. `regionCode`:AI 返回城市名,Java 侧 `DictCacheService.matchRegionCode` 匹配
|
||||
6. 解析 JSON,回填 `bg_company` 各字段
|
||||
7. 更新 `status=1`(已完善)
|
||||
|
||||
- [ ] 选定工商信息API(天眼查、企查查、爱企查等)
|
||||
- [ ] 确认API返回字段与 bg_company 的映射关系
|
||||
- [ ] 新闻动态数据来源(工商API是否包含,还是需要单独的新闻API)
|
||||
- [ ] logoUrl 来源(工商API是否提供)
|
||||
- [ ] 匹配到多条结果时的处理策略
|
||||
- [ ] 查不到结果时的处理策略(保持待完善 or 标记为其他状态)
|
||||
- [ ] API调用频率限制和成本评估
|
||||
### 4.3 AI 返回 JSON 结构
|
||||
|
||||
```json
|
||||
{
|
||||
"valid": true,
|
||||
"name": "北京字节跳动科技有限公司",
|
||||
"city": "北京",
|
||||
"companyType": "独角兽",
|
||||
"industryId": 5,
|
||||
"tags": ["短视频", "人工智能", "社交平台"],
|
||||
"summary": "全球领先的内容平台和科技公司,旗下拥有抖音、今日头条等产品",
|
||||
"description": "字节跳动成立于2012年,是一家以技术驱动的全球化互联网公司...",
|
||||
"foundedYear": "2012",
|
||||
"address": "北京市海淀区北三环西路27号",
|
||||
"scale": "10000人以上",
|
||||
"website": "https://www.bytedance.com",
|
||||
"financingStage": "已上市",
|
||||
"latestValuation": "2200亿美元",
|
||||
"news": [
|
||||
"字节跳动2024年营收突破1200亿美元创历史新高",
|
||||
"TikTok全球月活用户突破15亿大关",
|
||||
"字节跳动加大AI大模型研发投入布局人工智能赛道"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### 4.4 各字段补充规则
|
||||
|
||||
| bg_company 字段 | AI返回字段 | 规则 |
|
||||
|-----------------|-----------|------|
|
||||
| name | name | 公司全称,AI不确定则null |
|
||||
| logoUrl | — | AI无法提供,留空 |
|
||||
| regionCode | city | AI返回城市名,Java侧matchRegionCode匹配 |
|
||||
| companyType | companyType | 上市企业、独角兽、国企、民营企业、外资企业等 |
|
||||
| industryId | industryId | 从行业列表中选,不确定则null |
|
||||
| tags | tags | 公司标签,JSON数组,最多5个 |
|
||||
| summary | summary | 一句话简介,100字以内 |
|
||||
| description | description | 公司详细描述,500字以内 |
|
||||
| foundedYear | foundedYear | 成立年份,如"2012" |
|
||||
| address | address | 注册/总部地址 |
|
||||
| scale | scale | 企业规模,如"1000-5000人"、"10000人以上" |
|
||||
| website | website | 官网地址 |
|
||||
| financingStage | financingStage | 融资状态,如"A轮"、"已上市"、"不需要融资" |
|
||||
| latestValuation | latestValuation | 最新估值,AI知道就给,不知道null |
|
||||
| news | news | 3条相关新闻,每条50字以内,JSON数组 |
|
||||
|
||||
### 4.5 prompt 规则
|
||||
|
||||
- AI 不认识该公司 → `valid=false`,其他字段不需要返回
|
||||
- 不确定的字段返回 null,不要编造
|
||||
- `industryId` 必须从给定行业列表中选择
|
||||
- `news` 最多3条,每条50字以内,基于AI知识库中最新的信息
|
||||
- `tags` 最多5个,体现公司核心业务特征
|
||||
- 字符串值中不允许出现Tab、换行等控制字符
|
||||
- 只返回JSON,不要其他内容
|
||||
|
||||
### 4.6 设计决策记录
|
||||
|
||||
| 决策点 | 结论 | 原因 |
|
||||
|--------|------|------|
|
||||
| 数据来源 | AI补充,不用工商API | 公司简称查API不精确,AI对招聘平台企业覆盖率高 |
|
||||
| AI不认识的公司 | status=4(补充失败),不再自动重试 | 避免无限重试浪费AI调用 |
|
||||
| logoUrl | 留空 | AI无法提供图片URL |
|
||||
| news 时效性 | 不要求实时,取AI知识库内最新的3条 | 求职场景不需要实时新闻 |
|
||||
| latestValuation | AI知道就给,不知道null | 大部分招聘企业AI有数据 |
|
||||
| regionCode 匹配方式 | AI返回城市名,Java侧匹配 | 复用已有的matchRegionCode逻辑 |
|
||||
| industryId 匹配方式 | prompt带行业列表,AI直接返回ID | 复用已有的行业列表文本 |
|
||||
|
||||
@@ -100,7 +100,7 @@ offerpie/back-end
|
||||
│ │ ├─ UserJobDislike.java # 用户不感兴趣记录表(bg_user_job_dislike)
|
||||
│ │ └─ AppJobData.java # 爬虫岗位原始数据表(app_job_data)
|
||||
│ └─ vo/ # ViewObject(OssUrlVo 等)
|
||||
└─ service/ # 业务 Service(OssService、SmsService、DictCacheService、JobCleanService、JobCleanTransactionService 等)
|
||||
└─ service/ # 业务 Service(OssService、SmsService、DictCacheService、JobCleanService、JobCleanTransactionService、CompanyCleanService、CompanyCleanTransactionService 等)
|
||||
```
|
||||
> **设计理念** – 业务实体和 Mapper 位于 `manager`,B 端和 C 端共享;C 端特有的注解、切面、权限服务、路由菜单服务位于 `client-api`,避免 B 端误用;`common` 提供统一的技术支撑。
|
||||
|
||||
|
||||
@@ -87,4 +87,9 @@ app:
|
||||
# 岗位清洗配置
|
||||
job-clean:
|
||||
batch-size: 20
|
||||
thread-pool-size: 5
|
||||
thread-pool-size: 5
|
||||
|
||||
# 公司数据补充配置
|
||||
company-clean:
|
||||
batch-size: 10
|
||||
thread-pool-size: 3
|
||||
@@ -1,8 +1,12 @@
|
||||
package org.jiayunet.mapper;
|
||||
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.apache.ibatis.annotations.Select;
|
||||
import org.jiayunet.pojo.po.AppJobData;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 爬虫岗位原始数据Mapper
|
||||
*
|
||||
@@ -10,4 +14,11 @@ import org.jiayunet.pojo.po.AppJobData;
|
||||
*/
|
||||
@Mapper
|
||||
public interface AppJobDataMapper extends CommonMapper<AppJobData> {
|
||||
|
||||
/**
|
||||
* 查询待清洗数据并加行锁(SELECT ... FOR UPDATE)
|
||||
* <p>必须在事务内调用,配合状态更新实现原子锁定</p>
|
||||
*/
|
||||
@Select("SELECT * FROM app_job_data WHERE clean_status = 0 AND is_valid = 1 LIMIT #{limit} FOR UPDATE")
|
||||
List<AppJobData> selectForUpdate(@Param("limit") int limit);
|
||||
}
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
package org.jiayunet.mapper;
|
||||
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.apache.ibatis.annotations.Select;
|
||||
import org.jiayunet.pojo.po.Company;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 公司Mapper
|
||||
*
|
||||
@@ -10,4 +14,11 @@ import org.jiayunet.pojo.po.Company;
|
||||
*/
|
||||
@Mapper
|
||||
public interface CompanyMapper extends CommonMapper<Company> {
|
||||
|
||||
/**
|
||||
* 查询待完善公司并加行锁(SELECT ... FOR UPDATE)
|
||||
* <p>必须在事务内调用,配合状态更新实现原子锁定</p>
|
||||
*/
|
||||
@Select("SELECT * FROM bg_company WHERE status = 0 LIMIT #{limit} FOR UPDATE")
|
||||
List<Company> selectForUpdate(@Param("limit") int limit);
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ public class Company {
|
||||
/** 新闻动态(JSON数组) */
|
||||
private String news;
|
||||
|
||||
/** 状态 0=待完善 1=已完善 2=禁用 3=补充中 */
|
||||
/** 状态 0=待完善 1=已完善 2=禁用 3=补充中 4=补充失败 */
|
||||
private Integer status;
|
||||
|
||||
/** 创建时间 */
|
||||
|
||||
@@ -0,0 +1,184 @@
|
||||
package org.jiayunet.service;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jiayunet.ai.AiChatAbility;
|
||||
import org.jiayunet.mapper.CompanyMapper;
|
||||
import org.jiayunet.pojo.po.Company;
|
||||
import org.jiayunet.tool.HttpTool;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* 公司数据补充服务
|
||||
* <p>定时从 bg_company 捞取待完善数据,调用 AI 补充公司信息</p>
|
||||
* <p>依赖:AiChatAbility(AI调用)、DictCacheService(行业列表/地区匹配)、CompanyCleanTransactionService(事务操作)</p>
|
||||
* <p>使用表:bg_company(读取/更新)</p>
|
||||
*
|
||||
* @author zk
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class CompanyCleanService {
|
||||
|
||||
@Autowired
|
||||
private AiChatAbility aiChatAbility;
|
||||
|
||||
@Autowired
|
||||
private DictCacheService dictCacheService;
|
||||
|
||||
@Autowired
|
||||
private CompanyCleanTransactionService companyCleanTransactionService;
|
||||
|
||||
@Autowired
|
||||
private CompanyMapper companyMapper;
|
||||
|
||||
@Value("${app.company-clean.batch-size:10}")
|
||||
private int batchSize;
|
||||
|
||||
@Value("${app.company-clean.thread-pool-size:3}")
|
||||
private int threadPoolSize;
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
/** 初始化线程池 */
|
||||
@javax.annotation.PostConstruct
|
||||
public void init() {
|
||||
executorService = Executors.newFixedThreadPool(threadPoolSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务C:公司数据补充(每小时)
|
||||
* <p>1. 批量锁定待完善公司 2. 多线程并发调用AI补充 3. 回填数据</p>
|
||||
*/
|
||||
@Scheduled(cron = "0 */1 * * * ?")
|
||||
public void cleanCompany() {
|
||||
List<Company> companyList = companyCleanTransactionService.lockBatch(batchSize);
|
||||
if (companyList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
log.info("公司补充:锁定{}条数据", companyList.size());
|
||||
|
||||
// 多线程并发处理
|
||||
for (Company company : companyList) {
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
cleanOne(company);
|
||||
} catch (Exception e) {
|
||||
log.error("公司补充异常, id={}, shortName={}", company.getId(), company.getShortName(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务D:公司僵尸恢复(每小时,与任务C错开30分钟)
|
||||
* <p>将超时10分钟仍在补充中的数据重置为待完善</p>
|
||||
*/
|
||||
@Scheduled(cron = "0 30 */1 * * ?")
|
||||
public void recoverZombie() {
|
||||
int recovered = companyMapper.update(null,
|
||||
new LambdaUpdateWrapper<Company>()
|
||||
.set(Company::getStatus, 0)
|
||||
.eq(Company::getStatus, 3)
|
||||
.lt(Company::getUpdateTime, Instant.now().minusSeconds(600)));
|
||||
|
||||
if (recovered > 0) {
|
||||
log.info("公司僵尸恢复:重置{}条数据", recovered);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 补充单条公司数据
|
||||
* <p>1. 拼prompt调AI 2. 解析结果 3. 回填数据</p>
|
||||
*/
|
||||
public void cleanOne(Company company) {
|
||||
log.info("公司补充开始, id={}, shortName={}", company.getId(), company.getShortName());
|
||||
|
||||
String systemPrompt = buildSystemPrompt();
|
||||
String userMessage = buildUserMessage(company.getShortName());
|
||||
|
||||
String aiResponse = aiChatAbility.chat(systemPrompt, userMessage);
|
||||
|
||||
try {
|
||||
// 去掉可能的 markdown 代码块标记
|
||||
String json = aiResponse.trim();
|
||||
if (json.startsWith("```")) {
|
||||
json = json.replaceAll("^```\\w*\\n?", "").replaceAll("\\n?```$", "").trim();
|
||||
}
|
||||
// 清除控制字符(Tab等),保留换行符
|
||||
json = json.replaceAll("[\\x00-\\x08\\x0B\\x0C\\x0E-\\x1F]", "");
|
||||
|
||||
JsonNode root = HttpTool.objectMapper.readTree(json);
|
||||
|
||||
// valid 校验:AI不认识该公司
|
||||
if (!root.path("valid").asBoolean(false)) {
|
||||
log.info("公司补充:AI不认识该公司, id={}, shortName={}", company.getId(), company.getShortName());
|
||||
companyCleanTransactionService.updateCompanyStatus(company.getId(), 4);
|
||||
return;
|
||||
}
|
||||
|
||||
// 回填数据
|
||||
companyCleanTransactionService.saveCompanyData(root, company);
|
||||
log.info("公司补充完成, id={}, shortName={}", company.getId(), company.getShortName());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("公司AI返回解析失败, id={}, shortName={}, response={}",
|
||||
company.getId(), company.getShortName(), aiResponse, e);
|
||||
// 保持 status=3,由僵尸恢复任务重置
|
||||
}
|
||||
}
|
||||
|
||||
/** 构建系统提示词 */
|
||||
private String buildSystemPrompt() {
|
||||
return """
|
||||
你是一个企业信息补充助手。根据提供的公司简称,补充该公司的详细信息。
|
||||
|
||||
返回JSON格式要求:
|
||||
{
|
||||
"valid": true/false,
|
||||
"name": "公司全称",
|
||||
"city": "总部所在城市,精确到市",
|
||||
"companyType": "企业类型",
|
||||
"industryId": 行业ID,
|
||||
"tags": ["公司标签,最多5个"],
|
||||
"summary": "一句话简介,100字以内",
|
||||
"description": "公司详细描述,500字以内",
|
||||
"foundedYear": "成立年份",
|
||||
"address": "总部/注册地址",
|
||||
"scale": "企业规模",
|
||||
"website": "官网地址",
|
||||
"financingStage": "融资状态",
|
||||
"latestValuation": "最新估值",
|
||||
"news": ["相关新闻,最多3条,每条50字以内"]
|
||||
}
|
||||
|
||||
规则:
|
||||
1. 如果不认识该公司,返回 {"valid": false}
|
||||
2. name 根据公司简称推断完整的企业注册名称
|
||||
3. companyType 取值:上市企业、独角兽、国企、央企、民营企业、外资企业、合资企业、事业单位、其他
|
||||
4. industryId 必须从给定行业列表中选择,不确定则null
|
||||
5. scale 取值:少于50人、50-150人、150-500人、500-1000人、1000-5000人、5000-10000人、10000人以上
|
||||
6. tags 体现公司核心业务特征,最多5个
|
||||
7. news 基于你的知识提供该公司最新的3条相关新闻,每条50字以内
|
||||
8. latestValuation 知道就提供,不知道则null
|
||||
9. 不确定的字段返回null,不要编造
|
||||
10. 字符串值中不允许出现Tab、换行等控制字符
|
||||
11. 只返回JSON,不要其他内容
|
||||
""";
|
||||
}
|
||||
|
||||
/** 构建用户消息 */
|
||||
private String buildUserMessage(String shortName) {
|
||||
return "【公司简称】\n" + shortName +
|
||||
"\n\n【行业列表】\n" + dictCacheService.getIndustryText();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,160 @@
|
||||
package org.jiayunet.service;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jiayunet.mapper.CompanyMapper;
|
||||
import org.jiayunet.pojo.po.Company;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 公司数据补充事务服务
|
||||
* <p>独立出来解决 @Transactional 同类自调用失效问题</p>
|
||||
* <p>依赖:CompanyMapper、DictCacheService(地区匹配、行业校验)</p>
|
||||
* <p>使用表:bg_company(更新)</p>
|
||||
*
|
||||
* @author zk
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class CompanyCleanTransactionService {
|
||||
|
||||
@Autowired
|
||||
private CompanyMapper companyMapper;
|
||||
|
||||
@Autowired
|
||||
private DictCacheService dictCacheService;
|
||||
|
||||
/**
|
||||
* 回填公司数据(事务)
|
||||
* <p>1. 解析AI返回的各字段 2. 地区匹配 3. 行业校验 4. 更新bg_company</p>
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void saveCompanyData(JsonNode root, Company company) {
|
||||
// name:AI根据shortName推断的全称
|
||||
String name = root.path("name").asText(null);
|
||||
if (name != null && !name.isBlank()) {
|
||||
company.setName(name);
|
||||
}
|
||||
|
||||
// regionCode:AI返回城市名,Java侧匹配
|
||||
String city = root.path("city").asText(null);
|
||||
if (city != null && !city.isBlank()) {
|
||||
String regionCode = dictCacheService.matchRegionCode(city);
|
||||
company.setRegionCode(regionCode);
|
||||
}
|
||||
|
||||
// companyType
|
||||
String companyType = root.path("companyType").asText(null);
|
||||
if (companyType != null && !"null".equals(companyType)) {
|
||||
company.setCompanyType(companyType);
|
||||
}
|
||||
|
||||
// industryId:校验是否存在于行业列表中
|
||||
long industryId = root.path("industryId").asLong(0);
|
||||
company.setIndustryId(industryId > 0 ? industryId : null);
|
||||
|
||||
// tags:JSON数组
|
||||
JsonNode tagsNode = root.path("tags");
|
||||
if (tagsNode.isArray() && !tagsNode.isEmpty()) {
|
||||
company.setTags(tagsNode.toString());
|
||||
}
|
||||
|
||||
// summary
|
||||
String summary = root.path("summary").asText(null);
|
||||
if (summary != null && !"null".equals(summary)) {
|
||||
company.setSummary(summary);
|
||||
}
|
||||
|
||||
// description
|
||||
String description = root.path("description").asText(null);
|
||||
if (description != null && !"null".equals(description)) {
|
||||
company.setDescription(description);
|
||||
}
|
||||
|
||||
// foundedYear
|
||||
String foundedYear = root.path("foundedYear").asText(null);
|
||||
if (foundedYear != null && !"null".equals(foundedYear)) {
|
||||
company.setFoundedYear(foundedYear);
|
||||
}
|
||||
|
||||
// address
|
||||
String address = root.path("address").asText(null);
|
||||
if (address != null && !"null".equals(address)) {
|
||||
company.setAddress(address);
|
||||
}
|
||||
|
||||
// scale
|
||||
String scale = root.path("scale").asText(null);
|
||||
if (scale != null && !"null".equals(scale)) {
|
||||
company.setScale(scale);
|
||||
}
|
||||
|
||||
// website
|
||||
String website = root.path("website").asText(null);
|
||||
if (website != null && !"null".equals(website)) {
|
||||
company.setWebsite(website);
|
||||
}
|
||||
|
||||
// financingStage
|
||||
String financingStage = root.path("financingStage").asText(null);
|
||||
if (financingStage != null && !"null".equals(financingStage)) {
|
||||
company.setFinancingStage(financingStage);
|
||||
}
|
||||
|
||||
// latestValuation
|
||||
String latestValuation = root.path("latestValuation").asText(null);
|
||||
if (latestValuation != null && !"null".equals(latestValuation)) {
|
||||
company.setLatestValuation(latestValuation);
|
||||
}
|
||||
|
||||
// news:JSON数组
|
||||
JsonNode newsNode = root.path("news");
|
||||
if (newsNode.isArray() && !newsNode.isEmpty()) {
|
||||
company.setNews(newsNode.toString());
|
||||
}
|
||||
|
||||
// 更新状态和时间
|
||||
company.setStatus(1);
|
||||
company.setUpdateTime(Instant.now());
|
||||
|
||||
companyMapper.updateById(company);
|
||||
}
|
||||
|
||||
/** 更新公司状态 */
|
||||
public void updateCompanyStatus(Long id, int status) {
|
||||
companyMapper.update(null,
|
||||
new LambdaUpdateWrapper<Company>()
|
||||
.set(Company::getStatus, status)
|
||||
.set(Company::getUpdateTime, Instant.now())
|
||||
.eq(Company::getId, id));
|
||||
}
|
||||
|
||||
/**
|
||||
* 原子锁定一批待完善公司(事务内 SELECT FOR UPDATE + UPDATE 状态)
|
||||
* <p>行锁保证并发安全,其他线程会阻塞直到事务提交</p>
|
||||
*
|
||||
* @return 锁定成功的公司列表,可能为空
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public List<Company> lockBatch(int batchSize) {
|
||||
List<Company> companyList = companyMapper.selectForUpdate(batchSize);
|
||||
if (companyList.isEmpty()) {
|
||||
return companyList;
|
||||
}
|
||||
|
||||
List<Long> ids = companyList.stream().map(Company::getId).toList();
|
||||
companyMapper.update(null,
|
||||
new LambdaUpdateWrapper<Company>()
|
||||
.set(Company::getStatus, 3)
|
||||
.set(Company::getUpdateTime, Instant.now())
|
||||
.in(Company::getId, ids));
|
||||
|
||||
return companyList;
|
||||
}
|
||||
}
|
||||
@@ -69,25 +69,11 @@ public class JobCleanService {
|
||||
*/
|
||||
@Scheduled(cron = "0 */5 * * * ?")
|
||||
public void cleanJob() {
|
||||
// 批量锁定:原子操作,clean_status 0→1
|
||||
int locked = appJobDataMapper.update(null,
|
||||
new LambdaUpdateWrapper<AppJobData>()
|
||||
.set(AppJobData::getCleanStatus, 1)
|
||||
.eq(AppJobData::getCleanStatus, 0)
|
||||
.eq(AppJobData::getIsValid, 1)
|
||||
.last("LIMIT " + batchSize));
|
||||
|
||||
if (locked == 0) {
|
||||
List<AppJobData> dataList = jobCleanTransactionService.lockBatch(batchSize);
|
||||
if (dataList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
log.info("岗位清洗:锁定{}条数据", locked);
|
||||
|
||||
// 查出刚锁定的数据
|
||||
List<AppJobData> dataList = appJobDataMapper.selectList(
|
||||
new LambdaQueryWrapper<AppJobData>()
|
||||
.eq(AppJobData::getCleanStatus, 1)
|
||||
.eq(AppJobData::getIsValid, 1)
|
||||
.last("LIMIT " + batchSize));
|
||||
log.info("岗位清洗:锁定{}条数据", dataList.size());
|
||||
|
||||
// 多线程并发处理
|
||||
for (AppJobData data : dataList) {
|
||||
|
||||
@@ -143,4 +143,26 @@ public class JobCleanTransactionService {
|
||||
.set(AppJobData::getCleanStatus, status)
|
||||
.eq(AppJobData::getId, id));
|
||||
}
|
||||
|
||||
/**
|
||||
* 原子锁定一批待清洗数据(事务内 SELECT FOR UPDATE + UPDATE 状态)
|
||||
* <p>行锁保证并发安全,其他线程会阻塞直到事务提交</p>
|
||||
*
|
||||
* @return 锁定成功的数据列表,可能为空
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public List<AppJobData> lockBatch(int batchSize) {
|
||||
List<AppJobData> dataList = appJobDataMapper.selectForUpdate(batchSize);
|
||||
if (dataList.isEmpty()) {
|
||||
return dataList;
|
||||
}
|
||||
|
||||
List<Long> ids = dataList.stream().map(AppJobData::getId).toList();
|
||||
appJobDataMapper.update(null,
|
||||
new LambdaUpdateWrapper<AppJobData>()
|
||||
.set(AppJobData::getCleanStatus, 1)
|
||||
.in(AppJobData::getId, ids));
|
||||
|
||||
return dataList;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user