refactor: 优化响应格式和错误处理 fix: 修复文件上传类型校验和删除无用PDF文件 perf: 添加估值评估审核时间字段和查询条件 docs: 更新Docker镜像版本至v1.8 test: 添加响应格式检查脚本 style: 统一API响应数据结构 chore: 清理无用静态文件和更新构建脚本
105 lines
3.5 KiB
Python
105 lines
3.5 KiB
Python
import json
|
|
from typing import Dict, Any, List, Tuple
|
|
|
|
from fastapi import FastAPI
|
|
|
|
from app import create_app
|
|
|
|
|
|
def load_openapi(app: FastAPI) -> Dict[str, Any]:
|
|
return app.openapi()
|
|
|
|
|
|
def is_object_schema(schema: Dict[str, Any]) -> bool:
|
|
return schema.get("type") == "object"
|
|
|
|
|
|
def get_schema_props(schema: Dict[str, Any]) -> Dict[str, Any]:
|
|
return schema.get("properties", {}) if schema else {}
|
|
|
|
|
|
def check_success_schema(props: Dict[str, Any]) -> Tuple[bool, List[str]]:
|
|
issues: List[str] = []
|
|
code_prop = props.get("code")
|
|
msg_prop = props.get("msg")
|
|
data_prop = props.get("data")
|
|
if code_prop is None:
|
|
issues.append("缺少字段: code")
|
|
elif code_prop.get("type") != "integer":
|
|
issues.append(f"code类型错误: {code_prop.get('type')}")
|
|
if msg_prop is None:
|
|
issues.append("缺少字段: msg")
|
|
elif msg_prop.get("type") != "string":
|
|
issues.append(f"msg类型错误: {msg_prop.get('type')}")
|
|
if data_prop is None:
|
|
issues.append("缺少字段: data")
|
|
else:
|
|
tp = data_prop.get("type")
|
|
if tp != "object":
|
|
issues.append(f"data类型错误: {tp}")
|
|
return (len(issues) == 0, issues)
|
|
|
|
|
|
def check_paths(openapi: Dict[str, Any]) -> Dict[str, Any]:
|
|
paths = openapi.get("paths", {})
|
|
compliant: List[Dict[str, Any]] = []
|
|
non_compliant: List[Dict[str, Any]] = []
|
|
for path, ops in paths.items():
|
|
for method, meta in ops.items():
|
|
op_id = meta.get("operationId")
|
|
tags = meta.get("tags", [])
|
|
responses = meta.get("responses", {})
|
|
success = responses.get("200") or responses.get("201")
|
|
if not success:
|
|
non_compliant.append({
|
|
"path": path,
|
|
"method": method.upper(),
|
|
"operationId": op_id,
|
|
"tags": tags,
|
|
"issues": ["无成功响应模型(200/201)"],
|
|
})
|
|
continue
|
|
content = success.get("content", {}).get("application/json", {})
|
|
schema = content.get("schema")
|
|
if not schema:
|
|
non_compliant.append({
|
|
"path": path,
|
|
"method": method.upper(),
|
|
"operationId": op_id,
|
|
"tags": tags,
|
|
"issues": ["成功响应未声明JSON Schema"],
|
|
})
|
|
continue
|
|
props = get_schema_props(schema)
|
|
ok, issues = check_success_schema(props)
|
|
rec = {
|
|
"path": path,
|
|
"method": method.upper(),
|
|
"operationId": op_id,
|
|
"tags": tags,
|
|
}
|
|
if ok:
|
|
compliant.append(rec)
|
|
else:
|
|
non_compliant.append({**rec, "issues": issues})
|
|
total = len(compliant) + len(non_compliant)
|
|
rate = 0 if total == 0 else round(len(compliant) / total * 100, 2)
|
|
return {
|
|
"compliant": compliant,
|
|
"non_compliant": non_compliant,
|
|
"stats": {"total": total, "compliant": len(compliant), "non_compliant": len(non_compliant), "rate": rate},
|
|
}
|
|
|
|
|
|
def main() -> None:
|
|
app = create_app()
|
|
openapi = load_openapi(app)
|
|
result = check_paths(openapi)
|
|
print(json.dumps(result, ensure_ascii=False, indent=2))
|
|
with open("scripts/response_format_report.json", "w", encoding="utf-8") as f:
|
|
json.dump(result, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|