pulsar是一个云原生的分布式消息和流平台。
本文主要介绍使用pulsar-thread包进行生产和消费。
一个连接pulsar消息队列的包,优点是:支持多线程生产和消费。
pip install pulsar-thread
import pulsar_thread as pt
client = pt.client('pulsar://0.0.0.0:6655')
#请将 0.0.0.0:6655 换成你的pulsar地址
import json
import pulsar_thread as pt
# 1. 连接client
client = pt.client('pulsar://0.0.0.0:6655')
# 模拟要发送的数据
data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}
# 2. 将要发送的数据和topic组合成字典
# {'topic_1': msg_1, ... , 'topic_n': msg_n}
data_dict = {'test1':[json.dumps(data), json.dumps(data2)],
'test2':[json.dumps(data3), json.dumps(data4)]}
# 3. 创建生产者
producer = client.create_producer()
# 4. 发送消息
# 可选 4.1 同步发送send 或 4.2 异步发送send_async
# 4.1 同步发送
# 可选 4.1.1 默认模式 或 4.1.2 自定义模式
# 4.1.1 默认模式
# 默认参数:thread_count=5, schema=pulsar.schema.StringSchema()
# 默认多线程最大数thread_count为5个, schema是以StringSchema()字符串模式
result = producer.send(data_dict)
# 4.1.2 自定义模式
# schema参数设置规范详见pulsar-client的使用
result = producer.send_async(data_dict,
thread_count=10,
schema=pulsar.schema.StringSchema())
# 4.2 异步发送
# 可选 4.2.1 默认模式 或 4.2.2 自定义模式
# 4.2.1 默认模式
# 默认参数:callback=None, thread_count=5, schema=pulsar.schema.StringSchema()
# 默认回调函数callback为None, 多线程最大数thread_count为5个, schema是以StringSchema()字符串模式
result = producer.send_async(data_dict)
# 4.2.2 自定义模式(callback, schema参数设置规范详见pulsar-client的使用)
result = producer.send_async(data_dict,
callback=None,
thread_count=10,
schema=pulsar.schema.StringSchema())
import json
import pulsar_thread as pt
# 业务程序,处理消息队列发来的消息
# msg 是 接收的消息队列传来的消息
def deal_msg(msg):
print(msg.value())
import time
time.sleep(5)
print(json.loads(msg.data()))
# 1. 连接
client = pt.client('pulsar://0.0.0.0:6655')
# 2. 创建consumer
# 可从多个 topic 里接收数据
# 默认接收的 schema=pulsar.schema.StringSchema()
# 格式:consumer = client.create_consumer(['topic_1', ......, 'topic_n'], 消费者的名字,
# schema=pulsar.schema.StringSchema())
consumer = client.create_consumer(['test1', 'test2'], 'my-subscription')
# 3. 接收数据并用业务程序(例:deal_msg)处理
# 可选 3.1 单线程处理 consumer.receive() 或
# 3.2 多线程处理 consumer.receive_thread()
# 3.1 单线程处理
# 可选 3.1.1 默认模式 或 3.1.2 自定义模式
# 阻塞模式,消费一个,业务程序处理一个,业务程序处理完成,再消费下一个
# 3.1.1 默认模式
# 默认参数:timeout_millis=None, logger=None
# 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
# 默认 日志收集器logger 为 None
consumer.receive(deal_msg)
# 3.1.2 自定义模式
import logging,sys
def LogSet():
# 获取logger实例,如果参数为空则返回root logger
logger = logging.getLogger("test.log")
# 指定logger输出格式
formatter = logging.Formatter('%(asctime)s %(pathname)s %(lineno)d %(levelname)-8s: %(message)s')
# 文件日志
file_handler = logging.FileHandler("./test.log")
file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式
# 控制台日志
console_handler = logging.StreamHandler(sys.stdout)
console_handler.formatter = formatter # 也可以直接给formatter赋值
# 为logger添加的日志处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# 指定日志的最低输出级别,默认为WARN级别
logger.setLevel(logging.DEBUG)
# 移除一些日志处理器
return logger, file_handler
# 获取 logger
logger,_=LogSet()
consumer.receive(deal_msg, timeout_millis=5000, logger=logger)
# 3.2 多线程处理
# 可选 3.2.1 默认模式 或 3.2.2 自定义模式
# 可以使用多线程进行并发消费,处理业务数据,提高效率
# 3.2.1 默认模式
# 默认参数:thread_count=5, timeout_millis=None, logger=None
# 默认 最大线程数thread_count为5个
# 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
# 默认 日志收集器logger 为 None
consumer.receive_thread(deal_msg)
# 3.1.2 自定义模式
# 例 1
consumer.receive_thread(deal_msg, 2)
# 例 2
consumer.receive_thread(deal_msg, 10, timeout_millis=5000, logger=logger)
pulsar_thread的create_producer和create_consumer的使用
import json
import pulsar_thread as pt
import pulsar
# 使用 pulsar 连接
client = pulsar.Client('pulsar://0.0.0.0:6655')
data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}
data_dict = {'test1':[json.dumps(data), json.dumps(data2)],
'test2':[json.dumps(data3), json.dumps(data4)]}
# 使用 pulsar_thread 创建生产者
producer = pt.create_producer(client)
result = producer.send(data_dict)
import json
import pulsar_thread as pt
import pulsar
def deal_msg(msg):
print(msg.value())
import time
time.sleep(5)
print(json.loads(msg.data()))
# 使用 pulsar 连接
client = pulsar.Client('pulsar://0.0.0.0:6655')
# 使用 pulsar_thread 创建消费者
consumer = pt.create_consumer(client, ['test1', 'test2'], 'my-subscription')
consumer.receive_thread(deal_msg, 2)
schema | note |
---|---|
BytesSchema | Get the raw payload as a bytes object. No serialization/deserialization are performed. This is the default schema mode |
StringSchema | Encode/decode payload as a UTF-8 string. Uses str objects |
JsonSchema | Require record definition. Serializes the record into standard JSON payload |
AvroSchema | Require record definition. Serializes in AVRO format |
若想拓展使用schema,请移步至pulsar-client文档,阅读使用schema。
pulsar-client文档链接:https://pulsar.apache.org/docs/en/client-libraries-python/