首页 > Python基础教程 >
-
带你认识 flask 后台作业
01
任务类别简介
任务进程为后台作业提供了一个便捷的解决方案。Worker过程独立于应用程序运行,甚至可以位于不同的系统上。应用程序和worker之间的通信是通过消息完成的。通过与物理相互作用来监视其进度。下图展示了一个典型的实现:
Python中最流行的任务类别是Celery。这是一个相当复杂的重叠,它有很多选项并支持多个消息示例。另一个流行的Python任务位置是Redis Queue(RQ),它牺牲了一些替代,,仅支持Redis消息本身,但作为交换,它的建立要比Celery简单长度
Celery和RQ都非常适合在Flask应用程序中支持后台任务,所以我可以选择更简单的RQ。不过,用Celery实现相同的功能其实也不难。如果您对Celery更有吸引力,可以阅读我的博客中的将Celery与Flask文章一起使用
02
使用RQ
RQ是一个标准的Python三方重叠,用pip
安装:
(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt
正如我前面提到的,应用和RQ worker之间的通信将在Redis消息中执行,因此你需要运行Redis服务器。有很多途径来安装和运行Redis服务器,可以下载其内核并执行编译和安装。如果你使用的是Windows中,微软在此处维护了Redis的的安装程序。在Linux的上,你可以通过操作系统的软件包管理器安装Redis的。Mac OS X的用户可以运行brew install redis
,使用然后redis-server
命令手动启动服务
除了确保服务正在运行并可以识别RQ访问之外,你不需要与Redis进行其他交互
03
创建任务
一个任务,不过是一个Python函数而已。以下是一个示例任务,我将其引入一个新的app / tasks.py模块:
app / tasks.py:示例后台任务
import time
def example(seconds):
print('Starting task')
for i in range(seconds):
print(i)
time.sleep(1)
print('Task completed')
该任务将秒数作为参数,然后在该时间量内等待,并每秒打印一次计数器
04
运行 RQ 工人
任务准备就绪,可以通过rq worker
来启动一个worker进程了:
(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...
microblog-tasks
如果您想启动多个worker来扩展量子,您只需要运行rq worker
来生成更多连接到同一个模型的进程,就可以使用Worker进程现在连接到了Redis,并在称为的上面上查看可能的分配给它的任何作业。在生产环境中,您可能希望至少运行可用的CPU数量的工人。。然后,,当作业出现在特定位置时,任何可用的worker进程都可以获取它
05
执行任务
现在打开第二个终端窗口并激活虚拟环境。我将使用shell会话来启动worker中的example()
任务:
>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'
如果采用的是Redis服务器运行在不同的主机或端口号上,则使用RQ的Queue
类表示从应用程序端看到的任务类型。Redis
则需要使用其他URL。
队列的enqueue()
方法用于将作业添加到队列中。第一个参数是要执行的任务的名称,可直接传入函数对象或导入字符串。我发现传入字符串更加方便,因为不需要在应用程序对enqueue()
预期的任何剩余参数将被传递给worker中运行的函数。
enqueue()
只要进行了调用,运行着RQ worker的终端窗口上就会出现一些活动。你会看到example()
函数正在运行,并且连续打印一次计数器。同时,你的其他终端不会被分开,你可以继续在shell在上面的示例中,我调用job.get_id()
方法来获取分配给任务的唯一标识符。你可以尝试使用另一个有趣表达式来检查worker上的函数是否已完成:
>>> job.is_finished
False
如果你像我在上面的示例中那样传递了23
,那么函数将运行约23秒。在那之后,job.is_finished
表达式将True
转化为。就是这么简单,炫酷否?
一旦函数完成,worker又回到等待作业的状态,所以如果你想进行更多的实验,你可以用不同的参数重复执行enqueue()
调用。),但最终会被删除。这很重要,任务类别不保留已执行作业的历史记录
06
报告任务进度
通常,对于长期运行的任务,您需要将一些进度信息提供给应用程序,从而可以将其显示给用户。RQ通过使用作业对象的meta
属性来支持这一点。让我重新编写example()
任务来编写进度报告:
app / tasks.py::带进度的示例后台任务
import time
from rq import get_current_job
def example(seconds):
job = get_current_job()
print('Starting task')
for i in range(seconds):
job.meta['progress'] = 100.0 * i / seconds
job.save_meta()
print(i)
time.sleep(1)
job.meta['progress'] = 100
job.save_meta()
print('Task completed')
这个新版本的example()
使用RQ的get_current_job()
函数来获取一个作业实例,该实例与提交任务时返回给应用程序的实例类似。作业对象的meta
属性是一个字典,任务可以编写任何想要的与应用程序通信的自定义数据。在此示例中,我编写了progress
,表示完成任务的百分比。每次进程更新时,我都调用job.save_meta()
指示RQ将数据写入Redis,应用程序可以在其中找到它。
在应用程序方面(目前只是一个Python shell),我可以运行此任务,然后监视进度,如下所示:
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True
如您所见,在另一侧,meta
属性可以被重新读。需要调用refresh()