import asyncio
import json
import random
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple

from loguru import logger

from app.core.clickhouse import clickhouse_manager
from app.models.token import BossToken
from app.services.crawler.boss import BossService
from app.services.crawler.qcwy import QcwyService
from app.services.crawler.zhilian import ZhilianService

# Column order used for every insert into job_data.pending_company.
_PENDING_COLUMNS = [
    "source",
    "company_id",
    "company_name",
    "status",
    "error_msg",
    "created_at",
    "updated_at",
    "version",
]

# Plans for the "companies already stored" lookup, tried in order whenever
# ClickHouse reports a memory error: (days back, row limit, use SAMPLE 0.1).
_EXISTING_LOOKUP_PLANS: Tuple[Tuple[int, int, bool], ...] = (
    (90, 50000, False),
    (1, 5000, False),
    (1, 2000, True),
)


def _is_memory_error(exc: BaseException) -> bool:
    """Return True if *exc* looks like a ClickHouse memory-limit failure."""
    # "memory_limit" contains "memory", so one substring test covers both.
    return "memory" in str(exc).lower()


def _existing_companies_sql(
    table: str, id_key: str, days_back: int, row_limit: int, sample: bool
) -> str:
    """Build the query listing company ids already stored in ``job_data.<table>``.

    PREWHERE on ``updated_at`` narrows the scan before ``json_data`` is read.
    ``table`` and ``id_key`` are internal constants, never user input.
    """
    sample_clause = " SAMPLE 0.1" if sample else ""
    return f"""
        SELECT DISTINCT JSONExtractString(json_data, '{id_key}') as cid
        FROM job_data.{table}{sample_clause}
        PREWHERE updated_at > now() - INTERVAL {days_back} DAY AND json_data != ''
        WHERE JSONExtractString(json_data, '{id_key}') != ''
        LIMIT {row_limit}
    """


