From 6bddeecb7a2d1b0e92b7592ac0c7a1de92731874 Mon Sep 17 00:00:00 2001 From: zk Date: Wed, 18 Mar 2026 21:33:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B2=97=E4=BD=8D=E6=B8=85=E6=B4=97=EF=BC=8C?= =?UTF-8?q?=E5=B7=AE=E5=B2=97=E4=BD=8D=E6=8A=80=E8=83=BD=E6=8F=90=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .kiro/steering/数据清洗方案.md | 274 ++++++++++++++++++ .kiro/steering/项目结构说明.md | 13 +- .../src/main/resources/application-local.yml | 16 +- .../java/org/jiayunet/ai/AiChatAbility.java | 77 +++++ .../java/org/jiayunet/ai/AiChatConfig.java | 33 +++ .../main/java/org/jiayunet/tool/HttpTool.java | 2 +- .../org/jiayunet/mapper/AppJobDataMapper.java | 13 + .../java/org/jiayunet/pojo/po/AppJobData.java | 79 +++++ .../java/org/jiayunet/pojo/po/Company.java | 2 +- .../jiayunet/service/DictCacheService.java | 124 ++++++++ .../org/jiayunet/service/JobCleanService.java | 248 ++++++++++++++++ .../service/JobCleanTransactionService.java | 123 ++++++++ 12 files changed, 997 insertions(+), 7 deletions(-) create mode 100644 .kiro/steering/数据清洗方案.md create mode 100644 common/src/main/java/org/jiayunet/ai/AiChatAbility.java create mode 100644 common/src/main/java/org/jiayunet/ai/AiChatConfig.java create mode 100644 manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java create mode 100644 manager/src/main/java/org/jiayunet/pojo/po/AppJobData.java create mode 100644 manager/src/main/java/org/jiayunet/service/DictCacheService.java create mode 100644 manager/src/main/java/org/jiayunet/service/JobCleanService.java create mode 100644 manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java diff --git a/.kiro/steering/数据清洗方案.md b/.kiro/steering/数据清洗方案.md new file mode 100644 index 0000000..199bac9 --- /dev/null +++ b/.kiro/steering/数据清洗方案.md @@ -0,0 +1,274 @@ +--- +inclusion: manual +--- + +# 数据清洗方案 + +## 总体架构 + +``` +爬虫(公司网络) → app_job_data(原始数据) + ↓ + Java定时任务读取(多线程) + ↓ + 调用AI API清洗/结构化 + ↓ + 写入业务表(bg_company + bg_job + 关联表) + ↓ + 公司信息不完整的 → 调工商API补充 +``` + +所有清洗逻辑放在 `manager` 模块,通过 `@Scheduled` 定时任务触发。 + +## 讨论分区 + +整体方案分为四部分逐步讨论: +1. ✅ 岗位清洗触发逻辑 +2. ✅ 岗位清洗逻辑 +3. ✅ 公司数据触发逻辑 +4. ⬜ 公司数据补充逻辑(待定API后补充) + +--- + +## 一、岗位清洗触发逻辑 + +### 1.1 表结构变更 + +`app_job_data` 新增字段: + +```sql +ALTER TABLE app_job_data ADD COLUMN clean_status TINYINT(1) DEFAULT 0 NOT NULL COMMENT '清洗状态 0=待清洗 1=清洗中 2=已入库 3=已丢弃'; +CREATE INDEX idx_clean_status ON app_job_data (clean_status); +``` + +状态说明: +- 0=待清洗:新爬到的数据,默认值,不影响爬虫原有插入逻辑 +- 1=清洗中:定时任务已锁定,正在处理 +- 2=已入库:清洗成功,已写入 bg_job +- 3=已丢弃:AI判定为无效数据,不入库 + +时间记录:不加额外时间字段,利用已有的 `updated_at`(ON UPDATE CURRENT_TIMESTAMP),状态变更时自动更新。 + +### 1.2 两个定时任务 + +#### 任务A:岗位清洗任务(高频,每5分钟) + +1. 批量锁定:`UPDATE app_job_data SET clean_status=1 WHERE clean_status=0 AND is_valid=1 LIMIT N`(原子操作,防止多线程重复捞取) +2. 将锁定的数据丢入线程池,多线程并发调用 AI API 清洗 +3. 每条处理完毕后,单独更新 `clean_status` 为 2(已入库)或 3(已丢弃) +4. 单条写入事务:bg_job 入库 + clean_status 更新放在同一个短事务中,保证一致性 + +#### 任务B:僵尸恢复任务(低频,每30分钟) + +处理因发布重启导致卡在"清洗中"的僵尸数据: + +```sql +UPDATE app_job_data SET clean_status=0 WHERE clean_status=1 AND updated_at < NOW() - INTERVAL 10 MINUTE +``` + +一条SQL搞定,将超时10分钟仍在"清洗中"的数据重置为待清洗,下次任务A会重新捞取处理。 + +### 1.3 去重保障 + +即使同一条数据被重复清洗(僵尸恢复后重新处理),写入 `bg_job` 时通过 `source_id` 判断是否已存在,存在则跳过,不会产生重复数据。 + +### 1.4 设计决策记录 + +| 决策点 | 结论 | 原因 | +|--------|------|------| +| 清洗状态放哪 | app_job_data 加字段 | 同库,简单直接 | +| 是否加"清洗中"状态 | 是 | 多线程并发需要锁定机制 | +| 长事务 vs 短事务 | 短事务(单条) | AI调用耗时长,不能hold连接 | +| 僵尸恢复方式 | 独立低频定时任务 | 避免每次清洗任务都多一次查询,节省性能 | +| 是否加 clean_time 字段 | 否 | updated_at 自动更新,够用 | +| 失败重试 | 僵尸恢复任务自动处理 | clean_status=1 超时后重置为0,自动重试 | + +--- + +## 二、岗位清洗逻辑 + +### 2.1 前置校验(Java侧,不调AI) + +- `description` 为空或长度 < 20 → 直接标记 `clean_status=3`(丢弃),跳过,节省AI调用成本 + +### 2.2 参考数据准备 + +应用启动时加载并缓存(定期刷新): +- `bg_job_category` 全量:拼成 `id:name` 文本列表 +- `bg_industry` 全量:拼成 `id:name` 文本列表 + +这两份列表作为 prompt 的一部分传给AI,ID由人工维护为短数字,不使用雪花ID。 + +地区数据(`bg_china_regions_code`)不传给AI,由Java侧根据AI返回的城市名自行匹配。 + +### 2.3 AI 调用(单次调用,返回结构化JSON) + +#### 输入 + +- 原始字段:job_title、salary、location、company、experience、education、description +- 参考列表:岗位分类(id:name)、行业(id:name) + +#### AI 返回 JSON 结构 + +```json +{ + "valid": true, + "title": "Java高级开发工程师", + "salary": "15-25K", + "education": 2, + "minExperience": 3, + "employmentType": 0, + "categoryId": 12, + "industryId": 5, + "description": "1. 负责核心业务系统开发...", + "requirement": "1. 本科及以上学历...", + "bonus": "1. 有分布式系统经验优先...", + "tags": ["数据分析", "产品策略", "团队协作"], + "skillTags": ["Java", "Spring Boot", "MySQL"], + "companyShortName": "字节跳动", + "cities": ["北京", "上海"] +} +``` + +#### 各字段清洗规则 + +| 字段 | 来源 | 规则 | +|------|------|------| +| valid | AI综合判断 | 数据是否有效,false则丢弃 | +| title | job_title | 存在则保留;不存在则AI从description归纳生成 | +| salary | salary | 有效则标准化(10-20K / 20K / 面议);无效或空则null | +| education | education + description | 映射为 0=不限 1=大专 2=本科 3=硕士 4=博士 | +| minExperience | experience + description | 提取最低年限数值,不要求则为0 | +| employmentType | description | 判断 0=全职 1=兼职,默认0 | +| categoryId | description + job_title | 必选,从分类列表中选最接近的,不允许返回null | +| industryId | description(任职要求部分) | 仅当明确提到行业经验要求时设置;列表中无完全匹配则选最相似的;未提到则null | +| description | description + experience + education | 提取"岗位职责"部分,保持原文风格,格式化展示 | +| requirement | description + experience + education | 提取"任职要求"部分,保持原文风格,格式化展示 | +| bonus | description + experience + education | 提取"加分项"部分,无则空 | +| tags | description + job_title | 核心职能标签(如数据分析、产品策略、团队协作),最多5个 | +| skillTags | description | 技能关键词(如Java、Spring Boot、MySQL),最多8个 | +| companyShortName | company | 提取简洁的公司简称,去掉地区后缀、招聘后缀、括号内容等,保持"中国平安""字节跳动"风格 | +| cities | location | 提取城市名列表,精确到市级 | + +### 2.4 AI 返回后的 Java 处理流程 + +1. **valid=false** → 更新 `clean_status=3`,结束 +2. **公司处理**:按AI清洗后的 `companyShortName` 查 `bg_company.short_name`,存在则拿 `company_id`;不存在则创建一条(short_name=companyShortName, status=0待完善),拿新ID +3. **地区处理**:`cities` 列表逐个匹配 `bg_china_regions_code`(按name匹配到市级),匹配上的准备写入关联表 +4. **去重**:用 `source_id`(app_job_data.id)查 `bg_job`,已存在则跳过,更新 `clean_status=2` +5. **写入 bg_job**:组装所有字段,`source_id`=app_job_data.id,`source_url`=detail_url,`status=0`(上架) +6. **写入 bg_job_region_relation**:岗位ID + 匹配到的region_code,一岗多地区 +7. **更新 app_job_data.clean_status=2** + +步骤 2-7 放在一个短事务中,保证数据一致性。 + +### 2.5 设计决策记录 + +| 决策点 | 结论 | 原因 | +|--------|------|------| +| AI调用次数 | 一次调用返回全部字段 | 减少API调用成本和延迟 | +| 分类/行业列表怎么给AI | 直接传 id:name 文本 | ID人工维护为短数字,token消耗可控 | +| 地区匹配方式 | AI输出城市名,Java侧匹配 | 城市名无歧义,不需要传参考列表 | +| categoryId 是否可空 | 不可空,必须选一个 | 岗位分类是核心维度 | +| industryId 何时设置 | 仅描述中明确提到行业经验时 | 行业经验是任职要求,不是所有岗位都有 | +| tags 定位 | 核心职能标签,最多5个 | 区别于福利标签,体现岗位核心能力要求 | +| skillTags 数量 | 最多8个 | 控制数量,保持精炼 | +| source_id 取值 | app_job_data.id | 简单直接,用于去重 | +| 公司不存在时 | 自动创建 status=0 待完善 | 后续由公司数据补充逻辑完善 | + +--- + +## 三、公司数据触发逻辑 + +### 3.1 状态扩展 + +`bg_company.status` 扩展为4个值: +- 0=待完善:岗位清洗时自动创建的公司,只有 short_name +- 1=已完善:工商API补充完成 +- 2=禁用:人工标记禁用 +- 3=补充中:定时任务已锁定,正在调用工商API + +### 3.2 两个定时任务(与岗位清洗同一套模式) + +#### 任务C:公司数据补充任务(低频,每小时) + +1. 批量锁定(原子操作): +```sql +UPDATE bg_company SET status=3, update_time=NOW() WHERE status=0 LIMIT N +``` +⚠️ 锁定时必须同时更新 `update_time`,因为 `bg_company` 的 `update_time` 不像 `app_job_data.updated_at` 那样由数据库自动维护,需要 Java 侧手动设值。如果不更新,后续僵尸恢复任务无法正确判断超时。 + +2. 将锁定的数据丢入线程池,多线程并发调用工商API +3. 每条处理完毕后,回填公司信息,更新 `status=1`(已完善) +4. 工商API查不到或返回异常 → 保持 `status=3`,由僵尸恢复任务重置 + +#### 任务D:公司僵尸恢复任务(低频,每小时,与任务C错开) + +处理因发布重启导致卡在"补充中"的僵尸数据: + +```sql +UPDATE bg_company SET status=0 WHERE status=3 AND update_time < NOW() - INTERVAL 10 MINUTE +``` + +超时10分钟仍在"补充中"的数据重置为待完善,下次任务C会重新捞取处理。 + +### 3.3 与岗位清洗触发逻辑的对比 + +| 对比项 | 岗位清洗 | 公司补充 | +|--------|----------|----------| +| 状态字段 | app_job_data.clean_status | bg_company.status | +| 锁定值 | 1=清洗中 | 3=补充中 | +| 完成值 | 2=已入库 / 3=已丢弃 | 1=已完善 | +| 时间字段 | updated_at(数据库自动) | update_time(Java手动设值) | +| 锁定时是否需手动更新时间 | 不需要 | **需要**,否则僵尸恢复无法判断超时 | +| 触发频率 | 每5分钟 | 每小时 | +| 僵尸恢复频率 | 每30分钟 | 每小时(与任务C错开) | + +### 3.4 设计决策记录 + +| 决策点 | 结论 | 原因 | +|--------|------|------| +| 是否与岗位清洗同步触发 | 否,独立定时任务 | 外部API不同,频率不同,失败场景不同 | +| 触发模式 | 复用岗位清洗的"原子锁定+僵尸恢复"模式 | 统一架构,代码可复用 | +| 锁定时是否更新时间 | 是 | bg_company.update_time 非数据库自动维护,不更新则僵尸恢复失效 | +| 补充频率 | 每小时 | 公司数据量少,工商API可能有频率限制 | + +--- + +## 四、公司数据补充逻辑(待定API后补充) + +### 4.1 补充流程概要 + +1. 用 `short_name`(公司简称)调用工商API搜索 +2. API返回匹配的企业列表,取最匹配的一条 +3. 回填 `bg_company` 各字段,更新 `status=1` + +### 4.2 需要回填的字段 + +| bg_company 字段 | 来源 | 说明 | +|-----------------|------|------| +| name | 工商API | 公司全称 | +| logoUrl | 待定 | 工商API可能不提供,需另外来源 | +| regionCode | 工商API(注册地址) | 匹配 bg_china_regions_code | +| companyType | 工商API | 上市企业、独角兽、国企等 | +| industryId | 工商API(行业分类) | 匹配 bg_industry | +| tags | 工商API / AI | 公司标签,JSON数组 | +| summary | 工商API / AI | 公司简要介绍 | +| description | 工商API | 公司详细描述 | +| foundedYear | 工商API | 成立时间 | +| address | 工商API | 注册地址 | +| scale | 工商API | 企业规模(人数) | +| website | 工商API | 官网地址 | +| financingStage | 工商API / 其他来源 | 融资状态 | +| latestValuation | 工商API / 其他来源 | 最新估值 | +| news | 新闻API(待定) | 新闻动态,JSON数组 | + +### 4.3 待定事项 + +- [ ] 选定工商信息API(天眼查、企查查、爱企查等) +- [ ] 确认API返回字段与 bg_company 的映射关系 +- [ ] 新闻动态数据来源(工商API是否包含,还是需要单独的新闻API) +- [ ] logoUrl 来源(工商API是否提供) +- [ ] 匹配到多条结果时的处理策略 +- [ ] 查不到结果时的处理策略(保持待完善 or 标记为其他状态) +- [ ] API调用频率限制和成本评估 diff --git a/.kiro/steering/项目结构说明.md b/.kiro/steering/项目结构说明.md index 37c1dd9..2d2a5cd 100644 --- a/.kiro/steering/项目结构说明.md +++ b/.kiro/steering/项目结构说明.md @@ -45,6 +45,7 @@ offerpie/back-end │ ├─ aop/ # AOP 日志切面 │ ├─ exception/ # 业务异常统一处理 │ ├─ email/ # 邮件发送抽象(EmailAbility) +│ ├─ ai/ # AI 对话能力封装(AiChatAbility、AiChatConfig) │ ├─ wxPay/ # 微信支付相关能力(Js、Native、Transfer 等) │ ├─ pojo/ # 公共 POJO(统一响应、登录/防重放 token 等) │ └─ web/ # Spring MVC 全局响应体 advice @@ -73,7 +74,8 @@ offerpie/back-end │ ├─ SkillTagMapper.java # 技能标签Mapper │ ├─ UserJobFavoriteMapper.java # 用户收藏岗位Mapper │ ├─ UserJobApplicationMapper.java # 用户投递记录Mapper - │ └─ UserJobDislikeMapper.java # 用户不感兴趣记录Mapper + │ ├─ UserJobDislikeMapper.java # 用户不感兴趣记录Mapper + │ └─ AppJobDataMapper.java # 爬虫岗位原始数据Mapper ├─ pojo/ │ ├─ po/ # 持久化实体 │ │ ├─ User.java @@ -93,9 +95,10 @@ offerpie/back-end │ │ ├─ SkillTag.java # 技能标签表(bg_skill_tag) │ │ ├─ UserJobFavorite.java # 用户收藏岗位表(bg_user_job_favorite) │ │ ├─ UserJobApplication.java # 用户投递记录表(bg_user_job_application) - │ │ └─ UserJobDislike.java # 用户不感兴趣记录表(bg_user_job_dislike) + │ │ ├─ UserJobDislike.java # 用户不感兴趣记录表(bg_user_job_dislike) + │ │ └─ AppJobData.java # 爬虫岗位原始数据表(app_job_data) │ └─ vo/ # ViewObject(OssUrlVo 等) - └─ service/ # 业务 Service(OssService、SmsService 等) + └─ service/ # 业务 Service(OssService、SmsService、DictCacheService、JobCleanService、JobCleanTransactionService 等) ``` > **设计理念** – 业务实体和 Mapper 位于 `manager`,B 端和 C 端共享;C 端特有的注解、切面、权限服务、路由菜单服务位于 `client-api`,避免 B 端误用;`common` 提供统一的技术支撑。 @@ -104,7 +107,7 @@ offerpie/back-end |------|----------|-----------| | **client-api** | - 面向终端用户的 REST API
- 启动 Spring Boot 应用
- 短信验证码登录(含自动注册、邀请码绑定)
- **功能权限校验**:注解 + 切面 + 权限服务(校验、扣减、回退)
- **路由菜单**:获取用户有效菜单树 | `ClientApplication`、`LoginController`、`RouteMenuController`、`FuncPermission`、`FuncPermissionAspect`、`FuncPermissionService`、`RouteMenuService`、`UserRegisterService`、`RouteMenuVo` | | **common** | - **统一配置**:OSS、Redis、Security、WxPay、Sms 等
- **跨层工具**:HTTP、IP、认证、验证码、Redis Server 等
- **全局拦截/切面**:日志、TraceId、黑名单、SQL 打印
- **统一异常/响应**:`GlobalExceptionAdvice`、`UnifiedResponse`
- **业务抽象**:邮件发送、微信支付(Native/JS/Transfer)
- **公共 POJO**:登录令牌、防重放信息等 | `config/`, `tool/`, `interceptor/`, `aop/`, `exception/`, `email/`, `wxPay/`, `pojo/` | -| **manager** | - **业务实体**(`User`、`OssFile`、`UserInvite`、`RouteMenu`、`FuncPermission`、`UserRouteMenuStock`、`UserFuncPermissionStock`、`UserFuncUsageLog`、`ChinaRegionsCode`、`JobCategory`、`Company`、`Job`、`JobRegion`、`UserJobFavorite`、`UserJobApplication`)
- **MyBatis Mapper**(`UserMapper`、`OssFileMapper`、`UserInviteMapper`、`RouteMenuMapper`、`FuncPermissionMapper`、`UserRouteMenuStockMapper`、`UserFuncPermissionStockMapper`、`UserFuncUsageLogMapper`、`ChinaRegionsCodeMapper`、`JobCategoryMapper`、`CompanyMapper`、`JobMapper`、`JobRegionRelationMapper`、`UserJobFavoriteMapper`、`UserJobApplicationMapper`)
- **业务 API**:文件上传/下载、健康检查等
- **业务逻辑**:服务层、工具类等
- **既供 B 端 UI(待实现)使用,也供 C 端业务直接调用** | `controller/`, `mapper/`, `pojo/po/`, `pojo/vo/`, `service/`, `constant/` | +| **manager** | - **业务实体**(`User`、`OssFile`、`UserInvite`、`RouteMenu`、`FuncPermission`、`UserRouteMenuStock`、`UserFuncPermissionStock`、`UserFuncUsageLog`、`ChinaRegionsCode`、`JobCategory`、`Company`、`Job`、`JobRegionRelation`、`Industry`、`SkillTag`、`UserJobFavorite`、`UserJobApplication`、`UserJobDislike`、`AppJobData`)
- **MyBatis Mapper**(`UserMapper`、`OssFileMapper`、`UserInviteMapper`、`RouteMenuMapper`、`FuncPermissionMapper`、`UserRouteMenuStockMapper`、`UserFuncPermissionStockMapper`、`UserFuncUsageLogMapper`、`ChinaRegionsCodeMapper`、`JobCategoryMapper`、`CompanyMapper`、`JobMapper`、`JobRegionRelationMapper`、`IndustryMapper`、`SkillTagMapper`、`UserJobFavoriteMapper`、`UserJobApplicationMapper`、`UserJobDislikeMapper`、`AppJobDataMapper`)
- **业务 API**:文件上传/下载、健康检查等
- **业务逻辑**:服务层、工具类等
- **既供 B 端 UI(待实现)使用,也供 C 端业务直接调用** | `controller/`, `mapper/`, `pojo/po/`, `pojo/vo/`, `service/`, `constant/` | ## 3️⃣ 关键业务实体 | 实体 | 所属模块 | 作用概述 | @@ -130,6 +133,7 @@ offerpie/back-end | `Industry` | manager | 行业字典表(bg_industry),树形结构,一级/二级分类。 | | `SkillTag` | manager | 技能标签表(bg_skill_tag),挂在岗位类型下,不分级,用于匹配度计算。 | | `UserJobDislike` | manager | 用户不感兴趣记录表(bg_user_job_dislike),记录用户对岗位的不感兴趣原因,冗余公司ID/地区编码/行业ID方便推荐过滤。 | +| `AppJobData` | manager | 爬虫岗位原始数据表(app_job_data),存储爬虫抓取的原始岗位数据,供清洗服务读取并写入业务表。 | ## 4️⃣ 权限体系设计 ### 整体架构 @@ -169,6 +173,7 @@ offerpie/back-end | **配置** | `OssConfig`, `RedissonConf`, `SecurityConfig`, `WxPayConfig`, `SmsConfig` | `common/config` | | **安全** | JWT 过滤器、登录令牌 (`RedisLoginTokenInfo`)、防重放 (`RedisPreventReplayInfo`) | `common/interceptor`、`common/pojo/interceptor` | | **邮件** | `EmailAbility`(封装邮件发送) | `common/email` | +| **AI** | `AiChatAbility`(OpenAI 兼容多供应商对话)、`AiChatConfig`(供应商配置) | `common/ai` | | **微信支付** | `WxJsPayAbility`, `WxNativePayAbility`, `WxTransferPayAbility`, `WxPayNotifyController` | `common/wxPay` | | **全局异常** | `GlobalExceptionAdvice`, `BusinessException`, `BusinessExpCodeEnum` | `common/exception` | | **日志 & AOP** | `ControllerLogAspect`, `LoggingOriginalRequestFilter`, `SqlLoggerInterceptor` | `common/aop`, `common/interceptor` | diff --git a/client-api/src/main/resources/application-local.yml b/client-api/src/main/resources/application-local.yml index 6ed66ef..ecd9045 100644 --- a/client-api/src/main/resources/application-local.yml +++ b/client-api/src/main/resources/application-local.yml @@ -73,4 +73,18 @@ app: #开放接口 ignore: - urls: "/public/**" \ No newline at end of file + urls: "/public/**" + + # AI 多供应商配置,第一个为默认 provider + # base-url 配到版本路径,如 DeepSeek: https://api.deepseek.com/v1,豆包: https://ark.cn-beijing.volces.com/api/v3 + ai: + providers: + job-clean: + base-url: ${AI_BASE_URL:https://ark.cn-beijing.volces.com/api/v3} + api-key: ${AI_API_KEY:fd065993-bee2-4f31-8bf2-56d5d3012c02} + model: ${AI_MODEL:doubao-seed-2-0-lite-260215} + + # 岗位清洗配置 + job-clean: + batch-size: 20 + thread-pool-size: 5 \ No newline at end of file diff --git a/common/src/main/java/org/jiayunet/ai/AiChatAbility.java b/common/src/main/java/org/jiayunet/ai/AiChatAbility.java new file mode 100644 index 0000000..76b1414 --- /dev/null +++ b/common/src/main/java/org/jiayunet/ai/AiChatAbility.java @@ -0,0 +1,77 @@ +package org.jiayunet.ai; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import org.jiayunet.tool.HttpTool; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * AI 对话能力封装(OpenAI 兼容) + *

