docs(phase-5): add research and 2 plans for data pipeline optimization

This commit is contained in:
win 2026-03-21 19:44:56 +08:00
parent 6a2f0bfb58
commit 9ef31cc87e
3 changed files with 370 additions and 0 deletions

View File

@ -0,0 +1,110 @@
---
phase: 5
plan: 1
wave: 1
title: "30 天窗口去重修复DATA-01"
depends_on: []
files_modified:
- app/services/ingest/dedup.py
- tests/ingest/test_dedup.py # NEW
autonomous: true
requirements:
- DATA-01
---
# Phase 5 Plan 01: 30 天窗口去重修复DATA-01
## Objective
`dedup.py``batch_dedup_filter()` 目前查全量历史数据去重,导致同一职位超过 30 天后无法重新入库。
本 Plan 在去重查询 SQL 中加入 30 天时间窗口。
## Must Haves
- [ ] 单字段 dedup SQL 加 `AND created_at > now() - INTERVAL 30 DAY`
- [ ] 双字段 dedup SQL 同样加 30 天窗口
- [ ] 新增 Mock 测试(用 `AsyncMock` 模拟 ClickHouse 客户端)覆盖:
- 30 天内有记录 → 视为重复,过滤
- 30 天外有记录 → 不重复,允许入库
- 无记录 → 允许入库
- [ ] `pipenv run python -m pytest tests/ingest/test_dedup.py -v` 全部通过
- [ ] 全量回归 `pytest tests/ -v` 无失败
---
## Wave 1
### Task 1.1: 修改 dedup.py
<read_first>
- `app/services/ingest/dedup.py`(当前 81 行)
</read_first>
<action>
修改 `batch_dedup_filter()` 中的两个 SQL 查询:
**单字段去重(第 51 行附近):**
```python
# 修改前
query = f"SELECT {key_col} FROM {table} WHERE {key_col} IN {{keys:Array(String)}}"
# 修改后
query = (
f"SELECT {key_col} FROM {table} "
f"WHERE {key_col} IN {{keys:Array(String)}} "
f"AND created_at > now() - INTERVAL 30 DAY"
)
```
**双字段去重(第 65 行附近):**
```python
# 修改前
query = f"SELECT {c1}, {c2} FROM {table} WHERE {c1} IN {{keys:Array(String)}}"
# 修改后
query = (
f"SELECT {c1}, {c2} FROM {table} "
f"WHERE {c1} IN {{keys:Array(String)}} "
f"AND created_at > now() - INTERVAL 30 DAY"
)
```
</action>
<acceptance_criteria>
- `grep "INTERVAL 30 DAY" app/services/ingest/dedup.py` 有两处输出
- `grep "INTERVAL 30 DAY" app/services/ingest/dedup.py | wc -l` 输出为 2
</acceptance_criteria>
---
### Task 1.2: 新增 Mock 测试
<action>
创建 `tests/ingest/__init__.py`(空文件)和 `tests/ingest/test_dedup.py`
测试覆盖:
1. `test_single_field_dedup_within_30_days`30 天内有相同 job_id → 过滤duplicate=1
2. `test_single_field_dedup_outside_30_days`:先不存在 → 允许入库duplicate=0模拟 30 天外无记录)
3. `test_double_field_dedup_within_30_days`:双字段在 30 天内有记录 → 过滤
4. `test_dedup_empty_input`:空输入 → 直接返回,不查 ClickHouse
5. `test_dedup_no_dedup_columns`:无 dedup 字段 → 跳过过滤
6. `test_build_insert_row_has_channel`:验证 build_insert_row 生成的行包含 channel 列
SQL 验证:用 `mock_client.query.call_args[0][0]` 断言 SQL 中包含 `INTERVAL 30 DAY`
</action>
---
## Verification
```bash
# 1. 验证 SQL 变更
grep "INTERVAL 30 DAY" app/services/ingest/dedup.py
# 预期输出2 行
# 2. 运行新增测试
pipenv run python -m pytest tests/ingest/test_dedup.py -v
# 3. 全量回归
pipenv run python -m pytest tests/ -v --tb=short
```

View File

