VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • 带你认识 flask 后台作业(2)

方法来从Redis更新内容

 

07

任务的数据库表示

 

对于Web应用程序,情况会变得更复杂一些,因为一旦任务传递请求的处理而启动,该请求随即结束,而该任务因为我希望应用程序跟踪每个用户正在运行的任务,所以我需要使用数据库表来维护状态。你可以在下面看到新的Task模型实现:

app / models.py:任务模型

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
# ...import redisimport rq
class User(UserMixin, db.Model):    # ...    tasks = db.relationship('Task', backref='user', lazy='dynamic')
# ...
class Task(db.Model):    id = db.Column(db.String(36), primary_key=True)    name = db.Column(db.String(128), index=True)    description = db.Column(db.String(128))    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))    complete = db.Column(db.Boolean, default=False)
    def get_rq_job(self):        try:            rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)        except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):            return None        return rq_job
    def get_progress(self):        job = self.get_rq_job()        return job.meta.get('progress', 0) if job is not None else 100

 

模型这个状语从句:以前的模型有一个有趣的区别的英文id主键字段的英文字符串类型,而不是整数类型。这是因为对于这个模型,我不会依赖数据库自己的主键生成,而是使用由RQ生成的作业标识符。

该模型将存储符合任务命名规范的名称(会传递给RQ),适用于向用户显示的任务描述,该任务的所属用户的关系以及任务是否已完成的布尔值。complete字段的目的是将正在运行的任务与已完成的任务分开,因为运行中的任务需要特殊处理才能显示最新进度。

get_rq_job()辅助方法可以用给定的任务ID加载RQ Job实例。的英文这通过Job.fetch()完成的,它会从Redis的存在中的数据中加载Job实例。get_progress()方法建立在get_rq_job()的基础之上,并返回任务的进度百分比。该方法做一些有趣的假设,如果模型中的作业ID不存在于RQ变量中,则表示作业已完成和数据已过期并已从该中删除,因此在这种情况下返回的百分比为100。同时,如果job存在,但'meta'属性中找到进度相关的信息,那么可以安全地进行该作业计划运行,但还没有启动,所以在这种情况下进度是0。

改进更改数据库,需要生成新的迁移,然后升级数据库:

  •  
  •  
(venv) $ flask db migrate -m "tasks"(venv) $ flask db upgrade

 

新模型也可以添加到shell一部分中,盔甲在shell会话中访问它时无需导入:

microblog.py:添加任务模型到shell上下文中

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
from app import create_app, db, clifrom app.models import User, Post, Message, Notification, Task
app = create_app()cli.register(app)
@app.shell_context_processordef make_shell_context():    return {'db': db, 'User': User, 'Post': Post, 'Message': Message,            'Notification': Notification, 'Task': Task}

 

08

将RQ与 Flask 集合在一起

 

Redis服务的连接URL需要添加到配置中:

  •  
  •  
  •  
class Config(object):    # ...    REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'

 

与往常一样,Redis连接URL将来自环境变量,如果该变量未定义,则替换为该服务在当前主机的端口上运行并使用URL。

应用工厂函数将负责初始化Redis和RQ:

app / __ init__.py:整合RQ

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
# ...from redis import Redisimport rq
# ...
def create_app(config_class=Config):    # ...    app.redis = Redis.from_url(app.config['REDIS_URL'])    app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)
    # ...

app.task_queue将成为提交任务的重量。将附加到应用上会提供很大的便利,因为我可以在应用的任何地方使用current_app.task_queue来访问它。为了方便应用的任何部分提交或检查任务,我可以在User模型中创建一些辅助方法:

app / models.py:用户模型中的任务辅助方法

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
# ...
class User(UserMixin, db.Model):    # ...
    def launch_task(self, name, description, *args, **kwargs):        rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,                                                *args, **kwargs)        task = Task(id=rq_job.get_id(), name=name, description=description,                    user=self)        db.session.add(task)        return task
    def get_tasks_in_progress(self):        return Task.query.filter_by(user=self, complete=False).all()
    def get_task_in_progress(self, name):        return Task.query.filter_by(name=name, user=self,                                    complete=False).first()

 

launch_task()方法是将任务提交到RQ,然后将其添加到数据库中。name参数是函数名称,如app / tasks.py中所定义的那样。提交给RQ时,该函数已app.tasks.预先添加到该名称中以构建符合规范的函数名称。description参数是对呈现给用户的任务的友好描述。对于导出用户动态的函数,我将名称设置为export_posts,将描述设置为Exporting posts...。其余参数将传递给任务函数。launch_task()函数首先调用队列的enqueue()方法来提交作业。返回的作业对象包含由RQ分配的任务ID,因此我可以使用它在我的数据库中创建相应的Task对象

请注意,launch_task()将新的任务对象添加到会话中,但不会发出提交。替代,最好在更高层次函数中的数据库会话上进行操作,因为它允许您在替代事务中组合由替代这不是一个严格的规则,并且,在本章后面的子函数中也会存在一个例外的提交

get_tasks_in_progress()方法返回该用户未完成任务的列表。稍后您会看到,我使用此方法在将有关正在运行的任务的信息渲染到用户的页面中

最后,get_task_in_progress()是上一个方法的简化版本并返回指定的任务。我阻止用户同时启动两个或多个相同类型的任务,因此在启动任务之前,可以使用此方法来确定前一个任务是否还在运行

09

利用 RQ 任务发送电子邮件

 

不要认为本节偏离主题,我在上面说过,当后台完成任务完成时,将使用包含所有用户动态的JSON文件向用户发送电子邮件。我在第十章中生成的电子邮件功能需要通过两种方式进行扩展。首先,我需要添加对文件附件的支持,刹车我可以附加JSON文件。串行,send_email()函数总是使用后台线程初始化发送电子邮件。当我要从后台任务发送电子邮件时(已经是初步的了),基于线程的二级后台任务没有什么意义,所以我需要同时支持同步和异步电子邮件的发送。

幸运的是,Flask-Mail支持附件,所以我需要做的就是扩展send_email()函数的控件关键字参数,然后在Message对象中配置它。选择在前台发送电子邮件时,我只需要添加一个sync=True的关键字参数即可:

app / email.py:发送带附件的邮件

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
# ...