JobData/app/api/v1/cleaning/cleaning.py

322 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from typing import Optional

from fastapi import APIRouter, File, UploadFile, Form, Body, Query
from tortoise.expressions import Q

from app.controllers.cleaning import cleaning_controller
from app.core.clickhouse import clickhouse_manager
from app.models.cleaning import CleaningTask
from app.schemas import Success, SuccessExtra
from app.services.cleaning import CleaningService
from app.services.company_cleaner import company_cleaner
# Router that collects every cleaning endpoint declared in this module.
router: APIRouter = APIRouter()
# Single CleaningService instance shared by the task endpoints in this module
# (file upload parsing and single-task processing).
cleaning_service: CleaningService = CleaningService()
@router.get("/stats", summary="获取公司清洗统计信息")
async def get_stats():
    """Summarise the ``job_data.pending_company`` table in ClickHouse.

    The ``Success`` payload contains:

    * ``total_pending``: number of rows whose status is ``'pending'``;
    * ``today_processed``: rows with status ``'done'`` whose ``updated_at``
      falls on ClickHouse's ``today()``;
    * ``details``: one entry per ``source`` with ``pending`` / ``done`` /
      ``failed`` counts and a ``total`` summed over every status row seen
      for that source (including statuses outside those three).
    """
    client = await clickhouse_manager.get_client()

    async def _first_cell(query):
        # First column of the first row, or 0 when the result set is empty.
        rows = (await client.query(query)).result_rows
        return rows[0][0] if rows else 0

    pending_total = await _first_cell(
        "SELECT count() FROM job_data.pending_company FINAL WHERE status = 'pending'"
    )
    done_today = await _first_cell(
        "SELECT count() FROM job_data.pending_company FINAL "
        "WHERE status = 'done' AND toDate(updated_at) = today()"
    )

    distribution_sql = (
        "\n"
        "SELECT source, status, count()\n"
        "FROM job_data.pending_company FINAL\n"
        "GROUP BY source, status\n"
        "ORDER BY source, status\n"
    )
    distribution = await client.query(distribution_sql)

    per_source = {}
    for src, state, n in distribution.result_rows:
        bucket = per_source.setdefault(
            src, {"pending": 0, "done": 0, "failed": 0, "total": 0}
        )
        # Only the known status keys get their own counter...
        if state in bucket:
            bucket[state] = n
        # ...but every row contributes to the per-source total.
        bucket["total"] += n

    return Success(
        data={
            "total_pending": pending_total,
            "today_processed": done_today,
            "details": [
                {"source": src, **counts} for src, counts in per_source.items()
            ],
        }
    )
@router.get("/companies", summary="获取公司清洗列表")
async def get_companies_list(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    source: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
):
    """Return one page of rows from ``job_data.pending_company``.

    Args:
        page: 1-based page number.
        page_size: Rows per page (1-100).
        source: Optional exact-match filter on the ``source`` column.
        status: Optional exact-match filter on the ``status`` column.

    Returns:
        ``SuccessExtra`` with the page rows (newest ``updated_at`` first,
        timestamps as ISO-8601 strings or ``None``) and the total number
        of rows matching the filters.
    """
    client = await clickhouse_manager.get_client()
    offset = (page - 1) * page_size

    # ``source`` and ``status`` come straight from the query string, so they
    # are bound as ClickHouse server-side parameters rather than formatted
    # into the SQL text (which allowed SQL injection).
    where_clauses = []
    params = {}
    if source:
        where_clauses.append("source = {source:String}")
        params["source"] = source
    if status:
        where_clauses.append("status = {status:String}")
        params["status"] = status
    where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

    count_sql = f"SELECT count() FROM job_data.pending_company FINAL {where_sql}"
    count_res = await client.query(count_sql, parameters=params)
    total = count_res.result_rows[0][0] if count_res.result_rows else 0

    # page_size and offset are ints already validated by FastAPI, so
    # inlining them into LIMIT/OFFSET is safe.
    sql = f"""
        SELECT source, company_id, company_name, status, error_msg, created_at, updated_at
        FROM job_data.pending_company FINAL
        {where_sql}
        ORDER BY updated_at DESC
        LIMIT {page_size} OFFSET {offset}
    """
    res = await client.query(sql, parameters=params)

    data = [
        {
            "source": row_source,
            "company_id": company_id,
            "company_name": company_name,
            "status": row_status,
            "error_msg": error_msg,
            "created_at": created_at.isoformat() if created_at else None,
            "updated_at": updated_at.isoformat() if updated_at else None,
        }
        for (
            row_source,
            company_id,
            company_name,
            row_status,
            error_msg,
            created_at,
            updated_at,
        ) in res.result_rows
    ]
    return SuccessExtra(data=data, total=total, page=page, page_size=page_size)
