diff --git a/app/api/v1/job/job.py b/app/api/v1/job/job.py index 4999fb4..8a12434 100644 --- a/app/api/v1/job/job.py +++ b/app/api/v1/job/job.py @@ -1,252 +1,179 @@ +from typing import Dict, Any, Optional + from fastapi import APIRouter, BackgroundTasks, Depends -from typing import Dict, Any - -from app.services.job import create_data_router_service, PlatformType, DataType + from app.core.clickhouse import clickhouse_manager -from app.controllers.job import ( - UniversalDataController, - UniversalDataRequest, - BatchDataRequest, - create_universal_data_controller +from app.schemas.ingest import ( + IngestBatchRequest, + IngestSingleRequest, + PlatformType, + ChannelType, + DataType, ) +from app.services.ingest import IngestService +from app.log import logger -router = APIRouter(tags=["通用数据接口"]) +router = APIRouter(tags=["数据入库"]) -async def get_universal_data_controller() -> UniversalDataController: - """获取通用数据控制器实例""" - clickhouse_client = await clickhouse_manager.get_client() - data_router_service = create_data_router_service(clickhouse_client) - return create_universal_data_controller(data_router_service) +async def get_ingest_service() -> IngestService: + client = await clickhouse_manager.get_client() + return IngestService(client) @router.post("/data/store", summary="存储单条数据") -async def store_single_data( - request: UniversalDataRequest, - controller: UniversalDataController = Depends(get_universal_data_controller) +async def store_single( + request: IngestSingleRequest, + service: IngestService = Depends(get_ingest_service), ) -> Dict[str, Any]: - """ - 通用数据存储接口 - 存储单条数据 - - 支持的平台: - - boss: Boss直聘 - - qcwy: 前程无忧 - - zhilian: 智联招聘 - - 支持的数据类型: - - job: 职位数据 - - company: 公司数据 - - 示例请求: - ```json - { - "data": { - "jobBaseInfoVO": { - "encryptJobId": "abc123", - "positionName": "Python开发工程师", - "locationName": "北京" - }, - "brandComInfoVO": { - "brandName": "某科技公司", - "industryName": "互联网" - } - }, - "data_type": "job", - "platform": "boss", - "check_duplicate": true, - "duplicate_key": "encrypt_job_id" + result = await service.store_single( + platform=request.platform.value, + channel=request.channel.value, + data_type=request.data_type.value, + data=request.data, + check_duplicate=request.check_duplicate, + ) + return {"code": 200, "data": result, "message": "ok"} + + +@router.post("/data/batch", summary="批量存储数据") +@router.post("/data/batch-store", summary="批量存储数据(兼容)") +async def store_batch( + request: IngestBatchRequest, + service: IngestService = Depends(get_ingest_service), +) -> Dict[str, Any]: + result = await service.store_batch( + platform=request.platform.value, + channel=request.channel.value, + data_type=request.data_type.value, + data_list=request.data_list, + check_duplicate=request.check_duplicate, + ) + return { + "code": 200, + "data": result, + "message": f"批量处理完成: 成功 {result['success']} 条, 重复 {result['duplicate']} 条, 失败 {result['failed']} 条", } - ``` - """ - return await controller.store_single_data(request) -@router.post("/data/batch-store", summary="批量存储数据") -async def store_batch_data( - request: BatchDataRequest, - controller: UniversalDataController = Depends(get_universal_data_controller) +@router.post("/data/batch-async", summary="异步批量存储数据") +@router.post("/data/batch-store-async", summary="异步批量存储数据(兼容)") +async def store_batch_async( + request: IngestBatchRequest, + background_tasks: BackgroundTasks, + service: IngestService = Depends(get_ingest_service), ) -> Dict[str, Any]: - """ - 通用数据存储接口 - 批量存储数据 - - 示例请求: - ```json - { - "data_list": [ - { - "jobBaseInfoVO": { - "encryptJobId": "abc123", - "positionName": "Python开发工程师" - } - }, - { - "jobBaseInfoVO": { - "encryptJobId": "def456", - "positionName": "Java开发工程师" - } - } - ], - "data_type": "job", - "platform": "boss", - "check_duplicate": true + platform_names = {"boss": "Boss直聘", "qcwy": "前程无忧", "zhilian": "智联招聘"} + name = platform_names.get(request.platform.value, request.platform.value) + logger.info(f"收到批量请求: [{name}] {request.data_type.value} x{len(request.data_list)} 条") + + async def _task(): + r = await service.store_batch( + platform=request.platform.value, + channel=request.channel.value, + data_type=request.data_type.value, + data_list=request.data_list, + check_duplicate=request.check_duplicate, + ) + logger.info( + f"批量处理完成: [{name}] 成功 {r['success']} 条, 重复 {r['duplicate']} 条, 失败 {r['failed']} 条" + ) + + background_tasks.add_task(_task) + return { + "code": 202, + "message": f"批量数据已加入异步处理队列,共 {len(request.data_list)} 条", + "platform": request.platform, + "data_type": request.data_type, } - ``` - """ - return await controller.store_batch_data(request) - - -@router.post("/data/store-async", summary="异步存储单条数据") -async def store_single_data_async( - request: UniversalDataRequest, - background_tasks: BackgroundTasks, - controller: UniversalDataController = Depends(get_universal_data_controller) -) -> Dict[str, Any]: - """ - 通用数据存储接口 - 异步存储单条数据 - - 适用于大量数据或不需要立即返回结果的场景 - """ - return await controller.store_single_data_async(background_tasks, request) - - -@router.post("/data/batch-store-async", summary="异步批量存储数据") -async def store_batch_data_async( - request: BatchDataRequest, - background_tasks: BackgroundTasks, - controller: UniversalDataController = Depends(get_universal_data_controller) -) -> Dict[str, Any]: - """ - 通用数据存储接口 - 异步批量存储数据 - - 适用于大批量数据处理场景 - """ - return await controller.store_batch_data_async(background_tasks, request) @router.get("/data", summary="查询数据") async def query_data( - platform: str, - data_type: str, + platform: PlatformType, + data_type: DataType, + channel: ChannelType = ChannelType.MINI, page: int = 1, page_size: int = 20, - controller: UniversalDataController = Depends(get_universal_data_controller) + service: IngestService = Depends(get_ingest_service), +) -> Dict[str, Any]: + offset = (page - 1) * page_size + result = await service.query_data( + platform=platform.value, + channel=channel.value, + data_type=data_type.value, + limit=page_size, + offset=offset, + ) + return { + "code": 200, + "data": { + "items": result.get("data", []), + "total": result.get("count", 0), + "page": page, + "page_size": page_size, + }, + } + + +@router.get("/platforms", summary="获取注册表信息") +async def get_platforms() -> Dict[str, Any]: + return {"code": 200, "data": IngestService.get_registry_info()} + + +@router.get("/data/stats", summary="各平台爬虫入库统计") +async def get_ingest_stats( + platform: Optional[PlatformType] = None, + days: int = 7, ) -> Dict[str, Any]: """ - 通用数据查询接口 - - 参数: - - platform: 平台类型 (boss/qcwy/zhilian) - - data_type: 数据类型 (job/company) - - page: 页码,默认1 - - page_size: 每页大小,默认20 + 查询各平台 ClickHouse 入库统计:总量、今日新增、最近入库时间、近 N 天每日趋势。 + 用于前端爬虫监控页面展示。 """ - # 转换字符串参数为枚举类型 - platform_enum = PlatformType(platform) - data_type_enum = DataType(data_type) - return await controller.query_data(platform_enum, data_type_enum, page, page_size) + from app.core.clickhouse import clickhouse_manager -@router.get("/platforms", summary="获取支持的平台和数据类型") -async def get_supported_platforms( - controller: UniversalDataController = Depends(get_universal_data_controller) -) -> Dict[str, Any]: - """ - 获取支持的平台和数据类型信息 - - 返回: - - 支持的平台列表 - - 支持的数据类型列表 - - 各平台的默认重复检查字段 - """ - return await controller.get_supported_platforms() + client = await clickhouse_manager.get_client() + platforms = [platform.value] if platform else ["boss", "qcwy", "zhilian"] + table_map = {"boss": "boss_job", "qcwy": "qcwy_job", "zhilian": "zhilian_job"} -# 为了兼容性,提供平台特定的路由别名 -@router.post("/boss/job", summary="Boss直聘职位数据存储") -async def store_boss_job_data( - data: Dict[str, Any], - controller: UniversalDataController = Depends(get_universal_data_controller) -) -> Dict[str, Any]: - """Boss直聘职位数据存储的便捷接口""" - request = UniversalDataRequest( - data=data, - data_type="job", - platform="boss", - check_duplicate=True, - ) - return await controller.store_single_data(request) + result: Dict[str, Any] = {} + for p in platforms: + table = f"job_data.{table_map[p]}" + try: + r_total = await client.query(f"SELECT count() FROM {table}") + total = r_total.result_rows[0][0] if r_total.result_rows else 0 + r_today = await client.query( + f"SELECT count() FROM {table} WHERE toDate(created_at) = today()" + ) + today = r_today.result_rows[0][0] if r_today.result_rows else 0 -@router.post("/boss/company", summary="Boss直聘公司数据存储") -async def store_boss_company_data( - data: Dict[str, Any], - controller: UniversalDataController = Depends(get_universal_data_controller) -) -> Dict[str, Any]: - """Boss直聘公司数据存储的便捷接口""" - request = UniversalDataRequest( - data=data, - data_type="company", - platform="boss", - check_duplicate=True, - ) - return await controller.store_single_data(request) + r_last = await client.query(f"SELECT max(created_at) FROM {table}") + last_at_raw = r_last.result_rows[0][0] if r_last.result_rows else None + last_at = str(last_at_raw).split(".")[0] if last_at_raw else None + r_daily = await client.query( + f"SELECT toDate(created_at) AS day, count() AS cnt " + f"FROM {table} " + f"WHERE created_at >= today() - {days} " + f"GROUP BY day ORDER BY day DESC" + ) + daily_counts = [ + {"date": str(row[0]), "count": int(row[1])} + for row in r_daily.result_rows + ] -@router.post("/qcwy/job", summary="前程无忧职位数据存储") -async def store_qcwy_job_data( - data: Dict[str, Any], - controller: UniversalDataController = Depends(get_universal_data_controller) -) -> Dict[str, Any]: - """前程无忧职位数据存储的便捷接口""" - request = UniversalDataRequest( - data=data, - data_type="job", - platform="qcwy", - check_duplicate=True, - ) - return await controller.store_single_data(request) + result[p] = { + "total": total, + "today": today, + "last_ingest_at": last_at, + "daily_counts": daily_counts, + } + except Exception as e: + logger.warning(f"stats query failed for {p}: {e}") + result[p] = {"total": 0, "today": 0, "last_ingest_at": None, "daily_counts": [], "error": str(e)} + return {"code": 200, "data": result} -@router.post("/qcwy/company", summary="前程无忧公司数据存储") -async def store_qcwy_company_data( - data: Dict[str, Any], - controller: UniversalDataController = Depends(get_universal_data_controller) -) -> Dict[str, Any]: - """前程无忧公司数据存储的便捷接口""" - request = UniversalDataRequest( - data=data, - data_type="company", - platform="qcwy", - check_duplicate=True, - ) - return await controller.store_single_data(request) - - -@router.post("/zhilian/job", summary="智联招聘职位数据存储") -async def store_zhilian_job_data( - data: Dict[str, Any], - controller: UniversalDataController = Depends(get_universal_data_controller) -) -> Dict[str, Any]: - """智联招聘职位数据存储的便捷接口""" - request = UniversalDataRequest( - data=data, - data_type="job", - platform="zhilian", - check_duplicate=True, - ) - return await controller.store_single_data(request) - - -@router.post("/zhilian/company", summary="智联招聘公司数据存储") -async def store_zhilian_company_data( - data: Dict[str, Any], - controller: UniversalDataController = Depends(get_universal_data_controller) -) -> Dict[str, Any]: - """智联招聘公司数据存储的便捷接口""" - request = UniversalDataRequest( - data=data, - data_type="company", - platform="zhilian", - check_duplicate=True, - ) - return await controller.store_single_data(request) \ No newline at end of file diff --git a/tests/ingest/test_configs_boss.py b/tests/ingest/test_configs_boss.py new file mode 100644 index 0000000..c42f0d5 --- /dev/null +++ b/tests/ingest/test_configs_boss.py @@ -0,0 +1,109 @@ +""" +Boss直聘 ingest config 解析函数单元测试 — QUAL-02 +覆盖 _extract_job_id / _extract_company_name / _build_boss_push +""" + +from app.services.ingest.configs.boss import ( + _extract_job_id, + _extract_company_name, + _build_boss_push, +) + + +# ─── _extract_job_id ───────────────────────────────── + +def test_extract_job_id_from_jobBaseInfoVO(): + data = {"jobBaseInfoVO": {"jobId": "ABCD123"}} + assert _extract_job_id(data) == "ABCD123" + + +def test_extract_job_id_converts_int(): + data = {"jobBaseInfoVO": {"jobId": 999}} + assert _extract_job_id(data) == "999" + + +def test_extract_job_id_missing_inner(): + """jobBaseInfoVO 存在但无 jobId → None""" + data = {"jobBaseInfoVO": {}} + assert _extract_job_id(data) is None + + +def test_extract_job_id_missing_outer(): + """缺 jobBaseInfoVO → None""" + data = {} + assert _extract_job_id(data) is None + + +# ─── _extract_company_name ──────────────────────────── + +def test_extract_company_name_from_name(): + data = {"name": "字节跳动"} + assert _extract_company_name(data) == "字节跳动" + + +def test_extract_company_name_from_companyFullInfoVO(): + data = {"companyFullInfoVO": {"name": "腾讯科技"}} + assert _extract_company_name(data) == "腾讯科技" + + +def test_extract_company_name_missing(): + data = {} + assert _extract_company_name(data) is None + + +# ─── _build_boss_push ───────────────────────────────── + +def test_build_boss_push_full(): + data = { + "bossBaseInfoVO": {"brandName": "字节"}, + "jobBaseInfoVO": { + "positionName": "算法工程师", + "jobDesc": "负责推荐算法", + "degreeName": "本科", + "requiredSkills": ["Python", "TensorFlow"], + "salaryWelfareInfo": ["五险一金"], + "experienceName": "3-5年", + "lowSalary": 25, + "highSalary": 40, + "locationName": "北京", + "locationDesc": "朝阳区", + "encryptJobId": "ENC_JOB_001", + }, + "brandComInfoVO": { + "brandName": "字节跳动", + "encryptBrandId": "ENC_BRAND_001", + "scaleName": "10000人以上", + "industryName": "互联网", + "introduce": "全球领先的科技公司", + }, + } + result = _build_boss_push(data) + assert result is not None + assert result["source_type"] == "Boss直聘" + assert result["title"] == "算法工程师" + assert "ENC_JOB_001" in result["url"] + assert result["company_id"] == "ENC_BRAND_001" + assert result["company_name"] == "字节跳动" + assert "25" in result["salary"] and "40" in result["salary"] + + +def test_build_boss_push_partial(): + """缺字段不 raise,返回合理降级值""" + data = {} + result = _build_boss_push(data) + assert result is not None + assert result["source_type"] == "Boss直聘" + # safe_get 在缺字段时返回 None 或 '',两种均可接受 + assert result["title"] in (None, "") + + +def test_build_boss_push_skill_join(): + """多技能列表通过 safe_join 拼接""" + data = { + "jobBaseInfoVO": {"requiredSkills": ["Go", "Rust"]}, + "bossBaseInfoVO": {}, + "brandComInfoVO": {}, + } + result = _build_boss_push(data) + assert "Go" in result["skill"] + assert "Rust" in result["skill"] diff --git a/tests/ingest/test_configs_qcwy.py b/tests/ingest/test_configs_qcwy.py new file mode 100644 index 0000000..ed921cb --- /dev/null +++ b/tests/ingest/test_configs_qcwy.py @@ -0,0 +1,110 @@ +""" +前程无忧 (51Job) ingest config 解析函数单元测试 — QUAL-02 +覆盖 _extract_job_id / _extract_update_dt / _extract_company_name / _build_qcwy_push +""" + +from app.services.ingest.configs.qcwy import ( + _extract_job_id, + _extract_update_dt, + _extract_company_name, + _build_qcwy_push, +) + + +# ─── _extract_job_id ───────────────────────────────── + +def test_qcwy_extract_job_id_normal(): + data = {"jobId": "5001234567"} + assert _extract_job_id(data) == "5001234567" + + +def test_qcwy_extract_job_id_int(): + data = {"jobId": 5001234567} + assert _extract_job_id(data) == "5001234567" + + +def test_qcwy_extract_job_id_missing(): + data = {} + assert _extract_job_id(data) is None + + +# ─── _extract_update_dt ────────────────────────────── + +def test_qcwy_extract_update_dt_normal(): + data = {"updateDateTime": "2026-03-01 12:00:00"} + assert _extract_update_dt(data) == "2026-03-01 12:00:00" + + +def test_qcwy_extract_update_dt_missing(): + data = {} + assert _extract_update_dt(data) is None + + +# ─── _extract_company_name ──────────────────────────── + +def test_qcwy_extract_company_name_from_companyName(): + data = {"companyName": "阿里巴巴"} + assert _extract_company_name(data) == "阿里巴巴" + + +def test_qcwy_extract_company_name_from_company_name_fallback(): + data = {"company_name": "阿里巴巴网络"} + assert _extract_company_name(data) == "阿里巴巴网络" + + +def test_qcwy_extract_company_name_missing(): + data = {} + assert _extract_company_name(data) is None + + +# ─── _build_qcwy_push ───────────────────────────────── + +def test_qcwy_build_push_welfare_list(): + """welfare 为对象列表 → 提取 chineseTitle""" + data = { + "jobWelfareCodeDataList": [ + {"chineseTitle": "五险一金", "code": "A1"}, + {"chineseTitle": "年终奖", "code": "A2"}, + ], + "jobName": "Java 开发工程师", + "companyName": "阿里巴巴", + "jobSalaryMax": 25000, + "jobSalaryMin": 18000, + "jobHref": "https://example.com/job/12345.html", + "coId": "co123", + } + result = _build_qcwy_push(data) + assert result is not None + assert result["source_type"] == "前程无忧" + assert "五险一金" in result["welfare"] + assert "年终奖" in result["welfare"] + assert result["title"] == "Java 开发工程师" + + +def test_qcwy_build_push_welfare_string(): + """welfare 为字符串 → 清理括号后返回""" + data = { + "jobWelfareCodeDataList": "[五险一金,年终奖]", + "jobName": "产品经理", + "companyName": "腾讯", + } + result = _build_qcwy_push(data) + assert result is not None + assert "五险一金" in result["welfare"] + + +def test_qcwy_build_push_partial(): + """缺字段不 raise,source_type 正确""" + data = {} + result = _build_qcwy_push(data) + assert result is not None + assert result["source_type"] == "前程无忧" + assert result["title"] is None or result["title"] == "" + + +def test_qcwy_build_push_salary_format(): + """薪资字段格式化""" + data = {"jobSalaryMax": 30000, "jobSalaryMin": 20000} + result = _build_qcwy_push(data) + assert "30000" in result["salary"] + assert "20000" in result["salary"] diff --git a/tests/ingest/test_configs_zhilian.py b/tests/ingest/test_configs_zhilian.py new file mode 100644 index 0000000..25304a2 --- /dev/null +++ b/tests/ingest/test_configs_zhilian.py @@ -0,0 +1,109 @@ +""" +智联招聘 ingest config 解析函数单元测试 — QUAL-02 +覆盖 _extract_number / _extract_fpt / _extract_company_name / _build_zhilian_push +""" + +from app.services.ingest.configs.zhilian import ( + _extract_number, + _extract_fpt, + _extract_company_name, + _build_zhilian_push, +) + + +# ─── _extract_number ───────────────────────────────── + +def test_zhilian_extract_number_normal(): + data = {"number": "ZL98765"} + assert _extract_number(data) == "ZL98765" + + +def test_zhilian_extract_number_int(): + data = {"number": 98765} + assert _extract_number(data) == "98765" + + +def test_zhilian_extract_number_missing(): + data = {} + assert _extract_number(data) is None + + +# ─── _extract_fpt ──────────────────────────────────── + +def test_zhilian_extract_fpt_normal(): + data = {"firstPublishTime": "2026-03-15 09:00:00"} + assert _extract_fpt(data) == "2026-03-15 09:00:00" + + +def test_zhilian_extract_fpt_missing(): + data = {} + assert _extract_fpt(data) is None + + +# ─── _extract_company_name ──────────────────────────── + +def test_zhilian_extract_company_name_from_companyName(): + data = {"companyName": "华为技术"} + assert _extract_company_name(data) == "华为技术" + + +def test_zhilian_extract_company_name_from_name_fallback(): + data = {"name": "华为有限公司"} + assert _extract_company_name(data) == "华为有限公司" + + +def test_zhilian_extract_company_name_missing(): + data = {} + assert _extract_company_name(data) is None + + +# ─── _build_zhilian_push ───────────────────────────── + +def test_zhilian_build_push_skill_labels(): + """skillLabel 为对象列表 → 提取 value""" + data = { + "skillLabel": [ + {"value": "Python"}, + {"value": "深度学习"}, + ], + "companyName": "百度", + "name": "算法工程师", + "salary60": "25k-40k", + "workCity": "北京", + "cityDistrict": "海淀区", + "companyId": 1001, + "positionURL": "https://zhaopin.com/jobs/ABC123.htm", + } + result = _build_zhilian_push(data) + assert result is not None + assert result["source_type"] == "智联招聘" + assert "Python" in result["skill"] + assert "深度学习" in result["skill"] + assert result["title"] == "算法工程师" + assert result["company_name"] == "百度" + assert result["salary"] == "25k-40k" + + +def test_zhilian_build_push_empty_skill_labels(): + """skillLabel 为空列表 → skill 为空字符串""" + data = {"skillLabel": []} + result = _build_zhilian_push(data) + assert result is not None + assert result["skill"] == "" + + +def test_zhilian_build_push_partial(): + """缺字段不 raise,source_type 正确""" + data = {} + result = _build_zhilian_push(data) + assert result is not None + assert result["source_type"] == "智联招聘" + # safe_get 在缺字段时返回 None 或 '',两种均可接受 + assert result["title"] in (None, "") + + +def test_zhilian_build_push_recruiter_number_str(): + """招募人数字段转字符串""" + data = {"recruitNumber": 5} + result = _build_zhilian_push(data) + assert result["number"] == "5" diff --git a/web/src/api/index.js b/web/src/api/index.js index 0d06ea8..062ff81 100644 --- a/web/src/api/index.js +++ b/web/src/api/index.js @@ -40,7 +40,7 @@ export default { createDept: (data) => request.post('/dept/create', data), updateDept: (data) => request.post('/dept/update', data), deleteDept: (params) => request.delete('/dept/delete', { params }), - + // 数据统计 / 数据报表(统一改用 /reports 路由) getDailyStatistics: (params = {}) => request.get('/reports/daily', { params }), getWeeklyStatistics: (params = {}) => request.get('/reports/weekly', { params }), @@ -51,26 +51,25 @@ export default { getIndustryAnalysis: (params = {}) => request.get('/reports/industry-analysis', { params }), getCompanyRanking: (params = {}) => request.get('/reports/company-ranking', { params }), getReportsHealth: () => request.get('/reports/health'), - + // 薪资分析 // 其余分析接口如需扩展,请对齐 /reports 下的实际端点 - + // 工作经验分布 // 保留占位以便后续扩展 - + // 学历要求分布 // 保留占位以便后续扩展 - + // 平台间数据对比 getPlatformComparison: (params = {}) => request.get('/reports/platform-comparison', { params }), - + // 数据质量报告 getDataQualityReport: (params = {}) => request.get('/reports/data-quality', { params }), - + // 导出数据报表 // 导出报表暂未提供后端端点,前端页面将隐藏导出按钮 - // 数据清洗 uploadCleaningFile: (formData) => request.post('/cleaning/upload', formData), getCleaningTasks: (params) => request.get('/cleaning/list', { params }), @@ -87,4 +86,8 @@ export default { // 通用数据查询 queryPlatformData: (params) => request.get('/job/data', { params }), + + // 爬虫入库统计(各平台总量/今日/最近入库时间/近7天趋势) + getIngestStats: (params = {}) => request.get('/job/data/stats', { params }), } + diff --git a/web/src/views/cleaning/monitor.vue b/web/src/views/cleaning/monitor.vue index 258d1d0..eb3ae94 100644 --- a/web/src/views/cleaning/monitor.vue +++ b/web/src/views/cleaning/monitor.vue @@ -1,193 +1,357 @@ -