支持多供应商配置,不传 key 时使用第一个 provider

+ * + * @author zk + */ +@Slf4j +@Component +public class AiChatAbility { + + @Autowired + private AiChatConfig aiChatConfig; + + /** + * 使用默认 provider 发送对话 + *

默认取 providers 配置中的第一个

+ */ + public String chat(String systemPrompt, String userMessage) { + String defaultKey = aiChatConfig.getProviders().keySet().stream() + .findFirst() + .orElseThrow(() -> new RuntimeException("未配置任何 AI provider")); + return chat(defaultKey, systemPrompt, userMessage); + } + + /** + * 使用指定 provider 发送对话 + * + * @param providerKey 供应商标识,对应 yml 中 providers 的 key + * @param systemPrompt 系统提示词 + * @param userMessage 用户消息 + * @return AI 返回的文本内容 + */ + public String chat(String providerKey, String systemPrompt, String userMessage) { + AiChatConfig.ProviderConfig config = aiChatConfig.getProviders().get(providerKey); + if (config == null) { + throw new RuntimeException("AI provider 不存在: " + providerKey); + } + + String url = config.getBaseUrl() + "/chat/completions"; + log.info("AI 请求 URL: {}, model: {}", url, config.getModel()); + + Map body = new HashMap<>(); + body.put("model", config.getModel()); + body.put("messages", List.of( + Map.of("role", "system", "content", systemPrompt), + Map.of("role", "user", "content", userMessage) + )); + + Map headers = new HashMap<>(); + headers.put("Authorization", "Bearer " + config.getApiKey()); + + try { + String response = HttpTool.sendJsonPost(body, url, headers); + JsonNode root = HttpTool.objectMapper.readTree(response); + String content = root.path("choices").path(0).path("message").path("content").asText(); + if (content == null || content.isBlank()) { + throw new RuntimeException("AI 返回内容为空"); + } + return content; + } catch (Exception e) { + log.error("AI 调用失败, provider={}, model={}", providerKey, config.getModel(), e); + throw new RuntimeException("AI 调用失败: " + e.getMessage(), e); + } + } +} diff --git a/common/src/main/java/org/jiayunet/ai/AiChatConfig.java b/common/src/main/java/org/jiayunet/ai/AiChatConfig.java new file mode 100644 index 0000000..2f10cef --- /dev/null +++ b/common/src/main/java/org/jiayunet/ai/AiChatConfig.java @@ -0,0 +1,33 @@ +package org.jiayunet.ai; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * AI 多供应商配置 + *