@router.get("/company-detail", summary="获取公司清洗详情")
async def get_company_cleaning_detail(
    source: str = Query(..., description="数据源"),
    company_id: str = Query(..., description="公司ID"),
    company_name: Optional[str] = Query(None, description="公司名称"),
):
    """Return the most recently updated stored record for one company.

    Args:
        source: Data-source key; one of ``"boss"``, ``"qcwy"``, ``"zhilian"``.
        company_id: Company identifier. Only used in the lookup for
            ``qcwy``; for the other sources it is just echoed back.
        company_name: Company name. Required for ``boss`` / ``zhilian``;
            for ``qcwy`` it is an optional additional match next to the ID.

    Returns:
        ``Success`` whose ``data`` holds the source, the requested ID, the
        stored company name, ISO-8601 timestamps and the parsed
        ``json_data`` (or ``{"raw": ...}`` when it cannot be parsed).
        Problems are reported as ``Success`` objects with ``code`` 400
        (unsupported source / missing name) or 404 (no matching row).
    """
    logger = logging.getLogger(__name__)
    client = await clickhouse_manager.get_client()

    table_map = {
        "boss": "boss_company",
        "qcwy": "qcwy_company",
        "zhilian": "zhilian_company",
    }
    table = table_map.get(source)
    if not table:
        return Success(code=400, msg="不支持的数据源")

    if source == "qcwy":
        # The qcwy lookup checks three JSON paths for the company ID.
        conditions = [
            "JSONExtractString(json_data, 'companyId') = {company_id:String}",
            "JSONExtractString(json_data, 'coId') = {company_id:String}",
            "JSONExtractString(json_data, 'coinfo', 'coid') = {company_id:String}",
        ]
        params = {"company_id": company_id}
        # Only match on the name when one was supplied: binding an empty
        # string would also match any row whose company_name is '' and could
        # return an unrelated company.
        if company_name:
            conditions.append("company_name = {company_name:String}")
            params["company_name"] = company_name
        where_sql = " OR ".join(conditions)
    else:
        if not company_name:
            return Success(code=400, msg="缺少公司名称")
        where_sql = "company_name = {company_name:String}"
        params = {"company_name": company_name}

    # ``table`` comes from the whitelist above, so formatting it in is safe;
    # all user-supplied values are bound as server-side parameters.
    sql = f"""
        SELECT json_data, company_name, created_at, updated_at
        FROM job_data.{table}
        WHERE {where_sql}
        ORDER BY updated_at DESC
        LIMIT 1
    """
    logger.debug("Executing SQL: %s", sql)
    logger.debug("Params: %s", params)
    res = await client.query(sql, parameters=params)
    if not res.result_rows:
        return Success(code=404, msg="未找到公司清洗结果")

    raw_json, stored_name, created_at, updated_at = res.result_rows[0]
    try:
        data = json.loads(raw_json)
    except (TypeError, ValueError):
        # Malformed or non-string payloads are returned verbatim instead of
        # failing the whole request.
        data = {"raw": raw_json}

    return Success(
        data={
            "source": source,
            "company_id": company_id,
            "company_name": stored_name,
            "created_at": created_at.isoformat() if created_at else None,
            "updated_at": updated_at.isoformat() if updated_at else None,
            "data": data,
        }
    )
