""" Boss直聘 Service — 基于新算法文件的封装 保持对外公开接口不变(cleaning.py / company_cleaner.py 依赖) """ from __future__ import annotations from typing import Any, Dict, List, Optional from loguru import logger from app.services.crawler._boss_api import ( GetBrandDetail, GetJobDetail, SearchBrandJobs, SearchRecJobs, ) from app.services.crawler._boss_client import BossClient, create_client from app.services.crawler._boss_sign import BossSign class BossService: def __init__(self, proxy_pool: Optional[List[Dict[str, str]]] = None): self._signer = BossSign() proxy = None if proxy_pool: proxy = proxy_pool[0].get("https") or proxy_pool[0].get("http") if proxy_pool else None self._client = create_client(signer=self._signer, proxy=proxy) # login_data 用于外部检查(cleaning.py 通过 boss_service.login_data.get("mpt") 判断) self.login_data: Dict[str, str] = { "mpt": "", "wt2": "", "openId": "", } def set_login_data(self, mpt: str, wt2: str, open_id: str = "") -> None: self.login_data.update({"mpt": mpt, "wt2": wt2, "openId": open_id}) self._signer.mpt = mpt self._signer.wt2 = wt2 def set_proxy(self, proxy: Optional[str]) -> None: if proxy: proxy = proxy.strip().strip("`") self._client = create_client(signer=self._signer, proxy=proxy or None) logger.info(f"BossService proxy set to: {proxy or 'direct'}") def get_job_detail_by_id( self, job_id: str, lid: str = "", security_id: str = "" ) -> Optional[Dict]: """根据招聘ID获取招聘详情""" logger.info(f"获取招聘详情: {job_id}") try: fetcher = GetJobDetail( security_id=security_id, job_id=job_id, lid=lid, client=self._client, ) result = fetcher.fetch() if result.success: return result.data logger.warning(f"Boss get_job_detail failed: {result.error}") return None except Exception as e: logger.error(f"Boss get_job_detail exception: {e}") return None def get_company_detail_by_id(self, company_id: str) -> Optional[Dict]: """根据公司ID获取公司详情""" logger.info(f"获取公司详情: {company_id}") try: fetcher = GetBrandDetail(brand_id=company_id, client=self._client) result = fetcher.fetch() if result.success: return result.data logger.warning(f"Boss get_company_detail failed: {result.error}") return None except Exception as e: logger.error(f"Boss get_company_detail exception: {e}") return None def get_company_jobs_by_id( self, company_id: str, page: int = 1 ) -> Optional[Dict]: """根据公司ID获取该公司职位列表""" logger.info(f"获取公司职位列表: {company_id}, page={page}") try: searcher = SearchBrandJobs( brand_id=company_id, page_size=15, client=self._client, ) result = searcher.search(page_index=page) if result.success: return result.data logger.warning(f"Boss get_company_jobs failed: {result.error}") return None except Exception as e: logger.error(f"Boss get_company_jobs exception: {e}") return None def search_jobs( self, keyword: str, city_code: str = "101010100", page: int = 1 ) -> Optional[Dict]: """搜索职位""" logger.info(f"Boss search_jobs: keyword={keyword}, city={city_code}, page={page}") try: searcher = SearchRecJobs( city_code=city_code, page_size=15, client=self._client, ) result = searcher.search(page_index=page) if result.success: return result.data logger.warning(f"Boss search_jobs failed: {result.error}") return None except Exception as e: logger.error(f"Boss search_jobs exception: {e}") return None # ── asyncio.to_thread 桥接(ARCH-06)──────────────────────── async def async_get_job_detail( self, job_id: str, lid: str = "", security_id: str = "" ) -> Optional[Dict]: import asyncio return await asyncio.to_thread(self.get_job_detail_by_id, job_id, lid, security_id) async def async_get_company_detail(self, company_id: str) -> Optional[Dict]: import asyncio return await asyncio.to_thread(self.get_company_detail_by_id, company_id) async def async_get_company_jobs( self, company_id: str, page: int = 1 ) -> Optional[Dict]: import asyncio return await asyncio.to_thread(self.get_company_jobs_by_id, company_id, page) async def async_search_jobs( self, keyword: str, city_code: str = "101010100", page: int = 1 ) -> Optional[Dict]: import asyncio return await asyncio.to_thread(self.search_jobs, keyword, city_code, page)