generated from kgod/ai-review-template
c06f595559
- btyy (倍特药业), fullsemi (富芯半导体): 北森平台爬虫 - hotjob (中国五矿): hotjob平台爬虫 - leinao (中科类脑): 静态HTML爬虫 - task_fetcher: 原子锁获取任务 - post.md: 抓取技能文档 - export_har: mitmproxy HAR导出工具
158 lines
4.8 KiB
Python
158 lines
4.8 KiB
Python
"""Export mitmproxy MCP traffic database to HAR format."""
|
|
|
|
import json
|
|
import sqlite3
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
|
|
DB_PATH = Path(__file__).parent.parent / "mitm_mcp_traffic.db"
|
|
|
|
|
|
def parse_headers(headers_str):
    """Parse stored headers JSON into a HAR header list.

    Two stored shapes are understood: a JSON array of ``[name, value]``
    pairs, and a JSON object mapping header names to values.

    Args:
        headers_str: JSON text holding the stored headers, or a falsy value
            (``None``, ``""``) when nothing was stored.

    Returns:
        A list of ``{"name": ..., "value": ...}`` dicts whose names and
        values are strings. The list is empty when the input is missing, is
        not valid JSON, or decodes to anything other than an array or object.
    """
    if not headers_str:
        return []

    try:
        headers = json.loads(headers_str)
    except (ValueError, TypeError):
        # json.JSONDecodeError and UnicodeDecodeError both subclass
        # ValueError; TypeError covers inputs that are not str/bytes.
        return []

    if isinstance(headers, dict):
        pairs = headers.items()
    elif isinstance(headers, list):
        # Filter entry by entry so that one malformed item (a number, null,
        # a bare string, a nested object, a one-element array) cannot
        # discard the well-formed headers around it.
        pairs = (
            item
            for item in headers
            if isinstance(item, list) and len(item) >= 2
        )
    else:
        return []

    # HAR header names and values are strings, so anything else that JSON
    # allowed through (numbers, booleans, null) is coerced here. Elements
    # past the second one in a pair are ignored.
    return [
        {"name": str(name), "value": "" if value is None else str(value)}
        for name, value, *_ in pairs
    ]
|
|
|
|
|
|
def get_mime_type(headers_str):
    """Extract the media type from a stored headers blob.

    The stored headers may be a JSON array of ``[name, value]`` pairs or a
    JSON object mapping names to values. The header name is matched
    case-insensitively, and parameters after the first ``;`` (such as
    ``charset``) are dropped.

    Args:
        headers_str: JSON text holding the stored headers, or a falsy value
            when nothing was stored.

    Returns:
        The bare media type of the first usable ``Content-Type`` header, or
        ``"application/octet-stream"`` when the input is missing, is not
        valid JSON, or holds no non-empty string ``Content-Type`` value.
    """
    default = "application/octet-stream"
    if not headers_str:
        return default

    try:
        headers = json.loads(headers_str)
    except (ValueError, TypeError):
        # json.JSONDecodeError and UnicodeDecodeError both subclass
        # ValueError; TypeError covers inputs that are not str/bytes.
        return default

    if isinstance(headers, dict):
        pairs = headers.items()
    elif isinstance(headers, list):
        # Skip malformed entries individually so they cannot hide a valid
        # Content-Type header that appears later in the list.
        pairs = (
            item
            for item in headers
            if isinstance(item, list) and len(item) >= 2
        )
    else:
        return default

    for name, value, *_ in pairs:
        # Only string names/values can be inspected; null or numeric values
        # would otherwise raise AttributeError on .lower()/.split().
        if not (isinstance(name, str) and isinstance(value, str)):
            continue
        if name.lower() != "content-type":
            continue
        mime = value.split(";")[0].strip()
        if mime:
            return mime

    return default
