# ⚠️ DEPRECATED — 2026-03-21 # 此文件是内部手工复制文件,已废弃,不再由任何 facade 引用。 # 请改用 spiderJobs.platforms.* 或 crawler_core 中的对应模块。 # 将在下一里程碑中删除。 # """ 前程无忧 (51Job) - 所有 API 接口 复制自 spiderJobs/platforms/job51/api.py — import 改为本地引用 """ from __future__ import annotations from typing import Any, Optional from crawler_core.base import BaseFetcher, BaseSearcher, Result as ApiResult from app.services.crawler._job51_client import Job51Client, create_client def _parse_job51_response(http_code: int, raw: Any) -> ApiResult: if http_code != 200: return ApiResult(success=False, status_code=http_code, error=f"HTTP 请求失败: {http_code}") if not isinstance(raw, dict): return ApiResult(success=False, status_code=http_code, error="响应格式异常") biz_status = raw.get("status") if biz_status is not None and str(biz_status) != "1": return ApiResult( success=False, status_code=int(biz_status) if str(biz_status).isdigit() else -1, error=raw.get("message") or f"业务错误: {biz_status}", ) payload = raw.get("resultbody") or raw.get("data") or {} if isinstance(payload, dict) and "jobList" in payload: job_list_wrap = payload.get("jobList", {}) if isinstance(job_list_wrap, dict) and "items" in job_list_wrap: items = job_list_wrap.get("items", []) return ApiResult( success=True, status_code=200, data=payload, list=items, count=len(items), is_end_page=len(items) == 0, ) if isinstance(job_list_wrap, list): return ApiResult( success=True, status_code=200, data=payload, list=job_list_wrap, count=len(job_list_wrap), is_end_page=len(job_list_wrap) == 0, ) if isinstance(payload, dict) and "items" in payload: items = payload.get("items", []) total = payload.get("totalCount", len(items)) return ApiResult( success=True, status_code=200, data=payload, list=items, count=total, is_end_page=len(items) == 0, ) if isinstance(payload, dict) and "list" in payload: items = payload.get("list", []) return ApiResult( success=True, status_code=200, data=payload, list=items, count=len(items), is_end_page=len(items) == 0, ) return ApiResult(success=True, status_code=200, data=payload) class SearchRecommendJobs(BaseSearcher): ENDPOINT = "open/noauth/recommend/job-tab-dynamic-wx-mini" def __init__( self, *, job_area: str = "020000", function_type: str = "", job_type: str = "recommend", page_size: int = 10, client: Optional[Job51Client] = None, ): super().__init__(page_size=page_size, http_client=client or create_client()) self.job_area = job_area self.function_type = function_type self.job_type = job_type def _build_params(self, page_index: int) -> dict: body = { "pageNo": page_index, "pageSize": self.page_size, "specialPageCode": True, "isTouristMode": True, "type": self.job_type, "jobArea": self.job_area, "personAsLabel": "1", } if self.function_type: body["functionType"] = self.function_type return body def _parse(self, http_code: int, raw: Any) -> ApiResult: return _parse_job51_response(http_code, raw) class GetJobDetail(BaseFetcher): ENDPOINT = "open/noauth/jobs/detail/base" def __init__(self, *, job_id: str, client: Optional[Job51Client] = None): super().__init__(http_client=client or create_client()) self.job_id = job_id def _build_params(self) -> dict: return {} def fetch(self) -> ApiResult: endpoint = f"{self.ENDPOINT}/{self.job_id}" try: http_code, data = self.http_client.get(endpoint) except Exception as e: return ApiResult(success=False, status_code=-1, error=str(e)) return self._parse(http_code, data) def _parse(self, http_code: int, raw: Any) -> ApiResult: return _parse_job51_response(http_code, raw) class GetCompanyInfo(BaseFetcher): ENDPOINT = "open/noauth/company-info/info-data" def __init__( self, *, company_id: str, color_one: str = "#ffffff", color_two: str = "#ffffffcc", client: Optional[Job51Client] = None, ): super().__init__(http_client=client or create_client()) self.company_id = company_id self.color_one = color_one self.color_two = color_two def _build_params(self) -> dict: return { "companyId": self.company_id, "colorOne": self.color_one, "colorTwo": self.color_two, } def fetch(self) -> ApiResult: try: http_code, data = self.http_client.get(self.ENDPOINT, self._build_params()) except Exception as e: return ApiResult(success=False, status_code=-1, error=str(e)) return self._parse(http_code, data) def _parse(self, http_code: int, raw: Any) -> ApiResult: return _parse_job51_response(http_code, raw) class SearchCompanyJobs(BaseSearcher): ENDPOINT = "open/noauth/jobs/company" def __init__( self, *, company_id: str, job_area: str = "", function: str = "", salary_type: str = "", page_size: int = 10, client: Optional[Job51Client] = None, ): super().__init__(page_size=page_size, http_client=client or create_client()) self.company_id = company_id self.job_area = job_area self.function = function self.salary_type = salary_type def _build_params(self, page_index: int) -> dict: return { "pageNum": page_index, "pageSize": self.page_size, "coId": self.company_id, "jobArea": self.job_area, "function": self.function, "salaryType": self.salary_type, "scene": 14, "requestId": "", } def _parse(self, http_code: int, raw: Any) -> ApiResult: return _parse_job51_response(http_code, raw)