我一直在尝试遵循Celery的“ Celery第一步与后续步骤”指南。我的设置是Windows 7 64位,Anaconda Python
2.7(32位),已安装的Erlang 32位二进制文件,RabbitMQ服务器和celery(带有pip install celery
)。
按照指南,我创建了一个带有 init .py,tasks.py和celery.py的proj文件夹。我的 init
.py是空的。这是celery.py:
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks'])
#Optional configuration, see the application user guide
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
)
if __name__ == '__main__':
app.start()
这是task.py:
from __future__ import absolute_import
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
首先,我了解我应该确保RabbitMQ服务正在运行。任务管理器的“服务”选项卡显示RabbitMQ确实正在运行。要启动celery服务器并加载我的任务,请打开cmd.exe,导航至proj
(我称为celery_demo的文件夹)的父目录,然后运行以下命令:
celery -A proj.celery worker -l debug
给出以下输出:
C:\Users\bnables\Documents\Python\celery_demo>celery -A proj.celery worker -l debug
[2014-08-25 17:00:09,308: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-08-25 17:00:09,313: DEBUG/MainProcess] | Worker: Building graph...
[2014-08-25 17:00:09,315: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoreloader, Autoscaler, StateDB, Beat, Con
sumer}
[2014-08-25 17:00:09,322: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-08-25 17:00:09,322: DEBUG/MainProcess] | Consumer: Building graph...
[2014-08-25 17:00:09,332: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Gossip, Tasks, Control, Agent, event loop
}
-------------- celery@MSSLW40013047 v3.1.13 (Cipater)
---- **** -----
--- * *** * -- Windows-7-6.1.7601-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x3290370
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. proj.tasks.add
. proj.tasks.mul
. proj.tasks.xsum
[2014-08-25 17:00:09,345: DEBUG/MainProcess] | Worker: Starting Pool
[2014-08-25 17:00:09,417: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,420: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-08-25 17:00:09,421: DEBUG/MainProcess] | Consumer: Starting Connection
[2014-08-25 17:00:09,457: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL. See http://www.r
abbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2014 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': Tru
e, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u
'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@MSSLW40013047.ndc.nasa.gov', u'platform': u'Erlang/OTP', u'ver
sion': u'3.3.5'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2014-08-25 17:00:09,460: DEBUG/MainProcess] Open OK!
[2014-08-25 17:00:09,460: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2014-08-25 17:00:09,461: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,461: DEBUG/MainProcess] | Consumer: Starting Events
[2014-08-25 17:00:09,516: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL. See http://www.r
abbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2014 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': Tru
e, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u
'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@MSSLW40013047.ndc.nasa.gov', u'platform': u'Erlang/OTP', u'ver
sion': u'3.3.5'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2014-08-25 17:00:09,519: DEBUG/MainProcess] Open OK!
[2014-08-25 17:00:09,520: DEBUG/MainProcess] using channel_id: 1
[2014-08-25 17:00:09,522: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:09,523: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,523: DEBUG/MainProcess] | Consumer: Starting Heart
[2014-08-25 17:00:09,530: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,533: DEBUG/MainProcess] | Consumer: Starting Mingle
[2014-08-25 17:00:09,538: INFO/MainProcess] mingle: searching for neighbors
[2014-08-25 17:00:09,539: DEBUG/MainProcess] using channel_id: 1
[2014-08-25 17:00:09,540: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,552: INFO/MainProcess] mingle: all alone
[2014-08-25 17:00:10,552: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,552: DEBUG/MainProcess] | Consumer: Starting Gossip
[2014-08-25 17:00:10,553: DEBUG/MainProcess] using channel_id: 2
[2014-08-25 17:00:10,555: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,559: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,559: DEBUG/MainProcess] | Consumer: Starting Tasks
[2014-08-25 17:00:10,566: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,566: DEBUG/MainProcess] | Consumer: Starting Control
[2014-08-25 17:00:10,568: DEBUG/MainProcess] using channel_id: 3
[2014-08-25 17:00:10,569: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,572: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,573: DEBUG/MainProcess] | Consumer: Starting event loop
[2014-08-25 17:00:10,575: WARNING/MainProcess] celery@MSSLW40013047 ready.
[2014-08-25 17:00:10,575: DEBUG/MainProcess] basic.qos: prefetch_count->32
该-A
告诉芹菜在哪里可以找到我的芹菜应用实例。使用justproj
也可以,但是既然可以搜索proj.celery
,那么在这里更明确。
worker
是给celery的命令,它告诉celery产生一些工人来执行从proj.celery加载的任务。最后,-l debug
告诉芹菜将日志级别设置为要调试,以便获得大量信息。通常是这样-l info
。
为了测试我的任务服务器,我打开一个IPython Qt Console并导航到该celery_demo
文件夹(包含proj
)。然后输入from proj.tasks import add
。如预期的那样,简单地调用add(1, 2)
返回3
而不使用服务器。当我调用add.delay时,会发生以下情况:
add.delay(2, 3)
哪个返回:
<AsyncResult: 42123ff3-e94e-4673-808a-ec6c847679d8>
在我的cmd.exe窗口中,我得到:
[2014-08-25 17:20:38,109: INFO/MainProcess] Received task: proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8]
[2014-08-25 17:20:38,109: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x033CD6F0> (args:(u'proj.tasks.add', u'42123ff3-e94e-4673-
808a-ec6c847679d8', [2, 3], {}, {u'timelimit': [None, None], u'utc': True, u'is_eager': False, u'chord': None, u'group': None, u'args': [2, 3], u'retr
ies': 0, u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': u'celery', u'exchange': u'celery'}, u'expires': None, u'hostname
': 'celery@MSSLW40013047', u'task': u'proj.tasks.add', u'callbacks': None, u'correlation_id': u'42123ff3-e94e-4673-808a-ec6c847679d8', u'errbacks': No
ne, u'reply_to': u'70ed001d-193c-319c-9447-8d77c231dc10', u'taskset': None, u'kwargs': {}, u'eta': None, u'id': u'42123ff3-e94e-4673-808a-ec6c847679d8
', u'headers': {}}) kwargs:{})
[2014-08-25 17:20:38,124: DEBUG/MainProcess] Task accepted: proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8] pid:4052
[2014-08-25 17:20:38,125: INFO/MainProcess] Task proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8] succeeded in 0.0130000114441s: 5
因此,如最后一行所示,正在计算结果5。接下来,我要存储AsyncResult对象并检查其状态并获取结果值:
result = add.delay(3, 4)
但是result.state和result.get(timeout = 1)不能按预期工作:
In: result.state
Out: 'Pending'
In: result.status
Out: 'Pending'
In: result.get(timeout=1)
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
<ipython-input-17-375f2d3530cb> in <module>()
----> 1 result.get(timeout=1)
C:\Anaconda32\lib\site-packages\celery\result.pyc in get(self, timeout, propagate, interval, no_ack, follow_parents)
167 interval=interval,
168 on_interval=on_interval,
--> 169 no_ack=no_ack,
170 )
171 finally:
C:\Anaconda32\lib\site-packages\celery\backends\amqp.pyc in wait_for(self, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPAGATE_STATES, **kwargs)
155 on_interval=on_interval)
156 except socket.timeout:
--> 157 raise TimeoutError('The operation timed out.')
158
159 if meta['status'] in PROPAGATE_STATES and propagate:
TimeoutError: The operation timed out.
如果result.state或result.status的预期结果为“
SUCCESSFUL”,则的结果result.get(timeout=1)
应为5
。
似乎结果存储或消息传递无法正常工作。本教程只说backend
在对Celery()
或CELERY_RESULT_BACKEND
设置的调用中设置命名参数。在“入门”中,它具有backend='amqp'
“下一步”的位置backend='amqp://'
,该示例也在github示例中使用。
我已经在这个问题上摸索了一段时间,对如何进行工作一无所知。关于下一步尝试的任何想法?谢谢!
也随机停止工作。完全一样的问题-永远待定。重新安装Erlang或RabbitMQ没有帮助。
我也在Debian Linux 7 x86上进行了测试,这里工作正常,没有问题。
另外:https :
//github.com/celery/celery/issues/2146
这可能是Windows相关的问题,设置工作程序标志 --pool = solo 为我修复了ATM
我试图构建一个Spring Boot REST API,从MySQL DB中获取数据。这是我在Spring Boot REST API应用程序的存储库代码中定义的JPA查询方法: 这是我要查询的表的详细信息:
问题内容: 我有两个表,如果有关系的话,在PostgreSQL中有一对多的关系。我需要加入它们,以便对于每个“一个”,我只能从“许多”表中获得单个结果。不仅如此,我还需要从“许多”表中挑选出特定的结果。 TABLE_B中的SORT列实际上是CODE2中的最后一个字符。CODE2可以以1-9结尾,但是3最重要,那么5、7、4、2、1、0、6、8、9因此3-> 1,5-> 2,7-> 3等等向前。 我
问题内容: 我将从中将储存在数据库中,并将其与任务影响的项目相关联。这使我可以执行查询以检索与特定项目有关的所有任务。 因此,从数据库中检索到后,我该如何检索有关任务状态/结果/等的信息? 问题答案: 从芹菜常见问题解答:
我正在设计一个应用程序,它使用地图并要求用户输入目的地。我在xml中添加了PlaceAutoCompleteFragment
问题内容: 我在我的搜索框中使用了select2。我从URL获取结果,但是无法从中选择一个选项。我想使用“ product.productName”作为选择后要显示的文本。有什么我错过的东西或我犯的任何错误。我包括了select2.css和select2.min.js,jquery.js 这是我的resut_object 问题答案: 您缺少结果数据的id属性。如果没有,则使选项“不可选择”。 例:
问题内容: 我将向celery队列添加多个任务并等待结果。我有各种各样的想法,我将如何利用某种形式的共享存储(memcached,redis,db等)来实现这一目标,但是,我本以为Celery可以自动处理该问题,但我无法在线找到任何资源。 代码示例 问题答案: 对于Celery > = 3.0,使用taskset被弃用赞成组。 在后台启动组: 等待: