--- inclusion: manual --- # OfferPie Job Cleaner 项目结构说明 ## 1️⃣ 项目整体层次 ``` offerpie_job_cleaner/ │ ├─ .env / .env.prod # 环境变量配置(dev/prod) ├─ requirements.txt # Python 依赖清单 │ └─ app/ # 应用主目录 ├─ main.py # 入口:初始化双数据源 → 加载字典缓存 → 启动调度器 → 等待关闭信号 │ ├─ config/ # **配置层** │ └─ settings.py # Pydantic Settings 统一配置(PG、MySQL、AI供应商、清洗参数、下架参数、日志) │ ├─ core/ # **核心基础设施层** │ ├─ database.py # 双数据源:PgSession()(PostgreSQL)+ MysqlSession()(MySQL) │ └─ logger.py # Loguru 日志配置(控制台+文件,按天轮转保留30天) │ ├─ ai/ # **AI 能力层** │ ├─ models.py # LLM 模型枚举(LLM.DOUBAO_SEED_LITE、DEEPSEEK_V4_FLASH 等) │ ├─ model_config.py # AI 模型场景配置(JobCleanModel、CompanyCleanModel) │ └─ prompts.py # 各步骤 Prompt 模板(结构化提取、专业匹配、技能提取、公司补充) │ ├─ models/ # **ORM 模型层** │ ├─ pg/ # PostgreSQL 模型 │ │ └─ app_job_data.py # 爬虫原始数据表(app_job_data) │ └─ mysql/ # MySQL 模型 │ ├─ job.py # 岗位表(bg_job) │ ├─ company.py # 公司表(bg_company) │ ├─ skill_tag.py # 技能标签表(bg_skill_tag) │ └─ relations.py # 关联表(bg_job_region_relation、bg_job_skill_tag_relation) │ ├─ services/ # **业务逻辑层** │ ├─ job_clean_service.py # 岗位清洗主逻辑(协程+信号量并发,三次AI调用) │ ├─ company_clean_service.py # 公司数据补充(协程+信号量并发,AI补充公司信息) │ ├─ dict_cache_service.py # 字典数据缓存(启动时从MySQL加载岗位分类/行业/专业/地区) │ ├─ zombie_recover_service.py # 僵尸恢复(PG岗位超时重置 + MySQL公司超时重置) │ ├─ job_expire_service.py # 岗位下架(create_time超N天的岗位标记失效) │ └─ ai_tool.py # AI 调用工具封装(异步调用+JSON清洗+解析) │ └─ scheduler/ # **调度层** └─ tasks.py # APScheduler 定时任务注册(岗位清洗/公司补充/僵尸恢复/岗位下架) ``` ## 2️⃣ 各层模块职责 | 层级 | 主要职责 | 关键类/文件 | |------|----------|-------------| | **config** | 统一配置管理,双数据源连接参数、AI供应商、清洗/下架参数 | `Settings`(pg_*、db_*、volcengine_*、clean_*、company_*、job_expire_days) | | **core** | 核心基础设施:双数据库连接池、日志 | `database.py`(PgSession/MysqlSession 函数)、`logger.py`(loguru) | | **ai** | AI 模型管理 + Prompt 模板 | `LLM` 枚举、`JobCleanModel`/`CompanyCleanModel`(场景配置)、`prompts.py`(4个prompt) | | **models** | SQLAlchemy ORM 模型,分 pg/ 和 mysql/ 两个子包 | `AppJobData`(PG)、`Job`/`Company`/`SkillTag`/`Relations`(MySQL) | | **services** | 业务逻辑实现,全部异步协程 | 岗位清洗、公司补充、字典缓存、僵尸恢复、岗位下架、AI工具 | | **scheduler** | APScheduler 定时任务注册和触发 | `tasks.py`(5个定时任务) | ## 3️⃣ 技术栈 | 类别 | 技术选型 | 说明 | |------|----------|------| | **运行时** | Python 3.12 + asyncio | 纯后台定时任务,无 HTTP 服务 | | **ORM** | SQLAlchemy 2.0 (asyncio) | 双引擎:asyncpg(PG) + asyncmy(MySQL) | | **AI/LLM** | LangChain-OpenAI | 兼容 OpenAI 协议,火山引擎豆包模型 | | **调度** | APScheduler (AsyncIOScheduler) | 轻量异步定时任务调度 | | **配置** | Pydantic Settings + python-dotenv | 类型安全的 .env 配置管理 | | **日志** | Loguru | 控制台+文件日志,按天轮转 | | **ID生成** | snowflake-id | 雪花算法分布式ID | ## 4️⃣ 双数据源架构 | 数据源 | 用途 | 地址 | |--------|------|------| | **PostgreSQL** | 爬虫原始数据(app_job_data),只读写这一张表 | 本地 192.168.31.51:5432 | | **MySQL** | 业务库(bg_job、bg_company、字典表等) | 生产 | - PG:读取待清洗数据 → 更新清洗状态 - MySQL:写入清洗后的业务数据 + 读取字典缓存 ## 5️⃣ 定时任务清单 | 任务 | 频率 | 数据源 | 描述 | |------|------|--------|------| | 岗位清洗 | 每3分钟 | PG→MySQL | 批量锁定 pending → 协程并发AI清洗 → 写入 bg_job | | 公司补充 | 每5分钟 | MySQL | 锁定 status=0 → 协程并发AI补充 → 回填 bg_company | | 岗位僵尸恢复 | 每30分钟 | PG | cleaning 超时10分钟 → 重置 pending | | 公司僵尸恢复 | 每小时 | MySQL | status=3 超时10分钟 → 重置 0 | | 岗位下架 | 每天凌晨2点 | MySQL | create_time 超 N 天 → status=2 | ## 6️⃣ 清洗流程(三次AI调用) ``` app_job_data (PG, pending) ↓ 批量锁定 → cleaning ↓ 第一次AI:结构化提取(岗位名/薪资/学历/分类/地区/公司...) ↓ valid=false → discarded ↓ valid=true → 去重 → 创建公司 → 写入 bg_job ↓ 第二次AI:专业匹配(requiredMajorIds + majorSensitivity) ↓ 失败不影响 ↓ 第三次AI:技能提取(INSERT IGNORE bg_skill_tag → 写关联表) ↓ 失败不影响 ↓ 更新 PG: clean_status = cleaned ``` ## 7️⃣ 并发模型 - **asyncio 协程 + Semaphore 信号量** - 批量从数据库捞 N 条(batch_size),所有协程同时启动 - 信号量限制同时调 AI 的并发数(concurrency),做完一条立刻补一条 - 公司创建用 `asyncio.Lock()` 防并发重复插入 ## 8️⃣ 状态机 ### app_job_data.clean_status(PG) ``` pending → cleaning → cleaned → discarded cleaning 超时 → pending(僵尸恢复) ``` ### bg_company.status(MySQL) ``` 0(待完善) → 3(补充中) → 1(已完善) → 4(补充失败) 3 超时 → 0(僵尸恢复) ``` ## 9️⃣ 与其他项目的关系 - **与 back-end(Java)的关系**:共享 MySQL 业务库,清洗逻辑从 Java 迁移至此项目 - **与 offerpie_python_ai 的关系**:独立部署,AI 封装风格一致(LLM枚举 + model_config),但不共享代码 - **数据流向**:爬虫 → PG(app_job_data) → 本项目清洗 → MySQL(bg_job等) → Java/Python AI 服务使用 ## 🔟 构建与运行 - **虚拟环境**:`.venv` 目录管理 - **依赖安装**:`pip install -r requirements.txt` - **启动**:`python -m app.main` - **环境切换**:通过 `.env` / `.env.prod` 控制,ENV 环境变量指定 - **部署**:本地非公网环境运行,无需 Docker/Nginx