当前位置: 首页 > 工具软件 > Pulsar-Python > 使用案例 >

pulsar在Python中的使用

郑燕七
2023-12-01

pulsar是一个消息系统,具有多租户、高性能、低延迟等优势。

客户端支持Java、Python、Go和C++。

在Python中如何创建生产者和消费者呢?

先上代码:

import pulsar
import json

def produce_method():
    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer('my-topic')

    for i in range(10):
        print(f"Hello-{i}")
        dict_message = {
            "userId": "00001",
            "productName": "pulsar-test"
        }
        
        mes = json.dumps(dict_message).encode('utf-8')
        producer.send(mes )
    client.close()


def consumer_method():
    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe('my-topic', 'my-subscription')
    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            consumer.acknowledge(msg)
        except:
            print(f"msg is {msg}")

            consumer.negative_acknowledge(msg)
    client.close()

这里使用到了pulsar包,本地可以安装pulsar包,安装方法可以在网上找下,因为我使用的是部署到docker中的pulsar,所以为了保持版本一致,直接将docker中的pulsar包拷贝到了本地的site-packages目录中直接在pycharm中进行引用。

代码可以加个main方法,然后扔到docker中去测试pulsar的生产者和消费者了,本地没有搭建pulsar是不能跑的。

如果需要进行token认证则要进行如下定义

client = Client('pulsar://localhost:6650/'
                authentication=AuthenticationToken('eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY'))

并确保broker.conf中配置如下

# 启用认证和鉴权
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken

# 设置Broker 自身的认证。 Used when the broker connects to other brokers, either in same or other clusters
brokerClientTlsEnabled=true
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters={"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.9OHgE9ZUDeBTZs7nSMEFIuGNEX18FLR3qvy8mqxSxXw"}
# Or, alternatively, read token from file
# brokerClientAuthenticationParameters={"file":"///path/to/proxy-token.txt"}
brokerClientTrustCertsFilePath=/path/my-ca/certs/ca.cert.pem

# If this flag is set then the broker authenticates the original Auth data
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=true

# If using secret key (Note: key files must be DER-encoded)
tokenSecretKey=file:///path/to/secret.key
# The key can also be passed inline:
# tokenSecretKey=data:;base64,FLFyW0oLJ2Fi22KKCm21J18mbAdztfSHN/lAT5ucEKU=

# If using public/private (Note: key files must be DER-encoded)
# tokenPublicKey=file:///path/to/public.key

proxy.conf中配置如下

# For clients connecting to the proxy
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
tokenSecretKey=file:///path/to/secret.key

# For the proxy to connect to brokers
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters={"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.9OHgE9ZUDeBTZs7nSMEFIuGNEX18FLR3qvy8mqxSxXw"}
# Or, alternatively, read token from file
# brokerClientAuthenticationParameters={"file":"///path/to/proxy-token.txt"}

# Whether client authorization credentials are forwared to the broker for re-authorization.
# Authentication must be enabled via authenticationEnabled=true for this to take effect.
forwardAuthorizationCredentials=true

才能保证token认证开关时打开的,否在关闭的情况下,传不传token都能收到生产的消息。

 类似资料: