当前位置: 首页 > 工具软件 > Pulsar-Python > 使用案例 >

python使用pulsar-thread生产和消费

穆德海
2023-12-01

前言

pulsar是一个云原生的分布式消息和流平台。
本文主要介绍使用pulsar-thread包进行生产和消费。

一、pulsar-thread是什么?

一个连接pulsar消息队列的包,优点是:支持多线程生产和消费。

  1. 本包是以pulsar-client为基础创建的
  2. pulsar-client使用链接:https://pulsar.apache.org/docs/en/client-libraries-python/
  3. 默认 schema=pulsar.schema.StringSchema()
  4. 若想使用其他的schema, 使用方法与pulsar-client相同, 详情可看上面pulsar-client使用链接
  5. 默认的多线程最大数thread_count为5个

二、安装pulsar-thread

pip install pulsar-thread

三、使用说明

1. 连接 (client)

import pulsar_thread as pt

client = pt.client('pulsar://0.0.0.0:6655')

#请将 0.0.0.0:6655 换成你的pulsar地址

2. 生产者(producer)

import json
import pulsar_thread as pt

# 1. 连接client
client = pt.client('pulsar://0.0.0.0:6655')
   
# 模拟要发送的数据
data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}

# 2. 将要发送的数据和topic组合成字典
# {'topic_1': msg_1, ... , 'topic_n': msg_n}

data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 
             'test2':[json.dumps(data3), json.dumps(data4)]}

# 3. 创建生产者
producer = client.create_producer()


# 4. 发送消息 
# 可选 4.1 同步发送send 或 4.2 异步发送send_async

# 4.1 同步发送
# 可选 4.1.1 默认模式 或 4.1.2 自定义模式

# 4.1.1 默认模式
# 默认参数:thread_count=5, schema=pulsar.schema.StringSchema()
# 默认多线程最大数thread_count为5个, schema是以StringSchema()字符串模式

result = producer.send(data_dict)

# 4.1.2 自定义模式
# schema参数设置规范详见pulsar-client的使用

result = producer.send_async(data_dict,
                             thread_count=10,
                             schema=pulsar.schema.StringSchema()) 

# 4.2 异步发送
# 可选 4.2.1 默认模式 或 4.2.2 自定义模式

# 4.2.1 默认模式
# 默认参数:callback=None, thread_count=5, schema=pulsar.schema.StringSchema()
# 默认回调函数callback为None, 多线程最大数thread_count为5个, schema是以StringSchema()字符串模式

result = producer.send_async(data_dict)

# 4.2.2 自定义模式(callback, schema参数设置规范详见pulsar-client的使用)

result = producer.send_async(data_dict, 
                             callback=None,
                             thread_count=10,
                             schema=pulsar.schema.StringSchema())

3. 消费者(consumer)

import json
import pulsar_thread as pt

# 业务程序,处理消息队列发来的消息
# msg 是 接收的消息队列传来的消息
def deal_msg(msg):
    print(msg.value())
    import time
    time.sleep(5)
    print(json.loads(msg.data()))

# 1. 连接
client = pt.client('pulsar://0.0.0.0:6655')

# 2. 创建consumer
# 可从多个 topic 里接收数据
# 默认接收的 schema=pulsar.schema.StringSchema()
# 格式:consumer = client.create_consumer(['topic_1', ......, 'topic_n'], 消费者的名字, 
#                                         schema=pulsar.schema.StringSchema())


consumer = client.create_consumer(['test1', 'test2'], 'my-subscription')


# 3. 接收数据并用业务程序(例:deal_msg)处理
# 可选 3.1  单线程处理 consumer.receive() 或 
# 3.2 多线程处理 consumer.receive_thread()


# 3.1  单线程处理
# 可选 3.1.1 默认模式 或 3.1.2 自定义模式
# 阻塞模式,消费一个,业务程序处理一个,业务程序处理完成,再消费下一个

# 3.1.1 默认模式
# 默认参数:timeout_millis=None, logger=None
# 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
# 默认 日志收集器logger 为 None


consumer.receive(deal_msg)


# 3.1.2 自定义模式

import logging,sys

def LogSet():
    # 获取logger实例,如果参数为空则返回root logger
    logger = logging.getLogger("test.log")
    # 指定logger输出格式
    formatter = logging.Formatter('%(asctime)s %(pathname)s %(lineno)d %(levelname)-8s: %(message)s')
    # 文件日志
    file_handler = logging.FileHandler("./test.log")
    file_handler.setFormatter(formatter)  # 可以通过setFormatter指定输出格式
    # 控制台日志
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.formatter = formatter  # 也可以直接给formatter赋值
    # 为logger添加的日志处理器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    # 指定日志的最低输出级别,默认为WARN级别
    logger.setLevel(logging.DEBUG)
    # 移除一些日志处理器
    return logger, file_handler

# 获取 logger
logger,_=LogSet()

consumer.receive(deal_msg, timeout_millis=5000, logger=logger)

# 3.2 多线程处理
# 可选 3.2.1 默认模式 或 3.2.2 自定义模式
# 可以使用多线程进行并发消费,处理业务数据,提高效率


# 3.2.1 默认模式
# 默认参数:thread_count=5, timeout_millis=None, logger=None
# 默认 最大线程数thread_count为5个
# 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms
# 默认 日志收集器logger 为 None

consumer.receive_thread(deal_msg)

# 3.1.2 自定义模式

# 例 1
consumer.receive_thread(deal_msg, 2)

# 例 2
consumer.receive_thread(deal_msg, 10, timeout_millis=5000, logger=logger)

四、当使用 pulsar-client(pulsar.Client)连接时

pulsar_thread的create_producercreate_consumer的使用

1. 生产者(producer)

import json
import pulsar_thread as pt
import pulsar

# 使用 pulsar 连接
client = pulsar.Client('pulsar://0.0.0.0:6655')

data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']}
data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']}
data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']}
data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']}

data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 
             'test2':[json.dumps(data3), json.dumps(data4)]}

# 使用 pulsar_thread 创建生产者
producer = pt.create_producer(client)

result = producer.send(data_dict)

2. 消费者(consumer)

import json
import pulsar_thread as pt
import pulsar


def deal_msg(msg):
    print(msg.value())
    import time
    time.sleep(5)
    print(json.loads(msg.data()))

# 使用 pulsar 连接
client = pulsar.Client('pulsar://0.0.0.0:6655')

# 使用 pulsar_thread 创建消费者
consumer = pt.create_consumer(client, ['test1', 'test2'], 'my-subscription')

consumer.receive_thread(deal_msg, 2)

五、关于schema模式

1. schema支持的模式

schemanote
BytesSchemaGet the raw payload as a bytes object. No serialization/deserialization are performed. This is the default schema mode
StringSchemaEncode/decode payload as a UTF-8 string. Uses str objects
JsonSchemaRequire record definition. Serializes the record into standard JSON payload
AvroSchemaRequire record definition. Serializes in AVRO format

2. schema参数用法和pulsar-client相同

若想拓展使用schema,请移步至pulsar-client文档,阅读使用schema。
pulsar-client文档链接:https://pulsar.apache.org/docs/en/client-libraries-python/

 类似资料: