""" 前程无忧 (51Job) Service — 基于新算法文件的封装 保持对外公开接口不变(cleaning.py / company_cleaner.py 依赖) """ from __future__ import annotations from typing import Any, Dict, List, Optional from loguru import logger from app.services.crawler._job51_api import ( GetCompanyInfo, GetJobDetail, SearchCompanyJobs, SearchRecommendJobs, ) from app.services.crawler._job51_client import Job51Client, create_client class QcwyService: def __init__(self, proxy_url: Optional[str] = None): self._client = create_client(proxy=proxy_url or None) def set_proxy(self, proxy_url: Optional[str]) -> None: self._client = create_client(proxy=proxy_url or None) logger.info(f"QcwyService proxy set to: {proxy_url or 'direct'}") def get_job_detail(self, job_id: str) -> Dict[str, Any]: """获取职位详情""" logger.info(f"Qcwy get_job_detail: {job_id}") try: fetcher = GetJobDetail(job_id=job_id, client=self._client) result = fetcher.fetch() if result.success: return result.data or {} logger.warning(f"Qcwy get_job_detail failed: {result.error}") return {} except Exception as e: logger.error(f"Qcwy get_job_detail exception: {e}") return {} def get_company_info(self, company_id: str) -> Dict[str, Any]: """获取公司信息""" logger.info(f"Qcwy get_company_info: {company_id}") try: fetcher = GetCompanyInfo(company_id=company_id, client=self._client) result = fetcher.fetch() if result.success: return result.data or {} logger.warning(f"Qcwy get_company_info failed: {result.error}") return {} except Exception as e: logger.error(f"Qcwy get_company_info exception: {e}") return {} def search_jobs( self, keyword: str, job_area: str = "020000", page: int = 1 ) -> List[Dict[str, Any]]: """搜索职位(返回列表)""" logger.info(f"Qcwy search_jobs: keyword={keyword}, area={job_area}, page={page}") try: searcher = SearchRecommendJobs( job_area=job_area, page_size=20, client=self._client, ) result = searcher.search(page_index=page) if result.success: return result.list or [] logger.warning(f"Qcwy search_jobs failed: {result.error}") return [] except Exception as e: logger.error(f"Qcwy search_jobs exception: {e}") return [] def get_company_jobs_by_id( self, company_id: str, page: int = 1, page_size: int = 30, job_area: str = "", function: str = "", salary_type: str = "", ) -> Dict[str, Any]: """获取公司职位列表""" logger.info(f"Qcwy get_company_jobs: company={company_id}, page={page}") try: searcher = SearchCompanyJobs( company_id=company_id, job_area=job_area, function=function, salary_type=salary_type, page_size=page_size, client=self._client, ) result = searcher.search(page_index=page) if result.success: return result.data or {} logger.warning(f"Qcwy get_company_jobs failed: {result.error}") return {} except Exception as e: logger.error(f"Qcwy get_company_jobs exception: {e}") return {} # ── asyncio.to_thread 桥接(ARCH-06)──────────────────────── async def async_get_job_detail(self, job_id: str) -> Dict: import asyncio return await asyncio.to_thread(self.get_job_detail, job_id) async def async_get_company_info(self, company_id: str) -> Dict: import asyncio return await asyncio.to_thread(self.get_company_info, company_id) async def async_get_company_jobs( self, company_id: str, page: int = 1, page_size: int = 30, **kwargs ) -> Dict: import asyncio return await asyncio.to_thread( self.get_company_jobs_by_id, company_id, page, page_size ) async def async_search_jobs( self, keyword: str, job_area: str = "020000", page: int = 1 ) -> List: import asyncio return await asyncio.to_thread(self.search_jobs, keyword, job_area, page)