From c06f595559329a3119bdefa3a1fb4d35892e2d1e Mon Sep 17 00:00:00 2001 From: kgod Date: Wed, 27 May 2026 23:48:30 +0800 Subject: [PATCH] feat: add crawl scripts for recruitment websites MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - btyy (倍特药业), fullsemi (富芯半导体): 北森平台爬虫 - hotjob (中国五矿): hotjob平台爬虫 - leinao (中科类脑): 静态HTML爬虫 - task_fetcher: 原子锁获取任务 - post.md: 抓取技能文档 - export_har: mitmproxy HAR导出工具 --- crawl/btyy/btyy_crawler.py | 207 +++++++++++++++++++ crawl/export_har.py | 157 ++++++++++++++ crawl/fullsemi/fullsemi_crawler.py | 175 ++++++++++++++++ crawl/hotjob/hotjob_crawler.py | 180 ++++++++++++++++ crawl/leinao/leinao_crawler.py | 216 ++++++++++++++++++++ crawl/post.md | 316 +++++++++++++++++++++++++++++ crawl/task_fetcher.py | 73 +++++++ 7 files changed, 1324 insertions(+) create mode 100644 crawl/btyy/btyy_crawler.py create mode 100644 crawl/export_har.py create mode 100644 crawl/fullsemi/fullsemi_crawler.py create mode 100644 crawl/hotjob/hotjob_crawler.py create mode 100644 crawl/leinao/leinao_crawler.py create mode 100644 crawl/post.md create mode 100644 crawl/task_fetcher.py diff --git a/crawl/btyy/btyy_crawler.py b/crawl/btyy/btyy_crawler.py new file mode 100644 index 00000000..835e522a --- /dev/null +++ b/crawl/btyy/btyy_crawler.py @@ -0,0 +1,207 @@ +import requests +import json +import time +import math +import hashlib +from datetime import datetime + + +CATEGORY_DB_MAP = { + "1": 0, # 网站社会招聘 -> 数据库社招 + "2": 1, # 网站校园招聘 -> 数据库校招 +} + + +def parse_to_db(records, task_crawl_id, company_id="btyy", company="倍特药业"): + """ + 将API返回的岗位数据清洗为 app_job_data 表所需格式 + + :param records: API返回的岗位列表 + :param task_crawl_id: 爬虫任务ID + :param company_id: 公司标识 + :return: list[dict] + """ + results = [] + for r in records: + job_title = (r.get("JobAdName") or "").strip() + if not job_title: + continue + + duty = r.get("Duty") or "" + require = r.get("Require") or "" + parts = [] + if duty and duty != "/": + 
parts.append(f"【工作职责】\n{duty}") + if require and require != "/": + parts.append(f"【任职要求】\n{require}") + description = "\n\n".join(parts) + + category_id = r.get("CategoryId", "1") + job_id = r.get("Id", "") + prefix = "social" if category_id == "1" else "campus" + detail_url = f"https://btyy.zhiye.com/{prefix}/jobs/{job_id}" + + content_hash = hashlib.md5( + f"{job_title}|{company_id}|{description}".encode("utf-8") + ).hexdigest() + + item = { + "task_crawl_id": task_crawl_id, + "job_title": job_title, + "company_id": company_id, + "company": company, + "detail_url": detail_url, + "recruit_category": CATEGORY_DB_MAP.get(category_id, 0), + "content_hash": content_hash, + } + + # 可选字段,有值才设置 + loc_names = r.get("LocNames") + if loc_names: + item["location"] = ",".join(loc_names) + + if r.get("Salary"): + item["salary"] = r["Salary"] + + if r.get("Degree"): + item["education"] = r["Degree"] + + if r.get("YearsOfWorking"): + item["experience"] = r["YearsOfWorking"] + + if description: + item["description"] = description + + post_date = r.get("PostDate") or "" + if post_date: + item["expire_at"] = post_date[:10] + " 00:00:00" + else: + item["expire_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + results.append(item) + return results + + +class BtyyJobCrawler: + """倍特药业招聘官网爬虫 (btyy.zhiye.com)""" + + BASE_URL = "https://btyy.zhiye.com/api/Jobad" + CATEGORY_MAP = { + "shezhao": "1", + "xiaozhao": "2", + } + + def __init__(self): + self.session = requests.Session() + self.session.headers.update({ + "Content-Type": "application/json;charset=UTF-8", + "Accept": "application/json, text/plain, */*", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36", + "Referer": "https://btyy.zhiye.com/", + }) + + def _get_job_list_page(self, category_id, page_index=0, page_size=20): + """获取单页岗位列表""" + url = f"{self.BASE_URL}/GetJobAdPageList" + payload = { + "PageIndex": page_index, + "PageSize": page_size, 
+ "Category": [category_id], + "KeyWords": "", + "SpecialType": 0, + "PortalId": "", + "DisplayFields": [ + "Category", "LocId", "HeadCount", "PostDate", + "ClassificationTwo", "WorkWeChatQrCode", "Degree", + "Kind", "Org" + ], + } + resp = self.session.post(url, json=payload) + resp.raise_for_status() + data = resp.json() + if data.get("Code") != 200: + raise Exception(f"API错误: {data.get('Message', '未知错误')}") + return data + + def _get_all_jobs(self, category_id): + """获取某个分类下的所有岗位(自动分页)""" + first_page = self._get_job_list_page(category_id, page_index=0) + total = first_page["Count"] + all_records = first_page["Data"] + + if total == 0: + return {"total": 0, "records": [], "position_ids": []} + + total_pages = math.ceil(total / 20) + for page in range(1, total_pages): + time.sleep(0.3) + page_data = self._get_job_list_page(category_id, page_index=page) + all_records.extend(page_data["Data"]) + + position_ids = [r["Id"] for r in all_records] + return {"total": total, "records": all_records, "position_ids": position_ids} + + def get_shezhao_list(self): + """获取社会招聘列表 + 返回: {"total": int, "records": list, "position_ids": list} + """ + return self._get_all_jobs(self.CATEGORY_MAP["shezhao"]) + + def get_xiaozhao_list(self): + """获取校园招聘列表 + 返回: {"total": int, "records": list, "position_ids": list} + """ + return self._get_all_jobs(self.CATEGORY_MAP["xiaozhao"]) + + def get_shixi_list(self): + """获取实习招聘列表(该网站无实习分类,返回空)""" + return {"total": 0, "records": [], "position_ids": []} + + def get_position_detail(self, position_id): + """获取岗位详情 + 注:该网站列表接口已返回完整岗位信息(Duty、Require), + 此方法从已获取的列表数据中提取,无需额外请求。 + 如需单独请求,可访问岗位页面。 + """ + for category_id in self.CATEGORY_MAP.values(): + data = self._get_job_list_page(category_id, page_index=0, page_size=100) + for record in data.get("Data", []): + if record["Id"] == position_id: + return record + return None + + +if __name__ == "__main__": + crawler = BtyyJobCrawler() + + print("=" * 60) + print("倍特药业 - 招聘岗位爬取") + print("=" * 60) + + # 
社会招聘 + print("\n[社会招聘]") + shezhao = crawler.get_shezhao_list() + print(f" 共 {shezhao['total']} 个岗位") + + # 校园招聘 + print("\n[校园招聘]") + xiaozhao = crawler.get_xiaozhao_list() + print(f" 共 {xiaozhao['total']} 个岗位") + + # 数据清洗 + print("\n[数据清洗]") + task_crawl_id = 0 + all_parsed = parse_to_db(shezhao["records"], task_crawl_id) + all_parsed += parse_to_db(xiaozhao["records"], task_crawl_id) + print(f" 清洗完成: {len(all_parsed)} 条") + + # 打印样例 + print("\n--- 样例 ---") + for k, v in all_parsed[0].items(): + print(f" {k}: {str(v)[:100]}") + + # 保存 + output_file = "crawl/btyy/btyy_parsed.json" + with open(output_file, "w", encoding="utf-8") as f: + json.dump(all_parsed, f, ensure_ascii=False, indent=2) + print(f"\n已保存到 {output_file}") diff --git a/crawl/export_har.py b/crawl/export_har.py new file mode 100644 index 00000000..c95a6a52 --- /dev/null +++ b/crawl/export_har.py @@ -0,0 +1,157 @@ +"""Export mitmproxy MCP traffic database to HAR format.""" + +import json +import sqlite3 +import sys +from datetime import datetime, timezone +from pathlib import Path +from urllib.parse import urlparse + +DB_PATH = Path(__file__).parent.parent / "mitm_mcp_traffic.db" + + +def parse_headers(headers_str): + """Parse stored headers JSON into HAR header list.""" + if not headers_str: + return [] + try: + headers = json.loads(headers_str) + if isinstance(headers, list): + return [{"name": pair[0], "value": pair[1]} for pair in headers if len(pair) >= 2] + elif isinstance(headers, dict): + return [{"name": k, "value": v} for k, v in headers.items()] + except (json.JSONDecodeError, TypeError): + pass + return [] + + +def get_mime_type(headers_str): + """Extract content-type from headers.""" + if not headers_str: + return "application/octet-stream" + try: + headers = json.loads(headers_str) + if isinstance(headers, list): + for pair in headers: + if len(pair) >= 2 and pair[0].lower() == "content-type": + return pair[1].split(";")[0].strip() + elif isinstance(headers, dict): + for k, v in 
headers.items(): + if k.lower() == "content-type": + return v.split(";")[0].strip() + except (json.JSONDecodeError, TypeError): + pass + return "application/octet-stream" + + +def build_har_entry(row): + """Convert a DB row to a HAR entry.""" + flow_id, url, method, status_code, req_headers, req_body, resp_headers, resp_body, timestamp, size = row + + parsed = urlparse(url) + started = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat() + + req_header_list = parse_headers(req_headers) + resp_header_list = parse_headers(resp_headers) + resp_mime = get_mime_type(resp_headers) + + entry = { + "startedDateTime": started, + "time": 0, + "request": { + "method": method or "GET", + "url": url, + "httpVersion": "HTTP/1.1", + "cookies": [], + "headers": req_header_list, + "queryString": [ + {"name": p.split("=", 1)[0], "value": p.split("=", 1)[1] if "=" in p else ""} + for p in (parsed.query.split("&") if parsed.query else []) + ], + "headersSize": -1, + "bodySize": len(req_body.encode("utf-8")) if req_body else 0, + }, + "response": { + "status": status_code or 0, + "statusText": "", + "httpVersion": "HTTP/1.1", + "cookies": [], + "headers": resp_header_list, + "content": { + "size": size or 0, + "mimeType": resp_mime, + "text": resp_body or "", + }, + "redirectURL": "", + "headersSize": -1, + "bodySize": size or 0, + }, + "cache": {}, + "timings": {"send": 0, "wait": 0, "receive": 0}, + } + + if req_body: + req_mime = get_mime_type(req_headers) + entry["request"]["postData"] = { + "mimeType": req_mime, + "text": req_body, + } + + return entry + + +def export_har(db_path=DB_PATH, output_path=None, domain=None): + """Export traffic DB to HAR file.""" + if not db_path.exists(): + print(f"Database not found: {db_path}") + sys.exit(1) + + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + query = "SELECT * FROM flows ORDER BY timestamp ASC" + params = [] + if domain: + query = "SELECT * FROM flows WHERE url LIKE ? 
ORDER BY timestamp ASC" + params = [f"%{domain}%"] + + cursor.execute(query, params) + rows = cursor.fetchall() + conn.close() + + if not rows: + print("No traffic found.") + sys.exit(0) + + entries = [build_har_entry(row) for row in rows] + + har = { + "log": { + "version": "1.2", + "creator": {"name": "mitmproxy-mcp-export", "version": "1.0"}, + "entries": entries, + } + } + + if output_path is None: + output_path = Path(f"traffic_{datetime.now().strftime('%Y%m%d_%H%M%S')}.har") + + output_path = Path(output_path) + output_path.write_text(json.dumps(har, ensure_ascii=False, indent=2), encoding="utf-8") + print(f"Exported {len(entries)} entries to {output_path}") + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Export mitmproxy MCP traffic to HAR") + parser.add_argument("-o", "--output", help="Output HAR file path") + parser.add_argument("-d", "--domain", help="Filter by domain") + parser.add_argument("--db", help="Database path", default=str(DB_PATH)) + args = parser.parse_args() + + export_har( + db_path=Path(args.db), + output_path=args.output, + domain=args.domain, + ) diff --git a/crawl/fullsemi/fullsemi_crawler.py b/crawl/fullsemi/fullsemi_crawler.py new file mode 100644 index 00000000..f3ba4831 --- /dev/null +++ b/crawl/fullsemi/fullsemi_crawler.py @@ -0,0 +1,175 @@ +import requests +import json +import time +import math +import hashlib +from datetime import datetime + + +CATEGORY_DB_MAP = { + "1": 0, # 网站社会招聘 -> 数据库社招 + "2": 1, # 网站校园招聘 -> 数据库校招 +} + + +def parse_to_db(records, task_crawl_id, company_id="fullsemi", company="富芯半导体"): + """将API返回的岗位数据清洗为 app_job_data 表所需格式""" + results = [] + for r in records: + job_title = (r.get("JobAdName") or "").strip() + if not job_title: + continue + + duty = r.get("Duty") or "" + require = r.get("Require") or "" + parts = [] + if duty and duty != "/": + parts.append(f"【工作职责】\n{duty}") + if require and require != "/": + parts.append(f"【任职要求】\n{require}") + description 
= "\n\n".join(parts) + + category_id = r.get("CategoryId", "1") + job_id = r.get("Id", "") + prefix = "social" if category_id == "1" else "campus" + detail_url = f"https://fullsemi.zhiye.com/{prefix}/jobs/{job_id}" + + content_hash = hashlib.md5( + f"{job_title}|{company_id}|{description}".encode("utf-8") + ).hexdigest() + + item = { + "task_crawl_id": task_crawl_id, + "job_title": job_title, + "company_id": company_id, + "company": company, + "detail_url": detail_url, + "recruit_category": CATEGORY_DB_MAP.get(category_id, 0), + "content_hash": content_hash, + } + + loc_names = r.get("LocNames") + if loc_names: + item["location"] = ",".join(loc_names) + if r.get("Salary"): + item["salary"] = r["Salary"] + if r.get("Degree"): + item["education"] = r["Degree"] + if r.get("YearsOfWorking"): + item["experience"] = r["YearsOfWorking"] + if description: + item["description"] = description + + post_date = r.get("PostDate") or "" + if post_date: + item["expire_at"] = post_date[:10] + " 00:00:00" + else: + item["expire_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + results.append(item) + return results + + +class FullsemiJobCrawler: + """富芯半导体招聘官网爬虫 (fullsemi.zhiye.com)""" + + BASE_URL = "https://fullsemi.zhiye.com/api/Jobad" + CATEGORY_MAP = { + "shezhao": "1", + "xiaozhao": "2", + } + + def __init__(self): + self.session = requests.Session() + self.session.headers.update({ + "Content-Type": "application/json;charset=UTF-8", + "Accept": "application/json, text/plain, */*", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36", + "Referer": "https://fullsemi.zhiye.com/", + }) + + def _get_job_list_page(self, category_id, page_index=0, page_size=20): + url = f"{self.BASE_URL}/GetJobAdPageList" + payload = { + "PageIndex": page_index, + "PageSize": page_size, + "Category": [category_id], + "KeyWords": "", + "SpecialType": 0, + "PortalId": "", + "DisplayFields": [ + "Category", "Kind", 
"LocId", "PostDate", + "WorkWeChatQrCode", "Degree", "Org" + ], + } + resp = self.session.post(url, json=payload) + resp.raise_for_status() + data = resp.json() + if data.get("Code") != 200: + raise Exception(f"API错误: {data.get('Message', '未知错误')}") + return data + + def _get_all_jobs(self, category_id): + first_page = self._get_job_list_page(category_id, page_index=0) + total = first_page["Count"] + all_records = first_page["Data"] + + if total == 0: + return {"total": 0, "records": [], "position_ids": []} + + total_pages = math.ceil(total / 20) + for page in range(1, total_pages): + time.sleep(0.3) + page_data = self._get_job_list_page(category_id, page_index=page) + all_records.extend(page_data["Data"]) + + position_ids = [r["Id"] for r in all_records] + return {"total": total, "records": all_records, "position_ids": position_ids} + + def get_shezhao_list(self): + """获取社会招聘列表""" + return self._get_all_jobs(self.CATEGORY_MAP["shezhao"]) + + def get_xiaozhao_list(self): + """获取校园招聘列表""" + return self._get_all_jobs(self.CATEGORY_MAP["xiaozhao"]) + + def get_shixi_list(self): + """获取实习招聘列表(该网站无实习分类)""" + return {"total": 0, "records": [], "position_ids": []} + + def get_position_detail(self, position_id): + """获取岗位详情(列表已包含完整信息)""" + for category_id in self.CATEGORY_MAP.values(): + data = self._get_job_list_page(category_id, page_index=0, page_size=100) + for record in data.get("Data", []): + if record["Id"] == position_id: + return record + return None + + +if __name__ == "__main__": + crawler = FullsemiJobCrawler() + + print("=" * 60) + print("富芯半导体 - 招聘岗位爬取") + print("=" * 60) + + print("\n[社会招聘]") + shezhao = crawler.get_shezhao_list() + print(f" 共 {shezhao['total']} 个岗位") + + print("\n[校园招聘]") + xiaozhao = crawler.get_xiaozhao_list() + print(f" 共 {xiaozhao['total']} 个岗位") + + print("\n[数据清洗]") + task_crawl_id = 0 + all_parsed = parse_to_db(shezhao["records"], task_crawl_id) + all_parsed += parse_to_db(xiaozhao["records"], task_crawl_id) + print(f" 清洗完成: 
{len(all_parsed)} 条") + + if all_parsed: + print("\n--- 样例 ---") + for k, v in all_parsed[0].items(): + print(f" {k}: {str(v)[:100]}") diff --git a/crawl/hotjob/hotjob_crawler.py b/crawl/hotjob/hotjob_crawler.py new file mode 100644 index 00000000..136c08cf --- /dev/null +++ b/crawl/hotjob/hotjob_crawler.py @@ -0,0 +1,180 @@ +import requests +import hashlib +import time +import math +from datetime import datetime + + +CATEGORY_DB_MAP = { + "1": 1, # recruitType=1 校招 -> 数据库1 + "2": 0, # recruitType=2 社招 -> 数据库0 + "12": 2, # recruitType=12 实习 -> 数据库2 +} + + +def parse_to_db(records, task_crawl_id, company_id="minmetals", company="中国五矿"): + """将API返回的岗位数据清洗为 app_job_data 表所需格式""" + results = [] + for r in records: + job_title = (r.get("postName") or "").strip() + if not job_title: + continue + + work_content = r.get("workContent") or "" + service_condition = r.get("serviceCondition") or "" + subject = r.get("subject") or "" + parts = [] + if work_content: + parts.append(f"【工作职责】\n{work_content}") + if service_condition: + parts.append(f"【任职要求】\n{service_condition}") + if subject and not service_condition: + parts.append(f"【专业要求】\n{subject}") + description = "\n\n".join(parts) + + recruit_type = str(r.get("recruitType", "1")) + post_id = r.get("postId", "") + detail_url = f"https://wecruit.hotjob.cn/SU62f3786ebef57c29ead8adba/mc/detail?postId={post_id}&recruitType={'campus' if recruit_type == '1' else 'social'}" + + content_hash = hashlib.md5( + f"{job_title}|{company_id}|{description}".encode("utf-8") + ).hexdigest() + + item = { + "task_crawl_id": task_crawl_id, + "job_title": job_title, + "company_id": company_id, + "company": r.get("company") or company, + "detail_url": detail_url, + "recruit_category": CATEGORY_DB_MAP.get(recruit_type, 0), + "content_hash": content_hash, + } + + if r.get("workPlaceStr") and r["workPlaceStr"] != "全部地区": + item["location"] = r["workPlaceStr"] + if r.get("educationStr"): + item["education"] = r["educationStr"] + if r.get("workYears") 
and r["workYears"] != "无经验": + item["experience"] = r["workYears"] + if description: + item["description"] = description + + publish_date = r.get("publishDate") or "" + if publish_date: + item["expire_at"] = publish_date[:19] + else: + item["expire_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + results.append(item) + return results + + +class HotjobCrawler: + """中国五矿招聘官网爬虫 (wecruit.hotjob.cn)""" + + BASE_URL = "https://wecruit.hotjob.cn/wecruit/positionInfo" + SUITE_KEY = "SU62f3786ebef57c29ead8adba" + CATEGORY_MAP = { + "xiaozhao": "1", + "shezhao": "2", + "shixi": "12", + } + + def __init__(self): + self.session = requests.Session() + self.session.headers.update({ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36", + "Referer": f"https://wecruit.hotjob.cn/{self.SUITE_KEY}/mc/position/campus", + "Content-Type": "application/x-www-form-urlencoded", + }) + + def _get_list_page(self, recruit_type, page=1, page_size=10): + url = f"{self.BASE_URL}/listPosition/{self.SUITE_KEY}?iSaJAx=isAjax&request_locale=zh_CN" + data = f"recruitType={recruit_type}&currentPage={page}&pageSize={page_size}&coordinateLat=&coordinateLng=&orgCode=0" + resp = self.session.post(url, data=data) + resp.raise_for_status() + result = resp.json() + if result.get("state") != "200": + raise Exception(f"API错误: {result}") + return result["data"] + + def _get_all_jobs(self, recruit_type): + first = self._get_list_page(recruit_type, page=1) + total = first["positonNum"] + all_records = first["pageForm"]["pageData"] + + if total == 0: + return {"total": 0, "records": [], "position_ids": []} + + total_pages = first["pageForm"]["totalPage"] + for page in range(2, total_pages + 1): + time.sleep(0.3) + page_data = self._get_list_page(recruit_type, page=page) + all_records.extend(page_data["pageForm"]["pageData"]) + + position_ids = [r["postId"] for r in all_records] + return {"total": total, "records": all_records, 
"position_ids": position_ids} + + def get_xiaozhao_list(self): + """获取校园招聘列表""" + return self._get_all_jobs(self.CATEGORY_MAP["xiaozhao"]) + + def get_shezhao_list(self): + """获取社会招聘列表""" + return self._get_all_jobs(self.CATEGORY_MAP["shezhao"]) + + def get_shixi_list(self): + """获取实习招聘列表""" + return self._get_all_jobs(self.CATEGORY_MAP["shixi"]) + + def get_position_detail(self, post_id, recruit_type="1"): + """获取岗位详情""" + url = f"{self.BASE_URL}/listPositionDetail/{self.SUITE_KEY}?iSaJAx=isAjax&request_locale=zh_CN" + data = f"postId={post_id}&recruitType={recruit_type}" + resp = self.session.post(url, data=data) + resp.raise_for_status() + result = resp.json() + if result.get("state") != "200": + raise Exception(f"API错误: {result}") + return result["data"] + + +if __name__ == "__main__": + import sys + sys.stdout.reconfigure(encoding='utf-8') + + crawler = HotjobCrawler() + + print("=" * 60) + print("中国五矿 - 招聘岗位爬取") + print("=" * 60) + + print("\n[校园招聘]") + xiaozhao = crawler.get_xiaozhao_list() + print(f" 共 {xiaozhao['total']} 个岗位") + + print("\n[社会招聘]") + shezhao = crawler.get_shezhao_list() + print(f" 共 {shezhao['total']} 个岗位") + + print("\n[实习招聘]") + shixi = crawler.get_shixi_list() + print(f" 共 {shixi['total']} 个岗位") + + # 获取前5个校招详情测试 + print("\n[获取详情测试 - 校招前5个]") + details = [] + for r in xiaozhao["records"][:5]: + time.sleep(0.3) + detail = crawler.get_position_detail(r["postId"], "1") + details.append(detail) + print(f" {detail['postName']} | {detail.get('workPlaceStr','')}") + + # 清洗测试 + print("\n[数据清洗]") + parsed = parse_to_db(details, 0) + print(f" 清洗完成: {len(parsed)} 条") + if parsed: + print("\n--- 样例 ---") + for k, v in parsed[0].items(): + print(f" {k}: {str(v)[:100]}") diff --git a/crawl/leinao/leinao_crawler.py b/crawl/leinao/leinao_crawler.py new file mode 100644 index 00000000..3976d3da --- /dev/null +++ b/crawl/leinao/leinao_crawler.py @@ -0,0 +1,216 @@ +import requests +import hashlib +import time +from datetime import datetime +from bs4 
import BeautifulSoup + + +CATEGORY_DB_MAP = { + "7": 0, # /job/7 社招 + "8": 1, # /job/8 校招 +} + + +def parse_to_db(records, task_crawl_id, company_id="leinao", company="中科类脑"): + """将解析后的岗位数据清洗为 app_job_data 表所需格式""" + results = [] + for r in records: + job_title = (r.get("job_title") or "").strip() + if not job_title: + continue + + description = r.get("description") or "" + detail_url = r.get("detail_url") or "" + recruit_category = r.get("recruit_category", 0) + + content_hash = hashlib.md5( + f"{job_title}|{company_id}|{description}".encode("utf-8") + ).hexdigest() + + item = { + "task_crawl_id": task_crawl_id, + "job_title": job_title, + "company_id": company_id, + "company": company, + "detail_url": detail_url, + "recruit_category": recruit_category, + "content_hash": content_hash, + } + + if r.get("location"): + item["location"] = r["location"] + if r.get("salary"): + item["salary"] = r["salary"] + if r.get("education"): + item["education"] = r["education"] + if r.get("experience"): + item["experience"] = r["experience"] + if description: + item["description"] = description + + post_date = r.get("post_date") or "" + if post_date: + item["expire_at"] = post_date + " 00:00:00" + else: + item["expire_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + results.append(item) + return results + + +class LeinaoJobCrawler: + """中科类脑招聘官网爬虫 (www.leinao.ai)""" + + BASE_URL = "https://www.leinao.ai" + CATEGORY_MAP = { + "shezhao": "7", + "xiaozhao": "8", + } + + def __init__(self): + self.session = requests.Session() + self.session.headers.update({ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36", + }) + + def _get_job_list(self, category_id): + """获取岗位列表页,解析HTML""" + url = f"{self.BASE_URL}/job/{category_id}" + resp = self.session.get(url) + resp.raise_for_status() + + soup = BeautifulSoup(resp.text, "html.parser") + links = soup.find_all("a", href=lambda h: h and "/jobdetail/" in h) + + 
records = [] + for link in links: + href = link.get("href", "") + job_id = href.split("/")[-1] + cells = link.find_all(["div", "span", "p", "generic"]) + texts = [t.get_text(strip=True) for t in link.children if hasattr(t, 'get_text')] + all_text = link.get_text(separator="|", strip=True).split("|") + all_text = [t for t in all_text if t] + + record = { + "job_id": job_id, + "detail_url": f"{self.BASE_URL}{href}", + "recruit_category": CATEGORY_DB_MAP.get(category_id, 0), + } + + if len(all_text) >= 1: + record["job_title"] = all_text[0] + if len(all_text) >= 2: + loc = all_text[1] + record["location"] = loc if loc != "不限" else None + if len(all_text) >= 3: + record["job_type"] = all_text[2] + if len(all_text) >= 4: + record["category_name"] = all_text[3] + if len(all_text) >= 5: + record["post_date"] = all_text[4] + if len(all_text) >= 6: + record["org"] = all_text[5] + + records.append(record) + + position_ids = [r["job_id"] for r in records] + return {"total": len(records), "records": records, "position_ids": position_ids} + + def get_shezhao_list(self): + """获取社会招聘列表""" + return self._get_job_list(self.CATEGORY_MAP["shezhao"]) + + def get_xiaozhao_list(self): + """获取校园招聘列表""" + return self._get_job_list(self.CATEGORY_MAP["xiaozhao"]) + + def get_shixi_list(self): + """获取实习招聘列表(该网站无实习分类)""" + return {"total": 0, "records": [], "position_ids": []} + + def get_position_detail(self, job_id): + """获取岗位详情""" + url = f"{self.BASE_URL}/jobdetail/{job_id}" + resp = self.session.get(url) + resp.raise_for_status() + + soup = BeautifulSoup(resp.text, "html.parser") + main = soup.find("main") or soup + + title_tag = soup.find("title") + job_title = title_tag.get_text().replace("-中科类脑", "").strip() if title_tag else "" + + paragraphs = main.find_all("p") + description_parts = [] + for p in paragraphs: + text = p.get_text(strip=True) + if text and len(text) > 5: + description_parts.append(text) + + h6_tags = main.find_all("h6") + location = None + salary = None + experience = 
None + for h6 in h6_tags: + text = h6.get_text(strip=True).replace("\xa0", " ") + if "薪资" in text or "工作经验" in text: + parts = [p.strip() for p in text.split() if p.strip()] + for part in parts: + if "薪资:" in part: + sal = part.replace("薪资:", "") + if sal and sal != "面议": + salary = sal + elif "工作经验:" in part: + exp = part.replace("工作经验:", "") + if exp and exp != "不限": + experience = exp + elif "·" in part and "不限" not in part: + location = part + + return { + "job_id": job_id, + "job_title": job_title, + "description": "\n".join(description_parts), + "location": location, + "salary": salary, + "experience": experience, + "detail_url": f"{self.BASE_URL}/jobdetail/{job_id}", + } + + def crawl_all(self): + """爬取所有岗位列表+详情""" + all_records = [] + + for name, cat_id in self.CATEGORY_MAP.items(): + job_list = self._get_job_list(cat_id) + recruit_cat = CATEGORY_DB_MAP.get(cat_id, 0) + print(f"[{name}] 共 {job_list['total']} 个岗位") + + for r in job_list["records"]: + time.sleep(0.3) + detail = self.get_position_detail(r["job_id"]) + detail["recruit_category"] = recruit_cat + detail["post_date"] = r.get("post_date") + all_records.append(detail) + + return all_records + + +if __name__ == "__main__": + crawler = LeinaoJobCrawler() + + print("=" * 60) + print("中科类脑 - 招聘岗位爬取") + print("=" * 60) + + all_details = crawler.crawl_all() + + print(f"\n[数据清洗]") + task_crawl_id = 0 + parsed = parse_to_db(all_details, task_crawl_id) + print(f" 清洗完成: {len(parsed)} 条") + + if parsed: + print("\n--- 样例 ---") + for k, v in parsed[0].items(): + print(f" {k}: {str(v)[:100]}") diff --git a/crawl/post.md b/crawl/post.md new file mode 100644 index 00000000..0a586783 --- /dev/null +++ b/crawl/post.md @@ -0,0 +1,316 @@ +# 招聘网站自动抓取技能 + +当你获取到一个"目标url"和"目标要求"时,请参考如下步骤。 + +--- + +## 第一步:创建工作目录 + +创建目录 `\crawl\{name}\`(已存在则跳过),所有产生的文件都写到这个目录。 + +临时文件(抓包中间产物、调试输出等)统一放在 `\crawl\tmp\`,完成后可清理。 + +--- + +## 第二步:使用 mitmproxy 抓包 + +### 前置条件 +- mitmproxy MCP 工具已配置可用 +- mitmproxy CA 证书已安装到系统信任存储(首次使用需安装) + +### 
操作流程 + +1. **启动代理**:调用 `start_proxy(port=8080)` 确保代理运行中 +2. **清空历史数据**:调用 `clear_traffic` 清空之前的抓包记录 +3. **启动 Playwright 浏览器**(必须配置代理和忽略证书): + ```python + browser = playwright.chromium.launch( + proxy={"server": "http://127.0.0.1:8080"}, + args=["--ignore-certificate-errors"] + ) + ``` +4. **打开目标网站**,通过 `search_traffic` 确认已抓到目标域名的请求 +5. **操作浏览器**完成所有"目标要求"中的操作(详见下方"浏览策略") +6. **导出 HAR**:使用 `F:\offerpai_cw\crawl\export_har.py` 导出抓包数据 + ```bash + python F:\offerpai_cw\crawl\export_har.py -d "目标域名" -o "crawl\{name}\域名.har" + ``` + +### 备选方案:Playwright 内置 HAR 录制 + +如果 mitmproxy 不可用或证书问题无法解决,可用 Playwright 自带的 HAR 录制: +```python +context = browser.new_context(record_har_path="crawl/{name}/traffic.har") +# ... 操作浏览器 ... +context.close() # 关闭时自动保存 HAR +``` + +也可以直接使用 Playwright 的网络请求监控面板(`browser_network_requests`)抓取 API 请求,适合简单场景。 + +--- + +## 浏览策略:如何抓取招聘网站的岗位数据 + +1. **覆盖所有招聘类型**:网站通常有校招、社招、实习等分类(可能是不同页面、Tab切换、或下拉选择)。**必须逐个点击每个分类**,确认该分类下是否有数据,并抓到对应的 API 请求。不能因为某个分类看起来可能为空就跳过,必须实际点击确认。 +2. **确认每个分类的实际数据量**:进入每个分类后,记录页面显示的岗位总数。后续脚本开发完成后要与此数量对比验证。 +3. **岗位列表页**:进入列表后观察数据加载方式: + - **API 动态加载**:列表数据通过 XHR/Fetch 请求获取,通常包含岗位 ID、分页信息 + - **静态渲染**:列表数据直接在 HTML 中,点击岗位后不再发起新请求 + - **混合模式**:列表是 API 加载,但详情也在列表响应中(无需单独请求详情) +4. **岗位详情页**:点击至少一个岗位进入详情页,观察是否有独立的详情 API +5. **分页**:如果列表有多页,至少翻到第2页,确认分页参数格式 +6. **最终目标**:确保抓到能获取所有岗位完整信息的 API 请求 + +--- + +## 第三步:分析 HAR 封包,用 requests 重现 + +### 分析要点 + +1. 从 HAR 中找出关键 API 请求(通常是返回 JSON 且包含岗位数据的 POST/GET 请求) +2. 区分哪些是必要请求,哪些是埋点/日志等无关请求 +3. 
关注请求中的认证信息来源:Token、Cookie、签名参数等 + +### 最小化重现 + +用 Python requests 尝试最小参数请求: +```python +import requests +resp = requests.post(url, json=payload, headers=必要headers) +``` + +如果返回与抓包一致的数据,说明重现成功。 + +### 重现失败时的排查方向 + +- **认证参数**:Cookie、Authorization header、自定义 Token header +- **动态参数**:时间戳、签名、加密字段 — 分析 HAR 中这些参数的生成规律 +- **请求体编码**:有些网站对请求体做 base64 编码或自定义加密 +- **请求顺序依赖**:某些接口需要先调用初始化接口获取 session/token +- **Referer/Origin 校验**:部分网站校验这些 header + +有了完整的 HAR 封包,所有参数来源都可追溯,一定可以完成重现。 + +--- + +## 第四步:创建 Python 爬虫脚本 + +### 前提 + +在开始这一步之前,必须已经用 requests 成功重现了目标 API 的请求。 + +### 类结构设计 + +脚本使用类方式组织,通过 `requests.Session()` 自动管理 Cookie 同步: + +```python +import requests + +class XxxCrawler: + """xxx招聘网站爬虫""" + + def __init__(self): + """初始化:建立session、获取初始cookies/token、设置公共headers""" + self.session = requests.Session() + self.session.headers.update({...}) + # 如需要:调用初始化接口获取token等 + + def get_xiaozhao_list(self) -> dict: + """获取校园招聘列表(内部处理分页,返回所有页数据) + 成功返回:{"total": int, "records": list, "position_ids": list} + 失败抛出异常 + """ + + def get_shezhao_list(self) -> dict: + """获取社会招聘列表(内部处理分页)""" + + def get_shixi_list(self) -> dict: + """获取实习招聘列表(内部处理分页)""" + + def get_position_detail(self, position_id, **kwargs) -> dict: + """获取单个岗位详情 + 参数:岗位ID及其他必要参数 + 成功返回:完整的岗位详情数据 + """ +``` + +### 关键设计要求 + +1. **Session 管理**:使用 `requests.Session()` 保持 Cookie 自动同步 +2. **分页处理**:列表方法内部自动遍历所有页,调用方无需关心分页逻辑 +3. **init 职责**:所有前置依赖(Cookie获取、Token刷新、公共参数构建)都在初始化中完成 +4. **错误处理**:网络错误和业务错误分开处理,失败时抛出有意义的异常 +5. 
**请求间隔**:每次请求间加 `time.sleep(0.3~0.5)` 避免被封 + +### 测试标准 + +逐个测试每个方法: +- 列表方法能返回完整的岗位列表和所有岗位 ID +- 详情方法能根据 ID 返回完整的岗位信息 +- 连续调用不会因 Cookie/Token 过期而失败 +- **数据完整性对比**:用 Playwright 打开页面,人工确认页面上可见的岗位数量和分类,与脚本最终获取的数量对比。如果页面上能看到但接口没返回的,排查原因: + - recruitType/Category 值是否正确(不一定是连续数字,如实习可能是12而不是3) + - 是否有隐藏分类或子页面未覆盖 + - 分页是否遍历完整(对比 total 和实际获取条数) + - 筛选条件是否遗漏(如 orgCode、PortalId 等参数影响结果集) + +--- + +## 第五步:数据清洗方法 (parse_to_db) + +### 目标 + +将爬虫获取的原始数据转换为数据库 `app_job_data` 表所需的格式。在爬虫脚本同文件中新增 `parse_to_db` 方法。 + +### app_job_data 表结构 + +| 字段 | 类型 | 必填 | 默认值 | 说明 | +|------|------|------|--------|------| +| `id` | bigint | 自增主键 | - | 不需要传 | +| `task_crawl_id` | bigint | **必填** | - | 爬虫任务ID,关联 app_url_list | +| `job_title` | varchar(255) | **必填** | - | 岗位名称 | +| `salary` | varchar(128) | 可选 | NULL | 薪资 | +| `location` | varchar(2048) | 可选 | NULL | 工作地点 | +| `company_id` | varchar(255) | **必填** | - | 公司标识(英文简写) | +| `company` | varchar(255) | 可选 | NULL | 公司名称(中文全称) | +| `experience` | varchar(64) | 可选 | NULL | 工作经验要求 | +| `education` | varchar(64) | 可选 | NULL | 学历要求 | +| `description` | text | 可选 | NULL | 岗位描述 | +| `detail_url` | varchar(1024) | **必填** | - | 岗位详情链接 | +| `recruit_category` | tinyint | **必填** | 3 | 0=社招, 1=校招, 2=实习 | +| `content_hash` | varchar(64) | **必填** | - | 去重MD5 | +| `expire_at` | datetime | **必填** | - | 发布日期,从岗位信息匹配,匹配不到则设为当天日期 | +| `sources` | tinyint(1) | 不需要传 | 0 | 数据库默认 | +| `is_independent_url` | tinyint(1) | 不需要传 | 1 | 数据库默认 | +| `check_status` | varchar(32) | 不需要传 | "pending" | 数据库默认 | +| `clean_status` | tinyint(1) | 不需要传 | 0 | 数据库默认 | +| `last_check_at` | datetime | 不需要传 | NULL | 数据库默认 | +| `created_at` | datetime | 不需要传 | CURRENT_TIMESTAMP | 数据库默认 | +| `updated_at` | datetime | 不需要传 | CURRENT_TIMESTAMP | 数据库默认 | + +### 清洗方法模板 + +```python +def parse_to_db(records, task_crawl_id, company_id="xxx", company="公司中文名"): + """ + 将API返回的岗位数据清洗为 app_job_data 表所需格式 + :param records: 爬虫获取的原始岗位列表 + :param task_crawl_id: 爬虫任务ID (关联 app_url_list) + :param 
company_id: 公司标识 + :param company: 公司中文名称 + :return: list[dict] + """ +``` + +### 清洗规则 + +1. **必填字段必须返回**:`task_crawl_id`、`job_title`、`company_id`、`detail_url`、`recruit_category`、`content_hash`、`expire_at` +2. **可选字段有值才设置**:`salary`、`location`、`experience`、`education`、`description`、`company`,没有就不放入dict +3. **不需要传的字段一律不返回**:数据库有默认值的字段由数据库处理 +4. **content_hash 生成**:`hashlib.md5(f"{job_title}|{company_id}|{description}".encode()).hexdigest()` +5. **recruit_category 映射**:根据网站的分类标识映射到 0=社招, 1=校招, 2=实习 +6. **description 拼接**:将职责和要求用 `【工作职责】` `【任职要求】` 标签拼接 +7. **空值处理**:原始数据为空、"/"、None 的字段不放入返回结果 +8. **expire_at**:优先从岗位的发布日期字段匹配,匹配不到则设为当天日期 + +--- + +## 附录:实战经验总结 + +### 平台识别与复用 + +| 特征 | 平台 | 复用策略 | +|------|------|----------| +| 域名含 `zhiye.com` | 北森招聘平台 | API 结构完全一致,改域名和 company_id 即可复用 | +| 域名含 `italent.cn` | 北森 iTalent | 同上 | +| 页面底部 "Powered by Beisen" | 北森 | 同上 | +| 域名含 `hotjob.cn` | hotjob 平台 | form 表单格式请求,recruitType 区分分类 | +| 纯静态 HTML,无 XHR 请求 | 自建官网 | 用 requests + BeautifulSoup 解析 | +| ssdp.crc.com.cn 网关 | 华润系统 | 请求体 base64 编码,响应 RETURN_DATA 也需 base64 解码 | + +### 北森平台 (zhiye.com) 通用模板 + +已验证适用于:btyy.zhiye.com、fullsemi.zhiye.com 等所有北森招聘站点。 + +```python +# 核心接口 +POST https://{domain}/api/Jobad/GetJobAdPageList + +# 请求体 +{"PageIndex": 0, "PageSize": 20, "Category": ["1"], "KeyWords": "", "SpecialType": 0, "PortalId": "", "DisplayFields": [...]} + +# Category: "1"=社招, "2"=校招 +# 响应直接包含完整岗位信息(Duty、Require),无需单独详情接口 +# 分页:PageIndex 从 0 开始,通过 Count 字段判断总数 +``` + +关键点: +- 无需认证,无 Cookie/Token 依赖 +- 列表接口已包含完整岗位详情(混合模式),不需要单独请求详情页 +- Headers 只需 Content-Type、User-Agent、Referer + +### hotjob 平台 (wecruit.hotjob.cn) 通用模板 + +```python +# 列表接口 +POST https://wecruit.hotjob.cn/wecruit/positionInfo/listPosition/{SUITE_KEY}?iSaJAx=isAjax&request_locale=zh_CN +Content-Type: application/x-www-form-urlencoded +Body: recruitType=1&currentPage=1&pageSize=10&coordinateLat=&coordinateLng=&orgCode=0 + +# 详情接口 +POST
import pymysql
from datetime import datetime


# NOTE(review): connection settings, including the root password, are
# hard-coded in source. Consider loading them from the environment or a
# config file that is kept out of version control.
DB_CONFIG = {
    "host": "192.168.31.105",
    "port": 3306,
    "user": "root",
    "password": "123456",
    "database": "table_comple",
    "charset": "utf8mb4",
}


def fetch_next_task():
    """Claim the next crawl task from ``app_url_list``.

    Inside a single transaction, selects one row whose ``status`` is not
    ``'processing'`` using ``SELECT ... FOR UPDATE`` (ordered by
    ``finished_at`` ascending, then ``id``), then sets that row's
    ``started_at`` to the current time and its ``status`` to
    ``'processing'`` before committing.

    :return: ``{"id": ..., "url": ..., "company": ...}`` built from the
        row's ``id``, ``input_url`` and ``input_company_name`` columns,
        or ``None`` when no row matches.
    :raises Exception: any error raised while querying or updating is
        re-raised after the transaction has been rolled back.
    """
    conn = pymysql.connect(**DB_CONFIG)
    try:
        conn.begin()
        # The cursor is closed on leaving the ``with`` block, on every path.
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            # NOTE(review): in MySQL, rows with a NULL ``status`` do not
            # satisfy ``status != 'processing'`` and rows with a NULL
            # ``finished_at`` sort first under ASC -- confirm both match
            # the intended scheduling against the table schema.
            cursor.execute("""
                SELECT id, input_url, input_company_name
                FROM app_url_list
                WHERE status != 'processing'
                ORDER BY finished_at ASC, id ASC
                LIMIT 1
                FOR UPDATE
            """)
            row = cursor.fetchone()

            if not row:
                # Nothing to claim: end the transaction opened above.
                conn.rollback()
                return None

            cursor.execute("""
                UPDATE app_url_list
                SET started_at = %s, status = 'processing'
                WHERE id = %s
            """, (datetime.now(), row["id"]))

        conn.commit()

        return {
            "id": row["id"],
            "url": row["input_url"],
            "company": row["input_company_name"],
        }
    except Exception:
        conn.rollback()
        # Bare ``raise`` re-raises the active exception unchanged.
        raise
    finally:
        conn.close()


if __name__ == "__main__":
    import sys

    # Emit UTF-8 regardless of the console's default encoding.
    sys.stdout.reconfigure(encoding="utf-8")

    task = fetch_next_task()
    if task:
        print("获取任务成功:")
        print(f"  ID: {task['id']}")
        print(f"  URL: {task['url']}")
        print(f"  公司: {task['company']}")
    else:
        print("没有可用任务")