commit 30e6a6e2a52037432bba4c7b80e17921d9c59817 Author: zk Date: Tue Jun 2 17:44:03 2026 +0800 初始化 diff --git a/.env.test b/.env.test new file mode 100644 index 0000000..1f7122d --- /dev/null +++ b/.env.test @@ -0,0 +1,36 @@ +ENV=dev + +# PostgreSQL(本地爬虫库) +PG_HOST=192.168.31.51 +PG_PORT=5432 +PG_USER=postgres +PG_PASSWORD=feAeyR0u2fJGSS5ooFdHnSbyHQNY4WlV +PG_DB=postgres + +# MySQL(业务库) +DB_HOST=192.168.31.105 +DB_PORT=3306 +DB_USER=root +DB_PASSWORD=123456 +DB_NAME=offerpie + + +# AI 供应商 +VOLCENGINE_API_KEY=fd065993-bee2-4f31-8bf2-56d5d3012c02 +VOLCENGINE_BASE_URL=https://ark.cn-beijing.volces.com/api/v3 + +# 岗位清洗参数 +CLEAN_BATCH_SIZE=100 +CLEAN_CONCURRENCY=50 +CLEAN_INTERVAL_SECONDS=180 + +# 公司补充参数 +COMPANY_BATCH_SIZE=20 +COMPANY_CONCURRENCY=10 +COMPANY_INTERVAL_SECONDS=300 + +# 岗位下架 +JOB_EXPIRE_DAYS=7 + +# 日志 +LOGGING_LEVEL=INFO diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..27ea602 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.venv/ +__pycache__/ +*.pyc +.env +.env.prod +logs/ +.idea/ diff --git a/.kiro/steering/code-with-standards.md b/.kiro/steering/code-with-standards.md new file mode 100644 index 0000000..71d291c --- /dev/null +++ b/.kiro/steering/code-with-standards.md @@ -0,0 +1,33 @@ +--- +inclusion: always +--- + +# 项目规范执行指引 + +## 方案讨论前 + +必读 `#[[file:.kiro/steering/项目结构说明.md]]`,全面了解: +- 项目分层结构:`config` → `core` → `ai` → `models` → `services` → `scheduler` +- 双数据源架构(PG 源库 + MySQL 业务库) +- 定时任务清单和清洗流程 +- 并发模型(asyncio 协程 + Semaphore 信号量) +- 与 back-end、offerpie_python_ai 的关系 + +方案讨论时: +- 优先给出简洁的方案思路(涉及哪些模块、新增内容放在哪、核心流程概要),不要一开始就铺开所有细节 +- 用户明确要求时,再给出详细的方案流程 +- 说明与现有模块的关系 + +## 开发方案输出前 / 写代码前 + +必读 `#[[file:.kiro/steering/代码开发风格文档.md]]`,严格遵守: +- 命名约定、类型注解规范 +- 双数据源使用规范(PgSession / MysqlSession) +- 异步规范(Semaphore、Lock、gather) +- AI 调用规范(model_config 引用、ai_tool 封装) +- 日志规范(批次汇总 + 单条关键结果) +- 配置规范(所有可调参数走 settings) + +## 写完代码后 + +涉及新增文件、新增模块或目录结构变更时,必须同步更新 `#[[file:.kiro/steering/项目结构说明.md]]`,保持文档与实际代码一致。 diff --git 
a/.kiro/steering/代码开发风格文档.md b/.kiro/steering/代码开发风格文档.md new file mode 100644 index 0000000..7aadd2d --- /dev/null +++ b/.kiro/steering/代码开发风格文档.md @@ -0,0 +1,104 @@ +--- +inclusion: manual +--- + +# 代码开发风格文档 + +本项目为 Python 3.12 纯后台定时任务服务,基于 asyncio + SQLAlchemy + APScheduler,应用主目录为 `app/`。 + +## 项目结构 + +- `app/config/` — 配置层:Pydantic Settings 统一配置 +- `app/core/` — 核心基础设施:双数据源连接、日志 +- `app/ai/` — AI 能力层:LLM 模型枚举、场景配置、Prompt 模板 +- `app/models/` — ORM 模型层:分 pg/ 和 mysql/ 两个子包 +- `app/services/` — 业务逻辑层:清洗、补充、恢复、下架等异步服务 +- `app/scheduler/` — 调度层:APScheduler 定时任务注册 + +## 命名约定 + +### 文件命名 +- 全部小写,下划线分隔,如 `job_clean_service.py`、`dict_cache_service.py` +- ORM 模型文件与表名对应(去掉 `bg_` 或 `app_` 前缀),如 `job.py` 对应 `bg_job` + +### 类命名 +- ORM 模型用 PascalCase 业务名,无后缀,如 `Job`、`Company`、`AppJobData` +- 字典缓存用 `DictCacheService` +- 枚举类以大写命名,如 `LLM` +- 场景配置类以 `Model` 结尾,如 `JobCleanModel`、`CompanyCleanModel` + +### 变量与函数命名 +- 函数和变量使用 snake_case,如 `run_job_clean`、`data_id` +- 私有函数以单下划线开头,如 `_do_clean`、`_build_user_message` +- 常量使用全大写下划线,如 `JOB_STRUCTURE_SYSTEM` +- 全局单例小写,如 `dict_cache`、`_id_gen` + +## 类型注解 + +- 所有函数参数和返回值必须有类型注解 +- ORM 模型字段使用 `Mapped[T]` + `mapped_column()` 声明 +- 可选字段使用 `Optional[T]` 或 `T | None` +- 集合类型使用 `list[T]`、`dict[K, V]`(Python 3.12 内置泛型) + +## 注释规范 + +- 模块级注释使用文件顶部的 docstring,说明模块用途 +- 函数注释使用 docstring,简洁描述功能 +- 复杂逻辑用行内注释 `#` 说明 +- ORM 模型字段通过 `comment` 参数说明含义 + +## 数据库使用规范 + +### 双数据源 +- PostgreSQL 会话通过 `PgSession()` 获取,用于 `app_job_data` 表操作 +- MySQL 会话通过 `MysqlSession()` 获取,用于业务表操作 +- 每次操作用 `async with XxxSession() as session` 获取会话,手动 `await session.commit()` + +### SQL 风格 +- 简单操作使用 `sqlalchemy.text()` 原生 SQL,保持直观 +- 批量插入使用 `insert(Model).values(...)` 或 `insert(Model), [dict...]` +- 锁定使用 `FOR UPDATE SKIP LOCKED`(PG)防阻塞 + +### 不做分布式事务 +- PG 和 MySQL 独立操作,不跨库事务 +- 失败靠僵尸恢复机制重试 + +## 异步规范 + +- 所有数据库操作、AI 调用使用 `async/await` +- 并发控制使用 `asyncio.Semaphore` +- 互斥操作使用 `asyncio.Lock()`(如公司查找或创建) +- 批量任务使用 `asyncio.gather(*tasks, return_exceptions=True)` + +## AI 调用规范 + +- 业务代码从 
`app.ai.model_config` 引用场景配置类,不直接使用 `LLM` 枚举 +- AI 调用统一通过 `app.services.ai_tool.ai_chat_json()` 封装(异步调用 + JSON 清洗 + 解析) +- AI 调用失败不影响主流程(第二次/第三次 AI 调用独立 try-catch) +- 修改模型或参数只需改 `model_config.py` + +## 日志规范 + +- 使用 `from app.core.logger import log` +- 批次任务:记录锁定数量和完成汇总 +- 单条处理:记录关键结果(入库成功/丢弃/跳过),格式 `[id=xxx] 描述` +- 异常:`log.error` 记录异常详情 +- 非关键失败:`log.warning` 记录但不中断 + +## 配置规范 + +- 所有可调参数放在 `settings.py`,通过 `.env` 文件覆盖 +- 运行时参数(batch_size、concurrency、interval、expire_days)必须可配置 +- 数据库密码通过 `urllib.parse.quote` 编码后拼入 URL + +## 代码格式规范 + +### 紧凑风格 +- 避免过度换行,保持代码紧凑易读 +- f-string 拼接优先写在一行 +- 方法参数列表较多时,可适当换行但保持紧凑 + +### 模块组织 +- 每个 service 文件对外暴露一个 `run_xxx()` 异步函数作为入口 +- 内部逻辑拆分为 `_xxx` 私有函数 +- import 按标准库 → 第三方库 → 项目内部 顺序排列 diff --git a/.kiro/steering/项目结构说明.md b/.kiro/steering/项目结构说明.md new file mode 100644 index 0000000..65afb28 --- /dev/null +++ b/.kiro/steering/项目结构说明.md @@ -0,0 +1,139 @@ +--- +inclusion: manual +--- + +# OfferPie Job Cleaner 项目结构说明 + +## 1️⃣ 项目整体层次 +``` +offerpie_job_cleaner/ +│ +├─ .env / .env.prod # 环境变量配置(dev/prod) +├─ requirements.txt # Python 依赖清单 +│ +└─ app/ # 应用主目录 + ├─ main.py # 入口:初始化双数据源 → 加载字典缓存 → 启动调度器 → 等待关闭信号 + │ + ├─ config/ # **配置层** + │ └─ settings.py # Pydantic Settings 统一配置(PG、MySQL、AI供应商、清洗参数、下架参数、日志) + │ + ├─ core/ # **核心基础设施层** + │ ├─ database.py # 双数据源:PgSession()(PostgreSQL)+ MysqlSession()(MySQL) + │ └─ logger.py # Loguru 日志配置(控制台+文件,按天轮转保留30天) + │ + ├─ ai/ # **AI 能力层** + │ ├─ models.py # LLM 模型枚举(LLM.DOUBAO_SEED_LITE、DEEPSEEK_V4_FLASH 等) + │ ├─ model_config.py # AI 模型场景配置(JobCleanModel、CompanyCleanModel) + │ └─ prompts.py # 各步骤 Prompt 模板(结构化提取、专业匹配、技能提取、公司补充) + │ + ├─ models/ # **ORM 模型层** + │ ├─ pg/ # PostgreSQL 模型 + │ │ └─ app_job_data.py # 爬虫原始数据表(app_job_data) + │ └─ mysql/ # MySQL 模型 + │ ├─ job.py # 岗位表(bg_job) + │ ├─ company.py # 公司表(bg_company) + │ ├─ skill_tag.py # 技能标签表(bg_skill_tag) + │ └─ relations.py # 关联表(bg_job_region_relation、bg_job_skill_tag_relation) + │ + ├─ services/ # **业务逻辑层** + │ ├─ job_clean_service.py # 
岗位清洗主逻辑(协程+信号量并发,三次AI调用) + │ ├─ company_clean_service.py # 公司数据补充(协程+信号量并发,AI补充公司信息) + │ ├─ dict_cache_service.py # 字典数据缓存(启动时从MySQL加载岗位分类/行业/专业/地区) + │ ├─ zombie_recover_service.py # 僵尸恢复(PG岗位超时重置 + MySQL公司超时重置) + │ ├─ job_expire_service.py # 岗位下架(create_time超N天的岗位标记失效) + │ └─ ai_tool.py # AI 调用工具封装(异步调用+JSON清洗+解析) + │ + └─ scheduler/ # **调度层** + └─ tasks.py # APScheduler 定时任务注册(岗位清洗/公司补充/僵尸恢复/岗位下架) +``` + +## 2️⃣ 各层模块职责 +| 层级 | 主要职责 | 关键类/文件 | +|------|----------|-------------| +| **config** | 统一配置管理,双数据源连接参数、AI供应商、清洗/下架参数 | `Settings`(pg_*、db_*、volcengine_*、clean_*、company_*、job_expire_days) | +| **core** | 核心基础设施:双数据库连接池、日志 | `database.py`(PgSession/MysqlSession 函数)、`logger.py`(loguru) | +| **ai** | AI 模型管理 + Prompt 模板 | `LLM` 枚举、`JobCleanModel`/`CompanyCleanModel`(场景配置)、`prompts.py`(4个prompt) | +| **models** | SQLAlchemy ORM 模型,分 pg/ 和 mysql/ 两个子包 | `AppJobData`(PG)、`Job`/`Company`/`SkillTag`/`Relations`(MySQL) | +| **services** | 业务逻辑实现,全部异步协程 | 岗位清洗、公司补充、字典缓存、僵尸恢复、岗位下架、AI工具 | +| **scheduler** | APScheduler 定时任务注册和触发 | `tasks.py`(5个定时任务) | + +## 3️⃣ 技术栈 +| 类别 | 技术选型 | 说明 | +|------|----------|------| +| **运行时** | Python 3.12 + asyncio | 纯后台定时任务,无 HTTP 服务 | +| **ORM** | SQLAlchemy 2.0 (asyncio) | 双引擎:asyncpg(PG) + asyncmy(MySQL) | +| **AI/LLM** | LangChain-OpenAI | 兼容 OpenAI 协议,火山引擎豆包模型 | +| **调度** | APScheduler (AsyncIOScheduler) | 轻量异步定时任务调度 | +| **配置** | Pydantic Settings + python-dotenv | 类型安全的 .env 配置管理 | +| **日志** | Loguru | 控制台+文件日志,按天轮转 | +| **ID生成** | snowflake-id | 雪花算法分布式ID | + +## 4️⃣ 双数据源架构 +| 数据源 | 用途 | 地址 | +|--------|------|------| +| **PostgreSQL** | 爬虫原始数据(app_job_data),只读写这一张表 | 本地 192.168.31.51:5432 | +| **MySQL** | 业务库(bg_job、bg_company、字典表等) | 生产 | + +- PG:读取待清洗数据 → 更新清洗状态 +- MySQL:写入清洗后的业务数据 + 读取字典缓存 + +## 5️⃣ 定时任务清单 +| 任务 | 频率 | 数据源 | 描述 | +|------|------|--------|------| +| 岗位清洗 | 每3分钟 | PG→MySQL | 批量锁定 pending → 协程并发AI清洗 → 写入 bg_job | +| 公司补充 | 每5分钟 | MySQL | 锁定 status=0 → 协程并发AI补充 → 回填 bg_company | +| 岗位僵尸恢复 | 每30分钟 | PG | cleaning 
超时10分钟 → 重置 pending | +| 公司僵尸恢复 | 每小时 | MySQL | status=3 超时10分钟 → 重置 0 | +| 岗位下架 | 每天凌晨2点 | MySQL | create_time 超 N 天 → status=2 | + +## 6️⃣ 清洗流程(三次AI调用) +``` +app_job_data (PG, pending) + ↓ 批量锁定 → cleaning + ↓ +第一次AI:结构化提取(岗位名/薪资/学历/分类/地区/公司...) + ↓ valid=false → discarded + ↓ valid=true → 去重 → 创建公司 → 写入 bg_job + ↓ +第二次AI:专业匹配(requiredMajorIds + majorSensitivity) + ↓ 失败不影响 + ↓ +第三次AI:技能提取(INSERT IGNORE bg_skill_tag → 写关联表) + ↓ 失败不影响 + ↓ +更新 PG: clean_status = cleaned +``` + +## 7️⃣ 并发模型 +- **asyncio 协程 + Semaphore 信号量** +- 批量从数据库捞 N 条(batch_size),所有协程同时启动 +- 信号量限制同时调 AI 的并发数(concurrency),做完一条立刻补一条 +- 公司创建用 `asyncio.Lock()` 防并发重复插入 + +## 8️⃣ 状态机 + +### app_job_data.clean_status(PG) +``` +pending → cleaning → cleaned + → discarded +cleaning 超时 → pending(僵尸恢复) +``` + +### bg_company.status(MySQL) +``` +0(待完善) → 3(补充中) → 1(已完善) + → 4(补充失败) +3 超时 → 0(僵尸恢复) +``` + +## 9️⃣ 与其他项目的关系 +- **与 back-end(Java)的关系**:共享 MySQL 业务库,清洗逻辑从 Java 迁移至此项目 +- **与 offerpie_python_ai 的关系**:独立部署,AI 封装风格一致(LLM枚举 + model_config),但不共享代码 +- **数据流向**:爬虫 → PG(app_job_data) → 本项目清洗 → MySQL(bg_job等) → Java/Python AI 服务使用 + +## 🔟 构建与运行 +- **虚拟环境**:`.venv` 目录管理 +- **依赖安装**:`pip install -r requirements.txt` +- **启动**:`python -m app.main` +- **环境切换**:通过 `.env` / `.env.prod` 控制,ENV 环境变量指定 +- **部署**:本地非公网环境运行,无需 Docker/Nginx diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/ai/__init__.py b/app/ai/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/ai/model_config.py b/app/ai/model_config.py new file mode 100644 index 0000000..03082f5 --- /dev/null +++ b/app/ai/model_config.py @@ -0,0 +1,22 @@ +"""AI 模型场景配置 + +集中管理清洗服务的模型选择与参数,修改模型只需改此文件。 +""" + +from app.ai.models import LLM + + +class JobCleanModel: + """岗位清洗模块""" + # 第一次AI:结构化提取岗位信息 + STRUCTURE = LLM.DOUBAO_SEED_LITE.create(temperature=0) + # 第二次AI:专业匹配 + MAJOR_MATCH = LLM.DOUBAO_SEED_LITE.create(temperature=0) + # 第三次AI:技能提取 + SKILL_EXTRACT = 
LLM.DOUBAO_SEED_LITE.create(temperature=0) + + +class CompanyCleanModel: + """公司补充模块""" + # 公司信息补充 + ENRICH = LLM.DOUBAO_SEED_LITE.create(temperature=0) diff --git a/app/ai/models.py b/app/ai/models.py new file mode 100644 index 0000000..4cb6ca9 --- /dev/null +++ b/app/ai/models.py @@ -0,0 +1,41 @@ +"""LLM 模型枚举与实例获取 + +Usage: + from app.ai.models import LLM + + llm = LLM.DOUBAO_SEED_LITE.create(temperature=0) +""" + +from enum import Enum + +from langchain_openai import ChatOpenAI + +from app.config import settings + +# 供应商连接配置 +_VOLCENGINE = (lambda: settings.volcengine_api_key, lambda: settings.volcengine_base_url) + + +class LLM(Enum): + """所有可用模型,每个枚举值 = (模型名, api_key函数, base_url函数)""" + + # 火山引擎 + DOUBAO_PRO_32K = ("doubao-1-5-pro-32k-250115", *_VOLCENGINE) + DOUBAO_LITE_32K = ("doubao-1-5-lite-32k-250115", *_VOLCENGINE) + DOUBAO_SEED_LITE = ("doubao-seed-2-0-lite-260215", *_VOLCENGINE) + DOUBAO_SEED_PRO = ("doubao-seed-2-0-pro-260215", *_VOLCENGINE) + DEEPSEEK_V4_FLASH = ("deepseek-v4-flash-260425", *_VOLCENGINE) + + def __init__(self, model_name: str, api_key_fn, base_url_fn): + self.model_name = model_name + self._api_key_fn = api_key_fn + self._base_url_fn = base_url_fn + + def create(self, **kwargs) -> ChatOpenAI: + """创建 LLM 实例,kwargs 透传给 ChatOpenAI(temperature, max_tokens 等)""" + return ChatOpenAI( + model=self.model_name, + api_key=self._api_key_fn(), + base_url=self._base_url_fn(), + **kwargs, + ) diff --git a/app/ai/prompts.py b/app/ai/prompts.py new file mode 100644 index 0000000..5331c83 --- /dev/null +++ b/app/ai/prompts.py @@ -0,0 +1,100 @@ +"""各步骤 Prompt 模板""" + +# ──────────── 第一次AI:岗位结构化提取 ──────────── +JOB_STRUCTURE_SYSTEM = """你是一个岗位数据清洗助手。请根据提供的原始岗位数据,提取并结构化为JSON格式。 + +返回JSON格式要求: +{ + "valid": true/false, + "title": "岗位名称", + "salary": "标准化薪资,如10-20K、面议,无效则null", + "education": 0-4的数字(0=不限 1=大专 2=本科 3=硕士 4=博士), + "minExperience": 最低工作年限数字(不要求则0), + "employmentType": 0或1(0=全职 1=兼职,默认0), + "categoryId": 岗位分类ID(必选,从分类列表中选最接近的), + 
"requiredIndustryId": 行业ID(仅当明确提到行业经验要求时设置,列表中无完全匹配则选最相似的,未提到则null), + "description": "岗位职责,保持原文风格,格式化展示", + "requirement": "任职要求,保持原文风格,格式化展示", + "bonus": "加分项,无则null", + "tags": ["核心职能标签,最多5个,如数据分析、产品策略"], + "skillTags": ["技能关键词,最多8个,如Java、Spring Boot"], + "companyShortName": "简洁的公司简称,如字节跳动、中国平安", + "cities": ["工作城市列表,精确到市"] +} + +规则: +1. description/requirement/bonus 均从原始的 description+experience+education 内容中提取,保持原文风格 +2. 岗位标题不存在时,从描述中归纳生成 +3. 薪资标准化为 10-20K、20K、面议 等格式,无效或空则null +4. categoryId 必须从分类列表中选一个,不允许为null +5. requiredIndustryId 仅当描述中明确提到行业经验要求时设置 +6. tags 是核心职能标签(如数据分析、团队协作),最多5个 +7. skillTags 是技能关键词(如Java、MySQL),最多8个 +8. companyShortName 去掉地区后缀、招聘后缀、括号内容,保持简洁 +9. 字符串值中不允许出现Tab、换行等控制字符,用空格或中文标点替代 +10. 只返回JSON,不要其他内容""" + + +# ──────────── 第二次AI:专业匹配 ──────────── +MAJOR_MATCH_SYSTEM = """你是一个岗位专业匹配助手。根据岗位信息,判断该岗位对专业的要求。 +返回JSON格式: +{ + "requiredMajorIds": [专业ID数组,从专业列表中选择最相关的,最多3个,无明确要求则空数组], + "majorSensitivity": 0-2的数字(0=专业不限 1=优先相关专业 2=强制要求专业) +} +规则: +1. 只能从给定专业列表中选择ID +2. 根据岗位描述判断专业敏感度:明确写"XX专业"→2,写"相关专业优先"→1,未提及→0 +3. majorSensitivity为0时,requiredMajorIds应为空数组 +4. 只返回JSON,不要其他内容""" + + +# ──────────── 第三次AI:技能提取 ──────────── +SKILL_EXTRACT_SYSTEM = """你是一个技能提取助手。根据岗位信息,提取该岗位要求的核心专业能力和工具技能。 +返回JSON数组格式,如:["java", "spring boot", "mysql", "redis"] +规则: +1. 统一使用小写字母 +2. 只保留核心词,去掉多余修饰(如"plc编程"→"plc","c语言"→"c","cad制图"→"cad") +3. 同一技能只保留最具体的表述,不要同时出现上位词和下位词(如有"机械设计"就不要再出"机械") +4. 提取范围包括:技术栈、专业领域知识、行业工具、专业资质能力等 +5. 不提取纯软技能(如沟通能力、团队协作、学习能力、积极主动) +6. 不提取过于宽泛的标签(如"办公软件"、"windows") +7. 如果岗位完全没有专业能力要求(纯看态度和素质),返回空数组 [] +8. 最多15个,按重要性排序 +9. 
只返回JSON数组,不要其他内容""" + + +# ──────────── 公司补充 ──────────── +COMPANY_ENRICH_SYSTEM = """你是一个企业信息补充助手。根据提供的公司简称,补充该公司的详细信息。 + +返回JSON格式要求: +{ + "valid": true/false, + "name": "公司全称", + "city": "总部所在城市,精确到市", + "companyType": "企业类型", + "industryId": 行业ID, + "tags": ["公司标签,最多5个"], + "summary": "一句话简介,100字以内", + "description": "公司详细描述,500字以内", + "foundedYear": "成立年份", + "address": "总部/注册地址", + "scale": "企业规模", + "website": "官网地址", + "financingStage": "融资状态", + "latestValuation": "最新估值", + "news": ["相关新闻,最多3条,每条50字以内"] +} + +规则: +1. 如果不认识该公司,返回 {"valid": false} +2. name 根据公司简称推断完整的企业注册名称 +3. companyType 取值:上市企业、独角兽、国企、央企、民营企业、外资企业、合资企业、事业单位、其他 +4. industryId 必须从给定行业列表中选择,不确定则null +5. scale 取值:少于50人、50-150人、150-500人、500-1000人、1000-5000人、5000-10000人、10000人以上 +6. tags 体现公司核心业务特征,最多5个 +7. news 基于你的知识提供该公司最新的3条相关新闻,每条50字以内 +8. latestValuation 知道就提供,不知道则null +9. 不确定的字段返回null,不要编造 +10. 字符串值中不允许出现Tab、换行等控制字符 +11. 只返回JSON,不要其他内容""" diff --git a/app/config/__init__.py b/app/config/__init__.py new file mode 100644 index 0000000..900fc27 --- /dev/null +++ b/app/config/__init__.py @@ -0,0 +1,18 @@ +import os +from pathlib import Path + +from .settings import Settings + +_env = os.getenv("ENV", "dev") +_env_files = {"dev": ".env", "test": ".env.test", "prod": ".env.prod"} + +# 定位项目根目录(config 上两级) +_project_root = Path(__file__).resolve().parent.parent.parent +_env_file = _project_root / _env_files.get(_env, ".env") + +if not _env_file.exists(): + raise FileNotFoundError(f".env 文件不存在: {_env_file}") + +settings = Settings(_env_file=str(_env_file)) + +__all__ = ["settings"] diff --git a/app/config/settings.py b/app/config/settings.py new file mode 100644 index 0000000..de52db9 --- /dev/null +++ b/app/config/settings.py @@ -0,0 +1,70 @@ +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + """项目配置,通过 .env 文件覆盖默认值""" + + # 环境 + env: str = "dev" + + # ──────────── PostgreSQL(本地,app_job_data 源库)──────────── + pg_host: str = "192.168.31.51" + 
pg_port: int = 5432 + pg_user: str = "postgres" + pg_password: str = "" + pg_db: str = "postgres" + pg_pool_size: int = 10 + pg_max_overflow: int = 20 + + # ──────────── MySQL(业务库)──────────── + db_host: str = "192.168.31.105" + db_port: int = 3306 + db_user: str = "root" + db_password: str = "123456" + db_name: str = "offerpie" + mysql_pool_size: int = 10 + mysql_max_overflow: int = 20 + + # ──────────── AI 供应商 ──────────── + volcengine_api_key: str = "fd065993-bee2-4f31-8bf2-56d5d3012c02" + volcengine_base_url: str = "https://ark.cn-beijing.volces.com/api/v3" + + # ──────────── 岗位清洗参数 ──────────── + clean_batch_size: int = 100 + clean_concurrency: int = 50 + clean_interval_seconds: int = 180 + + # ──────────── 公司补充参数 ──────────── + company_batch_size: int = 20 + company_concurrency: int = 10 + company_interval_seconds: int = 300 + + # ──────────── 岗位下架参数 ──────────── + job_expire_days: int = 7 + + # ──────────── 日志 ──────────── + logging_level: str = "INFO" + log_file_name: str = "cleaner.log" + + @property + def pg_url(self) -> str: + from urllib.parse import quote + return ( + f"postgresql+asyncpg://{self.pg_user}:{quote(self.pg_password, safe='')}" + f"@{self.pg_host}:{self.pg_port}/{self.pg_db}" + ) + + @property + def mysql_url(self) -> str: + from urllib.parse import quote + return ( + f"mysql+asyncmy://{self.db_user}:{quote(self.db_password, safe='')}" + f"@{self.db_host}:{self.db_port}/{self.db_name}" + ) + + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + case_sensitive=False, + extra="ignore", + ) diff --git a/app/core/__init__.py b/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/core/database.py b/app/core/database.py new file mode 100644 index 0000000..45b0331 --- /dev/null +++ b/app/core/database.py @@ -0,0 +1,79 @@ +"""双数据源:PostgreSQL(源库) + MySQL(业务库)""" + +from typing import Optional + +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + 
create_async_engine, +) +from sqlalchemy.orm import DeclarativeBase + +from app.config import settings +from app.core.logger import log + +# ──────────── 内部变量 ──────────── +_pg_engine: Optional[AsyncEngine] = None +_pg_session_factory: Optional[async_sessionmaker[AsyncSession]] = None + +_mysql_engine: Optional[AsyncEngine] = None +_mysql_session_factory: Optional[async_sessionmaker[AsyncSession]] = None + + +class PgBase(DeclarativeBase): + """PostgreSQL ORM 声明基类""" + pass + + +class MysqlBase(DeclarativeBase): + """MySQL ORM 声明基类""" + pass + + +async def init_db() -> None: + """初始化双数据源""" + global _pg_engine, _pg_session_factory, _mysql_engine, _mysql_session_factory + + _pg_engine = create_async_engine( + settings.pg_url, + pool_size=settings.pg_pool_size, + max_overflow=settings.pg_max_overflow, + pool_recycle=3600, + echo=False, + ) + _pg_session_factory = async_sessionmaker(_pg_engine, expire_on_commit=False) + + _mysql_engine = create_async_engine( + settings.mysql_url, + pool_size=settings.mysql_pool_size, + max_overflow=settings.mysql_max_overflow, + pool_recycle=3600, + echo=False, + ) + _mysql_session_factory = async_sessionmaker(_mysql_engine, expire_on_commit=False) + + log.info("双数据源初始化完成: PG={}, MySQL={}", settings.pg_host, settings.db_host) + + +async def close_db() -> None: + """关闭双数据源""" + if _pg_engine: + await _pg_engine.dispose() + if _mysql_engine: + await _mysql_engine.dispose() + log.info("双数据源已关闭") + + +def PgSession() -> AsyncSession: + """获取 PostgreSQL 异步会话(用作 async with PgSession() as session)""" + if _pg_session_factory is None: + raise RuntimeError("数据库未初始化,请先调用 init_db()") + return _pg_session_factory() + + +def MysqlSession() -> AsyncSession: + """获取 MySQL 异步会话(用作 async with MysqlSession() as session)""" + if _mysql_session_factory is None: + raise RuntimeError("数据库未初始化,请先调用 init_db()") + return _mysql_session_factory() diff --git a/app/core/logger.py b/app/core/logger.py new file mode 100644 index 0000000..17d913e --- /dev/null +++ 
b/app/core/logger.py @@ -0,0 +1,34 @@ +"""日志配置""" + +import sys +from pathlib import Path + +from loguru import logger + +from app.config import settings + +# 日志目录 +_log_dir = Path("logs") +_log_dir.mkdir(exist_ok=True) + +# 移除默认 handler +logger.remove() + +# 控制台输出 +logger.add( + sys.stdout, + level=settings.logging_level, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{line} - {message}", +) + +# 文件输出(按天轮转,保留30天) +logger.add( + _log_dir / settings.log_file_name, + level=settings.logging_level, + rotation="00:00", + retention="30 days", + encoding="utf-8", + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{line} - {message}", +) + +log = logger diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..c4382b2 --- /dev/null +++ b/app/main.py @@ -0,0 +1,64 @@ +"""项目入口:初始化数据源、加载字典、启动调度器""" + +import asyncio +import signal +import warnings +from datetime import datetime + +from app.core.logger import log + +# 屏蔽 asyncmy INSERT IGNORE 产生的 Duplicate entry warnings +warnings.filterwarnings("ignore", message=".*Duplicate entry.*") +from app.core.database import init_db, close_db +from app.services.dict_cache_service import dict_cache +from app.scheduler.tasks import create_scheduler + + +async def main(): + log.info("=" * 50) + log.info("OfferPie Job Cleaner 启动中...") + log.info("=" * 50) + + # 初始化双数据源 + await init_db() + + # 加载字典缓存 + await dict_cache.refresh() + + # 创建并启动调度器 + scheduler = create_scheduler() + scheduler.start() + + # 立即触发一次岗位清洗和公司补充 + scheduler.modify_job("job_clean", next_run_time=datetime.now()) + scheduler.modify_job("company_clean", next_run_time=datetime.now()) + + log.info("调度器已启动,所有定时任务已注册") + + # 优雅关闭 + stop_event = asyncio.Event() + + def _shutdown(*args): + log.info("收到关闭信号,正在关闭...") + stop_event.set() + + loop = asyncio.get_running_loop() + # Unix: SIGINT + SIGTERM,Windows: 仅靠 KeyboardInterrupt + for sig in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(sig, _shutdown) + except 
(NotImplementedError, ValueError): + pass + + try: + await stop_event.wait() + except KeyboardInterrupt: + pass + finally: + scheduler.shutdown(wait=False) + await close_db() + log.info("OfferPie Job Cleaner 已关闭") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/models/mysql/__init__.py b/app/models/mysql/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/models/mysql/company.py b/app/models/mysql/company.py new file mode 100644 index 0000000..4bc7d3a --- /dev/null +++ b/app/models/mysql/company.py @@ -0,0 +1,36 @@ +"""MySQL: bg_company 表模型""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import BigInteger, DateTime, Integer, JSON, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from app.core.database import MysqlBase + + +class Company(MysqlBase): + """公司表""" + + __tablename__ = "bg_company" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + name: Mapped[Optional[str]] = mapped_column(String(255)) + short_name: Mapped[str] = mapped_column(String(128), nullable=False) + logo_url: Mapped[Optional[str]] = mapped_column(String(512)) + region_code: Mapped[Optional[str]] = mapped_column(String(20)) + company_type: Mapped[Optional[str]] = mapped_column(String(32)) + industry_id: Mapped[Optional[int]] = mapped_column(BigInteger) + tags: Mapped[Optional[list]] = mapped_column(JSON) + summary: Mapped[Optional[str]] = mapped_column(String(512)) + description: Mapped[Optional[str]] = mapped_column(Text) + founded_year: Mapped[Optional[str]] = mapped_column(String(10)) + address: Mapped[Optional[str]] = mapped_column(String(255)) + scale: Mapped[Optional[str]] = mapped_column(String(32)) + website: Mapped[Optional[str]] = mapped_column(String(255)) + financing_stage: Mapped[Optional[str]] = mapped_column(String(32)) + latest_valuation: Mapped[Optional[str]] = 
mapped_column(String(64)) + news: Mapped[Optional[list]] = mapped_column(JSON) + status: Mapped[int] = mapped_column(Integer, default=0, comment="0=待完善 1=已完善 2=禁用 3=补充中 4=补充失败") + create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) + update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) diff --git a/app/models/mysql/job.py b/app/models/mysql/job.py new file mode 100644 index 0000000..a12682a --- /dev/null +++ b/app/models/mysql/job.py @@ -0,0 +1,39 @@ +"""MySQL: bg_job 表模型""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import BigInteger, DateTime, Integer, JSON, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from app.core.database import MysqlBase + + +class Job(MysqlBase): + """岗位表""" + + __tablename__ = "bg_job" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + title: Mapped[str] = mapped_column(String(255), nullable=False) + company_id: Mapped[int] = mapped_column(BigInteger, nullable=False) + category_id: Mapped[int] = mapped_column(BigInteger, nullable=False) + employment_type: Mapped[int] = mapped_column(Integer, default=0) + description: Mapped[Optional[str]] = mapped_column(Text) + requirement: Mapped[Optional[str]] = mapped_column(Text) + bonus: Mapped[Optional[str]] = mapped_column(Text) + tags: Mapped[Optional[list]] = mapped_column(JSON) + skill_tags: Mapped[Optional[list]] = mapped_column(JSON) + salary: Mapped[Optional[str]] = mapped_column(String(64)) + education: Mapped[int] = mapped_column(Integer, default=0) + min_experience: Mapped[int] = mapped_column(Integer, default=0) + required_industry_id: Mapped[Optional[int]] = mapped_column(BigInteger) + required_major_ids: Mapped[Optional[list]] = mapped_column(JSON) + major_sensitivity: Mapped[Optional[int]] = mapped_column(Integer) + source_url: Mapped[Optional[str]] = mapped_column(String(1024)) + source_id: Mapped[Optional[str]] = mapped_column(String(64)) + recruit_category: 
Mapped[Optional[int]] = mapped_column(Integer, comment="招聘分类: 0=校招, 1=实习, 2=社招, 3=其他") + expire_at: Mapped[Optional[datetime]] = mapped_column(DateTime, comment="发布日期") + status: Mapped[int] = mapped_column(Integer, default=0, comment="0=上架 1=下架 2=已失效") + create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) + update_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) diff --git a/app/models/mysql/relations.py b/app/models/mysql/relations.py new file mode 100644 index 0000000..ee70376 --- /dev/null +++ b/app/models/mysql/relations.py @@ -0,0 +1,30 @@ +"""MySQL: 关联表模型""" + +from datetime import datetime + +from sqlalchemy import BigInteger, DateTime, String +from sqlalchemy.orm import Mapped, mapped_column + +from app.core.database import MysqlBase + + +class JobRegionRelation(MysqlBase): + """岗位-地区关联表""" + + __tablename__ = "bg_job_region_relation" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + job_id: Mapped[int] = mapped_column(BigInteger, nullable=False) + region_code: Mapped[str] = mapped_column(String(20), nullable=False) + create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) + + +class JobSkillTagRelation(MysqlBase): + """岗位-技能标签关联表""" + + __tablename__ = "bg_job_skill_tag_relation" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + job_id: Mapped[int] = mapped_column(BigInteger, nullable=False) + skill_tag_id: Mapped[int] = mapped_column(BigInteger, nullable=False) + create_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) diff --git a/app/models/mysql/skill_tag.py b/app/models/mysql/skill_tag.py new file mode 100644 index 0000000..e612a41 --- /dev/null +++ b/app/models/mysql/skill_tag.py @@ -0,0 +1,15 @@ +"""MySQL: bg_skill_tag 表模型""" + +from sqlalchemy import BigInteger, String +from sqlalchemy.orm import Mapped, mapped_column + +from app.core.database import MysqlBase + + +class SkillTag(MysqlBase): + """技能标签表""" + 
+ __tablename__ = "bg_skill_tag" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False) diff --git a/app/models/pg/__init__.py b/app/models/pg/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/models/pg/app_job_data.py b/app/models/pg/app_job_data.py new file mode 100644 index 0000000..7ffd87c --- /dev/null +++ b/app/models/pg/app_job_data.py @@ -0,0 +1,35 @@ +"""PostgreSQL: app_job_data 表模型""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import BigInteger, DateTime, Integer, SmallInteger, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from app.core.database import PgBase + + +class AppJobData(PgBase): + """爬虫岗位原始数据""" + + __tablename__ = "app_job_data" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + urllistid: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联urllistid") + job_title: Mapped[Optional[str]] = mapped_column(String(255)) + salary: Mapped[Optional[str]] = mapped_column(String(128)) + location: Mapped[Optional[str]] = mapped_column(String(2048)) + company: Mapped[Optional[str]] = mapped_column(String(255), comment="公司名字") + experience: Mapped[Optional[str]] = mapped_column(String(64)) + education: Mapped[Optional[str]] = mapped_column(String(64)) + description: Mapped[str] = mapped_column(Text, nullable=False) + detail_url: Mapped[str] = mapped_column(String(1024), nullable=False) + recruit_category: Mapped[int] = mapped_column(SmallInteger, default=3, nullable=False, comment="招聘分类: 0=校招, 1=实习, 2=社招, 3=其他") + content_hash: Mapped[str] = mapped_column(String(64), nullable=False) + sources: Mapped[int] = mapped_column(SmallInteger, default=0, nullable=False) + expire_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, comment="发布日期") + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) + updated_at: 
Mapped[datetime] = mapped_column(DateTime, nullable=False) + clean_status: Mapped[str] = mapped_column(String(20), default="pending", nullable=False, comment="pending/cleaning/cleaned/discarded") + clean_started_at: Mapped[Optional[datetime]] = mapped_column(DateTime) + cleaned_at: Mapped[Optional[datetime]] = mapped_column(DateTime) diff --git a/app/scheduler/__init__.py b/app/scheduler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/scheduler/tasks.py b/app/scheduler/tasks.py new file mode 100644 index 0000000..621da1e --- /dev/null +++ b/app/scheduler/tasks.py @@ -0,0 +1,105 @@ +"""定时任务注册""" + +from datetime import datetime, timedelta + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.cron import CronTrigger + +from app.config import settings +from app.core.logger import log + + +def create_scheduler() -> AsyncIOScheduler: + """创建并注册所有定时任务""" + scheduler = AsyncIOScheduler( + timezone="Asia/Shanghai", + job_defaults={"misfire_grace_time": 60}, + ) + + # 岗位清洗(每 N 秒) + scheduler.add_job( + _job_clean_task, + trigger=IntervalTrigger(seconds=settings.clean_interval_seconds), + id="job_clean", + name="岗位清洗", + max_instances=1, + ) + + # 公司补充(每 N 秒) + scheduler.add_job( + _company_clean_task, + trigger=IntervalTrigger(seconds=settings.company_interval_seconds), + id="company_clean", + name="公司补充", + max_instances=1, + ) + + # 岗位僵尸恢复(每30分钟) + scheduler.add_job( + _job_zombie_task, + trigger=IntervalTrigger(minutes=30), + id="job_zombie_recover", + name="岗位僵尸恢复", + max_instances=1, + ) + + # 公司僵尸恢复(每小时) + scheduler.add_job( + _company_zombie_task, + trigger=IntervalTrigger(hours=1), + id="company_zombie_recover", + name="公司僵尸恢复", + max_instances=1, + ) + + # 岗位下架(每天凌晨2点) + scheduler.add_job( + _job_expire_task, + trigger=CronTrigger(hour=2, minute=0), + id="job_expire", + name="岗位下架", + max_instances=1, + ) + + return scheduler + + +async def 
# Fenced markdown code block: optional language tag, then the lazily captured payload.
_CODE_BLOCK_RE = re.compile(r"```\w*\s*\n?(.*?)\n?\s*```", re.DOTALL)
# ASCII control characters, excluding \t, \n and \r which are legal JSON whitespace.
_CONTROL_CHAR_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")


