首页 > Python基础教程 >
-
带你认识 flask 后台作业(3)
attachments=None, sync=False):
msg = Message(subject, sender=sender, recipients=recipients)
msg.body = text_body
msg.html = html_body
if attachments:
for attachment in attachments:
msg.attach(*attachment)
if sync:
mail.send(msg)
else:
Thread(target=send_async_email,
args=(current_app._get_current_object(), msg)).start()
消息类的attach()
方法接受三个定义附件的参数:文件名,媒体类型和实际文件数据。文件名就是收件人看到的与附件关联的名称。媒体类型定义了这种附件的类型,这有助于电子邮件读者适当地渲染它。例如,如果您发送为image/png
媒体类型,则电子邮件阅读器会知道该附件是一个图像,在这种情况下,它可以显示它。对于用户动态数据文件,我将使用JSON格式,该格式使用application/json
媒体类型。最后一个参数包含附件内容的字符串或字节序列。
简单来说,send_email()
的attachments
参数将成为一个元组列表,每个元组将有三个元素对应于attach()
的三个参数。因此,我需要转换列表中的每个元素作为参数发送给attach()
。在Python中,如果你想将列表或元组中的每个元素作为参数传递给函数,你可以使用func(*args)
将这个列表或元祖解包成函数中的多个参数,而不必枯燥地一个个地传递,如func(args[0], args[1], args[2])
。例如,如果如果没有,调用将会引发一个参数,即列表。你有一个列表args = [1, 'foo']
,func(*args)
将会传递两个参数,就和你调用func(1, 'foo')
一样。*
args
如电子邮件的同步发送,我需要做的就是,当sync
是True
的时候恢复成调用mail.send(msg)
10
任务助手
尽管我上面使用的example()
任务是一个简单的独立函数,但已添加用户动态的函数却需要应用中具有的某些功能,例如访问数据库和发送电子邮件。因为这将在单独的进程中运行,所以我需要初始化Flask-SQLAlchemy和Flask-Mail,而Flask-Mail又需要Flask应用程序实例以从中获取它们的配置。因此,我将在app / tasks.py模块的顶部添加Flask应用程序实例和应用程序:
app / tasks.py:创建应用及其自身
from app import create_app
app = create_app()
app.app_context().push()
当使用flask
命令时,根目录中的microblog.py模块创建应用实例,但RQ worker实际上却一无所知,所以当任务函数时,应用程序在此模块中创建,因为这是RQ worker要导入的唯一模块。你已经在好几个地方app.app_context()
看到了方法,按下一个使其使应用成为“当前”的应用实例,这样一来Flask-SQLAlchemy等插件才可以使用current_app.config
获取它们的配置。。根本没有,current_app
表达式会返回一个错误。
然后我开始考虑如何在这个函数运行时报告进度。另外通过job.meta
字典传递进度信息之外,我还想将通知推送给客户端,刹车自动动态更新完成百分比。逐步,我将使用我在第二十一章中生成的通知机制。更新将以与未读消息徽章非常类似的方式工作。当服务器渲染模板时,则包含从job.meta
获得的“静态”进度信息,但一旦页面置于客户端的浏览器中,通知将使用通知来动态更新百分比。由于通知的原因,更新正在运行的任务的进度将比上一个示例中的操作稍微多一些,所以我将创建一个专用于更新进度的包装函数:
app / tasks.py:设置任务进度
from rq import get_current_job
from app import db
from app.models import Task
# ...
def _set_task_progress(progress):
job = get_current_job()
if job:
job.meta['progress'] = progress
job.save_meta()
task = Task.query.get(job.get_id())
task.user.add_notification('task_progress', {'task_id': job.get_id(),
'progress': progress})
if progress >= 100:
task.complete = True
db.session.commit()
任务导出可以调用_set_task_progress()
来记录进度百分比。函数该首先将百分比写入job.meta
字典搜索并将其保存到Redis的,然后从数据库加载相应的任务对象,使用并task.user
已有的add_notification()
方法将通知推送给请求该任务的用户。通知将被命名为task_progress
,并且伴随关联的数据将成为具有两个关联的字典:任务标识符和进度数值。稍后我将添加JavaScript代码来处理这种新的通知类型
该函数查看进度来确认任务函数是否已完成,并在这种情况下下更新数据库中任务对象的complete
属性。数据库提交调用通过add_notification()
添加的任务和通知对象都立即保存到数据库。任务,确保不执行任何数据库更改,因为执行本次调用父父的更改也写入数据库
11
实现导出任务
现在所有的准备工作已经完成,可以开始编写导出函数了。这个函数的高层结构如下:
app / tasks.py:导出用户动态通用结构
def export_posts(user_id):
try:
# read user posts from database
# send email with data to user
except:
# handle unexpected errors
请求处理器中的应用程序可以防止意外错误,因为Flask自身捕获异常,然后将其整个任务包装在try / except中。将运行在由RQ控制的单独前进中,而不是烧瓶,因此如果发生任何意外错误,任务将中止,RQ将向控制台显示错误,然后返回等待新的作业。worker的输出或将其记录到文件中,否则将永远不会发现有错误。
让我们从上面带有注释的三部分中最简单的错误处理部分开始梳理:
app / tasks.py:更新用户动态错误处理
import sys
# ...
def export_posts(user_id):
try:
# ...
except:
_set_task_progress(100)
app.logger.error('Unhandled exception', exc_info=sys.exc_info())
发生意外错误时,我将通过将进度设置为100%来将任务标记为完成,然后使用Flask应用程序中的日志记录器对象记录错误以及如何跟踪信息(调用sys.exc_info()
来获得)。记录器来记录错误的好处在于,你可以观察到你为瓶应用实现的任何日志记录机制。例如,在第七章中,我配置了要发送到管理员电子邮件地址的错误。只要使用app.logger
,我也可以得到这些错误信息
接下来,我将编写实际的起始代码,它只需发出一个数据库查询并在循环中遍历结果,随之而来的累积在字典中:
app / tasks.py:从数据库读取用户动态
import time
from app.models import User, Post
# ...
def export_posts(user_id):
try:
user = User.query.get(user_id)
_set_task_progress(0)
data = []
i = 0
total_posts = user.posts.count()
for post in user.posts.order_by(Post.timestamp.asc()):
data.append({'body': post.body,
'timestamp': post.timestamp.isoformat() + 'Z'})
time.sleep(5)
i += 1
_set_task_progress(100 * i // total_posts)
# send email with data to user
except:
# ...
时间格式将采用ISO 8601标准。我使用的Python的datetime
对象不存储时区,因此在以ISO格式导出时间后,我添加了'Z',它表示UTC
我维护了一个计数器i
,并且在进入循环之前还需要发出一个额外的数据库查询,查询total_posts
导致用户动态的总数。使用了i
和total_posts
,在每个循环迭代我都可以使用从0到100的数字来更新任务进度
您可能会好奇我为什么会在每个循环time.sleep(5)
迭代中加入调用。最终是我想要延长增量所需的时间,刹车在用户动态不多的情况下也可以方便地查看到逐步进度的增长
下面是函数的最后部分,将会带上data
附件发送邮件给用户:
app / tasks.py:发送带用户动态的邮件给用户