补充文档,修改清洗方案

This commit is contained in:
zk
2026-06-02 18:38:36 +08:00
parent 30e6a6e2a5
commit 6967e4ba54
7 changed files with 363 additions and 80 deletions
+273
View File
@@ -0,0 +1,273 @@
---
inclusion: manual
---
# 数据清洗方案
## 总体架构
```
爬虫(本地网络) → PostgreSQL(app_job_data)
offerpie_job_cleaner(本地Python服务)
asyncio协程 + Semaphore信号量并发
哈希续命 或 三次AI调用
MySQL(bg_job + bg_company + 关联表 + bg_skill_tag)
公司信息不完整的 → 调AI补充
```
本服务部署在本地非公网环境,纯后台定时任务,无HTTP接口。通过APScheduler调度,asyncio协程提供并发能力。
---
## 一、岗位清洗
### 1.1 触发方式
- 定时:每3分钟触发一次(`clean_interval_seconds` 可配)
- 启动时立即执行第一次
### 1.2 状态管理
`app_job_data.clean_status`PostgreSQL):
- `pending` = 待清洗:新爬到的数据,默认值
- `cleaning` = 清洗中:已锁定,正在处理
- `cleaned` = 已清洗:处理完成(入库或续命)
- `discarded` = 已丢弃:AI判定无效或描述过短
### 1.3 批量锁定
```sql
SELECT * FROM app_job_data
WHERE clean_status = 'pending'
LIMIT {batch_size}
FOR UPDATE SKIP LOCKED
```
锁定后立即更新:
```sql
UPDATE app_job_data
SET clean_status = 'cleaning', clean_started_at = NOW()
WHERE id = ANY(ids)
```
使用 `FOR UPDATE SKIP LOCKED` 而非 `FOR UPDATE`,避免多实例部署时阻塞等待。
### 1.4 并发模型
- 一批 `batch_size`(默认100)条数据,全部启动协程
- `asyncio.Semaphore(concurrency)`(默认50)控制同时调AI的并发数
- 做完一条立刻让位给下一条,不存在批次内空等
### 1.5 单条清洗流程
```
1. 前置校验
└─ description 为空或长度 < 20 → discarded
2. 哈希去重续命(新增)
├─ 用 content_hash 查 bg_job
├─ 命中 → UPDATE bg_job SET status=0, create_time=NOW(), update_time=NOW()
├─ 更新 PG: cleaned
└─ return(不调AI,秒完成)
3. 第一次AI:结构化提取
├─ 输入:原始字段 + 岗位分类列表 + 行业列表
├─ 输出JSONvalid/title/salary/education/categoryId/cities/companyShortName/...
└─ valid=false → discarded
4. 公司处理
├─ 按 short_name 查 bg_company
├─ 存在 → 拿 company_id
└─ 不存在 → 创建一条 status=0 的记录(asyncio.Lock 防并发重复)
6. 地区匹配
└─ AI返回的 cities → 字典缓存模糊匹配 → region_codes
7. 写入 MySQL
├─ INSERT bg_job(带 content_hash、recruit_category、expire_at
└─ INSERT bg_job_region_relation
8. 更新 PG: cleaned
9. 第二次AI:专业匹配(失败不影响岗位入库)
├─ 输入:title + description + requirement + 专业列表(845条)
└─ 输出:requiredMajorIds + majorSensitivity → UPDATE bg_job
10. 第三次AI:技能提取(失败不影响岗位入库)
├─ 输入:title + description + requirement
└─ 输出:技能名数组 → INSERT IGNORE bg_skill_tag → INSERT bg_job_skill_tag_relation
```
### 1.6 哈希续命机制
**背景**:爬虫每隔7天全量刷新,大部分岗位内容未变。
**逻辑**
- `app_job_data.content_hash`:爬虫入库时对岗位内容生成的哈希值
- `bg_job.content_hash`:清洗入库时透传过来
- 清洗时优先用 hash 查 bg_job,命中说明岗位还活着
- 续命动作:`status=0`(重新上架)+ 刷新 `create_time`/`update_time`
- 不调AI,秒级完成,大幅节省时间和API费用
### 1.7 僵尸恢复(每30分钟)
```sql
UPDATE app_job_data
SET clean_status = 'pending', clean_started_at = NULL
WHERE clean_status = 'cleaning'
AND clean_started_at < NOW() - INTERVAL '10 minutes'
```
---
## 二、公司数据补充
### 2.1 触发方式
- 定时:每5分钟触发一次(`company_interval_seconds` 可配)
- 启动时立即执行第一次
### 2.2 状态管理
`bg_company.status`MySQL):
- 0 = 待完善:岗位清洗时创建,只有 short_name
- 1 = 已完善:AI补充完成
- 2 = 禁用
- 3 = 补充中:已锁定,正在处理
- 4 = 补充失败:AI不认识该公司
### 2.3 流程
```
1. 锁定一批 status=0 的公司(batch_size=20FOR UPDATE SKIP LOCKED
2. 批量更新 status=3
3. 协程并发(Semaphore=10
4. 每条:
├─ AI调用:short_name + 行业列表 → 公司详情JSON
├─ valid=false → status=4
└─ valid=true → 回填全部字段 → status=1
```
### 2.4 AI返回字段
```json
{
"valid": true,
"name": "公司全称",
"city": "总部城市",
"companyType": "企业类型",
"industryId": ID,
"tags": ["标签"],
"summary": "一句话简介",
"description": "详细描述",
"foundedYear": "成立年份",
"address": "地址",
"scale": "规模",
"website": "官网",
"financingStage": "融资状态",
"latestValuation": "估值",
"news": ["新闻"]
}
```
### 2.5 僵尸恢复(每小时)
```sql
UPDATE bg_company
SET status = 0, update_time = NOW()
WHERE status = 3
AND update_time < NOW() - INTERVAL 10 MINUTE
```
---
## 三、岗位下架
### 3.1 触发方式
- 每天凌晨2点执行
### 3.2 逻辑
```sql
UPDATE bg_job
SET status = 2, update_time = NOW()
WHERE status = 0
AND create_time < DATE_SUB(NOW(), INTERVAL {job_expire_days} DAY)
```
- `job_expire_days` 可配,默认7天
- 配合续命机制:活跃岗位每次刷新会被续命(create_time 刷新),所以不会被误下架
- 真正过期的岗位(7天内未再被爬到)才会下架
---
## 四、字典数据缓存
### 4.1 加载时机
- 服务启动时从 MySQL 全量加载到内存
### 4.2 缓存内容
| 数据 | 来源表 | 格式 | 用途 |
|------|--------|------|------|
| 岗位分类 | bg_job_categorylevel=3 | `id:name(一级/二级)` | 第一次AI prompt |
| 行业 | bg_industrylevel=2 | `id:name(一级)` | 第一次AI prompt + 公司补充 |
| 专业分类 | bg_major_categorylevel=3 | `id:name(一级/二级)` | 第二次AI prompt |
| 地区 | bg_china_regions_code(市级) | code + name | 城市名匹配地区编码 |
---
## 五、关键设计决策
| 决策点 | 结论 | 原因 |
|--------|------|------|
| 为什么从 Java 迁到 Python | asyncio 协程在 IO 密集场景下内存开销远小于 Java 线程 | 10万+数据量,50并发协程 vs 50线程 |
| 并发控制 | Semaphore 信号量 | 做完一条立刻补一条,吞吐量最大化 |
| 哈希续命 | content_hash 匹配免AI | 7天全量刷新时大部分是重复数据,节省90%+ AI调用 |
| 行锁方式 | FOR UPDATE SKIP LOCKED | 不阻塞,适合未来多实例部署 |
| 技能并发去重 | INSERT IGNORE + 唯一索引 | 不加应用层锁,数据库保证 |
| 公司创建并发 | asyncio.Lock | 防止同名公司被多个协程同时插入 |
| AI调用次数 | 三次(结构化 + 专业 + 技能) | 各维度独立,容错互不影响 |
| 公司数据来源 | AI补充 | 公司简称查工商API不精确,AI覆盖率高 |
| 事务跨库 | 不做分布式事务 | PG和MySQL独立操作,僵尸恢复兜底 |
| 岗位下架 | create_time + N天 | 配合续命机制,活跃岗位不会被误下架 |
---
## 六、配置参数
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `clean_batch_size` | 100 | 每批锁定数据量 |
| `clean_concurrency` | 50 | AI并发调用上限 |
| `clean_interval_seconds` | 180 | 清洗任务间隔(秒) |
| `company_batch_size` | 20 | 公司补充每批数量 |
| `company_concurrency` | 10 | 公司AI并发上限 |
| `company_interval_seconds` | 300 | 公司补充间隔(秒) |
| `job_expire_days` | 7 | 岗位过期天数 |
所有参数通过 `.env` 文件配置,修改后重启生效。
---
## 七、涉及的表
| 表 | 库 | 读/写 | 说明 |
|----|-----|-------|------|
| `app_job_data` | PG | 读+写状态 | 爬虫原始数据,clean_status 流转 |
| `bg_job` | MySQL | 写+改 | 清洗后岗位(含 content_hash 用于续命) |
| `bg_company` | MySQL | 读+写+改 | 公司(查或创建 + AI补充) |
| `bg_job_region_relation` | MySQL | 写 | 岗位-地区关联 |
| `bg_skill_tag` | MySQL | 写(INSERT IGNORE) | 技能标签自动入库 |
| `bg_job_skill_tag_relation` | MySQL | 写 | 岗位-技能关联 |
| `bg_job_category` | MySQL | 读(缓存) | 岗位分类字典 |
| `bg_industry` | MySQL | 读(缓存) | 行业字典 |
| `bg_major_category` | MySQL | 读(缓存) | 专业字典 |
| `bg_china_regions_code` | MySQL | 读(缓存) | 地区字典 |
+19 -19
View File
@@ -14,23 +14,23 @@ class Company(MysqlBase):
__tablename__ = "bg_company" __tablename__ = "bg_company"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True) id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)")
name: Mapped[Optional[str]] = mapped_column(String(255)) name: Mapped[Optional[str]] = mapped_column(String(255), comment="公司全称")
short_name: Mapped[str] = mapped_column(String(128), nullable=False) short_name: Mapped[str] = mapped_column(String(128), nullable=False, comment="公司简称")
logo_url: Mapped[Optional[str]] = mapped_column(String(512)) logo_url: Mapped[Optional[str]] = mapped_column(String(512), comment="Logo地址")
region_code: Mapped[Optional[str]] = mapped_column(String(20)) region_code: Mapped[Optional[str]] = mapped_column(String(20), comment="地区编码")
company_type: Mapped[Optional[str]] = mapped_column(String(32)) company_type: Mapped[Optional[str]] = mapped_column(String(32), comment="企业类型:上市企业/独角兽/国企等")
industry_id: Mapped[Optional[int]] = mapped_column(BigInteger) industry_id: Mapped[Optional[int]] = mapped_column(BigInteger, comment="行业ID")
tags: Mapped[Optional[list]] = mapped_column(JSON) tags: Mapped[Optional[list]] = mapped_column(JSON, comment="公司标签JSON数组")
summary: Mapped[Optional[str]] = mapped_column(String(512)) summary: Mapped[Optional[str]] = mapped_column(String(512), comment="一句话简介")
description: Mapped[Optional[str]] = mapped_column(Text) description: Mapped[Optional[str]] = mapped_column(Text, comment="公司详细描述")
founded_year: Mapped[Optional[str]] = mapped_column(String(10)) founded_year: Mapped[Optional[str]] = mapped_column(String(10), comment="成立年份")
address: Mapped[Optional[str]] = mapped_column(String(255)) address: Mapped[Optional[str]] = mapped_column(String(255), comment="总部地址")
scale: Mapped[Optional[str]] = mapped_column(String(32)) scale: Mapped[Optional[str]] = mapped_column(String(32), comment="企业规模")
website: Mapped[Optional[str]] = mapped_column(String(255)) website: Mapped[Optional[str]] = mapped_column(String(255), comment="官网地址")
financing_stage: Mapped[Optional[str]] = mapped_column(String(32)) financing_stage: Mapped[Optional[str]] = mapped_column(String(32), comment="融资状态")
latest_valuation: Mapped[Optional[str]] = mapped_column(String(64)) latest_valuation: Mapped[Optional[str]] = mapped_column(String(64), comment="最新估值")
news: Mapped[Optional[list]] = mapped_column(JSON) news: Mapped[Optional[list]] = mapped_column(JSON, comment="相关新闻JSON数组")
status: Mapped[int] = mapped_column(Integer, default=0, comment="0=待完善 1=已完善 2=禁用 3=补充中 4=补充失败") status: Mapped[int] = mapped_column(Integer, default=0, comment="0=待完善 1=已完善 2=禁用 3=补充中 4=补充失败")
create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间")
update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="更新时间")
+22 -21
View File
@@ -14,26 +14,27 @@ class Job(MysqlBase):
__tablename__ = "bg_job" __tablename__ = "bg_job"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True) id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)")
title: Mapped[str] = mapped_column(String(255), nullable=False) title: Mapped[str] = mapped_column(String(255), nullable=False, comment="岗位名称")
company_id: Mapped[int] = mapped_column(BigInteger, nullable=False) company_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联公司ID")
category_id: Mapped[int] = mapped_column(BigInteger, nullable=False) category_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="岗位类型ID")
employment_type: Mapped[int] = mapped_column(Integer, default=0) employment_type: Mapped[int] = mapped_column(Integer, default=0, comment="0=全职 1=兼职")
description: Mapped[Optional[str]] = mapped_column(Text) description: Mapped[Optional[str]] = mapped_column(Text, comment="岗位职责")
requirement: Mapped[Optional[str]] = mapped_column(Text) requirement: Mapped[Optional[str]] = mapped_column(Text, comment="任职要求")
bonus: Mapped[Optional[str]] = mapped_column(Text) bonus: Mapped[Optional[str]] = mapped_column(Text, comment="加分项")
tags: Mapped[Optional[list]] = mapped_column(JSON) tags: Mapped[Optional[list]] = mapped_column(JSON, comment="岗位标签JSON数组")
skill_tags: Mapped[Optional[list]] = mapped_column(JSON) skill_tags: Mapped[Optional[list]] = mapped_column(JSON, comment="技能标签JSON数组")
salary: Mapped[Optional[str]] = mapped_column(String(64)) salary: Mapped[Optional[str]] = mapped_column(String(64), comment="薪资描述,如15-25K")
education: Mapped[int] = mapped_column(Integer, default=0) education: Mapped[int] = mapped_column(Integer, default=0, comment="学历要求 0=不限 1=大专 2=本科 3=硕士 4=博士")
min_experience: Mapped[int] = mapped_column(Integer, default=0) min_experience: Mapped[int] = mapped_column(Integer, default=0, comment="最低工作年限,0=不要求")
required_industry_id: Mapped[Optional[int]] = mapped_column(BigInteger) required_industry_id: Mapped[Optional[int]] = mapped_column(BigInteger, comment="要求的行业经验ID")
required_major_ids: Mapped[Optional[list]] = mapped_column(JSON) required_major_ids: Mapped[Optional[list]] = mapped_column(JSON, comment="要求专业ID数组")
major_sensitivity: Mapped[Optional[int]] = mapped_column(Integer) major_sensitivity: Mapped[Optional[int]] = mapped_column(Integer, comment="专业敏感度 0=不限 1=优先 2=强制")
source_url: Mapped[Optional[str]] = mapped_column(String(1024)) source_url: Mapped[Optional[str]] = mapped_column(String(1024), comment="来源链接")
source_id: Mapped[Optional[str]] = mapped_column(String(64)) source_id: Mapped[Optional[str]] = mapped_column(String(64), comment="爬虫原始数据ID,用于去重")
recruit_category: Mapped[Optional[int]] = mapped_column(Integer, comment="招聘分类: 0=校招, 1=实习, 2=社招, 3=其他") content_hash: Mapped[Optional[str]] = mapped_column(String(64), comment="内容哈希,用于去重续命")
recruit_category: Mapped[Optional[int]] = mapped_column(Integer, comment="招聘分类: 0=校招 1=实习 2=社招 3=其他")
expire_at: Mapped[Optional[datetime]] = mapped_column(DateTime, comment="发布日期") expire_at: Mapped[Optional[datetime]] = mapped_column(DateTime, comment="发布日期")
status: Mapped[int] = mapped_column(Integer, default=0, comment="0=上架 1=下架 2=已失效") status: Mapped[int] = mapped_column(Integer, default=0, comment="0=上架 1=下架 2=已失效")
create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间")
update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="更新时间")
+8 -8
View File
@@ -13,10 +13,10 @@ class JobRegionRelation(MysqlBase):
__tablename__ = "bg_job_region_relation" __tablename__ = "bg_job_region_relation"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)")
job_id: Mapped[int] = mapped_column(BigInteger, nullable=False) job_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="岗位ID")
region_code: Mapped[str] = mapped_column(String(20), nullable=False) region_code: Mapped[str] = mapped_column(String(20), nullable=False, comment="地区编码")
create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间")
class JobSkillTagRelation(MysqlBase): class JobSkillTagRelation(MysqlBase):
@@ -24,7 +24,7 @@ class JobSkillTagRelation(MysqlBase):
__tablename__ = "bg_job_skill_tag_relation" __tablename__ = "bg_job_skill_tag_relation"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)")
job_id: Mapped[int] = mapped_column(BigInteger, nullable=False) job_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="岗位ID")
skill_tag_id: Mapped[int] = mapped_column(BigInteger, nullable=False) skill_tag_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="技能标签ID")
create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间")
+2 -2
View File
@@ -11,5 +11,5 @@ class SkillTag(MysqlBase):
__tablename__ = "bg_skill_tag" __tablename__ = "bg_skill_tag"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True) id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment="主键ID(雪花)")
name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False) name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, comment="标签名称(唯一索引)")
+18 -18
View File
@@ -3,7 +3,7 @@
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional
from sqlalchemy import BigInteger, DateTime, Integer, SmallInteger, String, Text from sqlalchemy import BigInteger, DateTime, SmallInteger, String, Text
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import PgBase from app.core.database import PgBase
@@ -14,22 +14,22 @@ class AppJobData(PgBase):
__tablename__ = "app_job_data" __tablename__ = "app_job_data"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True, comment="自增主键")
urllistid: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联urllistid") urllistid: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联urllistid")
job_title: Mapped[Optional[str]] = mapped_column(String(255)) job_title: Mapped[Optional[str]] = mapped_column(String(255), comment="职位名称")
salary: Mapped[Optional[str]] = mapped_column(String(128)) salary: Mapped[Optional[str]] = mapped_column(String(128), comment="薪资")
location: Mapped[Optional[str]] = mapped_column(String(2048)) location: Mapped[Optional[str]] = mapped_column(String(2048), comment="工作地点")
company: Mapped[Optional[str]] = mapped_column(String(255), comment="公司名") company: Mapped[Optional[str]] = mapped_column(String(255), comment="公司名")
experience: Mapped[Optional[str]] = mapped_column(String(64)) experience: Mapped[Optional[str]] = mapped_column(String(64), comment="经验要求")
education: Mapped[Optional[str]] = mapped_column(String(64)) education: Mapped[Optional[str]] = mapped_column(String(64), comment="学历要求")
description: Mapped[str] = mapped_column(Text, nullable=False) description: Mapped[str] = mapped_column(Text, nullable=False, comment="岗位详情(职责+要求+介绍)")
detail_url: Mapped[str] = mapped_column(String(1024), nullable=False) detail_url: Mapped[str] = mapped_column(String(1024), nullable=False, comment="详情页URL")
recruit_category: Mapped[int] = mapped_column(SmallInteger, default=3, nullable=False, comment="招聘分类: 0=校招, 1=实习, 2=社招, 3=其他") recruit_category: Mapped[int] = mapped_column(SmallInteger, default=3, nullable=False, comment="招聘分类: 0=校招 1=实习 2=社招 3=其他")
content_hash: Mapped[str] = mapped_column(String(64), nullable=False) content_hash: Mapped[str] = mapped_column(String(64), nullable=False, comment="内容哈希,用于去重")
sources: Mapped[int] = mapped_column(SmallInteger, default=0, nullable=False) sources: Mapped[int] = mapped_column(SmallInteger, default=0, nullable=False, comment="数据来源 0=官网 1=平台")
expire_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="发布日期") expire_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="发布日期")
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="创建时间")
updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="更新时间")
clean_status: Mapped[str] = mapped_column(String(20), default="pending", nullable=False, comment="pending/cleaning/cleaned/discarded") clean_status: Mapped[str] = mapped_column(String(20), default="pending", nullable=False, comment="清洗状态: pending/cleaning/cleaned/discarded")
clean_started_at: Mapped[Optional[datetime]] = mapped_column(DateTime) clean_started_at: Mapped[Optional[datetime]] = mapped_column(DateTime, comment="清洗开始时间")
cleaned_at: Mapped[Optional[datetime]] = mapped_column(DateTime) cleaned_at: Mapped[Optional[datetime]] = mapped_column(DateTime, comment="清洗完成时间")
+21 -12
View File
@@ -86,6 +86,26 @@ async def _do_clean(data: dict) -> None:
await _update_pg_status(data_id, "discarded") await _update_pg_status(data_id, "discarded")
return return
# 哈希去重续命:content_hash 已存在于 bg_job → 说明是重复刷新,直接续命
content_hash = data.get("content_hash")
if content_hash:
async with MysqlSession() as mysql:
row = await mysql.execute(
text("SELECT id FROM bg_job WHERE content_hash = :hash LIMIT 1"),
{"hash": content_hash},
)
existing_id = row.scalar()
if existing_id:
now = datetime.now()
await mysql.execute(
text("UPDATE bg_job SET status = 0, create_time = :now, update_time = :now WHERE id = :id"),
{"now": now, "id": existing_id},
)
await mysql.commit()
await _update_pg_status(data_id, "cleaned")
log.info("[id={}] 续命:hash命中,岗位ID={}", data_id, existing_id)
return
# 第一次AI:结构化提取 # 第一次AI:结构化提取
user_message = _build_user_message(data) user_message = _build_user_message(data)
result = await ai_chat_json(JobCleanModel.STRUCTURE, JOB_STRUCTURE_SYSTEM, user_message) result = await ai_chat_json(JobCleanModel.STRUCTURE, JOB_STRUCTURE_SYSTEM, user_message)
@@ -94,18 +114,6 @@ async def _do_clean(data: dict) -> None:
await _update_pg_status(data_id, "discarded") await _update_pg_status(data_id, "discarded")
return return
# 去重检查
source_id = str(data_id)
async with MysqlSession() as mysql:
existing = await mysql.execute(
text("SELECT COUNT(*) AS cnt FROM bg_job WHERE source_id = :sid"),
{"sid": source_id},
)
if existing.scalar() > 0:
log.info("[id={}] 跳过:已入库(去重)", data_id)
await _update_pg_status(data_id, "cleaned")
return
# 公司处理 # 公司处理
company_short_name = result.get("companyShortName") or data.get("company") or "" company_short_name = result.get("companyShortName") or data.get("company") or ""
company_id = await _find_or_create_company(company_short_name) company_id = await _find_or_create_company(company_short_name)
@@ -139,6 +147,7 @@ async def _do_clean(data: dict) -> None:
required_industry_id=result.get("requiredIndustryId"), required_industry_id=result.get("requiredIndustryId"),
recruit_category=data.get("recruit_category", 3), recruit_category=data.get("recruit_category", 3),
expire_at=data.get("expire_at"), expire_at=data.get("expire_at"),
content_hash=data.get("content_hash"),
source_url=data.get("detail_url"), source_url=data.get("detail_url"),
source_id=source_id, source_id=source_id,
status=0, status=0,