RocketMQ操作方式-Python

席言
2023-12-01

#! /usr/bin/python
#encoding:utf-8
from rocketmq.client import Producer, Message
import json
import sys
import time
# Python 2-only workaround: reload(sys) makes sys.setdefaultencoding available
# again (it is removed from the sys module at interpreter startup) so that the
# default encoding used for implicit str/unicode conversion can be set to
# UTF-8. Neither call exists on Python 3.
reload(sys)
sys.setdefaultencoding( "utf-8" )
'''
rocketmq写入消息
'''
def send():
    """Send one hard-coded JSON test message to the 'test' topic.

    A producer in group 'test' is created, started and shut down on every
    call. The send result's status, message id and offset are printed.
    The producer is shut down even when building or sending the message
    raises, so a failed send does not leave a started producer behind.
    """
    producer = Producer('test')
    producer.set_namesrv_addr('127.0.0.1:9876')  # RocketMQ name server address (server ip:port)
    producer.start()
    try:
        msg_body = {"id":"001","name":"test_mq","message":"abcdefg"}
        ss = json.dumps(msg_body).encode('utf-8')

        msg = Message('test')   # topic name
        msg.set_keys('ce')
        msg.set_tags('ce')
        msg.set_body(ss)        # message body (UTF-8 encoded JSON)

        retmq = producer.send_sync(msg)
        print(retmq.status, retmq.msg_id, retmq.offset)
    finally:
        # Always release the producer, including on a failed send.
        producer.shutdown()


if __name__ == "__main__":
    # Send 1000 messages one after another and print a local wall-clock
    # timestamp before and after the batch.
    # time.strftime() without an explicit time tuple formats the current
    # local time.
    print("开始时间:" + time.strftime('%Y-%m-%d %H:%M:%S'))
    for _ in range(1000):
        send()
    print("结束时间:" + time.strftime('%Y-%m-%d %H:%M:%S'))

 

# Consumption via PullConsumer (consumes everything; messages can be consumed repeatedly)
from rocketmq.client import PullConsumer
import json
consumer = PullConsumer('PID-001')
# NOTE(review): 'ip:port' looks like a placeholder for the real name server
# address -- replace before running.
consumer.set_namesrv_addr('ip:port')
consumer.start()
try:
    # Print the id and body of every message pulled from the 'test' topic.
    for msg in consumer.pull('test'):
        print(msg.id, msg.body)
finally:
    # Shut the consumer down even if pulling or printing fails.
    consumer.shutdown()
#! /usr/bin/python
#encoding:utf-8
from rocketmq.client import Producer, Message
from rocketmq.client import PullConsumer,PushConsumer

import json
import sys
import time
# Python 2-only workaround: reload(sys) makes sys.setdefaultencoding available
# again (it is removed from the sys module at interpreter startup) so that the
# default encoding used for implicit str/unicode conversion can be set to
# UTF-8. Neither call exists on Python 3.
reload(sys)
sys.setdefaultencoding( "utf-8" )
#消费方式PushConsumer不可重复消费
def callback(msg):
    """Print the received message object; nothing is returned."""
    print(msg)
    return None
# Push-consumer driver: subscribe `callback` to the 'test' topic and keep the
# process alive until it is interrupted.
print("开始时间:" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
consumer = PushConsumer('test')
consumer.set_namesrv_addr('127.0.0.1:9876')

consumer.subscribe("test", callback)
consumer.start()

try:
    # Keep the main thread alive; presumably the client library delivers
    # messages to `callback` in the background while this loop sleeps.
    while True:
        time.sleep(30)
except KeyboardInterrupt:
    # Ctrl-C is the only way out of the loop; treat it as a normal stop
    # request instead of aborting with a traceback.
    pass
finally:
    # Previously unreachable after the infinite loop: always shut down.
    consumer.shutdown()
print("结束时间:" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

 

 类似资料: