import json
from datetime import datetime
from typing import Dict, Any, List, Tuple

from clickhouse_connect.driver import AsyncClient

from app.log import logger
from app.services.ingest.registry import PlatformConfig


def build_insert_row(
    config: PlatformConfig,
    data: Dict[str, Any],
    channel: str,
) -> Tuple[List[str], List[Any]]:
    """Build the column list and value list for one ClickHouse insert row.

    The row always starts with the fixed columns ``id``, ``json_data``,
    ``channel``, ``created_at`` and ``updated_at``; one extra column is then
    appended for every entry in ``config.dedup_fields``.

    Args:
        config: Platform configuration; each item of ``config.dedup_fields``
            provides an ``extractor`` callable and a target ``column`` name.
        data: Raw record, serialized to JSON for the ``json_data`` column.
        channel: Value stored in the ``channel`` column.

    Returns:
        A ``(columns, values)`` pair of equal length.
    """
    timestamp = datetime.now()
    payload = json.dumps(data, ensure_ascii=False)

    columns: List[str] = ["id", "json_data", "channel", "created_at", "updated_at"]
    values: List[Any] = [0, payload, channel, timestamp, timestamp]

    for spec in config.dedup_fields:
        raw = spec.extractor(data)
        columns.append(spec.column)
        # NOTE(review): any falsy extraction (None, "", 0, False, ...) is
        # stored as an empty string, not only a missing value.
        values.append("" if not raw else str(raw))

    return columns, values


async def _query_recent_rows(
    client: AsyncClient,
    table: str,
    select_expr: str,
    key_col: str,
    keys: List[str],
) -> Any:
    """Select ``select_expr`` for rows created in the last 30 days whose
    ``key_col`` is one of ``keys``, and return the query's ``result_rows``."""
    sql = (
        f"SELECT {select_expr} FROM {table} "
        f"WHERE {key_col} IN {{keys:Array(String)}} "
        f"AND created_at > now() - INTERVAL 30 DAY"
    )
    result = await client.query(sql, parameters={"keys": keys})
    return result.result_rows


async def batch_dedup_filter(
    client: AsyncClient,
    config: PlatformConfig,
    rows: List[Dict[str, Any]],
    all_columns: List[str],
    all_values: List[List[Any]],
) -> Tuple[List[List[Any]], int]:
    """Drop rows whose dedup key already exists in the target table.

    Only rows created within the last 30 days are looked up, and only one or
    two dedup columns are supported. Rows are compared against the table
    only; duplicates inside ``all_values`` itself are not collapsed.

    Args:
        client: ClickHouse client used to run the lookup query.
        config: Platform configuration providing ``dedup_columns`` and
            ``table`` (queried as ``job_data.<table>``).
        rows: Not used by this function.
        all_columns: Column names, positionally aligned with each value row.
        all_values: Candidate value rows.

    Returns:
        ``(kept_rows, ignored_count)``. The input list is returned unchanged
        with a count of 0 when there are no dedup columns, no rows, no
        non-empty keys, or more than two dedup columns.
    """
    dedup_cols = config.dedup_columns
    if not dedup_cols or not all_values:
        return all_values, 0

    table = f"job_data.{config.table}"

    # Map each column name to its position inside a value row.
    position = {name: pos for pos, name in enumerate(all_columns)}

    n_cols = len(dedup_cols)
    if n_cols not in (1, 2):
        # Dedup on three or more columns is not supported; pass rows through.
        logger.warning(f"不支持 {n_cols} 列去重,跳过过滤")
        return all_values, 0

    if n_cols == 1:
        key_col = dedup_cols[0]
        key_pos = position[key_col]

        # Distinct non-empty keys present in this batch.
        batch_keys = set()
        for row in all_values:
            if row[key_pos]:
                batch_keys.add(str(row[key_pos]))
        candidate_keys = list(batch_keys)
        if not candidate_keys:
            return all_values, 0

        found = await _query_recent_rows(
            client, table, key_col, key_col, candidate_keys
        )
        already_stored = {str(hit[0]) for hit in found}

        kept = []
        for row in all_values:
            if str(row[key_pos]) not in already_stored:
                kept.append(row)
        return kept, len(all_values) - len(kept)

    first_col, second_col = dedup_cols
    first_pos, second_pos = position[first_col], position[second_col]

    # Only the first column narrows the query; the pair is matched in Python.
    batch_firsts = set()
    for row in all_values:
        if row[first_pos]:
            batch_firsts.add(str(row[first_pos]))
    candidate_firsts = list(batch_firsts)
    if not candidate_firsts:
        return all_values, 0

    found = await _query_recent_rows(
        client, table, f"{first_col}, {second_col}", first_col, candidate_firsts
    )

    # first-column value -> set of second-column values already stored.
    stored_pairs: Dict[str, set] = {}
    for hit in found:
        first_val = str(hit[0])
        if first_val not in stored_pairs:
            stored_pairs[first_val] = set()
        stored_pairs[first_val].add(str(hit[1]))

    kept = []
    for row in all_values:
        stored_seconds = stored_pairs.get(str(row[first_pos]))
        # Keep the row unless the exact (first, second) pair is already stored.
        if stored_seconds is None or str(row[second_pos]) not in stored_seconds:
            kept.append(row)
    return kept, len(all_values) - len(kept)