"""Export mitmproxy MCP traffic database to HAR format.""" import json import sqlite3 import sys from datetime import datetime, timezone from pathlib import Path from urllib.parse import urlparse DB_PATH = Path(__file__).parent.parent / "mitm_mcp_traffic.db" def parse_headers(headers_str): """Parse stored headers JSON into HAR header list.""" if not headers_str: return [] try: headers = json.loads(headers_str) if isinstance(headers, list): return [{"name": pair[0], "value": pair[1]} for pair in headers if len(pair) >= 2] elif isinstance(headers, dict): return [{"name": k, "value": v} for k, v in headers.items()] except (json.JSONDecodeError, TypeError): pass return [] def get_mime_type(headers_str): """Extract content-type from headers.""" if not headers_str: return "application/octet-stream" try: headers = json.loads(headers_str) if isinstance(headers, list): for pair in headers: if len(pair) >= 2 and pair[0].lower() == "content-type": return pair[1].split(";")[0].strip() elif isinstance(headers, dict): for k, v in headers.items(): if k.lower() == "content-type": return v.split(";")[0].strip() except (json.JSONDecodeError, TypeError): pass return "application/octet-stream" def build_har_entry(row): """Convert a DB row to a HAR entry.""" flow_id, url, method, status_code, req_headers, req_body, resp_headers, resp_body, timestamp, size = row parsed = urlparse(url) started = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat() req_header_list = parse_headers(req_headers) resp_header_list = parse_headers(resp_headers) resp_mime = get_mime_type(resp_headers) entry = { "startedDateTime": started, "time": 0, "request": { "method": method or "GET", "url": url, "httpVersion": "HTTP/1.1", "cookies": [], "headers": req_header_list, "queryString": [ {"name": p.split("=", 1)[0], "value": p.split("=", 1)[1] if "=" in p else ""} for p in (parsed.query.split("&") if parsed.query else []) ], "headersSize": -1, "bodySize": len(req_body.encode("utf-8")) if req_body else 0, }, "response": { "status": status_code or 0, "statusText": "", "httpVersion": "HTTP/1.1", "cookies": [], "headers": resp_header_list, "content": { "size": size or 0, "mimeType": resp_mime, "text": resp_body or "", }, "redirectURL": "", "headersSize": -1, "bodySize": size or 0, }, "cache": {}, "timings": {"send": 0, "wait": 0, "receive": 0}, } if req_body: req_mime = get_mime_type(req_headers) entry["request"]["postData"] = { "mimeType": req_mime, "text": req_body, } return entry def export_har(db_path=DB_PATH, output_path=None, domain=None): """Export traffic DB to HAR file.""" if not db_path.exists(): print(f"Database not found: {db_path}") sys.exit(1) conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() query = "SELECT * FROM flows ORDER BY timestamp ASC" params = [] if domain: query = "SELECT * FROM flows WHERE url LIKE ? ORDER BY timestamp ASC" params = [f"%{domain}%"] cursor.execute(query, params) rows = cursor.fetchall() conn.close() if not rows: print("No traffic found.") sys.exit(0) entries = [build_har_entry(row) for row in rows] har = { "log": { "version": "1.2", "creator": {"name": "mitmproxy-mcp-export", "version": "1.0"}, "entries": entries, } } if output_path is None: output_path = Path(f"traffic_{datetime.now().strftime('%Y%m%d_%H%M%S')}.har") output_path = Path(output_path) output_path.write_text(json.dumps(har, ensure_ascii=False, indent=2), encoding="utf-8") print(f"Exported {len(entries)} entries to {output_path}") if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Export mitmproxy MCP traffic to HAR") parser.add_argument("-o", "--output", help="Output HAR file path") parser.add_argument("-d", "--domain", help="Filter by domain") parser.add_argument("--db", help="Database path", default=str(DB_PATH)) args = parser.parse_args() export_har( db_path=Path(args.db), output_path=args.output, domain=args.domain, )