软件:

您应该在调用uvicorn.run 之前启动您的线程,因为uvicorn.run 正在阻塞线程。

PS:在您的问题中,您声明您希望后台任务每 5 分钟运行一次,但在您的代码中您说每 5 秒。下面的示例假定这是您想要的后者。如果您希望它改为每 5 分钟执行一次,请将时间调整为 60 * 5。

选项 1

import time
import threading
from fastapi import FastAPI
import uvicorn
app = FastAPI()
class BackgroundTasks(threading.Thread):

图片[1]-软件:-唐朝资源网

def run(self,*args,**kwargs): while True: print('Hello') time.sleep(5) if __name__ == '__main__': t = BackgroundTasks() t.start()

图片[2]-软件:-唐朝资源网

uvicorn.run(app, host="0.0.0.0", port=8000)

您也可以使用 FastAPI 的 启动您的线程,只要在应用程序启动前运行即可。

@app.on_event("startup")
async def startup_event():
    t = BackgroundTasks()
    t.start()

图片[3]-软件:-唐朝资源网

选项 2

您可以改为使用重复的Event scheduler 来执行后台任务,如下所示:

import sched, time
from threading import Thread
from fastapi import FastAPI
import uvicorn
app = FastAPI()

s = sched.scheduler(time.time, time.sleep)
def print_event(sc): 
    print("Hello")
    sc.enter(5, 1, print_event, (sc,))
def start_scheduler():
    s.enter(5, 1, print_event, (s,))

图片[4]-软件:-唐朝资源网

s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target = start_scheduler) thread.start() if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8000)

© 版权声明
THE END
喜欢就支持一下吧
点赞133 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片