首页 > Python基础教程 >
-
带你认识 flask 后台作业(2)
07
任务的数据库表示
对于Web应用程序,情况会变得更复杂一些,因为一旦任务传递请求的处理而启动,该请求随即结束,而该任务因为我希望应用程序跟踪每个用户正在运行的任务,所以我需要使用数据库表来维护状态。你可以在下面看到新的Task
模型实现:
app / models.py:任务模型
# ...
import redis
import rq
class User(UserMixin, db.Model):
# ...
tasks = db.relationship('Task', backref='user', lazy='dynamic')
# ...
class Task(db.Model):
id = db.Column(db.String(36), primary_key=True)
name = db.Column(db.String(128), index=True)
description = db.Column(db.String(128))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
complete = db.Column(db.Boolean, default=False)
def get_rq_job(self):
try:
rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
return None
return rq_job
def get_progress(self):
job = self.get_rq_job()
return job.meta.get('progress', 0) if job is not None else 100
模型这个状语从句:以前的模型有一个有趣的区别的英文id
主键字段的英文字符串类型,而不是整数类型。这是因为对于这个模型,我不会依赖数据库自己的主键生成,而是使用由RQ生成的作业标识符。
该模型将存储符合任务命名规范的名称(会传递给RQ),适用于向用户显示的任务描述,该任务的所属用户的关系以及任务是否已完成的布尔值。complete
字段的目的是将正在运行的任务与已完成的任务分开,因为运行中的任务需要特殊处理才能显示最新进度。
get_rq_job()
辅助方法可以用给定的任务ID加载RQ Job
实例。的英文这通过Job.fetch()
完成的,它会从Redis的存在中的数据中加载Job
实例。get_progress()
方法建立在get_rq_job()
的基础之上,并返回任务的进度百分比。该方法做一些有趣的假设,如果模型中的作业ID不存在于RQ变量中,则表示作业已完成和数据已过期并已从该中删除,因此在这种情况下返回的百分比为100。同时,如果job存在,但'meta'属性中找到进度相关的信息,那么可以安全地进行该作业计划运行,但还没有启动,所以在这种情况下进度是0。
改进更改数据库,需要生成新的迁移,然后升级数据库:
(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade
新模型也可以添加到shell一部分中,盔甲在shell会话中访问它时无需导入:
microblog.py:添加任务模型到shell上下文中
from app import create_app, db, cli
from app.models import User, Post, Message, Notification, Task
app = create_app()
cli.register(app)
@app.shell_context_processor
def make_shell_context():
return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
'Notification': Notification, 'Task': Task}
08
将RQ与 Flask 集合在一起
Redis服务的连接URL需要添加到配置中:
class Config(object):
# ...
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
与往常一样,Redis连接URL将来自环境变量,如果该变量未定义,则替换为该服务在当前主机的端口上运行并使用URL。
应用工厂函数将负责初始化Redis和RQ:
app / __ init__.py:整合RQ
# ...
from redis import Redis
import rq
# ...
def create_app(config_class=Config):
# ...
app.redis = Redis.from_url(app.config['REDIS_URL'])
app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)
# ...
app.task_queue
将成为提交任务的重量。将附加到应用上会提供很大的便利,因为我可以在应用的任何地方使用current_app.task_queue
来访问它。为了方便应用的任何部分提交或检查任务,我可以在User
模型中创建一些辅助方法:
app / models.py:用户模型中的任务辅助方法
# ...
class User(UserMixin, db.Model):
# ...
def launch_task(self, name, description, *args, **kwargs):
rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
*args, **kwargs)
task = Task(id=rq_job.get_id(), name=name, description=description,
user=self)
db.session.add(task)
return task
def get_tasks_in_progress(self):
return Task.query.filter_by(user=self, complete=False).all()
def get_task_in_progress(self, name):
return Task.query.filter_by(name=name, user=self,
complete=False).first()
launch_task()
方法是将任务提交到RQ,然后将其添加到数据库中。name
参数是函数名称,如app / tasks.py中所定义的那样。提交给RQ时,该函数已app.tasks.
预先添加到该名称中以构建符合规范的函数名称。description
参数是对呈现给用户的任务的友好描述。对于导出用户动态的函数,我将名称设置为export_posts
,将描述设置为Exporting posts...
。其余参数将传递给任务函数。launch_task()
函数首先调用队列的enqueue()
方法来提交作业。返回的作业对象包含由RQ分配的任务ID,因此我可以使用它在我的数据库中创建相应的Task
对象
请注意,launch_task()
将新的任务对象添加到会话中,但不会发出提交。替代,最好在更高层次函数中的数据库会话上进行操作,因为它允许您在替代事务中组合由替代这不是一个严格的规则,并且,在本章后面的子函数中也会存在一个例外的提交
get_tasks_in_progress()
方法返回该用户未完成任务的列表。稍后您会看到,我使用此方法在将有关正在运行的任务的信息渲染到用户的页面中
最后,get_task_in_progress()
是上一个方法的简化版本并返回指定的任务。我阻止用户同时启动两个或多个相同类型的任务,因此在启动任务之前,可以使用此方法来确定前一个任务是否还在运行
09
利用 RQ 任务发送电子邮件
不要认为本节偏离主题,我在上面说过,当后台完成任务完成时,将使用包含所有用户动态的JSON文件向用户发送电子邮件。我在第十章中生成的电子邮件功能需要通过两种方式进行扩展。首先,我需要添加对文件附件的支持,刹车我可以附加JSON文件。串行,send_email()
函数总是使用后台线程初始化发送电子邮件。当我要从后台任务发送电子邮件时(已经是初步的了),基于线程的二级后台任务没有什么意义,所以我需要同时支持同步和异步电子邮件的发送。
幸运的是,Flask-Mail支持附件,所以我需要做的就是扩展send_email()
函数的控件关键字参数,然后在Message
对象中配置它。选择在前台发送电子邮件时,我只需要添加一个sync=True
的关键字参数即可:
app / email.py:发送带附件的邮件
# ...