import asyncio
import csv
import io
import re
import time
from typing import List, Dict, Any, Union, Optional

from fastapi import UploadFile
from loguru import logger

from app.services.crawler.boss import BossService
from app.services.crawler.qcwy import QcwyService
from app.services.crawler.zhilian import ZhilianService
from app.services.company_jobs_sync import CompanyJobsSyncService
from app.services.company_storage import company_storage
from app.services.ingest import IngestService
from app.core.clickhouse import clickhouse_manager
from app.models.token import BossToken


class CleaningService:
    """Route a user-supplied target to the matching crawler and store the result.

    A "target" is a URL, a job id, a company id or a company name.  The
    platform keys used throughout are ``"boss"``, ``"qcwy"`` and
    ``"zhilian"``; they select ``BossService``, ``QcwyService`` and
    ``ZhilianService`` respectively.

    NOTE(review): ``_apply_proxy`` mutates the shared crawler instances, so
    concurrent ``process_single_item`` calls with different proxies would
    presumably affect each other -- confirm how callers schedule work.
    """

    # Seconds after which the Boss token is re-read from storage (one hour).
    _TOKEN_REFRESH_INTERVAL = 3600

    # Patterns that pull a job id out of a job-detail URL, keyed by platform.
    # Shared by ``clean_by_job_id`` and the ``_process_*_url`` methods so the
    # two code paths cannot drift apart.
    _JOB_ID_PATTERNS = {
        "boss": re.compile(r'job_detail/([^.]+)\.html'),
        "qcwy": re.compile(r'/(\d+)\.html'),
        "zhilian": re.compile(r'jobs\.zhaopin\.com/(\w+)\.htm'),
    }
    # Company id inside a Boss company URL.
    _BOSS_COMPANY_URL_RE = re.compile(r'gongsi/([^.]+)\.html')
    # A bare qcwy company id written with its "co" prefix, e.g. "co123".
    _QCWY_COMPANY_ID_RE = re.compile(r'^co(\d+)$')
    # A "co<digits>" fragment anywhere inside a qcwy URL (case-insensitive).
    _QCWY_COMPANY_URL_RE = re.compile(r'co(\d+)', re.IGNORECASE)
    # Company id inside a zhilian company URL.
    _ZHILIAN_COMPANY_URL_RE = re.compile(r'/company/([A-Za-z0-9]+)')

    def __init__(self):
        """Create the crawler services; the data router is created lazily."""
        self.boss_service = BossService()
        self.qcwy_service = QcwyService()
        self.zhilian_service = ZhilianService()
        self.company_jobs_sync = CompanyJobsSyncService()
        # Built on first use by ``get_data_router``.
        self.data_router: Optional[IngestService] = None
        self._boss_token_loaded = False
        # ``time.time()`` value of the last successful token load.
        self._token_loaded_at: float = 0

    def _apply_proxy(self, proxy: Optional[str]) -> None:
        """Forward ``proxy`` (possibly ``None``) to every crawler service."""
        self.boss_service.set_proxy(proxy)
        self.qcwy_service.set_proxy(proxy)
        self.zhilian_service.set_proxy(proxy)
        self.company_jobs_sync.set_proxy(proxy)

    async def _ensure_boss_token_loaded(self) -> None:
        """Load the newest active ``BossToken`` unless a fresh one is held.

        The token is considered fresh when it was loaded before, the Boss
        service still has a truthy ``"mpt"`` entry in ``login_data``, and
        less than ``_TOKEN_REFRESH_INTERVAL`` seconds have passed.  When no
        active token exists a warning is logged and nothing changes.
        """
        now = time.time()
        token_is_fresh = (
            self._boss_token_loaded
            and self.boss_service.login_data.get("mpt")
            and now - self._token_loaded_at < self._TOKEN_REFRESH_INTERVAL
        )
        if token_is_fresh:
            return

        token_obj = await BossToken.filter(is_active=True).order_by("-updated_at").first()
        if not token_obj:
            logger.warning("BossToken not found or inactive")
            return

        # The second argument is always passed empty here; its meaning is
        # defined by BossService.set_login_data -- TODO confirm.
        self.boss_service.set_login_data(token_obj.mpt or "", "")
        self._boss_token_loaded = True
        self._token_loaded_at = now

    async def get_data_router(self) -> IngestService:
        """Return the shared ``IngestService``, creating it on first call."""
        if self.data_router is None:
            client = await clickhouse_manager.get_client()
            self.data_router = IngestService(client)
        return self.data_router

    async def parse_file(self, file: UploadFile) -> List[str]:
        """Extract the non-empty targets listed in an uploaded file.

        Files whose name ends in ``.csv`` (case-insensitive) are parsed with
        ``csv.reader`` and contribute the first column of every non-empty
        row.  Any other file contributes one target per line.  Values are
        stripped and empty ones are dropped.

        Raises:
            UnicodeDecodeError: if the content is not valid UTF-8.
        """
        content = await file.read()
        # ``filename`` may be missing on the upload; treat that as "not CSV"
        # instead of failing on ``None.endswith``.
        filename = (file.filename or "").lower()
        # ``utf-8-sig`` drops a leading BOM when present, so the first target
        # is clean for plain-text uploads as well as CSV ones.
        text = content.decode("utf-8-sig")

        if filename.endswith(".csv"):
            reader = csv.reader(io.StringIO(text))
            targets = [row[0].strip() for row in reader if row]
        else:
            targets = [line.strip() for line in text.splitlines()]
        return [t for t in targets if t]

    async def _store_company_record(
        self,
        source: str,
        data: Dict[str, Any],
        company_id: str,
    ) -> Dict[str, Any]:
        """Upsert a company via ``company_storage`` and annotate the result.

        The mapping returned by ``upsert_company`` is extended in place with
        ``duplicate``, ``remote_sent``, ``message`` and ``original_data``.
        """
        result = await company_storage.upsert_company(source, data, company_id=company_id)
        result["duplicate"] = False
        result["remote_sent"] = False
        # Runtime message (Chinese): "company data has been written to MySQL".
        result["message"] = "公司数据已写入MySQL"
        result["original_data"] = data
        return result

    async def process_single_item(
        self,
        target: str,
        clean_type: str = "auto",
        platform: str = "auto",
        proxy: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Process one target and return a uniform status report.

        Args:
            target: URL, id or company name to process.
            clean_type: ``"auto"``, ``"clean_url"``, ``"job_id"``,
                ``"company_name"``, ``"company_id"`` or ``"company_jobs"``.
            platform: ``"auto"``, ``"boss"``, ``"qcwy"`` or ``"zhilian"``.
            proxy: proxy forwarded to every crawler before processing.

        Returns:
            A dict that always has ``success``, ``target``, ``error``,
            ``storage_status`` and ``remote_sent``.  Any exception raised
            while processing is logged and reported with
            ``storage_status == "error"`` rather than propagated.
        """
        try:
            await self._ensure_boss_token_loaded()
            self._apply_proxy(proxy)
            result = await self._dispatch(target, clean_type, platform)
            # Kept inside the try: a result of an unexpected type must be
            # reported as an error, not raised to the caller.
            return self._summarize_result(target, result)
        except Exception as e:
            logger.error(f"Error processing item {target}: {e}")
            return {
                "success": False,
                "target": target,
                "error": str(e),
                "storage_status": "error",
                "remote_sent": False
            }

    async def _dispatch(
        self, target: str, clean_type: str, platform: str
    ) -> Union[None, bool, Dict[str, Any]]:
        """Run the handler selected by ``clean_type``/``platform``.

        Returns ``None`` when the combination has no handler.
        """
        if clean_type == "auto":
            return await self.clean_target_auto(target)

        if clean_type == "clean_url":
            if platform == "auto":
                return await self.clean_target_auto(target)
            if platform == "boss":
                return await self._process_boss_url(target)
            if platform == "qcwy":
                return await self._process_qcwy_url(target)
            if platform == "zhilian":
                return await self._process_zhilian_url(target)
            return None

        if clean_type == "job_id":
            return await self.clean_by_job_id(target, platform)
        if clean_type == "company_name":
            return await self.clean_by_company_name(target, platform)
        if clean_type == "company_id":
            return await self.clean_by_company_id(target, platform)

        if clean_type == "company_jobs":
            if platform == "boss":
                return await self.clean_boss_company_jobs(target)
            if platform == "qcwy":
                return await self.clean_qcwy_company_jobs(target)
            if platform == "zhilian":
                return await self.clean_zhilian_company_jobs(target)
        return None

    @staticmethod
    def _summarize_result(target: str, result: Any) -> Dict[str, Any]:
        """Convert a handler result (falsy, bool or dict) into a report dict."""
        if not result:
            return {
                "success": False,
                "target": target,
                "error": "No data found or operation failed",
                "storage_status": "failed",
                "remote_sent": False
            }

        if isinstance(result, bool):
            # Only ``True`` reaches this point; ``False`` was handled above.
            return {
                "success": result,
                "target": target,
                "error": None if result else "Operation failed",
                "storage_status": "unknown",
                "remote_sent": False
            }

        succeeded = result.get("success", False)
        return {
            "success": succeeded,
            "target": target,
            "error": result.get("message") if not succeeded else None,
            "storage_status": "duplicate" if result.get("duplicate") else "saved",
            "remote_sent": result.get("remote_sent", False),
            "data_summary": result.get("data_summary"),
            "jobs_summary": result.get("jobs_summary"),
            "original_data": result.get("original_data")
        }

    async def clean_target_auto(self, target: str) -> Union[bool, Dict[str, Any]]:
        """Pick the platform from the domain found in ``target``.

        A target containing none of the known domains is treated as a
        company name.
        """
        if "zhipin.com" in target:
            return await self._process_boss_url(target)
        if "51job.com" in target:
            return await self._process_qcwy_url(target)
        if "zhaopin.com" in target:
            return await self._process_zhilian_url(target)
        return await self._process_search_company(target)

    def _job_detail_fetcher(self, platform: str):
        """Return the blocking job-detail callable for ``platform`` or ``None``."""
        if platform == "boss":
            return self.boss_service.get_job_detail_by_id
        if platform == "qcwy":
            return self.qcwy_service.get_job_detail
        if platform == "zhilian":
            return self.zhilian_service.get_job_detail
        return None

    async def clean_by_job_id(self, target: str, platform: str) -> Union[bool, Dict[str, Any]]:
        """Fetch one job by id (or by URL containing the id) and store it.

        Returns the dict produced by ``store_single`` with ``original_data``
        attached, or ``False`` when the platform is unknown, nothing was
        fetched, or the store result is not a non-empty dict.
        """
        router = await self.get_data_router()

        fetch = self._job_detail_fetcher(platform)
        if fetch is None:
            return False

        # Accept a full job URL as well as a bare id.
        match = self._JOB_ID_PATTERNS[platform].search(target)
        if match:
            target = match.group(1)

        # The crawler call is blocking, so it runs in a worker thread.
        data = await asyncio.to_thread(fetch, target)
        if not data:
            return False

        result = await router.store_single(platform, "mini", "job", data)
        if result and isinstance(result, dict):
            result['original_data'] = data
            return result
        return False

    @staticmethod
    async def _store_job_batch(
        router: IngestService, source: str, jobs: Any, raw: Any
    ) -> Union[bool, Dict[str, Any]]:
        """Store every job in ``jobs`` and return the result of the last one.

        Only the final ``store_single`` result is reported; ``raw`` (the
        whole search response) is attached to it as ``original_data``.
        Returns ``False`` when ``jobs`` is empty or the last result is falsy.
        """
        last_result = None
        for job in jobs:
            last_result = await router.store_single(source, "mini", "job", job)
        if last_result and isinstance(last_result, dict):
            last_result['original_data'] = raw
        return last_result if last_result else False

    async def clean_by_company_name(self, target: str, platform: str) -> Union[bool, Dict[str, Any]]:
        """Search jobs by company name on ``platform`` and store each hit.

        Returns the last store result (see ``_store_job_batch``) or
        ``False`` when the platform is unknown or the search is empty.
        """
        router = await self.get_data_router()

        if platform == "boss":
            res = await asyncio.to_thread(self.boss_service.search_jobs, target)
            if res and res.get('zpData') and res['zpData'].get('list'):
                return await self._store_job_batch(router, "boss", res['zpData']['list'], res)
        elif platform == "qcwy":
            res = await asyncio.to_thread(self.qcwy_service.search_jobs, target)
            if res:
                return await self._store_job_batch(router, "qcwy", res, res)
        elif platform == "zhilian":
            res = await asyncio.to_thread(self.zhilian_service.search_company_jobs_by_name, target)
            if res and isinstance(res, dict):
                data = res.get("data") or {}
                # Guard against a payload whose "data" is not a mapping.
                items = data.get("list") if isinstance(data, dict) else None
                if not isinstance(items, list):
                    items = []
                return await self._store_job_batch(router, "zhilian", items, res)
        return False

    @classmethod
    def _strip_qcwy_prefix(cls, company_id: str) -> str:
        """Turn ``"co123"`` into ``"123"``; other values are returned as is."""
        match = cls._QCWY_COMPANY_ID_RE.match(company_id)
        return match.group(1) if match else company_id

    async def clean_by_company_id(self, target: str, platform: str) -> Union[bool, Dict[str, Any]]:
        """Fetch a company by id, store it, then sync its jobs.

        The sync result is placed under ``jobs_summary`` of the returned
        dict.  Returns ``False`` when the platform is unknown or the company
        lookup yields nothing.
        """
        if platform == "boss":
            fetch, company_id = self.boss_service.get_company_detail_by_id, target
        elif platform == "qcwy":
            fetch, company_id = self.qcwy_service.get_company_info, self._strip_qcwy_prefix(target)
        elif platform == "zhilian":
            fetch, company_id = self.zhilian_service.get_company_detail, target
        else:
            return False

        data = await asyncio.to_thread(fetch, company_id)
        if not data:
            return False

        result = await self._store_company_record(platform, data, company_id)
        result["jobs_summary"] = await self.company_jobs_sync.sync_company_jobs(platform, company_id)
        if result and isinstance(result, dict):
            result['original_data'] = data
            return result
        return False

    async def _sync_company_jobs_or_false(
        self, source: str, company_id: str
    ) -> Union[bool, Dict[str, Any]]:
        """Sync a company's jobs; return the result only if jobs were fetched.

        A missing result or a missing/``None`` ``jobs_fetched`` counts as
        "nothing fetched" and yields ``False`` instead of raising.
        """
        result = await self.company_jobs_sync.sync_company_jobs(source, company_id)
        if not result:
            return False
        return result if (result.get("jobs_fetched", 0) or 0) > 0 else False

    async def clean_boss_company_jobs(self, target: str) -> Union[bool, Dict[str, Any]]:
        """Sync the jobs of a Boss company given its id or company URL."""
        match = self._BOSS_COMPANY_URL_RE.search(target)
        company_id = match.group(1) if match else target
        return await self._sync_company_jobs_or_false("boss", company_id)

    async def clean_qcwy_company_jobs(self, target: str) -> Union[bool, Dict[str, Any]]:
        """Sync the jobs of a qcwy company given its id (``co`` prefix optional)."""
        return await self._sync_company_jobs_or_false("qcwy", self._strip_qcwy_prefix(target))

    async def clean_zhilian_company_jobs(self, target: str) -> Union[bool, Dict[str, Any]]:
        """Sync the jobs of a zhilian company; ``target`` is used as the id unchanged."""
        return await self._sync_company_jobs_or_false("zhilian", target)

    async def _process_boss_url(self, url: str) -> Union[bool, Dict[str, Any]]:
        """Handle a Boss URL: job detail first, then company, else treat as job id."""
        job_match = self._JOB_ID_PATTERNS["boss"].search(url)
        if job_match:
            return await self.clean_by_job_id(job_match.group(1), "boss")
        company_match = self._BOSS_COMPANY_URL_RE.search(url)
        if company_match:
            return await self.clean_by_company_id(company_match.group(1), "boss")
        return await self.clean_by_job_id(url, "boss")

    async def _process_qcwy_url(self, url: str) -> Union[bool, Dict[str, Any]]:
        """Handle a qcwy URL: job detail first, then company, else treat as job id."""
        job_match = self._JOB_ID_PATTERNS["qcwy"].search(url)
        if job_match:
            return await self.clean_by_job_id(job_match.group(1), "qcwy")
        company_match = self._QCWY_COMPANY_URL_RE.search(url)
        if company_match:
            return await self.clean_by_company_id(company_match.group(1), "qcwy")
        return await self.clean_by_job_id(url, "qcwy")

    async def _process_zhilian_url(self, url: str) -> Union[bool, Dict[str, Any]]:
        """Handle a zhilian URL: job detail first, then company, else treat as job id."""
        job_match = self._JOB_ID_PATTERNS["zhilian"].search(url)
        if job_match:
            return await self.clean_by_job_id(job_match.group(1), "zhilian")
        company_match = self._ZHILIAN_COMPANY_URL_RE.search(url)
        if company_match:
            return await self.clean_by_company_id(company_match.group(1), "zhilian")
        return await self.clean_by_job_id(url, "zhilian")

    async def _process_search_company(self, name: str) -> Union[bool, Dict[str, Any]]:
        """Fallback for non-URL targets: search the name on the Boss platform."""
        return await self.clean_by_company_name(name, "boss")