VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • 带你认识 flask 后台作业(3)

def send_email(subject, sender, recipients, text_body, html_body, attachments=None, sync=False): msg = Message(subject, sender=sender, recipients=recipients) msg.body = text_body msg.html = html_body if attachments: for attachment in attachments: msg.attach(*attachment) if sync: mail.send(msg) else: Thread(target=send_async_email, args=(current_app._get_current_object(), msg)).start()

消息类的attach()方法接受三个定义附件的参数:文件名,媒体类型和实际文件数据。文件名就是收件人看到的与附件关联的名称。媒体类型定义了这种附件的类型,这有助于电子邮件读者适当地渲染它。例如,如果您发送为image/png媒体类型,则电子邮件阅读器会知道该附件是一个图像,在这种情况下,它可以显示它。对于用户动态数据文件,我将使用JSON格式,该格式使用application/json媒体类型。最后一个参数包含附件内容的字符串或字节序列。

简单来说,send_email()attachments参数将成为一个元组列表,每个元组将有三个元素对应于attach()的三个参数。因此,我需要转换列表中的每个元素作为参数发送给attach()。在Python中,如果你想将列表或元组中的每个元素作为参数传递给函数,你可以使用func(*args)将这个列表或元祖解包成函数中的多个参数,而不必枯燥地一个个地传递,如func(args[0], args[1], args[2])。例如,如果如果没有,调用将会引发一个参数,即列表。你有一个列表args = [1, 'foo']func(*args)将会传递两个参数,就和你调用func(1, 'foo')一样。*args

如电子邮件的同步发送,我需要做的就是,当syncTrue的时候恢复成调用mail.send(msg)

10

任务助手

 

尽管我上面使用的example()任务是一个简单的独立函数,但已添加用户动态的函数却需要应用中具有的某些功能,例如访问数据库和发送电子邮件。因为这将在单独的进程中运行,所以我需要初始化Flask-SQLAlchemy和Flask-Mail,而Flask-Mail又需要Flask应用程序实例以从中获取它们的配置。因此,我将在app / tasks.py模块的顶部添加Flask应用程序实例和应用程序:

app / tasks.py:创建应用及其自身

  •  
  •  
  •  
  •  
from app import create_app
app = create_app()app.app_context().push()

 

当使用flask命令时,根目录中的microblog.py模块创建应用实例,但RQ worker实际上却一无所知,所以当任务函数时,应用程序在此模块中创建,因为这是RQ worker要导入的唯一模块。你已经在好几个地方app.app_context()看到了方法,按下一个使其使应用成为“当前”的应用实例,这样一来Flask-SQLAlchemy等插件才可以使用current_app.config获取它们的配置。。根本没有,current_app表达式会返回一个错误。

然后我开始考虑如何在这个函数运行时报告进度。另外通过job.meta字典传递进度信息之外,我还想将通知推送给客户端,刹车自动动态更新完成百分比。逐步,我将使用我在第二十一章中生成的通知机制。更新将以与未读消息徽章非常类似的方式工作。当服务器渲染模板时,则包含从job.meta获得的“静态”进度信息,但一旦页面置于客户端的浏览器中,通知将使用通知来动态更新百分比。由于通知的原因,更新正在运行的任务的进度将比上一个示例中的操作稍微多一些,所以我将创建一个专用于更新进度的包装函数:

app / tasks.py:设置任务进度

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
from rq import get_current_jobfrom app import dbfrom app.models import Task
# ...
def _set_task_progress(progress):    job = get_current_job()    if job:        job.meta['progress'] = progress        job.save_meta()        task = Task.query.get(job.get_id())        task.user.add_notification('task_progress', {'task_id': job.get_id(),                                                     'progress': progress})        if progress >= 100:            task.complete = True        db.session.commit()

 

任务导出可以调用_set_task_progress()来记录进度百分比。函数该首先将百分比写入job.meta字典搜索并将其保存到Redis的,然后从数据库加载相应的任务对象,使用并task.user已有的add_notification()方法将通知推送给请求该任务的用户。通知将被命名为task_progress,并且伴随关联的数据将成为具有两个关联的字典:任务标识符和进度数值。稍后我将添加JavaScript代码来处理这种新的通知类型

该函数查看进度来确认任务函数是否已完成,并在这种情况下下更新数据库中任务对象的complete属性。数据库提交调用通过add_notification()添加的任务和通知对象都立即保存到数据库。任务,确保不执行任何数据库更改,因为执行本次调用父父的更改也写入数据库

11

实现导出任务

 

现在所有的准备工作已经完成,可以开始编写导出函数了。这个函数的高层结构如下:

app / tasks.py:导出用户动态通用结构

  •  
  •  
  •  
  •  
  •  
  •  
def export_posts(user_id):    try:        # read user posts from database        # send email with data to user    except:        # handle unexpected errors

请求处理器中的应用程序可以防止意外错误,因为Flask自身捕获异常,然后将其整个任务包装在try / except中。将运行在由RQ控制的单独前进中,而不是烧瓶,因此如果发生任何意外错误,任务将中止,RQ将向控制台显示错误,然后返回等待新的作业。worker的输出或将其记录到文件中,否则将永远不会发现有错误。

让我们从上面带有注释的三部分中最简单的错误处理部分开始梳理:

app / tasks.py:更新用户动态错误处理

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
import sys# ...
def export_posts(user_id):    try:        # ...    except:        _set_task_progress(100)        app.logger.error('Unhandled exception', exc_info=sys.exc_info())

发生意外错误时,我将通过将进度设置为100%来将任务标记为完成,然后使用Flask应用程序中的日志记录器对象记录错误以及如何跟踪信息(调用sys.exc_info()来获得)。记录器来记录错误的好处在于,你可以观察到你为瓶应用实现的任何日志记录机制。例如,在第七章中,我配置了要发送到管理员电子邮件地址的错误。只要使用app.logger,我也可以得到这些错误信息

接下来,我将编写实际的起始代码,它只需发出一个数据库查询并在循环中遍历结果,随之而来的累积在字典中:

app / tasks.py:从数据库读取用户动态

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
import timefrom app.models import User, Post
# ...
def export_posts(user_id):    try:        user = User.query.get(user_id)        _set_task_progress(0)        data = []        i = 0        total_posts = user.posts.count()        for post in user.posts.order_by(Post.timestamp.asc()):            data.append({'body': post.body,                         'timestamp': post.timestamp.isoformat() + 'Z'})            time.sleep(5)            i += 1            _set_task_progress(100 * i // total_posts)
        # send email with data to user    except:        # ...

 

时间格式将采用ISO 8601标准。我使用的Python的datetime对象不存储时区,因此在以ISO格式导出时间后,我添加了'Z',它表示UTC

我维护了一个计数器i,并且在进入循环之前还需要发出一个额外的数据库查询,查询total_posts导致用户动态的总数。使用了itotal_posts,在每个循环迭代我都可以使用从0到100的数字来更新任务进度

您可能会好奇我为什么会在每个循环time.sleep(5)迭代中加入调用。最终是我想要延长增量所需的时间,刹车在用户动态不多的情况下也可以方便地查看到逐步进度的增长

下面是函数的最后部分,将会带上data附件发送邮件给用户:

app / tasks.py:发送带用户动态的邮件给用户


相关教程