import json
from typing import Any, Dict, List, Optional

from clickhouse_connect.driver import AsyncClient

from app.log import logger
from app.services.ingest.company_enrichment import enrich_company_desc
from app.services.ingest.dedup import batch_dedup_filter, build_insert_row
from app.services.ingest.registry import PlatformConfig, get_config, list_configs
from app.services.ingest.remote_push import batch_push_to_remote


class IngestService:
    """Unified data ingestion service.

    Looks up the registry config for a (platform, channel, data_type)
    triple, writes rows into the matching ``job_data.<table>`` table through
    the injected client, and forwards mapped "job" records to the remote
    push pipeline.
    """

    def __init__(self, clickhouse_client: AsyncClient):
        """Keep a reference to the async client used for all queries/inserts."""
        self.client = clickhouse_client

    async def store_batch(
        self,
        platform: str,
        channel: str,
        data_type: str,
        data_list: List[Dict[str, Any]],
        check_duplicate: bool = True,
    ) -> Dict[str, Any]:
        """Insert a batch of records and return per-batch counters.

        Args:
            platform: Registry key, also passed to ``enrich_company_desc``.
            channel: Registry key, also passed to ``build_insert_row``.
            data_type: Registry key; push items are only built when it is
                ``"job"`` and the config has a ``push_mapper``.
            data_list: Raw records to store.
            check_duplicate: When true, rows go through ``batch_dedup_filter``
                before the insert.

        Returns:
            Dict with ``total``, ``success``, ``failed``, ``duplicate`` and
            ``errors``. Row-build failures carry the input ``index``; a failed
            bulk insert adds one entry without an index.

        Raises:
            Whatever ``get_config`` raises for an unknown triple; failures of
            the later stages are caught and logged/recorded instead.
        """
        results: Dict[str, Any] = {
            "total": len(data_list),
            "success": 0,
            "failed": 0,
            "duplicate": 0,
            "errors": [],
        }
        if not data_list:
            return results

        config: PlatformConfig = get_config(platform, channel, data_type)
        table = f"job_data.{config.table}"
        push_mapper = getattr(config, "push_mapper", None)
        wants_push = bool(push_mapper) and data_type == "job"

        # Build every row up front. ``built_data`` stays index-aligned with
        # ``all_values`` so later stages never see records that failed here.
        all_columns: Optional[List[str]] = None
        all_values: List[List[Any]] = []
        built_data: List[Dict[str, Any]] = []
        push_data_list: List[Dict[str, Any]] = []
        for index, data in enumerate(data_list):
            try:
                columns, values = build_insert_row(config, data, channel)
            except Exception as e:
                results["failed"] += 1
                results["errors"].append({"index": index, "error": str(e)})
                continue

            all_columns = all_columns or columns
            all_values.append(values)
            built_data.append(data)

            if not wants_push:
                continue
            # The push is best-effort: a mapper failure must not mark an
            # already-queued row as failed (it would then be counted both as
            # failed and, after the insert, as success).
            try:
                push_item = push_mapper(data)
            except Exception as e:
                logger.warning(f"推送数据映射失败 (index={index}): {e}")
                continue
            if push_item:
                push_data_list.append(push_item)

        if not all_values or all_columns is None:
            return results

        # Batch de-duplication. On failure the error is logged and the
        # unfiltered rows are still inserted.
        if check_duplicate:
            try:
                filtered, ignored = await batch_dedup_filter(
                    self.client, config, built_data, all_columns, all_values,
                )
                all_values = filtered
                results["duplicate"] = ignored
            except Exception as e:
                logger.error(f"批量去重失败: {e}")

        # Bulk insert: the surviving rows succeed or fail as one unit.
        if all_values:
            try:
                await self.client.insert(table, all_values, column_names=all_columns)
                results["success"] = len(all_values)
            except Exception as e:
                logger.error(f"批量插入失败: {e}")
                results["failed"] += len(all_values)
                results["errors"].append({"error": f"批量插入失败: {e}"})

        # Remote push; never affects the returned counters.
        # NOTE(review): items are pushed even for rows filtered as duplicates
        # or when the insert failed — confirm this is intended.
        if push_data_list:
            # Fill in company_desc first; a failure here does not block the push.
            try:
                await enrich_company_desc(platform, push_data_list)
            except Exception as e:
                logger.warning(f"company_desc 补全异常: {e}")
            try:
                await batch_push_to_remote(push_data_list)
            except Exception as e:
                logger.warning(f"批量推送失败: {e}")

        return results

    async def store_single(
        self,
        platform: str,
        channel: str,
        data_type: str,
        data: Dict[str, Any],
        check_duplicate: bool = True,
    ) -> Dict[str, Any]:
        """Store one record by delegating to ``store_batch`` with a 1-item list."""
        return await self.store_batch(platform, channel, data_type, [data], check_duplicate)

    async def query_data(
        self,
        platform: str,
        channel: str,
        data_type: str,
        limit: int = 100,
        offset: int = 0,
    ) -> Dict[str, Any]:
        """Return one page of rows from the configured table, newest first.

        Rows are ordered by ``created_at DESC``. When a row has a string
        ``json_data`` column that decodes to a JSON object, its keys are
        merged into the row dict (overriding same-named columns).

        Returns:
            On success ``{"success": True, "data", "count", "table_name"}``
            where ``count`` is the table's total row count; on any failure
            ``{"success": False, "message", "error"}``.

        Raises:
            Whatever ``get_config`` raises for an unknown triple.
        """
        config = get_config(platform, channel, data_type)
        table = f"job_data.{config.table}"
        try:
            # Both values are interpolated into the SQL text, so force them to
            # plain non-negative integers before building the statement.
            page_size = int(limit)
            skip = int(offset)
            if page_size < 0 or skip < 0:
                raise ValueError("limit/offset 必须为非负整数")

            count_result = await self.client.query(f"SELECT count() FROM {table}")
            total = count_result.result_rows[0][0] if count_result.result_rows else 0

            query = (
                f"SELECT * FROM {table} ORDER BY created_at DESC "
                f"LIMIT {page_size} OFFSET {skip}"
            )
            result = await self.client.query(query)

            data_rows = []
            for row in result.result_rows:
                item = dict(zip(result.column_names, row))
                raw_json = item.get("json_data")
                if isinstance(raw_json, str):
                    try:
                        parsed = json.loads(raw_json)
                    except (ValueError, RecursionError):
                        # Best-effort: an undecodable payload leaves the row as-is.
                        parsed = None
                    if isinstance(parsed, dict):
                        item.update(parsed)
                data_rows.append(item)

            return {"success": True, "data": data_rows, "count": total, "table_name": config.table}
        except Exception as e:
            logger.error(f"查询失败: {e}")
            return {"success": False, "message": str(e), "error": str(e)}

    @staticmethod
    def get_registry_info() -> Dict[str, Any]:
        """Summarize the registry: sorted distinct keys plus one entry per config."""
        configs = list_configs()
        entries = [
            {
                "platform": c.platform,
                "channel": c.channel,
                "data_type": c.data_type,
                "table": c.table,
                "dedup_columns": c.dedup_columns,
            }
            for c in configs
        ]
        return {
            "platforms": sorted({c.platform for c in configs}),
            "channels": sorted({c.channel for c in configs}),
            "data_types": sorted({c.data_type for c in configs}),
            "entries": entries,
        }