def clean_ai_response(response: str) -> str:
    """Extract a clean JSON string from raw LLM output.

    Extraction order:
      1. If the text contains a fenced markdown code block, the content of the
         first such block is used.
      2. Otherwise everything before the first ``{`` or ``[`` is dropped, and
         everything after the last matching closer (``}`` or ``]``) is dropped
         as well, so prose surrounding the JSON on either side is removed.
      3. Control characters other than tab / newline / carriage return are
         stripped.

    Args:
        response: Raw text returned by the model.

    Returns:
        The candidate JSON text, or ``""`` when *response* is empty or blank.
        No validation is done here; the caller is expected to parse the result.
    """
    if not response or not response.strip():
        return ""

    result = response.strip()

    match = _CODE_BLOCK_RE.search(result)
    if match:
        result = match.group(1).strip()
    else:
        # Locate the first JSON opener, whichever of '{' / '[' comes first.
        starts = [pos for pos in (result.find("{"), result.find("[")) if pos >= 0]
        if starts:
            start = min(starts)
            closer = "}" if result[start] == "{" else "]"
            end = result.rfind(closer)
            # Trim trailing prose only when a closer exists after the opener;
            # a truncated payload is passed through for the caller to reject.
            result = result[start:end + 1] if end > start else result[start:]

    return _CONTROL_CHAR_RE.sub("", result)
