当前位置: 首页 > 工具软件 > Redis Queue > 使用案例 >

python任务队列 http_Flask例子-实现Redis Task Queue(任务队列)

岳景明
2023-12-01

Python部落组织翻译,禁止转载,欢迎转发

在本文中,我们加入基础的 Redis task queue来进行文本处理。

有许多的工具也能够实现,例如ReTask和HotQueue。我们将使用的是Python RQ。它是一个建立在Redis之上用来创建task queue(任务队列)的简单的库,并且容易设置和实现。

请牢记,我们是这样创建的:一个基于给定的URL中的文本用来统计单词的分布对的Flask应用。这是一个完整的教程。

1、 第一部分:设置本地开发环境,接着在Heroku云平台上部署staging环境和production环境。

2、 第二部分:设置一个带有SQLAlche和Alembic的PostgreSQL数据库来处理迁移。

3、 第三部分:加入后端逻辑到scrape,然后使用requests,BeautifulSoup和Natural Language Toolkit(NLTK)库来计算处理从页面中得到的单词。

4、 第四部分:实现一个Redis task queue 来进行文本处理。(本文描述的)

5、 第五部分:安装Angular前端框架来不断查看后端框架请求是否完成。

6、 第六部分:启动在Heroku云平台上的staging服务——设置Redis,详细介绍如何在单个Dyno上运行两个进程(web和worker)。(ps:在Heroku云平台上,对于每一个进程采用一个叫Dyno的单位来进行性能管理,增加这个值,则能提高应用的相应速度和吞吐量。有适用于Web server的Web dyno和作为后台进程的Worker dyno。)

7、 第七部分:更新前端使其用户界面更友好。

8、 第八部分:将D3库添加到混合频率分布图和柱状图。

需要代码吗?可以到repo(https://github.com/realpython/flask-by-example/releases)上抓取。

安装要求

工具:Redis——http://redis.io/

Python Redis —— https://pypi.python.org/pypi/redis/2.10.5

RQ(Redis Queue)——http://python-rq.org/

首先从http://redis.io/下载和安装Redis(或者使用Homebrew命令——brew install redis),然后开启Redis服务。

$ redis-server

接下来在一个新的终端窗口安装Python Redis 库和RQ库。

$ workon wordcounts

$ pip install redis rq

$ pip freeze > requirements.txt

设置Worker

接下来我们开始创建一个worker进程来监听queued tasks(队列任务)。

import os

import redis

from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':

with Connection(conn):

worker = Worker(list(map(Queue, listen)))

worker.work()

保存以上文本为worker.py。在这段代码中,我们监听命名为default的queue(队列)并且与我们的Redis服务在localhost:6379 建立连接。

开启另一个终端窗口:

$ workon wordcounts

$ python worker.py

17:01:29 RQ worker started, version 0.4.6

17:01:29

17:01:29 *** Listening on default...

现在我们需要更新我们的app.py来向queue(队列)发送任务。

更新app.py

将下面一些模块import到app.py

from rq import Queue

from rq.job import Job

from worker import conn

然后更新配置部分:

app = Flask(__name__)

app.config.from_object(os.environ['APP_SETTINGS'])

db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn)设置一个Redis连接并基于连接初始化队列。

现在我们将文本处理功能 搬出我们的索引路径并且添加一个叫做count_and_save_words()的函数。当我们从索引路径调用这个函数时需要传入一个URL参数让它接收。

def count_and_save_words(url):

errors = []

try:

r = requests.get(url)

except:

errors.append(

"Unable to get URL. Please make sure it's valid and try again."

)

return {"error": errors}

# text processing

raw = BeautifulSoup(r.text).get_text()

nltk.data.path.append('./nltk_data/')  # set the path

tokens = nltk.word_tokenize(raw)

text = nltk.Text(tokens)

# remove punctuation, count raw words

nonPunct = re.compile('.*[A-Za-z].*')

raw_words = [w for w in text if nonPunct.match(w)]

raw_word_count = Counter(raw_words)

# stop words

no_stop_words = [w for w in raw_words if w.lower() not in stops]

no_stop_words_count = Counter(no_stop_words)

# save the results

try:

result = Result(

url=url,

result_all=raw_word_count,

result_no_stop_words=no_stop_words_count

)

db.session.add(result)

db.session.commit()

return result.id

except:

errors.append("Unable to add item to database.")

return {"error": errors}

@app.route('/', methods=['GET', 'POST'])

def index():

results = {}

if request.method == "POST":

# get url that the person has entered

url = request.form['url']

if 'http://' not in url[:7]:

url = 'http://' + url

job = q.enqueue_call(

func=count_and_save_words, args=(url,), result_ttl=5000

)

print(job.get_id())

return render_template('index.html', results=results)

注意下面的代码:

job = q.enqueue_call(

func=count_and_save_words, args=(url,), result_ttl=5000

)

print(job.get_id())

在这里我们使用我们之前初始化的queue(队列)和命名为enqueue_call() 的函数。这里添加了一个运行将URL作为参数的count_and_save_words() 函数的新job到我们的queue中。result_ttl=5000 这一行的参数告知RQ为job结果等待多久——5000秒。然后我们输出job的id到终端。这个是必要的,用来查看这个job是否处理完成。

让我们来设置一条新的route(路径)

新的Route(路径)

@app.route("/results/", methods=['GET'])

def get_results(job_key):

job = Job.fetch(job_key, connection=conn)

if job.is_finished:

return str(job.result), 200

else:

return "Nay!", 202

让我们来测试这一点。

启动服务,进入http://localhost:5000/,使用URL http://realpython.com,然后从终端抓取这个job的id。接着使用这个id在 ‘/results/’末端。

只要例如:http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197

只要不超过5000秒前检查运行状态,你就可以看到id数值,这是我们将结果添加到数据库时产生的。

# save the results

try:

from models import Result

result = Result(

url=url,

result_all=raw_word_count,

result_no_stop_words=no_stop_words_count

)

db.session.add(result)

db.session.commit()

return result.id

现在,让我们稍微重构route(路径)来用JSON将数据库的结果集返回。

@app.route("/results/", methods=['GET'])

def get_results(job_key):

job = Job.fetch(job_key, connection=conn)

if job.is_finished:

result = Result.query.filter_by(id=job.result).first()

results = sorted(

result.result_no_stop_words.items(),

key=operator.itemgetter(1),

reverse=True

)[:10]

return jsonify(results)

else:

return "Nay!", 202

确保import这个模块:

from flask import jsonify

再次测试这个点。如果一切顺利,你将在浏览器看到以下结果:

{

Course: 5,

Download: 4,

Python: 19,

Real: 11,

courses: 7,

development: 7,

return: 4,

sample: 4,

videos: 5,

web: 12

}

接下来是什么呢?

为了能够让所有整合到一起,我们将会添加AngularJS进行扩展,在下一个部分,创建一个基本轮询方式的服务端——每五秒发送一个请求或者直接在/results/ 末端请求更新。一旦数据是可用的,我们可以添加到DOM。

英文原文:https://realpython.com/blog/python/flask-by-example-implementing-a-redis-task-queue/

译者:Arvin

 类似资料: