初始化

This commit is contained in:
zk
2026-06-02 17:44:03 +08:00
commit 30e6a6e2a5
34 changed files with 1692 additions and 0 deletions
+36
View File
@@ -0,0 +1,36 @@
ENV=dev
# PostgreSQL(本地爬虫库)
PG_HOST=192.168.31.51
PG_PORT=5432
PG_USER=postgres
PG_PASSWORD=feAeyR0u2fJGSS5ooFdHnSbyHQNY4WlV
PG_DB=postgres
# MySQL(业务库)
DB_HOST=192.168.31.105
DB_PORT=3306
DB_USER=root
DB_PASSWORD=123456
DB_NAME=offerpie
# AI 供应商
VOLCENGINE_API_KEY=fd065993-bee2-4f31-8bf2-56d5d3012c02
VOLCENGINE_BASE_URL=https://ark.cn-beijing.volces.com/api/v3
# 岗位清洗参数
CLEAN_BATCH_SIZE=100
CLEAN_CONCURRENCY=50
CLEAN_INTERVAL_SECONDS=180
# 公司补充参数
COMPANY_BATCH_SIZE=20
COMPANY_CONCURRENCY=10
COMPANY_INTERVAL_SECONDS=300
# 岗位下架
JOB_EXPIRE_DAYS=7
# 日志
LOGGING_LEVEL=INFO
+7
View File
@@ -0,0 +1,7 @@
.venv/
__pycache__/
*.pyc
.env
.env.prod
logs/
.idea/
+33
View File
@@ -0,0 +1,33 @@
---
inclusion: always
---
# 项目规范执行指引
## 方案讨论前
必读 `#[[file:.kiro/steering/项目结构说明.md]]`,全面了解:
- 项目分层结构:`config``core``ai``models``services``scheduler`
- 双数据源架构(PG 源库 + MySQL 业务库)
- 定时任务清单和清洗流程
- 并发模型(asyncio 协程 + Semaphore 信号量)
- 与 back-end、offerpie_python_ai 的关系
方案讨论时:
- 优先给出简洁的方案思路(涉及哪些模块、新增内容放在哪、核心流程概要),不要一开始就铺开所有细节
- 用户明确要求时,再给出详细的方案流程
- 说明与现有模块的关系
## 开发方案输出前 / 写代码前
必读 `#[[file:.kiro/steering/代码开发风格文档.md]]`,严格遵守:
- 命名约定、类型注解规范
- 双数据源使用规范(PgSession / MysqlSession
- 异步规范(Semaphore、Lock、gather
- AI 调用规范(model_config 引用、ai_tool 封装)
- 日志规范(批次汇总 + 单条关键结果)
- 配置规范(所有可调参数走 settings)
## 写完代码后
涉及新增文件、新增模块或目录结构变更时,必须同步更新 `#[[file:.kiro/steering/项目结构说明.md]]`,保持文档与实际代码一致。
+104
View File
@@ -0,0 +1,104 @@
---
inclusion: manual
---
# 代码开发风格文档
本项目为 Python 3.12 纯后台定时任务服务,基于 asyncio + SQLAlchemy + APScheduler,应用主目录为 `app/`
## 项目结构
- `app/config/` — 配置层:Pydantic Settings 统一配置
- `app/core/` — 核心基础设施:双数据源连接、日志
- `app/ai/` — AI 能力层:LLM 模型枚举、场景配置、Prompt 模板
- `app/models/` — ORM 模型层:分 pg/ 和 mysql/ 两个子包
- `app/services/` — 业务逻辑层:清洗、补充、恢复、下架等异步服务
- `app/scheduler/` — 调度层:APScheduler 定时任务注册
## 命名约定
### 文件命名
- 全部小写,下划线分隔,如 `job_clean_service.py``dict_cache_service.py`
- ORM 模型文件与表名对应(去掉 `bg_``app_` 前缀),如 `job.py` 对应 `bg_job`
### 类命名
- ORM 模型用 PascalCase 业务名,无后缀,如 `Job``Company``AppJobData`
- 字典缓存用 `DictCacheService`
- 枚举类以大写命名,如 `LLM`
- 场景配置类以 `Model` 结尾,如 `JobCleanModel``CompanyCleanModel`
### 变量与函数命名
- 函数和变量使用 snake_case,如 `run_job_clean``data_id`
- 私有函数以单下划线开头,如 `_do_clean``_build_user_message`
- 常量使用全大写下划线,如 `JOB_STRUCTURE_SYSTEM`
- 全局单例小写,如 `dict_cache``_id_gen`
## 类型注解
- 所有函数参数和返回值必须有类型注解
- ORM 模型字段使用 `Mapped[T]` + `mapped_column()` 声明
- 可选字段使用 `Optional[T]``T | None`
- 集合类型使用 `list[T]``dict[K, V]`Python 3.12 内置泛型)
## 注释规范
- 模块级注释使用文件顶部的 docstring,说明模块用途
- 函数注释使用 docstring,简洁描述功能
- 复杂逻辑用行内注释 `#` 说明
- ORM 模型字段通过 `comment` 参数说明含义
## 数据库使用规范
### 双数据源
- PostgreSQL 会话通过 `PgSession()` 获取,用于 `app_job_data` 表操作
- MySQL 会话通过 `MysqlSession()` 获取,用于业务表操作
- 每次操作用 `async with XxxSession() as session` 获取会话,手动 `await session.commit()`
### SQL 风格
- 简单操作使用 `sqlalchemy.text()` 原生 SQL,保持直观
- 批量插入使用 `insert(Model).values(...)``insert(Model), [dict...]`
- 锁定使用 `FOR UPDATE SKIP LOCKED`PG)防阻塞
### 不做分布式事务
- PG 和 MySQL 独立操作,不跨库事务
- 失败靠僵尸恢复机制重试
## 异步规范
- 所有数据库操作、AI 调用使用 `async/await`
- 并发控制使用 `asyncio.Semaphore`
- 互斥操作使用 `asyncio.Lock()`(如公司查找或创建)
- 批量任务使用 `asyncio.gather(*tasks, return_exceptions=True)`
## AI 调用规范
- 业务代码从 `app.ai.model_config` 引用场景配置类,不直接使用 `LLM` 枚举
- AI 调用统一通过 `app.services.ai_tool.ai_chat_json()` 封装(异步调用 + JSON 清洗 + 解析)
- AI 调用失败不影响主流程(第二次/第三次 AI 调用独立 try-catch
- 修改模型或参数只需改 `model_config.py`
## 日志规范
- 使用 `from app.core.logger import log`
- 批次任务:记录锁定数量和完成汇总
- 单条处理:记录关键结果(入库成功/丢弃/跳过),格式 `[id=xxx] 描述`
- 异常:`log.error` 记录异常详情
- 非关键失败:`log.warning` 记录但不中断
## 配置规范
- 所有可调参数放在 `settings.py`,通过 `.env` 文件覆盖
- 运行时参数(batch_size、concurrency、interval、expire_days)必须可配置
- 数据库密码通过 `urllib.parse.quote` 编码后拼入 URL
## 代码格式规范
### 紧凑风格
- 避免过度换行,保持代码紧凑易读
- f-string 拼接优先写在一行
- 方法参数列表较多时,可适当换行但保持紧凑
### 模块组织
- 每个 service 文件对外暴露一个 `run_xxx()` 异步函数作为入口
- 内部逻辑拆分为 `_xxx` 私有函数
- import 按标准库 → 第三方库 → 项目内部 顺序排列
+139
View File
@@ -0,0 +1,139 @@
---
inclusion: manual
---
# OfferPie Job Cleaner 项目结构说明
## 1️⃣ 项目整体层次
```
offerpie_job_cleaner/
├─ .env / .env.prod # 环境变量配置(dev/prod
├─ requirements.txt # Python 依赖清单
└─ app/ # 应用主目录
├─ main.py # 入口:初始化双数据源 → 加载字典缓存 → 启动调度器 → 等待关闭信号
├─ config/ # **配置层**
│ └─ settings.py # Pydantic Settings 统一配置(PG、MySQL、AI供应商、清洗参数、下架参数、日志)
├─ core/ # **核心基础设施层**
│ ├─ database.py # 双数据源:PgSession()PostgreSQL+ MysqlSession()MySQL
│ └─ logger.py # Loguru 日志配置(控制台+文件,按天轮转保留30天)
├─ ai/ # **AI 能力层**
│ ├─ models.py # LLM 模型枚举(LLM.DOUBAO_SEED_LITE、DEEPSEEK_V4_FLASH 等)
│ ├─ model_config.py # AI 模型场景配置(JobCleanModel、CompanyCleanModel
│ └─ prompts.py # 各步骤 Prompt 模板(结构化提取、专业匹配、技能提取、公司补充)
├─ models/ # **ORM 模型层**
│ ├─ pg/ # PostgreSQL 模型
│ │ └─ app_job_data.py # 爬虫原始数据表(app_job_data
│ └─ mysql/ # MySQL 模型
│ ├─ job.py # 岗位表(bg_job
│ ├─ company.py # 公司表(bg_company
│ ├─ skill_tag.py # 技能标签表(bg_skill_tag
│ └─ relations.py # 关联表(bg_job_region_relation、bg_job_skill_tag_relation
├─ services/ # **业务逻辑层**
│ ├─ job_clean_service.py # 岗位清洗主逻辑(协程+信号量并发,三次AI调用)
│ ├─ company_clean_service.py # 公司数据补充(协程+信号量并发,AI补充公司信息)
│ ├─ dict_cache_service.py # 字典数据缓存(启动时从MySQL加载岗位分类/行业/专业/地区)
│ ├─ zombie_recover_service.py # 僵尸恢复(PG岗位超时重置 + MySQL公司超时重置)
│ ├─ job_expire_service.py # 岗位下架(create_time超N天的岗位标记失效)
│ └─ ai_tool.py # AI 调用工具封装(异步调用+JSON清洗+解析)
└─ scheduler/ # **调度层**
└─ tasks.py # APScheduler 定时任务注册(岗位清洗/公司补充/僵尸恢复/岗位下架)
```
## 2️⃣ 各层模块职责
| 层级 | 主要职责 | 关键类/文件 |
|------|----------|-------------|
| **config** | 统一配置管理,双数据源连接参数、AI供应商、清洗/下架参数 | `Settings`pg_*、db_*、volcengine_*、clean_*、company_*、job_expire_days |
| **core** | 核心基础设施:双数据库连接池、日志 | `database.py`PgSession/MysqlSession 函数)、`logger.py`loguru |
| **ai** | AI 模型管理 + Prompt 模板 | `LLM` 枚举、`JobCleanModel`/`CompanyCleanModel`(场景配置)、`prompts.py`4个prompt |
| **models** | SQLAlchemy ORM 模型,分 pg/ 和 mysql/ 两个子包 | `AppJobData`PG)、`Job`/`Company`/`SkillTag`/`Relations`MySQL |
| **services** | 业务逻辑实现,全部异步协程 | 岗位清洗、公司补充、字典缓存、僵尸恢复、岗位下架、AI工具 |
| **scheduler** | APScheduler 定时任务注册和触发 | `tasks.py`5个定时任务) |
## 3️⃣ 技术栈
| 类别 | 技术选型 | 说明 |
|------|----------|------|
| **运行时** | Python 3.12 + asyncio | 纯后台定时任务,无 HTTP 服务 |
| **ORM** | SQLAlchemy 2.0 (asyncio) | 双引擎:asyncpg(PG) + asyncmy(MySQL) |
| **AI/LLM** | LangChain-OpenAI | 兼容 OpenAI 协议,火山引擎豆包模型 |
| **调度** | APScheduler (AsyncIOScheduler) | 轻量异步定时任务调度 |
| **配置** | Pydantic Settings + python-dotenv | 类型安全的 .env 配置管理 |
| **日志** | Loguru | 控制台+文件日志,按天轮转 |
| **ID生成** | snowflake-id | 雪花算法分布式ID |
## 4️⃣ 双数据源架构
| 数据源 | 用途 | 地址 |
|--------|------|------|
| **PostgreSQL** | 爬虫原始数据(app_job_data),只读写这一张表 | 本地 192.168.31.51:5432 |
| **MySQL** | 业务库(bg_job、bg_company、字典表等) | 生产 |
- PG:读取待清洗数据 → 更新清洗状态
- MySQL:写入清洗后的业务数据 + 读取字典缓存
## 5️⃣ 定时任务清单
| 任务 | 频率 | 数据源 | 描述 |
|------|------|--------|------|
| 岗位清洗 | 每3分钟 | PG→MySQL | 批量锁定 pending → 协程并发AI清洗 → 写入 bg_job |
| 公司补充 | 每5分钟 | MySQL | 锁定 status=0 → 协程并发AI补充 → 回填 bg_company |
| 岗位僵尸恢复 | 每30分钟 | PG | cleaning 超时10分钟 → 重置 pending |
| 公司僵尸恢复 | 每小时 | MySQL | status=3 超时10分钟 → 重置 0 |
| 岗位下架 | 每天凌晨2点 | MySQL | create_time 超 N 天 → status=2 |
## 6️⃣ 清洗流程(三次AI调用)
```
app_job_data (PG, pending)
↓ 批量锁定 → cleaning
第一次AI:结构化提取(岗位名/薪资/学历/分类/地区/公司...)
↓ valid=false → discarded
↓ valid=true → 去重 → 创建公司 → 写入 bg_job
第二次AI:专业匹配(requiredMajorIds + majorSensitivity
↓ 失败不影响
第三次AI:技能提取(INSERT IGNORE bg_skill_tag → 写关联表)
↓ 失败不影响
更新 PG: clean_status = cleaned
```
## 7️⃣ 并发模型
- **asyncio 协程 + Semaphore 信号量**
- 批量从数据库捞 N 条(batch_size),所有协程同时启动
- 信号量限制同时调 AI 的并发数(concurrency),做完一条立刻补一条
- 公司创建用 `asyncio.Lock()` 防并发重复插入
## 8️⃣ 状态机
### app_job_data.clean_statusPG
```
pending → cleaning → cleaned
→ discarded
cleaning 超时 → pending(僵尸恢复)
```
### bg_company.statusMySQL
```
0(待完善) → 3(补充中) → 1(已完善)
→ 4(补充失败)
3 超时 → 0(僵尸恢复)
```
## 9️⃣ 与其他项目的关系
- **与 back-endJava)的关系**:共享 MySQL 业务库,清洗逻辑从 Java 迁移至此项目
- **与 offerpie_python_ai 的关系**:独立部署,AI 封装风格一致(LLM枚举 + model_config),但不共享代码
- **数据流向**:爬虫 → PG(app_job_data) → 本项目清洗 → MySQL(bg_job等) → Java/Python AI 服务使用
## 🔟 构建与运行
- **虚拟环境**`.venv` 目录管理
- **依赖安装**`pip install -r requirements.txt`
- **启动**`python -m app.main`
- **环境切换**:通过 `.env` / `.env.prod` 控制,ENV 环境变量指定
- **部署**:本地非公网环境运行,无需 Docker/Nginx
View File
View File
+22
View File
@@ -0,0 +1,22 @@
"""AI 模型场景配置
集中管理清洗服务的模型选择与参数,修改模型只需改此文件。
"""
from app.ai.models import LLM
class JobCleanModel:
"""岗位清洗模块"""
# 第一次AI:结构化提取岗位信息
STRUCTURE = LLM.DOUBAO_SEED_LITE.create(temperature=0)
# 第二次AI:专业匹配
MAJOR_MATCH = LLM.DOUBAO_SEED_LITE.create(temperature=0)
# 第三次AI:技能提取
SKILL_EXTRACT = LLM.DOUBAO_SEED_LITE.create(temperature=0)
class CompanyCleanModel:
"""公司补充模块"""
# 公司信息补充
ENRICH = LLM.DOUBAO_SEED_LITE.create(temperature=0)
+41
View File
@@ -0,0 +1,41 @@
"""LLM 模型枚举与实例获取
Usage:
from app.ai.models import LLM
llm = LLM.DOUBAO_SEED_LITE.create(temperature=0)
"""
from enum import Enum
from langchain_openai import ChatOpenAI
from app.config import settings
# 供应商连接配置
_VOLCENGINE = (lambda: settings.volcengine_api_key, lambda: settings.volcengine_base_url)
class LLM(Enum):
"""所有可用模型,每个枚举值 = (模型名, api_key函数, base_url函数)"""
# 火山引擎
DOUBAO_PRO_32K = ("doubao-1-5-pro-32k-250115", *_VOLCENGINE)
DOUBAO_LITE_32K = ("doubao-1-5-lite-32k-250115", *_VOLCENGINE)
DOUBAO_SEED_LITE = ("doubao-seed-2-0-lite-260215", *_VOLCENGINE)
DOUBAO_SEED_PRO = ("doubao-seed-2-0-pro-260215", *_VOLCENGINE)
DEEPSEEK_V4_FLASH = ("deepseek-v4-flash-260425", *_VOLCENGINE)
def __init__(self, model_name: str, api_key_fn, base_url_fn):
self.model_name = model_name
self._api_key_fn = api_key_fn
self._base_url_fn = base_url_fn
def create(self, **kwargs) -> ChatOpenAI:
"""创建 LLM 实例,kwargs 透传给 ChatOpenAItemperature, max_tokens 等)"""
return ChatOpenAI(
model=self.model_name,
api_key=self._api_key_fn(),
base_url=self._base_url_fn(),
**kwargs,
)
+100
View File
@@ -0,0 +1,100 @@
"""各步骤 Prompt 模板"""
# ──────────── 第一次AI:岗位结构化提取 ────────────
JOB_STRUCTURE_SYSTEM = """你是一个岗位数据清洗助手。请根据提供的原始岗位数据,提取并结构化为JSON格式。
返回JSON格式要求:
{
"valid": true/false,
"title": "岗位名称",
"salary": "标准化薪资,如10-20K、面议,无效则null",
"education": 0-4的数字(0=不限 1=大专 2=本科 3=硕士 4=博士),
"minExperience": 最低工作年限数字(不要求则0),
"employmentType": 0或1(0=全职 1=兼职,默认0),
"categoryId": 岗位分类ID(必选,从分类列表中选最接近的),
"requiredIndustryId": 行业ID(仅当明确提到行业经验要求时设置,列表中无完全匹配则选最相似的,未提到则null),
"description": "岗位职责,保持原文风格,格式化展示",
"requirement": "任职要求,保持原文风格,格式化展示",
"bonus": "加分项,无则null",
"tags": ["核心职能标签,最多5个,如数据分析、产品策略"],
"skillTags": ["技能关键词,最多8个,如Java、Spring Boot"],
"companyShortName": "简洁的公司简称,如字节跳动、中国平安",
"cities": ["工作城市列表,精确到市"]
}
规则:
1. description/requirement/bonus 均从原始的 description+experience+education 内容中提取,保持原文风格
2. 岗位标题不存在时,从描述中归纳生成
3. 薪资标准化为 10-20K、20K、面议 等格式,无效或空则null
4. categoryId 必须从分类列表中选一个,不允许为null
5. requiredIndustryId 仅当描述中明确提到行业经验要求时设置
6. tags 是核心职能标签(如数据分析、团队协作),最多5个
7. skillTags 是技能关键词(如Java、MySQL),最多8个
8. companyShortName 去掉地区后缀、招聘后缀、括号内容,保持简洁
9. 字符串值中不允许出现Tab、换行等控制字符,用空格或中文标点替代
10. 只返回JSON,不要其他内容"""
# ──────────── 第二次AI:专业匹配 ────────────
MAJOR_MATCH_SYSTEM = """你是一个岗位专业匹配助手。根据岗位信息,判断该岗位对专业的要求。
返回JSON格式:
{
"requiredMajorIds": [专业ID数组,从专业列表中选择最相关的,最多3个,无明确要求则空数组],
"majorSensitivity": 0-2的数字(0=专业不限 1=优先相关专业 2=强制要求专业)
}
规则:
1. 只能从给定专业列表中选择ID
2. 根据岗位描述判断专业敏感度:明确写"XX专业"→2,写"相关专业优先"→1,未提及→0
3. majorSensitivity为0时,requiredMajorIds应为空数组
4. 只返回JSON,不要其他内容"""
# ──────────── 第三次AI:技能提取 ────────────
SKILL_EXTRACT_SYSTEM = """你是一个技能提取助手。根据岗位信息,提取该岗位要求的核心专业能力和工具技能。
返回JSON数组格式,如:["java", "spring boot", "mysql", "redis"]
规则:
1. 统一使用小写字母
2. 只保留核心词,去掉多余修饰(如"plc编程""plc""c语言""c""cad制图""cad"
3. 同一技能只保留最具体的表述,不要同时出现上位词和下位词(如有"机械设计"就不要再出"机械"
4. 提取范围包括:技术栈、专业领域知识、行业工具、专业资质能力等
5. 不提取纯软技能(如沟通能力、团队协作、学习能力、积极主动)
6. 不提取过于宽泛的标签(如"办公软件""windows"
7. 如果岗位完全没有专业能力要求(纯看态度和素质),返回空数组 []
8. 最多15个,按重要性排序
9. 只返回JSON数组,不要其他内容"""
# ──────────── 公司补充 ────────────
COMPANY_ENRICH_SYSTEM = """你是一个企业信息补充助手。根据提供的公司简称,补充该公司的详细信息。
返回JSON格式要求:
{
"valid": true/false,
"name": "公司全称",
"city": "总部所在城市,精确到市",
"companyType": "企业类型",
"industryId": 行业ID,
"tags": ["公司标签,最多5个"],
"summary": "一句话简介,100字以内",
"description": "公司详细描述,500字以内",
"foundedYear": "成立年份",
"address": "总部/注册地址",
"scale": "企业规模",
"website": "官网地址",
"financingStage": "融资状态",
"latestValuation": "最新估值",
"news": ["相关新闻,最多3条,每条50字以内"]
}
规则:
1. 如果不认识该公司,返回 {"valid": false}
2. name 根据公司简称推断完整的企业注册名称
3. companyType 取值:上市企业、独角兽、国企、央企、民营企业、外资企业、合资企业、事业单位、其他
4. industryId 必须从给定行业列表中选择,不确定则null
5. scale 取值:少于50人、50-150人、150-500人、500-1000人、1000-5000人、5000-10000人、10000人以上
6. tags 体现公司核心业务特征,最多5个
7. news 基于你的知识提供该公司最新的3条相关新闻,每条50字以内
8. latestValuation 知道就提供,不知道则null
9. 不确定的字段返回null,不要编造
10. 字符串值中不允许出现Tab、换行等控制字符
11. 只返回JSON,不要其他内容"""
+18
View File
@@ -0,0 +1,18 @@
import os
from pathlib import Path
from .settings import Settings
_env = os.getenv("ENV", "dev")
_env_files = {"dev": ".env", "test": ".env.test", "prod": ".env.prod"}
# 定位项目根目录(config 上两级)
_project_root = Path(__file__).resolve().parent.parent.parent
_env_file = _project_root / _env_files.get(_env, ".env")
if not _env_file.exists():
raise FileNotFoundError(f".env 文件不存在: {_env_file}")
settings = Settings(_env_file=str(_env_file))
__all__ = ["settings"]
+70
View File
@@ -0,0 +1,70 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""项目配置,通过 .env 文件覆盖默认值"""
# 环境
env: str = "dev"
# ──────────── PostgreSQL(本地,app_job_data 源库)────────────
pg_host: str = "192.168.31.51"
pg_port: int = 5432
pg_user: str = "postgres"
pg_password: str = ""
pg_db: str = "postgres"
pg_pool_size: int = 10
pg_max_overflow: int = 20
# ──────────── MySQL(业务库)────────────
db_host: str = "192.168.31.105"
db_port: int = 3306
db_user: str = "root"
db_password: str = "123456"
db_name: str = "offerpie"
mysql_pool_size: int = 10
mysql_max_overflow: int = 20
# ──────────── AI 供应商 ────────────
volcengine_api_key: str = "fd065993-bee2-4f31-8bf2-56d5d3012c02"
volcengine_base_url: str = "https://ark.cn-beijing.volces.com/api/v3"
# ──────────── 岗位清洗参数 ────────────
clean_batch_size: int = 100
clean_concurrency: int = 50
clean_interval_seconds: int = 180
# ──────────── 公司补充参数 ────────────
company_batch_size: int = 20
company_concurrency: int = 10
company_interval_seconds: int = 300
# ──────────── 岗位下架参数 ────────────
job_expire_days: int = 7
# ──────────── 日志 ────────────
logging_level: str = "INFO"
log_file_name: str = "cleaner.log"
@property
def pg_url(self) -> str:
from urllib.parse import quote
return (
f"postgresql+asyncpg://{self.pg_user}:{quote(self.pg_password, safe='')}"
f"@{self.pg_host}:{self.pg_port}/{self.pg_db}"
)
@property
def mysql_url(self) -> str:
from urllib.parse import quote
return (
f"mysql+asyncmy://{self.db_user}:{quote(self.db_password, safe='')}"
f"@{self.db_host}:{self.db_port}/{self.db_name}"
)
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
)
View File
+79
View File
@@ -0,0 +1,79 @@
"""双数据源:PostgreSQL(源库) + MySQL(业务库)"""
from typing import Optional
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from app.config import settings
from app.core.logger import log
# ──────────── 内部变量 ────────────
_pg_engine: Optional[AsyncEngine] = None
_pg_session_factory: Optional[async_sessionmaker[AsyncSession]] = None
_mysql_engine: Optional[AsyncEngine] = None
_mysql_session_factory: Optional[async_sessionmaker[AsyncSession]] = None
class PgBase(DeclarativeBase):
"""PostgreSQL ORM 声明基类"""
pass
class MysqlBase(DeclarativeBase):
"""MySQL ORM 声明基类"""
pass
async def init_db() -> None:
"""初始化双数据源"""
global _pg_engine, _pg_session_factory, _mysql_engine, _mysql_session_factory
_pg_engine = create_async_engine(
settings.pg_url,
pool_size=settings.pg_pool_size,
max_overflow=settings.pg_max_overflow,
pool_recycle=3600,
echo=False,
)
_pg_session_factory = async_sessionmaker(_pg_engine, expire_on_commit=False)
_mysql_engine = create_async_engine(
settings.mysql_url,
pool_size=settings.mysql_pool_size,
max_overflow=settings.mysql_max_overflow,
pool_recycle=3600,
echo=False,
)
_mysql_session_factory = async_sessionmaker(_mysql_engine, expire_on_commit=False)
log.info("双数据源初始化完成: PG={}, MySQL={}", settings.pg_host, settings.db_host)
async def close_db() -> None:
"""关闭双数据源"""
if _pg_engine:
await _pg_engine.dispose()
if _mysql_engine:
await _mysql_engine.dispose()
log.info("双数据源已关闭")
def PgSession() -> AsyncSession:
"""获取 PostgreSQL 异步会话(用作 async with PgSession() as session"""
if _pg_session_factory is None:
raise RuntimeError("数据库未初始化,请先调用 init_db()")
return _pg_session_factory()
def MysqlSession() -> AsyncSession:
"""获取 MySQL 异步会话(用作 async with MysqlSession() as session"""
if _mysql_session_factory is None:
raise RuntimeError("数据库未初始化,请先调用 init_db()")
return _mysql_session_factory()
+34
View File
@@ -0,0 +1,34 @@
"""日志配置"""
import sys
from pathlib import Path
from loguru import logger
from app.config import settings
# 日志目录
_log_dir = Path("logs")
_log_dir.mkdir(exist_ok=True)
# 移除默认 handler
logger.remove()
# 控制台输出
logger.add(
sys.stdout,
level=settings.logging_level,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
)
# 文件输出(按天轮转,保留30天)
logger.add(
_log_dir / settings.log_file_name,
level=settings.logging_level,
rotation="00:00",
retention="30 days",
encoding="utf-8",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{line} - {message}",
)
log = logger
+64
View File
@@ -0,0 +1,64 @@
"""项目入口:初始化数据源、加载字典、启动调度器"""
import asyncio
import signal
import warnings
from datetime import datetime
from app.core.logger import log
# 屏蔽 asyncmy INSERT IGNORE 产生的 Duplicate entry warnings
warnings.filterwarnings("ignore", message=".*Duplicate entry.*")
from app.core.database import init_db, close_db
from app.services.dict_cache_service import dict_cache
from app.scheduler.tasks import create_scheduler
async def main():
log.info("=" * 50)
log.info("OfferPie Job Cleaner 启动中...")
log.info("=" * 50)
# 初始化双数据源
await init_db()
# 加载字典缓存
await dict_cache.refresh()
# 创建并启动调度器
scheduler = create_scheduler()
scheduler.start()
# 立即触发一次岗位清洗和公司补充
scheduler.modify_job("job_clean", next_run_time=datetime.now())
scheduler.modify_job("company_clean", next_run_time=datetime.now())
log.info("调度器已启动,所有定时任务已注册")
# 优雅关闭
stop_event = asyncio.Event()
def _shutdown(*args):
log.info("收到关闭信号,正在关闭...")
stop_event.set()
loop = asyncio.get_running_loop()
# Unix: SIGINT + SIGTERMWindows: 仅靠 KeyboardInterrupt
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, _shutdown)
except (NotImplementedError, ValueError):
pass
try:
await stop_event.wait()
except KeyboardInterrupt:
pass
finally:
scheduler.shutdown(wait=False)
await close_db()
log.info("OfferPie Job Cleaner 已关闭")
if __name__ == "__main__":
asyncio.run(main())
View File
View File
+36
View File
@@ -0,0 +1,36 @@
"""MySQL: bg_company 表模型"""
from datetime import datetime
from typing import Optional
from sqlalchemy import BigInteger, DateTime, Integer, JSON, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import MysqlBase
class Company(MysqlBase):
"""公司表"""
__tablename__ = "bg_company"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
name: Mapped[Optional[str]] = mapped_column(String(255))
short_name: Mapped[str] = mapped_column(String(128), nullable=False)
logo_url: Mapped[Optional[str]] = mapped_column(String(512))
region_code: Mapped[Optional[str]] = mapped_column(String(20))
company_type: Mapped[Optional[str]] = mapped_column(String(32))
industry_id: Mapped[Optional[int]] = mapped_column(BigInteger)
tags: Mapped[Optional[list]] = mapped_column(JSON)
summary: Mapped[Optional[str]] = mapped_column(String(512))
description: Mapped[Optional[str]] = mapped_column(Text)
founded_year: Mapped[Optional[str]] = mapped_column(String(10))
address: Mapped[Optional[str]] = mapped_column(String(255))
scale: Mapped[Optional[str]] = mapped_column(String(32))
website: Mapped[Optional[str]] = mapped_column(String(255))
financing_stage: Mapped[Optional[str]] = mapped_column(String(32))
latest_valuation: Mapped[Optional[str]] = mapped_column(String(64))
news: Mapped[Optional[list]] = mapped_column(JSON)
status: Mapped[int] = mapped_column(Integer, default=0, comment="0=待完善 1=已完善 2=禁用 3=补充中 4=补充失败")
create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False)
update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False)
+39
View File
@@ -0,0 +1,39 @@
"""MySQL: bg_job 表模型"""
from datetime import datetime
from typing import Optional
from sqlalchemy import BigInteger, DateTime, Integer, JSON, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import MysqlBase
class Job(MysqlBase):
"""岗位表"""
__tablename__ = "bg_job"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
title: Mapped[str] = mapped_column(String(255), nullable=False)
company_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
category_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
employment_type: Mapped[int] = mapped_column(Integer, default=0)
description: Mapped[Optional[str]] = mapped_column(Text)
requirement: Mapped[Optional[str]] = mapped_column(Text)
bonus: Mapped[Optional[str]] = mapped_column(Text)
tags: Mapped[Optional[list]] = mapped_column(JSON)
skill_tags: Mapped[Optional[list]] = mapped_column(JSON)
salary: Mapped[Optional[str]] = mapped_column(String(64))
education: Mapped[int] = mapped_column(Integer, default=0)
min_experience: Mapped[int] = mapped_column(Integer, default=0)
required_industry_id: Mapped[Optional[int]] = mapped_column(BigInteger)
required_major_ids: Mapped[Optional[list]] = mapped_column(JSON)
major_sensitivity: Mapped[Optional[int]] = mapped_column(Integer)
source_url: Mapped[Optional[str]] = mapped_column(String(1024))
source_id: Mapped[Optional[str]] = mapped_column(String(64))
recruit_category: Mapped[Optional[int]] = mapped_column(Integer, comment="招聘分类: 0=校招, 1=实习, 2=社招, 3=其他")
expire_at: Mapped[Optional[datetime]] = mapped_column(DateTime, comment="发布日期")
status: Mapped[int] = mapped_column(Integer, default=0, comment="0=上架 1=下架 2=已失效")
create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False)
update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False)
+30
View File
@@ -0,0 +1,30 @@
"""MySQL: 关联表模型"""
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, String
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import MysqlBase
class JobRegionRelation(MysqlBase):
"""岗位-地区关联表"""
__tablename__ = "bg_job_region_relation"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
job_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
region_code: Mapped[str] = mapped_column(String(20), nullable=False)
create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False)
class JobSkillTagRelation(MysqlBase):
"""岗位-技能标签关联表"""
__tablename__ = "bg_job_skill_tag_relation"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
job_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
skill_tag_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False)
+15
View File
@@ -0,0 +1,15 @@
"""MySQL: bg_skill_tag 表模型"""
from sqlalchemy import BigInteger, String
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import MysqlBase
class SkillTag(MysqlBase):
"""技能标签表"""
__tablename__ = "bg_skill_tag"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
View File
+35
View File
@@ -0,0 +1,35 @@
"""PostgreSQL: app_job_data 表模型"""
from datetime import datetime
from typing import Optional
from sqlalchemy import BigInteger, DateTime, Integer, SmallInteger, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import PgBase
class AppJobData(PgBase):
"""爬虫岗位原始数据"""
__tablename__ = "app_job_data"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
urllistid: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联urllistid")
job_title: Mapped[Optional[str]] = mapped_column(String(255))
salary: Mapped[Optional[str]] = mapped_column(String(128))
location: Mapped[Optional[str]] = mapped_column(String(2048))
company: Mapped[Optional[str]] = mapped_column(String(255), comment="公司名字")
experience: Mapped[Optional[str]] = mapped_column(String(64))
education: Mapped[Optional[str]] = mapped_column(String(64))
description: Mapped[str] = mapped_column(Text, nullable=False)
detail_url: Mapped[str] = mapped_column(String(1024), nullable=False)
recruit_category: Mapped[int] = mapped_column(SmallInteger, default=3, nullable=False, comment="招聘分类: 0=校招, 1=实习, 2=社招, 3=其他")
content_hash: Mapped[str] = mapped_column(String(64), nullable=False)
sources: Mapped[int] = mapped_column(SmallInteger, default=0, nullable=False)
expire_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="发布日期")
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
clean_status: Mapped[str] = mapped_column(String(20), default="pending", nullable=False, comment="pending/cleaning/cleaned/discarded")
clean_started_at: Mapped[Optional[datetime]] = mapped_column(DateTime)
cleaned_at: Mapped[Optional[datetime]] = mapped_column(DateTime)
View File
+105
View File
@@ -0,0 +1,105 @@
"""定时任务注册"""
from datetime import datetime, timedelta
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from app.config import settings
from app.core.logger import log
def create_scheduler() -> AsyncIOScheduler:
"""创建并注册所有定时任务"""
scheduler = AsyncIOScheduler(
timezone="Asia/Shanghai",
job_defaults={"misfire_grace_time": 60},
)
# 岗位清洗(每 N 秒)
scheduler.add_job(
_job_clean_task,
trigger=IntervalTrigger(seconds=settings.clean_interval_seconds),
id="job_clean",
name="岗位清洗",
max_instances=1,
)
# 公司补充(每 N 秒)
scheduler.add_job(
_company_clean_task,
trigger=IntervalTrigger(seconds=settings.company_interval_seconds),
id="company_clean",
name="公司补充",
max_instances=1,
)
# 岗位僵尸恢复(每30分钟)
scheduler.add_job(
_job_zombie_task,
trigger=IntervalTrigger(minutes=30),
id="job_zombie_recover",
name="岗位僵尸恢复",
max_instances=1,
)
# 公司僵尸恢复(每小时)
scheduler.add_job(
_company_zombie_task,
trigger=IntervalTrigger(hours=1),
id="company_zombie_recover",
name="公司僵尸恢复",
max_instances=1,
)
# 岗位下架(每天凌晨2点)
scheduler.add_job(
_job_expire_task,
trigger=CronTrigger(hour=2, minute=0),
id="job_expire",
name="岗位下架",
max_instances=1,
)
return scheduler
async def _job_clean_task():
from app.services.job_clean_service import run_job_clean
try:
await run_job_clean()
except Exception as e:
log.error("岗位清洗任务异常: {}", e)
async def _company_clean_task():
from app.services.company_clean_service import run_company_clean
try:
await run_company_clean()
except Exception as e:
log.error("公司补充任务异常: {}", e)
async def _job_zombie_task():
from app.services.zombie_recover_service import recover_job_zombie
try:
await recover_job_zombie()
except Exception as e:
log.error("岗位僵尸恢复异常: {}", e)
async def _company_zombie_task():
from app.services.zombie_recover_service import recover_company_zombie
try:
await recover_company_zombie()
except Exception as e:
log.error("公司僵尸恢复异常: {}", e)
async def _job_expire_task():
from app.services.job_expire_service import run_job_expire
try:
await run_job_expire()
except Exception as e:
log.error("岗位下架异常: {}", e)
View File
+65
View File
@@ -0,0 +1,65 @@
"""AI 调用工具封装"""
import json
import re
from typing import Any
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from app.core.logger import log
# markdown 代码块正则
_CODE_BLOCK_RE = re.compile(r"```\w*\s*\n?(.*?)\n?\s*```", re.DOTALL)
# 控制字符正则(保留 \t \n \r)
_CONTROL_CHAR_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
def clean_ai_response(response: str) -> str:
"""从 AI 返回的文本中提取干净的 JSON 字符串"""
if not response or not response.strip():
return ""
result = response.strip()
# 尝试从 markdown 代码块提取
match = _CODE_BLOCK_RE.search(result)
if match:
result = match.group(1).strip()
else:
# 定位首个 JSON 起始符
obj_start = result.find("{")
arr_start = result.find("[")
if obj_start < 0:
start = arr_start
elif arr_start < 0:
start = obj_start
else:
start = min(obj_start, arr_start)
if start > 0:
result = result[start:]
# 清除控制字符
result = _CONTROL_CHAR_RE.sub("", result)
return result
async def ai_chat(llm: ChatOpenAI, system_prompt: str, user_message: str) -> str:
"""异步调用 LLM,返回原始文本"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=user_message),
]
response = await llm.ainvoke(messages)
return response.content
async def ai_chat_json(llm: ChatOpenAI, system_prompt: str, user_message: str) -> Any:
"""异步调用 LLM,返回解析后的 JSON 对象"""
raw = await ai_chat(llm, system_prompt, user_message)
cleaned = clean_ai_response(raw)
try:
return json.loads(cleaned)
except json.JSONDecodeError as e:
log.warning("AI JSON 解析失败: {}, raw={}", e, raw[:200])
return None
+138
View File
@@ -0,0 +1,138 @@
"""公司数据补充服务(协程版)"""
import asyncio
from datetime import datetime
from sqlalchemy import text
from app.config import settings
from app.core.database import MysqlSession
from app.core.logger import log
from app.ai.model_config import CompanyCleanModel
from app.ai.prompts import COMPANY_ENRICH_SYSTEM
from app.services.ai_tool import ai_chat_json
from app.services.dict_cache_service import dict_cache
async def run_company_clean() -> None:
"""一次批量公司补充任务"""
# 锁定一批待完善公司
async with MysqlSession() as mysql:
result = await mysql.execute(
text("""
SELECT * FROM bg_company
WHERE status = 0
LIMIT :limit
FOR UPDATE SKIP LOCKED
"""),
{"limit": settings.company_batch_size},
)
rows = result.mappings().all()
if not rows:
return
ids = [r["id"] for r in rows]
# MySQL 批量 IN 用 format 拼接(id 是 bigint,安全)
ids_str = ",".join(str(i) for i in ids)
await mysql.execute(
text(f"UPDATE bg_company SET status = 3, update_time = NOW() WHERE id IN ({ids_str})"),
)
await mysql.commit()
log.info("公司补充:锁定{}条数据", len(rows))
# 协程并发,信号量限流
sem = asyncio.Semaphore(settings.company_concurrency)
tasks = [_clean_one(sem, dict(r)) for r in rows]
await asyncio.gather(*tasks, return_exceptions=True)
async def _clean_one(sem: asyncio.Semaphore, company: dict) -> None:
"""单条公司补充"""
async with sem:
try:
await _do_clean(company)
except Exception as e:
log.error("公司补充异常, id={}, shortName={}: {}", company["id"], company.get("short_name"), e)
async def _do_clean(company: dict) -> None:
"""公司补充逻辑"""
company_id = company["id"]
short_name = company.get("short_name", "")
user_msg = f"【公司简称】\n{short_name}\n\n【行业列表】\n{dict_cache.industry_text}"
result = await ai_chat_json(CompanyCleanModel.ENRICH, COMPANY_ENRICH_SYSTEM, user_msg)
if result is None or not result.get("valid", False):
await _update_status(company_id, 4)
return
# 地区匹配
city = result.get("city")
region_code = dict_cache.match_region_code(city) if city else None
# 回填数据
now = datetime.now()
async with MysqlSession() as mysql:
await mysql.execute(
text("""
UPDATE bg_company SET
name = COALESCE(:name, name),
region_code = COALESCE(:region_code, region_code),
company_type = COALESCE(:company_type, company_type),
industry_id = :industry_id,
tags = :tags,
summary = COALESCE(:summary, summary),
description = COALESCE(:description, description),
founded_year = COALESCE(:founded_year, founded_year),
address = COALESCE(:address, address),
scale = COALESCE(:scale, scale),
website = COALESCE(:website, website),
financing_stage = COALESCE(:financing_stage, financing_stage),
latest_valuation = COALESCE(:latest_valuation, latest_valuation),
news = :news,
status = 1,
update_time = :now
WHERE id = :id
"""),
{
"name": result.get("name"),
"region_code": region_code,
"company_type": result.get("companyType"),
"industry_id": result.get("industryId"),
"tags": _to_json(result.get("tags")),
"summary": result.get("summary"),
"description": result.get("description"),
"founded_year": result.get("foundedYear"),
"address": result.get("address"),
"scale": result.get("scale"),
"website": result.get("website"),
"financing_stage": result.get("financingStage"),
"latest_valuation": result.get("latestValuation"),
"news": _to_json(result.get("news")),
"now": now,
"id": company_id,
},
)
await mysql.commit()
log.info("公司补充完成, id={}, shortName={}", company_id, short_name)
async def _update_status(company_id: int, status: int) -> None:
"""更新公司状态"""
async with MysqlSession() as mysql:
await mysql.execute(
text("UPDATE bg_company SET status = :s, update_time = NOW() WHERE id = :id"),
{"s": status, "id": company_id},
)
await mysql.commit()
def _to_json(value) -> str | None:
"""列表转 JSON 字符串"""
import json
if value and isinstance(value, list):
return json.dumps(value, ensure_ascii=False)
return None
+91
View File
@@ -0,0 +1,91 @@
"""字典数据缓存服务
启动时从 MySQL 加载岗位分类、行业、专业分类、地区数据到内存。
"""
from sqlalchemy import select, text
from app.core.database import MysqlSession
from app.core.logger import log
class DictCacheService:
"""字典缓存,单例使用"""
def __init__(self):
self.job_category_text: str = ""
self.industry_text: str = ""
self.major_category_text: str = ""
self._region_list: list[dict] = []
async def refresh(self) -> None:
"""加载全量字典数据"""
async with MysqlSession() as session:
# 岗位分类(三级叶子,带父级路径)
result = await session.execute(text("""
SELECT c.id, c.name, c.parent_id, c.root_id, c.level,
p.name AS parent_name, r.name AS root_name
FROM bg_job_category c
LEFT JOIN bg_job_category p ON c.parent_id = p.id
LEFT JOIN bg_job_category r ON c.root_id = r.id
WHERE c.level = 3
"""))
categories = result.mappings().all()
self.job_category_text = ", ".join(
f"{c['id']}:{c['name']}({c['root_name']}/{c['parent_name']})"
for c in categories
)
# 行业(二级叶子,带父级)
result = await session.execute(text("""
SELECT i.id, i.name, p.name AS parent_name
FROM bg_industry i
LEFT JOIN bg_industry p ON i.parent_id = p.id
WHERE i.level = 2
"""))
industries = result.mappings().all()
self.industry_text = ", ".join(
f"{i['id']}:{i['name']}({i['parent_name']})"
for i in industries
)
# 专业分类(三级叶子,带父级路径)
result = await session.execute(text("""
SELECT m.id, m.name, m.parent_id, m.root_id,
p.name AS parent_name, r.name AS root_name
FROM bg_major_category m
LEFT JOIN bg_major_category p ON m.parent_id = p.id
LEFT JOIN bg_major_category r ON m.root_id = r.id
WHERE m.level = 3
"""))
majors = result.mappings().all()
self.major_category_text = ", ".join(
f"{m['id']}:{m['name']}({m['root_name']}/{m['parent_name']})"
for m in majors
)
# 地区(省市级)
result = await session.execute(text("""
SELECT code, name FROM bg_china_regions_code WHERE city_code IS NULL
"""))
self._region_list = [dict(r) for r in result.mappings().all()]
log.info(
"字典缓存加载完成: 岗位分类{}条, 行业{}条, 专业{}条, 地区{}",
len(categories), len(industries), len(majors), len(self._region_list),
)
def match_region_code(self, city_name: str) -> str | None:
"""根据城市名模糊匹配地区编码"""
if not city_name:
return None
name = city_name.replace("", "").replace("", "").strip()
for r in self._region_list:
r_name = r["name"].replace("", "").replace("", "")
if name in r_name or r_name in name:
return r["code"]
return None
# 全局单例
dict_cache = DictCacheService()
+306
View File
@@ -0,0 +1,306 @@
"""岗位清洗服务(协程版)"""
import asyncio
import json
from datetime import datetime
from snowflake import SnowflakeGenerator
from sqlalchemy import text, insert
from app.config import settings
from app.core.database import PgSession, MysqlSession
from app.core.logger import log
from app.ai.model_config import JobCleanModel
from app.ai.prompts import JOB_STRUCTURE_SYSTEM, MAJOR_MATCH_SYSTEM, SKILL_EXTRACT_SYSTEM
from app.models.mysql.job import Job
from app.models.mysql.company import Company
from app.models.mysql.relations import JobRegionRelation, JobSkillTagRelation
from app.services.ai_tool import ai_chat_json
from app.services.dict_cache_service import dict_cache
# 雪花ID生成器
_id_gen = SnowflakeGenerator(instance=1)
# 公司创建锁(防止并发重复插入同一公司)
_company_lock = asyncio.Lock()
async def run_job_clean() -> None:
"""一次批量清洗任务"""
# 1. 从 PG 锁定一批待清洗数据
async with PgSession() as pg:
result = await pg.execute(
text("""
SELECT * FROM app_job_data
WHERE clean_status = 'pending'
LIMIT :limit
FOR UPDATE SKIP LOCKED
"""),
{"limit": settings.clean_batch_size},
)
rows = result.mappings().all()
if not rows:
return
ids = [r["id"] for r in rows]
await pg.execute(
text("""
UPDATE app_job_data
SET clean_status = 'cleaning', clean_started_at = NOW()
WHERE id = ANY(:ids)
"""),
{"ids": ids},
)
await pg.commit()
log.info("岗位清洗:锁定{}条数据", len(rows))
# 2. 协程并发清洗,信号量限流
sem = asyncio.Semaphore(settings.clean_concurrency)
tasks = [_clean_one(sem, dict(r)) for r in rows]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 汇总
errors = sum(1 for r in results if isinstance(r, Exception))
log.info("岗位清洗:本批完成,共{}条,异常{}", len(rows), errors)
async def _clean_one(sem: asyncio.Semaphore, data: dict) -> None:
"""单条岗位清洗"""
async with sem:
try:
await _do_clean(data)
except Exception as e:
log.error("岗位清洗异常, id={}: {}", data["id"], e)
# 保持 cleaning 状态,由僵尸恢复任务重置
async def _do_clean(data: dict) -> None:
"""清洗逻辑"""
data_id = data["id"]
# 前置校验
description = data.get("description") or ""
if len(description) < 20:
log.info("[id={}] 丢弃:描述过短({}字符)", data_id, len(description))
await _update_pg_status(data_id, "discarded")
return
# 第一次AI:结构化提取
user_message = _build_user_message(data)
result = await ai_chat_json(JobCleanModel.STRUCTURE, JOB_STRUCTURE_SYSTEM, user_message)
if result is None or not result.get("valid", False):
log.info("[id={}] 丢弃:AI判定无效", data_id)
await _update_pg_status(data_id, "discarded")
return
# 去重检查
source_id = str(data_id)
async with MysqlSession() as mysql:
existing = await mysql.execute(
text("SELECT COUNT(*) AS cnt FROM bg_job WHERE source_id = :sid"),
{"sid": source_id},
)
if existing.scalar() > 0:
log.info("[id={}] 跳过:已入库(去重)", data_id)
await _update_pg_status(data_id, "cleaned")
return
# 公司处理
company_short_name = result.get("companyShortName") or data.get("company") or ""
company_id = await _find_or_create_company(company_short_name)
# 地区处理
region_codes = []
for city in result.get("cities") or []:
code = dict_cache.match_region_code(city)
if code:
region_codes.append(code)
# 写入 bg_job
job_id = next(_id_gen)
now = datetime.now()
async with MysqlSession() as mysql:
await mysql.execute(
insert(Job).values(
id=job_id,
title=result.get("title", ""),
company_id=company_id,
category_id=result.get("categoryId", 0),
employment_type=result.get("employmentType", 0),
description=result.get("description", ""),
requirement=result.get("requirement", ""),
bonus=result.get("bonus"),
tags=result.get("tags"),
skill_tags=result.get("skillTags"),
salary=result.get("salary"),
education=result.get("education", 0),
min_experience=result.get("minExperience", 0),
required_industry_id=result.get("requiredIndustryId"),
recruit_category=data.get("recruit_category", 3),
expire_at=data.get("expire_at"),
source_url=data.get("detail_url"),
source_id=source_id,
status=0,
create_time=now,
update_time=now,
)
)
# 写入地区关联
if region_codes:
await mysql.execute(
insert(JobRegionRelation),
[{"id": next(_id_gen), "job_id": job_id, "region_code": code, "create_time": now} for code in region_codes],
)
await mysql.commit()
# 更新 PG 状态
await _update_pg_status(data_id, "cleaned")
log.info("[id={}] 入库成功:{} | 公司={} | 地区={}", data_id, result.get("title"), company_short_name, region_codes)
# 第二次AI:专业匹配(失败不影响)
try:
await _match_major(job_id, result)
log.debug("[id={}] 专业匹配完成", data_id)
except Exception as e:
log.warning("[id={}] 专业匹配失败: {}", data_id, e)
# 第三次AI:技能提取(失败不影响)
try:
await _extract_skill_tags(job_id, result)
log.debug("[id={}] 技能提取完成", data_id)
except Exception as e:
log.warning("[id={}] 技能提取失败: {}", data_id, e)
async def _match_major(job_id: int, result: dict) -> None:
"""第二次AI:专业匹配"""
title = result.get("title", "")
desc = result.get("description", "")
req = result.get("requirement", "")
user_msg = f"【岗位信息】\n标题: {title}\n职责: {desc}\n要求: {req}\n\n【专业分类列表】\n{dict_cache.major_category_text}"
data = await ai_chat_json(JobCleanModel.MAJOR_MATCH, MAJOR_MATCH_SYSTEM, user_msg)
if data is None:
return
major_ids = [mid for mid in (data.get("requiredMajorIds") or []) if mid > 0]
sensitivity = data.get("majorSensitivity", 0)
async with MysqlSession() as mysql:
await mysql.execute(
text("""
UPDATE bg_job SET required_major_ids = :ids, major_sensitivity = :s, update_time = :t
WHERE id = :jid
"""),
{"ids": json.dumps(major_ids) if major_ids else None, "s": sensitivity, "t": datetime.now(), "jid": job_id},
)
await mysql.commit()
async def _extract_skill_tags(job_id: int, result: dict) -> None:
"""第三次AI:技能提取"""
title = result.get("title", "")
desc = result.get("description", "")
req = result.get("requirement", "")
user_msg = f"【岗位信息】\n标题: {title}\n职责: {desc}\n要求: {req}"
skills = await ai_chat_json(JobCleanModel.SKILL_EXTRACT, SKILL_EXTRACT_SYSTEM, user_msg)
if not skills or not isinstance(skills, list):
return
now = datetime.now()
tag_ids = []
async with MysqlSession() as mysql:
for name in skills:
name = str(name).strip().lower()
if not name or len(name) > 50:
continue
tag_id = next(_id_gen)
# INSERT IGNORE
await mysql.execute(
text("INSERT IGNORE INTO bg_skill_tag (id, name) VALUES (:id, :name)"),
{"id": tag_id, "name": name},
)
# 查回真实ID
row = await mysql.execute(
text("SELECT id FROM bg_skill_tag WHERE name = :name LIMIT 1"),
{"name": name},
)
real_id = row.scalar()
if real_id and real_id not in tag_ids:
tag_ids.append(real_id)
if tag_ids:
await mysql.execute(
insert(JobSkillTagRelation),
[{"id": next(_id_gen), "job_id": job_id, "skill_tag_id": tid, "create_time": now} for tid in tag_ids],
)
await mysql.commit()
async def _find_or_create_company(short_name: str) -> int:
"""查找或创建公司(加锁防并发重复)"""
async with _company_lock:
async with MysqlSession() as mysql:
row = await mysql.execute(
text("SELECT id FROM bg_company WHERE short_name = :name LIMIT 1"),
{"name": short_name},
)
existing = row.scalar()
if existing:
return existing
company_id = next(_id_gen)
now = datetime.now()
await mysql.execute(
insert(Company).values(
id=company_id,
name=short_name,
short_name=short_name,
status=0,
create_time=now,
update_time=now,
)
)
await mysql.commit()
return company_id
async def _update_pg_status(data_id: int, status: str) -> None:
"""更新 PG 清洗状态"""
async with PgSession() as pg:
if status == "cleaned":
await pg.execute(
text("UPDATE app_job_data SET clean_status = :s, cleaned_at = NOW() WHERE id = :id"),
{"s": status, "id": data_id},
)
else:
await pg.execute(
text("UPDATE app_job_data SET clean_status = :s WHERE id = :id"),
{"s": status, "id": data_id},
)
await pg.commit()
def _build_user_message(data: dict) -> str:
"""构建第一次AI的用户消息"""
parts = [
"【原始数据】",
f"岗位名称: {data.get('job_title') or ''}",
f"薪资: {data.get('salary') or ''}",
f"工作地点: {data.get('location') or ''}",
f"公司: {data.get('company') or ''}",
f"经验要求: {data.get('experience') or ''}",
f"学历要求: {data.get('education') or ''}",
f"岗位详情: {data.get('description') or ''}",
"",
f"【岗位分类列表】\n{dict_cache.job_category_text}",
"",
f"【行业列表】\n{dict_cache.industry_text}",
]
return "\n".join(parts)
+29
View File
@@ -0,0 +1,29 @@
"""岗位下架服务
每天定时执行,将 create_time 超过 N 天的岗位标记为已失效。
"""
from sqlalchemy import text
from app.config import settings
from app.core.database import MysqlSession
from app.core.logger import log
async def run_job_expire() -> None:
"""下架过期岗位"""
days = int(settings.job_expire_days)
async with MysqlSession() as mysql:
result = await mysql.execute(
text(f"""
UPDATE bg_job
SET status = 2, update_time = NOW()
WHERE status = 0
AND create_time < DATE_SUB(NOW(), INTERVAL {days} DAY)
"""),
)
await mysql.commit()
affected = result.rowcount
if affected > 0:
log.info("岗位下架:{}条岗位已标记为失效(超过{}天)", affected, days)
+42
View File
@@ -0,0 +1,42 @@
"""僵尸恢复服务"""
from sqlalchemy import text
from app.core.database import PgSession, MysqlSession
from app.core.logger import log
async def recover_job_zombie() -> None:
"""岗位清洗僵尸恢复:超时10分钟的 cleaning → pending"""
async with PgSession() as pg:
result = await pg.execute(
text("""
UPDATE app_job_data
SET clean_status = 'pending', clean_started_at = NULL
WHERE clean_status = 'cleaning'
AND clean_started_at < NOW() - INTERVAL '10 minutes'
""")
)
await pg.commit()
affected = result.rowcount
if affected > 0:
log.info("岗位僵尸恢复:重置{}条数据", affected)
async def recover_company_zombie() -> None:
"""公司补充僵尸恢复:超时10分钟的 status=3 → 0"""
async with MysqlSession() as mysql:
result = await mysql.execute(
text("""
UPDATE bg_company
SET status = 0, update_time = NOW()
WHERE status = 3
AND update_time < NOW() - INTERVAL 10 MINUTE
""")
)
await mysql.commit()
affected = result.rowcount
if affected > 0:
log.info("公司僵尸恢复:重置{}条数据", affected)
+14
View File
@@ -0,0 +1,14 @@
# 核心
pydantic-settings>=2.0
sqlalchemy[asyncio]>=2.0
asyncpg>=0.29
asyncmy>=0.2
apscheduler>=3.10
# AI
langchain-openai>=0.3
langchain-core>=0.3
# 工具
loguru>=0.7
snowflake-id>=1.0