async def _clean_one(sem: asyncio.Semaphore, company: dict) -> None:
    """Enrich one company row while holding a slot of the semaphore.

    Any ``Exception`` raised by ``_do_clean`` is logged and swallowed here, so
    a single failing company never propagates out of this coroutine.

    Args:
        sem: Semaphore limiting how many enrichments run at the same time.
        company: Row mapping of the company; ``id`` is required and
            ``short_name`` is read (if present) for the error log.
    """
    async with sem:
        try:
            await _do_clean(company)
        except Exception as exc:
            failed_id = company["id"]
            failed_name = company.get("short_name")
            log.error("公司补充异常, id={}, shortName={}: {}", failed_id, failed_name, exc)
"industry_id": result.get("industryId"), + "tags": _to_json(result.get("tags")), + "summary": result.get("summary"), + "description": result.get("description"), + "founded_year": result.get("foundedYear"), + "address": result.get("address"), + "scale": result.get("scale"), + "website": result.get("website"), + "financing_stage": result.get("financingStage"), + "latest_valuation": result.get("latestValuation"), + "news": _to_json(result.get("news")), + "now": now, + "id": company_id, + }, + ) + await mysql.commit() + + log.info("公司补充完成, id={}, shortName={}", company_id, short_name) + + +async def _update_status(company_id: int, status: int) -> None: + """更新公司状态""" + async with MysqlSession() as mysql: + await mysql.execute( + text("UPDATE bg_company SET status = :s, update_time = NOW() WHERE id = :id"), + {"s": status, "id": company_id}, + ) + await mysql.commit() + + +def _to_json(value) -> str | None: + """列表转 JSON 字符串""" + import json + if value and isinstance(value, list): + return json.dumps(value, ensure_ascii=False) + return None diff --git a/app/services/dict_cache_service.py b/app/services/dict_cache_service.py new file mode 100644 index 0000000..6872a91 --- /dev/null +++ b/app/services/dict_cache_service.py @@ -0,0 +1,91 @@ +"""字典数据缓存服务 + +启动时从 MySQL 加载岗位分类、行业、专业分类、地区数据到内存。 +""" + +from sqlalchemy import select, text + +from app.core.database import MysqlSession +from app.core.logger import log + + +class DictCacheService: + """字典缓存,单例使用""" + + def __init__(self): + self.job_category_text: str = "" + self.industry_text: str = "" + self.major_category_text: str = "" + self._region_list: list[dict] = [] + + async def refresh(self) -> None: + """加载全量字典数据""" + async with MysqlSession() as session: + # 岗位分类(三级叶子,带父级路径) + result = await session.execute(text(""" + SELECT c.id, c.name, c.parent_id, c.root_id, c.level, + p.name AS parent_name, r.name AS root_name + FROM bg_job_category c + LEFT JOIN bg_job_category p ON c.parent_id = p.id + LEFT JOIN 
bg_job_category r ON c.root_id = r.id + WHERE c.level = 3 + """)) + categories = result.mappings().all() + self.job_category_text = ", ".join( + f"{c['id']}:{c['name']}({c['root_name']}/{c['parent_name']})" + for c in categories + ) + + # 行业(二级叶子,带父级) + result = await session.execute(text(""" + SELECT i.id, i.name, p.name AS parent_name + FROM bg_industry i + LEFT JOIN bg_industry p ON i.parent_id = p.id + WHERE i.level = 2 + """)) + industries = result.mappings().all() + self.industry_text = ", ".join( + f"{i['id']}:{i['name']}({i['parent_name']})" + for i in industries + ) + + # 专业分类(三级叶子,带父级路径) + result = await session.execute(text(""" + SELECT m.id, m.name, m.parent_id, m.root_id, + p.name AS parent_name, r.name AS root_name + FROM bg_major_category m + LEFT JOIN bg_major_category p ON m.parent_id = p.id + LEFT JOIN bg_major_category r ON m.root_id = r.id + WHERE m.level = 3 + """)) + majors = result.mappings().all() + self.major_category_text = ", ".join( + f"{m['id']}:{m['name']}({m['root_name']}/{m['parent_name']})" + for m in majors + ) + + # 地区(省市级) + result = await session.execute(text(""" + SELECT code, name FROM bg_china_regions_code WHERE city_code IS NULL + """)) + self._region_list = [dict(r) for r in result.mappings().all()] + + log.info( + "字典缓存加载完成: 岗位分类{}条, 行业{}条, 专业{}条, 地区{}条", + len(categories), len(industries), len(majors), len(self._region_list), + ) + + def match_region_code(self, city_name: str) -> str | None: + """根据城市名模糊匹配地区编码""" + if not city_name: + return None + name = city_name.replace("市", "").replace("省", "").strip() + for r in self._region_list: + r_name = r["name"].replace("市", "").replace("省", "") + if name in r_name or r_name in name: + return r["code"] + return None + + +# 全局单例 +dict_cache = DictCacheService() diff --git a/app/services/job_clean_service.py b/app/services/job_clean_service.py new file mode 100644 index 0000000..9769d64 --- /dev/null +++ b/app/services/job_clean_service.py @@ -0,0 +1,306 @@ +"""岗位清洗服务(协程版)""" + 
+import asyncio +import json +from datetime import datetime + +from snowflake import SnowflakeGenerator +from sqlalchemy import text, insert + +from app.config import settings +from app.core.database import PgSession, MysqlSession +from app.core.logger import log +from app.ai.model_config import JobCleanModel +from app.ai.prompts import JOB_STRUCTURE_SYSTEM, MAJOR_MATCH_SYSTEM, SKILL_EXTRACT_SYSTEM +from app.models.mysql.job import Job +from app.models.mysql.company import Company +from app.models.mysql.relations import JobRegionRelation, JobSkillTagRelation +from app.services.ai_tool import ai_chat_json +from app.services.dict_cache_service import dict_cache + +# 雪花ID生成器 +_id_gen = SnowflakeGenerator(instance=1) + +# 公司创建锁(防止并发重复插入同一公司) +_company_lock = asyncio.Lock() + + +async def run_job_clean() -> None: + """一次批量清洗任务""" + # 1. 从 PG 锁定一批待清洗数据 + async with PgSession() as pg: + result = await pg.execute( + text(""" + SELECT * FROM app_job_data + WHERE clean_status = 'pending' + LIMIT :limit + FOR UPDATE SKIP LOCKED + """), + {"limit": settings.clean_batch_size}, + ) + rows = result.mappings().all() + if not rows: + return + + ids = [r["id"] for r in rows] + await pg.execute( + text(""" + UPDATE app_job_data + SET clean_status = 'cleaning', clean_started_at = NOW() + WHERE id = ANY(:ids) + """), + {"ids": ids}, + ) + await pg.commit() + + log.info("岗位清洗:锁定{}条数据", len(rows)) + + # 2. 
def _resolve_region_codes(cities: object) -> list[str]:
    """Map AI-provided city names to distinct region codes, keeping first-seen order."""
    if isinstance(cities, str):
        # A bare string is one city; iterating it would match single characters.
        cities = [cities]
    elif not isinstance(cities, list):
        return []

    codes: list[str] = []
    for city in cities:
        if not isinstance(city, str):
            continue
        code = dict_cache.match_region_code(city)
        if code and code not in codes:
            codes.append(code)
    return codes


async def _do_clean(data: dict) -> None:
    """Clean one raw job row and persist it to the business database.

    Steps, in order:
      1. Discard rows whose description is shorter than 20 characters.
      2. Ask the AI for a structured extraction; discard the row when the
         reply is not a JSON object or is not flagged ``valid``.
      3. Skip rows already present in ``bg_job`` (matched on ``source_id``)
         and mark them ``cleaned``.
      4. Resolve the company and the region codes, insert the ``bg_job`` row
         and its region relations in one MySQL transaction.
      5. Mark the source row ``cleaned`` in PG.
      6. Run major matching and skill extraction; failures of these two steps
         are logged and do not affect the stored job.

    Args:
        data: Source row mapping; ``id`` is required.
    """
    data_id = data["id"]

    # Pre-check: a very short description is not worth an AI call.
    description = data.get("description") or ""
    if len(description) < 20:
        log.info("[id={}] 丢弃:描述过短({}字符)", data_id, len(description))
        await _update_pg_status(data_id, "discarded")
        return

    # First AI call: structured extraction.
    user_message = _build_user_message(data)
    result = await ai_chat_json(JobCleanModel.STRUCTURE, JOB_STRUCTURE_SYSTEM, user_message)
    # A non-dict reply (None on parse failure, or e.g. a JSON array) cannot be
    # used below and is handled the same way as an invalid verdict.
    if not isinstance(result, dict) or not result.get("valid", False):
        log.info("[id={}] 丢弃:AI判定无效", data_id)
        await _update_pg_status(data_id, "discarded")
        return

    # De-duplication against rows already written to bg_job.
    source_id = str(data_id)
    async with MysqlSession() as mysql:
        existing = await mysql.execute(
            text("SELECT COUNT(*) AS cnt FROM bg_job WHERE source_id = :sid"),
            {"sid": source_id},
        )
        if existing.scalar() > 0:
            log.info("[id={}] 跳过:已入库(去重)", data_id)
            await _update_pg_status(data_id, "cleaned")
            return

    # Company: prefer the AI-extracted short name, fall back to the raw value.
    company_short_name = result.get("companyShortName") or data.get("company") or ""
    company_id = await _find_or_create_company(company_short_name)

    # Regions: distinct codes only, so no duplicate relation rows are written.
    region_codes = _resolve_region_codes(result.get("cities"))

    # Insert the job and its region relations in a single transaction.
    job_id = next(_id_gen)
    now = datetime.now()
    async with MysqlSession() as mysql:
        await mysql.execute(
            insert(Job).values(
                id=job_id,
                title=result.get("title", ""),
                company_id=company_id,
                category_id=result.get("categoryId", 0),
                employment_type=result.get("employmentType", 0),
                description=result.get("description", ""),
                requirement=result.get("requirement", ""),
                bonus=result.get("bonus"),
                tags=result.get("tags"),
                skill_tags=result.get("skillTags"),
                salary=result.get("salary"),
                education=result.get("education", 0),
                min_experience=result.get("minExperience", 0),
                required_industry_id=result.get("requiredIndustryId"),
                recruit_category=data.get("recruit_category", 3),
                expire_at=data.get("expire_at"),
                source_url=data.get("detail_url"),
                source_id=source_id,
                status=0,
                create_time=now,
                update_time=now,
            )
        )

        if region_codes:
            await mysql.execute(
                insert(JobRegionRelation),
                [
                    {"id": next(_id_gen), "job_id": job_id, "region_code": code, "create_time": now}
                    for code in region_codes
                ],
            )

        await mysql.commit()

    # Mark the source row as done only after the MySQL commit succeeded.
    await _update_pg_status(data_id, "cleaned")
    log.info("[id={}] 入库成功:{} | 公司={} | 地区={}", data_id, result.get("title"), company_short_name, region_codes)

    # Second AI call: major matching (best effort).
    try:
        await _match_major(job_id, result)
        log.debug("[id={}] 专业匹配完成", data_id)
    except Exception as e:
        log.warning("[id={}] 专业匹配失败: {}", data_id, e)

    # Third AI call: skill extraction (best effort).
    try:
        await _extract_skill_tags(job_id, result)
        log.debug("[id={}] 技能提取完成", data_id)
    except Exception as e:
        log.warning("[id={}] 技能提取失败: {}", data_id, e)
def _normalize_skill_names(skills: list) -> list[str]:
    """Return distinct, stripped, lower-cased skill names of 1-50 characters, in first-seen order."""
    names: list[str] = []
    for raw in skills:
        if not isinstance(raw, str):
            # str(None) would otherwise become the tag "none"; dicts and
            # numbers would be stored as their repr.
            continue
        name = raw.strip().lower()
        if not name or len(name) > 50 or name in names:
            continue
        names.append(name)
    return names


async def _extract_skill_tags(job_id: int, result: dict) -> None:
    """Third AI call: extract skill tags for a job and link them to it.

    The AI is expected to reply with a JSON array of skill names; any other
    reply is ignored. Each usable name is inserted into ``bg_skill_tag`` with
    ``INSERT IGNORE`` and its id is then read back by name. One
    ``JobSkillTagRelation`` row per distinct tag id is inserted, and
    everything is committed in a single transaction.

    Args:
        job_id: Id of the ``bg_job`` row the tags belong to.
        result: Structured extraction of the job; ``title``, ``description``
            and ``requirement`` are read to build the prompt.
    """
    title = result.get("title", "")
    desc = result.get("description", "")
    req = result.get("requirement", "")
    user_msg = f"【岗位信息】\n标题: {title}\n职责: {desc}\n要求: {req}"

    skills = await ai_chat_json(JobCleanModel.SKILL_EXTRACT, SKILL_EXTRACT_SYSTEM, user_msg)
    if not skills or not isinstance(skills, list):
        return

    names = _normalize_skill_names(skills)
    if not names:
        return

    now = datetime.now()
    tag_ids: list[int] = []
    async with MysqlSession() as mysql:
        for name in names:
            # INSERT IGNORE is a no-op when the row cannot be inserted, so the
            # id is always read back by name afterwards.
            await mysql.execute(
                text("INSERT IGNORE INTO bg_skill_tag (id, name) VALUES (:id, :name)"),
                {"id": next(_id_gen), "name": name},
            )
            row = await mysql.execute(
                text("SELECT id FROM bg_skill_tag WHERE name = :name LIMIT 1"),
                {"name": name},
            )
            real_id = row.scalar()
            if real_id and real_id not in tag_ids:
                tag_ids.append(real_id)

        if tag_ids:
            await mysql.execute(
                insert(JobSkillTagRelation),
                [
                    {"id": next(_id_gen), "job_id": job_id, "skill_tag_id": tid, "create_time": now}
                    for tid in tag_ids
                ],
            )

        await mysql.commit()
async def _update_pg_status(data_id: int, status: str) -> None:
    """Set ``clean_status`` of one ``app_job_data`` row and commit.

    When *status* is ``"cleaned"`` the row's ``cleaned_at`` is also set to the
    database's ``NOW()``; for every other status only ``clean_status`` changes.

    Args:
        data_id: Id of the ``app_job_data`` row.
        status: New value for ``clean_status``.
    """
    if status == "cleaned":
        statement = "UPDATE app_job_data SET clean_status = :s, cleaned_at = NOW() WHERE id = :id"
    else:
        statement = "UPDATE app_job_data SET clean_status = :s WHERE id = :id"

    params = {"s": status, "id": data_id}
    async with PgSession() as pg:
        await pg.execute(text(statement), params)
        await pg.commit()
async def recover_job_zombie() -> None:
    """Reset stuck job-cleaning rows so they can be picked up again.

    Rows of ``app_job_data`` that are in ``cleaning`` state and whose
    ``clean_started_at`` is more than 10 minutes old are set back to
    ``pending`` with ``clean_started_at`` cleared. The number of reset rows is
    logged when it is greater than zero.
    """
    reset_stmt = text("""
        UPDATE app_job_data
        SET clean_status = 'pending', clean_started_at = NULL
        WHERE clean_status = 'cleaning'
          AND clean_started_at < NOW() - INTERVAL '10 minutes'
    """)

    async with PgSession() as pg:
        outcome = await pg.execute(reset_stmt)
        await pg.commit()
        reset_count = outcome.rowcount

    if reset_count > 0:
        log.info("岗位僵尸恢复:重置{}条数据", reset_count)