diff --git a/.planning/phases/05-data-pipeline/05-01-PLAN.md b/.planning/phases/05-data-pipeline/05-01-PLAN.md new file mode 100644 index 0000000..d7cbe2e --- /dev/null +++ b/.planning/phases/05-data-pipeline/05-01-PLAN.md @@ -0,0 +1,110 @@ +--- +phase: 5 +plan: 1 +wave: 1 +title: "30 天窗口去重修复(DATA-01)" +depends_on: [] +files_modified: + - app/services/ingest/dedup.py + - tests/ingest/test_dedup.py # NEW +autonomous: true +requirements: + - DATA-01 +--- + +# Phase 5 Plan 01: 30 天窗口去重修复(DATA-01) + +## Objective + +`dedup.py` 的 `batch_dedup_filter()` 目前查全量历史数据去重,导致同一职位超过 30 天后无法重新入库。 +本 Plan 在去重查询 SQL 中加入 30 天时间窗口。 + +## Must Haves + +- [ ] 单字段 dedup SQL 加 `AND created_at > now() - INTERVAL 30 DAY` +- [ ] 双字段 dedup SQL 同样加 30 天窗口 +- [ ] 新增 Mock 测试(用 `AsyncMock` 模拟 ClickHouse 客户端)覆盖: + - 30 天内有记录 → 视为重复,过滤 + - 30 天外有记录 → 不重复,允许入库 + - 无记录 → 允许入库 +- [ ] `pipenv run python -m pytest tests/ingest/test_dedup.py -v` 全部通过 +- [ ] 全量回归 `pytest tests/ -v` 无失败 + +--- + +## Wave 1 + +### Task 1.1: 修改 dedup.py + + +- `app/services/ingest/dedup.py`(当前 81 行) + + + +修改 `batch_dedup_filter()` 中的两个 SQL 查询: + +**单字段去重(第 51 行附近):** +```python +# 修改前 +query = f"SELECT {key_col} FROM {table} WHERE {key_col} IN {{keys:Array(String)}}" + +# 修改后 +query = ( + f"SELECT {key_col} FROM {table} " + f"WHERE {key_col} IN {{keys:Array(String)}} " + f"AND created_at > now() - INTERVAL 30 DAY" +) +``` + +**双字段去重(第 65 行附近):** +```python +# 修改前 +query = f"SELECT {c1}, {c2} FROM {table} WHERE {c1} IN {{keys:Array(String)}}" + +# 修改后 +query = ( + f"SELECT {c1}, {c2} FROM {table} " + f"WHERE {c1} IN {{keys:Array(String)}} " + f"AND created_at > now() - INTERVAL 30 DAY" +) +``` + + + +- `grep "INTERVAL 30 DAY" app/services/ingest/dedup.py` 有两处输出 +- `grep "INTERVAL 30 DAY" app/services/ingest/dedup.py | wc -l` 输出为 2 + + +--- + +### Task 1.2: 新增 Mock 测试 + + +创建 `tests/ingest/__init__.py`(空文件)和 `tests/ingest/test_dedup.py`: + +测试覆盖: +1. `test_single_field_dedup_within_30_days`:30 天内有相同 job_id → 过滤(duplicate=1) +2. `test_single_field_dedup_outside_30_days`:先不存在 → 允许入库(duplicate=0)(模拟 30 天外无记录) +3. `test_double_field_dedup_within_30_days`:双字段在 30 天内有记录 → 过滤 +4. `test_dedup_empty_input`:空输入 → 直接返回,不查 ClickHouse +5. `test_dedup_no_dedup_columns`:无 dedup 字段 → 跳过过滤 +6. `test_build_insert_row_has_channel`:验证 build_insert_row 生成的行包含 channel 列 + +SQL 验证:用 `mock_client.query.call_args[0][0]` 断言 SQL 中包含 `INTERVAL 30 DAY` + + +--- + +## Verification + +```bash +# 1. 验证 SQL 变更 +grep "INTERVAL 30 DAY" app/services/ingest/dedup.py +# 预期输出:2 行 + +# 2. 运行新增测试 +pipenv run python -m pytest tests/ingest/test_dedup.py -v + +# 3. 全量回归 +pipenv run python -m pytest tests/ -v --tb=short +``` diff --git a/.planning/phases/05-data-pipeline/05-02-PLAN.md b/.planning/phases/05-data-pipeline/05-02-PLAN.md new file mode 100644 index 0000000..3d2ae99 --- /dev/null +++ b/.planning/phases/05-data-pipeline/05-02-PLAN.md @@ -0,0 +1,155 @@ +--- +phase: 5 +plan: 2 +wave: 2 +title: "公司招聘信息 channel 区分(DATA-04)+ 推送 API 确认(DATA-02)" +depends_on: + - "05-01-PLAN.md" +files_modified: + - app/services/ingest/configs/boss.py + - app/services/ingest/configs/qcwy.py + - app/services/ingest/configs/zhilian.py + - app/schemas/ingest.py + - app/services/company_jobs_sync.py +autonomous: true +requirements: + - DATA-02 + - DATA-04 +--- + +# Phase 5 Plan 02: company_job channel 区分(DATA-04 + DATA-02 确认) + +## Objective + +### DATA-02(确认已完成) + +`app/api/v1/job/job.py` 已有三个推送接收端点: +- `POST /data/store`(单条) +- `POST /data/batch`(批量同步) +- `POST /data/batch-async`(批量异步) + +全部走 `IngestService`,`channel` 和 `platform` 由调用方在请求体中指定。 +**spiderJobs 只需调用这些 API,DATA-02 不需要后端改动。** + +### DATA-04(需要修复) + +`company_jobs_sync.py` 目前调用 `router.store_batch(source, "mini", "job", jobs)`, +公司职位和搜索职位混用同一个 `channel="mini"`,无法区分来源。 + +**修复:** +1. `ChannelType` 枚举加 `COMPANY = "company"` 值 +2. 各平台 `ingest/configs/*.py` 注册 `channel="company"` 的配置 +3. `company_jobs_sync.py` 调用改为 `channel="company"` + +## Must Haves + +- [ ] `app/schemas/ingest.py` 中 `ChannelType` 枚举加 `COMPANY = "company"` +- [ ] `ingest/configs/boss.py` 注册 `channel="company", data_type="job"` 配置(共用 boss_job 表) +- [ ] `ingest/configs/qcwy.py` 同样注册 `channel="company"` 配置 +- [ ] `ingest/configs/zhilian.py` 同样注册 `channel="company"` 配置 +- [ ] `company_jobs_sync.py` 中 `store_batch(source, "mini", "job", jobs)` → `store_batch(source, "company", "job", jobs)` +- [ ] `pipenv run python -c "from app.services.ingest import IngestService; print(IngestService.get_registry_info())"` 显示 channel=company 条目 +- [ ] 全量回归 `pytest tests/` 无失败 + +--- + +## Wave 2(依赖 Plan 01 完成) + +### Task 2.1: 更新 ChannelType 枚举 + + +- `app/schemas/ingest.py`(需读取确认当前 ChannelType 定义) + + + +在 `ChannelType` 枚举中添加: +```python +COMPANY = "company" +``` + + +--- + +### Task 2.2: 各平台 configs 注册 channel="company" 配置 + + +- `app/services/ingest/configs/boss.py`(当前有 mini+job、mini+company 两条注册) +- `app/services/ingest/configs/qcwy.py` +- `app/services/ingest/configs/zhilian.py` + + + +在三个 configs 文件中各添加一条新注册: + +**boss.py 追加:** +```python +register(PlatformConfig( + platform="boss", channel="company", data_type="job", + table="boss_job", # 共用同一张表,通过 channel 列区分来源 + dedup_fields=(DedupFieldSpec(column="job_id", extractor=_extract_job_id),), +)) +``` + +**qcwy.py 追加:** +```python +register(PlatformConfig( + platform="qcwy", channel="company", data_type="job", + table="qcwy_job", + dedup_fields=( + DedupFieldSpec(column="job_id", extractor=_extract_job_id), + DedupFieldSpec(column="update_date_time", extractor=_extract_update_dt), + ), +)) +``` + +**zhilian.py 追加:**(需读取 zhilian.py 确认 extractor 函数名) +```python +register(PlatformConfig( + platform="zhilian", channel="company", data_type="job", + table="zhilian_job", + dedup_fields=(DedupFieldSpec(column="job_number", extractor=_extract_job_number),), +)) +``` + + +--- + +### Task 2.3: 修改 company_jobs_sync.py + + +将 `sync_company_jobs()` 中: +```python +# 修改前 +store_result = await router.store_batch(source, "mini", "job", jobs) + +# 修改后 +store_result = await router.store_batch(source, "company", "job", jobs) +``` + + + +- `grep "company.*job" app/services/ingest/configs/boss.py` 有输出 +- `grep '"company"' app/services/company_jobs_sync.py` 有输出 +- `grep '"mini"' app/services/company_jobs_sync.py` 无输出(全部替换) + + +--- + +## Verification + +```bash +# 1. 确认 channel=company 注册存在 +pipenv run python -c " +from app.services.ingest.configs import boss, qcwy, zhilian # 触发 register() +from app.services.ingest.registry import list_configs +company_configs = [c for c in list_configs() if c.channel == 'company'] +print(f'channel=company 配置数: {len(company_configs)}') +for c in company_configs: + print(f' {c.platform} / {c.channel} / {c.data_type} → {c.table}') +assert len(company_configs) >= 3, '❌ 缺少注册配置' +print('✅ 注册配置完整') +" + +# 2. 全量回归 +pipenv run python -m pytest tests/ -v --tb=short +``` diff --git a/.planning/phases/05-data-pipeline/05-RESEARCH.md b/.planning/phases/05-data-pipeline/05-RESEARCH.md new file mode 100644 index 0000000..ef3f750 --- /dev/null +++ b/.planning/phases/05-data-pipeline/05-RESEARCH.md @@ -0,0 +1,105 @@ +# Phase 5: 数据管道优化 — 技术研究 + +**研究日期:** 2026-03-21 +**阶段目标:** 入库去重可靠(30天窗口)、公司清洗流程顺畅、公司招聘信息写入 ClickHouse + +--- + +## 1. 现状分析 + +### 1.1 DATA-01:30 天窗口去重 + +**位置:** `app/services/ingest/dedup.py` — `batch_dedup_filter()` + +**现状(有缺口):** + +```sql +-- 当前去重 SQL(无时间窗口) +SELECT job_id FROM job_data.boss_job WHERE job_id IN {keys} +``` + +**缺口:** 查的是全量历史数据,同一职位 30 天内只入库一次的语义没有实现。 +理论上 ClickHouse 表可能有几年的旧数据,老 job_id 永远不会重新入库。 + +**正确逻辑:** 查询条件加 `AND created_at > now() - INTERVAL 30 DAY`, +只在近 30 天内去重,超过 30 天的同一职位可以重新入库。 + +**修复范围:** `batch_dedup_filter()` 的两个 SQL 查询各加一行 WHERE 条件。 + +--- + +### 1.2 DATA-02:统一入库管道(spiderJobs 推送) + +**位置:** `app/services/ingest/service.py`(已有 IngestService.store_batch) + +**现状(有缺口):** +- 后端 `cleaning.py`、`company_jobs_sync.py` 已通过 `IngestService.store_batch()` 入库 ✅ +- 外部脚本 `spiderJobs/` 是独立运行的,它们通过 HTTP 推送到服务端 API +- 服务端 API 接收推送数据后应该走 `IngestService`,需要确认 API 路由是否已经接入 + +**需确认:** 查看 `app/api/v1/` 里 spiderJobs 推送的接收端点,确认是否走 IngestService。 + +**设计决策:** +- spiderJobs 推送的应该是原始 JSON 数据 +- 接收端点 → IngestService.store_batch() → ClickHouse +- 来源字段(channel/platform)在 IngestService 中已经记录 + +--- + +### 1.3 DATA-03:公司清洗定时任务 + +**位置:** `app/services/company_cleaner.py`(335行) + +**现状(基本完整):** + +全链路已实现: +1. `collect_pending_companies()`: ClickHouse 查 30 天内有招聘的公司 ID ✅ +2. `process_pending_companies()`: 遍历 MySQL 清洗队列 → 爬公司详情 → 写 MySQL ✅ + - 用 `asyncio.to_thread(boss/qcwy/zhilian_service.get_company_detail)` ✅ +3. 同步写 company_jobs:调用 `company_jobs_sync.sync_company_jobs()` ✅ +4. `cleanup_old_records()`: 清理已处理记录 ✅ + +**缺口:** 日志链路需确认 `logger.info` 是否结构化(已使用 loguru ✅) + +**结论:** DATA-03 已基本完成,无需大改。 + +--- + +### 1.4 DATA-04:公司招聘信息写入 ClickHouse + +**位置:** `app/services/company_jobs_sync.py` + +**现状(部分完成):** + +```python +# 当前用 "mini" channel 和 "job" data_type 存公司职位 +store_result = await router.store_batch(source, "mini", "job", jobs) +``` + +**缺口:** 公司职位(company_jobs)和普通搜索职位(search_jobs)使用同一个注册配置, +无法区分来源,且 ClickHouse 表(boss_job/qcwy_job/zhilian_job)混入了不同来源。 + +**方案:** +- 新增 channel = "company"(区分公司关联职位 vs 搜索职位) +- 在 `ingest/configs/boss.py` 等添加 `channel="company"` 的额外配置 +- 或者保持现有 channel="mini",但 ClickHouse 表层面通过 `channel` 列区分 + +**决策(保守方案):** +- 现有表已有 `channel` 列,`company_jobs_sync.py` 改用 `channel="company"` 调用 +- 无需新建表,无 ClickHouse DDL 变更 +- 只需在 registry 补注册 `channel="company"` 的配置条目 + +--- + +## 2. 需确认:spiderJobs 推送接收端点 + +需要查看 `app/api/v1/` 是否有专门接收 spiderJobs 推送数据的端点, +以及该端点是否已经调用了 IngestService。 + +--- + +## 3. 计划分解 + +- **Plan 01(DATA-01):** 修改 `dedup.py` 的两个查询 SQL 加 30 天窗口条件(+新增 mock 测试) +- **Plan 02(DATA-02 + DATA-04):** 确认/修复推送 API 端点 + registry 补注册 `channel="company"` + company_jobs_sync 改用 `channel="company"` +- **(DATA-03 已完成,无需单独计划)**