"""FastAPI service that turns an uploaded DBC file into generated C sources.

The upload endpoint runs the external ``coderdbc`` tool on the uploaded
file, zips the generated ``CANmatrix`` folder and publishes the archive in
``./output/`` where the download endpoint can serve it.
"""
import logging
import os
import shutil
import subprocess
from shutil import copyfileobj
from uuid import uuid4

from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles

logger = logging.getLogger(__name__)

app = FastAPI()
app.mount("/tool", StaticFiles(directory="./web", html=True))

# Names inside the per-request work folder that an upload must not shadow:
# the converter executable and its output directory.
_RESERVED_UPLOAD_NAMES = frozenset({"coderdbc", "gen"})


@app.get("/api/test/")
async def root():
    """Return a fixed greeting; usable as a liveness check."""
    return {"message": "Hello World"}


@app.post("/api/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
    """Run ``coderdbc`` on the uploaded file and publish the zipped result.

    A unique work folder is created under ``./uploads/``; it is always
    removed before returning, on success and on every failure path.

    Args:
        file: The uploaded DBC file.

    Returns:
        A dict with ``info``, ``coderdbc_stdout`` and ``coderdbc_stderr``
        when the converter ran, or a dict with an ``error`` key (plus
        ``stderr`` for zip failures) when a step failed.
    """
    folder_path = f'./uploads/{uuid4()}/'
    try:
        # The client controls the file name: keep only its final component
        # so it cannot escape the work folder, and refuse names that would
        # overwrite the converter binary that is executed below.
        safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
        if (
            not safe_name
            or safe_name in (".", "..")
            or safe_name in _RESERVED_UPLOAD_NAMES
        ):
            return {"error": "Invalid file name"}

        gen_folder = os.path.join(folder_path, 'gen')
        os.makedirs(gen_folder, exist_ok=True)

        # Copy the converter next to the input file.
        shutil.copy2("../dbc2c/coderdbc", folder_path)

        # Persist the upload.
        file_location = os.path.join(folder_path, safe_name)
        with open(file_location, "wb") as buffer:
            copyfileobj(file.file, buffer)

        # Equivalent shell invocation:
        # coderdbc -dbc <file> -out ./gen -drvname CANmatrix
        #          -nodeutils -rw -driverdir -gendate
        command = [
            os.path.join(folder_path, "coderdbc"),
            "-dbc", file_location,
            "-out", gen_folder,
            "-drvname", "CANmatrix",
            "-nodeutils",
            "-rw",
            "-driverdir",  # flag without a value
            "-gendate",
        ]
        # NOTE(review): subprocess.run blocks the event loop while the
        # tool runs; consider a threadpool if uploads become concurrent.
        result = subprocess.run(command, capture_output=True, text=True)

        # Only package the output when the converter reported no errors.
        if result.stderr == "":
            canmatrix_folder = os.path.join(gen_folder, 'CANmatrix')
            if not os.path.exists(canmatrix_folder):
                return {"error": "CANmatrix folder not found in gen directory"}

            # Run zip from inside gen/ so the archive holds a top-level
            # CANmatrix/ directory.
            zip_command = ['zip', '-r', 'CANmatrix.zip', 'CANmatrix']
            zip_result = subprocess.run(
                zip_command, cwd=gen_folder, capture_output=True, text=True
            )
            if zip_result.returncode != 0 or zip_result.stderr != "":
                return {
                    "error": "Failed to create zip file",
                    "stderr": zip_result.stderr,
                }

            # Publish the archive where the download endpoint looks for it.
            # NOTE(review): the fixed output name means concurrent uploads
            # overwrite each other's archive.
            output_folder = './output/'
            os.makedirs(output_folder, exist_ok=True)
            shutil.copy2(
                os.path.join(folder_path, 'gen/CANmatrix.zip'),
                os.path.join(output_folder, 'CANmatrix.zip'),
            )

        return {
            "info": f"File {file.filename} uploaded and processed successfully!",
            "coderdbc_stdout": result.stdout,
            "coderdbc_stderr": result.stderr,
        }
    except Exception as e:
        logger.exception("Processing of uploaded file failed")
        return {"error": str(e)}
    finally:
        # Remove the per-request work folder on every exit path.
        shutil.rmtree(folder_path, ignore_errors=True)


@app.get("/api/download/{filename}")
async def download_file(filename: str):
    """Serve a file from ``./output/`` as a zip download.

    Args:
        filename: Name of a file directly inside ``./output/``.

    Returns:
        A ``FileResponse`` for the file, a 404 JSON response when it does
        not exist, or a 500 JSON response on unexpected errors.
    """
    try:
        output_folder = './output/'

        # Accept only a plain file name so the request cannot address
        # anything outside the output folder.
        safe_name = os.path.basename(filename)
        if safe_name != filename or safe_name in ("", ".", ".."):
            return JSONResponse(content={"message": "File not found"}, status_code=404)

        file_path = os.path.join(output_folder, safe_name)
        if not os.path.isfile(file_path):
            return JSONResponse(content={"message": "File not found"}, status_code=404)

        return FileResponse(path=file_path, filename=filename, media_type="application/zip")
    except Exception as e:
        return JSONResponse(content={"message": f"Error: {str(e)}"}, status_code=500)


def process_file(file_path, output_path):
    """Write an upper-cased copy of the text file *file_path* to *output_path*."""
    with open(file_path, 'r') as file:
        content = file.read()
    with open(output_path, 'w') as file:
        file.write(content.upper())


if __name__ == "__main__":
    import uvicorn

    # Configure logging; the log file path must be known before the
    # FileHandler is constructed.
    log_dir = "../logs"
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, "app.log")

    logging.basicConfig(
        level=logging.INFO,  # record INFO and above
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(log_file),  # persist to file
            logging.StreamHandler(),  # echo to console
        ],
    )

    uvicorn.run(app, host="0.0.0.0", port=8000)