From ceb359d535a0073b9d0ca4c8b8e07bd70b2c6e2a Mon Sep 17 00:00:00 2001
From: win
Date: Sat, 21 Mar 2026 18:08:59 +0800
Subject: [PATCH] feat(01-01): create crawler_core/http_client.py with tenacity retry and stdlib logging
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Port HTTPClient from spiderJobs/core/http_client.py
- Add tenacity @retry decorator on post() and get() (3 attempts, min=10s wait)
- Use stdlib logging.getLogger('crawler_core.http_client') — no loguru
- No imports from spiderJobs.* or app.*
- TLS fingerprint and proxy logic preserved unchanged
---
 crawler_core/http_client.py | 231 ++++++++++++++++++++++++++++++++++++
 1 file changed, 231 insertions(+)
 create mode 100644 crawler_core/http_client.py

diff --git a/crawler_core/http_client.py b/crawler_core/http_client.py
new file mode 100644
index 0000000..2d30d85
--- /dev/null
+++ b/crawler_core/http_client.py
@@ -0,0 +1,231 @@
+"""
+crawler_core.http_client — general-purpose HTTP client.
+
+Built on requests-go, with built-in Chrome TLS fingerprint spoofing
+(TLS_CHROME_LATEST + random_ja3=True).
+Supports a fixed proxy, a tunnel proxy, or a rotating proxy pool.
+Built-in tenacity retries (3 attempts, exponential backoff, minimum 10 s wait).
+Uses stdlib logging — callers configure it via logging.getLogger('crawler_core').
+
+Does not depend on application frameworks such as loguru, FastAPI or
+Tortoise-ORM.
+"""
+
+from __future__ import annotations
+
+import logging
+import random
+from typing import Any, Optional
+
+import requests_go as requests
+from requests_go.tls_config import TLS_CHROME_LATEST
+from tenacity import (
+    retry,
+    retry_if_exception_type,
+    stop_after_attempt,
+    wait_random_exponential,
+)
+
+logger = logging.getLogger("crawler_core.http_client")
+
+
+def _log_retry(retry_state: Any) -> None:
+    """Log a failed attempt; used as the tenacity ``before_sleep`` hook.
+
+    ``retry_state.args[0]`` is the client instance.  The request path is the
+    next positional argument, or the ``path`` keyword when the caller passed
+    it by name; both are handled so that a keyword call does not raise
+    IndexError inside the hook and abort the retry.
+    """
+    call_args = retry_state.args
+    if len(call_args) > 1:
+        path = call_args[1]
+    else:
+        path = retry_state.kwargs.get("path", "unknown")
+    logger.warning(
+        "HTTP retry attempt=%d url=%s error=%s",
+        retry_state.attempt_number,
+        path,
+        retry_state.outcome.exception(),
+    )
+
+
+# Retry policy shared by post() and get(): at most 3 attempts, a wait bounded
+# by min=10 s / max=30 s before each retry, and the last error re-raised once
+# the attempts are exhausted.  ConnectionError and TimeoutError here are the
+# builtins, both of which are already subclasses of OSError.
+# NOTE(review): requests' RequestException derives from OSError, so errors
+# raised through requests_go are presumably matched as well — confirm.
+_retry_on_network_error = retry(
+    stop=stop_after_attempt(3),
+    wait=wait_random_exponential(multiplier=1, min=10, max=30),
+    retry=retry_if_exception_type((ConnectionError, TimeoutError, OSError)),
+    reraise=True,
+    before_sleep=_log_retry,
+)
+
+
+class HTTPClient:
+    """
+    General-purpose HTTP client.
+
+    Args:
+        base_url: Base address of the API; request paths are appended to it.
+        default_headers: Headers sent with every request.
+        proxy: Fixed proxy address (bound to the session, connections reused).
+        tunnel_proxy: Tunnel proxy address (a new session is created for every
+            request so that the IP rotates).
+        proxy_pool: List of proxies (one is picked at random per request).
+        timeout: Request timeout in seconds (default 10).
+
+    Proxy priority: tunnel_proxy > proxy_pool > proxy.
+    Only one of the three needs to be set.
+
+    Proxy format examples:
+        Plain proxy:         "http://127.0.0.1:7890"
+        SOCKS5 proxy:        "socks5://127.0.0.1:1080"
+        Tunnel proxy:        "http://user:pass@tunnel.example.com:12345"
+        Tunnel proxy (auth): "http://account-zone-xxx:password@proxy.host:port"
+
+    Tunnel proxy usage (the IP changes automatically on every request):
+        client = HTTPClient(
+            base_url="https://example.com",
+            tunnel_proxy="http://user:pass@tunnel.example.com:12345",
+        )
+        # Every get/post opens a new TCP connection, so the tunnel proxy
+        # assigns a new IP automatically.
+
+    The client also works as a context manager: leaving the ``with`` block
+    calls close() on the shared session.
+    """
+
+    def __init__(
+        self,
+        base_url: str,
+        default_headers: Optional[dict] = None,
+        proxy: Optional[str] = None,
+        tunnel_proxy: Optional[str] = None,
+        proxy_pool: Optional[list[str]] = None,
+        timeout: int = 10,
+    ):
+        self.base_url = base_url
+        self.default_headers = default_headers or {}
+        self.timeout = timeout
+
+        # Proxy configuration
+        self._proxy = proxy
+        self._tunnel_proxy = tunnel_proxy
+        self._proxy_pool = proxy_pool
+
+        # Shared session carrying the TLS fingerprint
+        self._session = self._new_session()
+
+        # The fixed proxy is set directly on the session; it is only used when
+        # neither higher-priority proxy mode is configured.
+        if proxy and not proxy_pool and not tunnel_proxy:
+            self._session.proxies = {"http": proxy, "https": proxy}
+
+    def __enter__(self) -> HTTPClient:
+        return self
+
+    def __exit__(self, *exc_info: Any) -> None:
+        self.close()
+
+    def close(self) -> None:
+        """Close the shared session."""
+        self._session.close()
+
+    def _new_session(self) -> requests.Session:
+        """Create a new session configured with the Chrome TLS fingerprint."""
+        session = requests.Session()
+        session.tls_config = TLS_CHROME_LATEST
+        # TLS_CHROME_LATEST is a module-level object, so this flag is set on
+        # the config shared by every session rather than on a private copy.
+        TLS_CHROME_LATEST.random_ja3 = True
+        return session
+
+    def _get_proxies(self) -> Optional[dict]:
+        """Return the proxy mapping for this request, or None.
+
+        Only the proxy pool yields a per-request mapping: the fixed proxy is
+        already set on the session and the tunnel proxy is applied by
+        ``_request`` itself.
+        """
+        if not self._proxy_pool:
+            return None
+        chosen = random.choice(self._proxy_pool)
+        # A random "#fragment" makes each proxy URL unique; this is meant to
+        # break connection reuse.
+        unique = f"{chosen}#{random.randint(100000, 999999)}"
+        return {"http": unique, "https": unique}
+
+    def _merge_headers(self, extra: Optional[dict] = None) -> dict:
+        """Return a copy of the default headers updated with ``extra``."""
+        headers = {**self.default_headers}
+        if extra:
+            headers.update(extra)
+        return headers
+
+    def _request(
+        self,
+        method: str,
+        path: str,
+        headers: Optional[dict],
+        **payload: Any,
+    ) -> tuple[int, Any]:
+        """Send one request and return ``(status_code, resp.json())``.
+
+        Args:
+            method: Name of the session method to call ("get" or "post").
+            path: Path appended to ``base_url``.
+            headers: Per-request headers merged over the default headers.
+            **payload: Method-specific keyword arguments (``json`` or ``params``).
+
+        Raises:
+            Whatever the session call or ``resp.json()`` raises; nothing is
+            caught here.
+        """
+        logger.debug("%s %s%s", method.upper(), self.base_url, path)
+        url = f"{self.base_url}{path}"
+        kwargs: dict[str, Any] = {
+            **payload,
+            "headers": self._merge_headers(headers),
+            "timeout": self.timeout,
+        }
+
+        # Tunnel proxy: a new session per request so that the IP rotates.
+        if self._tunnel_proxy:
+            kwargs["proxies"] = {
+                "http": self._tunnel_proxy,
+                "https": self._tunnel_proxy,
+            }
+            session = self._new_session()
+            try:
+                resp = getattr(session, method)(url, **kwargs)
+                return resp.status_code, resp.json()
+            finally:
+                session.close()
+
+        proxies = self._get_proxies()
+        if proxies:
+            kwargs["proxies"] = proxies
+        resp = getattr(self._session, method)(url, **kwargs)
+        return resp.status_code, resp.json()
+
+    @_retry_on_network_error
+    def post(self, path: str, body: dict, headers: Optional[dict] = None) -> tuple[int, Any]:
+        """Send a POST request with ``body`` as the JSON payload.
+
+        Returns:
+            ``(status_code, decoded JSON response)``.
+        """
+        return self._request("post", path, headers, json=body)
+
+    @_retry_on_network_error
+    def get(self, path: str, params: Optional[dict] = None, headers: Optional[dict] = None) -> tuple[int, Any]:
+        """Send a GET request with ``params`` as the query parameters.
+
+        Returns:
+            ``(status_code, decoded JSON response)``.
+        """
+        return self._request("get", path, headers, params=params)