@router.post("/collect-pending", summary="分析待处理数据")
async def collect_pending_companies_api(
    limit: int = Body(1000, embed=True, ge=1, le=10000),
    source: Optional[str] = Body(None, embed=True),
):
    """Scan recruitment data and queue company IDs into ``pending_company``.

    Delegates to ``company_cleaner.collect_pending_companies`` with the
    requested ``limit`` and optional ``source`` filter, and awaits it
    before responding.
    """
    await company_cleaner.collect_pending_companies(source=source, limit=limit)
    done_msg = f"已完成数据分析,已收集待处理公司(上限 {limit} 条)"
    return Success(msg=done_msg)
@router.post("/run-pending", summary="手动执行待处理公司清洗")
async def run_pending_companies(
    limit: int = Body(100, embed=True, ge=1, le=5000),
    source: Optional[str] = Body(None, embed=True),
    proxy: Optional[str] = Body(None, embed=True),
    max_delay_seconds: int = Body(5, embed=True),
):
    """Manually trigger cleaning of queued companies.

    Only records whose current status is ``pending`` are processed; records
    that have already been handled are not executed again. The call awaits
    ``company_cleaner.process_pending_companies`` before responding.
    """
    run_options = {
        "limit": limit,
        "source": source,
        "proxy": proxy,
        "max_delay_seconds": max_delay_seconds,
    }
    await company_cleaner.process_pending_companies(**run_options)
    return Success(msg=f"已触发执行最近 {limit} 条待处理公司清洗任务")
@router.post("/crawl-execute", summary="爬取并执行待处理公司清洗")
async def crawl_execute_pending(
    limit: int = Body(100, embed=True, ge=1, le=5000),
    source: Optional[str] = Body(None, embed=True),
    proxy: Optional[str] = Body(None, embed=True),
    max_delay_seconds: int = Body(5, embed=True),
):
    """Collect pending companies, then immediately run the cleaner on them.

    The collection step is called without ``limit`` (the cleaner's own
    default applies); ``limit`` only caps the processing step.
    """
    cleaner = company_cleaner
    await cleaner.collect_pending_companies(source=source)
    await cleaner.process_pending_companies(
        max_delay_seconds=max_delay_seconds,
        proxy=proxy,
        source=source,
        limit=limit,
    )
    return Success(msg=f"已触发爬取并执行最近 {limit} 条待处理公司清洗任务")
@router.post("/process-company", summary="执行单个公司清洗任务")
async def process_single_company_api(
    source: str = Body(..., embed=True),
    company_id: str = Body(..., embed=True),
    proxy: Optional[str] = Body(None, embed=True),
    max_delay_seconds: int = Body(5, embed=True),
):
    """Run the company cleaner for one ``(source, company_id)`` pair.

    The cleaner's result dict is returned as ``data``; the message reflects
    the truthiness of its ``"success"`` entry.
    """
    outcome = await company_cleaner.process_single_company(
        source=source,
        company_id=company_id,
        proxy=proxy,
        max_delay_seconds=max_delay_seconds,
    )
    if outcome.get("success"):
        return Success(msg="任务执行成功", data=outcome)
    return Success(msg="任务执行失败", data=outcome)
