系统 Mac
Python 3.7.5
rocketmq 0.4.4 (Python模块)
rocketmq-client-python 2.0.0 (Python模块)
系统 CentOS Linux release 7.4.1708
Python 3.7.3
rocketmq 0.4.4 (Python模块)
rocketmq-client-python 2.0.0 (Python模块)
import json
from rocketmq.client import Producer, Message
#BDP_Process_GroupID:groupid
#max_message_size:消息字节长度限制,此处为1M,如果消息过大,可以修改此处参数
producer = Producer('BDP_Process',max_message_size=1024*1024)
producer.set_namesrv_addr('xxxx:9876')#ip:prot
#producer.set_name_server_address('xxxx:9876')#linux和mac代码不通,此处为mac系统书写格式
#producer.set_namesrv_addr('xxxx:9876,xxxx:9876,xxxx:9876')#集群模式可以这样写
producer.start()
msg_body = {
"id":212331,
"orderId":"320106004011202105318032",
"storeId":"X12N",
"riqi":"2020-12-12 22:12:32",
"processTime":"1595311611000"
}
#将字典封装成消息并修改消息编码,此处不修改编码可能会中文乱码
data = json.dumps(msg_body,ensure_ascii=False).encode('utf-8')
msg = Message('YOUR-TOPIC')#消息主题
msg.set_keys(msg_body.get('orderId'))#消息TAG,用于消息过滤对消息的整体分类
msg.set_tags('BPD_Process')#Message索引键
msg.set_body(data)#消息主体
ret = producer.send_sync(msg)#发送异步消息
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
import time
from rocketmq.client import PushConsumer, ConsumeStatus
def callback(msg):
print(msg.id, msg.body)
return ConsumeStatus.CONSUME_SUCCESS
#BDP_Process_GroupID:groupid
consumer = PushConsumer('BDP_Process_GroupID')
consumer.set_name_server_address('xxxx:9876')#ip:prot
consumer.subscribe('YOUR-TOPIC', callback)#Topic
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()
import re
import sys
import time
import json
import logging
from pyhive import hive,presto
#qry为查询hive的SQL
def Hive(self,qry):
try:
connect = hive.Connection(
host=hive_host,
port=hive_port,
database=hive_database,
username=hive_username,
password=hive_password,
auth='LDAP',#网络通信的方式
configuration={"mapreduce.job.queuename":"root.EXTRACT"}#队列
)
cursor = connect.cursor()
cursor.execute(qry)
fetchall = cursor.fetchall()
result = [list(i) for i in fetchall]
return result
except Exception as e:
if (str(e) == "No result set"):
logger.info("该sql无返回值:\t%s" % qry)
logger.info(e)
else:
logger.warning("Hive Connect ERROR\n")
logger.info("该sql有误:%s" % qry)
logger.info(e)
博客到此就告一段落了,如果代码或者书写哪里有问题,大家可以评论留言,博主看到后会修改代码,也谢谢各位啦~