您应该在调用uvicorn.run 之前启动您的线程,因为uvicorn.run 正在阻塞线程。
PS:在您的问题中,您声明您希望后台任务每 5 分钟运行一次,但在您的代码中您说每 5 秒。下面的示例假定这是您想要的后者。如果您希望它改为每 5 分钟执行一次,请将时间调整为 60 * 5。
选项 1
import time
import threading
from fastapi import FastAPI
import uvicorn
app = FastAPI()
class BackgroundTasks(threading.Thread):
def run(self,*args,**kwargs):
while True:
print('Hello')
time.sleep(5)
if __name__ == '__main__':
t = BackgroundTasks()
t.start()
uvicorn.run(app, host="0.0.0.0", port=8000)
您也可以使用 FastAPI 的 启动您的线程,只要在应用程序启动前运行即可。
@app.on_event("startup")
async def startup_event():
t = BackgroundTasks()
t.start()
选项 2
您可以改为使用重复的Event scheduler 来执行后台任务,如下所示:
import sched, time
from threading import Thread
from fastapi import FastAPI
import uvicorn
app = FastAPI()
s = sched.scheduler(time.time, time.sleep)
def print_event(sc):
print("Hello")
sc.enter(5, 1, print_event, (sc,))
def start_scheduler():
s.enter(5, 1, print_event, (s,))
s.run()
@app.on_event("startup")
async def startup_event():
thread = Thread(target = start_scheduler)
thread.start()
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8000)
© 版权声明
本站下载的源码均来自公开网络收集转发二次开发而来,
若侵犯了您的合法权益,请来信通知我们1413333033@qq.com,
我们会及时删除,给您带来的不便,我们深表歉意。
下载用户仅供学习交流,若使用商业用途,请购买正版授权,否则产生的一切后果将由下载用户自行承担,访问及下载者下载默认同意本站声明的免责申明,请合理使用切勿商用。
THE END
暂无评论内容