异步任务,超实是用D用 Web 开发中经常遇到的问题,比如说用户提交了一个请求,使实现虽然这个请求对应的个异任务非常耗时,但是步任不能让用户等在这里,通常需要立即返回结果,作流告诉用户任务已提交。超实任务可以在后续慢慢完成,用D用完成后再给用户发一个完成的使实现通知。云南idc服务商 今天分享一份代码,个异使用 Celery、步任RabbitMQ 和 MongoDB 实现一个异步任务工作流,作流你可以修改 task.py 来实现你自己的超实异步任务。 架构图如下: 其中 Celery 来执行异步任务,用D用RabbitMQ 作为消息队列,使实现MongoDB 存储任务执行结果,FastAPI 提供 Web 接口。 以上所有模块均可使用 Docker 一键部署。 下面为 Demo 使用方法: 1、确保本机已安装 Docker、Git 2、云服务器下载源代码: 3、部署并启动: cd async-demo 4、启动一个异步任务: 任务会发送到消息队列,同时会立即返回一个任务 id: ❯ curl -X POST http://localhost:8080/process 5、查询任务状态: 任务完成后的返回结果如下: ❯ curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58 代码目录结构如下: 其中 app.py 如下: from fastapi import FastAPI from celery.result import AsyncResult from tasks import start_processing from loguru import logger from pymongo import MongoClient import uvicorn # Lets create a connection to our backend where celery stores the results client = MongoClient("mongodb://mongodb:27017") # Default database and collection names that Celery create db = client[task_results] coll = db["celery_taskmeta"] app = FastAPI() @app.post(/process) async def process_text_file(): Process endpoint to trigger the start of a process try: result = start_processing.delay() logger.info(fStarted processing the task with id { result.id}) return { "status": result.state, id: result.id, error: } except Exception as e: logger.info(fTask Execution failed: { e}) return { "status": "FAILURE", id: None, error: e } @app.post(/check_progress/{ task_id}) async def check_async_progress(task_id: str): Endpoint to check the task progress and fetch the results if the task is complete. try: result = AsyncResult(task_id) if result.ready(): data = coll.find({ _id: task_id})[0] return { status: SUCEESS, data: data[result]} else: return { "status": result.state, "error": } except Exception as e: data = coll.find({ _id: task_id})[0] if data: return { status: SUCEESS, data: data[result]} return { status: Task ID invalid, error: e} if __name__ == "__main__": 如果要实现自己的任务队列,就修改 task.py 来添加自己的异步任务,可以整合到自己的项目中。