有一些很酷的API,比如打开通知我们可以编程地访问国际空间站的位置,以确定它何时经过特定的位置,而使用Twilio SendGrid,我们可以在发生这种情况时发送电子邮件通知。
让我们介绍一下如何在Python中使用排定电子邮件的Redis队列 .
先决条件和依赖性
在继续前进之前,请确保有以下内容:
- Python 3安装在你的机器上
- 免费的SendGrid帐户
- 测试此项目的电子邮件地址
这是你可以遵循的指南如果您要使用Python进行更多的Web开发,并且不熟悉诸如虚拟环境之类的东西,则需要设置您的开发环境。
在编写代码之前,您需要安装一些依赖项:
- 这个SendGrid Python librarY用于发送电子邮件
- RQ调度器-构建在另一个工具之上的轻量级、优雅的解决方案,该工具具有较低的进入门槛,称为Redis队列。
- 请求-用于发出HTTP请求
请确保创建并激活虚拟环境,然后使用以下命令安装这些环境:
pip install sendgrid rq-scheduler==0.11.0 requests==2.26.0
RQ和RedisPython模块将作为依赖关系安装RQScheduler。为了让RQ工作,您还需要安装Redis在你的机器上。可以使用以下命令执行以下操作:wget :
wget https://download.redis.io/releases/redis-6.2.6.tar.gz
tar xzf redis-6.2.6.tar.gz
cd redis-6.2.6
make
使用命令在默认端口上的单独终端窗口中运行Redissrc/redis-server从安装它的目录。
进入国际空间站的位置
让我们首先编写代码,为给定的坐标集调用OpenNotificationAPI,并打印下一次ISS将沿着该纬度和经度飞行的时间。
创建一个名为iss.py(“国际空间站”模块)在您希望使用代码的目录中,并添加以下功能:
from datetime import datetime
import pytz
import requests
ISS_URL = 'http://api.open-notify.org/iss-pass.json'
def get_next_pass(lat, lon):
location = { 'lat': lat, 'lon': lon }
response = requests.get(ISS_URL, params=location).json()
if 'response' in response:
next_pass = response['response'][0]['risetime']
next_pass_datetime = datetime.fromtimestamp(next_pass, tz=pytz.utc)
print('Next pass for {}, {} is: {}'
.format(lat, lon, next_pass_datetime))
return next_pass_datetime
else:
print('No ISS flyby can be determined for {}, {}'.format(lat, lon))
这个get_next_pass函数在此代码中将向具有给定纬度和经度的OpenNotificationAPI发出请求,检查是否有有效的响应,然后将从API收到的时间戳转换为PythondateTime对象,并打印下一个ISS将在空中飞行的相应时间。
要测试这段代码,请打开Pythonshell并运行以下两行。在这个例子中,我们将使用旧金山的Twilio总部作为我们的测试位置(纬度:37.788052,经度:-122.391472):
from iss import get_next_pass
get_next_pass(37.788052, -122.391472)
你应该看到这样的东西:Next pass for 37.788052, -122.391472 is: 2021-12-09 23:58:11+00:00有一个合适的时间戳。
现在我们可以继续编写代码发送电子邮件了。
注册sendGrid并创建一个API密钥
创建SendGrid帐户,您可以为本教程选择免费层。一旦你有了账户,你就需要创建API密钥从这张截图中可以看到。您可以将它命名为您想要的任何名称,但是一旦创建了它,请确保在继续之前保存它!
保存此API键的一个好方法是将其设置为可以从Python代码中访问的环境变量,以避免在代码中直接编写它。设置SENDGRID_API_KEY环境变量是SendGrid帐户中的API密钥。不过,在其他地方做笔记也没什么坏处,因为你不能再看一遍了。这是一个如果需要帮助设置环境变量,请提供有用的教程。。稍后我们将使用这个API密钥。
用Python发送电子邮件
现在您有了SendGrid帐户和API密钥,您可以更新iss.py若要包含发送电子邮件的代码:
from datetime import datetime
import os
import pytz
import requests
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
ISS_URL = 'http://api.open-notify.org/iss-pass.json'
def send_email(from_email, to_email, body):
message = Mail(
from_email=from_email,
to_emails=to_email,
subject='International Space Station passing by!',
html_content=body)
sg = SendGridAPIClient(os.environ.get('SENDGRID_API_KEY'))
response = sg.send(message)
print(response.status_code, response.body, response.headers)
def get_next_pass(lat, lon):
location = { 'lat': lat, 'lon': lon }
response = requests.get(ISS_URL, params=location).json()
if 'response' in response:
next_pass = response['response'][0]['risetime']
next_pass_datetime = datetime.fromtimestamp(next_pass, tz=pytz.utc)
print('Next pass for {}, {} is: {}'
.format(lat, lon, next_pass_datetime))
return next_pass_datetime
else:
print('No ISS flyby can be determined for {}, {}'.format(lat, lon))
请记住,在尝试运行此代码之前,要确保设置了SendGridAPI密钥环境变量。
请注意,在生产应用程序中,建议验证 发送者身份 通过完成 域认证 ...发件人身份代表您的“从”电子邮件地址-您的收件人视为您的电子邮件发件人的地址。有关此问题的一步一步的教程,请查看: 如何为Twilio SendGrid设置域认证 .
如果要测试此代码,请打开Pythonshell并运行以下代码,替换to_email与您自己的电子邮件地址的争论:
from iss import send_email
send_email('from_email@example.com', 'your_email@example.com', 'Look up!')
您应该收到一封电子邮件,告诉您在运行此代码后要查找。
用RQ调度器调度任务
现在我们有了一个为我们提供日期时间的函数,还有一个发送电子邮件的函数,我们可以使用RQScheduler。创建另一个名为schedule_notification.py,并向其添加以下代码:
from datetime import datetime
from redis import Redis
from rq_scheduler import Scheduler
import iss
scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue
# Change these latitude and longitude values for any location you want.
next_pass = iss.get_next_pass(37.788052, -122.391472)
if next_pass:
scheduler.enqueue_at(next_pass, iss.send_email,
'from_email@example.com', 'your_email@example.com',
'Look up! The ISS is flying above you!')
这只是一个快速的脚本,调用您编写的其他功能,一个是了解国际空间站何时经过您的位置下,另一个将向您发送电子邮件。在本例中,我使用了旧金山Twilio办公室的坐标,但您可以将纬度和经度更改为您所在的任何位置。
在能够运行此代码之前,必须确保在其他终端窗口或后台进程中运行Redis服务器、RQ工作者和RQ Scheduler进程。您应该已经通过使用命令运行Redis服务器了。src/redis-server从安装Redis的目录。再打开两个终端窗口,在这两个窗口中导航到代码所在的目录,并激活该项目的虚拟环境。在一个窗口中运行命令rqworker在另一个命令中运行命令rqscheduler .
这样做之后,您应该准备好运行代码来调度通知:
python schedule_notification.py
现在你要做的就是等..。
穿越时间
这很好,但是如果您不想等着看您的代码是否工作,这是可以理解的。如果你想即时满足,我们可以使用时间旅行的方法。在基于unix的系统上,可以使用日期命令。
如果空间站预定在2019年12月5日4:02飞过,那么你可以在linux上运行。date -s "12/05/2019 03:02:00"。在OSX上你会运行date 1205160219(您甚至可以使用-u参数如果要使用UTC时区,该时区对应于Python代码正在打印的日期时间)。如果所有这些都失败了,也有GUI选项来改变您的计算机在大多数操作系统上的时间。
在OSX上,您可以通过在系统首选项中打开“日期和时间”来设置(并重置)此选项。
如果您希望在ISS每次经过时而不是仅收到一次通知,则可以在每条消息之后安排另一次通知,方法是修改send_email在iss.py并添加适当的导入语句:
from redis import Redis
from rq_scheduler import Scheduler
scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue
def send_email(from_email, to_email, body):
message = Mail(
from_email=from_email,
to_emails=to_email,
subject='International Space Station passing by!',
html_content=body)
sg = SendGridAPIClient(os.environ.get('SENDGRID_API_KEY'))
response = sg.send(message)
print(response.status_code, response.body, response.headers)
scheduler.enqueue_at(next_pass, iss.send_email,
'from_email@example.com', 'your_email@example.com',
'Look up! The ISS is flying above you!')
无限与超越
现在您可以在国际空间站经过时接收电子邮件,您可以使用RQ Scheduler来满足所有Python调度需求。可能性是无限的。
原文 Https://www.twilio.com/blog/scheduling-international-space-station-emails-in-python-with-sendgrid-and-redis-queue
本文暂时没有评论,来添加一个吧(●'◡'●)