读取 app.ai.providers,第一个为默认 provider

+ * + * @author zk + */ +@Data +@Component +@ConfigurationProperties(prefix = "app.ai") +public class AiChatConfig { + + /** 供应商配置,key 为供应商标识,第一个为默认 */ + private Map providers = new LinkedHashMap<>(); + + @Data + public static class ProviderConfig { + /** API 地址 */ + private String baseUrl; + /** API Key */ + private String apiKey; + /** 模型名称 */ + private String model; + } +} diff --git a/common/src/main/java/org/jiayunet/tool/HttpTool.java b/common/src/main/java/org/jiayunet/tool/HttpTool.java index b3bff75..4608614 100644 --- a/common/src/main/java/org/jiayunet/tool/HttpTool.java +++ b/common/src/main/java/org/jiayunet/tool/HttpTool.java @@ -76,7 +76,7 @@ public class HttpTool { if (response.getStatusLine().getStatusCode() == 200) { return bodyStr; } - throw new IOException("Http请求出现错误,响应码:" + response.getStatusLine().getStatusCode()); + throw new IOException("Http请求出现错误,响应码:" + response.getStatusLine().getStatusCode() + ",响应体:" + bodyStr); } } catch (IOException e) { diff --git a/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java b/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java new file mode 100644 index 0000000..675e23c --- /dev/null +++ b/manager/src/main/java/org/jiayunet/mapper/AppJobDataMapper.java @@ -0,0 +1,13 @@ +package org.jiayunet.mapper; + +import org.apache.ibatis.annotations.Mapper; +import org.jiayunet.pojo.po.AppJobData; + +/** + * 爬虫岗位原始数据Mapper + * + * @author zk + */ +@Mapper +public interface AppJobDataMapper extends CommonMapper { +} diff --git a/manager/src/main/java/org/jiayunet/pojo/po/AppJobData.java b/manager/src/main/java/org/jiayunet/pojo/po/AppJobData.java new file mode 100644 index 0000000..8de648c --- /dev/null +++ b/manager/src/main/java/org/jiayunet/pojo/po/AppJobData.java @@ -0,0 +1,79 @@ +package org.jiayunet.pojo.po; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.time.Instant; + +/** + * 爬虫岗位原始数据表(app_job_data) + *

存储爬虫抓取的原始岗位数据,供清洗服务读取并写入业务表

+ * + * @author zk + */ +@Data +@TableName(value = "app_job_data") +public class AppJobData { + + @TableId(type = IdType.AUTO) + private Long id; + + /** 关联爬取任务ID */ + private Long taskCrawlId; + + /** 职位名称 */ + private String jobTitle; + + /** 薪资 */ + private String salary; + + /** 工作地点 */ + private String location; + + /** 公司名称 */ + private String company; + + /** 经验要求 */ + private String experience; + + /** 学历要求 */ + private String education; + + /** 岗位详情(职责+要求+介绍) */ + private String description; + + /** 详情页URL */ + private String detailUrl; + + /** 内容哈希值,用于查重 */ + private String contentHash; + + /** 数据来源 0=官网 1=平台 */ + private Integer sources; + + /** 是否独立URL 0=页内展示 1=独立页面 */ + private Integer isIndependentUrl; + + /** 是否有效 0=无效 1=有效 */ + private Integer isValid; + + /** 有效期 */ + private Instant expireAt; + + /** 验证状态 pending=待验证 checking=验证中 checked=已验证 */ + private String checkStatus; + + /** 清洗状态 0=待清洗 1=清洗中 2=已入库 3=已丢弃 */ + private Integer cleanStatus; + + /** 上次验证时间 */ + private Instant lastCheckAt; + + /** 创建时间 */ + private Instant createdAt; + + /** 更新时间 */ + private Instant updatedAt; +} diff --git a/manager/src/main/java/org/jiayunet/pojo/po/Company.java b/manager/src/main/java/org/jiayunet/pojo/po/Company.java index 8e2b47a..976d8f4 100644 --- a/manager/src/main/java/org/jiayunet/pojo/po/Company.java +++ b/manager/src/main/java/org/jiayunet/pojo/po/Company.java @@ -67,7 +67,7 @@ public class Company { /** 新闻动态(JSON数组) */ private String news; - /** 状态 0=待完善 1=已完善 2=禁用 */ + /** 状态 0=待完善 1=已完善 2=禁用 3=补充中 */ private Integer status; /** 创建时间 */ diff --git a/manager/src/main/java/org/jiayunet/service/DictCacheService.java b/manager/src/main/java/org/jiayunet/service/DictCacheService.java new file mode 100644 index 0000000..e7350b2 --- /dev/null +++ b/manager/src/main/java/org/jiayunet/service/DictCacheService.java @@ -0,0 +1,124 @@ +package org.jiayunet.service; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.extern.slf4j.Slf4j; +import org.jiayunet.mapper.ChinaRegionsCodeMapper; +import org.jiayunet.mapper.IndustryMapper; +import org.jiayunet.mapper.JobCategoryMapper; +import org.jiayunet.pojo.po.ChinaRegionsCode; +import org.jiayunet.pojo.po.Industry; +import org.jiayunet.pojo.po.JobCategory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 字典数据缓存服务 + *

启动时加载岗位分类、行业、地区数据到内存,供清洗/推荐等业务使用

+ *

依赖:JobCategoryMapper、IndustryMapper、ChinaRegionsCodeMapper

+ *

使用表:bg_job_category(全量缓存)、bg_industry(全量缓存)、bg_china_regions_code(市级缓存)

+ * + * @author zk + */ +@Slf4j +@Service +public class DictCacheService { + + @Autowired + private JobCategoryMapper jobCategoryMapper; + + @Autowired + private IndustryMapper industryMapper; + + @Autowired + private ChinaRegionsCodeMapper chinaRegionsCodeMapper; + + private List jobCategoryList; + private List industryList; + private List regionList; + + /** 岗位分类文本(叶子节点,带父级路径),供 AI prompt 使用 */ + private String jobCategoryText; + /** 行业文本(叶子节点,带父级路径),供 AI prompt 使用 */ + private String industryText; + + /** + * 启动时加载全量字典数据 + *

分类/行业全量加载用于构建父级路径,文本只取叶子节点

+ */ + @PostConstruct + public void refresh() { + log.info("开始加载字典缓存..."); + + jobCategoryList = jobCategoryMapper.selectList(null); + industryList = industryMapper.selectList(null); + + // 只缓存省级+市级地区(provinceCode 为 null 是省,provinceCode 不为 null 且 cityCode 为 null 是市) + regionList = chinaRegionsCodeMapper.selectList( + new LambdaQueryWrapper() + .isNull(ChinaRegionsCode::getCityCode) + ); + + // 构建岗位分类文本:只取三级(叶子),格式 id:name(一级/二级) + Map categoryNameMap = jobCategoryList.stream() + .collect(Collectors.toMap(JobCategory::getId, JobCategory::getName)); + + jobCategoryText = jobCategoryList.stream() + .filter(c -> c.getLevel() == 3) + .map(c -> { + String parentName = categoryNameMap.getOrDefault(c.getParentId(), ""); + String rootName = categoryNameMap.getOrDefault(c.getRootId(), ""); + return c.getId() + ":" + c.getName() + "(" + rootName + "/" + parentName + ")"; + }) + .collect(Collectors.joining(", ")); + + // 构建行业文本:只取二级(叶子),格式 id:name(一级) + Map industryNameMap = industryList.stream() + .collect(Collectors.toMap(Industry::getId, Industry::getName)); + industryText = industryList.stream() + .filter(i -> i.getLevel() == 2) + .map(i -> { + String parentName = industryNameMap.getOrDefault(i.getParentId(), ""); + return i.getId() + ":" + i.getName() + "(" + parentName + ")"; + }) + .collect(Collectors.joining(", ")); + + long categoryLeafCount = jobCategoryList.stream().filter(c -> c.getLevel() == 3).count(); + long industryLeafCount = industryList.stream().filter(i -> i.getLevel() == 2).count(); + log.info("字典缓存加载完成: 岗位分类{}条(叶子{}条), 行业{}条(叶子{}条), 地区{}条", + jobCategoryList.size(), categoryLeafCount, industryList.size(), industryLeafCount, regionList.size()); + } + + /** 获取岗位分类文本(叶子节点,带父级路径,逗号分隔) */ + public String getJobCategoryText() { + return jobCategoryText; + } + + /** 获取行业文本(叶子节点,带父级路径,逗号分隔) */ + public String getIndustryText() { + return industryText; + } + + /** + * 根据城市名匹配地区编码 + *

模糊匹配,如"北京"匹配"北京市"

+ * + * @param cityName 城市名 + * @return region_code,匹配不上返回 null + */ + public String matchRegionCode(String cityName) { + if (cityName == null || cityName.isBlank()) { + return null; + } + String name = cityName.replace("市", "").replace("省", "").trim(); + return regionList.stream() + .filter(r -> r.getName().contains(name) || name.contains(r.getName().replace("市", "").replace("省", ""))) + .map(ChinaRegionsCode::getCode) + .findFirst() + .orElse(null); + } +} diff --git a/manager/src/main/java/org/jiayunet/service/JobCleanService.java b/manager/src/main/java/org/jiayunet/service/JobCleanService.java new file mode 100644 index 0000000..17a6526 --- /dev/null +++ b/manager/src/main/java/org/jiayunet/service/JobCleanService.java @@ -0,0 +1,248 @@ +package org.jiayunet.service; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import org.jiayunet.ai.AiChatAbility; +import org.jiayunet.mapper.AppJobDataMapper; +import org.jiayunet.mapper.JobMapper; +import org.jiayunet.pojo.po.AppJobData; +import org.jiayunet.pojo.po.Job; +import org.jiayunet.tool.HttpTool; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 岗位清洗服务 + *

定时从 app_job_data 捞取待清洗数据,调用 AI 清洗后写入业务表

+ *

依赖:AiChatAbility(AI调用)、DictCacheService(字典缓存)、JobCleanTransactionService(事务操作)

+ *

使用表:app_job_data(读取/更新状态)、bg_job(去重查询)

+ * + * @author zk + */ +@Slf4j +@Service +public class JobCleanService { + + @Autowired + private AiChatAbility aiChatAbility; + + @Autowired + private DictCacheService dictCacheService; + + @Autowired + private JobCleanTransactionService jobCleanTransactionService; + + @Autowired + private AppJobDataMapper appJobDataMapper; + + @Autowired + private JobMapper jobMapper; + + @Value("${app.job-clean.batch-size:20}") + private int batchSize; + + @Value("${app.job-clean.thread-pool-size:5}") + private int threadPoolSize; + + private ExecutorService executorService; + + /** 初始化线程池 */ + @javax.annotation.PostConstruct + public void init() { + executorService = Executors.newFixedThreadPool(threadPoolSize); + } + + /** + * 定时任务A:岗位清洗(每5分钟) + *

1. 批量锁定待清洗数据 2. 多线程并发调用AI清洗 3. 写入业务表

+ */ + @Scheduled(cron = "0 */1 * * * ?") + public void cleanJob() { + // 批量锁定:原子操作,clean_status 0→1 + int locked = appJobDataMapper.update(null, + new LambdaUpdateWrapper() + .set(AppJobData::getCleanStatus, 1) + .eq(AppJobData::getCleanStatus, 0) + .eq(AppJobData::getIsValid, 1) + .last("LIMIT " + batchSize)); + + if (locked == 0) { + return; + } + log.info("岗位清洗:锁定{}条数据", locked); + + // 查出刚锁定的数据 + List dataList = appJobDataMapper.selectList( + new LambdaQueryWrapper() + .eq(AppJobData::getCleanStatus, 1) + .eq(AppJobData::getIsValid, 1) + .last("LIMIT " + batchSize)); + + // 多线程并发处理 + for (AppJobData data : dataList) { + executorService.submit(() -> { + try { + cleanOne(data); + } catch (Exception e) { + log.error("岗位清洗异常, id={}", data.getId(), e); + // 异常时保持 clean_status=1,由僵尸恢复任务重置 + } + }); + } + } + + /** + * 定时任务B:僵尸恢复(每30分钟) + *

将超时10分钟仍在清洗中的数据重置为待清洗

+ */ + @Scheduled(cron = "0 */30 * * * ?") + public void recoverZombie() { + int recovered = appJobDataMapper.update(null, + new LambdaUpdateWrapper() + .set(AppJobData::getCleanStatus, 0) + .eq(AppJobData::getCleanStatus, 1) + .lt(AppJobData::getUpdatedAt, Instant.now().minusSeconds(600))); + + if (recovered > 0) { + log.info("僵尸恢复:重置{}条数据", recovered); + } + } + + /** + * 清洗单条岗位数据 + *

1. 前置校验 2. 拼prompt调AI 3. 解析结果 4. 写入业务表

+ */ + public void cleanOne(AppJobData data) { + // 1. 前置校验 + if (data.getDescription() == null || data.getDescription().length() < 20) { + jobCleanTransactionService.updateCleanStatus(data.getId(), 3); + return; + } + + // 2. 拼 prompt + String systemPrompt = buildSystemPrompt(); + String userMessage = buildUserMessage(data); + + // 3. 调用 AI + String aiResponse = aiChatAbility.chat(systemPrompt, userMessage); + + // 4. 解析 JSON + try { + // 去掉可能的 markdown 代码块标记 + String json = aiResponse.trim(); + if (json.startsWith("```")) { + json = json.replaceAll("^```\\w*\\n?", "").replaceAll("\\n?```$", "").trim(); + } + + JsonNode root = HttpTool.objectMapper.readTree(json); + + // valid 校验 + if (!root.path("valid").asBoolean(false)) { + jobCleanTransactionService.updateCleanStatus(data.getId(), 3); + return; + } + + // 5. 去重检查 + String sourceId = String.valueOf(data.getId()); + Long existJob = jobMapper.selectCount( + new LambdaQueryWrapper().eq(Job::getSourceId, sourceId)); + if (existJob > 0) { + jobCleanTransactionService.updateCleanStatus(data.getId(), 2); + return; + } + + // 6. 公司处理(加锁防并发重复) + String companyShortName = root.path("companyShortName").asText(""); + if (companyShortName.isBlank()) { + companyShortName = data.getCompany(); + } + Long companyId = jobCleanTransactionService.findOrCreateCompany(companyShortName); + + // 7. 地区处理 + List regionCodes = new ArrayList<>(); + JsonNode citiesNode = root.path("cities"); + if (citiesNode.isArray()) { + for (JsonNode city : citiesNode) { + String code = dictCacheService.matchRegionCode(city.asText()); + if (code != null) { + regionCodes.add(code); + } + } + } + + // 8. 写入业务表(短事务,通过独立Service保证@Transactional生效) + jobCleanTransactionService.saveJobData(root, data, companyId, sourceId, regionCodes); + + } catch (Exception e) { + log.error("AI 返回解析失败, id={}, response={}", data.getId(), aiResponse, e); + // 保持 clean_status=1,由僵尸恢复任务重置 + } + } + + /** 构建系统提示词 */ + private String buildSystemPrompt() { + return """ + 你是一个岗位数据清洗助手。请根据提供的原始岗位数据,提取并结构化为JSON格式。 + + 返回JSON格式要求: + { + "valid": true/false, + "title": "岗位名称", + "salary": "标准化薪资,如10-20K、面议,无效则null", + "education": 0-4的数字(0=不限 1=大专 2=本科 3=硕士 4=博士), + "minExperience": 最低工作年限数字(不要求则0), + "employmentType": 0或1(0=全职 1=兼职,默认0), + "categoryId": 岗位分类ID(必选,从分类列表中选最接近的), + "industryId": 行业ID(仅当明确提到行业经验要求时设置,列表中无完全匹配则选最相似的,未提到则null), + "description": "岗位职责,保持原文风格,格式化展示", + "requirement": "任职要求,保持原文风格,格式化展示", + "bonus": "加分项,无则null", + "tags": ["核心职能标签,最多5个,如数据分析、产品策略"], + "skillTags": ["技能关键词,最多8个,如Java、Spring Boot"], + "companyShortName": "简洁的公司简称,如字节跳动、中国平安", + "cities": ["工作城市列表,精确到市"] + } + + 规则: + 1. description/requirement/bonus 均从原始的 description+experience+education 内容中提取,保持原文风格 + 2. 岗位标题不存在时,从描述中归纳生成 + 3. 薪资标准化为 10-20K、20K、面议 等格式,无效或空则null + 4. categoryId 必须从分类列表中选一个,不允许为null + 5. industryId 仅当描述中明确提到行业经验要求时设置 + 6. tags 是核心职能标签(如数据分析、团队协作),最多5个 + 7. skillTags 是技能关键词(如Java、MySQL),最多8个 + 8. companyShortName 去掉地区后缀、招聘后缀、括号内容,保持简洁 + 9. 只返回JSON,不要其他内容 + """; + } + + /** 构建用户消息 */ + private String buildUserMessage(AppJobData data) { + StringBuilder sb = new StringBuilder(); + sb.append("【原始数据】\n"); + sb.append("岗位名称: ").append(nullToEmpty(data.getJobTitle())).append("\n"); + sb.append("薪资: ").append(nullToEmpty(data.getSalary())).append("\n"); + sb.append("工作地点: ").append(nullToEmpty(data.getLocation())).append("\n"); + sb.append("公司: ").append(nullToEmpty(data.getCompany())).append("\n"); + sb.append("经验要求: ").append(nullToEmpty(data.getExperience())).append("\n"); + sb.append("学历要求: ").append(nullToEmpty(data.getEducation())).append("\n"); + sb.append("岗位详情: ").append(nullToEmpty(data.getDescription())).append("\n\n"); + sb.append("【岗位分类列表】\n").append(dictCacheService.getJobCategoryText()).append("\n\n"); + sb.append("【行业列表】\n").append(dictCacheService.getIndustryText()); + return sb.toString(); + } + + private String nullToEmpty(String s) { + return s == null ? "" : s; + } +} diff --git a/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java b/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java new file mode 100644 index 0000000..4102c9c --- /dev/null +++ b/manager/src/main/java/org/jiayunet/service/JobCleanTransactionService.java @@ -0,0 +1,123 @@ +package org.jiayunet.service; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import org.jiayunet.mapper.AppJobDataMapper; +import org.jiayunet.mapper.CompanyMapper; +import org.jiayunet.mapper.JobMapper; +import org.jiayunet.mapper.JobRegionRelationMapper; +import org.jiayunet.pojo.po.AppJobData; +import org.jiayunet.pojo.po.Company; +import org.jiayunet.pojo.po.Job; +import org.jiayunet.pojo.po.JobRegionRelation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.Instant; +import java.util.List; + +/** + * 岗位清洗事务服务 + *

独立出来解决 @Transactional 同类自调用失效问题

+ *

依赖:JobMapper、CompanyMapper、JobRegionRelationMapper、AppJobDataMapper

+ *

使用表:bg_job(写入)、bg_company(查询/创建)、bg_job_region_relation(写入)、app_job_data(更新状态)

+ * + * @author zk + */ +@Slf4j +@Service +public class JobCleanTransactionService { + + @Autowired + private JobMapper jobMapper; + + @Autowired + private CompanyMapper companyMapper; + + @Autowired + private JobRegionRelationMapper jobRegionRelationMapper; + + @Autowired + private AppJobDataMapper appJobDataMapper; + + /** + * 写入 bg_job + bg_job_region_relation + 更新 clean_status(短事务) + */ + @Transactional(rollbackFor = Exception.class) + public void saveJobData(JsonNode root, AppJobData data, Long companyId, String sourceId, List regionCodes) { + Job job = new Job(); + job.setTitle(root.path("title").asText("")); + job.setCompanyId(companyId); + job.setCategoryId(root.path("categoryId").asLong(0)); + job.setEmploymentType(root.path("employmentType").asInt(0)); + job.setDescription(root.path("description").asText("")); + job.setRequirement(root.path("requirement").asText("")); + job.setBonus(root.path("bonus").asText(null)); + job.setTags(root.path("tags").toString()); + job.setSkillTags(root.path("skillTags").toString()); + + String salary = root.path("salary").asText(null); + job.setSalary("null".equals(salary) ? null : salary); + + job.setEducation(root.path("education").asInt(0)); + job.setMinExperience(root.path("minExperience").asInt(0)); + + Long industryId = root.path("industryId").asLong(0); + job.setIndustryId(industryId == 0 ? null : industryId); + + job.setSourceUrl(data.getDetailUrl()); + job.setSourceId(sourceId); + job.setStatus(0); + job.setCreateTime(Instant.now()); + job.setUpdateTime(Instant.now()); + + jobMapper.insert(job); + + // 写入岗位-地区关联 + for (String regionCode : regionCodes) { + JobRegionRelation relation = new JobRegionRelation(); + relation.setJobId(job.getId()); + relation.setRegionCode(regionCode); + relation.setCreateTime(Instant.now()); + jobRegionRelationMapper.insert(relation); + } + + // 更新清洗状态 + updateCleanStatus(data.getId(), 2); + } + + /** + * 查找或创建公司(加锁防并发重复) + *

按 short_name 查询,不存在则创建一条待完善记录

+ */ + public synchronized Long findOrCreateCompany(String shortName) { + Company company = companyMapper.selectOne( + new LambdaQueryWrapper() + .eq(Company::getShortName, shortName) + .last("LIMIT 1")); + + if (company != null) { + return company.getId(); + } + + Company newCompany = new Company(); + newCompany.setName(shortName); + newCompany.setShortName(shortName); + newCompany.setStatus(0); + newCompany.setCreateTime(Instant.now()); + newCompany.setUpdateTime(Instant.now()); + companyMapper.insert(newCompany); + return newCompany.getId(); + } + + /** 更新清洗状态 */ + public void updateCleanStatus(Long id, int status) { + appJobDataMapper.update(null, + new LambdaUpdateWrapper() + .set(AppJobData::getCleanStatus, status) + .eq(AppJobData::getId, id)); + } +}