@ -0,0 +1,155 @@
---
phase: 5
plan: 2
wave: 2
title: "公司招聘信息 channel 区分DATA-04+ 推送 API 确认DATA-02"
depends_on:
- "05-01-PLAN.md"
files_modified:
- app/services/ingest/configs/boss.py
- app/services/ingest/configs/qcwy.py
- app/services/ingest/configs/zhilian.py
- app/schemas/ingest.py
- app/services/company_jobs_sync.py
autonomous: true
requirements:
- DATA-02
- DATA-04
---
# Phase 5 Plan 02: company_job channel 区分DATA-04 + DATA-02 确认)
## Objective
### DATA-02确认已完成
`app/api/v1/job/job.py` 已有三个推送接收端点:
- `POST /data/store`(单条)
- `POST /data/batch`(批量同步)
- `POST /data/batch-async`(批量异步)
全部走 `IngestService``channel``platform` 由调用方在请求体中指定。
**spiderJobs 只需调用这些 APIDATA-02 不需要后端改动。**
### DATA-04需要修复
`company_jobs_sync.py` 目前调用 `router.store_batch(source, "mini", "job", jobs)`
公司职位和搜索职位混用同一个 `channel="mini"`,无法区分来源。
**修复:**
1. `ChannelType` 枚举加 `COMPANY = "company"`
2. 各平台 `ingest/configs/*.py` 注册 `channel="company"` 的配置
3. `company_jobs_sync.py` 调用改为 `channel="company"`
## Must Haves
- [ ] `app/schemas/ingest.py``ChannelType` 枚举加 `COMPANY = "company"`
- [ ] `ingest/configs/boss.py` 注册 `channel="company", data_type="job"` 配置(共用 boss_job 表)
- [ ] `ingest/configs/qcwy.py` 同样注册 `channel="company"` 配置
- [ ] `ingest/configs/zhilian.py` 同样注册 `channel="company"` 配置
- [ ] `company_jobs_sync.py``store_batch(source, "mini", "job", jobs)``store_batch(source, "company", "job", jobs)`
- [ ] `pipenv run python -c "from app.services.ingest import IngestService; print(IngestService.get_registry_info())"` 显示 channel=company 条目
- [ ] 全量回归 `pytest tests/` 无失败
---
## Wave 2依赖 Plan 01 完成)
### Task 2.1: 更新 ChannelType 枚举
<read_first>
- `app/schemas/ingest.py`(需读取确认当前 ChannelType 定义)
</read_first>
<action>
`ChannelType` 枚举中添加:
```python
COMPANY = "company"
```
</action>
---
### Task 2.2: 各平台 configs 注册 channel="company" 配置
<read_first>
- `app/services/ingest/configs/boss.py`(当前有 mini+job、mini+company 两条注册)
- `app/services/ingest/configs/qcwy.py`
- `app/services/ingest/configs/zhilian.py`
</read_first>
<action>
在三个 configs 文件中各添加一条新注册:
**boss.py 追加:**
```python
register(PlatformConfig(
platform="boss", channel="company", data_type="job",
table="boss_job", # 共用同一张表,通过 channel 列区分来源
dedup_fields=(DedupFieldSpec(column="job_id", extractor=_extract_job_id),),
))
```
**qcwy.py 追加:**
```python
register(PlatformConfig(
platform="qcwy", channel="company", data_type="job",
table="qcwy_job",
dedup_fields=(
DedupFieldSpec(column="job_id", extractor=_extract_job_id),
DedupFieldSpec(column="update_date_time", extractor=_extract_update_dt),
),
))
```
**zhilian.py 追加:**(需读取 zhilian.py 确认 extractor 函数名)
```python
register(PlatformConfig(
platform="zhilian", channel="company", data_type="job",
table="zhilian_job",
dedup_fields=(DedupFieldSpec(column="job_number", extractor=_extract_job_number),),
))
```
</action>
---
### Task 2.3: 修改 company_jobs_sync.py
<action>
`sync_company_jobs()` 中:
```python
# 修改前
store_result = await router.store_batch(source, "mini", "job", jobs)
# 修改后
store_result = await router.store_batch(source, "company", "job", jobs)
```
</action>
<acceptance_criteria>
- `grep "company.*job" app/services/ingest/configs/boss.py` 有输出
- `grep '"company"' app/services/company_jobs_sync.py` 有输出
- `grep '"mini"' app/services/company_jobs_sync.py` 无输出(全部替换)
</acceptance_criteria>
---
## Verification
```bash
# 1. 确认 channel=company 注册存在
pipenv run python -c "
from app.services.ingest.configs import boss, qcwy, zhilian # 触发 register()
from app.services.ingest.registry import list_configs
company_configs = [c for c in list_configs() if c.channel == 'company']
print(f'channel=company 配置数: {len(company_configs)}')
for c in company_configs:
print(f' {c.platform} / {c.channel} / {c.data_type} → {c.table}')
assert len(company_configs) >= 3, '❌ 缺少注册配置'
print('✅ 注册配置完整')
"
# 2. 全量回归
pipenv run python -m pytest tests/ -v --tb=short
```