@router.post("/upload", summary="上传文件并保存任务")
async def upload_file(
    file: UploadFile = File(...),
    clean_type: str = Form("auto"),
    platform: str = Form("auto"),
    proxy: Optional[str] = Form(None),
):
    """Parse an uploaded file and store one pending ``CleaningTask`` per target.

    Targets are extracted by ``cleaning_service.parse_file``; every created
    task shares the submitted ``clean_type``, ``platform`` and ``proxy``.
    """
    parsed_targets = await cleaning_service.parse_file(file)
    pending_tasks = [
        CleaningTask(
            target=item,
            status="pending",
            clean_type=clean_type,
            platform=platform,
            proxy=proxy,
        )
        for item in parsed_targets
    ]
    # bulk_create is only called when at least one target was parsed.
    if pending_tasks:
        await CleaningTask.bulk_create(pending_tasks)
    imported = len(pending_tasks)
    return Success(msg=f"Successfully imported {imported} tasks")
@router.get("/list", summary="获取清洗任务列表")
async def list_tasks(
    page: int = Query(1, ge=1, description="页码"),
    page_size: int = Query(10, ge=1, description="每页数量"),
    target: Optional[str] = Query(None, description="目标搜索"),
    status: Optional[str] = Query(None, description="状态筛选"),
    clean_type: Optional[str] = Query(None, description="清洗类型筛选"),
):
    """Return a page of ``CleaningTask`` rows, newest ``created_at`` first.

    Args:
        page: 1-based page number (validated ``>= 1`` so the controller never
            receives a zero or negative page, matching ``/companies``).
        page_size: Rows per page (validated ``>= 1``).
        target: Optional substring filter on ``target``.
        status: Optional exact-match filter on ``status``.
        clean_type: Optional exact-match filter on ``clean_type``.

    Returns:
        ``SuccessExtra`` with the serialized tasks and the total match count.
    """
    q = Q()
    if target:
        q &= Q(target__contains=target)
    if status:
        q &= Q(status=status)
    if clean_type:
        q &= Q(clean_type=clean_type)

    total, tasks = await cleaning_controller.list(
        page=page, page_size=page_size, search=q, order=["-created_at"]
    )
    data = [await t.to_dict() for t in tasks]
    return SuccessExtra(data=data, total=total, page=page, page_size=page_size)
@router.post("/process/{task_id}", summary="处理单个任务")
async def process_task(task_id: int):
    """Run ``cleaning_service.process_single_item`` for one stored task.

    The task is marked ``"processing"`` before the work starts and updated
    with the outcome afterwards (``"success"`` / ``"fail"``, storage status,
    remote-sent flag, summary and error message).

    Raises:
        Whatever ``process_single_item`` raises. The task is first marked
        ``"fail"`` with the exception text so it is not left stuck in
        ``"processing"``.
    """
    task = await cleaning_controller.get(id=task_id)
    if not task:
        return Success(code=404, msg="Task not found")

    task.status = "processing"
    await task.save()

    try:
        result = await cleaning_service.process_single_item(
            target=task.target,
            clean_type=task.clean_type,
            platform=task.platform,
            proxy=task.proxy,
        )
    except Exception as exc:
        # Boundary handler: persist the failure so the row does not stay in
        # "processing" forever, then let the error propagate as before.
        logging.getLogger(__name__).exception("Cleaning task %s failed", task_id)
        task.status = "fail"
        task.error_msg = str(exc)
        await task.save()
        raise

    task.status = "success" if result.get("success") else "fail"
    task.storage_status = result.get("storage_status", "unknown")
    task.remote_sent = result.get("remote_sent", False)
    task.result_summary = result.get("data_summary")
    task.error_msg = result.get("error")
    await task.save()
    return Success(data=await task.to_dict(), msg="Task processed")
@router.delete("/delete", summary="删除任务")
async def delete_task(
    id: int = Query(..., description="任务ID"),  # public query-param name; shadows builtin
):
    """Remove the cleaning task identified by ``id`` via the controller."""
    task_id = id
    await cleaning_controller.remove(id=task_id)
    return Success(msg="Deleted Successfully")
@router.post("/clear", summary="清空所有任务")
async def clear_tasks():
    """Delete every ``CleaningTask`` row; no filter is applied."""
    every_task = CleaningTask.all()
    await every_task.delete()
    return Success(msg="All tasks cleared")