from datetime import datetime from typing import Optional, Dict, Any, List from clickhouse_connect.driver import AsyncClient from app.repositories.clickhouse_repo import JobAnalyticsRepo class AnalyticsService: """数据分析服务""" def __init__(self, clickhouse_client: AsyncClient): self.job_repo = JobAnalyticsRepo(clickhouse_client) async def get_job_statistics( self, filters: Optional[Dict[str, Any]] = None, from_dt: Optional[datetime] = None, to_dt: Optional[datetime] = None, ) -> Dict[str, Any]: """获取职位统计信息(仅返回总量)""" total_jobs = await self.job_repo.get_job_count( filters=filters, from_dt=from_dt, to_dt=to_dt ) return { "total_jobs": total_jobs, "period": { "from_date": from_dt.isoformat() if from_dt else None, "to_date": to_dt.isoformat() if to_dt else None } } async def get_volume_trend( self, interval: str = "day", filters: Optional[Dict[str, Any]] = None, from_dt: Optional[datetime] = None, to_dt: Optional[datetime] = None, ) -> List[Dict[str, Any]]: """获取数据量趋势""" return await self.job_repo.get_volume_trend( interval=interval, filters=filters, from_dt=from_dt, to_dt=to_dt ) async def get_source_distribution( self, filters: Optional[Dict[str, Any]] = None, from_dt: Optional[datetime] = None, to_dt: Optional[datetime] = None, ) -> List[Dict[str, Any]]: """获取数据来源分布""" return await self.job_repo.get_source_distribution( filters=filters, from_dt=from_dt, to_dt=to_dt )