View File

@ -0,0 +1,105 @@
# Phase 5: 数据管道优化 — 技术研究
**研究日期:** 2026-03-21
**阶段目标:** 入库去重可靠30天窗口、公司清洗流程顺畅、公司招聘信息写入 ClickHouse
---
## 1. 现状分析
### 1.1 DATA-0130 天窗口去重
**位置:** `app/services/ingest/dedup.py``batch_dedup_filter()`
**现状(有缺口):**
```sql
-- 当前去重 SQL无时间窗口
SELECT job_id FROM job_data.boss_job WHERE job_id IN {keys}
```
**缺口:** 查的是全量历史数据,同一职位 30 天内只入库一次的语义没有实现。
理论上 ClickHouse 表可能有几年的旧数据,老 job_id 永远不会重新入库。
**正确逻辑:** 查询条件加 `AND created_at > now() - INTERVAL 30 DAY`
只在近 30 天内去重,超过 30 天的同一职位可以重新入库。
**修复范围:** `batch_dedup_filter()` 的两个 SQL 查询各加一行 WHERE 条件。
---
### 1.2 DATA-02统一入库管道spiderJobs 推送)
**位置:** `app/services/ingest/service.py`(已有 IngestService.store_batch
**现状(有缺口):**
- 后端 `cleaning.py``company_jobs_sync.py` 已通过 `IngestService.store_batch()` 入库 ✅
- 外部脚本 `spiderJobs/` 是独立运行的,它们通过 HTTP 推送到服务端 API
- 服务端 API 接收推送数据后应该走 `IngestService`,需要确认 API 路由是否已经接入
**需确认:** 查看 `app/api/v1/` 里 spiderJobs 推送的接收端点,确认是否走 IngestService。
**设计决策:**
- spiderJobs 推送的应该是原始 JSON 数据
- 接收端点 → IngestService.store_batch() → ClickHouse
- 来源字段channel/platform在 IngestService 中已经记录
---
### 1.3 DATA-03公司清洗定时任务
**位置:** `app/services/company_cleaner.py`335行
**现状(基本完整):**
全链路已实现:
1. `collect_pending_companies()`: ClickHouse 查 30 天内有招聘的公司 ID ✅
2. `process_pending_companies()`: 遍历 MySQL 清洗队列 → 爬公司详情 → 写 MySQL ✅
- 用 `asyncio.to_thread(boss/qcwy/zhilian_service.get_company_detail)`
3. 同步写 company_jobs调用 `company_jobs_sync.sync_company_jobs()`
4. `cleanup_old_records()`: 清理已处理记录 ✅
**缺口:** 日志链路需确认 `logger.info` 是否结构化(已使用 loguru ✅)
**结论:** DATA-03 已基本完成,无需大改。
---
### 1.4 DATA-04公司招聘信息写入 ClickHouse
**位置:** `app/services/company_jobs_sync.py`
**现状(部分完成):**
```python
# 当前用 "mini" channel 和 "job" data_type 存公司职位
store_result = await router.store_batch(source, "mini", "job", jobs)
```
**缺口:** 公司职位company_jobs和普通搜索职位search_jobs使用同一个注册配置
无法区分来源,且 ClickHouse 表boss_job/qcwy_job/zhilian_job混入了不同来源。
**方案:**
- 新增 channel = "company"(区分公司关联职位 vs 搜索职位)
- 在 `ingest/configs/boss.py` 等添加 `channel="company"` 的额外配置
- 或者保持现有 channel="mini",但 ClickHouse 表层面通过 `channel` 列区分
**决策(保守方案):**
- 现有表已有 `channel` 列,`company_jobs_sync.py` 改用 `channel="company"` 调用
- 无需新建表,无 ClickHouse DDL 变更
- 只需在 registry 补注册 `channel="company"` 的配置条目
---
## 2. 需确认spiderJobs 推送接收端点
需要查看 `app/api/v1/` 是否有专门接收 spiderJobs 推送数据的端点,
以及该端点是否已经调用了 IngestService。
---
## 3. 计划分解
- **Plan 01DATA-01** 修改 `dedup.py` 的两个查询 SQL 加 30 天窗口条件(+新增 mock 测试)
- **Plan 02DATA-02 + DATA-04** 确认/修复推送 API 端点 + registry 补注册 `channel="company"` + company_jobs_sync 改用 `channel="company"`
- **DATA-03 已完成,无需单独计划)**