refactor: 优化API路由和响应模型 feat(admin): 添加App用户管理接口 feat(sms): 实现阿里云短信服务集成 feat(email): 添加SMTP邮件发送功能 feat(upload): 支持文件上传接口 feat(rate-limiter): 实现手机号限流器 fix: 修复计算步骤入库问题 docs: 更新API文档和测试计划 chore: 更新依赖和配置
90 lines
3.0 KiB
Python
90 lines
3.0 KiB
Python
import os
|
|
from pathlib import Path
|
|
from typing import List
|
|
from fastapi import UploadFile
|
|
from app.schemas.upload import ImageUploadResponse, FileUploadResponse
|
|
from app.settings.config import settings
|
|
|
|
class UploadController:
    """File upload controller.

    Validates the declared content type of an upload, writes the payload
    under ``static/<subdir>`` (relative to the parent of this module's
    directory) and returns a response model carrying the public URL.
    """

    # Declared MIME types accepted by ``upload_file``.
    _ALLOWED_FILE_TYPES = frozenset({
        "application/pdf",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-excel",
        "application/zip",
        "application/x-zip-compressed",
    })

    @staticmethod
    def _sanitize_filename(raw_name) -> str:
        """Reduce a client-supplied filename to a bare base name.

        The filename comes from the request and is untrusted: it may be
        missing, may contain directory components (``../../etc/passwd``) or
        may be an absolute path, any of which would let the write escape the
        upload directory if joined as-is.

        :param raw_name: filename as sent by the client (may be ``None``)
        :return: the final path component, guaranteed non-empty
        :raises ValueError: if no usable filename remains
        """
        if not raw_name:
            raise ValueError("文件名无效")
        # Treat backslashes as separators too, so Windows-style paths are
        # stripped even when the server runs on a POSIX system.
        base_name = os.path.basename(raw_name.replace("\\", "/"))
        if base_name in ("", ".", "..") or "\x00" in base_name:
            raise ValueError("文件名无效")
        return base_name

    @staticmethod
    def _store(subdir: str, safe_name: str, content: bytes) -> str:
        """Write ``content`` into ``static/<subdir>`` under a unique name.

        If ``safe_name`` is already taken, ``_1``, ``_2``, ... is appended to
        the original stem (``a.png`` -> ``a_1.png`` -> ``a_2.png``) until a
        free name is found.

        :param subdir: directory name below ``static``
        :param safe_name: already-sanitized base filename
        :param content: bytes to write
        :return: the filename actually used on disk
        """
        upload_dir = Path(__file__).resolve().parent.parent / "static" / subdir
        upload_dir.mkdir(parents=True, exist_ok=True)

        stem, ext = os.path.splitext(safe_name)
        candidate = safe_name
        counter = 1
        while True:
            try:
                # "x" creates the file exclusively: checking for a free name
                # and claiming it happen in one step, so two concurrent
                # uploads of the same name can never overwrite each other.
                with open(upload_dir / candidate, "xb") as f:
                    f.write(content)
            except FileExistsError:
                candidate = f"{stem}_{counter}{ext}"
                counter += 1
            else:
                return candidate

    @staticmethod
    async def upload_image(file: UploadFile) -> ImageUploadResponse:
        """Upload an image.

        :param file: the uploaded image file
        :return: the image URL and the stored filename
        :raises ValueError: if the declared content type is missing or not
            ``image/*``, or if the filename is unusable
        """
        # The content type is declared by the client and may be absent.
        content_type = file.content_type
        if not content_type or not content_type.startswith("image/"):
            raise ValueError("只支持上传图片文件")

        safe_name = UploadController._sanitize_filename(file.filename)
        # Read the payload before choosing a name: awaiting between the name
        # choice and the write would let another request take the same name.
        content = await file.read()
        # NOTE(review): the disk write below is synchronous and runs on the
        # event loop; consider offloading it if uploads can be large.
        stored_name = UploadController._store("images", safe_name, content)

        return ImageUploadResponse(
            url=f"{settings.BASE_URL}/static/images/{stored_name}",
            filename=stored_name,
        )

    @staticmethod
    async def upload_file(file: UploadFile) -> FileUploadResponse:
        """Upload a document or archive (PDF, Word, Excel, ZIP).

        :param file: the uploaded file
        :return: the file URL, the stored filename and the declared
            content type
        :raises ValueError: if the declared content type is not in the
            allowed set, or if the filename is unusable
        """
        if file.content_type not in UploadController._ALLOWED_FILE_TYPES:
            raise ValueError("不支持的文件类型")

        safe_name = UploadController._sanitize_filename(file.filename)
        # Read first, then claim the name atomically (see ``_store``).
        content = await file.read()
        stored_name = UploadController._store("files", safe_name, content)

        return FileUploadResponse(
            url=f"{settings.BASE_URL}/static/files/{stored_name}",
            filename=stored_name,
            content_type=file.content_type,
        )