|
|
|
|
|
|
def build_har_entry(row):
    """Convert one DB row into a HAR ``entries`` item.

    Args:
        row: A sequence that unpacks into exactly ten values, in this order:
            flow id, URL, method, status code, request headers, request
            body, response headers, response body, timestamp, size. The
            timestamp is handed to ``datetime.fromtimestamp`` with UTC as
            the timezone.

    Returns:
        A dict with ``startedDateTime``, ``time``, ``request``,
        ``response``, ``cache`` and ``timings`` keys. ``request`` gains a
        ``postData`` member only when the request body is non-empty.
    """
    (
        _flow_id,  # unpacked for shape only; not used in the entry
        url,
        method,
        status_code,
        req_headers,
        req_body,
        resp_headers,
        resp_body,
        timestamp,
        size,
    ) = row

    query = urlparse(url).query
    started_at = datetime.fromtimestamp(timestamp, tz=timezone.utc)

    request_headers = parse_headers(req_headers)
    response_headers = parse_headers(resp_headers)
    response_mime = get_mime_type(resp_headers)

    # Raw (not percent-decoded) name/value pairs; a segment without "="
    # gets an empty value.
    query_params = []
    if query:
        for segment in query.split("&"):
            key, _sep, val = segment.partition("=")
            query_params.append({"name": key, "value": val})

    # Request size is measured on the UTF-8 encoding of the stored text.
    request_size = len(req_body.encode("utf-8")) if req_body else 0
    # The stored size column is reported as both content size and body size.
    response_size = size or 0

    request = {
        "method": method or "GET",
        "url": url,
        "httpVersion": "HTTP/1.1",
        "cookies": [],
        "headers": request_headers,
        "queryString": query_params,
        "headersSize": -1,
        "bodySize": request_size,
    }
    if req_body:
        request["postData"] = {
            "mimeType": get_mime_type(req_headers),
            "text": req_body,
        }

    response = {
        "status": status_code or 0,
        "statusText": "",
        "httpVersion": "HTTP/1.1",
        "cookies": [],
        "headers": response_headers,
        "content": {
            "size": response_size,
            "mimeType": response_mime,
            "text": resp_body or "",
        },
        "redirectURL": "",
        "headersSize": -1,
        "bodySize": response_size,
    }

    # Timing data is filled with zeros and the cache object is left empty.
    return {
        "startedDateTime": started_at.isoformat(),
        "time": 0,
        "request": request,
        "response": response,
        "cache": {},
        "timings": {"send": 0, "wait": 0, "receive": 0},
    }
|
|
|
|
|
|
def export_har(db_path=DB_PATH, output_path=None, domain=None):
    """Export the traffic DB to a HAR file.

    Reads every row of the ``flows`` table ordered by ``timestamp``,
    converts each row with ``build_har_entry`` and writes a HAR 1.2 document
    as UTF-8 JSON.

    Args:
        db_path: Location of the SQLite database, as a ``Path`` or a string.
        output_path: Destination file. When ``None``, a
            ``traffic_<YYYYmmdd_HHMMSS>.har`` file named after the current
            local time is written to the working directory.
        domain: Optional text that must appear somewhere in a flow's URL for
            the flow to be exported. It is matched literally with SQL
            ``LIKE``.

    Raises:
        SystemExit: With status 1 when the database file does not exist, and
            with status 0 when the query returns no rows (no file is written
            in either case).
    """
    # Accept plain strings as well as Path objects.
    db_path = Path(db_path)
    if not db_path.exists():
        print(f"Database not found: {db_path}")
        sys.exit(1)

    query = "SELECT * FROM flows ORDER BY timestamp ASC"
    params = []
    if domain:
        # "%" and "_" are LIKE wildcards; escape them (and the escape
        # character itself) so the filter text is matched literally.
        literal = (
            domain.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        query = (
            "SELECT * FROM flows WHERE url LIKE ? ESCAPE '\\' "
            "ORDER BY timestamp ASC"
        )
        params = [f"%{literal}%"]

    conn = sqlite3.connect(str(db_path))
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        # Close even when the query fails (for example, missing table) so
        # the connection is never leaked.
        conn.close()

    if not rows:
        print("No traffic found.")
        sys.exit(0)

    entries = [build_har_entry(row) for row in rows]

    har = {
        "log": {
            "version": "1.2",
            "creator": {"name": "mitmproxy-mcp-export", "version": "1.0"},
            "entries": entries,
        }
    }

    if output_path is None:
        output_path = f"traffic_{datetime.now().strftime('%Y%m%d_%H%M%S')}.har"
    output_path = Path(output_path)

    # ensure_ascii=False keeps non-ASCII body text readable in the output.
    output_path.write_text(
        json.dumps(har, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f"Exported {len(entries)} entries to {output_path}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the options and run a single export.
    import argparse

    cli = argparse.ArgumentParser(
        description="Export mitmproxy MCP traffic to HAR",
    )
    cli.add_argument("-o", "--output", help="Output HAR file path")
    cli.add_argument("-d", "--domain", help="Filter by domain")
    # The default is the module-level DB_PATH, rendered as a string.
    cli.add_argument("--db", default=str(DB_PATH), help="Database path")
    options = cli.parse_args()

    export_har(
        db_path=Path(options.db),
        output_path=options.output,
        domain=options.domain,
    )
|