Producing and consuming messages on a self-hosted RocketMQ from Python with rocketmq-client-python

怀浩大
2023-12-01

Environment

1. Install rocketmq-client-python

pip install rocketmq-client-python

2. Install librocketmq (the native C++ client library that rocketmq-client-python loads)

  • CentOS
  wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
  sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
  find / -name librocketmq.so
  sudo ln -s /*****/librocketmq.so /usr/lib
  • Debian
  wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
  sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
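
Before moving on to the code, it can help to confirm that both installation steps took effect. The following is a minimal sketch, not part of the original setup: the helper names `python_package_installed` and `can_load_library` are hypothetical, and it assumes the Python package is importable as `rocketmq` and the shared library is named `librocketmq.so`, as in the commands above.

```python
import ctypes
import importlib.util


def python_package_installed(module_name="rocketmq"):
    """Return True if the named top-level module can be found (without importing it)."""
    return importlib.util.find_spec(module_name) is not None


def can_load_library(library_name="librocketmq.so"):
    """Return True if the dynamic loader can open the shared library."""
    try:
        ctypes.CDLL(library_name)
    except OSError:
        # Raised when the loader cannot find or open the library.
        return False
    return True


if __name__ == "__main__":
    print("rocketmq package found:", python_package_installed())
    print("librocketmq.so loadable:", can_load_library())
```

If the second check reports False on CentOS, the symlink step is the first thing to revisit.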

Code

Producer

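from rocketmq.client import Producer, Message

# Create a producer in the given producer group and point it at the name server.
producer = Producer('PID-XXX')
producer.set_name_server_address('127.0.0.1:9876')
producer.start()

# Build a message for the target topic; keys and tags are optional metadata.
msg = Message('YOUR-TOPIC')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body('XXXX')

# Send synchronously and print the result returned by the broker.
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()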
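
The producer example sends once and does not handle a failed send. One possible way to add a bounded retry is sketched below. `send_with_retry` is a hypothetical helper, not part of rocketmq-client-python, and it assumes that a failed `send_sync` surfaces as a Python exception; if your client version reports failure differently, check `ret.status` instead.

```python
import time


def send_with_retry(send, msg, attempts=3, delay_seconds=1.0):
    """Call send(msg) until it succeeds or the attempts are used up.

    `send` is any callable taking one message, e.g. producer.send_sync.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return send(msg)
        except Exception as exc:  # assumption: a failed send raises
            last_error = exc
            if attempt < attempts:
                time.sleep(delay_seconds)  # fixed pause between attempts
    # Every attempt failed: re-raise the most recent error.
    raise last_error
```

With the producer above, usage would look like `ret = send_with_retry(producer.send_sync, msg)`.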

PushConsumer

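import time
from rocketmq.client import PushConsumer, ConsumeStatus

def callback(msg):
    # Invoked for each delivered message; the return value acknowledges it.
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS

consumer = PushConsumer('CID_XXX')
consumer.set_name_server_address('127.0.0.1:9876')
consumer.subscribe('YOUR-TOPIC', callback)
consumer.start()

# Keep the main thread alive while messages are delivered to the callback.
try:
    while True:
        time.sleep(3600)
except KeyboardInterrupt:
    pass
finally:
    # Runs on Ctrl+C or any other exit, so the consumer is shut down cleanly.
    consumer.shutdown()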
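
The callback above always returns CONSUME_SUCCESS, so a message whose processing fails is still acknowledged. A minimal sketch of one way to request redelivery instead follows. `make_safe_callback` is a hypothetical helper; the two status values are passed in as parameters so the sketch does not depend on the library, and in practice they would be `ConsumeStatus.CONSUME_SUCCESS` and `ConsumeStatus.RECONSUME_LATER`.

```python
def make_safe_callback(handler, success, retry_later):
    """Wrap a message handler so that exceptions map to a retry status.

    handler:      function taking one message; raises on failure
    success:      status to return when the handler finishes normally
    retry_later:  status to return when the handler raises
    """
    def callback(msg):
        try:
            handler(msg)
        except Exception:
            # Processing failed: ask the broker to deliver the message again.
            return retry_later
        return success
    return callback
```

It would be wired in as `consumer.subscribe('YOUR-TOPIC', make_safe_callback(handle, ConsumeStatus.CONSUME_SUCCESS, ConsumeStatus.RECONSUME_LATER))`, where `handle` is your own processing function.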