class CompanyCleaner:
    """Queue company ids found in crawled job rows and fetch their details.

    Work items live in ``job_data.pending_company``. Every status change is
    written as a new row with ``version`` incremented, and reads use ``FINAL``
    so only the latest version of each item is considered.
    """

    def __init__(self):
        self.boss_service = BossService()
        self.qcwy_service = QcwyService()
        self.zhilian_service = ZhilianService()
        # Set once a BossToken has been pushed into boss_service.
        self._boss_token_loaded = False

    def _apply_proxy(self, proxy: Optional[str]) -> None:
        """Route all three crawler services through *proxy*."""
        self.boss_service.set_proxy(proxy)
        self.qcwy_service.set_proxy(proxy)
        self.zhilian_service.set_proxy(proxy)

    async def _ensure_boss_token_loaded(self) -> None:
        """Load the most recently updated active BossToken into boss_service.

        Skips the lookup when a token was already loaded and ``mpt`` is still
        present. Logs a warning (without raising) when no active token exists.
        """
        if self._boss_token_loaded and self.boss_service.login_data.get("mpt"):
            return
        token_obj = await BossToken.filter(is_active=True).order_by("-updated_at").first()
        if not token_obj:
            logger.warning("BossToken not found or inactive in CompanyCleaner")
            return
        self.boss_service.set_login_data(token_obj.mpt or "", "")
        self._boss_token_loaded = True

    # ------------------------------------------------------------------
    # Collection: job tables -> pending_company
    # ------------------------------------------------------------------

    async def collect_pending_companies(self, limit: int = 1000, source: Optional[str] = None):
        """Queue up to *limit* new companies per source as ``pending``.

        Args:
            limit: maximum number of rows to add per source.
            source: ``"zhilian"``, ``"qcwy"`` or ``"boss"``; ``None`` runs all three.
        """
        client = await clickhouse_manager.get_client()
        logger.info(f"Starting to collect pending companies (limit={limit}, source={source or 'all'})...")

        if source is None or source == "zhilian":
            await self._collect_zhilian(client, limit)
        if source is None or source == "qcwy":
            await self._collect_qcwy(client, limit)
        if source is None or source == "boss":
            await self._collect_boss(client, limit)

        logger.info("Finished collecting pending companies.")

    async def _query_existing_cids(self, client, table: str, id_key: str, label: str) -> Set[str]:
        """Return company ids already stored in ``job_data.<table>``.

        Reading ids up front avoids a ``json_data`` subquery in the main query.
        On memory errors the lookup falls back to progressively cheaper plans
        (see ``_EXISTING_LOOKUP_PLANS``). If every plan fails with a memory
        error, an empty set is returned so collection can continue.

        The lookup is capped by LIMIT, so the result may not be exhaustive.

        Raises:
            Exception: any non-memory query error is re-raised.
        """
        last_attempt = len(_EXISTING_LOOKUP_PLANS) - 1
        for attempt, (days_back, row_limit, sample) in enumerate(_EXISTING_LOOKUP_PLANS):
            query = _existing_companies_sql(table, id_key, days_back, row_limit, sample)
            try:
                logger.info(f"Querying existing {label} companies (attempt {attempt+1})...")
                result = await client.query(query)
            except Exception as e:
                if not _is_memory_error(e):
                    logger.error(f"Non-memory error while querying existing companies: {e}")
                    raise
                if attempt == last_attempt:
                    logger.error(f"Failed to query existing companies after {attempt+1} attempts: {e}")
                    logger.warning("Using empty set for existing_cids, continuing with collection...")
                elif _EXISTING_LOOKUP_PLANS[attempt + 1][2]:
                    logger.warning("Memory error persists, using SAMPLE 0.1")
                else:
                    next_days = _EXISTING_LOOKUP_PLANS[attempt + 1][0]
                    logger.warning(f"Memory error, reducing time range to {next_days} days")
                continue
            return {row[0] for row in result.result_rows if row[0]}
        return set()

    async def _query_pending_cids(self, client, source: str) -> Set[str]:
        """Return every company id already queued (in any status) for *source*."""
        # *source* is one of the internal constants passed by the _collect_* methods.
        query = f"SELECT DISTINCT company_id FROM job_data.pending_company WHERE source = '{source}'"
        result = await client.query(query)
        return {row[0] for row in result.result_rows if row[0]}

    @staticmethod
    def _build_pending_rows(
        source: str,
        result_rows: Iterable[Sequence[Any]],
        exclude_cids: Set[str],
        limit: int,
    ) -> List[Dict[str, Any]]:
        """Turn ``(cid, cname)`` rows into at most *limit* new pending records.

        Empty ids, ids in *exclude_cids*, and repeated ids (the same cid can
        appear with several names after ``SELECT DISTINCT cid, cname``) are skipped.
        """
        rows: List[Dict[str, Any]] = []
        seen: Set[str] = set()
        for cid, cname in result_rows:
            if not cid or cid in exclude_cids or cid in seen:
                continue
            if len(rows) >= limit:
                break
            seen.add(cid)
            now = datetime.now()
            rows.append(
                {
                    "source": source,
                    "company_id": cid,
                    "company_name": cname,
                    "status": "pending",
                    "created_at": now,
                    "updated_at": now,
                }
            )
        return rows

    async def _collect_from_recent_jobs(
        self,
        client,
        limit: int,
        *,
        source: str,
        label: str,
        company_table: str,
        company_id_key: str,
        job_table: str,
        job_id_key: str,
        job_name_key: str,
    ) -> None:
        """Queue companies seen in the last 30 days of ``job_data.<job_table>``.

        Companies already stored in ``job_data.<company_table>`` or already
        queued in pending_company are skipped. The query over-fetches
        ``limit * 2`` rows so enough remain after filtering in Python.
        """
        logger.info(f"Collecting {label} companies...")

        existing_cids = await self._query_existing_cids(client, company_table, company_id_key, label)
        pending_cids = await self._query_pending_cids(client, source)
        exclude_cids = existing_cids | pending_cids

        # PREWHERE on created_at avoids reading json_data for old rows.
        query = f"""
            SELECT DISTINCT
                JSONExtractString(json_data, '{job_id_key}') as cid,
                JSONExtractString(json_data, '{job_name_key}') as cname
            FROM job_data.{job_table}
            PREWHERE created_at > now() - INTERVAL 30 DAY
            WHERE json_data != '' AND JSONExtractString(json_data, '{job_id_key}') != ''
            LIMIT {limit * 2}
        """
        logger.info(f"Executing SQL for {label} (limit={limit * 2}): {query[:500]}...")
        result = await client.query(query)
        if not result.result_rows:
            return

        rows = self._build_pending_rows(source, result.result_rows, exclude_cids, limit)
        await self._insert_pending(client, rows)
        logger.info(f"Added {len(rows)} {label} companies to pending.")

    async def _collect_zhilian(self, client, limit: int):
        """Queue new Zhilian companies (keyed by ``companyNumber``)."""
        await self._collect_from_recent_jobs(
            client,
            limit,
            source="zhilian",
            label="Zhilian",
            company_table="zhilian_company",
            company_id_key="companyNumber",
            job_table="zhilian_job",
            job_id_key="companyNumber",
            job_name_key="companyName",
        )

    async def _collect_boss(self, client, limit: int):
        """Queue new Boss companies (keyed by ``brandId``)."""
        await self._collect_from_recent_jobs(
            client,
            limit,
            source="boss",
            label="Boss",
            company_table="boss_company",
            company_id_key="brandId",
            job_table="boss_job",
            job_id_key="brandId",
            job_name_key="brandName",
        )

    async def _collect_qcwy(self, client, limit: int):
        """Queue new QCWY companies (job key ``coId``, company key ``companyId``).

        The qcwy_job scan is deliberately small:
        - 7 days of data instead of 30;
        - at most 100 rows, regardless of *limit*.

        On memory errors it retries with 3 days and at most 50 rows, then adds
        SAMPLE 0.1. Memory errors on the last attempt and all non-memory errors
        are re-raised.
        """
        logger.info("Collecting QCWY companies...")

        existing_cids = await self._query_existing_cids(client, "qcwy_company", "companyId", "QCWY")
        pending_cids = await self._query_pending_cids(client, "qcwy")
        exclude_cids = existing_cids | pending_cids

        # length(json_data) is intentionally not used: computing it reads the whole column.
        days_back = 7
        query_limit = min(limit * 2, 100)
        result = None
        for attempt in range(3):
            sample_clause = ""
            if attempt == 1:
                days_back = 3
                query_limit = min(query_limit, 50)
                logger.warning(f"Retry {attempt}: Reducing time range to {days_back} days and limit to {query_limit}")
            elif attempt == 2:
                sample_clause = " SAMPLE 0.1"
                logger.warning(f"Retry {attempt}: Using SAMPLE 0.1 to reduce memory usage")

            query = f"""
                SELECT DISTINCT
                    JSONExtractString(json_data, 'coId') as cid,
                    JSONExtractString(json_data, 'companyName') as cname
                FROM job_data.qcwy_job{sample_clause}
                PREWHERE created_at > now() - INTERVAL {days_back} DAY AND json_data != ''
                WHERE JSONExtractString(json_data, 'coId') != ''
                LIMIT {query_limit}
            """
            logger.info(
                f"Executing SQL for QCWY (limit={query_limit}, days={days_back}, attempt={attempt+1}): "
                f"{query[:400]}..."
            )
            try:
                result = await client.query(query)
                break
            except Exception as e:
                if not _is_memory_error(e):
                    logger.error(f"Query failed with non-memory error: {e}")
                    raise
                if attempt == 2:
                    logger.error(f"Query failed after {attempt+1} attempts: {e}")
                    raise
                logger.warning(f"Memory error on attempt {attempt+1}: {e}")

        if not result or not result.result_rows:
            logger.info("No QCWY companies found in query result.")
            return

        rows = self._build_pending_rows("qcwy", result.result_rows, exclude_cids, limit)
        if rows:
            await self._insert_pending(client, rows)
            logger.info(f"Added {len(rows)} QCWY companies to pending.")
        else:
            logger.info("No new QCWY companies found after filtering.")

    async def _insert_pending(self, client, rows: List[Dict[str, Any]]):
        """Insert *rows* into pending_company as version 1 with an empty error_msg."""
        if not rows:
            return
        data: List[List[Any]] = [
            [
                r["source"],
                r["company_id"],
                r["company_name"],
                r["status"],
                "",
                r["created_at"],
                r["updated_at"],
                1,
            ]
            for r in rows
        ]
        await client.insert("job_data.pending_company", data, column_names=_PENDING_COLUMNS)

    # ------------------------------------------------------------------
    # Processing: pending_company -> <source>_company
    # ------------------------------------------------------------------

    async def _run_fetch(self, source: str, company_id: str) -> Tuple[str, str]:
        """Fetch and store one company and return ``(status, error_msg)``.

        *status* is ``"done"`` or ``"failed"``. Exceptions raised while
        fetching are logged and reported as ``"failed"``; they are not propagated.
        """
        try:
            success = await self._fetch_and_save(source, company_id)
        except Exception as e:
            logger.error(f"Error processing {source} {company_id}: {e}")
            return "failed", str(e)
        if success:
            return "done", ""
        return "failed", "Fetch failed"

    async def _record_status(
        self,
        client,
        source: str,
        company_id: str,
        company_name: str,
        status: str,
        error_msg: str,
        version: int,
    ) -> None:
        """Append a new pending_company version carrying *status*."""
        now = datetime.now()
        # client.insert sends values as data, not SQL text, so error_msg is
        # stored verbatim (no quote doubling).
        await client.insert(
            "job_data.pending_company",
            [[source, company_id, company_name, status, error_msg, now, now, version]],
            column_names=_PENDING_COLUMNS,
        )

    async def process_single_company(
        self,
        source: str,
        company_id: str,
        proxy: Optional[str] = None,
        max_delay_seconds: int = 5,
    ) -> Dict[str, Any]:
        """Fetch one company immediately, whatever its current pending status.

        Args:
            source: crawler source name (``"zhilian"``, ``"qcwy"`` or ``"boss"``).
            company_id: source-specific company id.
            proxy: if given, applied to all crawler services before fetching.
            max_delay_seconds: when > 0, sleep a random 1..N seconds first.

        Returns:
            A summary dict: success, source, company_id, company_name, status,
            error_msg and the newly written version.
        """
        client = await clickhouse_manager.get_client()
        if proxy:
            self._apply_proxy(proxy)

        if max_delay_seconds and max_delay_seconds > 0:
            await asyncio.sleep(random.randint(1, max_delay_seconds))

        # source/company_id can come from external callers: bind them as
        # server-side query parameters instead of formatting them into SQL.
        result = await client.query(
            """
            SELECT source, company_id, company_name, version
            FROM job_data.pending_company FINAL
            WHERE source = {source:String} AND company_id = {company_id:String}
            ORDER BY version DESC
            LIMIT 1
            """,
            parameters={"source": source, "company_id": company_id},
        )
        if result.result_rows:
            source_value, cid, cname, version = result.result_rows[0]
        else:
            source_value, cid, cname, version = source, company_id, "", 0

        status, error_msg = await self._run_fetch(source_value, cid)
        new_version = int(version) + 1
        await self._record_status(client, source_value, cid, cname, status, error_msg, new_version)

        return {
            "success": status == "done",
            "source": source_value,
            "company_id": cid,
            "company_name": cname,
            "status": status,
            "error_msg": error_msg,
            "version": new_version,
        }

    async def process_pending_companies(
        self,
        limit: int = 100,
        source: Optional[str] = None,
        proxy: Optional[str] = None,
        max_delay_seconds: int = 0,
    ):
        """Process up to *limit* pending companies, oldest ``created_at`` first.

        Each item is fetched sequentially and then marked done or failed with
        a new version.

        Args:
            limit: maximum number of pending items to process.
            source: restrict to one source; falsy means all sources.
            proxy: if given, applied to all crawler services first.
            max_delay_seconds: when > 0, sleep a random 1..N seconds before each item.
        """
        client = await clickhouse_manager.get_client()
        logger.info(f"Processing pending companies (limit={limit}, source={source or 'all'})...")

        if proxy:
            self._apply_proxy(proxy)

        # Bind source as a server-side parameter; the placeholder is a plain
        # string, so its braces are not interpreted by the f-string below.
        source_filter = " AND source = {source:String}" if source else ""
        parameters = {"source": source} if source else None
        query = f"""
            SELECT source, company_id, company_name, version
            FROM job_data.pending_company FINAL
            WHERE status = 'pending'{source_filter}
            ORDER BY created_at ASC
            LIMIT {int(limit)}
        """
        result = await client.query(query, parameters=parameters)
        if not result.result_rows:
            logger.info("No pending companies to process.")
            return

        for source_value, cid, cname, version in result.result_rows:
            logger.info(f"Processing {source_value} company: {cname} ({cid})")
            if max_delay_seconds and max_delay_seconds > 0:
                await asyncio.sleep(random.randint(1, max_delay_seconds))
            status, error_msg = await self._run_fetch(source_value, cid)
            await self._record_status(client, source_value, cid, cname, status, error_msg, int(version) + 1)

    @staticmethod
    def _extract_company_name(source: str, data: Dict[str, Any]) -> str:
        """Pull the company name out of a source-specific detail payload.

        Missing keys and JSON nulls both yield ``""``.
        """
        if source == "zhilian":
            name = (data.get("companyBase") or {}).get("companyName")
        elif source == "qcwy":
            name = data.get("companyName")
        else:
            name = data.get("name")
        return name or ""

    async def _fetch_and_save(self, source: str, company_id: str) -> bool:
        """Fetch company details from *source* and append them to its company table.

        Returns False (after logging) for an unsupported source or an empty
        response, and True once the row has been inserted. Errors from the
        crawler, serialization or insert propagate to the caller.
        """
        # NOTE(review): the crawler calls below are synchronous (not awaited);
        # if they perform network I/O they block the event loop — confirm and
        # consider asyncio.to_thread if the services are thread-safe.
        if source == "zhilian":
            data = self.zhilian_service.get_company_detail(company_id)
            target_table = "zhilian_company"
        elif source == "qcwy":
            data = self.qcwy_service.get_company_info(company_id)
            target_table = "qcwy_company"
        elif source == "boss":
            await self._ensure_boss_token_loaded()
            data = self.boss_service.get_company_detail_by_id(company_id)
            target_table = "boss_company"
        else:
            logger.error(f"Unsupported source={source} company_id={company_id}")
            return False

        if not data:
            logger.error(f"No data returned from source={source} company_id={company_id}")
            return False

        # Serialize once and reuse it for both the log preview and the insert.
        json_str = json.dumps(data, ensure_ascii=False)
        logger.info(f"Raw company data from source={source} company_id={company_id}: {json_str[:2000]}")

        client = await clickhouse_manager.get_client()
        name = self._extract_company_name(source, data)
        now = datetime.now()
        await client.insert(
            f"job_data.{target_table}",
            [[0, json_str, name, now, now]],
            column_names=["id", "json_data", "company_name", "created_at", "updated_at"],
        )
        return True

    async def cleanup_old_records(self):
        """Delete done/failed rows from pending_company (intended to run daily).

        Failures are logged, not raised.
        """
        client = await clickhouse_manager.get_client()
        logger.info("Starting cleanup of processed pending companies...")

        # ClickHouse mutations run asynchronously in the background.
        # NOTE(review): if pending_company is a ReplacingMergeTree on version
        # (as the FINAL/version usage suggests — confirm), older 'pending'
        # versions that have not been merged away are not matched by this
        # filter and will reappear as pending once the newer row is deleted.
        query = "ALTER TABLE job_data.pending_company DELETE WHERE status IN ('done', 'failed')"
        try:
            await client.command(query)
            logger.info("Cleanup command executed successfully.")
        except Exception as e:
            logger.error(f"Cleanup failed: {e}")


company_cleaner = CompanyCleaner()