7.1 KiB
7.1 KiB
inclusion
| inclusion |
|---|
| manual |
OfferPie Job Cleaner 项目结构说明
1️⃣ 项目整体层次
offerpie_job_cleaner/
│
├─ .env / .env.prod # 环境变量配置(dev/prod)
├─ requirements.txt # Python 依赖清单
│
└─ app/ # 应用主目录
├─ main.py # 入口:初始化双数据源 → 加载字典缓存 → 启动调度器 → 等待关闭信号
│
├─ config/ # **配置层**
│ └─ settings.py # Pydantic Settings 统一配置(PG、MySQL、AI供应商、清洗参数、下架参数、日志)
│
├─ core/ # **核心基础设施层**
│ ├─ database.py # 双数据源:PgSession()(PostgreSQL)+ MysqlSession()(MySQL)
│ └─ logger.py # Loguru 日志配置(控制台+文件,按天轮转保留30天)
│
├─ ai/ # **AI 能力层**
│ ├─ models.py # LLM 模型枚举(LLM.DOUBAO_SEED_LITE、DEEPSEEK_V4_FLASH 等)
│ ├─ model_config.py # AI 模型场景配置(JobCleanModel、CompanyCleanModel)
│ └─ prompts.py # 各步骤 Prompt 模板(结构化提取、专业匹配、技能提取、公司补充)
│
├─ models/ # **ORM 模型层**
│ ├─ pg/ # PostgreSQL 模型
│ │ └─ app_job_data.py # 爬虫原始数据表(app_job_data)
│ └─ mysql/ # MySQL 模型
│ ├─ job.py # 岗位表(bg_job)
│ ├─ company.py # 公司表(bg_company)
│ ├─ skill_tag.py # 技能标签表(bg_skill_tag)
│ └─ relations.py # 关联表(bg_job_region_relation、bg_job_skill_tag_relation)
│
├─ services/ # **业务逻辑层**
│ ├─ job_clean_service.py # 岗位清洗主逻辑(协程+信号量并发,三次AI调用)
│ ├─ company_clean_service.py # 公司数据补充(协程+信号量并发,AI补充公司信息)
│ ├─ dict_cache_service.py # 字典数据缓存(启动时从MySQL加载岗位分类/行业/专业/地区)
│ ├─ zombie_recover_service.py # 僵尸恢复(PG岗位超时重置 + MySQL公司超时重置)
│ ├─ job_expire_service.py # 岗位下架(create_time超N天的岗位标记失效)
│ └─ ai_tool.py # AI 调用工具封装(异步调用+JSON清洗+解析)
│
└─ scheduler/ # **调度层**
└─ tasks.py # APScheduler 定时任务注册(岗位清洗/公司补充/僵尸恢复/岗位下架)
2️⃣ 各层模块职责
| 层级 | 主要职责 | 关键类/文件 |
|---|---|---|
| config | 统一配置管理,双数据源连接参数、AI供应商、清洗/下架参数 | Settings(pg_、db_、volcengine_、clean_、company_*、job_expire_days) |
| core | 核心基础设施:双数据库连接池、日志 | database.py(PgSession/MysqlSession 函数)、logger.py(loguru) |
| ai | AI 模型管理 + Prompt 模板 | LLM 枚举、JobCleanModel/CompanyCleanModel(场景配置)、prompts.py(4个prompt) |
| models | SQLAlchemy ORM 模型,分 pg/ 和 mysql/ 两个子包 | AppJobData(PG)、Job/Company/SkillTag/Relations(MySQL) |
| services | 业务逻辑实现,全部异步协程 | 岗位清洗、公司补充、字典缓存、僵尸恢复、岗位下架、AI工具 |
| scheduler | APScheduler 定时任务注册和触发 | tasks.py(5个定时任务) |
3️⃣ 技术栈
| 类别 | 技术选型 | 说明 |
|---|---|---|
| 运行时 | Python 3.12 + asyncio | 纯后台定时任务,无 HTTP 服务 |
| ORM | SQLAlchemy 2.0 (asyncio) | 双引擎:asyncpg(PG) + asyncmy(MySQL) |
| AI/LLM | LangChain-OpenAI | 兼容 OpenAI 协议,火山引擎豆包模型 |
| 调度 | APScheduler (AsyncIOScheduler) | 轻量异步定时任务调度 |
| 配置 | Pydantic Settings + python-dotenv | 类型安全的 .env 配置管理 |
| 日志 | Loguru | 控制台+文件日志,按天轮转 |
| ID生成 | snowflake-id | 雪花算法分布式ID |
4️⃣ 双数据源架构
| 数据源 | 用途 | 地址 |
|---|---|---|
| PostgreSQL | 爬虫原始数据(app_job_data),只读写这一张表 | 本地 192.168.31.51:5432 |
| MySQL | 业务库(bg_job、bg_company、字典表等) | 生产 |
- PG:读取待清洗数据 → 更新清洗状态
- MySQL:写入清洗后的业务数据 + 读取字典缓存
5️⃣ 定时任务清单
| 任务 | 频率 | 数据源 | 描述 |
|---|---|---|---|
| 岗位清洗 | 每3分钟 | PG→MySQL | 批量锁定 pending → 协程并发AI清洗 → 写入 bg_job |
| 公司补充 | 每5分钟 | MySQL | 锁定 status=0 → 协程并发AI补充 → 回填 bg_company |
| 岗位僵尸恢复 | 每30分钟 | PG | cleaning 超时10分钟 → 重置 pending |
| 公司僵尸恢复 | 每小时 | MySQL | status=3 超时10分钟 → 重置 0 |
| 岗位下架 | 每天凌晨2点 | MySQL | create_time 超 N 天 → status=2 |
6️⃣ 清洗流程(三次AI调用)
app_job_data (PG, pending)
↓ 批量锁定 → cleaning
↓
第一次AI:结构化提取(岗位名/薪资/学历/分类/地区/公司...)
↓ valid=false → discarded
↓ valid=true → 去重 → 创建公司 → 写入 bg_job
↓
第二次AI:专业匹配(requiredMajorIds + majorSensitivity)
↓ 失败不影响
↓
第三次AI:技能提取(INSERT IGNORE bg_skill_tag → 写关联表)
↓ 失败不影响
↓
更新 PG: clean_status = cleaned
7️⃣ 并发模型
- asyncio 协程 + Semaphore 信号量
- 批量从数据库捞 N 条(batch_size),所有协程同时启动
- 信号量限制同时调 AI 的并发数(concurrency),做完一条立刻补一条
- 公司创建用
asyncio.Lock()防并发重复插入
8️⃣ 状态机
app_job_data.clean_status(PG)
pending → cleaning → cleaned
→ discarded
cleaning 超时 → pending(僵尸恢复)
bg_company.status(MySQL)
0(待完善) → 3(补充中) → 1(已完善)
→ 4(补充失败)
3 超时 → 0(僵尸恢复)
9️⃣ 与其他项目的关系
- 与 back-end(Java)的关系:共享 MySQL 业务库,清洗逻辑从 Java 迁移至此项目
- 与 offerpie_python_ai 的关系:独立部署,AI 封装风格一致(LLM枚举 + model_config),但不共享代码
- 数据流向:爬虫 → PG(app_job_data) → 本项目清洗 → MySQL(bg_job等) → Java/Python AI 服务使用
🔟 构建与运行
- 虚拟环境:
.venv目录管理 - 依赖安装:
pip install -r requirements.txt - 启动:
python -m app.main - 环境切换:通过
.env/.env.prod控制,ENV 环境变量指定 - 部署:本地非公网环境运行,无需 Docker/Nginx