工作原理 :
1. client生成任务后,
thank you-- 后面加上数字
2. worker接收到任务后,接收到数据,处理后(这里是在收到的数据后面加上.rar)向gearman服务的"test_name_mmg_ret"队列发送处理过的数据
3. 提交任务的人就可以从"test_name_mmg_ret"得到处理后的结果了...
运行worker,因为有的系统在发现没有woker工作时会发出报警
import gearman
import time
from gearman.constants import JOB_UNKNOWN
def check_request_status(job_request):
if job_request.complete:
print "Job %s finished! Result: %s - %s" % (job_request.gearman_job.unique, job_request.state, job_request.result)
elif job_request.timed_out:
print "Job %s timed out!" % job_request.gearman_job.unique
elif job_request.state == JOB_UNKNOWN:
print "Job %s connection failed!" % job_request.gearman_job.unique
#add the worker
gm_worker = gearman.GearmanWorker(['xxxxxt:xxx0', 'xxxxxt:xxx'])
# See gearman/job.py to see attributes on the GearmanJob
# Send back a reversed version of the 'data' string
def task_listener_reverse(gearman_worker, gearman_job):
orig_data = gearman_job.data
ret_data = ""
ret_data = "response:" + orig_data
ret_data = ret_data + ".rar "
#response
##add tesk to response_name_mmg
gm_client = gearman.GearmanClient(['xxxxxxx:xxx', 'xxxxxxt:xxx'])
list_of_jobs = []
data_for_process = ""
for i in range(5):
list_of_jobs.append(dict(task="test_name_mmg_ret", data=ret_data))
submitted_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False, wait_until_complete=False)
time.sleep(1.0)
completed_requests = gm_client.wait_until_jobs_completed(submitted_requests, poll_timeout= 5.0)
for completed_job_request in completed_requests:
check_request_status(completed_job_request)
print ret_data
return ret_data
gm_worker.register_task('test_name_mmg', task_listener_reverse)
# Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
gm_worker.work()
再运行client.py,增加任务
import gearman
from gearman.constants import JOB_UNKNOWN
import time
def check_request_status(job_request):
if job_request.complete:
print "Job %s finished! Result: %s - %s" % (job_request.gearman_job.unique, job_request.state, job_request.result)
elif job_request.timed_out:
print "Job %s timed out!" % job_request.gearman_job.unique
elif job_request.state == JOB_UNKNOWN:
print "Job %s connection failed!" % job_request.gearman_job.unique
gm_client = gearman.GearmanClient(['xxxxx:xxxx', 'xxxxxxx:xxx'])
list_of_jobs = []
data_for_process = ""
for i in range(1000):
data_for_process = "thank you--:%d" % i
list_of_jobs.append(dict(task="test_name_mmg", data=data_for_process))
submitted_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False, wait_until_complete=False)
time.sleep(1.0)
completed_requests = gm_client.wait_until_jobs_completed(submitted_requests, poll_timeout= 5.0)
for completed_job_request in completed_requests:
check_request_status(completed_job_request)
gearman中的几个对象:
1. Request对象
.result
.exception
.state
.timed_out
2. job对象
.Connection
.handle
.task
.unique
.data